{"id":55930,"date":"2010-04-06T10:00:00","date_gmt":"2010-04-06T10:00:00","guid":{"rendered":"https:\/\/blogs.msdn.microsoft.com\/pfxteam\/2010\/04\/06\/parallelextensionsextras-tour-4-blockingcollectionextensions\/"},"modified":"2010-04-06T10:00:00","modified_gmt":"2010-04-06T10:00:00","slug":"parallelextensionsextras-tour-4-blockingcollectionextensions","status":"publish","type":"post","link":"https:\/\/devblogs.microsoft.com\/dotnet\/parallelextensionsextras-tour-4-blockingcollectionextensions\/","title":{"rendered":"ParallelExtensionsExtras Tour &#8211; #4 &#8211; BlockingCollectionExtensions"},"content":{"rendered":"<p><font size=\"3\"><font face=\"Calibri\"><\/p>\n<p class=\"MsoNormal\"><font size=\"3\" face=\"Calibri\"><em>(The full set of ParallelExtensionsExtras Tour posts is available&nbsp;<\/em><a href=\"https:\/\/blogs.msdn.com\/pfxteam\/archive\/2010\/04\/04\/9990342.aspx\"><font color=\"#dd4a21\"><em>here<\/em><\/font><\/a><em>.)<\/em>&nbsp;&nbsp;<\/font><\/p>\n<p class=\"MsoNormal\"><a href=\"http:\/\/msdn.microsoft.com\/en-us\/library\/dd267312(VS.100).aspx\">BlockingCollection&lt;T&gt;<\/a> encapsulates the core synchronization and coordination necessary to enable classic producer\/consumer patterns.<span>&nbsp; <\/span><\/p>\n<p><\/font><a href=\"https:\/\/code.msdn.microsoft.com\/ParExtSamples\"><font size=\"3\" face=\"Calibri\">ParallelExtensionsExtras<\/font><\/a><font size=\"3\"><font face=\"Calibri\"> provides the BlockingCollectionExtensions.cs file, which contains several extension methods for BlockingCollection&lt;T&gt; to make several common solutions easier.<\/p>\n<p><\/font><\/font><\/p>\n<p class=\"MsoNormal\"><b><font size=\"3\"><font face=\"Calibri\">GetConsumingPartitioner<\/p>\n<p><\/font><\/font><\/b><\/p>\n<p class=\"MsoNormal\"><font size=\"3\"><font face=\"Calibri\">It&rsquo;s common to want to use BlockingCollection&lt;T&gt; in conjunction with either Parallel.ForEach or PLINQ in streaming scenarios: while a producer pumps 
events into the BlockingCollection&lt;T&gt;, ForEach or PLINQ are used to process those events as quickly as possible.<span>&nbsp; <\/span>The connection between BlockingCollection&lt;T&gt; and the consuming parallelization construct can be easily achieved using BlockingCollection&lt;T&gt;&rsquo;s GetConsumingEnumerable method, which returns an enumerable that continually takes from the collection; this enumerable can then be passed as input to ForEach or PLINQ, e.g.<\/p>\n<p><\/font><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">Parallel.ForEach(_bc.GetConsumingEnumerable(), item =&gt;<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">{<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>&hellip; \/\/ process item here<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">});<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"MsoNormal\"><font size=\"3\"><font face=\"Calibri\">However, there are some potentially important flaws with this simple approach.<span>&nbsp; <\/span>First, BlockingCollection&lt;T&gt;&rsquo;s GetConsumingEnumerable implementation uses BlockingCollection&lt;T&gt;&rsquo;s internal synchronization, which already supports multiple consumers concurrently, but ForEach doesn&rsquo;t know that, and its enumerable-partitioning logic also needs to take a lock while accessing the enumerable.<span>&nbsp; <\/span>As such, there&rsquo;s more synchronization here than is actually necessary, resulting in a potentially non-negligible&nbsp;performance hit.<span>&nbsp; <\/span>Second, the partitioning algorithm employed by default by both Parallel.ForEach and PLINQ uses chunking in order to minimize synchronization costs: rather than taking the lock once per element, it&#8217;ll take the lock, grab a group of elements (a chunk), and then release the lock.<span>&nbsp; <\/span>While this design can help with overall 
throughput, for scenarios that are focused more on low latency, that chunking can be prohibitive.<\/p>\n<p><\/font><\/font><\/p>\n<p class=\"MsoNormal\"><font size=\"3\"><font face=\"Calibri\">We can solve both of these problems by writing a custom partitioner for BlockingCollection&lt;T&gt;.<span>&nbsp; <\/span>Our goal for this partitioner is to address both of the drawbacks mentioned previously, and, as it turns out, doing so is easy.<span>&nbsp; <\/span>A Partitioner&lt;T&gt; represents each partition with an IEnumerator&lt;T&gt;, and the object it uses to generate these partitions is an IEnumerable&lt;T&gt;.<span>&nbsp; <\/span>We already have an appropriate generator of such an IEnumerable&lt;T&gt; in the form of GetConsumingEnumerable, so to support a dynamic number of partitions in the partitioner, all we need to do is return the result of GetConsumingEnumerable from the GetDynamicPartitions override.<span>&nbsp; <\/span>Supporting a static number of partitions then just requires delegating to the dynamic support to retrieve a fixed number of partitions.<span>&nbsp; <\/span>Here is the implementation:<\/p>\n<p><\/font><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">public static Partitioner&lt;T&gt; GetConsumingPartitioner&lt;T&gt;(<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>this BlockingCollection&lt;T&gt; collection)<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">{<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>return new BlockingCollectionPartitioner&lt;T&gt;(collection);<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">}<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"Code\"><font face=\"Consolas\">private class BlockingCollectionPartitioner&lt;T&gt; : Partitioner&lt;T&gt;<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font 
face=\"Consolas\">{<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>private BlockingCollection&lt;T&gt; _collection;<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>internal BlockingCollectionPartitioner(<\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; BlockingCollection&lt;T&gt; collection)<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>{<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>if (collection == null) <\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; throw new ArgumentNullException(&#8220;collection&#8221;);<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>_collection = collection;<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>}<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>public override bool SupportsDynamicPartitions { <\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; get { return true; } <\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">&nbsp;&nbsp;&nbsp; }<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>public override IList&lt;IEnumerator&lt;T&gt;&gt; GetPartitions(<\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; int 
partitionCount)<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>{<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>if (partitionCount &lt; 1) <\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>throw new ArgumentOutOfRangeException(&#8220;partitionCount&#8221;);<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>var dynamicPartitioner = GetDynamicPartitions();<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>return Enumerable.Range(0, partitionCount).Select(_ =&gt; <\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>dynamicPartitioner.GetEnumerator()).ToArray();<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>}<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>public override IEnumerable&lt;T&gt; GetDynamicPartitions()<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>{<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>return _collection.GetConsumingEnumerable();<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>}<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">}<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"MsoNormal\"><font size=\"3\"><font face=\"Calibri\">With this in place, we can 
implement our streaming with Parallel.ForEach just as we did previously, except by using GetConsumingPartitioner instead of GetConsumingEnumerable.<span>&nbsp; <\/span>This relies on the fact that both Parallel.ForEach and PLINQ&rsquo;s AsParallel have overloads that work with partitioners:<\/p>\n<p><\/font><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">Parallel.ForEach(_bc.GetConsumingPartitioner(), item =&gt;<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">{<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>&hellip; \/\/ process item here<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">});<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"MsoNormal\"><b><font size=\"3\"><font face=\"Calibri\">AddFromEnumerable and AddFromObservable<\/p>\n<p><\/font><\/font><\/b><\/p>\n<p class=\"MsoNormal\"><font size=\"3\"><font face=\"Calibri\">In producer\/consumer scenarios, it&rsquo;s sometimes the case that the producer is presenting the data in the form of an IEnumerable&lt;T&gt; already filled with the relevant data.<span>&nbsp; <\/span>To get that data into a BlockingCollection&lt;T&gt;, we can utilize a simple utility method to transfer the contents into the BlockingCollection&lt;T&gt;:<\/p>\n<p><\/font><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">public static void AddFromEnumerable&lt;T&gt;(<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>this BlockingCollection&lt;T&gt; target, IEnumerable&lt;T&gt; source, <\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>bool completeAddingWhenDone)<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">{<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>try { foreach (var item in source) target.Add(item); 
}<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>finally { if (completeAddingWhenDone) target.CompleteAdding(); }<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">}<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"MsoNormal\"><font size=\"3\"><font face=\"Calibri\">Similarly, the data may be arriving asynchronously in the form of an IObservable&lt;T&gt;.<span>&nbsp; <\/span>We can use a simple DelegateBasedObserver class which just implements the IObserver&lt;T&gt; interface to marshal calls to OnNext, OnError, and OnCompleted to the respective delegates passed into the observer instance.<span>&nbsp; <\/span>Then, when someone calls AddFromObservable, we subscribe a delegate that takes any data from the observable and adds it to the BlockingCollection&lt;T&gt;:<\/p>\n<p><\/font><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">public static IDisposable AddFromObservable&lt;T&gt;(<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>this BlockingCollection&lt;T&gt; target, IObservable&lt;T&gt; source, <\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>bool completeAddingWhenDone)<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">{<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>return source.Subscribe(new DelegateBasedObserver&lt;T&gt;<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>(<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>onNext: item =&gt; target.Add(item),<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>onError: error =&gt; { <\/p>\n<p><\/font><\/p>\n<p 
class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>if (completeAddingWhenDone) target.CompleteAdding(); },<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>onCompleted: () =&gt; { <\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>if (completeAddingWhenDone) target.CompleteAdding(); }<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>));<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">}<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"MsoNormal\"><b><font size=\"3\"><font face=\"Calibri\">ToProducerConsumerCollection<\/p>\n<p><\/font><\/font><\/b><\/p>\n<p class=\"MsoNormal\"><font size=\"3\"><font face=\"Calibri\">The IProducerConsumerCollection&lt;T&gt; interface was introduced in the .NET Framework 4 to represent concurrent collections that support add\/take operations and that are meant to be used in producer\/consumer scenarios.<span>&nbsp; <\/span>.NET 4 ships three such implementations in the form of ConcurrentQueue&lt;T&gt;, ConcurrentStack&lt;T&gt;, and ConcurrentBag&lt;T&gt;.<span>&nbsp; <\/span>BlockingCollection&lt;T&gt; is also thread-safe and supports add\/take operations, but it noticeably does not implement IProducerConsumerCollection.<span>&nbsp; <\/span>This is a decision we struggled with, and in the end we decided not to implement the interface because there didn&rsquo;t seem to be one right answer on how adds and takes should behave.<span>&nbsp; <\/span>Should they be non-blocking or blocking? If blocking, should they incorporate timeouts?<span>&nbsp; Etc. 
<\/span>Instead, we verified that it&rsquo;s easy to create an IProducerConsumerCollection&lt;T&gt; wrapper for BlockingCollection&lt;T&gt; so that developers can specify whatever semantics they desire, and we added such an implementation into BlockingCollectionExtensions.cs in ParallelExtensionsExtras.<\/p>\n<p><\/font><\/font><\/p>\n<p class=\"MsoNormal\"><font size=\"3\"><font face=\"Calibri\">The provided solution includes several overloads of ToProducerConsumerCollection; here is the largest overload:<\/p>\n<p><\/font><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">public static IProducerConsumerCollection&lt;T&gt; ToProducerConsumerCollection&lt;T&gt;(<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>this BlockingCollection&lt;T&gt; collection, int millisecondsTimeout, <\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>CancellationToken cancellationToken)<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">{<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>return new ProducerConsumerWrapper&lt;T&gt;(<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>collection, millisecondsTimeout, cancellationToken);<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">}<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"MsoNormal\"><font size=\"3\"><font face=\"Calibri\">In addition to the target BlockingCollection&lt;T&gt;, both a timeout and a CancellationToken are also provided: these serve to enable blocking operations with timeouts as well as with cancellation semantics, and these two values are used when calling BlockingCollection&lt;T&gt;&rsquo;s TryAdd and TryTake methods.<\/p>\n<p><\/font><\/font><\/p>\n<p class=\"MsoNormal\"><font size=\"3\"><font 
face=\"Calibri\">The rest of the implementation is straightforward:<\/p>\n<p><\/font><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">internal sealed class ProducerConsumerWrapper&lt;T&gt; : <\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>IProducerConsumerCollection&lt;T&gt;<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">{<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>private readonly BlockingCollection&lt;T&gt; _collection;<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>private readonly int _millisecondsTimeout;<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>private readonly CancellationToken _cancellationToken;<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>public ProducerConsumerWrapper(<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>BlockingCollection&lt;T&gt; collection, int millisecondsTimeout, <\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span><span>&nbsp;&nbsp;<\/span>CancellationToken cancellationToken) <\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>{ <\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>if (collection == null) <\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;throw new ArgumentNullException(&#8220;collection&#8221;);<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>if 
(millisecondsTimeout &lt; -1) <\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>throw new ArgumentOutOfRangeException(<\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; &#8220;millisecondsTimeout&#8221;);<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>_collection = collection;<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>_millisecondsTimeout = millisecondsTimeout;<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>_cancellationToken = cancellationToken;<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>}<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>public bool TryAdd(T item)<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>{<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>return _collection.TryAdd(<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>item, _millisecondsTimeout, <\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; _cancellationToken);<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>}<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"Code\"><font 
face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>public bool TryTake(out T item)<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>{<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>return _collection.TryTake(<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>out item, _millisecondsTimeout, <\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; _cancellationToken);<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>}<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>public void CopyTo(T[] array, int index)<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>{<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>_collection.CopyTo(array, index);<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>}<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>public T[] ToArray()<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>{<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>return _collection.ToArray();<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>}<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p 
class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>&#8230; \/\/ the rest of ICollection&rsquo;s implementation<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">}<\/p>\n<p><\/font><\/p>\n<p class=\"MsoNormal\">\n<p><font size=\"3\" face=\"Calibri\">&nbsp;<\/font><\/p>\n<\/p>\n<p><\/font><\/p>\n","protected":false},"excerpt":{"rendered":"<p>(The full set of ParallelExtensionsExtras Tour posts is available&nbsp;here.)&nbsp;&nbsp; BlockingCollection&lt;T&gt; encapsulates the core synchronization and coordination necessary to enable classic producer\/consumer patterns.&nbsp; ParallelExtensionsExtras provides the BlockingCollectionExtensions.cs file, which contains several extension methods for BlockingCollection&lt;T&gt; to make several common solutions easier. GetConsumingPartitioner It&rsquo;s common to want to use BlockingCollection&lt;T&gt; in conjunction with either Parallel.ForEach or PLINQ [&hellip;]<\/p>\n","protected":false},"author":360,"featured_media":58792,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"footnotes":""},"categories":[7908],"tags":[7907,7916,7909,7924],"class_list":["post-55930","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-pfxteam","tag-net-4","tag-coordination-data-structures","tag-parallel-extensions","tag-parallelextensionsextras"],"acf":[],"blog_post_summary":"<p>(The full set of ParallelExtensionsExtras Tour posts is available&nbsp;here.)&nbsp;&nbsp; BlockingCollection&lt;T&gt; encapsulates the core synchronization and coordination necessary to enable classic producer\/consumer patterns.&nbsp; ParallelExtensionsExtras provides the BlockingCollectionExtensions.cs file, which contains several extension methods for BlockingCollection&lt;T&gt; to make several common solutions easier. 
GetConsumingPartitioner It&rsquo;s common to want to use BlockingCollection&lt;T&gt; in conjunction with either Parallel.ForEach or PLINQ [&hellip;]<\/p>\n","_links":{"self":[{"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/posts\/55930","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/users\/360"}],"replies":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/comments?post=55930"}],"version-history":[{"count":0,"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/posts\/55930\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/media\/58792"}],"wp:attachment":[{"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/media?parent=55930"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/categories?post=55930"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/tags?post=55930"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}