C# - Parallel.ForEach stalled when integrated with BlockingCollection
I adopted an implementation of a parallel consumer based on the code in this question:
    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    class ParallelConsumer<T> : IDisposable
    {
        private readonly int _maxParallel;
        private readonly Action<T> _action;
        private readonly TaskFactory _factory = new TaskFactory();
        private CancellationTokenSource _tokenSource;
        private readonly BlockingCollection<T> _entries = new BlockingCollection<T>();
        private Task _task;

        public ParallelConsumer(int maxParallel, Action<T> action)
        {
            _maxParallel = maxParallel;
            _action = action;
        }

        public void Start()
        {
            try
            {
                _tokenSource = new CancellationTokenSource();
                _task = _factory.StartNew(
                    () =>
                    {
                        Parallel.ForEach(
                            _entries.GetConsumingEnumerable(),
                            new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token },
                            (item, loopState) =>
                            {
                                Log("taking" + item);
                                if (!_tokenSource.IsCancellationRequested)
                                {
                                    _action(item);
                                    Log("finished" + item);
                                }
                                else
                                {
                                    Log("not taking" + item);
                                    _entries.CompleteAdding();
                                    loopState.Stop();
                                }
                            });
                    },
                    _tokenSource.Token);
            }
            catch (OperationCanceledException oce)
            {
                System.Diagnostics.Debug.WriteLine(oce);
            }
        }

        private void Log(string message)
        {
            Console.WriteLine(message);
        }

        public void Stop()
        {
            Dispose();
        }

        public void Enqueue(T entry)
        {
            Log("enqueuing" + entry);
            _entries.Add(entry);
        }

        public void Dispose()
        {
            if (_task == null)
            {
                return;
            }
            _tokenSource.Cancel();
            while (!_task.IsCanceled)
            {
            }
            _task.Dispose();
            _tokenSource.Dispose();
            _task = null;
        }
    }
And here is the test code:
    using System.Diagnostics;
    using System.Linq;
    using System.Threading;

    class Program
    {
        static void Main(string[] args)
        {
            TestRepeatedEnqueue(100, 1);
        }

        private static void TestRepeatedEnqueue(int itemCount, int parallelCount)
        {
            bool[] flags = new bool[itemCount];
            var consumer = new ParallelConsumer<int>(parallelCount,
                (i) =>
                {
                    flags[i] = true;
                }
            );
            consumer.Start();
            for (int i = 0; i < itemCount; i++)
            {
                consumer.Enqueue(i);
            }
            Thread.Sleep(1000);
            Debug.Assert(flags.All(b => b == true));
        }
    }
The test fails: it gets stuck at around the 93rd item out of the 100 tested. Any idea which part of my code causes this issue, and how to fix it?
You cannot use Parallel.ForEach() with BlockingCollection.GetConsumingEnumerable(), as you have discovered.
For an explanation, see this blog post:
http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx
That blog post also provides the source code for a method called GetConsumingPartitioner(), which you can use to solve the problem.
Excerpt from the blog:
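As a rough illustration of what such a GetConsumingPartitioner() extension method might look like, here is a minimal sketch. It is not copied from the blog post: the class names BlockingCollectionExtensions, BlockingCollectionPartitioner and PartitionerDemo are assumptions made for this example. The idea is to hand Parallel.ForEach a custom Partitioner<T> whose dynamic partitions all read straight from GetConsumingEnumerable(), so no extra lock or chunk buffer sits between the loop and the collection.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class BlockingCollectionExtensions
{
    // Wraps the collection in a partitioner that Parallel.ForEach can consume directly.
    public static Partitioner<T> GetConsumingPartitioner<T>(this BlockingCollection<T> collection)
    {
        return new BlockingCollectionPartitioner<T>(collection);
    }

    private sealed class BlockingCollectionPartitioner<T> : Partitioner<T>
    {
        private readonly BlockingCollection<T> _collection;

        internal BlockingCollectionPartitioner(BlockingCollection<T> collection)
        {
            if (collection == null) throw new ArgumentNullException("collection");
            _collection = collection;
        }

        // Parallel.ForEach requires dynamic partitioning support from a custom partitioner.
        public override bool SupportsDynamicPartitions { get { return true; } }

        public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
        {
            if (partitionCount < 1) throw new ArgumentOutOfRangeException("partitionCount");
            IEnumerable<T> dynamicPartitions = GetDynamicPartitions();
            return Enumerable.Range(0, partitionCount)
                             .Select(_ => dynamicPartitions.GetEnumerator())
                             .ToArray();
        }

        // Every partition pulls items one at a time from the same consuming enumerable;
        // BlockingCollection's own synchronization makes this safe for multiple consumers.
        public override IEnumerable<T> GetDynamicPartitions()
        {
            return _collection.GetConsumingEnumerable();
        }
    }
}

// Hypothetical helper used only to exercise the partitioner.
static class PartitionerDemo
{
    public static int ProcessAll(int itemCount, int maxParallel)
    {
        int processed = 0;
        using (var entries = new BlockingCollection<int>())
        {
            for (int i = 0; i < itemCount; i++) entries.Add(i);
            entries.CompleteAdding(); // lets the consuming loop terminate

            Parallel.ForEach(
                entries.GetConsumingPartitioner(),
                new ParallelOptions { MaxDegreeOfParallelism = maxParallel },
                item => { Interlocked.Increment(ref processed); });
        }
        return processed;
    }
}
```

In the ParallelConsumer class from the question, the corresponding change would presumably be to pass _entries.GetConsumingPartitioner() to Parallel.ForEach in place of _entries.GetConsumingEnumerable().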
BlockingCollection’s GetConsumingEnumerable implementation is using BlockingCollection’s internal synchronization, which already supports multiple consumers concurrently, but ForEach doesn’t know that, and its enumerable-partitioning logic also needs to take a lock while accessing the enumerable.
As such, there’s more synchronization here than is actually necessary, resulting in a potentially non-negligible performance hit.
[Also] the partitioning algorithm employed by default by both Parallel.ForEach and PLINQ uses chunking in order to minimize synchronization costs: rather than taking the lock once per element, it'll take the lock, grab a group of elements (a chunk), and then release the lock.
While this design can help with overall throughput, for scenarios that are focused more on low latency, that chunking can be prohibitive.
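The chunking described in the excerpt can also be switched off without writing a custom partitioner. This is a different technique from the one in the blog post: on .NET Framework 4.5 or later, Partitioner.Create accepts EnumerablePartitionerOptions.NoBuffering, which makes the default partitioner hand out one element at a time. A minimal sketch follows; the NoBufferingDemo class and its ProcessAll method are hypothetical names for this example.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper showing the NoBuffering option with a BlockingCollection.
static class NoBufferingDemo
{
    public static int ProcessAll(int itemCount, int maxParallel)
    {
        int processed = 0;
        using (var entries = new BlockingCollection<int>())
        {
            for (int i = 0; i < itemCount; i++) entries.Add(i);
            entries.CompleteAdding(); // lets the consuming loop terminate

            // NoBuffering: each worker takes a single element per request,
            // so no item waits inside a partially filled chunk.
            var partitioner = Partitioner.Create(
                entries.GetConsumingEnumerable(),
                EnumerablePartitionerOptions.NoBuffering);

            Parallel.ForEach(
                partitioner,
                new ParallelOptions { MaxDegreeOfParallelism = maxParallel },
                item => { Interlocked.Increment(ref processed); });
        }
        return processed;
    }
}
```

This trades some throughput for latency, since every element is fetched under its own lock acquisition, which is the cost the default chunking was designed to avoid.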