Today we’ll build on what we learned about Dataflow to build an async-compatible producer/consumer queue.
A producer/consumer queue is a classic problem in multithreading: you have one (or more) “producers” which are producing data, you have one (or more) “consumers” which are consuming data, and you need some kind of data structure that will receive data from the producer(s) and provide it to the consumer(s).
Using TPL Dataflow, this is incredibly easy: a BufferBlock is an async-ready producer/consumer queue. We’ll start with the simple example of a single producer and consumer, and build from there.
Our producer can just enqueue a sequence of values, and then mark the queue as complete:
Similarly, the consumer can just await until a value is ready in the queue, and then add it to its collection of received values. Note that this works only if we have a single consumer; if we have multiple consumers, then they would all see the output available, but they wouldn’t all be able to receive it.
We can wrap these up in a simple unit test:
A common requirement for producer/consumer queues is a throttling restriction. We don’t want to run out of memory if the producers can produce data items faster than consumers can consume them!
First, we need to change our producer. Post may return false once the throttling threshold is reached, so we’ll switch to the asynchronous SendAsync (and make the producer itself asynchronous), which will automatically wait for space to be available:
Dataflow blocks have built-in support for throttling, so adding this to the mesh is rather easy once we have an asynchronous producer. We can say there should never be more than 5 data items in the queue, and it’s a one-line change (the line that defines the mesh):
We can have multiple producers pushing data to the same queue (or any dataflow mesh). The only thing we have to change is when the block is completed.
First, we remove the block completion from the producers:
Now, we can write a simple “producer manager” method that will complete the queue when all producers complete:
The updated test looks like this (note that because we have three independent producers, the order of results is no longer guaranteed):
The consumer side of this example does work, but it’s not done in a TPL Dataflowish sort of way. It starts to get more complicated when we consider multiple consumers, because there’s no TryReceiveAsync available on our block.
Instead of fighting the flow, let’s change our consumer side to be TPL Dataflowish. Specifically, we’re going to replace the consumer method with a dataflow ActionBlock:
Notice that we set the ExecutionDataflowBlockOptions.BoundedCapacity for the consumer block to 1. This is necessary if we want to maintain throttling. Without this set, the producers could produce tons of data items which pass through the queue block and get buffered up in the consumer block (making our queue throttling meaningless).
Now that we have a consumer block, it’s much more straightforward to add multiple consumers:
Note that ExecutionDataflowBlockOptions.BoundedCapacity is now performing another important function: in addition to maintaining the throttling, it is performing load balancing. If this is left at the default value (DataflowBlockOptions.Unbounded), then all of the data items will end up in the first consumer, which will buffer them up until it can process them. With the buffer limited to a single data item, the queue will offer its item to the next consumer when the first consumer is busy.
In summary, we just reviewed two scenarios where we should set BoundedCapacity to a low number: when we want to maintain throttling throughout a pipeline, and when we have a “T” in our dataflow mesh.