Streaming Serialization/Deserialization with MessagePack

Streaming feature of MessagePack

Streaming serialization/deserialization is one of the key features of MessagePack.
It lets you store multiple objects in a stream, such as a file or a network connection, and load them back one by one. It is easy to use and fast.

I’ll introduce the streaming features with Ruby code.

Storing objects to a file

Suppose I want to store structured objects in a log file.
I can serialize each object with MessagePack and write it to the file, and later read the objects back out of the file one after another.

Here is the code:

require 'msgpack'  # gem install msgpack

# Placeholder: do something with each deserialized object.
def on_message(obj)
  p obj
end

# Open the log file for writing (binary mode).
File.open("log.mpac", "wb") do |f|
  # Serialize and store 3 objects.
  {'name'=>'msgpack'}.to_msgpack(f)
  {'name'=>'kumofs' }.to_msgpack(f)
  {'name'=>'frsyuki'}.to_msgpack(f)
end

# Open the log file for reading (binary mode).
File.open("log.mpac", "rb") do |f|
  # Create an instance of "MessagePack::Unpacker" that reads from the file.
  pac = MessagePack::Unpacker.new(f)

  begin
    # Load the stored objects one by one with the "each" method.
    pac.each do |obj|
      # Do something with the deserialized object.
      on_message(obj)
    end
  rescue EOFError
    # EOF reached.
  end
end

Compression

Next, the size of the log file becomes a big problem. Compression is very effective at reducing it. The MessagePack::Unpacker#feed method is useful when the data does not come straight from an IO object but from a special stream such as Zlib::Inflate.

Here is the code:

require 'msgpack'  # gem install msgpack
require 'zlib'     # zlib is a standard library

# Placeholder: do something with each deserialized object.
def on_message(obj)
  p obj
end

# Open the log file for writing (binary mode).
File.open("log.zmpac", "wb") do |f|
  # Serialize, compress and store the objects.
  zd = Zlib::Deflate.new
  [{'name'=>'msgpack'}, {'name'=>'kumofs'}, {'name'=>'frsyuki'}].each do |obj|
    zd << obj.to_msgpack
    f.write(zd.flush)  # sync-flush so the object is written out right away
  end
  f.write(zd.finish)   # write the end of the compressed stream
  zd.close
end

# Open the log file for reading (binary mode).
File.open("log.zmpac", "rb") do |f|
  # Create instances of "Zlib::Inflate" and "MessagePack::Unpacker".
  zi  = Zlib::Inflate.new
  pac = MessagePack::Unpacker.new

  # Read the compressed file in 1024-byte chunks; "read" returns nil at EOF.
  while (chunk = f.read(1024))
    # Use the "feed" method to append the inflated data.
    pac.feed zi.inflate(chunk)

    # Then call "each" to take out every complete object fed so far.
    pac.each do |obj|
      # Do something with the deserialized object.
      on_message(obj)
    end
  end

  zi.close
end

Streaming on network connection

The MessagePack::Unpacker#feed method is also useful for receiving objects from other hosts over a network connection.
In the following code, I use the event-driven networking library EventMachine together with MessagePack.

require 'msgpack'      # gem install msgpack
require 'eventmachine' # gem install eventmachine
require 'json'

# This server program receives objects serialized with MessagePack and converts them to JSON.
class MessagePackToJsonServer < EventMachine::Connection
  # Called by EventMachine once the connection is established.
  def post_init
    # Create the streaming deserializer of MessagePack (one per connection).
    @pac = MessagePack::Unpacker.new
  end

  def receive_data(data)
    # Feed the received data to the deserializer.
    # "data" may contain a partial object or several objects.
    @pac.feed data

    # Streaming deserialize: "each" yields only the complete objects.
    @pac.each do |obj|
      message_received(obj)
    end
  end

  def message_received(obj)
    # Do something with the deserialized object.
    # Here it is sent back as one line of JSON.
    send_data obj.to_json + "\n"
  end
end

# Start the server.
EventMachine.run do
  host = '0.0.0.0'
  port = 8080
  EventMachine.start_server host, port, MessagePackToJsonServer
end

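The streaming feature is also available in the MessagePack implementations for other languages. It is an essential building block of MessagePack-RPC, where requests and responses travel over one connection as a stream of MessagePack objects.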


About frsyuki

Twitter account is @frsyuki_ha. http://twitter.com/frsyuki_ha
This entry was posted in MessagePack, MessagePack-RPC, Ruby.

One Response to Streaming Serialization/Deserialization with MessagePack

  1. If I have a huge package and only want to read/use some of the data, do I need to deserialize the entire package or can I only deserialize parts of it, i.e. those that I need? Do you support this kind of lazy deserialization? (My apologies if this is a newbie question. :))
