Async producer/consumer the easy way
This article was originally published at tech.blinemedical.com
In .net 4, a new class called BlockingCollection
was introduced, which let you have a threadsafe producer/consumer queue. Anyone consuming a BlockingCollection
blocks automatically until new items are added. This lets you easily add items to the collection in one thread and use another synchronized thread to consume items. This class is great since before this existed, you had to do all this work with mutexes and it was a lot of extra work (and more error prone). In general, a good time to use a decoupled producer consumer pattern is when you have a slow consuming function and a producer thread that is time sensitive.
Even though BlockingCollection
effectively synchronizes your producer/consumer, you still have to create boilerplate to manage the producer thread and the consumer thread. Also if you wanted to add extra exception handling or a cancellation token, you’d have to add all that yourself too. I wrapped this all up in a BlockingCollectionWrapper
class that handles all this for you.
An example
Here is an example where the consumer takes one second each time it consumes an item.
private readonly ManualResetEvent \_testMutex = new ManualResetEvent(false);
[Test]
public void TestCollection()
{
// create the wrapper
var asyncCollection = new BlockingCollectionWrapper\<string\>();
asyncCollection.FinishedEvent += FinishedEventHandler;
// make sure we dispose of it. this will stop the internal thread
using (asyncCollection)
{
// register a consuming action
asyncCollection.QueueConsumingAction = (producedItem) =\>
{
Thread.Sleep(TimeSpan.FromSeconds(1));
Console.WriteLine(DateTime.Now + ": Consuming item: " + producedItem);
};
// start consuming
asyncCollection.Start();
// start producing
for (int i = 0; i \< 10; i++)
{
Console.WriteLine(DateTime.Now + ": Produced item " + i);
asyncCollection.AddItem(i.ToString());
}
}
// wait for the finished handler to pulse this
\_testMutex.WaitOne();
Assert.True(asyncCollection.Finished);
}
private void FinishedEventHandler(object sender, BlockingCollectionEventArgs e)
{
\_testMutex.Set();
}
This prints out
9/17/2012 6:22:43 PM: Produced item 0
9/17/2012 6:22:43 PM: Produced item 1
9/17/2012 6:22:43 PM: Produced item 2
9/17/2012 6:22:43 PM: Produced item 3
9/17/2012 6:22:43 PM: Produced item 4
9/17/2012 6:22:43 PM: Produced item 5
9/17/2012 6:22:43 PM: Produced item 6
9/17/2012 6:22:43 PM: Produced item 7
9/17/2012 6:22:43 PM: Produced item 8
9/17/2012 6:22:43 PM: Produced item 9
9/17/2012 6:22:44 PM: Consuming item: 0
9/17/2012 6:22:45 PM: Consuming item: 1
9/17/2012 6:22:46 PM: Consuming item: 2
9/17/2012 6:22:47 PM: Consuming item: 3
9/17/2012 6:22:48 PM: Consuming item: 4
9/17/2012 6:22:49 PM: Consuming item: 5
9/17/2012 6:22:50 PM: Consuming item: 6
9/17/2012 6:22:51 PM: Consuming item: 7
9/17/2012 6:22:52 PM: Consuming item: 8
9/17/2012 6:22:53 PM: Consuming item: 9
First, I created the blocking collection wrapper and made sure to put it in a using
block since it’s disposable (the thread waiting on the blocking collection will need to be cleaned up). Then I registered a function to be executed each time an item is consumed. Calling Start()
begins consuming. Once I’m done - even after the using block disposes of the wrapper - the separate consumer thread could still be running (processing whatever is left), but it is no longer blocking on additions and will complete consuming any pending items.
The wrapper
When you call .Start()
we start our independent consumer thread.
/// \<summary\>
/// Start the consumer
/// \</summary\>
public void Start()
{
\_cancellationTokenSource = new CancellationTokenSource();
\_thread = new Thread(QueueConsumer) {Name = "BlockingConsumer"};
\_thread.Start();
}
This is the queue consumer that runs in the separate thread that executes the registered consumer action. The consuming action is locked to make changing the consuming action threadsafe.
/// \<summary\>
/// The actual consumer queue that runs in a seperate thread
/// \</summary\>
private void QueueConsumer()
{
try
{
// Block on \_queue.GetConsumerEnumerable
// When an item is added to the \_queue it will unblock and let us consume
foreach (var item in \_queue.GetConsumingEnumerable(\_cancellationTokenSource.Token))
{
// get a synchronized snapshot of the action
Action\<T\> consumerAction = QueueConsumingAction;
// execute our registered consuming action
if (consumerAction != null)
{
consumerAction(item);
}
}
// dispose of the token source
if (\_cancellationTokenSource != null)
{
\_cancellationTokenSource.Dispose();
}
//Log.Debug(this, "Done with queue consumer");
Finished = true;
if (FinishedEvent != null)
{
FinishedEvent(this, new BlockingCollectionEventArgs());
}
}
catch(OperationCanceledException)
{
//Log.Debug(this, "Blocking collection\<{0}\> cancelled", typeof(T));
}
catch (Exception ex)
{
//Log.Error(this, ex, "Error consuming from queue of type {0}", typeof(T));
}
}
And when the wrapper is disposed, we set CompleteAdding
on the blocking collection which tells the collection to stop waiting for new additions and finish out whatever is left in the queue.
protected void Dispose(bool disposing)
{
if(disposing)
{
if (\_queue !=null && !\_queue.IsAddingCompleted)
{
// mark the queue as complete
// the BlockingConsumer thread will now
// just process the remaining items
\_queue.CompleteAdding();
}
}
}
public void Dispose()
{
Dispose(true);
}
The remaining properties and functions on the wrapper let you
Force abort the consumer thread
Register a Finished event handler; disposing of the wrapper doesn’t mean that no more work is being done. It means that you are no longer adding items and the queue is effectively “closed”. Depending on your consumer function though, this could take some time to complete. This is why it’s good to hook into the finished event so you can be sure that all your processing is complete.
Manually mark the queue as AddedComplete (so the thread stops blocking)
Manually cancel the queue
Check if the queue is ended by looking at the
Finished
property
So to reiterate, the basic idea here is
Create a separate thread that has appropriate exception handling to be blocked while consuming the queued items
Handle cancellation gracefully
Be able to properly end our spawned thread so we don’t have anything leftover
It should be noted that even though this wrapper is built for a single consumer/single producer design, since we are leveraging GetConsumingEnumerable
we could modify the wrapper to allow for multiple threads acting as consumers on the same enumerable. This could give us a single producer/multiple synchronized consumer pattern where only one consumer thread gets the particular item but multiple consumer threads exist and can do work.
Full source and tests provided at our github.