2009-04-30

Sample code: Length-prefix message framing for streams

(This post is part of the TCP/IP .NET Sockets FAQ)

The necessity of message framing is discussed at http://blog.stephencleary.com/2009/04/message-framing.html.

The class below is a modified version of Nito.Async.Sockets.SocketPacketProtocol from the Nito.Async library. The main difference is that the Nito.Async.Sockets.SocketPacketProtocol class communicates directly with the asynchronous Nito socket classes, allowing a more efficient implementation. The PacketProtocol class below is slightly less efficient, but can be used with any socket classes, including synchronous sockets or even non-socket streams such as files.

Note that PacketProtocol is not thread-safe: if an instance is used from more than one thread, access to its instance members must be synchronized.
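
One way to provide that synchronization is a small wrapper that takes a lock around every call. The following is only a sketch: SynchronizedPacketProtocol and its members are hypothetical names that are not part of Nito.Async or of this post, and it assumes the PacketProtocol class from this post is compiled into the same project.

```csharp
using System;

// Hypothetical wrapper (an assumption for illustration, not part of the original sample):
// serializes all access to a single PacketProtocol instance.
public sealed class SynchronizedPacketProtocol
{
    private readonly object gate = new object();
    private readonly PacketProtocol protocol;

    public SynchronizedPacketProtocol(int maxMessageSize, Action<byte[]> messageArrived)
    {
        this.protocol = new PacketProtocol(maxMessageSize);
        this.protocol.MessageArrived = messageArrived;
    }

    public void DataReceived(byte[] data)
    {
        // Only one thread at a time may touch the protocol's buffers
        lock (this.gate)
        {
            this.protocol.DataReceived(data);
        }
    }
}
```

Note that the handler runs while the lock is held, so a handler that blocks on another thread which is itself waiting to call DataReceived would deadlock. If each stream is only ever read by one thread at a time, no wrapper is needed.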

// Original source: http://blog.stephencleary.com/2009/04/sample-code-length-prefix-message.html
/// <summary>
/// Maintains the necessary buffers for applying a length-prefix message framing protocol over a stream.
/// </summary>
/// <remarks>
/// <para>Create one instance of this class for each incoming stream, and assign a handler to <see cref="MessageArrived"/>. As bytes arrive at the stream, pass them to <see cref="DataReceived"/>, which will invoke <see cref="MessageArrived"/> as necessary.</para>
/// <para>If <see cref="DataReceived"/> raises <see cref="System.Net.ProtocolViolationException"/>, then the stream data should be considered invalid. After that point, no methods should be called on that <see cref="PacketProtocol"/> instance.</para>
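/// <para>This class uses a 4-byte signed integer length prefix, which allows for message sizes up to 2 GB. The prefix is written and read with <see cref="BitConverter"/>, so it uses the byte order of the local machine (little-endian on most platforms); both ends of the stream must agree on that byte order. Keepalive messages are supported as messages with a length prefix of 0 and no message data.</para>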
/// <para>This is EXAMPLE CODE! It is not particularly efficient; in particular, if this class is rewritten so that a particular interface is used (e.g., Socket's IAsyncResult methods), some buffer copies become unnecessary and may be removed.</para>
/// </remarks>
public class PacketProtocol
{
    /// <summary>
    /// Wraps a message. The wrapped message is ready to send to a stream.
    /// </summary>
    /// <remarks>
    /// <para>Generates a length prefix for the message and returns the combined length prefix and message.</para>
    /// </remarks>
    /// <param name="message">The message to send.</param>
    public static byte[] WrapMessage(byte[] message)
    {
        // Get the length prefix for the message
        byte[] lengthPrefix = BitConverter.GetBytes(message.Length);
  
        // Concatenate the length prefix and the message
        byte[] ret = new byte[lengthPrefix.Length + message.Length];
        lengthPrefix.CopyTo(ret, 0);
        message.CopyTo(ret, lengthPrefix.Length);
  
        return ret;
    }
  
    /// <summary>
    /// Wraps a keepalive (0-length) message. The wrapped message is ready to send to a stream.
    /// </summary>
    public static byte[] WrapKeepaliveMessage()
    {
        return BitConverter.GetBytes((int)0);
    }
  
    /// <summary>
    /// Initializes a new <see cref="PacketProtocol"/>, limiting message sizes to the given maximum size.
    /// </summary>
    /// <param name="maxMessageSize">The maximum message size supported by this protocol. This may be less than or equal to zero to indicate no maximum message size.</param>
    public PacketProtocol(int maxMessageSize)
    {
        // We allocate the buffer for receiving message lengths immediately
        this.lengthBuffer = new byte[sizeof(int)];
        this.maxMessageSize = maxMessageSize;
    }
  
    /// <summary>
    /// The buffer for the length prefix; this is always 4 bytes long.
    /// </summary>
    private byte[] lengthBuffer;
  
    /// <summary>
    /// The buffer for the data; this is null if we are receiving the length prefix buffer.
    /// </summary>
    private byte[] dataBuffer;
  
    /// <summary>
    /// The number of bytes already read into the buffer (the length buffer if <see cref="dataBuffer"/> is null, otherwise the data buffer).
    /// </summary>
    private int bytesReceived;
  
    /// <summary>
    /// The maximum size of messages allowed.
    /// </summary>
    private int maxMessageSize;
  
    /// <summary>
    /// Indicates the completion of a message read from the stream.
    /// </summary>
    /// <remarks>
    /// <para>This may be invoked with an empty message, indicating that the other end has sent a keepalive message. It is never invoked with a null message.</para>
    /// <para>This delegate is invoked from within a call to <see cref="DataReceived"/>. Handlers should not call <see cref="DataReceived"/>.</para>
    /// </remarks>
    public Action<byte[]> MessageArrived { get; set; }
  
    /// <summary>
    /// Notifies the <see cref="PacketProtocol"/> instance that incoming data has been received from the stream. This method will invoke <see cref="MessageArrived"/> as necessary.
    /// </summary>
    /// <remarks>
    /// <para>This method may invoke <see cref="MessageArrived"/> zero or more times.</para>
    /// <para>Zero-length receives are ignored. Many streams use a 0-length read to indicate the end of a stream, but <see cref="PacketProtocol"/> takes no action in this case.</para>
    /// </remarks>
    /// <param name="data">The data received from the stream. Cannot be null.</param>
    /// <exception cref="System.Net.ProtocolViolationException">If the data received is not a properly-formed message.</exception>
    public void DataReceived(byte[] data)
    {
        // Process the incoming data in chunks, as each pending "read" requests it.
  
        // Logically, we are satisfying read requests (for the length prefix or the message data)
        //  with the received data, instead of scanning the incoming buffer for messages.
  
        int i = 0;
        while (i != data.Length)
        {
            // Determine how many bytes we want to transfer to the buffer and transfer them
            int bytesAvailable = data.Length - i;
            if (this.dataBuffer != null)
            {
                // We're reading into the data buffer
                int bytesRequested = this.dataBuffer.Length - this.bytesReceived;
  
                // Copy the incoming bytes into the buffer
                int bytesTransferred = Math.Min(bytesRequested, bytesAvailable);
                Array.Copy(data, i, this.dataBuffer, this.bytesReceived, bytesTransferred);
                i += bytesTransferred;
  
                // Notify "read completion"
                this.ReadCompleted(bytesTransferred);
            }
            else
            {
                // We're reading into the length prefix buffer
                int bytesRequested = this.lengthBuffer.Length - this.bytesReceived;
  
                // Copy the incoming bytes into the buffer
                int bytesTransferred = Math.Min(bytesRequested, bytesAvailable);
                Array.Copy(data, i, this.lengthBuffer, this.bytesReceived, bytesTransferred);
                i += bytesTransferred;
  
                // Notify "read completion"
                this.ReadCompleted(bytesTransferred);
            }
        }
    }
  
    /// <summary>
    /// Called when a read completes. Parses the received data and calls <see cref="MessageArrived"/> if necessary.
    /// </summary>
    /// <param name="count">The number of bytes read.</param>
    /// <exception cref="System.Net.ProtocolViolationException">If the data received is not a properly-formed message.</exception>
    private void ReadCompleted(int count)
    {
        // Get the number of bytes read into the buffer
        this.bytesReceived += count;
  
        if (this.dataBuffer == null)
        {
            // We're currently receiving the length buffer
  
            if (this.bytesReceived != sizeof(int))
            {
                // We haven't gotten all the length buffer yet: just wait for more data to arrive
            }
            else
            {
                // We've gotten the length buffer
                int length = BitConverter.ToInt32(this.lengthBuffer, 0);
  
                // Sanity check for length < 0
                if (length < 0)
                    throw new System.Net.ProtocolViolationException("Message length is less than zero");
  
                // Sanity check for very large messages, to help prevent denial-of-service attacks
                if (this.maxMessageSize > 0 && length > this.maxMessageSize)
                    throw new System.Net.ProtocolViolationException("Message length " + length.ToString(System.Globalization.CultureInfo.InvariantCulture) + " is larger than maximum message size " + this.maxMessageSize.ToString(System.Globalization.CultureInfo.InvariantCulture));
  
                // Zero-length packets are allowed as keepalives
                if (length == 0)
                {
                    this.bytesReceived = 0;
                    if (this.MessageArrived != null)
                        this.MessageArrived(new byte[0]);
                }
                else
                {
                    // Create the data buffer and start reading into it
                    this.dataBuffer = new byte[length];
                    this.bytesReceived = 0;
                }
            }
        }
        else
        {
            if (this.bytesReceived != this.dataBuffer.Length)
            {
                // We haven't gotten all the data buffer yet: just wait for more data to arrive
            }
            else
            {
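                // We've gotten an entire packet. Reset the state before invoking the handler
                //  (as the keepalive branch does), so that an exception thrown by the handler
                //  cannot cause the same message to be delivered again on the next call.
                byte[] message = this.dataBuffer;
                this.dataBuffer = null;
                this.bytesReceived = 0;
  
                if (this.MessageArrived != null)
                    this.MessageArrived(message);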
            }
        }
    }
}
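
As a rough usage sketch, the loop below reads from any Stream and hands PacketProtocol only the bytes that were actually read. PacketProtocolExample and Pump are hypothetical names introduced for illustration, the 4096-byte read buffer is an arbitrary choice, and the code assumes the PacketProtocol class above is compiled into the same project.

```csharp
using System;
using System.IO;

public static class PacketProtocolExample
{
    // Hypothetical helper (not part of the original sample): reads the stream to its end,
    // feeding each chunk of received bytes to the protocol.
    public static void Pump(Stream stream, PacketProtocol protocol)
    {
        byte[] readBuffer = new byte[4096];
        while (true)
        {
            int bytesRead = stream.Read(readBuffer, 0, readBuffer.Length);
            if (bytesRead == 0)
                break; // End of stream; PacketProtocol itself ignores zero-length receives

            // Pass only the bytes actually read, never the whole (partially filled) buffer
            byte[] received = new byte[bytesRead];
            Array.Copy(readBuffer, 0, received, 0, bytesRead);
            protocol.DataReceived(received);
        }
    }

    // Example: a MemoryStream stands in for a socket stream carrying two messages
    public static void Demo()
    {
        var protocol = new PacketProtocol(2000);
        protocol.MessageArrived = message => Console.WriteLine("Received " + message.Length + " bytes");

        using (var stream = new MemoryStream())
        {
            byte[] first = PacketProtocol.WrapMessage(new byte[3]);
            byte[] second = PacketProtocol.WrapMessage(new byte[5]);
            stream.Write(first, 0, first.Length);
            stream.Write(second, 0, second.Length);
            stream.Position = 0;
            Pump(stream, protocol);
        }
    }
}
```

Copying the received bytes into an exactly sized array is one of the extra buffer copies mentioned in the class remarks; it keeps the sample simple at the cost of some efficiency.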


16 comments:

  1. @Anonymous:

    As far as I can tell, you're not using the PacketProtocol class defined in this blog post, so I'm not sure why you're posting here.

    A better place to ask for review would be the NCL forum:
    http://social.msdn.microsoft.com/Forums/en-US/ncl/threads

    which I see you have already done:
    http://social.msdn.microsoft.com/Forums/en-US/ncl/thread/3d6681f5-7164-48d7-b703-e63ece938be0

  2. I'm not sure if you still check this, but I've tried to use this class as-is, and I find that it only works on the first message. Any subsequent messages return an empty string.

    Do you have any examples of how to use this class? Perhaps I'm missing something. Cheers.

  3. @Wannabeuk:

    I just tried it, and it worked fine. Here's my test code:

    [TestMethod]
    public void TestMethod1()
    {
        int messages = 0;
        var packetizer = new PacketProtocol(2000);
        packetizer.MessageArrived += _ => ++messages;
        packetizer.DataReceived(
            PacketProtocol.WrapMessage(new byte[3]).Concat(
            PacketProtocol.WrapMessage(new byte[4])).ToArray());
        Assert.AreEqual(2, messages);
    }

  4. Here's an additional test, including checking the messages passed:

    [TestMethod]
    public void TestMethod1()
    {
        int numMessages = 0;
        var messages = new string[2];
        var packetizer = new PacketProtocol(2000);
        packetizer.MessageArrived += message =>
        {
            messages[numMessages] = Encoding.UTF8.GetString(message);
            ++numMessages;
        };
        packetizer.DataReceived(
            PacketProtocol.WrapMessage(Encoding.UTF8.GetBytes("Hello")).Concat(
            PacketProtocol.WrapMessage(Encoding.UTF8.GetBytes("World"))).ToArray());
        Assert.AreEqual(2, numMessages);
        Assert.AreEqual("Hello", messages[0]);
        Assert.AreEqual("World", messages[1]);
    }

  5. Great code!
    Just one question:
    If a client writes for example 200 bytes of data to the stream, and then stops, and this code reads that data (which is one length buffer + data), won't the code just continue to loop through the rest of the readbuffer (which for example is 4096 bytes) until it finds some length data?

    What I mean is that it will loop through useless data, when there may be some new data that was written after the readBuffer was filled?

    So instead of accepting zero length packets, shouldn't the code be notified and then just throw away the rest of the readBuffer? Or am I totally wrong now?

    Replies
    1. The code in this blog post assumes that DataReceived will be passed the bytes read from the stream, not a partially-filled read buffer. It accepts zero-length packets to allow the other side to detect half-open connections (http://nitoprograms.blogspot.com/2009/05/detection-of-half-open-dropped.html).

  6. Hi guys,

    I wanted to ask: does this code wrap any kind of message into an IP message?

    I need to wrap Sampled Value messages based on IEC61850-9-2 into IP messages, transmit them to another PC, and then unwrap and read them.
    How could I do that?

    Thanks in advance!

    Replies
    1. No. This code only does length prefixing. If you want to wrap it into another kind of frame (like IP), you'd have to write that code yourself.

  7. Well done Stephen,

    This series of articles has been very helpful.

    I am working on a middleware application that receives messages from multiple applications as XML or fixed-length strings, sends them over a TCP/IP socket to a back-end system, gets the responses, and passes them back to the respective applications.
    How do I ensure this middleware application is able to cope with the many requests it will have to handle?

    Replies
    1. You should be using asynchronous I/O for all your network communications. This is good advice in general, but especially if you have scaling concerns.

      If you have control over the protocol, then impose some kind of message framing on the XML messages. It's possible to detect XML messages without explicit message framing, but it's quite complicated (and inefficient).

  8. This code has made my day! I've been struggling to create a message framing interface (without knowing that was what it was called). I was trying to implement a message delimited solution with no luck. I stumbled across your description of message framing and subsequently this article. Fantastic! It worked perfectly as soon as I dropped it into my code and I instantly saw all of my communication errors go away. Thanks buddy!

  9. My thanks as well. This article and the example cleared my thoughts, and I believe I have finally grasped the issue.

  10. Many thanks for your articles. I stumbled upon EXACTLY this issue: my server and client worked perfectly on my local network, but when used over the internet, I realized that multiple sends don't always show up as multiple receives, and I was not grasping why. After reading this framing tutorial, I will now sit down and rewrite the entire back end of my code. Not that much fun, but I'm happy to have found this site for guidance!!! Thanks again /Andreas, Sweden

  11. This is great. I have a suggestion though. It's allocating a new dataBuffer for every message it receives. If you are developing an application that is receiving tens of thousands of messages per second, couldn't that impact performance? Couldn't you change it to just allocate the dataBuffer once to be the max length, and then reuse the same buffer? It would mean that for the MessageArrived delegate, you'd have to pass in the length as well as the bytes. Thanks for the sweet code.

    Replies
    1. Yes; in some of my high-volume solutions I do use a "buffer manager" to reuse buffers when possible. However, I did not put that in the sample code here because the purpose of this sample code is to illustrate proper packetizing, which is complex enough without the buffer management logic. :)
