Distributed messaging failure modes

[This is a brainstorming document; implementations are subject to change, and will most likely change, during development.]

I have a good understanding of a few distributed storage systems that use consistent hashing and quorum reads/writes to load balance, scale out, and provide fault tolerance, and I wanted to apply some of these ideas to my own projects. In that light, I’ve decided to dip my toe in the water and design a distributed messaging system that will eventually replace the frail, legacy communications systems in our virtual world platform. The messaging system and its connectivity components are being designed as open source projects as part of our virtual world future initiative.

After starting the project, I found a distributed message queue that meets my requirements, namely Apache Kafka. However, since I can always use more distributed systems experience, I’ve decided to push forward with a distributed messaging system designed more closely around our requirements.

One of the most difficult challenges is deciding what guarantees I want to make in the face of failure. Our requirements for this messaging system are... well, simple compared to some. We need scalability and fault tolerance. We need two modes of operation: message piping, and persistent storage for a single consumer who will claim the messages when they make their first connection after being offline.

The “message pipe”

Most of the time the system is running, it will be passing messages between online users involved in private IMs as well as group conversations. For example, each simulator will subscribe to queues for instant messages belonging to the users it is managing. When a new group message comes in, consistent hashing on the queue identifier will route the message to the appropriate nodes for processing. The message will then either be passed to any consumers connected to the queue and waiting for messages, or be dropped.
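To make the routing step concrete, here is a minimal Python sketch of consistent hashing on a queue identifier. It assumes three replicas per queue, 64 virtual nodes per physical node, and MD5 as the ring hash; `HashRing` and `preference_list` are hypothetical names for illustration, not part of any existing library or of this project.

```python
import bisect
import hashlib


class HashRing:
    """Consistent-hash ring with virtual nodes (illustrative sketch)."""

    def __init__(self, nodes, vnodes=64):
        self._nodes = set(nodes)
        # Each physical node appears vnodes times to even out the key spread.
        self._ring = sorted(
            (self._hash(f"{node}#{i}"), node) for node in self._nodes for i in range(vnodes)
        )
        self._hashes = [h for h, _ in self._ring]

    @staticmethod
    def _hash(key):
        # MD5 is used only for its uniform spread, not for security.
        return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")

    def preference_list(self, queue_id, n=3):
        """Walk clockwise from the queue's hash; return the first n distinct nodes."""
        n = min(n, len(self._nodes))
        i = bisect.bisect(self._hashes, self._hash(queue_id))
        result = []
        while len(result) < n:
            node = self._ring[i % len(self._ring)][1]
            if node not in result:
                result.append(node)
            i += 1
        return result


ring = HashRing(["node-a", "node-b", "node-c", "node-d"])
replicas = ring.preference_list("group:1234")  # the nodes that handle this queue
```

Virtual nodes are one common way to keep the ranges balanced when nodes join or leave; a real deployment might tune the count or use a different hash.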

If a group IM session is happening and only one person is subscribed to the group chat at that time, all messages posted to the queue will be dropped without being piped, and we’ll return a status value indicating this. We’ll configure this specific queue not to store messages that aren’t immediately consumed.
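The two queue behaviours (pipe-and-drop, and store-for-a-single-consumer) can be sketched as one per-node queue object with a storage flag. This is a minimal Python sketch: `Queue`, `Status`, `store_undelivered`, and the callback-based `connect` are hypothetical names, and not counting the sender as a consumer of its own message is my assumption about the one-subscriber case.

```python
from collections import deque
from enum import Enum


class Status(Enum):
    DELIVERED = "delivered"  # handed to at least one waiting consumer
    DROPPED = "dropped"      # pipe mode, nobody listening: message discarded
    STORED = "stored"        # persistent mode, held until the consumer connects


class Queue:
    """One queue on one node: a pure pipe, or a store-and-forward mailbox."""

    def __init__(self, store_undelivered=False):
        self.store_undelivered = store_undelivered
        self.consumers = {}     # consumer id -> delivery callback
        self.pending = deque()  # only filled when store_undelivered is True

    def publish(self, msg, sender=None):
        # Assumption: the sender is not a consumer of its own message.
        targets = [cb for cid, cb in self.consumers.items() if cid != sender]
        if targets:
            for deliver in targets:
                deliver(msg)
            return Status.DELIVERED
        if self.store_undelivered:
            self.pending.append(msg)
            return Status.STORED
        return Status.DROPPED

    def connect(self, consumer_id, deliver):
        """Attach a consumer, first flushing anything stored while it was offline."""
        while self.pending:
            deliver(self.pending.popleft())
        self.consumers[consumer_id] = deliver


pipe = Queue()  # the group IM case: don't keep unconsumed messages
pipe.connect("alice", print)
print(pipe.publish("hello?", sender="alice"))  # Status.DROPPED
```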

Since I don’t plan on including any kind of on-disk or in-memory queues beyond what is required to forward the messages, this type of queue has the most “interesting” properties during a failure. Obviously, plans may change as I determine exactly what guarantees I want to provide.

Message pipe failure modes

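Let’s check out a few of the interesting things that can happen to this distributed system when running in production. We’re going to use quorum-style reads and writes to provide fault tolerance. This means that every message published will be sent to at least two of three nodes before the send is confirmed, and that every consumer of a queue will listen to two of three nodes for incoming messages. Because 2 + 2 > 3, any two nodes the producer writes to must share at least one node with any two nodes the consumer listens to, so in the healthy case every message passes through a node the consumer is linked to.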
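The overlap argument can be checked by brute force. This small Python sketch (the function name is hypothetical) enumerates every possible write set and listen set for N nodes, a write quorum W, and a listen quorum R.

```python
from itertools import combinations


def quorums_always_overlap(n, w, r):
    """True if every w-node write set shares a node with every r-node listen set."""
    nodes = range(n)
    return all(
        set(ws) & set(rs)
        for ws in combinations(nodes, w)
        for rs in combinations(nodes, r)
    )


print(quorums_always_overlap(3, 2, 2))  # True, since 2 + 2 > 3
print(quorums_always_overlap(3, 2, 1))  # False, since 2 + 1 = 3
```

The check only holds while both sides actually keep their full quorum of links; the failure modes below are about what happens when one side loses a link.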

(In all the following diagrams, P represents the producer and C represents the consumer. Sorry ahead of time for the horrible diagrams.)

Common node failure

Let’s start with an easy one: a single node honestly and truly goes down. Maybe someone tripped over the network cable, or the machine decided it would be a good day to catch fire. This node is seen as down by both the consumer and the producer processes. The node with the X through it below is now down.



We have a problem here. Though the consumer is listening to two different nodes, the producer is no longer writing to any node that the consumer has a link to. If the producer were to continue this way, the consumer would never get any more messages during this session.

Luckily, we’ve chosen a strategy that prevents the producer from considering a write successful until it can write to at least two nodes. When the next message is ready to be sent, the producer will block its delivery until it establishes a quorum write with the remaining nodes. As you’ll see below, this behavior allows the message to be delivered to the consumer.


Once the producer establishes a new link, it is able to send the message, which will reach the consumer through the already established connection (in black, on the bottom left). We have fault tolerance for this scenario. When the dead node comes back up, producers will continue writing to a quorum of nodes, and no messages need be lost.
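A minimal sketch of the blocking quorum write, assuming a hypothetical `send(node, msg)` transport call that returns `True` on acknowledgement and raises `OSError` when a node is unreachable. The retry delay and the timeout are my additions (so a caller eventually gets an error instead of blocking forever), and `quorum_write` is an illustrative name, not an existing API.

```python
import time


class QuorumError(Exception):
    """Raised when a write quorum could not be reached before the deadline."""


def quorum_write(msg, replicas, send, w=2, retry_delay=0.1, timeout=5.0):
    """Block until at least w replicas have acknowledged msg, trying them in order.

    send(node, msg) is a hypothetical transport call: it returns True on an
    acknowledgement and raises OSError if the node cannot be reached.
    """
    deadline = time.monotonic() + timeout
    acked = []
    while True:
        for node in replicas:
            if node in acked:
                continue  # already holds the message; don't send a duplicate
            try:
                ok = send(node, msg)
            except OSError:
                ok = False  # treat an unreachable node like a missing ack
            if ok:
                acked.append(node)
                if len(acked) >= w:
                    return acked
        if time.monotonic() >= deadline:
            raise QuorumError(f"only {len(acked)} of {w} required acks")
        time.sleep(retry_delay)
```

Walking the whole list of replicas for the queue is what lets the producer fall over to the remaining nodes when one of its usual two is dead.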

Producer side common link failure

A situation arises where the producer cannot contact a common node in the quorum, but the consumer can. From the point of view of the producer, the top node is down.


Recovery from this is similar to recovery from the common node going down.


The producer will fail to obtain a quorum for writes, and will establish a connection to the bottom left node to repair the condition. At this point it can continue to deliver messages as demonstrated in the common node failure scenario.
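One way each client might decide that a node is down is a purely local heartbeat timeout. Because every producer and consumer keeps its own view, the producer can consider the top node down while the consumer still sees it as up. This is a hedged sketch: heartbeats, the 3-second timeout, and the `LocalFailureDetector` name are assumptions, not part of the design above; the injectable clock exists only to make the behaviour testable.

```python
import time


class LocalFailureDetector:
    """One client's private view of which nodes are up, based on heartbeats.

    Each producer or consumer keeps its own instance, so two clients can
    disagree about the same node, as in an asymmetric link failure.
    """

    def __init__(self, timeout=3.0, clock=time.monotonic):
        self.timeout = timeout
        self.clock = clock
        self.last_seen = {}  # node -> time of the last heartbeat we received

    def heartbeat(self, node):
        self.last_seen[node] = self.clock()

    def is_up(self, node):
        seen = self.last_seen.get(node)
        return seen is not None and self.clock() - seen <= self.timeout

    def live(self, nodes):
        """Filter a replica list down to the nodes this client can reach."""
        return [n for n in nodes if self.is_up(n)]
```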

Consumer side common link failure

This is the one that has me wondering what the “best” answer to the problem is. In this scenario, the consumer loses its connection to the top node, which prevents it from receiving the producer’s messages through that node. The top node is still up as far as the producer is concerned, so the producer can still write to it and satisfy a quorum.



However, if the producer writes to the quorum before the consumer reestablishes a connection, the consumer will not get the messages, and the producer will see them as not consumed, because the top node was the only node the consumer shared with the producer’s write set. We’ll see a temporary disruption in messaging to this consumer until it is able to reconnect to the bottom right node.


Once this new connection is established, messages will resume flowing to the consumer. In the meantime, however, some messages were not delivered.
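The gap can be expressed with plain set arithmetic. The node labels are my reading of the diagrams (the consumer listening on the top and bottom left nodes, the producer writing to the top and bottom right nodes), and `reaches_consumer` is a hypothetical helper.

```python
def reaches_consumer(write_set, consumer_links):
    """A piped message reaches the consumer only through a node that both
    received the write and still has a link to the consumer; nothing is
    buffered anywhere else."""
    return bool(set(write_set) & set(consumer_links))


producer_writes = {"top", "bottom-right"}        # 2 of 3 acks: quorum satisfied
consumer_links = {"top", "bottom-left"}          # before the failure
after_loss = consumer_links - {"top"}            # link to the top node severed
after_reconnect = after_loss | {"bottom-right"}  # consumer repairs its quorum

print(reaches_consumer(producer_writes, consumer_links))   # True
print(reaches_consumer(producer_writes, after_loss))       # False
print(reaches_consumer(producer_writes, after_reconnect))  # True
```

The producer’s write succeeds in all three cases; only the consumer’s side of the quorum changes.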

This isn’t a huge deal for our use case, because all the important messages will be stored and forwarded when the consumer reconnects through the new node. However, I’d still love to find a good way to solve the problem.

One thought is that I could keep a limited (time, space, or both) queue for messages hitting nodes that don’t yet have a consumer attached. The big problem with this solution is that I will end up eating a ton of memory for messages that will never be consumed.
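A minimal sketch of such a hold buffer, bounded by both count and age, follows. `BoundedHoldBuffer`, the 100-message cap, and the 30-second TTL are hypothetical choices for illustration. The bounds make the memory cost explicit but do not remove it: each node could hold up to `max_messages` for every consumer-less queue it handles.

```python
import time
from collections import deque


class BoundedHoldBuffer:
    """Holds messages for a queue that has no consumer yet, bounded by count and age."""

    def __init__(self, max_messages=100, ttl=30.0, clock=time.monotonic):
        self.ttl = ttl
        self.clock = clock
        # maxlen enforces the space bound: appending to a full deque
        # silently evicts the oldest entry.
        self._items = deque(maxlen=max_messages)  # (arrival_time, msg), oldest first

    def _expire(self):
        """Enforce the time bound by dropping entries older than ttl."""
        cutoff = self.clock() - self.ttl
        while self._items and self._items[0][0] < cutoff:
            self._items.popleft()

    def hold(self, msg):
        self._expire()
        self._items.append((self.clock(), msg))

    def drain(self):
        """Return everything still fresh, e.g. when a consumer finally attaches."""
        self._expire()
        msgs = [msg for _, msg in self._items]
        self._items.clear()
        return msgs
```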

Another thought is that all writes should go out to all nodes handling the range, but only be considered successful if the write makes it to at least a quorum of nodes. That way, as long as the producer can reach every node, whichever node the consumer is still linked to also receives the write, so this failure scenario will only happen if we’re already in a degraded state.
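A sketch of this write-to-all variant: `send(node, msg)` is a hypothetical transport call that returns `True` on acknowledgement and raises `OSError` when a node is unreachable, and `publish_to_all` is an illustrative name. It sends to every replica for the range and reports success once at least `w` of them acknowledged.

```python
class QuorumError(Exception):
    """Raised when fewer than w replicas acknowledged a write."""


def publish_to_all(msg, replicas, send, w=2):
    """Send msg to every replica for the range; succeed if at least w acked.

    send(node, msg) is a hypothetical transport call: True on ack,
    OSError when the node cannot be reached.
    """
    acked = []
    for node in replicas:
        try:
            if send(node, msg):
                acked.append(node)
        except OSError:
            continue  # unreachable replica: keep going, it just doesn't count
    if len(acked) < w:
        raise QuorumError(f"only {len(acked)} of {w} required acks")
    return acked
```

With every node reachable, the bottom left node also receives the write, so a consumer that has lost its link to the top node still gets the message. It only misses a message when the producer also cannot reach the consumer’s remaining node, which is the degraded state mentioned above. The cost is one extra send per message in the healthy case.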

Definitely interested in hearing ideas from people who have already designed systems like this. Feel free to leave comments!
