(This post is part of the TCP/IP .NET Sockets FAQ)
The necessity of message framing is discussed at http://blog.stephencleary.com/2009/04/message-framing.html.
The class below is a modified version of Nito.Async.Sockets.SocketPacketProtocol from the Nito.Async library. The main difference is that the Nito.Async.Sockets.SocketPacketProtocol class communicates directly with the asynchronous Nito socket classes, allowing a more efficient implementation. The PacketProtocol class below is slightly less efficient, but can be used with any socket classes, including synchronous sockets or even non-socket streams such as files.
Note that PacketProtocol is not thread-safe: if an instance is used from more than one thread, calls to its instance members must be synchronized.
// Original source: http://blog.stephencleary.com/2009/04/sample-code-length-prefix-message.html
/// <summary>
/// Maintains the necessary buffers for applying a length-prefix message framing protocol over a stream.
/// </summary>
/// <remarks>
/// <para>Create one instance of this class for each incoming stream, and assign a handler to <see cref="MessageArrived"/>. As bytes arrive at the stream, pass them to <see cref="DataReceived"/>, which will invoke <see cref="MessageArrived"/> as necessary.</para>
/// <para>If <see cref="DataReceived"/> raises <see cref="System.Net.ProtocolViolationException"/>, then the stream data should be considered invalid. After that point, no methods should be called on that <see cref="PacketProtocol"/> instance.</para>
/// <para>This class uses a 4-byte signed integer length prefix, which allows for message sizes up to 2 GB. Keepalive messages are supported as messages with a length prefix of 0 and no message data.</para>
/// <para>This is EXAMPLE CODE! It is not particularly efficient; in particular, if this class is rewritten so that a particular interface is used (e.g., Socket's IAsyncResult methods), some buffer copies become unnecessary and may be removed.</para>
/// </remarks>
public class PacketProtocol
{
    /// <summary>
    /// Wraps a message. The wrapped message is ready to send to a stream.
    /// </summary>
    /// <remarks>
    /// <para>Generates a length prefix for the message and returns the combined length prefix and message.</para>
    /// </remarks>
    /// <param name="message">The message to send.</param>
    public static byte[] WrapMessage(byte[] message)
    {
        // Get the length prefix for the message
        byte[] lengthPrefix = BitConverter.GetBytes(message.Length);
        // Concatenate the length prefix and the message
        byte[] ret = new byte[lengthPrefix.Length + message.Length];
        lengthPrefix.CopyTo(ret, 0);
        message.CopyTo(ret, lengthPrefix.Length);
        return ret;
    }

    /// <summary>
    /// Wraps a keepalive (0-length) message. The wrapped message is ready to send to a stream.
    /// </summary>
    public static byte[] WrapKeepaliveMessage()
    {
        return BitConverter.GetBytes((int)0);
    }

    /// <summary>
    /// Initializes a new <see cref="PacketProtocol"/>, limiting message sizes to the given maximum size.
    /// </summary>
    /// <param name="maxMessageSize">The maximum message size supported by this protocol. This may be less than or equal to zero to indicate no maximum message size.</param>
    public PacketProtocol(int maxMessageSize)
    {
        // We allocate the buffer for receiving message lengths immediately
        this.lengthBuffer = new byte[sizeof(int)];
        this.maxMessageSize = maxMessageSize;
    }

    /// <summary>
    /// The buffer for the length prefix; this is always 4 bytes long.
    /// </summary>
    private byte[] lengthBuffer;

    /// <summary>
    /// The buffer for the data; this is null if we are receiving the length prefix buffer.
    /// </summary>
    private byte[] dataBuffer;

    /// <summary>
    /// The number of bytes already read into the buffer (the length buffer if <see cref="dataBuffer"/> is null, otherwise the data buffer).
    /// </summary>
    private int bytesReceived;

    /// <summary>
    /// The maximum size of messages allowed.
    /// </summary>
    private int maxMessageSize;

    /// <summary>
    /// Indicates the completion of a message read from the stream.
    /// </summary>
    /// <remarks>
    /// <para>This may be called with an empty message, indicating that the other end had sent a keepalive message. This will never be called with a null message.</para>
    /// <para>This event is invoked from within a call to <see cref="DataReceived"/>. Handlers for this event should not call <see cref="DataReceived"/>.</para>
    /// </remarks>
    public Action<byte[]> MessageArrived { get; set; }

    /// <summary>
    /// Notifies the <see cref="PacketProtocol"/> instance that incoming data has been received from the stream. This method will invoke <see cref="MessageArrived"/> as necessary.
    /// </summary>
    /// <remarks>
    /// <para>This method may invoke <see cref="MessageArrived"/> zero or more times.</para>
    /// <para>Zero-length receives are ignored. Many streams use a 0-length read to indicate the end of a stream, but <see cref="PacketProtocol"/> takes no action in this case.</para>
    /// </remarks>
    /// <param name="data">The data received from the stream. Cannot be null.</param>
    /// <exception cref="System.Net.ProtocolViolationException">If the data received is not a properly-formed message.</exception>
    public void DataReceived(byte[] data)
    {
        // Process the incoming data in chunks, as the ReadCompleted requests it
        // Logically, we are satisfying read requests with the received data, instead of processing the
        // incoming buffer looking for messages.
        int i = 0;
        while (i != data.Length)
        {
            // Determine how many bytes we want to transfer to the buffer and transfer them
            int bytesAvailable = data.Length - i;
            if (this.dataBuffer != null)
            {
                // We're reading into the data buffer
                int bytesRequested = this.dataBuffer.Length - this.bytesReceived;
                // Copy the incoming bytes into the buffer
                int bytesTransferred = Math.Min(bytesRequested, bytesAvailable);
                Array.Copy(data, i, this.dataBuffer, this.bytesReceived, bytesTransferred);
                i += bytesTransferred;
                // Notify "read completion"
                this.ReadCompleted(bytesTransferred);
            }
            else
            {
                // We're reading into the length prefix buffer
                int bytesRequested = this.lengthBuffer.Length - this.bytesReceived;
                // Copy the incoming bytes into the buffer
                int bytesTransferred = Math.Min(bytesRequested, bytesAvailable);
                Array.Copy(data, i, this.lengthBuffer, this.bytesReceived, bytesTransferred);
                i += bytesTransferred;
                // Notify "read completion"
                this.ReadCompleted(bytesTransferred);
            }
        }
    }

    /// <summary>
    /// Called when a read completes. Parses the received data and calls <see cref="MessageArrived"/> if necessary.
    /// </summary>
    /// <param name="count">The number of bytes read.</param>
    /// <exception cref="System.Net.ProtocolViolationException">If the data received is not a properly-formed message.</exception>
    private void ReadCompleted(int count)
    {
        // Get the number of bytes read into the buffer
        this.bytesReceived += count;
        if (this.dataBuffer == null)
        {
            // We're currently receiving the length buffer
            if (this.bytesReceived != sizeof(int))
            {
                // We haven't gotten all the length buffer yet: just wait for more data to arrive
            }
            else
            {
                // We've gotten the length buffer
                int length = BitConverter.ToInt32(this.lengthBuffer, 0);
                // Sanity check for length < 0
                if (length < 0)
                    throw new System.Net.ProtocolViolationException("Message length is less than zero");
                // Another sanity check is needed here for very large packets, to prevent denial-of-service attacks
                if (this.maxMessageSize > 0 && length > this.maxMessageSize)
                    throw new System.Net.ProtocolViolationException("Message length " + length.ToString(System.Globalization.CultureInfo.InvariantCulture) + " is larger than maximum message size " + this.maxMessageSize.ToString(System.Globalization.CultureInfo.InvariantCulture));
                // Zero-length packets are allowed as keepalives
                if (length == 0)
                {
                    this.bytesReceived = 0;
                    if (this.MessageArrived != null)
                        this.MessageArrived(new byte[0]);
                }
                else
                {
                    // Create the data buffer and start reading into it
                    this.dataBuffer = new byte[length];
                    this.bytesReceived = 0;
                }
            }
        }
        else
        {
            if (this.bytesReceived != this.dataBuffer.Length)
            {
                // We haven't gotten all the data buffer yet: just wait for more data to arrive
            }
            else
            {
                // We've gotten an entire packet
                if (this.MessageArrived != null)
                    this.MessageArrived(this.dataBuffer);
                // Start reading the length buffer again
                this.dataBuffer = null;
                this.bytesReceived = 0;
            }
        }
    }
}
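One portability detail is worth noting: BitConverter.GetBytes and BitConverter.ToInt32 use the byte order of the machine they run on, so the class above works as-is only when both ends of the stream share the same endianness. The sketch below shows one possible way to pin the length prefix to little-endian on every platform. LittleEndianFraming, Wrap, and ReadLength are hypothetical names that are not part of the original class, and the code assumes a runtime where System.Buffers.Binary.BinaryPrimitives is available.

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical helper, not part of the original PacketProtocol class:
// the same length-prefix framing, with the prefix byte order fixed to little-endian.
public static class LittleEndianFraming
{
    // Returns a 4-byte little-endian length prefix followed by the message bytes.
    public static byte[] Wrap(byte[] message)
    {
        if (message == null) throw new ArgumentNullException(nameof(message));

        byte[] framed = new byte[sizeof(int) + message.Length];
        // Write the length prefix in little-endian order regardless of the host CPU.
        BinaryPrimitives.WriteInt32LittleEndian(framed.AsSpan(0, sizeof(int)), message.Length);
        // Append the payload after the prefix.
        message.CopyTo(framed, sizeof(int));
        return framed;
    }

    // Decodes a little-endian length prefix from the first 4 bytes of the buffer.
    public static int ReadLength(byte[] lengthPrefix)
    {
        if (lengthPrefix == null) throw new ArgumentNullException(nameof(lengthPrefix));
        return BinaryPrimitives.ReadInt32LittleEndian(lengthPrefix.AsSpan(0, sizeof(int)));
    }
}
```

On little-endian hardware this produces the same bytes as WrapMessage, so it should interoperate there with the class above unchanged.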
@Anonymous:
As far as I can tell, you're not using the PacketProtocol class defined in this blog post, so I'm not sure why you're posting here.
A better place to ask for review would be the NCL forum:
http://social.msdn.microsoft.com/Forums/en-US/ncl/threads
which I see you have already done:
http://social.msdn.microsoft.com/Forums/en-US/ncl/thread/3d6681f5-7164-48d7-b703-e63ece938be0
I'm not sure if you still check this, but I've tried to use this class as-is, and I find that it only works on the first message. Any subsequent messages return an empty string.
Do you have any examples of how to use this class? Perhaps I'm missing something. Cheers.
@Wannabeuk:
I just tried it, and it worked fine. Here's my test code:
[TestMethod]
public void TestMethod1()
{
    int messages = 0;
    var packetizer = new PacketProtocol(2000);
    packetizer.MessageArrived += _ => ++messages;
    packetizer.DataReceived(
        PacketProtocol.WrapMessage(new byte[3]).Concat(
        PacketProtocol.WrapMessage(new byte[4])).ToArray());
    Assert.AreEqual(2, messages);
}
Here's an additional test, including checking the messages passed:
[TestMethod]
public void TestMethod1()
{
    int numMessages = 0;
    var messages = new string[2];
    var packetizer = new PacketProtocol(2000);
    packetizer.MessageArrived += message =>
    {
        messages[numMessages] = Encoding.UTF8.GetString(message);
        ++numMessages;
    };
    packetizer.DataReceived(
        PacketProtocol.WrapMessage(Encoding.UTF8.GetBytes("Hello")).Concat(
        PacketProtocol.WrapMessage(Encoding.UTF8.GetBytes("World"))).ToArray());
    Assert.AreEqual(2, numMessages);
    Assert.AreEqual("Hello", messages[0]);
    Assert.AreEqual("World", messages[1]);
}
Great code!
Just one question:
If a client writes for example 200 bytes of data to the stream, and then stops, and this code reads that data (which is one length buffer + data), won't the code just continue to loop through the rest of the readbuffer (which for example is 4096 bytes) until it finds some length data?
What I mean is that it will loop through useless data, when there may be new data that was written after the readBuffer was filled.
So instead of accepting zero length packets, shouldn't the code be notified and then just throw away the rest of the readBuffer? Or am I totally wrong now?
The code in this blog post assumes that DataReceived will be passed the bytes read from the stream, not a partially-filled read buffer. It accepts zero-length packets to allow the other side to detect half-open connections (http://nitoprograms.blogspot.com/2009/05/detection-of-half-open-dropped.html).
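To make that assumption concrete, here is a minimal sketch of a synchronous read loop that copies only the bytes actually returned by each read before handing them on. StreamPump and Pump are hypothetical names, and the dataReceived callback stands in for PacketProtocol.DataReceived so that the sketch stays self-contained.

```csharp
using System;
using System.IO;

// Hypothetical helper, not part of the original post: reads from a stream and
// forwards each received chunk to a callback such as PacketProtocol.DataReceived.
public static class StreamPump
{
    public static void Pump(Stream stream, Action<byte[]> dataReceived, int bufferSize = 4096)
    {
        byte[] readBuffer = new byte[bufferSize];
        while (true)
        {
            int bytesRead = stream.Read(readBuffer, 0, readBuffer.Length);
            if (bytesRead == 0)
                break; // A zero-length read signals the end of the stream.

            // Copy only the bytes that were read; the rest of readBuffer holds
            // stale data from earlier reads and must not be passed along.
            byte[] chunk = new byte[bytesRead];
            Array.Copy(readBuffer, 0, chunk, 0, bytesRead);
            dataReceived(chunk);
        }
    }
}
```

With a PacketProtocol instance named packetizer, the call would look something like StreamPump.Pump(stream, packetizer.DataReceived).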
Hi guys,
I wanted to ask: does this code wrap any kind of message into an IP message?
I need to wrap Sampled Value messages based on IEC 61850-9-2 into IP messages, transmit them to another PC, and then unwrap and read them.
How could I do that?
Thanks in advance!
No. This code only does length prefixing. If you want to wrap it into another kind of frame (like IP), you'd have to write that code yourself.
Well done Stephen,
This series of articles has been very helpful.
I am working on a middle-ware application that receives messages from multiple applications as xml or fixed-length strings, sends them over TCP/IP socket to a back-end system, gets responses and passes back to the respective application.
How do I ensure this middle-ware application is able to cope with the many requests it will have to handle?
You should be using asynchronous I/O for all your network communications. This is good advice in general, but especially if you have scaling concerns.
If you have control over the protocol, then impose some kind of message framing on the xml messages. It's possible to detect xml messages without explicit message framing, but it's quite complicated (and inefficient).
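As a rough illustration of the asynchronous approach, the sketch below awaits Stream.ReadAsync in a loop and forwards each chunk to a callback such as PacketProtocol.DataReceived. AsyncStreamPump and PumpAsync are hypothetical names, and the sketch assumes one such loop (and one PacketProtocol instance) per connection.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original post: an asynchronous receive
// loop that does not block a thread while waiting for data to arrive.
public static class AsyncStreamPump
{
    public static async Task PumpAsync(Stream stream, Action<byte[]> dataReceived, int bufferSize = 4096)
    {
        byte[] readBuffer = new byte[bufferSize];
        while (true)
        {
            int bytesRead = await stream.ReadAsync(readBuffer, 0, readBuffer.Length).ConfigureAwait(false);
            if (bytesRead == 0)
                break; // The other side closed the stream.

            // Forward exactly the bytes received in this read.
            byte[] chunk = new byte[bytesRead];
            Buffer.BlockCopy(readBuffer, 0, chunk, 0, bytesRead);
            dataReceived(chunk);
        }
    }
}
```

Because each read is awaited before the next one starts, the callback is never invoked concurrently for a given connection, which fits the note that PacketProtocol is not thread-safe.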
This code has made my day! I've been struggling to create a message framing interface (without knowing that was what it was called). I was trying to implement a message delimited solution with no luck. I stumbled across your description of message framing and subsequently this article. Fantastic! It worked perfectly as soon as I dropped it into my code and I instantly saw all of my communication errors go away. Thanks buddy!