Dispatching protobuf messages

When working with networked software, one of the first decisions you have to make is how you’re going to encode and dispatch messages. It’s important to make these decisions early, because once you start down a certain path for decoding and adding network messages to your source, it can be hard to change course.

For a project I’m working on, I decided to use Google’s protobuf for message serialization. Protobuf is a fast serialization format that boasts extensibility as well as high performance. It’s an ideal choice for latency-sensitive software and has served me well when designing virtual world messaging protocols.

The C++ code that protoc generates contains your message classes and gives them a base class of google::protobuf::Message. While this base class has a lot of useful functions, it doesn’t solve the problem of determining the message type on the wire, nor does it solve the problem of dispatching the messages back to your code.

For the first problem, determining the message type on the wire, the protobuf documentation has two recommendations. The first is a union type: a single message that contains, as optional fields, every message your program may ever send. The protobuf documentation offers the following example:

message OneMessage {
  enum Type { FOO = 1; BAR = 2; BAZ = 3; }

  // Identifies which field is filled in.
  required Type type = 1;

  // One of the following will be filled in.
  optional Foo foo = 2;
  optional Bar bar = 3;
  optional Baz baz = 4;
}

I was going to use this method until I realized how unwieldy it could get. I also wanted to better separate message types between the node and the client.

Another method is to create a “header” message that wraps the “meat” message as raw data:

message HeaderMessage {
  required int32 type = 1;

  // The embedded message data.
  required bytes message_data = 2;
}

I didn’t want to use this method because of the overhead of reading through the message twice: once for the header, then again when we deserialize the contained message.

Instead, I’ve decided to prefix the bytestream in my protocol with the message type, followed by the length of the message. I’ve pledged to include nothing else in the wrapping bytes, because putting too much into my header defeats the purpose of using an extensible serialization format. So a message on the wire looks like the following:

0       1       2       3       4       5       6    ...X    
[Type (2 bytes)][---------Size (4 bytes)-------][ message ...]
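
To make the framing concrete, here’s a minimal sketch of writing and reading that header. The byte order (network order via htons/htonl) and the helper names are assumptions for illustration, not code from the real project; the message bytes themselves are just the output of SerializeToString() on the protobuf message.

// Illustrative framing helpers: 2-byte type, 4-byte size, then the serialized
// protobuf bytes. Network (big-endian) byte order is assumed here.
#include <arpa/inet.h>

#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

std::vector<char> frame_message(uint16_t type, const std::string& body)
{
    std::vector<char> out(6 + body.size());

    uint16_t net_type = htons(type);
    uint32_t net_size = htonl(static_cast<uint32_t>(body.size()));

    std::memcpy(out.data(),     &net_type, 2);
    std::memcpy(out.data() + 2, &net_size, 4);
    std::memcpy(out.data() + 6, body.data(), body.size());

    return out;
}

// Peels the 6-byte header off a received buffer; the caller then reads
// `size` more bytes and hands them to the protobuf parser.
bool read_header(const char* buf, std::size_t len, uint16_t& type, uint32_t& size)
{
    if (len < 6) return false;

    uint16_t net_type;
    uint32_t net_size;
    std::memcpy(&net_type, buf,     2);
    std::memcpy(&net_size, buf + 2, 4);

    type = ntohs(net_type);
    size = ntohl(net_size);
    return true;
}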

With this, we now have enough information to decode and dispatch new messages. My goal was to avoid forcing all message consumers to include switch statements in their downstream handling code. So I decided that the best way to deal with this was double dispatch.

One thing I noticed is that the generated C++ code doesn’t seem to include any method to support the observer pattern or double dispatch, which would’ve meant I didn’t need to create any giant switch statements. Unfortunately, because of this I had to create one of my own. But I pledged not to make any other users of the code do anything similarly silly, and I didn’t want them to have to write unnecessary code for handlers they don’t need. Enter my two-part dispatch.

When a message comes in, it is decoded and then first passed to a switch-based dispatch:

///
/// Maps the wire-level message type to its concrete protobuf class
///
void messageutil::switch_dispatch(message_context_ptr ctx)
{
    switch (ctx->type)
    {
        case MT_GET_CHALLENGE:
            messageutil::template_dispatch(ctx, std::make_shared<GetChallengeMessage>());
            break;

        default:
            throw std::runtime_error("messageutil::switch_dispatch() unhandled message type "
                                     + boost::lexical_cast<std::string>(ctx->type));
    }
}

template_dispatch is where the C++ magic starts making my life easier. We use a template function to parse the message and hand it off to a dispatch handler:

///
/// After the message type is decoded, we do the rest of the work here
///
template <typename T>
static void template_dispatch(message_context_ptr ctx, T message)
{
    if (! message->ParseFromArray(ctx->message_buffer.get(), ctx->message_size))
    {
        //error
        ctx->error_callback(sopmq::error::network_error("Unable to parse new message of type "
                                                        + boost::lexical_cast<std::string>(ctx->type)
                                                        + ". Message corrupted?"));
    }
    else
    {
        //dispatch
        ctx->dispatcher.dispatch(message);
    }
}

Now that we have the actual message class T, we can use overloaded functions to handle the messages by type. But more than this, we want to make sure that users of this code don’t have to waste time banging out member functions for messages they’re never going to use. To provide this, we create a dispatcher class that lets people register message handlers only for the messages they need, along with a default handler that will be called for any message they didn’t expect.

///
/// Handles dispatch of network messages to selected functions
///
class message_dispatcher
{
public:
    message_dispatcher(std::function<void(Message_ptr)> unhandledHandler);
    virtual ~message_dispatcher();

    ///
    /// Dispatches the GetChallenge message to the correct handler
    ///
    void dispatch(GetChallengeMessage_ptr getChallengeMessage);

public:
    ///
    /// Sets the handler function for a GetChallenge message
    ///
    void set_handler(std::function<void(GetChallengeMessage_ptr)> handler);

private:
    std::function<void(Message_ptr)> _unhandledHandler;
    std::function<void(GetChallengeMessage_ptr)>  _getChallengeHandler;

    ///
    /// Template function to execute the given handler if it is available, or
    /// the unhandled handler if it is not
    ///
    template <typename handler, typename message>
    void do_dispatch(handler h, message m)
    {
        if (h)
        {
            h(m);
        }
        else
        {
            _unhandledHandler(std::static_pointer_cast<::google::protobuf::Message>(m));
        }
    }
};

message_dispatcher::message_dispatcher(std::function<void(Message_ptr)> unhandledHandler)
: _unhandledHandler(unhandledHandler)
{

}

message_dispatcher::~message_dispatcher()
{

}

void message_dispatcher::dispatch(GetChallengeMessage_ptr getChallengeMessage)
{
    do_dispatch(_getChallengeHandler, getChallengeMessage);
}

void message_dispatcher::set_handler(std::function<void(GetChallengeMessage_ptr)> handler)
{
    _getChallengeHandler = handler;
}

For each message type the user would like to handle, they call set_handler, which takes a std::function object that will handle the incoming message. The internal code then dispatches each message by calling the do_dispatch template method, which either sends the message to the registered handler or calls the registered function for unhandled messages.

To handle any inbound network message types, one simply needs to call into the messageutil class, hand it a message_dispatcher with handlers registered for the types the caller cares about, and tell it to read from the network.
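
As a rough illustration of that wiring (the handler bodies, start_authentication(), and the final hookup to messageutil are invented for this sketch):

#include <iostream>

// Hypothetical application function, declared only so the sketch is complete.
void start_authentication(GetChallengeMessage_ptr challenge);

void wire_up_handlers()
{
    // Any message type we didn't register a handler for ends up here.
    message_dispatcher dispatcher(
        [](Message_ptr unexpected)
        {
            std::cerr << "unhandled message: " << unexpected->GetTypeName() << std::endl;
        });

    // Register handlers only for the message types this caller cares about.
    dispatcher.set_handler(
        [](GetChallengeMessage_ptr challenge)
        {
            start_authentication(challenge);
        });

    // ...hand the dispatcher to messageutil and tell it to start reading from the network...
}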

You can see implementations of all this at the links below:

message_dispatcher.cpp
message_dispatcher.h
messageutil.h
messageutil.cpp

In the blink of an eye

I got a call this morning about a good friend of mine.

He was the best man at my wedding, and has been there for me through more hard times than I can remember. He’s the type of guy that if you’re feeling down for any reason, he’s got your back and will help you with whatever you need, or just sit and tell you stupid jokes until you laugh out of reflex.

He was my “boss” at my first programming job. I put boss in quotes because he never acted like that. I always felt like I was part of a team. He was the guy that got to get in between the programmers and the employees to really flesh out how we could best make the software we were writing work for the company.

Just last week I was helping him work out an issue where, after the installation of a bunch of pieces of software, DNS lookups all of a sudden started to randomly fail. It just seems so surreal.

This morning the voice on the call was his father. He called to tell me that my friend was in the hospital with multiple blood clots. He’s stable, but they expect him to be in the hospital for a week in recovery.

All adults realize that stuff like this can happen at any time, but I don’t think we really let it sink in until it does. You always hear the stories and sayings that go something like “at some point if you live long enough, you get to a certain age where life starts taking away rather than giving”.

My friend has children, all girls, with a wife who is 7 months pregnant with his first boy. That’s all I could think about. For a good hour I was pretty frozen just thinking that in the blink of an eye, the provider becomes the one in need. Everything gets turned upside-down and everyone has to cope and try to make the best of what has happened. We lean on our friends and family.

I am grateful that the prognosis is good, and I’ll see him tomorrow, but obviously mortality was first and foremost on my mind, and selfishly, after thinking about his mortality, I thought about my own.

I’ve had an experience in my life that completely changed me. Many years ago, I experienced a period of approximately 2 months where I had constant heart rhythm issues. I got checked out by the doctor, and a cardiologist, was poked and prodded. It felt like any minute I might pass out, and my heart might stop. They found nothing structurally wrong with my heart, and there was no medication to stop the misfiring issues I was experiencing.

When I slept, my heart rate dropped to 30 and 40 beats per minute. When I tried to relax and have fun, the skipped beats, PVCs, PACs, and dizziness would take my sanity away. The awkward, unorchestrated pounding in my chest 10 – 20 times a minute reminded me that I was mortal. I couldn’t escape it. I’d lay awake worrying that my time would come that night.

I was ineffective at my job. I couldn’t pursue my dreams anymore. I started lashing out at the people who loved me because it felt like they were pulling away. I could think of nothing but this thing in my chest that was so important, but so broken. I shrunk into myself and ran away from everything, just trying to find some relief from this constant reminder that, as far as I was concerned, I was dying.

This cycle continued for a long time. Until one day, I just didn’t care anymore.

You read that right. I didn’t care if I lived or died on that day. I decided that whatever this was, it may have the power to take everything else from me, but it will no longer take my happiness. It will no longer control what time I have left. Fearing the unknown was a choice that I had made, and I was letting that fear destroy my life before whatever the problem was could take it. I could’ve died any other day just as easily as this one. Why the hell was I so damn concerned now?

This is my life. If I give fear power over my life, I have no one to blame but myself.

A couple weeks after I stopped the cycle of fear and adrenaline, my physical symptoms disappeared, and my EKG went back to normal. The problems had started in the middle of the night while I was sleeping, not triggered by fear, but they ended because I made a choice to not be afraid.

All we can do is make the best of the time we’re given. There are no guarantees. Make the best of your life. Use the time you were given to help others build their lives, their dreams. Be a friend to those that need you. Don’t be afraid.

Tomorrow during the day I’m going to work extra hard on some code. Then I’m going to visit some kids that could use a friendly face. They have a lot to be fearful of right now.

Distributed messaging failure modes

[This is a brainstorming document; the implementation is subject to change, and will most likely change during development.]

I have a good understanding of a few distributed storage systems that use consistent hashing and quorum reads/writes to load balance, scale out, and provide fault tolerance, and I wanted to apply some of these ideas to my own projects. In that light, I’ve decided to dip my toe in the water and design a distributed messaging system that will eventually replace the frail, legacy communications systems present in our virtual world platform. The messaging system, as well as the connectivity components, are all being designed as open source projects as part of our virtual world future initiative.

After starting the project I found a distributed message queue that meets my requirements, namely Apache Kafka. However, since I can always use more distributed systems experience, I’ve decided to push forward with a distributed messaging system designed more closely around our requirements.

One of the most difficult challenges is deciding what guarantees I want to make in the face of failure. Our requirements for this messaging system are... well, simple compared to some. We need scalability and fault tolerance, and we need two modes of operation: message piping, and persistent storage with a single consumer who will claim the messages when they make their first connection after being offline.

The “message pipe”

For the majority of the time the system spends running, it will pass messages between online users involved in private IMs as well as group conversations. As an example, each simulator will subscribe to queues for instant messages belonging to the users it is managing. When a new group message comes in, consistent hashing on the queue identifier will route the message to the appropriate nodes for processing. The message will then either be passed to consumers connected to the queue and waiting for messages, or it will be dropped.
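
For anyone unfamiliar with consistent hashing, here is a bare-bones sketch of the kind of ring lookup I have in mind. std::hash and a single position per node are simplifications for illustration; a real implementation would use a stronger hash and virtual nodes for better balance.

#include <cstddef>
#include <functional>
#include <map>
#include <stdexcept>
#include <string>

// Toy consistent hash ring: nodes sit at positions on a ring of hash values,
// and a queue id is routed to the first node clockwise from its own hash.
class hash_ring
{
public:
    void add_node(const std::string& node_id)
    {
        _ring[std::hash<std::string>()(node_id)] = node_id;
    }

    // Returns the node responsible for the given queue identifier.
    const std::string& node_for(const std::string& queue_id) const
    {
        if (_ring.empty()) throw std::runtime_error("no nodes in the ring");

        std::size_t h = std::hash<std::string>()(queue_id);
        auto it = _ring.lower_bound(h);            // first node at or past the hash
        if (it == _ring.end()) it = _ring.begin(); // wrap around the ring
        return it->second;
    }

private:
    std::map<std::size_t, std::string> _ring;      // ring position -> node id
};

In a replicated setup, the replica set for a queue would typically be this node plus the next couple of distinct nodes around the ring, which is where the quorum discussed below comes in.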

If there is a group IM session happening and only one person is subscribed to the group chat at that time, all messages posted to the queue will be dropped without being piped, and we’ll return a status value indicating as much. We’ll configure this specific queue not to store messages that aren’t immediately consumed.

Since I don’t plan on including any kind of on-disk or in memory queues besides what will be required to forward the messages, this type of queue has the most “interesting” properties during a failure. Obviously plans may change as I determine exactly what guarantees I want to provide.

Message pipe failure modes

Let’s check out a few of the interesting things that can happen to this distributed system when it’s running in production. We’re going to use quorum-style reads and writes to provide fault tolerance. This means that every message published will be sent to at least two of three nodes before the send is confirmed, and that every consumer of a message will listen to two of three nodes for incoming messages.
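
In rough C++ terms, the write side of that guarantee looks something like the following. try_send_to_replica() is a placeholder for the real networking code, and the two-of-three quorum is the one described above.

#include <cstddef>
#include <string>
#include <vector>

// Placeholder for the real network send/ack plumbing.
bool try_send_to_replica(const std::string& node, const std::string& payload);

// The producer sends to the replicas it can reach for the queue, but only
// reports the write as successful once at least `quorum` of them acknowledge.
bool quorum_write(const std::vector<std::string>& replicas,
                  const std::string& payload,
                  std::size_t quorum = 2)
{
    std::size_t acks = 0;

    for (const auto& node : replicas)
    {
        if (try_send_to_replica(node, payload))
        {
            ++acks;
        }
    }

    // If we can't reach a quorum, the producer has to repair its connections
    // (as in the failure scenarios below) and retry before the message counts
    // as delivered.
    return acks >= quorum;
}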

(In all the following diagrams, P represents the producer and C represents the consumer. Sorry ahead of time for the horrible diagrams.)

Common node failure

Let’s start with an easy one: a situation where a single node honestly and truly goes down. Maybe someone tripped over the network cable, or the machine decided it would be a good day to catch fire. This node is seen as down by both the consumer and the producer processes. The node with the X through it below is now down.

[Diagram: common node failure]

We have a problem here. Though the consumer is listening to two different nodes, the producer is no longer writing to a node that the consumer has a link to. If the producer were to continue this way, the consumer would never get any more messages during this session.

Luckily, we’ve chosen a strategy that prevents the producer from considering a write successful until it can write to at least two nodes. When the next message is ready to be sent, the producer will block its delivery until it establishes a quorum write with the remaining nodes. As you’ll see below, this behavior allows the message to be delivered to the consumer.

[Diagram: common node failure recovery]

Once the producer establishes a new link, it is able to send the message, which will reach the consumer through the already established connection shown in black on the bottom left. We have fault tolerance for this scenario. When the dead node comes back up, producers will continue writing to a quorum of nodes, and no messages need be lost.

Producer side common link failure

A situation arises where the producer cannot contact a common node in the quorum, but the consumer can. From the point of view of the producer, the top node is down.

[Diagram: producer-side common link failure]

The recovery from this is similar to the common node going down.

[Diagram: producer-side common link failure recovery]

The producer will fail to obtain a quorum for writes, and will establish a connection to the bottom left node to repair the condition. At this point it can continue to deliver messages as demonstrated in the common node failure scenario.

Consumer side common link failure

This is the one that has me wondering what the “best” answer to the problem is. In this scenario, the consumer loses its connection to the top node, severing the link and preventing it from receiving messages from the producer. The top node is still up as far as the producer is concerned and can still satisfy a quorum write.

[Diagram: consumer-side common link failure]

However, if the producer writes to the quorum before the client reestablishes a connection, the consumer will not get those messages, and the producer will see them as not consumed. We’ll see a temporary disruption in messaging to this consumer until it is able to reconnect to the bottom right node.

[Diagram: consumer-side common link failure recovery]

Once this new connection is established, messages will resume flowing to the consumer. But in the meantime, some messages weren’t delivered.

This isn’t a huge deal for our use case, because all the important messages will be stored in this case and forwarded when the consumer reestablishes with the new node. However, I’d still love to determine a good way to solve the problem.

One thought is that I could keep a limited (time, space, or both) queue for messages hitting nodes that don’t yet have a consumer attached. The big problem with this solution is that I will end up eating a ton of memory for messages that will never be consumed.
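
For illustration, the kind of bound I mean is something like the following toy buffer, capped by both entry count and age; the class and its limits are invented for this sketch.

#include <chrono>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Toy bounded buffer for not-yet-consumed messages: limited both by entry
// count and by age, so memory stays capped even if no consumer ever shows up.
class limited_message_buffer
{
public:
    limited_message_buffer(std::size_t max_entries, std::chrono::seconds max_age)
    : _max_entries(max_entries), _max_age(max_age) {}

    void push(std::string message)
    {
        evict_expired();
        if (_entries.size() >= _max_entries)
        {
            _entries.pop_front();   // drop the oldest message to make room
        }
        _entries.emplace_back(std::chrono::steady_clock::now(), std::move(message));
    }

private:
    void evict_expired()
    {
        auto cutoff = std::chrono::steady_clock::now() - _max_age;
        while (! _entries.empty() && _entries.front().first < cutoff)
        {
            _entries.pop_front();
        }
    }

    std::size_t _max_entries;
    std::chrono::seconds _max_age;
    std::deque<std::pair<std::chrono::steady_clock::time_point, std::string>> _entries;
};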

Another thought is that all writes should go out to all nodes handling the range, but only be considered successful if the write makes it to at least a quorum of nodes. That way, this failure scenario would only happen if we’re already in a degraded state.

Definitely interested in hearing ideas from people who have already designed systems like this. Feel free to leave comments!