So far, we’ve been learning quite a bit about the core async/await support that was added to C# in VS2012. Today, we’ll start with a conceptual overview of the first large, useful library built with async in mind: TPL Dataflow.
TPL Dataflow allows you to easily create a mesh through which your data flows. The simplest meshes are pipelines (very similar to pipelines in PowerShell). More complex meshes can split and join the data flows, and even contain data flow loops!
Every mesh is composed of a number of blocks which are linked together. TPL Dataflow provides quite a few blocks which address different needs; we will just use the most basic blocks for our examples, but you can see the Introduction to TPL Dataflow document for a full description of the different types of blocks.
A block is a part of a dataflow mesh through which data can flow. The block usually processes the data in some way, but it doesn’t have to.
Blocks themselves have components: usually a buffer component and a task component. Buffers hold data (either data that has been sent to this block but which has not yet been processed, or data that has been processed and is waiting to leave the block).
Different block types have different configurations of buffers and tasks. Some blocks have multiple buffers (e.g., “join” blocks). Tasks have two purposes. Most blocks use a task to push data out of the block; some blocks also use a task to do the processing of the data.
Basic Data Flow
You can push data to a block by calling Post(item) or await SendAsync(item). Normally, you can just use Post(item) to (synchronously) place the data into the block’s input buffer. However, it’s possible to throttle a block by limiting its buffer size; when the buffer is full, Post returns false instead of waiting, so in this case you can use SendAsync to (asynchronously) wait for space to be available and then place the data into the block’s input buffer.
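A minimal sketch of the difference, assuming a BufferBlock<int> throttled to a single item (the class and method names here are made up for illustration):

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ThrottlingSketch
{
    public static async Task<(bool FirstPost, bool SecondPost, bool Sent, int Received)> RunAsync()
    {
        // Assumption for the example: a buffer limited to one item.
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

        bool firstPost = buffer.Post(1);   // accepted: the buffer has room
        bool secondPost = buffer.Post(2);  // declined: the buffer is full, and Post does not wait

        // SendAsync (asynchronously) waits for space instead of giving up.
        Task<bool> pendingSend = buffer.SendAsync(3);

        int received = await buffer.ReceiveAsync(); // takes item 1 and frees the single slot
        bool sent = await pendingSend;              // the waiting item is now accepted

        return (firstPost, secondPost, sent, received);
    }
}
```

Note that the declined item (2) is simply dropped by this sketch; real code would check the return value of Post, or just use SendAsync throughout.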
Getting data out at first appears pretty easy: you can call Receive or ReceiveAsync to retrieve the next item, and there’s an OutputAvailableAsync that lets you know when the next item is ready to be retrieved. However, there are a couple of “gotchas”:
- When a block finishes processing, Receive and await ReceiveAsync will both throw exceptions. This is not ideal.
- OutputAvailableAsync is not very useful if there are multiple receivers reading from the same block.
From a procedural perspective, it would be great if we had a Tuple<bool, T> TryReceiveAsync that would either retrieve the item or return false, rather than raising an exception.
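For illustration only, such a helper can be approximated with calls the library does provide. The TryReceiveAsync below is a hypothetical extension method (it is not part of TPL Dataflow); it combines OutputAvailableAsync with the synchronous TryReceive and retries if another receiver got there first:

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ReceiveSketch
{
    // Hypothetical helper, not a library API.
    public static async Task<(bool Success, T Item)> TryReceiveAsync<T>(
        this IReceivableSourceBlock<T> source)
    {
        // OutputAvailableAsync yields false once no more data will ever be available.
        while (await source.OutputAvailableAsync())
        {
            // Another receiver may have taken the item in the meantime;
            // if so, go around the loop and wait again.
            if (source.TryReceive(out T item))
                return (true, item);
        }
        return (false, default(T));
    }

    // Usage: drain a completed buffer without any exception handling.
    public static async Task<int> SumAllAsync()
    {
        var buffer = new BufferBlock<int>();
        buffer.Post(1);
        buffer.Post(2);
        buffer.Post(3);
        buffer.Complete();

        int sum = 0;
        while (true)
        {
            var (success, item) = await buffer.TryReceiveAsync();
            if (!success) break;
            sum += item;
        }
        return sum;
    }
}
```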
But if you look at the problem from a dataflow perspective, there is already a solution: ActionBlock<T> will execute a callback for any data pushed to it. This is the “dataflowish” way of getting the results out of a dataflow mesh: just link your block to an ActionBlock<T>.
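A small sketch of that approach, assuming a TransformBlock that doubles integers as the block whose results we want (the names are illustrative):

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ActionBlockSketch
{
    public static async Task<List<int>> RunAsync()
    {
        var results = new List<int>();

        var doubler = new TransformBlock<int, int>(x => x * 2);
        // The callback runs once per item; by default an ActionBlock
        // processes one item at a time, so List<int>.Add is safe here.
        var collector = new ActionBlock<int>(x => results.Add(x));

        doubler.LinkTo(collector);

        doubler.Post(1);
        doubler.Post(2);
        doubler.Post(3);

        // Shut down in order: the first block's Completion finishes only
        // after every output item has been handed to the collector.
        doubler.Complete();
        await doubler.Completion;
        collector.Complete();
        await collector.Completion;

        return results;
    }
}
```

No Receive call appears anywhere, so there is no exception to handle when the block finishes.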
You can connect two blocks by linking them together. The actual negotiation of data transfer is fairly complex (to allow scenarios such as “join” blocks), but you usually don’t have to think about it. Just link from source to target (source.LinkTo(target);), and the library takes care of propagating the data.
There’s no limit to what you can link. It is possible to have a loop in your dataflow mesh.
There are several options you have when setting up a link. You can have the link disengage after a fixed number of data items, or specify a filter so only certain data items will be propagated along the link, etc.
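As a rough sketch of both options, the example below uses the predicate overload of LinkTo as a filter and DataflowLinkOptions.MaxMessages to make a link disengage after one item. The block names and the even/odd split are assumptions made for the example:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class LinkOptionsSketch
{
    public static async Task<(List<int> Evens, List<int> Limited)> RunAsync()
    {
        var evens = new List<int>();
        var limited = new List<int>();

        var source = new BufferBlock<int>();
        var evenTarget = new ActionBlock<int>(x => evens.Add(x));
        var limitedTarget = new ActionBlock<int>(x => limited.Add(x));

        // Filtered link: only even items travel along it.
        source.LinkTo(evenTarget, x => x % 2 == 0);
        // This link disengages after it has propagated one item.
        source.LinkTo(limitedTarget, new DataflowLinkOptions { MaxMessages = 1 });
        // Catch-all: an item that no link accepts would sit in the source
        // buffer and hold up everything behind it, so discard the rest.
        source.LinkTo(DataflowBlock.NullTarget<int>());

        for (int i = 1; i <= 5; i++)
            source.Post(i);

        source.Complete();
        await source.Completion; // every item has left the source
        evenTarget.Complete();
        limitedTarget.Complete();
        await Task.WhenAll(evenTarget.Completion, limitedTarget.Completion);

        return (evens, limited);
    }
}
```

The catch-all link matters more than it looks: with a filtered link, it is worth deciding up front where the rejected items should go.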
A lot of my dataflow “meshes” end up being “pipelines”, so one option I use more than others is PropagateCompletion, which propagates completion as well as data items.
Eventually, you’re going to finish sending data to your dataflow mesh, and you’re going to want to know when the mesh is done processing it. Each block supports an asynchronous form of completion: you call Complete and, some time later, the block’s Completion task will complete.
Side note: you should always assume that Completion might be signaled asynchronously, even if there are no data items to process.
If you have a simple dataflow mesh (like a pipeline), then you can tell the blocks to propagate their completion when you link them together. Then when you’re finished, you can just complete the first block and await the completion of the last block.
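Here is a minimal pipeline along those lines, assuming three illustrative blocks (square, format, collect) linked with PropagateCompletion:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineSketch
{
    public static async Task<List<string>> RunAsync()
    {
        var output = new List<string>();
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

        var square = new TransformBlock<int, int>(x => x * x);
        var format = new TransformBlock<int, string>(x => "value=" + x);
        var sink = new ActionBlock<string>(s => output.Add(s));

        // Each link carries completion along with the data.
        square.LinkTo(format, linkOptions);
        format.LinkTo(sink, linkOptions);

        square.Post(1);
        square.Post(2);
        square.Post(3);

        // Complete the first block, then await the last one.
        square.Complete();
        await sink.Completion;

        return output;
    }
}
```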
If your dataflow mesh is more complex, then you may have to propagate completion manually. The common tool for this is Task.ContinueWith, often combined with Task.WhenAll when several blocks feed one target. If you do have to do this, I recommend that you wrap your dataflow mesh into a separate class (possibly exposing the blocks as properties) and implement your own Complete and Completion members just like a dataflow block does.
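One possible shape for such a wrapper, sketched under the assumption of a mesh where two input blocks feed a single sink. MergeMesh and its members are hypothetical names, and the fault handling is just one reasonable choice:

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public sealed class MergeMesh
{
    private readonly ActionBlock<int> _sink;
    private int _total;

    public MergeMesh()
    {
        Left = new TransformBlock<int, int>(x => x);
        Right = new TransformBlock<int, int>(x => x * 10);
        _sink = new ActionBlock<int>(x => _total += x);

        // Two sources feed one target, so PropagateCompletion on either
        // link would complete the sink too early.
        Left.LinkTo(_sink);
        Right.LinkTo(_sink);

        // Manual propagation: the sink finishes only after BOTH inputs have.
        Task.WhenAll(Left.Completion, Right.Completion).ContinueWith(t =>
        {
            if (t.Exception != null)
                ((IDataflowBlock)_sink).Fault(t.Exception);
            else
                _sink.Complete();
        }, TaskScheduler.Default);
    }

    // The blocks are exposed as properties so callers can push data in.
    public TransformBlock<int, int> Left { get; }
    public TransformBlock<int, int> Right { get; }

    // Read this after Completion has finished.
    public int Total => _total;

    // The same two members a dataflow block offers.
    public void Complete()
    {
        Left.Complete();
        Right.Complete();
    }

    public Task Completion => _sink.Completion;

    // Usage example.
    public static async Task<int> DemoAsync()
    {
        var mesh = new MergeMesh();
        mesh.Left.Post(1);
        mesh.Left.Post(2);
        mesh.Right.Post(3);
        mesh.Complete();
        await mesh.Completion;
        return mesh.Total;
    }
}
```

From the outside, the whole mesh now looks like a single block: callers call Complete and await Completion without knowing how the pieces are wired together.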
Update (2014-12-01): For more details, see Chapter 4 in my Concurrency Cookbook.