Async Producer-Consumer Queue 3: More Flexibility

Last time we implemented an async-compatible producer/consumer queue using portable async-ready synchronization primitives. This time, we’ll give up some of that portability but increase flexibility: we’ll build an async-compatible producer/consumer collection that can be used as a queue (FIFO), stack (LIFO), or bag (unordered).

The BCL has already done some of the hard work for us here. The BlockingCollection type provides a wrapper around any kind of IProducerConsumerCollection, including ConcurrentQueue, ConcurrentStack, and ConcurrentBag. Our goal is to create the async-compatible equivalent of BlockingCollection.
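
To make the wrapper idea concrete, here is a minimal sketch using only BCL types. The `BlockingCollectionDemo` class and its `AddThreeThenTakeOne` helper are hypothetical names invented for this illustration. It shows that the underlying collection, not BlockingCollection itself, decides the order in which items come back out:

```csharp
using System;
using System.Collections.Concurrent;

public static class BlockingCollectionDemo
{
    // Hypothetical helper: wraps the given store in a BlockingCollection,
    // adds 1, 2, 3 in that order, and returns the first item taken.
    public static int AddThreeThenTakeOne(IProducerConsumerCollection<int> store)
    {
        using (var blocking = new BlockingCollection<int>(store))
        {
            blocking.Add(1);
            blocking.Add(2);
            blocking.Add(3);
            return blocking.Take();
        }
    }

    public static void Main()
    {
        // Queue store: FIFO, so the oldest item (1) comes out first.
        Console.WriteLine(AddThreeThenTakeOne(new ConcurrentQueue<int>()));

        // Stack store: LIFO, so the newest item (3) comes out first.
        Console.WriteLine(AddThreeThenTakeOne(new ConcurrentStack<int>()));
    }
}
```

A ConcurrentBag can be passed the same way, but it makes no ordering promise, so the sketch does not print a result for it.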

The implementation is pretty simple: it’s almost exactly like our last one, which wrapped a Queue<T>. This time, we wrap an IProducerConsumerCollection<T>:

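using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Nito.AsyncEx; // Assumption: the library that supplies AsyncLock and AsyncConditionVariable.

public sealed class AsyncCollection<T>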
{
    // The underlying collection of items.
    private readonly IProducerConsumerCollection<T> collection;

    // The maximum number of items allowed.
    private readonly int maxCount;

    // Synchronization primitives.
    private readonly AsyncLock mutex;
    private readonly AsyncConditionVariable notFull;
    private readonly AsyncConditionVariable notEmpty;

    public AsyncCollection(IProducerConsumerCollection<T> collection = null, int maxCount = int.MaxValue)
    {
        if (maxCount <= 0)
            throw new ArgumentOutOfRangeException("maxCount", "The maximum count must be greater than zero.");
        this.collection = collection ?? new ConcurrentQueue<T>();
        this.maxCount = maxCount;

        mutex = new AsyncLock();
        notFull = new AsyncConditionVariable(mutex);
        notEmpty = new AsyncConditionVariable(mutex);
    }

    // Convenience properties to make the code a bit clearer.
    private bool Empty { get { return collection.Count == 0; } }
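    // Use >= rather than == so a caller-supplied collection that already holds more than maxCount items still counts as full.
    private bool Full { get { return collection.Count >= maxCount; } }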

    public async Task AddAsync(T item)
    {
        using (await mutex.LockAsync())
        {
            while (Full)
                await notFull.WaitAsync();

            if (!collection.TryAdd(item))
                throw new InvalidOperationException("The underlying collection refused the item.");
            notEmpty.NotifyOne();
        }
    }

    public async Task<T> TakeAsync()
    {
        using (await mutex.LockAsync())
        {
            while (Empty)
                await notEmpty.WaitAsync();

            T ret;
            if (!collection.TryTake(out ret))
                throw new InvalidOperationException("The underlying collection refused to provide an item.");
            notFull.NotifyOne();
            return ret;
        }
    }
}

Now we have an AsyncCollection that can be used as a front for many different kinds of collections.
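
As a usage sketch, the same add-then-take sequence can be run against different underlying stores. It is not self-contained: it assumes the AsyncCollection<T> class above is compiled into the same project, together with the AsyncLock and AsyncConditionVariable primitives it relies on. `AsyncCollectionDemo`, `TakeFirstAsync`, and `RunAsync` are hypothetical names used only for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class AsyncCollectionDemo
{
    // Hypothetical helper: wraps the given store in the AsyncCollection<T>
    // defined above, adds 1, 2, 3 in that order, and returns the first item taken.
    public static async Task<int> TakeFirstAsync(IProducerConsumerCollection<int> store)
    {
        var items = new AsyncCollection<int>(store, maxCount: 10);
        await items.AddAsync(1);
        await items.AddAsync(2);
        await items.AddAsync(3);
        return await items.TakeAsync();
    }

    public static async Task RunAsync()
    {
        // With a queue store the oldest item should come back first (FIFO).
        int fromQueue = await TakeFirstAsync(new ConcurrentQueue<int>());

        // With a stack store the newest item should come back first (LIFO).
        int fromStack = await TakeFirstAsync(new ConcurrentStack<int>());

        Console.WriteLine("queue: " + fromQueue + ", stack: " + fromStack);
    }
}
```

Because maxCount is 10 and only three items are added, none of the AddAsync calls in this sketch ever has to wait on notFull. A smaller maxCount would make a producer wait until a consumer takes an item.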

Update (2014-12-01): For more details, see Recipe 8.9 in my Concurrency Cookbook.