Dispatching protobuf messages

When working with networked software, one of the first decisions you have to make is how you’re going to encode and dispatch messages. It is important to decide early, because once you have started down a particular path of decoding and adding network messages to your source, it can be hard to change course.

For a project I’m working on, I decided to use Google’s protobuf for message serialization. Protobuf is a compact binary serialization format that boasts extensibility as well as high performance. It is a good choice for latency-sensitive software and has served me well when designing virtual world messaging protocols.

The C++ code that protoc generates contains your message classes and gives them a base class of google::protobuf::Message. While this base class has a lot of useful functions, it doesn’t solve the problem of determining the message type on the wire, nor does it solve the problem of dispatching the messages back to your code.

For the first problem of determining the message type on the wire, the protobuf documentation has two recommendations. The first recommendation is using union types. This is a message type that contains all of the messages your program may ever send in a union. The protobuf documentation offers the following example:

message OneMessage {
  enum Type { FOO = 1; BAR = 2; BAZ = 3; }

  // Identifies which field is filled in.
  required Type type = 1;

  // One of the following will be filled in.
  optional Foo foo = 2;
  optional Bar bar = 3;
  optional Baz baz = 4;
}

I was going to use this method until I realized how unwieldy it could get. I also wanted to better separate message types between the node and the client.

Another method is to create a “header” type message to wrap the “meat” message with the data:

message HeaderMessage {
  required int32 type = 1;

  // The embedded message data.
  required bytes message_data = 2;
}

I decided not to use this method because of the overhead of reading through the meat of the message twice: once for the header, then again when we want to deserialize the contained message.

Instead, I’ve decided to prefix the bytestream in my protocol with the message type, followed by the length of the message. I’ve pledged to include nothing else in the wrapping bytes because putting too much into my header defeats the purpose of using an extensible serialization format. So the message on the wire looks like the following:

0       1       2       3       4       5       6    ...X    
[Type (2 bytes)][---------Size (4 bytes)-------][ message ...]
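As a rough sketch, packing and unpacking that 6-byte prefix could look like the following. The names `frame_header`, `encode_header` and `decode_header` are made up for illustration, and I'm assuming both fields are unsigned and sent in network (big-endian) byte order, which the diagram itself doesn't specify.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical names. Only the 2-byte type / 4-byte size layout comes from
// the diagram; big-endian byte order is an assumption of this sketch.
constexpr std::size_t kHeaderSize = 6;

struct frame_header
{
    std::uint16_t type;  // message type identifier
    std::uint32_t size;  // length in bytes of the serialized message that follows
};

// Writes the type into bytes 0-1 and the size into bytes 2-5.
std::array<std::uint8_t, kHeaderSize> encode_header(const frame_header& h)
{
    return {{
        static_cast<std::uint8_t>(h.type >> 8),
        static_cast<std::uint8_t>(h.type & 0xFF),
        static_cast<std::uint8_t>(h.size >> 24),
        static_cast<std::uint8_t>((h.size >> 16) & 0xFF),
        static_cast<std::uint8_t>((h.size >> 8) & 0xFF),
        static_cast<std::uint8_t>(h.size & 0xFF),
    }};
}

// Reads kHeaderSize bytes starting at b and rebuilds the header.
frame_header decode_header(const std::uint8_t* b)
{
    assert(b != nullptr);
    frame_header h;
    h.type = static_cast<std::uint16_t>((b[0] << 8) | b[1]);
    h.size = (static_cast<std::uint32_t>(b[2]) << 24)
           | (static_cast<std::uint32_t>(b[3]) << 16)
           | (static_cast<std::uint32_t>(b[4]) << 8)
           |  static_cast<std::uint32_t>(b[5]);
    return h;
}
```

A reader would pull exactly 6 bytes off the socket, call `decode_header`, and then read `size` more bytes before handing the buffer to protobuf.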

With this, we now have enough information to decode and dispatch new messages. My goal was to avoid forcing all message consumers to include switch statements in their downstream handling code. So I decided that the best way to deal with this was double dispatch.

One thing I noticed is that the generated C++ code doesn’t seem to include a method to support the visitor pattern/double dispatch. That would have saved me from writing any giant switch statements. Because it’s missing, I had to write one switch of my own, but I pledged not to make other users of the code do anything similarly silly. I also didn’t want them to have to write unnecessary code for handlers they don’t need. Enter my two-part dispatch.
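For comparison, classic double dispatch is usually written as a visitor. The sketch below is purely hypothetical: protoc generates nothing like `accept`, and `Foo`, `Bar`, `message_base`, `message_visitor` and `visit_all` are stand-in names, not protobuf types.

```cpp
#include <cassert>
#include <string>
#include <vector>

struct Foo;
struct Bar;

// One visit() overload per concrete message type.
struct message_visitor
{
    virtual ~message_visitor() = default;
    virtual void visit(const Foo&) = 0;
    virtual void visit(const Bar&) = 0;
};

// Hypothetical base class; stands in for what generated code would need.
struct message_base
{
    virtual ~message_base() = default;
    virtual void accept(message_visitor& v) const = 0;
};

// First dispatch: the virtual accept() picks the concrete message type.
// Second dispatch: overload resolution on *this picks the visit() overload.
struct Foo : message_base { void accept(message_visitor& v) const override { v.visit(*this); } };
struct Bar : message_base { void accept(message_visitor& v) const override { v.visit(*this); } };

// A visitor that records the order in which message types were seen.
struct name_visitor : message_visitor
{
    std::string seen;
    void visit(const Foo&) override { seen += "Foo;"; }
    void visit(const Bar&) override { seen += "Bar;"; }
};

// Sends every message to the visitor without any switch statement.
void visit_all(const std::vector<const message_base*>& messages, message_visitor& v)
{
    for (const message_base* m : messages)
    {
        assert(m != nullptr);
        m->accept(v);
    }
}
```

Note that every visitor has to implement every `visit` overload, even for messages it doesn't care about. That is exactly the kind of boilerplate I wanted to spare users of this code.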

When a message comes in, it is decoded, and then first passed to a switch based dispatch:

void messageutil::switch_dispatch(message_context_ptr ctx)
{
    switch (ctx->type)
    {
        case MT_GET_CHALLENGE:
            messageutil::template_dispatch(ctx, std::make_shared<GetChallengeMessage>());
            break;

        default:
            throw std::runtime_error("messageutil::switch_dispatch() unhandled message type "
                                     + boost::lexical_cast<std::string>(ctx->type));
    }
}

template_dispatch starts using C++ magic to make my life easier. We use a template function to parse the message and dispatch it to a dispatch handler:

///
/// After the message type is decoded, we do the rest of the work here
///
template <typename T>
static void template_dispatch(message_context_ptr ctx, T message)
{
    if (! message->ParseFromArray(ctx->message_buffer.get(), ctx->message_size))
    {
        //error
        ctx->error_callback(sopmq::error::network_error("Unable to parse new message of type "
                                                        + boost::lexical_cast<std::string>(ctx->type)
                                                        + " message corrupted?"));
    }
    else
    {
        //dispatch
        ctx->dispatcher.dispatch(message);
    }
}
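To see the two steps working together without pulling in protobuf, here is a cut-down sketch. `GetChallengeMessage` is a stand-in struct rather than generated code (it only mimics the `ParseFromArray(const void*, int)` shape), and `recording_dispatcher`, the enum value and the simplified function signatures are assumptions for illustration, not the real messageutil API.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <string>

// Stand-in for a generated message class; it just copies the raw bytes.
struct GetChallengeMessage
{
    std::string payload;
    bool ParseFromArray(const void* data, int size)
    {
        if (data == nullptr || size < 0) return false;
        payload.assign(static_cast<const char*>(data), static_cast<std::size_t>(size));
        return true;
    }
};

enum message_type : std::uint16_t { MT_GET_CHALLENGE = 1 };  // value is hypothetical

// Hypothetical dispatcher with one overload per message type.
struct recording_dispatcher
{
    std::string last;
    void dispatch(std::shared_ptr<GetChallengeMessage> m) { last = "GetChallenge:" + m->payload; }
};

// Step two: T is known at compile time, so parsing and the choice of
// dispatch() overload need no further branching on the message type.
template <typename T>
void template_dispatch(recording_dispatcher& d, const void* buf, int size, std::shared_ptr<T> message)
{
    assert(message != nullptr);
    if (!message->ParseFromArray(buf, size))
        throw std::runtime_error("unable to parse message");
    d.dispatch(message);
}

// Step one: the only switch in the system maps the wire type to a C++ type.
void switch_dispatch(recording_dispatcher& d, std::uint16_t type, const void* buf, int size)
{
    switch (type)
    {
        case MT_GET_CHALLENGE:
            template_dispatch(d, buf, size, std::make_shared<GetChallengeMessage>());
            break;

        default:
            throw std::runtime_error("unhandled message type " + std::to_string(type));
    }
}
```

Adding a new message then means adding one `case` line and one `dispatch` overload; the parsing code is never repeated.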

Now that we have the actual message class T, we can use overloaded functions to handle the messages by type. But more than this, we want to make sure that users of this code don’t have to waste time banging out member functions for messages they’re not going to use. To provide this functionality, we create a dispatcher class that lets people register message handlers only for the messages they need, plus a default handler that will be called for messages they didn’t expect.

///
/// Handles dispatch of network messages to selected functions
///
class message_dispatcher
{
public:
    message_dispatcher(std::function<void(Message_ptr)> unhandledHandler);
    virtual ~message_dispatcher();

    ///
    /// Dispatches the GetChallenge message to the correct handler
    ///
    void dispatch(GetChallengeMessage_ptr getChallengeMessage);

public:
    ///
    /// Sets the handler function for a GetChallenge message
    ///
    void set_handler(std::function<void(GetChallengeMessage_ptr)> handler);

private:
    std::function<void(Message_ptr)> _unhandledHandler;
    std::function<void(GetChallengeMessage_ptr)>  _getChallengeHandler;

    ///
    /// Template function to execute the given handler if it is available, or
    /// the unhandled handler if it is not
    ///
    template <typename handler, typename message>
    void do_dispatch(handler h, message m)
    {
        if (h)
        {
            h(m);
        }
        else
        {
            _unhandledHandler(std::static_pointer_cast<::google::protobuf::Message>(m));
        }
    }
};

message_dispatcher::message_dispatcher(std::function<void(Message_ptr)> unhandledHandler)
: _unhandledHandler(unhandledHandler)
{

}

message_dispatcher::~message_dispatcher()
{

}

void message_dispatcher::dispatch(GetChallengeMessage_ptr getChallengeMessage)
{
    do_dispatch(_getChallengeHandler, getChallengeMessage);
}

void message_dispatcher::set_handler(std::function<void(GetChallengeMessage_ptr)> handler)
{
    _getChallengeHandler = handler;
}

For each message type the user would like to handle, they call set_handler which takes a std::function object that will handle the incoming message. The internal code then dispatches the message by calling the do_dispatch template method which either sends the message to the registered handler, or calls the registered function for unhandled messages.

To handle any inbound network message types, one simply needs to call into the messageutil class, give it a message_dispatcher with the types the caller can handle registered, and tell it to read from the network.
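From the caller's side, registering handlers might look roughly like this. It is a condensed, self-contained sketch: `Message` and `GetChallengeMessage` are empty stand-ins for the protobuf classes, and `ChallengeResponseMessage` and `run_example` are hypothetical names added only to show the fallback path.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Stand-ins for ::google::protobuf::Message and the generated classes.
struct Message { virtual ~Message() = default; };
struct GetChallengeMessage : Message {};
struct ChallengeResponseMessage : Message {};  // hypothetical second message

using Message_ptr = std::shared_ptr<Message>;
using GetChallengeMessage_ptr = std::shared_ptr<GetChallengeMessage>;
using ChallengeResponseMessage_ptr = std::shared_ptr<ChallengeResponseMessage>;

class message_dispatcher
{
public:
    explicit message_dispatcher(std::function<void(Message_ptr)> unhandledHandler)
    : _unhandledHandler(std::move(unhandledHandler))
    {
        assert(_unhandledHandler != nullptr);  // the fallback must always exist
    }

    void dispatch(GetChallengeMessage_ptr m) { do_dispatch(_getChallengeHandler, m); }
    void dispatch(ChallengeResponseMessage_ptr m) { do_dispatch(_challengeResponseHandler, m); }

    void set_handler(std::function<void(GetChallengeMessage_ptr)> h) { _getChallengeHandler = std::move(h); }
    void set_handler(std::function<void(ChallengeResponseMessage_ptr)> h) { _challengeResponseHandler = std::move(h); }

private:
    std::function<void(Message_ptr)> _unhandledHandler;
    std::function<void(GetChallengeMessage_ptr)> _getChallengeHandler;
    std::function<void(ChallengeResponseMessage_ptr)> _challengeResponseHandler;

    // Runs the specific handler if one was registered, else the fallback.
    template <typename handler, typename message>
    void do_dispatch(const handler& h, message m)
    {
        if (h) h(m);
        else _unhandledHandler(std::static_pointer_cast<Message>(m));
    }
};

// Registers one handler, dispatches two message types, and returns a trace
// of which callbacks ran, in order.
std::string run_example()
{
    std::string trace;
    message_dispatcher dispatcher([&trace](Message_ptr) { trace += "unhandled;"; });
    dispatcher.set_handler([&trace](GetChallengeMessage_ptr) { trace += "get_challenge;"; });

    dispatcher.dispatch(std::make_shared<GetChallengeMessage>());
    dispatcher.dispatch(std::make_shared<ChallengeResponseMessage>());
    return trace;
}
```

The caller only wrote code for the one message it cares about; the second message falls through to the unhandled handler instead of requiring an empty member function.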

You can see implementations of all this at the links below:

message_dispatcher.cpp
message_dispatcher.h
messageutil.h
messageutil.cpp
