c# - Parallel.ForEach stalled when integrated with BlockingCollection


I adapted an implementation of a parallel consumer based on the code in this question:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class ParallelConsumer<T> : IDisposable
{
    private readonly int _maxParallel;
    private readonly Action<T> _action;
    private readonly TaskFactory _factory = new TaskFactory();
    private CancellationTokenSource _tokenSource;
    private readonly BlockingCollection<T> _entries = new BlockingCollection<T>();
    private Task _task;

    public ParallelConsumer(int maxParallel, Action<T> action)
    {
        _maxParallel = maxParallel;
        _action = action;
    }

    public void Start()
    {
        try
        {
            _tokenSource = new CancellationTokenSource();
            _task = _factory.StartNew(
                () =>
                {
                    // Consume items as they are added, up to _maxParallel at a time
                    Parallel.ForEach(
                        _entries.GetConsumingEnumerable(),
                        new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token },
                        (item, loopState) =>
                        {
                            Log("Taking" + item);
                            if (!_tokenSource.IsCancellationRequested)
                            {
                                _action(item);
                                Log("Finished" + item);
                            }
                            else
                            {
                                Log("Not taking" + item);
                                _entries.CompleteAdding();
                                loopState.Stop();
                            }
                        });
                },
                _tokenSource.Token);
        }
        catch (OperationCanceledException oce)
        {
            System.Diagnostics.Debug.WriteLine(oce);
        }
    }

    private void Log(string message)
    {
        Console.WriteLine(message);
    }

    public void Stop()
    {
        Dispose();
    }

    public void Enqueue(T entry)
    {
        Log("Enqueuing" + entry);
        _entries.Add(entry);
    }

    public void Dispose()
    {
        if (_task == null)
        {
            return;
        }

        _tokenSource.Cancel();
        while (!_task.IsCanceled)
        {
        }

        _task.Dispose();
        _tokenSource.Dispose();
        _task = null;
    }
}

And here is the test code:

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        TestRepeatedEnqueue(100, 1);
    }

    private static void TestRepeatedEnqueue(int itemCount, int parallelCount)
    {
        bool[] flags = new bool[itemCount];
        var consumer = new ParallelConsumer<int>(parallelCount,
                                                 (i) =>
                                                 {
                                                     flags[i] = true;
                                                 });
        consumer.Start();
        for (int i = 0; i < itemCount; i++)
        {
            consumer.Enqueue(i);
        }
        Thread.Sleep(1000);
        Debug.Assert(flags.All(b => b == true));
    }
}

The test fails - it gets stuck at around the 93rd item of the 100 tested. Any idea which part of the code caused this issue, and how to fix it?

You cannot use Parallel.ForEach() with BlockingCollection.GetConsumingEnumerable(), as you have discovered.

For an explanation, see this blog post:

http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx

That blog provides the source code for a method called GetConsumingPartitioner() which you can use to solve the problem.
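The idea in that post is a thin Partitioner<T> wrapper over GetConsumingEnumerable(), so that Parallel.ForEach pulls items as they become available instead of buffering them into chunks. A minimal sketch of that approach (reconstructed from the blog post; details may differ slightly from the exact posted source):

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

static class BlockingCollectionExtensions
{
    // Wraps the collection's consuming enumerable in a Partitioner so that
    // Parallel.ForEach does not apply its default chunking partitioner.
    public static Partitioner<T> GetConsumingPartitioner<T>(this BlockingCollection<T> collection)
    {
        return new BlockingCollectionPartitioner<T>(collection);
    }

    private class BlockingCollectionPartitioner<T> : Partitioner<T>
    {
        private readonly BlockingCollection<T> _collection;

        internal BlockingCollectionPartitioner(BlockingCollection<T> collection)
        {
            if (collection == null) throw new ArgumentNullException("collection");
            _collection = collection;
        }

        public override bool SupportsDynamicPartitions
        {
            get { return true; }
        }

        public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
        {
            if (partitionCount < 1) throw new ArgumentOutOfRangeException("partitionCount");
            // Each partition is just an enumerator over the consuming enumerable;
            // BlockingCollection's own synchronization handles concurrent consumers.
            var dynamicPartitioner = GetDynamicPartitions();
            return Enumerable.Range(0, partitionCount)
                             .Select(_ => dynamicPartitioner.GetEnumerator())
                             .ToList();
        }

        public override IEnumerable<T> GetDynamicPartitions()
        {
            return _collection.GetConsumingEnumerable();
        }
    }
}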

An excerpt from the blog:

BlockingCollection’s GetConsumingEnumerable implementation is using BlockingCollection’s internal synchronization which already supports multiple consumers concurrently, but ForEach doesn’t know that, and its enumerable-partitioning logic also needs to take a lock while accessing the enumerable.

As such, there’s more synchronization here than is necessary, resulting in a potentially non-negligible performance hit.

[Also,] the partitioning algorithm employed by default by both Parallel.ForEach and PLINQ uses chunking in order to minimize synchronization costs: rather than taking the lock once per element, it'll take the lock, grab a group of elements (a chunk), and then release the lock.

While this design can help with overall throughput, for scenarios that are focused more on low latency, that chunking can be prohibitive.
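With a partitioner like the one sketched above, the only change needed in the question's Start() method is to pass the partitioner to Parallel.ForEach instead of the raw consuming enumerable (a hedged sketch, assuming the GetConsumingPartitioner extension shown earlier):

Parallel.ForEach(
    _entries.GetConsumingPartitioner(),   // was: _entries.GetConsumingEnumerable()
    new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token },
    (item, loopState) =>
    {
        // ... same loop body as before ...
    });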

