Sunday, December 23, 2012

ZeroMQ True Peer Connectivity (Harmony Pattern)




Since ØMQ is designed to make distributed messaging easy, people often ask how to interconnect a set of true peers (as compared to obvious clients and servers). It is a thorny question and ØMQ doesn't really provide a single clear answer.

TCP, which is the most commonly used transport in ØMQ, is not symmetric: one side must bind and the other must connect. ØMQ tries to be neutral about this, but it isn't. When you connect, you create an outgoing message pipe. When you bind, you do not. When there is no pipe, you cannot write messages (ØMQ will return EAGAIN).

Developers who study ØMQ and then try to create N-to-N connections between sets of equal peers often try a ROUTER-to-ROUTER flow. It's obvious why: each peer needs to address a set of peers, which requires ROUTER. It usually ends with a plaintive email to the list.

My conclusion, after trying several times from different angles, is that ROUTER-to-ROUTER does not work, and the ØMQ reference manual does not allow it when it discusses ROUTER sockets in zmq_socket(). At a minimum, one peer must bind and one must connect, meaning the architecture is not symmetrical. It also fails because you simply can't tell when it is safe to send a message to a peer. It's a Catch-22: you can talk to a peer only after it has talked to you, but the peer can't talk to you until you've talked to it. One side or the other will be losing messages and thus has to retry, which means the peers cannot be equal.

I'm going to explain the Harmony pattern, which solves this problem, and which we use in Zyre.

We want a guarantee that when a peer "appears" on our network, we can talk to it safely, without ØMQ dropping messages. For this, we have to use a DEALER or PUSH socket which connects out to the peer so that even if that connection takes some non-zero time, there is immediately a pipe, and ØMQ will accept outgoing messages.

A DEALER socket cannot address multiple peers individually. But if we have one DEALER per peer, and we connect that DEALER to the peer, we can safely send messages to a peer as soon as we've connected to it.

Now, the next problem is to know who sent us a particular message. We need a reply address, that is, the UUID of the node that sent any given message. DEALER can't do this unless we prefix every single message with that 16-byte UUID, which would be wasteful. ROUTER does it for us, provided the connecting DEALER sets its identity to the UUID before it connects to the ROUTER.
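
To make the reply-address idea concrete, here is a minimal sketch in plain Python that models the frames a ROUTER hands to the application, without calling ØMQ itself. A ROUTER prepends the sender's identity as the first frame of each incoming message. The helper names make_identity and split_router_message are hypothetical, and the 0x01 prefix is an assumption: ØMQ reserves identities that start with a zero byte, and a raw UUID can occasionally start with one, so one possible workaround is to prefix a fixed non-zero byte.

```python
import uuid


def make_identity(node_uuid: uuid.UUID) -> bytes:
    """Build a socket identity from a node UUID (hypothetical helper).

    Assumption: a 0x01 prefix keeps the identity from ever starting with
    a zero byte, giving 17 bytes in total.
    """
    return b"\x01" + node_uuid.bytes


def split_router_message(frames):
    """Split what a ROUTER would deliver into (sender UUID, payload frames)."""
    identity, payload = frames[0], frames[1:]
    if len(identity) != 17 or identity[:1] != b"\x01":
        raise ValueError("unexpected identity frame")
    return uuid.UUID(bytes=identity[1:]), payload


sender = uuid.uuid4()
# A DEALER with this identity sends [b"HELLO"]; the receiving ROUTER would
# hand the application the identity frame followed by the payload frames.
incoming = [make_identity(sender), b"HELLO"]
who, payload = split_router_message(incoming)
```

The sender pays nothing extra per message: the identity is set once on the socket, and the ROUTER attaches it on the receiving side.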

And so the Harmony pattern comes down to:

One ROUTER socket that we bind to an ephemeral port, which we broadcast in our beacons.
One DEALER socket per peer that we connect to the peer's ROUTER socket.
Reading from our ROUTER socket.
Writing to the DEALER socket we hold for each peer.

The next problem is that discovery isn't neatly synchronized. We can get the first beacon from a peer after we start to receive messages from it. A message comes in on the ROUTER socket and has a nice UUID attached to it, but no physical IP address and port. We have to force discovery over TCP. To do this, our first command to any new peer we connect to is an OHAI command with our IP address and port. This ensures that the receiver connects back to us before trying to send us any command.
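
As an illustration of what such a command might carry, here is a rough sketch of encoding and decoding an OHAI message. The frame layout (a command name, an ASCII IP address, and a 2-byte big-endian port) is purely an assumption for this example and is not the actual Zyre wire format; encode_ohai and decode_ohai are hypothetical names.

```python
import struct


def encode_ohai(ip: str, port: int):
    """Return the frames of a hypothetical OHAI command."""
    return [b"OHAI", ip.encode("ascii"), struct.pack("!H", port)]


def decode_ohai(frames):
    """Return the endpoint the receiver should connect back to."""
    if len(frames) != 3 or frames[0] != b"OHAI":
        raise ValueError("not an OHAI command")
    (port,) = struct.unpack("!H", frames[2])
    return "tcp://{}:{}".format(frames[1].decode("ascii"), port)


endpoint = decode_ohai(encode_ohai("192.168.1.10", 5670))
```

Whatever the real layout, the point is that the receiver learns a connectable endpoint from the very first message, so it never depends on the UDP beacon arriving first.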

Breaking this down into steps:

If we receive a UDP beacon we connect to the peer.
We read messages from our ROUTER socket, and each message comes with the UUID of the sender.
If it's an OHAI message we connect back to that peer if not already connected to it.
If it's any other message, we must already be connected to the peer (a good place for an assertion).
We send messages to each peer using a dedicated per-peer DEALER socket, which must be connected.
When we connect to a peer we also tell our application that the peer exists.
Every time we get a message from a peer, we treat that as a heartbeat (it's alive).

If we were not using UDP but some other discovery mechanism, I'd still use the Harmony pattern for a true peer network: one ROUTER for input from all peers, and one DEALER per peer for output. Bind the ROUTER, connect the DEALER, and start each conversation with an OHAI equivalent that provides the return IP address and port. You would need some external mechanism to bootstrap each connection.
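
The bookkeeping in the steps above can be sketched in plain Python. This is a model under stated assumptions, not Zyre's implementation: the connect callable is hypothetical and stands in for "create a DEALER, set its identity, connect it to the endpoint"; FakeDealer only records what would be sent; the OHAI body is simplified to a single endpoint string; and PING is an invented command name.

```python
import time


class Peer:
    """State for one remote peer; `outbox` stands in for its DEALER socket."""

    def __init__(self, peer_id, endpoint, outbox):
        self.peer_id = peer_id
        self.endpoint = endpoint
        self.outbox = outbox
        self.last_seen = time.monotonic()


class Node:
    def __init__(self, endpoint, connect, on_peer):
        self.endpoint = endpoint  # where our own ROUTER is bound
        self.connect = connect    # hypothetical: endpoint -> object with send(frames)
        self.on_peer = on_peer    # tells the application that a peer exists
        self.peers = {}

    def require_peer(self, peer_id, endpoint):
        """Connect to a peer unless already connected, and greet it with OHAI."""
        peer = self.peers.get(peer_id)
        if peer is None:
            peer = Peer(peer_id, endpoint, self.connect(endpoint))
            self.peers[peer_id] = peer
            # The first command to any new peer carries our return address.
            peer.outbox.send([b"OHAI", self.endpoint.encode()])
            self.on_peer(peer_id)
        return peer

    def on_beacon(self, peer_id, endpoint):
        self.require_peer(peer_id, endpoint)

    def on_message(self, peer_id, frames):
        if frames[0] == b"OHAI":
            peer = self.require_peer(peer_id, frames[1].decode())
        else:
            # Any other command implies the peer already sent us OHAI.
            assert peer_id in self.peers, "message from an unknown peer"
            peer = self.peers[peer_id]
        peer.last_seen = time.monotonic()  # every message counts as a heartbeat
        return peer


class FakeDealer:
    """Test double that records outgoing frames instead of sending them."""

    def __init__(self, endpoint):
        self.endpoint = endpoint
        self.sent = []

    def send(self, frames):
        self.sent.append(frames)


joined = []
node = Node("tcp://10.0.0.1:5001", FakeDealer, joined.append)
node.on_beacon("peer-a", "tcp://10.0.0.2:5002")                # found via beacon
node.on_message("peer-b", [b"OHAI", b"tcp://10.0.0.3:5003"])   # message beat the beacon
node.on_message("peer-a", [b"PING"])                           # ordinary traffic
```

Both discovery paths end up in require_peer, so it does not matter whether the beacon or the OHAI arrives first, and a later duplicate of either is a no-op.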
