Streaming feature of MessagePack
Streaming serialization/deserialization is one of the key features of MessagePack.
It lets you write multiple objects to a stream, such as a file or a network connection, and read them back one at a time. It is easy to use and fast.
I’ll introduce the streaming features with Ruby code.
Storing objects to a file
Now I want to store structured objects in a log file.
I can serialize the objects with MessagePack and write them to the file one after another. Later, I can read the objects back from the file one at a time.
Here is the code:
require 'msgpack' # gem install msgpack

# Placeholder handler (not part of the msgpack gem); replace with real processing.
def on_message(obj)
  p obj
end

# Open the log file for writing in binary mode.
File.open("log.mpac", "wb") do |f|
  # Serialize and store 3 objects.
  {'name'=>'msgpack'}.to_msgpack(f)
  {'name'=>'kumofs' }.to_msgpack(f)
  {'name'=>'frsyuki'}.to_msgpack(f)
end

# Open the log file for reading in binary mode.
File.open("log.mpac", "rb") do |f|
  # Create an instance of "MessagePack::Unpacker" that reads from the file.
  pac = MessagePack::Unpacker.new(f)
  begin
    # Load the stored objects one by one with the "each" method.
    pac.each do |obj|
      # Do something with the deserialized object.
      on_message(obj)
    end
  rescue EOFError
    # EOF reached.
  end
end
Compression
Next, the size of the log file became a big problem. Compression is very effective at reducing it. The MessagePack::Unpacker#feed method is useful for handling data that does not come straight from an IO object, such as the output of Zlib::Inflate.
Here is the code:
require 'msgpack' # gem install msgpack
require 'zlib'    # zlib is a standard library

# Placeholder handler (not part of the msgpack gem); replace with real processing.
def on_message(obj)
  p obj
end

# Open the log file for writing in binary mode.
File.open("log.zmpac", "wb") do |f|
  # Serialize, compress and store the objects.
  zd = Zlib::Deflate.new
  [{'name'=>'msgpack'}, {'name'=>'kumofs'}, {'name'=>'frsyuki'}].each do |obj|
    zd << obj.to_msgpack
    # Flush after each record so it can be inflated without waiting for the rest.
    f.write(zd.flush)
  end
  # Write the end of the compressed stream, then release zlib resources.
  f.write(zd.finish)
  zd.close
end

# Open the log file for reading in binary mode.
File.open("log.zmpac", "rb") do |f|
  # Create instances of "Zlib::Inflate" and "MessagePack::Unpacker".
  zi = Zlib::Inflate.new
  pac = MessagePack::Unpacker.new
  buffer = String.new
  # IO#read returns nil at end of file, which ends the loop.
  while f.read(1024, buffer)
    # Use the "feed" method to append the inflated bytes.
    pac.feed zi.inflate(buffer)
    # Then call "each" to deserialize every complete object in the buffer.
    pac.each do |obj|
      # Do something with the deserialized object.
      on_message(obj)
    end
  end
  zi.close
end
Streaming on network connection
The MessagePack::Unpacker#feed method is also useful for receiving objects from other hosts over a network connection.
In the following code, I use the event-processing library EventMachine together with MessagePack.
require 'msgpack'      # gem install msgpack
require 'eventmachine' # gem install eventmachine
require 'json'

# This server program receives objects serialized with MessagePack and converts them to JSON.
class MessagePackToJsonServer < EventMachine::Connection
  def initialize
    # Create the streaming deserializer of MessagePack.
    @pac = MessagePack::Unpacker.new
  end

  def receive_data(data)
    # Feed the received data to the deserializer.
    @pac.feed data
    # Deserialize every complete object; an incomplete trailing object stays buffered.
    @pac.each do |obj|
      message_received(obj)
    end
  end

  def message_received(obj)
    # Do something with the deserialized object; here, send it back as JSON.
    send_data obj.to_json
  end
end

# Start the server.
EventMachine.run do
  host = '0.0.0.0'
  port = 8080
  EventMachine.start_server host, port, MessagePackToJsonServer
end
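The streaming feature is available in other languages as well. It is also an essential building block of MessagePack-RPC, which exchanges a continuous stream of request and response objects over a single connection.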
If I have a huge package and only want to read or use some of the data, do I need to deserialize the entire package, or can I deserialize only the parts I need? Do you support this kind of lazy deserialization? (My apologies if this is a newbie question.)