In my last Dataflow post, I implemented an
async-compatible producer/consumer queue using
BufferBlock<T>. While this approach works, it’s limited to platforms that Dataflow supports (as of the time of this writing: .NET 4.5, .NET 4.0, Windows Store, and Portable net45+win8).
The Microsoft.Bcl.Async package extends
async support to .NET 4.5/4.0, Windows Store, Silverlight 5/4, Windows Phone 8.0/7.5, and portable libraries targeting any subset of these. Today we’ll build an
async-compatible producer/consumer queue that is supported on all those same platforms. This would be useful, e.g., for a portable library that had to target both Windows Store and Windows Phone.
Traditional producer/consumer queues are easily built using locks and condition variables (or monitors). I want the ability to throttle the queue with a maximum number of elements, so my implementation will use one lock and two condition variables. One condition variable will be signaled when the queue is not full (releasing a producer), and the other condition variable will be signaled when the queue is not empty (releasing a consumer).
Here’s a simple implementation using
async-compatible synchronization primitives from AsyncEx (which supports all the same targets as Microsoft.Bcl.Async):
Most of this is boilerplate. The interesting parts are in
DequeueAsync. They both parallel each other: take the lock, wait until their operation can be performed, perform the operation, and notify the other side that the other condition is met.
One question that comes up with condition variables is why use a while loop instead of just an if statement? It’s actually because the condition may not be true by the time the method resumes! Consider this sequence of events (keeping in mind that waiting on a condition variable temporarily releases the lock, and re-acquires it after the condition variable is signaled):
- One producer is attempting to enqueue an item to a full queue. It waits on the
- A consumer takes an item from the queue, signaling
notFulland releasing the lock.
- The producer is signaled and is about to re-acquire the lock when it is suspended.
- Another producer calls
EnqueueAsync, takes the lock, sees that the queue is not full, enqueues its item, and completes, releasing the lock.
- The original producer resumes execution and re-acquires the lock. However, the queue is now full again.
This is a very unlikely scenario, but it is possible. For this reason, condition variables are almost always used with
while loops. There may be occasional situations where an
if statement would suffice (e.g., if there is only one producer and it only produced one item at a time), but it’s barely an optimization at all. It’s safer to always use
while loops with condition variables.
There is an implementation of this type in the AsyncEx library which is more complex than the simple version in this blog post. The AsyncEx version includes:
- Full cancellation support.
- Marking a queue as “complete for adding”. Any attempt to enqueue to a queue that is complete for adding will fail. Attempts to dequeue from a queue that is complete for adding will also fail once the queue is empty.
- The ability to attempt an enqueue/dequeue to/from multiple queues simultaneously, with only one enqueue/dequeue actually taking place.
Try*variants for all operations.
Update (2014-12-01): For more details, see Recipes 8.8 and 8.10 in my Concurrency Cookbook.