1 /**
2 `std.parallelism` implements high-level primitives for SMP parallelism.
3 These include parallel foreach, parallel reduce, parallel eager map, pipelining
4 and future/promise parallelism.  `std.parallelism` is recommended when the
5 same operation is to be executed in parallel on different data, or when a
6 function is to be executed in a background thread and its result returned to a
7 well-defined main thread.  For communication between arbitrary threads, see
8 `std.concurrency`.
9 
10 `std.parallelism` is based on the concept of a `Task`.  A `Task` is an
11 object that represents the fundamental unit of work in this library and may be
12 executed in parallel with any other `Task`.  Using `Task`
13 directly allows programming with a future/promise paradigm.  All other
14 supported parallelism paradigms (parallel foreach, map, reduce, pipelining)
15 represent an additional level of abstraction over `Task`.  They
16 automatically create one or more `Task` objects, or closely related types
17 that are conceptually identical but not part of the public API.
18 
19 After creation, a `Task` may be executed in a new thread, or submitted
20 to a `TaskPool` for execution.  A `TaskPool` encapsulates a task queue
21 and its worker threads.  Its purpose is to efficiently map a large
22 number of `Task`s onto a smaller number of threads.  A task queue is a
23 FIFO queue of `Task` objects that have been submitted to the
24 `TaskPool` and are awaiting execution.  A worker thread is a thread that
25 is associated with exactly one task queue.  It executes the `Task` at the
26 front of its queue when the queue has work available, or sleeps when
27 no work is available.  Each task queue is associated with zero or
28 more worker threads.  If the result of a `Task` is needed before execution
29 by a worker thread has begun, the `Task` can be removed from the task queue
30 and executed immediately in the thread where the result is needed.
31 
32 Warning:  Unless marked as `@trusted` or `@safe`, artifacts in
33           this module allow implicit data sharing between threads and cannot
34           guarantee that client code is free from low level data races.
35 
36 Source:    $(PHOBOSSRC std/parallelism.d)
37 Author:  David Simcha
38 Copyright:  Copyright (c) 2009-2011, David Simcha.
39 License:    $(HTTP boost.org/LICENSE_1_0.txt, Boost License 1.0)
40 */
41 module std.parallelism;
42 
43 version (OSX)
44     version = Darwin;
45 else version (iOS)
46     version = Darwin;
47 else version (TVOS)
48     version = Darwin;
49 else version (WatchOS)
50     version = Darwin;
51 
52 ///
53 @system unittest
54 {
55     import std.algorithm.iteration : map;
56     import std.math.operations : isClose;
57     import std.parallelism : taskPool;
58     import std.range : iota;
59 
60     // Parallel reduce can be combined with
61     // std.algorithm.iteration.map to interesting effect.
62     // The following example (thanks to Russel Winder)
63     // calculates pi by quadrature  using
64     // std.algorithm.map and TaskPool.reduce.
65     // getTerm is evaluated in parallel as needed by
66     // TaskPool.reduce.
67     //
68     // Timings on an Intel i5-3450 quad core machine
69     // for n = 1_000_000_000:
70     //
71     // TaskPool.reduce:       1.067 s
72     // std.algorithm.reduce:  4.011 s
73 
74     enum n = 1_000_000;
75     enum delta = 1.0 / n;
76 
77     alias getTerm = (int i)
78     {
79         immutable x = ( i - 0.5 ) * delta;
80         return delta / ( 1.0 + x * x ) ;
81     };
82 
83     immutable pi = 4.0 * taskPool.reduce!"a + b"(n.iota.map!getTerm);
84 
85     assert(pi.isClose(3.14159, 1e-5));
86 }
87 
88 import core.atomic;
89 import core.memory;
90 import core.sync.condition;
91 import core.thread;
92 
93 import std.functional;
94 import std.meta;
95 import std.range.primitives;
96 import std.traits;
97 
98 /*
99 (For now public undocumented with reserved name.)
100 
101 A lazily initialized global constant. The underlying value is a shared global
102 statically initialized to `outOfBandValue` which must not be a legit value of
103 the constant. Upon the first call the situation is detected and the global is
104 initialized by calling `initializer`. The initializer is assumed to be pure
105 (even if not marked as such), i.e. return the same value upon repeated calls.
106 For that reason, no special precautions are taken so `initializer` may be called
107 more than one time leading to benign races on the cached value.
108 
109 In the quiescent state the cost of the function is an atomic load from a global.
110 
111 Params:
112     T = The type of the pseudo-constant (may be qualified)
113     outOfBandValue = A value that cannot be valid, it is used for initialization
114     initializer = The function performing initialization; must be `nothrow`
115 
116 Returns:
117     The lazily initialized value
118 */
119 @property pure
120 T __lazilyInitializedConstant(T, alias outOfBandValue, alias initializer)()
121 if (is(Unqual!T : T)
122     && is(typeof(initializer()) : T)
123     && is(typeof(outOfBandValue) : T))
124 {
125     static T impl() nothrow
126     {
127         // Thread-local cache
128         static Unqual!T tls = outOfBandValue;
129         auto local = tls;
130         // Shortest path, no atomic operations
131         if (local != outOfBandValue) return local;
132         // Process-level cache
133         static shared Unqual!T result = outOfBandValue;
134         // Initialize both process-level cache and tls
135         local = atomicLoad(result);
136         if (local == outOfBandValue)
137         {
138             local = initializer();
139             atomicStore(result, local);
140         }
141         tls = local;
142         return local;
143     }
144 
145     import std.traits : SetFunctionAttributes;
146     alias Fun = SetFunctionAttributes!(typeof(&impl), "D",
147         functionAttributes!(typeof(&impl)) | FunctionAttribute.pure_);
148     auto purified = (() @trusted => cast(Fun) &impl)();
149     return purified();
150 }
151 
152 // Returns the size of a cache line.
153 alias cacheLineSize =
154     __lazilyInitializedConstant!(immutable(size_t), size_t.max, cacheLineSizeImpl);
155 
156 private size_t cacheLineSizeImpl() @nogc nothrow @trusted
157 {
158     size_t result = 0;
159     import core.cpuid : datacache;
160     foreach (ref const cachelevel; datacache)
161     {
162         if (cachelevel.lineSize > result && cachelevel.lineSize < uint.max)
163         {
164             result = cachelevel.lineSize;
165         }
166     }
167     return result;
168 }
169 
170 @nogc @safe nothrow unittest
171 {
172     assert(cacheLineSize == cacheLineSizeImpl);
173 }
174 
175 /* Atomics code.  These forward to core.atomic, but are written like this
176    for two reasons:
177 
178    1.  They used to actually contain ASM code and I don' want to have to change
179        to directly calling core.atomic in a zillion different places.
180 
181    2.  core.atomic has some misc. issues that make my use cases difficult
182        without wrapping it.  If I didn't wrap it, casts would be required
183        basically everywhere.
184 */
185 private void atomicSetUbyte(T)(ref T stuff, T newVal)
186 if (__traits(isIntegral, T) && is(T : ubyte))
187 {
188     //core.atomic.cas(cast(shared) &stuff, stuff, newVal);
189     atomicStore(*(cast(shared) &stuff), newVal);
190 }
191 
192 private ubyte atomicReadUbyte(T)(ref T val)
193 if (__traits(isIntegral, T) && is(T : ubyte))
194 {
195     return atomicLoad(*(cast(shared) &val));
196 }
197 
198 // This gets rid of the need for a lot of annoying casts in other parts of the
199 // code, when enums are involved.
200 private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal)
201 if (__traits(isIntegral, T) && is(T : ubyte))
202 {
203     return core.atomic.cas(cast(shared) &stuff, testVal, newVal);
204 }
205 
206 /*--------------------- Generic helper functions, etc.------------------------*/
207 private template MapType(R, functions...)
208 {
209     static assert(functions.length);
210 
211     ElementType!R e = void;
212     alias MapType =
213         typeof(adjoin!(staticMap!(unaryFun, functions))(e));
214 }
215 
216 private template ReduceType(alias fun, R, E)
217 {
218     alias ReduceType = typeof(binaryFun!fun(E.init, ElementType!R.init));
219 }
220 
221 private template noUnsharedAliasing(T)
222 {
223     enum bool noUnsharedAliasing = !hasUnsharedAliasing!T;
224 }
225 
226 // This template tests whether a function may be executed in parallel from
227 // @safe code via Task.executeInNewThread().  There is an additional
228 // requirement for executing it via a TaskPool.  (See isSafeReturn).
229 private template isSafeTask(F)
230 {
231     enum bool isSafeTask =
232         (functionAttributes!F & (FunctionAttribute.safe | FunctionAttribute.trusted)) != 0 &&
233         (functionAttributes!F & FunctionAttribute.ref_) == 0 &&
234         (isFunctionPointer!F || !hasUnsharedAliasing!F) &&
235         allSatisfy!(noUnsharedAliasing, Parameters!F);
236 }
237 
238 @safe unittest
239 {
240     alias F1 = void function() @safe;
241     alias F2 = void function();
242     alias F3 = void function(uint, string) @trusted;
243     alias F4 = void function(uint, char[]);
244 
245     static assert( isSafeTask!F1);
246     static assert(!isSafeTask!F2);
247     static assert( isSafeTask!F3);
248     static assert(!isSafeTask!F4);
249 
250     alias F5 = uint[] function(uint, string) pure @trusted;
251     static assert( isSafeTask!F5);
252 }
253 
254 // This function decides whether Tasks that meet all of the other requirements
255 // for being executed from @safe code can be executed on a TaskPool.
256 // When executing via TaskPool, it's theoretically possible
257 // to return a value that is also pointed to by a worker thread's thread local
258 // storage.  When executing from executeInNewThread(), the thread that executed
259 // the Task is terminated by the time the return value is visible in the calling
260 // thread, so this is a non-issue.  It's also a non-issue for pure functions
261 // since they can't read global state.
262 private template isSafeReturn(T)
263 {
264     static if (!hasUnsharedAliasing!(T.ReturnType))
265     {
266         enum isSafeReturn = true;
267     }
268     else static if (T.isPure)
269     {
270         enum isSafeReturn = true;
271     }
272     else
273     {
274         enum isSafeReturn = false;
275     }
276 }
277 
278 private template randAssignable(R)
279 {
280     enum randAssignable = isRandomAccessRange!R && hasAssignableElements!R;
281 }
282 
283 private enum TaskStatus : ubyte
284 {
285     notStarted,
286     inProgress,
287     done
288 }
289 
290 private template AliasReturn(alias fun, T...)
291 {
292     alias AliasReturn = typeof({ T args; return fun(args); });
293 }
294 
295 // Should be private, but std.algorithm.reduce is used in the zero-thread case
296 // and won't work w/ private.
297 template reduceAdjoin(functions...)
298 {
299     static if (functions.length == 1)
300     {
301         alias reduceAdjoin = binaryFun!(functions[0]);
302     }
303     else
304     {
305         T reduceAdjoin(T, U)(T lhs, U rhs)
306         {
307             alias funs = staticMap!(binaryFun, functions);
308 
309             foreach (i, Unused; typeof(lhs.expand))
310             {
311                 lhs.expand[i] = funs[i](lhs.expand[i], rhs);
312             }
313 
314             return lhs;
315         }
316     }
317 }
318 
319 private template reduceFinish(functions...)
320 {
321     static if (functions.length == 1)
322     {
323         alias reduceFinish = binaryFun!(functions[0]);
324     }
325     else
326     {
327         T reduceFinish(T)(T lhs, T rhs)
328         {
329             alias funs = staticMap!(binaryFun, functions);
330 
331             foreach (i, Unused; typeof(lhs.expand))
332             {
333                 lhs.expand[i] = funs[i](lhs.expand[i], rhs.expand[i]);
334             }
335 
336             return lhs;
337         }
338     }
339 }
340 
341 private template isRoundRobin(R : RoundRobinBuffer!(C1, C2), C1, C2)
342 {
343     enum isRoundRobin = true;
344 }
345 
346 private template isRoundRobin(T)
347 {
348     enum isRoundRobin = false;
349 }
350 
351 @safe unittest
352 {
353     static assert( isRoundRobin!(RoundRobinBuffer!(void delegate(char[]), bool delegate())));
354     static assert(!isRoundRobin!(uint));
355 }
356 
357 // This is the base "class" for all of the other tasks.  Using C-style
358 // polymorphism to allow more direct control over memory allocation, etc.
359 private struct AbstractTask
360 {
361     AbstractTask* prev;
362     AbstractTask* next;
363 
364     // Pointer to a function that executes this task.
365     void function(void*) runTask;
366 
367     Throwable exception;
368     ubyte taskStatus = TaskStatus.notStarted;
369 
370     bool done() @property
371     {
372         if (atomicReadUbyte(taskStatus) == TaskStatus.done)
373         {
374             if (exception)
375             {
376                 throw exception;
377             }
378 
379             return true;
380         }
381 
382         return false;
383     }
384 
385     void job()
386     {
387         runTask(&this);
388     }
389 }
390 
391 /**
392 `Task` represents the fundamental unit of work.  A `Task` may be
393 executed in parallel with any other `Task`.  Using this struct directly
394 allows future/promise parallelism.  In this paradigm, a function (or delegate
395 or other callable) is executed in a thread other than the one it was called
396 from.  The calling thread does not block while the function is being executed.
397 A call to `workForce`, `yieldForce`, or `spinForce` is used to
398 ensure that the `Task` has finished executing and to obtain the return
399 value, if any.  These functions and `done` also act as full memory barriers,
400 meaning that any memory writes made in the thread that executed the `Task`
401 are guaranteed to be visible in the calling thread after one of these functions
402 returns.
403 
404 The $(REF task, std,parallelism) and $(REF scopedTask, std,parallelism) functions can
405 be used to create an instance of this struct.  See `task` for usage examples.
406 
407 Function results are returned from `yieldForce`, `spinForce` and
408 `workForce` by ref.  If `fun` returns by ref, the reference will point
409 to the returned reference of `fun`.  Otherwise it will point to a
410 field in this struct.
411 
412 Copying of this struct is disabled, since it would provide no useful semantics.
413 If you want to pass this struct around, you should do so by reference or
414 pointer.
415 
416 Bugs:  Changes to `ref` and `out` arguments are not propagated to the
417        call site, only to `args` in this struct.
418 */
419 struct Task(alias fun, Args...)
420 {
421     private AbstractTask base = {runTask : &impl};
422     private alias base this;
423 
424     private @property AbstractTask* basePtr()
425     {
426         return &base;
427     }
428 
429     private static void impl(void* myTask)
430     {
431         import std.algorithm.internal : addressOf;
432 
433         Task* myCastedTask = cast(typeof(this)*) myTask;
434         static if (is(ReturnType == void))
435         {
436             fun(myCastedTask._args);
437         }
438         else static if (is(typeof(&(fun(myCastedTask._args)))))
439         {
440             myCastedTask.returnVal = addressOf(fun(myCastedTask._args));
441         }
442         else
443         {
444             myCastedTask.returnVal = fun(myCastedTask._args);
445         }
446     }
447 
448     private TaskPool pool;
449     private bool isScoped;  // True if created with scopedTask.
450 
451     Args _args;
452 
453     /**
454     The arguments the function was called with.  Changes to `out` and
455     `ref` arguments will be visible here.
456     */
457     static if (__traits(isSame, fun, run))
458     {
459         alias args = _args[1..$];
460     }
461     else
462     {
463         alias args = _args;
464     }
465 
466 
467     // The purpose of this code is to decide whether functions whose
468     // return values have unshared aliasing can be executed via
469     // TaskPool from @safe code.  See isSafeReturn.
470     static if (__traits(isSame, fun, run))
471     {
472         static if (isFunctionPointer!(_args[0]))
473         {
474             private enum bool isPure =
475             (functionAttributes!(Args[0]) & FunctionAttribute.pure_) != 0;
476         }
477         else
478         {
479             // BUG:  Should check this for delegates too, but std.traits
480             //       apparently doesn't allow this.  isPure is irrelevant
481             //       for delegates, at least for now since shared delegates
482             //       don't work.
483             private enum bool isPure = false;
484         }
485 
486     }
487     else
488     {
489         // We already know that we can't execute aliases in @safe code, so
490         // just put a dummy value here.
491         private enum bool isPure = false;
492     }
493 
494 
495     /**
496     The return type of the function called by this `Task`.  This can be
497     `void`.
498     */
499     alias ReturnType = typeof(fun(_args));
500 
501     static if (!is(ReturnType == void))
502     {
503         static if (is(typeof(&fun(_args))))
504         {
505             // Ref return.
506             ReturnType* returnVal;
507 
508             ref ReturnType fixRef(ReturnType* val)
509             {
510                 return *val;
511             }
512 
513         }
514         else
515         {
516             ReturnType returnVal;
517 
518             ref ReturnType fixRef(ref ReturnType val)
519             {
520                 return val;
521             }
522         }
523     }
524 
525     private void enforcePool()
526     {
527         import std.exception : enforce;
528         enforce(this.pool !is null, "Job not submitted yet.");
529     }
530 
531     static if (Args.length > 0)
532     {
533         private this(Args args)
534         {
535             _args = args;
536         }
537     }
538 
539     // Work around DMD bug https://issues.dlang.org/show_bug.cgi?id=6588,
540     // allow immutable elements.
541     static if (allSatisfy!(isAssignable, Args))
542     {
543         typeof(this) opAssign(typeof(this) rhs)
544         {
545             foreach (i, Type; typeof(this.tupleof))
546             {
547                 this.tupleof[i] = rhs.tupleof[i];
548             }
549             return this;
550         }
551     }
552     else
553     {
554         @disable typeof(this) opAssign(typeof(this) rhs);
555     }
556 
557     /**
558     If the `Task` isn't started yet, execute it in the current thread.
559     If it's done, return its return value, if any.  If it's in progress,
560     busy spin until it's done, then return the return value.  If it threw
561     an exception, rethrow that exception.
562 
563     This function should be used when you expect the result of the
564     `Task` to be available on a timescale shorter than that of an OS
565     context switch.
566      */
567     @property ref ReturnType spinForce() @trusted
568     {
569         enforcePool();
570 
571         this.pool.tryDeleteExecute(basePtr);
572 
573         while (atomicReadUbyte(this.taskStatus) != TaskStatus.done) {}
574 
575         if (exception)
576         {
577             throw exception;
578         }
579 
580         static if (!is(ReturnType == void))
581         {
582             return fixRef(this.returnVal);
583         }
584     }
585 
586     /**
587     If the `Task` isn't started yet, execute it in the current thread.
588     If it's done, return its return value, if any.  If it's in progress,
589     wait on a condition variable.  If it threw an exception, rethrow that
590     exception.
591 
592     This function should be used for expensive functions, as waiting on a
593     condition variable introduces latency, but avoids wasted CPU cycles.
594      */
595     @property ref ReturnType yieldForce() @trusted
596     {
597         enforcePool();
598         this.pool.tryDeleteExecute(basePtr);
599 
600         if (done)
601         {
602             static if (is(ReturnType == void))
603             {
604                 return;
605             }
606             else
607             {
608                 return fixRef(this.returnVal);
609             }
610         }
611 
612         pool.waiterLock();
613         scope(exit) pool.waiterUnlock();
614 
615         while (atomicReadUbyte(this.taskStatus) != TaskStatus.done)
616         {
617             pool.waitUntilCompletion();
618         }
619 
620         if (exception)
621         {
622             throw exception; // nocoverage
623         }
624 
625         static if (!is(ReturnType == void))
626         {
627             return fixRef(this.returnVal);
628         }
629     }
630 
631     /**
632     If this `Task` was not started yet, execute it in the current
633     thread.  If it is finished, return its result.  If it is in progress,
634     execute any other `Task` from the `TaskPool` instance that
635     this `Task` was submitted to until this one
636     is finished.  If it threw an exception, rethrow that exception.
637     If no other tasks are available or this `Task` was executed using
638     `executeInNewThread`, wait on a condition variable.
639      */
640     @property ref ReturnType workForce() @trusted
641     {
642         enforcePool();
643         this.pool.tryDeleteExecute(basePtr);
644 
645         while (true)
646         {
647             if (done)    // done() implicitly checks for exceptions.
648             {
649                 static if (is(ReturnType == void))
650                 {
651                     return;
652                 }
653                 else
654                 {
655                     return fixRef(this.returnVal);
656                 }
657             }
658 
659             AbstractTask* job;
660             {
661                 // Locking explicitly and calling popNoSync() because
662                 // pop() waits on a condition variable if there are no Tasks
663                 // in the queue.
664 
665                 pool.queueLock();
666                 scope(exit) pool.queueUnlock();
667                 job = pool.popNoSync();
668             }
669 
670 
671             if (job !is null)
672             {
673 
674                 version (verboseUnittest)
675                 {
676                     stderr.writeln("Doing workForce work.");
677                 }
678 
679                 pool.doJob(job);
680 
681                 if (done)
682                 {
683                     static if (is(ReturnType == void))
684                     {
685                         return;
686                     }
687                     else
688                     {
689                         return fixRef(this.returnVal);
690                     }
691                 }
692             }
693             else
694             {
695                 version (verboseUnittest)
696                 {
697                     stderr.writeln("Yield from workForce.");
698                 }
699 
700                 return yieldForce;
701             }
702         }
703     }
704 
705     /**
706     Returns `true` if the `Task` is finished executing.
707 
708     Throws:  Rethrows any exception thrown during the execution of the
709              `Task`.
710     */
711     @property bool done() @trusted
712     {
713         // Explicitly forwarded for documentation purposes.
714         return base.done;
715     }
716 
717     /**
718     Create a new thread for executing this `Task`, execute it in the
719     newly created thread, then terminate the thread.  This can be used for
720     future/promise parallelism.  An explicit priority may be given
721     to the `Task`.  If one is provided, its value is forwarded to
722     `core.thread.Thread.priority`. See $(REF task, std,parallelism) for
723     usage example.
724     */
725     void executeInNewThread() @trusted
726     {
727         pool = new TaskPool(basePtr);
728     }
729 
730     /// Ditto
731     void executeInNewThread(int priority) @trusted
732     {
733         pool = new TaskPool(basePtr, priority);
734     }
735 
736     @safe ~this()
737     {
738         if (isScoped && pool !is null && taskStatus != TaskStatus.done)
739         {
740             yieldForce;
741         }
742     }
743 
744     // When this is uncommented, it somehow gets called on returning from
745     // scopedTask even though the struct shouldn't be getting copied.
746     //@disable this(this) {}
747 }
748 
749 // Calls `fpOrDelegate` with `args`.  This is an
750 // adapter that makes `Task` work with delegates, function pointers and
751 // functors instead of just aliases.
752 ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args)
753 {
754     return fpOrDelegate(args);
755 }
756 
757 /**
758 Creates a `Task` on the GC heap that calls an alias.  This may be executed
759 via `Task.executeInNewThread` or by submitting to a
760 $(REF TaskPool, std,parallelism).  A globally accessible instance of
761 `TaskPool` is provided by $(REF taskPool, std,parallelism).
762 
763 Returns:  A pointer to the `Task`.
764 
765 Example:
766 ---
767 // Read two files into memory at the same time.
768 import std.file;
769 
770 void main()
771 {
772     // Create and execute a Task for reading
773     // foo.txt.
774     auto file1Task = task!read("foo.txt");
775     file1Task.executeInNewThread();
776 
777     // Read bar.txt in parallel.
778     auto file2Data = read("bar.txt");
779 
780     // Get the results of reading foo.txt.
781     auto file1Data = file1Task.yieldForce;
782 }
783 ---
784 
785 ---
786 // Sorts an array using a parallel quick sort algorithm.
787 // The first partition is done serially.  Both recursion
788 // branches are then executed in parallel.
789 //
790 // Timings for sorting an array of 1,000,000 doubles on
791 // an Athlon 64 X2 dual core machine:
792 //
793 // This implementation:               176 milliseconds.
794 // Equivalent serial implementation:  280 milliseconds
795 void parallelSort(T)(T[] data)
796 {
797     // Sort small subarrays serially.
798     if (data.length < 100)
799     {
800          std.algorithm.sort(data);
801          return;
802     }
803 
804     // Partition the array.
805     swap(data[$ / 2], data[$ - 1]);
806     auto pivot = data[$ - 1];
807     bool lessThanPivot(T elem) { return elem < pivot; }
808 
809     auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]);
810     swap(data[$ - greaterEqual.length - 1], data[$ - 1]);
811 
812     auto less = data[0..$ - greaterEqual.length - 1];
813     greaterEqual = data[$ - greaterEqual.length..$];
814 
815     // Execute both recursion branches in parallel.
816     auto recurseTask = task!parallelSort(greaterEqual);
817     taskPool.put(recurseTask);
818     parallelSort(less);
819     recurseTask.yieldForce;
820 }
821 ---
822 */
823 auto task(alias fun, Args...)(Args args)
824 {
825     return new Task!(fun, Args)(args);
826 }
827 
828 /**
829 Creates a `Task` on the GC heap that calls a function pointer, delegate, or
830 class/struct with overloaded opCall.
831 
832 Example:
833 ---
834 // Read two files in at the same time again,
835 // but this time use a function pointer instead
836 // of an alias to represent std.file.read.
837 import std.file;
838 
839 void main()
840 {
841     // Create and execute a Task for reading
842     // foo.txt.
843     auto file1Task = task(&read!string, "foo.txt", size_t.max);
844     file1Task.executeInNewThread();
845 
846     // Read bar.txt in parallel.
847     auto file2Data = read("bar.txt");
848 
849     // Get the results of reading foo.txt.
850     auto file1Data = file1Task.yieldForce;
851 }
852 ---
853 
854 Notes: This function takes a non-scope delegate, meaning it can be
855        used with closures.  If you can't allocate a closure due to objects
856        on the stack that have scoped destruction, see `scopedTask`, which
857        takes a scope delegate.
858  */
859 auto task(F, Args...)(F delegateOrFp, Args args)
860 if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
861 {
862     return new Task!(run, F, Args)(delegateOrFp, args);
863 }
864 
865 /**
866 Version of `task` usable from `@safe` code.  Usage mechanics are
867 identical to the non-@safe case, but safety introduces some restrictions:
868 
869 1.  `fun` must be @safe or @trusted.
870 
871 2.  `F` must not have any unshared aliasing as defined by
872     $(REF hasUnsharedAliasing, std,traits).  This means it
873     may not be an unshared delegate or a non-shared class or struct
874     with overloaded `opCall`.  This also precludes accepting template
875     alias parameters.
876 
877 3.  `Args` must not have unshared aliasing.
878 
879 4.  `fun` must not return by reference.
880 
881 5.  The return type must not have unshared aliasing unless `fun` is
882     `pure` or the `Task` is executed via `executeInNewThread` instead
883     of using a `TaskPool`.
884 
885 */
886 @trusted auto task(F, Args...)(F fun, Args args)
887 if (__traits(compiles, () @safe => fun(args)) && isSafeTask!F)
888 {
889     return new Task!(run, F, Args)(fun, args);
890 }
891 
892 @safe unittest
893 {
894     static struct Oops {
895         int convert() {
896             *cast(int*) 0xcafebabe = 0xdeadbeef;
897             return 0;
898         }
899         alias convert this;
900     }
901     static void foo(int) @safe {}
902 
903     static assert(!__traits(compiles, task(&foo, Oops.init)));
904     static assert(!__traits(compiles, scopedTask(&foo, Oops.init)));
905 }
906 
907 /**
908 These functions allow the creation of `Task` objects on the stack rather
909 than the GC heap.  The lifetime of a `Task` created by `scopedTask`
910 cannot exceed the lifetime of the scope it was created in.
911 
912 `scopedTask` might be preferred over `task`:
913 
914 1.  When a `Task` that calls a delegate is being created and a closure
915     cannot be allocated due to objects on the stack that have scoped
916     destruction.  The delegate overload of `scopedTask` takes a `scope`
917     delegate.
918 
919 2.  As a micro-optimization, to avoid the heap allocation associated with
920     `task` or with the creation of a closure.
921 
922 Usage is otherwise identical to `task`.
923 
924 Notes:  `Task` objects created using `scopedTask` will automatically
925 call `Task.yieldForce` in their destructor if necessary to ensure
926 the `Task` is complete before the stack frame they reside on is destroyed.
927 */
928 auto scopedTask(alias fun, Args...)(Args args)
929 {
930     auto ret = Task!(fun, Args)(args);
931     ret.isScoped = true;
932     return ret;
933 }
934 
935 /// Ditto
936 auto scopedTask(F, Args...)(scope F delegateOrFp, Args args)
937 if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
938 {
939     auto ret = Task!(run, F, Args)(delegateOrFp, args);
940     ret.isScoped = true;
941     return ret;
942 }
943 
944 /// Ditto
945 @trusted auto scopedTask(F, Args...)(F fun, Args args)
946 if (__traits(compiles, () @safe => fun(args)) && isSafeTask!F)
947 {
948     auto ret = Task!(run, F, Args)(fun, args);
949     ret.isScoped = true;
950     return ret;
951 }
952 
953 /**
954 The total number of CPU cores available on the current machine, as reported by
955 the operating system.
956 */
957 alias totalCPUs =
958     __lazilyInitializedConstant!(immutable(uint), uint.max, totalCPUsImpl);
959 
960 uint totalCPUsImpl() @nogc nothrow @trusted
961 {
962     version (Windows)
963     {
964         // BUGS:  Only works on Windows 2000 and above.
965         import core.sys.windows.winbase : SYSTEM_INFO, GetSystemInfo;
966         import std.algorithm.comparison : max;
967         SYSTEM_INFO si;
968         GetSystemInfo(&si);
969         return max(1, cast(uint) si.dwNumberOfProcessors);
970     }
971     else version (linux)
972     {
973         import core.stdc.stdlib : calloc;
974         import core.stdc.string : memset;
975         import core.sys.linux.sched : CPU_ALLOC_SIZE, CPU_FREE, CPU_COUNT, CPU_COUNT_S, cpu_set_t, sched_getaffinity;
976         import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
977 
978         int count = 0;
979 
980         /**
981          * According to ruby's source code, CPU_ALLOC() doesn't work as expected.
982          *  see: https://github.com/ruby/ruby/commit/7d9e04de496915dd9e4544ee18b1a0026dc79242
983          *
984          *  The hardcode number also comes from ruby's source code.
985          *  see: https://github.com/ruby/ruby/commit/0fa75e813ecf0f5f3dd01f89aa76d0e25ab4fcd4
986          */
987         for (int n = 64; n <= 16384; n *= 2)
988         {
989             size_t size = CPU_ALLOC_SIZE(count);
990             if (size >= 0x400)
991             {
992                 auto cpuset = cast(cpu_set_t*) calloc(1, size);
993                 if (cpuset is null) break;
994                 if (sched_getaffinity(0, size, cpuset) == 0)
995                 {
996                     count = CPU_COUNT_S(size, cpuset);
997                 }
998                 CPU_FREE(cpuset);
999             }
1000             else
1001             {
1002                 cpu_set_t cpuset;
1003                 if (sched_getaffinity(0, cpu_set_t.sizeof, &cpuset) == 0)
1004                 {
1005                     count = CPU_COUNT(&cpuset);
1006                 }
1007             }
1008 
1009             if (count > 0)
1010                 return cast(uint) count;
1011         }
1012 
1013         return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
1014     }
1015     else version (Darwin)
1016     {
1017         import core.sys.darwin.sys.sysctl : sysctlbyname;
1018         uint result;
1019         size_t len = result.sizeof;
1020         sysctlbyname("hw.physicalcpu", &result, &len, null, 0);
1021         return result;
1022     }
1023     else version (DragonFlyBSD)
1024     {
1025         import core.sys.dragonflybsd.sys.sysctl : sysctlbyname;
1026         uint result;
1027         size_t len = result.sizeof;
1028         sysctlbyname("hw.ncpu", &result, &len, null, 0);
1029         return result;
1030     }
1031     else version (FreeBSD)
1032     {
1033         import core.sys.freebsd.sys.sysctl : sysctlbyname;
1034         uint result;
1035         size_t len = result.sizeof;
1036         sysctlbyname("hw.ncpu", &result, &len, null, 0);
1037         return result;
1038     }
1039     else version (NetBSD)
1040     {
1041         import core.sys.netbsd.sys.sysctl : sysctlbyname;
1042         uint result;
1043         size_t len = result.sizeof;
1044         sysctlbyname("hw.ncpu", &result, &len, null, 0);
1045         return result;
1046     }
1047     else version (Solaris)
1048     {
1049         import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
1050         return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
1051     }
1052     else version (OpenBSD)
1053     {
1054         import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
1055         return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
1056     }
1057     else version (Hurd)
1058     {
1059         import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
1060         return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
1061     }
1062     else
1063     {
1064         static assert(0, "Don't know how to get N CPUs on this OS.");
1065     }
1066 }
1067 
1068 /*
1069 This class serves two purposes:
1070 
1071 1.  It distinguishes std.parallelism threads from other threads so that
1072     the std.parallelism daemon threads can be terminated.
1073 
1074 2.  It adds a reference to the pool that the thread is a member of,
1075     which is also necessary to allow the daemon threads to be properly
1076     terminated.
1077 */
1078 private final class ParallelismThread : Thread
1079 {
1080     this(void delegate() dg)
1081     {
1082         super(dg);
1083     }
1084 
1085     TaskPool pool;
1086 }
1087 
1088 // Kill daemon threads.
1089 shared static ~this()
1090 {
1091     foreach (ref thread; Thread)
1092     {
1093         auto pthread = cast(ParallelismThread) thread;
1094         if (pthread is null) continue;
1095         auto pool = pthread.pool;
1096         if (!pool.isDaemon) continue;
1097         pool.stop();
1098         pthread.join();
1099     }
1100 }
1101 
1102 /**
1103 This class encapsulates a task queue and a set of worker threads.  Its purpose
1104 is to efficiently map a large number of `Task`s onto a smaller number of
1105 threads.  A task queue is a FIFO queue of `Task` objects that have been
1106 submitted to the `TaskPool` and are awaiting execution.  A worker thread is a
1107 thread that executes the `Task` at the front of the queue when one is
1108 available and sleeps when the queue is empty.
1109 
1110 This class should usually be used via the global instantiation
1111 available via the $(REF taskPool, std,parallelism) property.
1112 Occasionally it is useful to explicitly instantiate a `TaskPool`:
1113 
1114 1.  When you want `TaskPool` instances with multiple priorities, for example
1115     a low priority pool and a high priority pool.
1116 
1117 2.  When the threads in the global task pool are waiting on a synchronization
1118     primitive (for example a mutex), and you want to parallelize the code that
1119     needs to run before these threads can be resumed.
1120 
1121 Note: The worker threads in this pool will not stop until
1122       `stop` or `finish` is called, even if the main thread
1123       has finished already. This may lead to programs that
1124       never end. If you do not want this behaviour, you can set `isDaemon`
1125       to true.
1126  */
1127 final class TaskPool
1128 {
1129 private:
1130 
1131     // A pool can either be a regular pool or a single-task pool.  A
1132     // single-task pool is a dummy pool that's fired up for
1133     // Task.executeInNewThread().
1134     bool isSingleTask;
1135 
1136     ParallelismThread[] pool;
1137     Thread singleTaskThread;
1138 
1139     AbstractTask* head;
1140     AbstractTask* tail;
1141     PoolState status = PoolState.running;
1142     Condition workerCondition;
1143     Condition waiterCondition;
1144     Mutex queueMutex;
1145     Mutex waiterMutex;  // For waiterCondition
1146 
1147     // The instanceStartIndex of the next instance that will be created.
1148     __gshared size_t nextInstanceIndex = 1;
1149 
1150     // The index of the current thread.
1151     static size_t threadIndex;
1152 
1153     // The index of the first thread in this instance.
1154     immutable size_t instanceStartIndex;
1155 
1156     // The index that the next thread to be initialized in this pool will have.
1157     size_t nextThreadIndex;
1158 
1159     enum PoolState : ubyte
1160     {
1161         running,
1162         finishing,
1163         stopNow
1164     }
1165 
1166     void doJob(AbstractTask* job)
1167     {
1168         assert(job.taskStatus == TaskStatus.inProgress);
1169         assert(job.next is null);
1170         assert(job.prev is null);
1171 
1172         scope(exit)
1173         {
1174             if (!isSingleTask)
1175             {
1176                 waiterLock();
1177                 scope(exit) waiterUnlock();
1178                 notifyWaiters();
1179             }
1180         }
1181 
1182         try
1183         {
1184             job.job();
1185         }
1186         catch (Throwable e)
1187         {
1188             job.exception = e;
1189         }
1190 
1191         atomicSetUbyte(job.taskStatus, TaskStatus.done);
1192     }
1193 
1194     // This function is used for dummy pools created by Task.executeInNewThread().
1195     void doSingleTask()
1196     {
1197         // No synchronization.  Pool is guaranteed to only have one thread,
1198         // and the queue is submitted to before this thread is created.
1199         assert(head);
1200         auto t = head;
1201         t.next = t.prev = head = null;
1202         doJob(t);
1203     }
1204 
1205     // This function performs initialization for each thread that affects
1206     // thread local storage and therefore must be done from within the
1207     // worker thread.  It then calls executeWorkLoop().
1208     void startWorkLoop()
1209     {
1210         // Initialize thread index.
1211         {
1212             queueLock();
1213             scope(exit) queueUnlock();
1214             threadIndex = nextThreadIndex;
1215             nextThreadIndex++;
1216         }
1217 
1218         executeWorkLoop();
1219     }
1220 
1221     // This is the main work loop that worker threads spend their time in
1222     // until they terminate.  It's also entered by non-worker threads when
1223     // finish() is called with the blocking variable set to true.
1224     void executeWorkLoop()
1225     {
1226         while (atomicReadUbyte(status) != PoolState.stopNow)
1227         {
1228             AbstractTask* task = pop();
1229             if (task is null)
1230             {
1231                 if (atomicReadUbyte(status) == PoolState.finishing)
1232                 {
1233                     atomicSetUbyte(status, PoolState.stopNow);
1234                     return;
1235                 }
1236             }
1237             else
1238             {
1239                 doJob(task);
1240             }
1241         }
1242     }
1243 
1244     // Pop a task off the queue.
1245     AbstractTask* pop()
1246     {
1247         queueLock();
1248         scope(exit) queueUnlock();
1249         auto ret = popNoSync();
1250         while (ret is null && status == PoolState.running)
1251         {
1252             wait();
1253             ret = popNoSync();
1254         }
1255         return ret;
1256     }
1257 
1258     AbstractTask* popNoSync()
1259     out(returned)
1260     {
1261         /* If task.prev and task.next aren't null, then another thread
1262          * can try to delete this task from the pool after it's
1263          * alreadly been deleted/popped.
1264          */
1265         if (returned !is null)
1266         {
1267             assert(returned.next is null);
1268             assert(returned.prev is null);
1269         }
1270     }
1271     do
1272     {
1273         if (isSingleTask) return null;
1274 
1275         AbstractTask* returned = head;
1276         if (head !is null)
1277         {
1278             head = head.next;
1279             returned.prev = null;
1280             returned.next = null;
1281             returned.taskStatus = TaskStatus.inProgress;
1282         }
1283         if (head !is null)
1284         {
1285             head.prev = null;
1286         }
1287 
1288         return returned;
1289     }
1290 
1291     // Push a task onto the queue.
1292     void abstractPut(AbstractTask* task)
1293     {
1294         queueLock();
1295         scope(exit) queueUnlock();
1296         abstractPutNoSync(task);
1297     }
1298 
1299     void abstractPutNoSync(AbstractTask* task)
1300     in
1301     {
1302         assert(task);
1303     }
1304     out
1305     {
1306         import std.conv : text;
1307 
1308         assert(tail.prev !is tail);
1309         assert(tail.next is null, text(tail.prev, '\t', tail.next));
1310         if (tail.prev !is null)
1311         {
1312             assert(tail.prev.next is tail, text(tail.prev, '\t', tail.next));
1313         }
1314     }
1315     do
1316     {
1317         // Not using enforce() to save on function call overhead since this
1318         // is a performance critical function.
1319         if (status != PoolState.running)
1320         {
1321             throw new Error(
1322                 "Cannot submit a new task to a pool after calling " ~
1323                 "finish() or stop()."
1324             );
1325         }
1326 
1327         task.next = null;
1328         if (head is null)   //Queue is empty.
1329         {
1330             head = task;
1331             tail = task;
1332             tail.prev = null;
1333         }
1334         else
1335         {
1336             assert(tail);
1337             task.prev = tail;
1338             tail.next = task;
1339             tail = task;
1340         }
1341         notify();
1342     }
1343 
1344     void abstractPutGroupNoSync(AbstractTask* h, AbstractTask* t)
1345     {
1346         if (status != PoolState.running)
1347         {
1348             throw new Error(
1349                 "Cannot submit a new task to a pool after calling " ~
1350                 "finish() or stop()."
1351             );
1352         }
1353 
1354         if (head is null)
1355         {
1356             head = h;
1357             tail = t;
1358         }
1359         else
1360         {
1361             h.prev = tail;
1362             tail.next = h;
1363             tail = t;
1364         }
1365 
1366         notifyAll();
1367     }
1368 
1369     void tryDeleteExecute(AbstractTask* toExecute)
1370     {
1371         if (isSingleTask) return;
1372 
1373         if ( !deleteItem(toExecute) )
1374         {
1375             return;
1376         }
1377 
1378         try
1379         {
1380             toExecute.job();
1381         }
1382         catch (Exception e)
1383         {
1384             toExecute.exception = e;
1385         }
1386 
1387         atomicSetUbyte(toExecute.taskStatus, TaskStatus.done);
1388     }
1389 
1390     bool deleteItem(AbstractTask* item)
1391     {
1392         queueLock();
1393         scope(exit) queueUnlock();
1394         return deleteItemNoSync(item);
1395     }
1396 
1397     bool deleteItemNoSync(AbstractTask* item)
1398     {
1399         if (item.taskStatus != TaskStatus.notStarted)
1400         {
1401             return false;
1402         }
1403         item.taskStatus = TaskStatus.inProgress;
1404 
1405         if (item is head)
1406         {
1407             // Make sure head gets set properly.
1408             popNoSync();
1409             return true;
1410         }
1411         if (item is tail)
1412         {
1413             tail = tail.prev;
1414             if (tail !is null)
1415             {
1416                 tail.next = null;
1417             }
1418             item.next = null;
1419             item.prev = null;
1420             return true;
1421         }
1422         if (item.next !is null)
1423         {
1424             assert(item.next.prev is item);  // Check queue consistency.
1425             item.next.prev = item.prev;
1426         }
1427         if (item.prev !is null)
1428         {
1429             assert(item.prev.next is item);  // Check queue consistency.
1430             item.prev.next = item.next;
1431         }
1432         item.next = null;
1433         item.prev = null;
1434         return true;
1435     }
1436 
1437     void queueLock()
1438     {
1439         assert(queueMutex);
1440         if (!isSingleTask) queueMutex.lock();
1441     }
1442 
1443     void queueUnlock()
1444     {
1445         assert(queueMutex);
1446         if (!isSingleTask) queueMutex.unlock();
1447     }
1448 
1449     void waiterLock()
1450     {
1451         if (!isSingleTask) waiterMutex.lock();
1452     }
1453 
1454     void waiterUnlock()
1455     {
1456         if (!isSingleTask) waiterMutex.unlock();
1457     }
1458 
1459     void wait()
1460     {
1461         if (!isSingleTask) workerCondition.wait();
1462     }
1463 
1464     void notify()
1465     {
1466         if (!isSingleTask) workerCondition.notify();
1467     }
1468 
1469     void notifyAll()
1470     {
1471         if (!isSingleTask) workerCondition.notifyAll();
1472     }
1473 
1474     void waitUntilCompletion()
1475     {
1476         if (isSingleTask)
1477         {
1478             singleTaskThread.join();
1479         }
1480         else
1481         {
1482             waiterCondition.wait();
1483         }
1484     }
1485 
1486     void notifyWaiters()
1487     {
1488         if (!isSingleTask) waiterCondition.notifyAll();
1489     }
1490 
1491     // Private constructor for creating dummy pools that only have one thread,
1492     // only execute one Task, and then terminate.  This is used for
1493     // Task.executeInNewThread().
1494     this(AbstractTask* task, int priority = int.max)
1495     {
1496         assert(task);
1497 
1498         // Dummy value, not used.
1499         instanceStartIndex = 0;
1500 
1501         this.isSingleTask = true;
1502         task.taskStatus = TaskStatus.inProgress;
1503         this.head = task;
1504         singleTaskThread = new Thread(&doSingleTask);
1505         singleTaskThread.start();
1506 
1507         // Disabled until writing code to support
1508         // running thread with specified priority
1509         // See https://issues.dlang.org/show_bug.cgi?id=8960
1510 
1511         /*if (priority != int.max)
1512         {
1513             singleTaskThread.priority = priority;
1514         }*/
1515     }
1516 
1517 public:
1518     // This is used in parallel_algorithm but is too unstable to document
1519     // as public API.
1520     size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow
1521     {
1522         import std.algorithm.comparison : max;
1523 
1524         if (this.size == 0)
1525         {
1526             return max(rangeLen, 1);
1527         }
1528 
1529         immutable size_t eightSize = 4 * (this.size + 1);
1530         auto ret = (rangeLen / eightSize) + ((rangeLen % eightSize == 0) ? 0 : 1);
1531         return max(ret, 1);
1532     }
1533 
1534     /**
1535     Default constructor that initializes a `TaskPool` with
1536     `totalCPUs` - 1 worker threads.  The minus 1 is included because the
1537     main thread will also be available to do work.
1538 
1539     Note:  On single-core machines, the primitives provided by `TaskPool`
1540            operate transparently in single-threaded mode.
1541      */
1542     this() @trusted
1543     {
1544         this(totalCPUs - 1);
1545     }
1546 
1547     /**
1548     Allows for custom number of worker threads.
1549     */
1550     this(size_t nWorkers) @trusted
1551     {
1552         synchronized(typeid(TaskPool))
1553         {
1554             instanceStartIndex = nextInstanceIndex;
1555 
1556             // The first worker thread to be initialized will have this index,
1557             // and will increment it.  The second worker to be initialized will
1558             // have this index plus 1.
1559             nextThreadIndex = instanceStartIndex;
1560             nextInstanceIndex += nWorkers;
1561         }
1562 
1563         queueMutex = new Mutex(this);
1564         waiterMutex = new Mutex();
1565         workerCondition = new Condition(queueMutex);
1566         waiterCondition = new Condition(waiterMutex);
1567 
1568         pool = new ParallelismThread[nWorkers];
1569         foreach (ref poolThread; pool)
1570         {
1571             poolThread = new ParallelismThread(&startWorkLoop);
1572             poolThread.pool = this;
1573             poolThread.start();
1574         }
1575     }
1576 
1577     /**
1578     Implements a parallel foreach loop over a range.  This works by implicitly
1579     creating and submitting one `Task` to the `TaskPool` for each worker
1580     thread.  A work unit is a set of consecutive elements of `range` to
1581     be processed by a worker thread between communication with any other
1582     thread.  The number of elements processed per work unit is controlled by the
1583     `workUnitSize` parameter.  Smaller work units provide better load
1584     balancing, but larger work units avoid the overhead of communicating
1585     with other threads frequently to fetch the next work unit.  Large work
1586     units also avoid false sharing in cases where the range is being modified.
1587     The less time a single iteration of the loop takes, the larger
1588     `workUnitSize` should be.  For very expensive loop bodies,
1589     `workUnitSize` should  be 1.  An overload that chooses a default work
1590     unit size is also available.
1591 
1592     Example:
1593     ---
1594     // Find the logarithm of every number from 1 to
1595     // 10_000_000 in parallel.
1596     auto logs = new double[10_000_000];
1597 
1598     // Parallel foreach works with or without an index
1599     // variable.  It can iterate by ref if range.front
1600     // returns by ref.
1601 
1602     // Iterate over logs using work units of size 100.
1603     foreach (i, ref elem; taskPool.parallel(logs, 100))
1604     {
1605         elem = log(i + 1.0);
1606     }
1607 
1608     // Same thing, but use the default work unit size.
1609     //
1610     // Timings on an Athlon 64 X2 dual core machine:
1611     //
1612     // Parallel foreach:  388 milliseconds
1613     // Regular foreach:   619 milliseconds
1614     foreach (i, ref elem; taskPool.parallel(logs))
1615     {
1616         elem = log(i + 1.0);
1617     }
1618     ---
1619 
1620     Notes:
1621 
1622     The memory usage of this implementation is guaranteed to be constant
1623     in `range.length`.
1624 
1625     Breaking from a parallel foreach loop via a break, labeled break,
1626     labeled continue, return or goto statement throws a
1627     `ParallelForeachError`.
1628 
1629     In the case of non-random access ranges, parallel foreach buffers lazily
1630     to an array of size `workUnitSize` before executing the parallel portion
1631     of the loop.  The exception is that, if a parallel foreach is executed
1632     over a range returned by `asyncBuf` or `map`, the copying is elided
1633     and the buffers are simply swapped.  In this case `workUnitSize` is
1634     ignored and the work unit size is set to the  buffer size of `range`.
1635 
1636     A memory barrier is guaranteed to be executed on exit from the loop,
1637     so that results produced by all threads are visible in the calling thread.
1638 
1639     $(B Exception Handling):
1640 
1641     When at least one exception is thrown from inside a parallel foreach loop,
1642     the submission of additional `Task` objects is terminated as soon as
1643     possible, in a non-deterministic manner.  All executing or
1644     enqueued work units are allowed to complete.  Then, all exceptions that
1645     were thrown by any work unit are chained using `Throwable.next` and
1646     rethrown.  The order of the exception chaining is non-deterministic.
1647     */
1648     ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
1649     {
1650         import std.exception : enforce;
1651         enforce(workUnitSize > 0, "workUnitSize must be > 0.");
1652         alias RetType = ParallelForeach!R;
1653         return RetType(this, range, workUnitSize);
1654     }
1655 
1656 
1657     /// Ditto
1658     ParallelForeach!R parallel(R)(R range)
1659     {
1660         static if (hasLength!R)
1661         {
1662             // Default work unit size is such that we would use 4x as many
1663             // slots as are in this thread pool.
1664             size_t workUnitSize = defaultWorkUnitSize(range.length);
1665             return parallel(range, workUnitSize);
1666         }
1667         else
1668         {
1669             // Just use a really, really dumb guess if the user is too lazy to
1670             // specify.
1671             return parallel(range, 512);
1672         }
1673     }
1674 
1675     ///
1676     template amap(functions...)
1677     {
1678         /**
1679         Eager parallel map.  The eagerness of this function means it has less
1680         overhead than the lazily evaluated `TaskPool.map` and should be
1681         preferred where the memory requirements of eagerness are acceptable.
1682         `functions` are the functions to be evaluated, passed as template
1683         alias parameters in a style similar to
1684         $(REF map, std,algorithm,iteration).
1685         The first argument must be a random access range. For performance
1686         reasons, amap will assume the range elements have not yet been
1687         initialized. Elements will be overwritten without calling a destructor
1688         nor doing an assignment. As such, the range must not contain meaningful
1689         data$(DDOC_COMMENT not a section): either un-initialized objects, or
1690         objects in their `.init` state.
1691 
1692         ---
1693         auto numbers = iota(100_000_000.0);
1694 
1695         // Find the square roots of numbers.
1696         //
1697         // Timings on an Athlon 64 X2 dual core machine:
1698         //
1699         // Parallel eager map:                   0.802 s
1700         // Equivalent serial implementation:     1.768 s
1701         auto squareRoots = taskPool.amap!sqrt(numbers);
1702         ---
1703 
1704         Immediately after the range argument, an optional work unit size argument
1705         may be provided.  Work units as used by `amap` are identical to those
1706         defined for parallel foreach.  If no work unit size is provided, the
1707         default work unit size is used.
1708 
1709         ---
1710         // Same thing, but make work unit size 100.
1711         auto squareRoots = taskPool.amap!sqrt(numbers, 100);
1712         ---
1713 
1714         An output range for returning the results may be provided as the last
1715         argument.  If one is not provided, an array of the proper type will be
1716         allocated on the garbage collected heap.  If one is provided, it must be a
1717         random access range with assignable elements, must have reference
1718         semantics with respect to assignment to its elements, and must have the
1719         same length as the input range.  Writing to adjacent elements from
1720         different threads must be safe.
1721 
1722         ---
1723         // Same thing, but explicitly allocate an array
1724         // to return the results in.  The element type
1725         // of the array may be either the exact type
1726         // returned by functions or an implicit conversion
1727         // target.
1728         auto squareRoots = new float[numbers.length];
1729         taskPool.amap!sqrt(numbers, squareRoots);
1730 
1731         // Multiple functions, explicit output range, and
1732         // explicit work unit size.
1733         auto results = new Tuple!(float, real)[numbers.length];
1734         taskPool.amap!(sqrt, log)(numbers, 100, results);
1735         ---
1736 
1737         Note:
1738 
1739         A memory barrier is guaranteed to be executed after all results are written
1740         but before returning so that results produced by all threads are visible
1741         in the calling thread.
1742 
1743         Tips:
1744 
1745         To perform the mapping operation in place, provide the same range for the
1746         input and output range.
1747 
1748         To parallelize the copying of a range with expensive to evaluate elements
1749         to an array, pass an identity function (a function that just returns
1750         whatever argument is provided to it) to `amap`.
1751 
1752         $(B Exception Handling):
1753 
1754         When at least one exception is thrown from inside the map functions,
1755         the submission of additional `Task` objects is terminated as soon as
1756         possible, in a non-deterministic manner.  All currently executing or
1757         enqueued work units are allowed to complete.  Then, all exceptions that
1758         were thrown from any work unit are chained using `Throwable.next` and
1759         rethrown.  The order of the exception chaining is non-deterministic.
1760         */
1761         auto amap(Args...)(Args args)
1762         if (isRandomAccessRange!(Args[0]))
1763         {
1764             import core.internal.lifetime : emplaceRef;
1765 
1766             alias fun = adjoin!(staticMap!(unaryFun, functions));
1767 
1768             alias range = args[0];
1769             immutable len = range.length;
1770 
1771             static if (
1772                 Args.length > 1 &&
1773                 randAssignable!(Args[$ - 1]) &&
1774                 is(MapType!(Args[0], functions) : ElementType!(Args[$ - 1]))
1775                 )
1776             {
1777                 import std.conv : text;
1778                 import std.exception : enforce;
1779 
1780                 alias buf = args[$ - 1];
1781                 alias args2 = args[0..$ - 1];
1782                 alias Args2 = Args[0..$ - 1];
1783                 enforce(buf.length == len,
1784                         text("Can't use a user supplied buffer that's the wrong ",
1785                              "size.  (Expected  :", len, " Got:  ", buf.length));
1786             }
1787             else static if (randAssignable!(Args[$ - 1]) && Args.length > 1)
1788             {
1789                 static assert(0, "Wrong buffer type.");
1790             }
1791             else
1792             {
1793                 import std.array : uninitializedArray;
1794 
1795                 auto buf = uninitializedArray!(MapType!(Args[0], functions)[])(len);
1796                 alias args2 = args;
1797                 alias Args2 = Args;
1798             }
1799 
1800             if (!len) return buf;
1801 
1802             static if (isIntegral!(Args2[$ - 1]))
1803             {
1804                 static assert(args2.length == 2);
1805                 auto workUnitSize = cast(size_t) args2[1];
1806             }
1807             else
1808             {
1809                 static assert(args2.length == 1, Args);
1810                 auto workUnitSize = defaultWorkUnitSize(range.length);
1811             }
1812 
1813             alias R = typeof(range);
1814 
1815             if (workUnitSize > len)
1816             {
1817                 workUnitSize = len;
1818             }
1819 
1820             // Handle as a special case:
1821             if (size == 0)
1822             {
1823                 size_t index = 0;
1824                 foreach (elem; range)
1825                 {
1826                     emplaceRef(buf[index++], fun(elem));
1827                 }
1828                 return buf;
1829             }
1830 
1831             // Effectively -1:  chunkIndex + 1 == 0:
1832             shared size_t workUnitIndex = size_t.max;
1833             shared bool shouldContinue = true;
1834 
1835             void doIt()
1836             {
1837                 import std.algorithm.comparison : min;
1838 
1839                 scope(failure)
1840                 {
1841                     // If an exception is thrown, all threads should bail.
1842                     atomicStore(shouldContinue, false);
1843                 }
1844 
1845                 while (atomicLoad(shouldContinue))
1846                 {
1847                     immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
1848                     immutable start = workUnitSize * myUnitIndex;
1849                     if (start >= len)
1850                     {
1851                         atomicStore(shouldContinue, false);
1852                         break;
1853                     }
1854 
1855                     immutable end = min(len, start + workUnitSize);
1856 
1857                     static if (hasSlicing!R)
1858                     {
1859                         auto subrange = range[start .. end];
1860                         foreach (i; start .. end)
1861                         {
1862                             emplaceRef(buf[i], fun(subrange.front));
1863                             subrange.popFront();
1864                         }
1865                     }
1866                     else
1867                     {
1868                         foreach (i; start .. end)
1869                         {
1870                             emplaceRef(buf[i], fun(range[i]));
1871                         }
1872                     }
1873                 }
1874             }
1875 
1876             submitAndExecute(this, &doIt);
1877             return buf;
1878         }
1879     }
1880 
1881     ///
1882     template map(functions...)
1883     {
1884         /**
1885         A semi-lazy parallel map that can be used for pipelining.  The map
1886         functions are evaluated for the first `bufSize` elements and stored in a
1887         buffer and made available to `popFront`.  Meanwhile, in the
1888         background a second buffer of the same size is filled.  When the first
1889         buffer is exhausted, it is swapped with the second buffer and filled while
1890         the values from what was originally the second buffer are read.  This
1891         implementation allows for elements to be written to the buffer without
1892         the need for atomic operations or synchronization for each write, and
1893         enables the mapping function to be evaluated efficiently in parallel.
1894 
1895         `map` has more overhead than the simpler procedure used by `amap`
1896         but avoids the need to keep all results in memory simultaneously and works
1897         with non-random access ranges.
1898 
1899         Params:
1900 
1901         source = The $(REF_ALTTEXT input range, isInputRange, std,range,primitives)
1902         to be mapped.  If `source` is not random
1903         access it will be lazily buffered to an array of size `bufSize` before
1904         the map function is evaluated.  (For an exception to this rule, see Notes.)
1905 
1906         bufSize = The size of the buffer to store the evaluated elements.
1907 
1908         workUnitSize = The number of elements to evaluate in a single
1909         `Task`.  Must be less than or equal to `bufSize`, and
1910         should be a fraction of `bufSize` such that all worker threads can be
1911         used.  If the default of size_t.max is used, workUnitSize will be set to
1912         the pool-wide default.
1913 
1914         Returns:  An input range representing the results of the map.  This range
1915                   has a length iff `source` has a length.
1916 
1917         Notes:
1918 
1919         If a range returned by `map` or `asyncBuf` is used as an input to
1920         `map`, then as an optimization the copying from the output buffer
1921         of the first range to the input buffer of the second range is elided, even
1922         though the ranges returned by `map` and `asyncBuf` are non-random
1923         access ranges.  This means that the `bufSize` parameter passed to the
1924         current call to `map` will be ignored and the size of the buffer
1925         will be the buffer size of `source`.
1926 
1927         Example:
1928         ---
1929         // Pipeline reading a file, converting each line
1930         // to a number, taking the logarithms of the numbers,
1931         // and performing the additions necessary to find
1932         // the sum of the logarithms.
1933 
1934         auto lineRange = File("numberList.txt").byLine();
1935         auto dupedLines = std.algorithm.map!"a.idup"(lineRange);
1936         auto nums = taskPool.map!(to!double)(dupedLines);
1937         auto logs = taskPool.map!log10(nums);
1938 
1939         double sum = 0;
1940         foreach (elem; logs)
1941         {
1942             sum += elem;
1943         }
1944         ---
1945 
1946         $(B Exception Handling):
1947 
1948         Any exceptions thrown while iterating over `source`
1949         or computing the map function are re-thrown on a call to `popFront` or,
1950         if thrown during construction, are simply allowed to propagate to the
1951         caller.  In the case of exceptions thrown while computing the map function,
1952         the exceptions are chained as in `TaskPool.amap`.
1953         */
1954         auto
1955         map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max)
1956         if (isInputRange!S)
1957         {
1958             import std.exception : enforce;
1959 
1960             enforce(workUnitSize == size_t.max || workUnitSize <= bufSize,
1961                     "Work unit size must be smaller than buffer size.");
1962             alias fun = adjoin!(staticMap!(unaryFun, functions));
1963 
1964             static final class Map
1965             {
1966                 // This is a class because the task needs to be located on the
1967                 // heap and in the non-random access case source needs to be on
1968                 // the heap, too.
1969 
1970             private:
1971                 enum bufferTrick = is(typeof(source.buf1)) &&
1972                 is(typeof(source.bufPos)) &&
1973                 is(typeof(source.doBufSwap()));
1974 
1975                 alias E = MapType!(S, functions);
1976                 E[] buf1, buf2;
1977                 S source;
1978                 TaskPool pool;
1979                 Task!(run, E[] delegate(E[]), E[]) nextBufTask;
1980                 size_t workUnitSize;
1981                 size_t bufPos;
1982                 bool lastTaskWaited;
1983 
1984             static if (isRandomAccessRange!S)
1985             {
1986                 alias FromType = S;
1987 
1988                 void popSource()
1989                 {
1990                     import std.algorithm.comparison : min;
1991 
1992                     static if (__traits(compiles, source[0 .. source.length]))
1993                     {
1994                         source = source[min(buf1.length, source.length)..source.length];
1995                     }
1996                     else static if (__traits(compiles, source[0..$]))
1997                     {
1998                         source = source[min(buf1.length, source.length)..$];
1999                     }
2000                     else
2001                     {
2002                         static assert(0, "S must have slicing for Map."
2003                                       ~ "  " ~ S.stringof ~ " doesn't.");
2004                     }
2005                 }
2006             }
2007             else static if (bufferTrick)
2008             {
2009                 // Make sure we don't have the buffer recycling overload of
2010                 // asyncBuf.
2011                 static if (
2012                     is(typeof(source.source)) &&
2013                     isRoundRobin!(typeof(source.source))
2014                 )
2015                 {
2016                     static assert(0, "Cannot execute a parallel map on " ~
2017                                   "the buffer recycling overload of asyncBuf."
2018                                  );
2019                 }
2020 
2021                 alias FromType = typeof(source.buf1);
2022                 FromType from;
2023 
2024                 // Just swap our input buffer with source's output buffer.
2025                 // No need to copy element by element.
2026                 FromType dumpToFrom()
2027                 {
2028                     import std.algorithm.mutation : swap;
2029 
2030                     assert(source.buf1.length <= from.length);
2031                     from.length = source.buf1.length;
2032                     swap(source.buf1, from);
2033 
2034                     // Just in case this source has been popped before
2035                     // being sent to map:
2036                     from = from[source.bufPos..$];
2037 
2038                     static if (is(typeof(source._length)))
2039                     {
2040                         source._length -= (from.length - source.bufPos);
2041                     }
2042 
2043                     source.doBufSwap();
2044 
2045                     return from;
2046                 }
2047             }
2048             else
2049             {
2050                 alias FromType = ElementType!S[];
2051 
2052                 // The temporary array that data is copied to before being
2053                 // mapped.
2054                 FromType from;
2055 
2056                 FromType dumpToFrom()
2057                 {
2058                     assert(from !is null);
2059 
2060                     size_t i;
2061                     for (; !source.empty && i < from.length; source.popFront())
2062                     {
2063                         from[i++] = source.front;
2064                     }
2065 
2066                     from = from[0 .. i];
2067                     return from;
2068                 }
2069             }
2070 
2071             static if (hasLength!S)
2072             {
2073                 size_t _length;
2074 
2075                 public @property size_t length() const @safe pure nothrow
2076                 {
2077                     return _length;
2078                 }
2079             }
2080 
2081                 this(S source, size_t bufSize, size_t workUnitSize, TaskPool pool)
2082                 {
2083                     static if (bufferTrick)
2084                     {
2085                         bufSize = source.buf1.length;
2086                     }
2087 
2088                     buf1.length = bufSize;
2089                     buf2.length = bufSize;
2090 
2091                     static if (!isRandomAccessRange!S)
2092                     {
2093                         from.length = bufSize;
2094                     }
2095 
2096                     this.workUnitSize = (workUnitSize == size_t.max) ?
2097                             pool.defaultWorkUnitSize(bufSize) : workUnitSize;
2098                     this.source = source;
2099                     this.pool = pool;
2100 
2101                     static if (hasLength!S)
2102                     {
2103                         _length = source.length;
2104                     }
2105 
2106                     buf1 = fillBuf(buf1);
2107                     submitBuf2();
2108                 }
2109 
2110                 // The from parameter is a dummy and ignored in the random access
2111                 // case.
2112                 E[] fillBuf(E[] buf)
2113                 {
2114                     import std.algorithm.comparison : min;
2115 
2116                     static if (isRandomAccessRange!S)
2117                     {
2118                         import std.range : take;
2119                         auto toMap = take(source, buf.length);
2120                         scope(success) popSource();
2121                     }
2122                     else
2123                     {
2124                         auto toMap = dumpToFrom();
2125                     }
2126 
2127                     buf = buf[0 .. min(buf.length, toMap.length)];
2128 
2129                     // Handle as a special case:
2130                     if (pool.size == 0)
2131                     {
2132                         size_t index = 0;
2133                         foreach (elem; toMap)
2134                         {
2135                             buf[index++] = fun(elem);
2136                         }
2137                         return buf;
2138                     }
2139 
2140                     pool.amap!functions(toMap, workUnitSize, buf);
2141 
2142                     return buf;
2143                 }
2144 
2145                 void submitBuf2()
2146                 in
2147                 {
2148                     assert(nextBufTask.prev is null);
2149                     assert(nextBufTask.next is null);
2150                 }
2151                 do
2152                 {
2153                     // Hack to reuse the task object.
2154 
2155                     nextBufTask = typeof(nextBufTask).init;
2156                     nextBufTask._args[0] = &fillBuf;
2157                     nextBufTask._args[1] = buf2;
2158                     pool.put(nextBufTask);
2159                 }
2160 
2161                 void doBufSwap()
2162                 {
2163                     if (lastTaskWaited)
2164                     {
2165                         // Then the source is empty.  Signal it here.
2166                         buf1 = null;
2167                         buf2 = null;
2168 
2169                         static if (!isRandomAccessRange!S)
2170                         {
2171                             from = null;
2172                         }
2173 
2174                         return;
2175                     }
2176 
2177                     buf2 = buf1;
2178                     buf1 = nextBufTask.yieldForce;
2179                     bufPos = 0;
2180 
2181                     if (source.empty)
2182                     {
2183                         lastTaskWaited = true;
2184                     }
2185                     else
2186                     {
2187                         submitBuf2();
2188                     }
2189                 }
2190 
2191             public:
2192                 @property auto front()
2193                 {
2194                     return buf1[bufPos];
2195                 }
2196 
2197                 void popFront()
2198                 {
2199                     static if (hasLength!S)
2200                     {
2201                         _length--;
2202                     }
2203 
2204                     bufPos++;
2205                     if (bufPos >= buf1.length)
2206                     {
2207                         doBufSwap();
2208                     }
2209                 }
2210 
2211                 static if (isInfinite!S)
2212                 {
2213                     enum bool empty = false;
2214                 }
2215                 else
2216                 {
2217 
2218                     bool empty() const @property
2219                     {
2220                         // popFront() sets this when source is empty
2221                         return buf1.length == 0;
2222                     }
2223                 }
2224             }
2225             return new Map(source, bufSize, workUnitSize, this);
2226         }
2227     }
2228 
2229     /**
2230     Given a `source` range that is expensive to iterate over, returns an
2231     $(REF_ALTTEXT input range, isInputRange, std,range,primitives) that
2232     asynchronously buffers the contents of `source` into a buffer of `bufSize` elements in a worker thread,
2233     while making previously buffered elements from a second buffer, also of size
2234     `bufSize`, available via the range interface of the returned
2235     object.  The returned range has a length iff `hasLength!S`.
2236     `asyncBuf` is useful, for example, when performing expensive operations
2237     on the elements of ranges that represent data on a disk or network.
2238 
2239     Example:
2240     ---
2241     import std.conv, std.stdio;
2242 
2243     void main()
2244     {
2245         // Fetch lines of a file in a background thread
2246         // while processing previously fetched lines,
2247         // dealing with byLine's buffer recycling by
2248         // eagerly duplicating every line.
2249         auto lines = File("foo.txt").byLine();
2250         auto duped = std.algorithm.map!"a.idup"(lines);
2251 
2252         // Fetch more lines in the background while we
2253         // process the lines already read into memory
2254         // into a matrix of doubles.
2255         double[][] matrix;
2256         auto asyncReader = taskPool.asyncBuf(duped);
2257 
2258         foreach (line; asyncReader)
2259         {
2260             auto ls = line.split("\t");
2261             matrix ~= to!(double[])(ls);
2262         }
2263     }
2264     ---
2265 
2266     $(B Exception Handling):
2267 
2268     Any exceptions thrown while iterating over `source` are re-thrown on a
2269     call to `popFront` or, if thrown during construction, simply
2270     allowed to propagate to the caller.
2271     */
2272     auto asyncBuf(S)(S source, size_t bufSize = 100)
2273     if (isInputRange!S)
2274     {
2275         static final class AsyncBuf
2276         {
2277             // This is a class because the task and source both need to be on
2278             // the heap.
2279 
2280             // The element type of S.
2281             alias E = ElementType!S;  // Needs to be here b/c of forward ref bugs.
2282 
2283         private:
2284             E[] buf1, buf2;
2285             S source;
2286             TaskPool pool;
2287             Task!(run, E[] delegate(E[]), E[]) nextBufTask;
2288             size_t bufPos;
2289             bool lastTaskWaited;
2290 
2291             static if (hasLength!S)
2292             {
2293                 size_t _length;
2294 
2295                 // Available if hasLength!S.
2296                 public @property size_t length() const @safe pure nothrow
2297                 {
2298                     return _length;
2299                 }
2300             }
2301 
2302             this(S source, size_t bufSize, TaskPool pool)
2303             {
2304                 buf1.length = bufSize;
2305                 buf2.length = bufSize;
2306 
2307                 this.source = source;
2308                 this.pool = pool;
2309 
2310                 static if (hasLength!S)
2311                 {
2312                     _length = source.length;
2313                 }
2314 
2315                 buf1 = fillBuf(buf1);
2316                 submitBuf2();
2317             }
2318 
2319             E[] fillBuf(E[] buf)
2320             {
2321                 assert(buf !is null);
2322 
2323                 size_t i;
2324                 for (; !source.empty && i < buf.length; source.popFront())
2325                 {
2326                     buf[i++] = source.front;
2327                 }
2328 
2329                 buf = buf[0 .. i];
2330                 return buf;
2331             }
2332 
2333             void submitBuf2()
2334             in
2335             {
2336                 assert(nextBufTask.prev is null);
2337                 assert(nextBufTask.next is null);
2338             }
2339             do
2340             {
2341                 // Hack to reuse the task object.
2342 
2343                 nextBufTask = typeof(nextBufTask).init;
2344                 nextBufTask._args[0] = &fillBuf;
2345                 nextBufTask._args[1] = buf2;
2346                 pool.put(nextBufTask);
2347             }
2348 
2349             void doBufSwap()
2350             {
2351                 if (lastTaskWaited)
2352                 {
2353                     // Then source is empty.  Signal it here.
2354                     buf1 = null;
2355                     buf2 = null;
2356                     return;
2357                 }
2358 
2359                 buf2 = buf1;
2360                 buf1 = nextBufTask.yieldForce;
2361                 bufPos = 0;
2362 
2363                 if (source.empty)
2364                 {
2365                     lastTaskWaited = true;
2366                 }
2367                 else
2368                 {
2369                     submitBuf2();
2370                 }
2371             }
2372 
2373         public:
2374             E front() @property
2375             {
2376                 return buf1[bufPos];
2377             }
2378 
2379             void popFront()
2380             {
2381                 static if (hasLength!S)
2382                 {
2383                     _length--;
2384                 }
2385 
2386                 bufPos++;
2387                 if (bufPos >= buf1.length)
2388                 {
2389                     doBufSwap();
2390                 }
2391             }
2392 
2393             static if (isInfinite!S)
2394             {
2395                 enum bool empty = false;
2396             }
2397 
2398             else
2399             {
2400                 ///
2401                 bool empty() @property
2402                 {
2403                     // popFront() sets this when source is empty:
2404                     return buf1.length == 0;
2405                 }
2406             }
2407         }
2408         return new AsyncBuf(source, bufSize, this);
2409     }
2410 
2411     /**
2412     Given a callable object `next` that writes to a user-provided buffer and
2413     a second callable object `empty` that determines whether more data is
2414     available to write via `next`, returns an input range that
2415     asynchronously calls `next` with a set of size `nBuffers` of buffers
2416     and makes the results available in the order they were obtained via the
2417     input range interface of the returned object.  Similarly to the
2418     input range overload of `asyncBuf`, the first half of the buffers
2419     are made available via the range interface while the second half are
2420     filled and vice-versa.
2421 
2422     Params:
2423 
2424     next = A callable object that takes a single argument that must be an array
2425            with mutable elements.  When called, `next` writes data to
2426            the array provided by the caller.
2427 
2428     empty = A callable object that takes no arguments and returns a type
2429             implicitly convertible to `bool`.  This is used to signify
2430             that no more data is available to be obtained by calling `next`.
2431 
2432     initialBufSize = The initial size of each buffer.  If `next` takes its
2433                      array by reference, it may resize the buffers.
2434 
2435     nBuffers = The number of buffers to cycle through when calling `next`.
2436 
2437     Example:
2438     ---
2439     // Fetch lines of a file in a background
2440     // thread while processing previously fetched
2441     // lines, without duplicating any lines.
2442     auto file = File("foo.txt");
2443 
2444     void next(ref char[] buf)
2445     {
2446         file.readln(buf);
2447     }
2448 
2449     // Fetch more lines in the background while we
2450     // process the lines already read into memory
2451     // into a matrix of doubles.
2452     double[][] matrix;
2453     auto asyncReader = taskPool.asyncBuf(&next, &file.eof);
2454 
2455     foreach (line; asyncReader)
2456     {
2457         auto ls = line.split("\t");
2458         matrix ~= to!(double[])(ls);
2459     }
2460     ---
2461 
2462     $(B Exception Handling):
2463 
2464     Any exceptions thrown while iterating over `range` are re-thrown on a
2465     call to `popFront`.
2466 
2467     Warning:
2468 
2469     Using the range returned by this function in a parallel foreach loop
2470     will not work because buffers may be overwritten while the task that
2471     processes them is in queue.  This is checked for at compile time
2472     and will result in a static assertion failure.
2473     */
2474     auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100)
2475     if (is(typeof(C2.init()) : bool) &&
2476         Parameters!C1.length == 1 &&
2477         Parameters!C2.length == 0 &&
2478         isArray!(Parameters!C1[0])
2479     ) {
2480         auto roundRobin = RoundRobinBuffer!(C1, C2)(next, empty, initialBufSize, nBuffers);
2481         return asyncBuf(roundRobin, nBuffers / 2);
2482     }
2483 
2484     ///
2485     template reduce(functions...)
2486     {
2487         /**
2488         Parallel reduce on a random access range.  Except as otherwise noted,
2489         usage is similar to $(REF _reduce, std,algorithm,iteration).  There is
2490         also $(LREF fold) which does the same thing with a different parameter
2491         order.
2492 
2493         This function works by splitting the range to be reduced into work
2494         units, which are slices to be reduced in parallel.  Once the results
2495         from all work units are computed, a final serial reduction is performed
2496         on these results to compute the final answer. Therefore, care must be
2497         taken to choose the seed value appropriately.
2498 
2499         Because the reduction is being performed in parallel, `functions`
2500         must be associative.  For notational simplicity, let # be an
2501         infix operator representing `functions`.  Then, (a # b) # c must equal
2502         a # (b # c).  Floating point addition is not associative
2503         even though addition in exact arithmetic is.  Summing floating
2504         point numbers using this function may give different results than summing
2505         serially.  However, for many practical purposes floating point addition
2506         can be treated as associative.
2507 
2508         Note that, since `functions` are assumed to be associative,
2509         additional optimizations are made to the serial portion of the reduction
2510         algorithm. These take advantage of the instruction level parallelism of
2511         modern CPUs, in addition to the thread-level parallelism that the rest
2512         of this module exploits.  This can lead to better than linear speedups
2513         relative to $(REF _reduce, std,algorithm,iteration), especially for
2514         fine-grained benchmarks like dot products.
2515 
2516         An explicit seed may be provided as the first argument.  If
2517         provided, it is used as the seed for all work units and for the final
2518         reduction of results from all work units.  Therefore, if it is not the
2519         identity value for the operation being performed, results may differ
2520         from those generated by $(REF _reduce, std,algorithm,iteration) or
2521         depending on how many work units are used.  The next argument must be
2522         the range to be reduced.
2523         ---
2524         // Find the sum of squares of a range in parallel, using
2525         // an explicit seed.
2526         //
2527         // Timings on an Athlon 64 X2 dual core machine:
2528         //
2529         // Parallel reduce:                     72 milliseconds
2530         // Using std.algorithm.reduce instead:  181 milliseconds
2531         auto nums = iota(10_000_000.0f);
2532         auto sumSquares = taskPool.reduce!"a + b"(
2533             0.0, std.algorithm.map!"a * a"(nums)
2534         );
2535         ---
2536 
2537         If no explicit seed is provided, the first element of each work unit
2538         is used as a seed.  For the final reduction, the result from the first
2539         work unit is used as the seed.
2540         ---
2541         // Find the sum of a range in parallel, using the first
2542         // element of each work unit as the seed.
2543         auto sum = taskPool.reduce!"a + b"(nums);
2544         ---
2545 
2546         An explicit work unit size may be specified as the last argument.
2547         Specifying too small a work unit size will effectively serialize the
2548         reduction, as the final reduction of the result of each work unit will
2549         dominate computation time.  If `TaskPool.size` for this instance
2550         is zero, this parameter is ignored and one work unit is used.
2551         ---
2552         // Use a work unit size of 100.
2553         auto sum2 = taskPool.reduce!"a + b"(nums, 100);
2554 
2555         // Work unit size of 100 and explicit seed.
2556         auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100);
2557         ---
2558 
2559         Parallel reduce supports multiple functions, like
2560         `std.algorithm.reduce`.
2561         ---
2562         // Find both the min and max of nums.
2563         auto minMax = taskPool.reduce!(min, max)(nums);
2564         assert(minMax[0] == reduce!min(nums));
2565         assert(minMax[1] == reduce!max(nums));
2566         ---
2567 
2568         $(B Exception Handling):
2569 
2570         After this function is finished executing, any exceptions thrown
2571         are chained together via `Throwable.next` and rethrown.  The chaining
2572         order is non-deterministic.
2573 
2574         See_Also:
2575 
2576             $(LREF fold) is functionally equivalent to $(LREF _reduce) except the
2577             range parameter comes first and there is no need to use
2578             $(REF_ALTTEXT `tuple`,tuple,std,typecons) for multiple seeds.
2579          */
2580         auto reduce(Args...)(Args args)
2581         {
2582             import core.exception : OutOfMemoryError;
2583             import core.internal.lifetime : emplaceRef;
2584             import std.exception : enforce;
2585 
2586             alias fun = reduceAdjoin!functions;
2587             alias finishFun = reduceFinish!functions;
2588 
2589             static if (isIntegral!(Args[$ - 1]))
2590             {
2591                 size_t workUnitSize = cast(size_t) args[$ - 1];
2592                 alias args2 = args[0..$ - 1];
2593                 alias Args2 = Args[0..$ - 1];
2594             }
2595             else
2596             {
2597                 alias args2 = args;
2598                 alias Args2 = Args;
2599             }
2600 
2601             auto makeStartValue(Type)(Type e)
2602             {
2603                 static if (functions.length == 1)
2604                 {
2605                     return e;
2606                 }
2607                 else
2608                 {
2609                     typeof(adjoin!(staticMap!(binaryFun, functions))(e, e)) seed = void;
2610                     foreach (i, T; seed.Types)
2611                     {
2612                         emplaceRef(seed.expand[i], e);
2613                     }
2614 
2615                     return seed;
2616                 }
2617             }
2618 
2619             static if (args2.length == 2)
2620             {
2621                 static assert(isInputRange!(Args2[1]));
2622                 alias range = args2[1];
2623                 alias seed = args2[0];
2624                 enum explicitSeed = true;
2625 
2626                 static if (!is(typeof(workUnitSize)))
2627                 {
2628                     size_t workUnitSize = defaultWorkUnitSize(range.length);
2629                 }
2630             }
2631             else
2632             {
2633                 static assert(args2.length == 1);
2634                 alias range = args2[0];
2635 
2636                 static if (!is(typeof(workUnitSize)))
2637                 {
2638                     size_t workUnitSize = defaultWorkUnitSize(range.length);
2639                 }
2640 
2641                 enforce(!range.empty,
2642                     "Cannot reduce an empty range with first element as start value.");
2643 
2644                 auto seed = makeStartValue(range.front);
2645                 enum explicitSeed = false;
2646                 range.popFront();
2647             }
2648 
2649             alias E = typeof(seed);
2650             alias R = typeof(range);
2651 
2652             E reduceOnRange(R range, size_t lowerBound, size_t upperBound)
2653             {
2654                 // This is for exploiting instruction level parallelism by
2655                 // using multiple accumulator variables within each thread,
2656                 // since we're assuming functions are associative anyhow.
2657 
2658                 // This is so that loops can be unrolled automatically.
2659                 enum ilpTuple = AliasSeq!(0, 1, 2, 3, 4, 5);
2660                 enum nILP = ilpTuple.length;
2661                 immutable subSize = (upperBound - lowerBound) / nILP;
2662 
2663                 if (subSize <= 1)
2664                 {
2665                     // Handle as a special case.
2666                     static if (explicitSeed)
2667                     {
2668                         E result = seed;
2669                     }
2670                     else
2671                     {
2672                         E result = makeStartValue(range[lowerBound]);
2673                         lowerBound++;
2674                     }
2675 
2676                     foreach (i; lowerBound .. upperBound)
2677                     {
2678                         result = fun(result, range[i]);
2679                     }
2680 
2681                     return result;
2682                 }
2683 
2684                 assert(subSize > 1);
2685                 E[nILP] results;
2686                 size_t[nILP] offsets;
2687 
2688                 foreach (i; ilpTuple)
2689                 {
2690                     offsets[i] = lowerBound + subSize * i;
2691 
2692                     static if (explicitSeed)
2693                     {
2694                         results[i] = seed;
2695                     }
2696                     else
2697                     {
2698                         results[i] = makeStartValue(range[offsets[i]]);
2699                         offsets[i]++;
2700                     }
2701                 }
2702 
2703                 immutable nLoop = subSize - (!explicitSeed);
2704                 foreach (i; 0 .. nLoop)
2705                 {
2706                     foreach (j; ilpTuple)
2707                     {
2708                         results[j] = fun(results[j], range[offsets[j]]);
2709                         offsets[j]++;
2710                     }
2711                 }
2712 
2713                 // Finish the remainder.
2714                 foreach (i; nILP * subSize + lowerBound .. upperBound)
2715                 {
2716                     results[$ - 1] = fun(results[$ - 1], range[i]);
2717                 }
2718 
2719                 foreach (i; ilpTuple[1..$])
2720                 {
2721                     results[0] = finishFun(results[0], results[i]);
2722                 }
2723 
2724                 return results[0];
2725             }
2726 
2727             immutable len = range.length;
2728             if (len == 0)
2729             {
2730                 return seed;
2731             }
2732 
2733             if (this.size == 0)
2734             {
2735                 return finishFun(seed, reduceOnRange(range, 0, len));
2736             }
2737 
2738             // Unlike the rest of the functions here, I can't use the Task object
2739             // recycling trick here because this has to work on non-commutative
2740             // operations.  After all the tasks are done executing, fun() has to
2741             // be applied on the results of these to get a final result, but
2742             // it can't be evaluated out of order.
2743 
2744             if (workUnitSize > len)
2745             {
2746                 workUnitSize = len;
2747             }
2748 
2749             immutable size_t nWorkUnits = (len / workUnitSize) + ((len % workUnitSize == 0) ? 0 : 1);
2750             assert(nWorkUnits * workUnitSize >= len);
2751 
2752             alias RTask = Task!(run, typeof(&reduceOnRange), R, size_t, size_t);
2753             RTask[] tasks;
2754 
2755             // Can't use alloca() due to https://issues.dlang.org/show_bug.cgi?id=3753
2756             // Use a fixed buffer backed by malloc().
2757             enum maxStack = 2_048;
2758             byte[maxStack] buf = void;
2759             immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof;
2760 
2761             import core.stdc.stdlib : malloc, free;
2762             if (nBytesNeeded <= maxStack)
2763             {
2764                 tasks = (cast(RTask*) buf.ptr)[0 .. nWorkUnits];
2765             }
2766             else
2767             {
2768                 auto ptr = cast(RTask*) malloc(nBytesNeeded);
2769                 if (!ptr)
2770                 {
2771                     throw new OutOfMemoryError(
2772                         "Out of memory in std.parallelism."
2773                     );
2774                 }
2775 
2776                 tasks = ptr[0 .. nWorkUnits];
2777             }
2778 
2779             scope(exit)
2780             {
2781                 if (nBytesNeeded > maxStack)
2782                 {
2783                     free(tasks.ptr);
2784                 }
2785             }
2786 
2787             // Hack to take the address of a nested function w/o
2788             // making a closure.
2789             static auto scopedAddress(D)(scope D del) @system
2790             {
2791                 auto tmp = del;
2792                 return tmp;
2793             }
2794 
2795             size_t curPos = 0;
2796             void useTask(ref RTask task)
2797             {
2798                 import std.algorithm.comparison : min;
2799                 import core.lifetime : emplace;
2800 
2801                 // Private constructor, so can't feed it's arguments directly
2802                 // to emplace
2803                 emplace(&task, RTask
2804                 (
2805                     scopedAddress(&reduceOnRange),
2806                     range,
2807                     curPos, // lower bound.
2808                     cast() min(len, curPos + workUnitSize)  // upper bound.
2809                 ));
2810 
2811                 task.pool = this;
2812 
2813                 curPos += workUnitSize;
2814             }
2815 
2816             foreach (ref task; tasks)
2817             {
2818                 useTask(task);
2819             }
2820 
2821             foreach (i; 1 .. tasks.length - 1)
2822             {
2823                 tasks[i].next = tasks[i + 1].basePtr;
2824                 tasks[i + 1].prev = tasks[i].basePtr;
2825             }
2826 
2827             if (tasks.length > 1)
2828             {
2829                 queueLock();
2830                 scope(exit) queueUnlock();
2831 
2832                 abstractPutGroupNoSync(
2833                     tasks[1].basePtr,
2834                     tasks[$ - 1].basePtr
2835                 );
2836             }
2837 
2838             if (tasks.length > 0)
2839             {
2840                 try
2841                 {
2842                     tasks[0].job();
2843                 }
2844                 catch (Throwable e)
2845                 {
2846                     tasks[0].exception = e;
2847                 }
2848                 tasks[0].taskStatus = TaskStatus.done;
2849 
2850                 // Try to execute each of these in the current thread
2851                 foreach (ref task; tasks[1..$])
2852                 {
2853                     tryDeleteExecute(task.basePtr);
2854                 }
2855             }
2856 
2857             // Now that we've tried to execute every task, they're all either
2858             // done or in progress.  Force all of them.
2859             E result = seed;
2860 
2861             Throwable firstException;
2862 
2863             foreach (ref task; tasks)
2864             {
2865                 try
2866                 {
2867                     task.yieldForce;
2868                 }
2869                 catch (Throwable e)
2870                 {
2871                     /* Chain e to front because order doesn't matter and because
2872                      * e is not likely to be a chain itself (so fewer traversals)
2873                      */
2874                     firstException = Throwable.chainTogether(e, firstException);
2875                     continue;
2876                 }
2877 
2878                 if (!firstException) result = finishFun(result, task.returnVal);
2879             }
2880 
2881             if (firstException) throw firstException;
2882 
2883             return result;
2884         }
2885     }
2886 
2887     ///
2888     template fold(functions...)
2889     {
2890         /** Implements the homonym function (also known as `accumulate`, `compress`,
2891             `inject`, or `foldl`) present in various programming languages of
2892             functional flavor.
2893 
2894             `fold` is functionally equivalent to $(LREF reduce) except the range
2895             parameter comes first and there is no need to use $(REF_ALTTEXT
2896             `tuple`,tuple,std,typecons) for multiple seeds.
2897 
2898             There may be one or more callable entities (`functions` argument) to
2899             apply.
2900 
2901             Params:
2902                 args = Just the range to _fold over; or the range and one seed
2903                        per function; or the range, one seed per function, and
2904                        the work unit size
2905 
2906             Returns:
2907                 The accumulated result as a single value for single function and
2908                 as a tuple of values for multiple functions
2909 
2910             See_Also:
2911             Similar to $(REF _fold, std,algorithm,iteration), `fold` is a wrapper around $(LREF reduce).
2912 
2913             Example:
2914             ---
2915             static int adder(int a, int b)
2916             {
2917                 return a + b;
2918             }
2919             static int multiplier(int a, int b)
2920             {
2921                 return a * b;
2922             }
2923 
2924             // Just the range
2925             auto x = taskPool.fold!adder([1, 2, 3, 4]);
2926             assert(x == 10);
2927 
2928             // The range and the seeds (0 and 1 below; also note multiple
2929             // functions in this example)
2930             auto y = taskPool.fold!(adder, multiplier)([1, 2, 3, 4], 0, 1);
2931             assert(y[0] == 10);
2932             assert(y[1] == 24);
2933 
2934             // The range, the seed (0), and the work unit size (20)
2935             auto z = taskPool.fold!adder([1, 2, 3, 4], 0, 20);
2936             assert(z == 10);
2937             ---
2938         */
2939         auto fold(Args...)(Args args)
2940         {
2941             static assert(isInputRange!(Args[0]), "First argument must be an InputRange");
2942 
2943             alias range = args[0];
2944 
2945             static if (Args.length == 1)
2946             {
2947                 // Just the range
2948                 return reduce!functions(range);
2949             }
2950             else static if (Args.length == 1 + functions.length ||
2951                             Args.length == 1 + functions.length + 1)
2952             {
2953                 static if (functions.length == 1)
2954                 {
2955                     alias seeds = args[1];
2956                 }
2957                 else
2958                 {
2959                     auto seeds()
2960                     {
2961                         import std.typecons : tuple;
2962                         return tuple(args[1 .. functions.length+1]);
2963                     }
2964                 }
2965 
2966                 static if (Args.length == 1 + functions.length)
2967                 {
2968                     // The range and the seeds
2969                     return reduce!functions(seeds, range);
2970                 }
2971                 else static if (Args.length == 1 + functions.length + 1)
2972                 {
2973                     // The range, the seeds, and the work unit size
2974                     static assert(isIntegral!(Args[$-1]), "Work unit size must be an integral type");
2975                     return reduce!functions(seeds, range, args[$-1]);
2976                 }
2977             }
2978             else
2979             {
2980                 import std.conv : text;
2981                 static assert(0, "Invalid number of arguments (" ~ Args.length.text ~ "): Should be an input range, "
2982                               ~ functions.length.text ~ " optional seed(s), and an optional work unit size.");
2983             }
2984         }
2985     }
2986 
2987     // This test is not included in the documentation because even though these
2988     // examples are for the inner fold() template, with their current location,
2989     // they would appear under the outer one. (We can't move this inside the
2990     // outer fold() template because then dmd runs out of memory possibly due to
2991     // recursive template instantiation, which is surprisingly not caught.)
2992     @system unittest
2993     {
2994         // Just the range
2995         auto x = taskPool.fold!"a + b"([1, 2, 3, 4]);
2996         assert(x == 10);
2997 
2998         // The range and the seeds (0 and 1 below; also note multiple
2999         // functions in this example)
3000         auto y = taskPool.fold!("a + b", "a * b")([1, 2, 3, 4], 0, 1);
3001         assert(y[0] == 10);
3002         assert(y[1] == 24);
3003 
3004         // The range, the seed (0), and the work unit size (20)
3005         auto z = taskPool.fold!"a + b"([1, 2, 3, 4], 0, 20);
3006         assert(z == 10);
3007     }
3008 
3009     /**
3010     Gets the index of the current thread relative to this `TaskPool`.  Any
3011     thread not in this pool will receive an index of 0.  The worker threads in
3012     this pool receive unique indices of 1 through `this.size`.
3013 
3014     This function is useful for maintaining worker-local resources.
3015 
3016     Example:
3017     ---
3018     // Execute a loop that computes the greatest common
3019     // divisor of every number from 0 through 999 with
3020     // 42 in parallel.  Write the results out to
3021     // a set of files, one for each thread.  This allows
3022     // results to be written out without any synchronization.
3023 
3024     import std.conv, std.range, std.numeric, std.stdio;
3025 
3026     void main()
3027     {
3028         auto filesHandles = new File[taskPool.size + 1];
3029         scope(exit) {
3030             foreach (ref handle; fileHandles)
3031             {
3032                 handle.close();
3033             }
3034         }
3035 
3036         foreach (i, ref handle; fileHandles)
3037         {
3038             handle = File("workerResults" ~ to!string(i) ~ ".txt");
3039         }
3040 
3041         foreach (num; parallel(iota(1_000)))
3042         {
3043             auto outHandle = fileHandles[taskPool.workerIndex];
3044             outHandle.writeln(num, '\t', gcd(num, 42));
3045         }
3046     }
3047     ---
3048     */
3049     size_t workerIndex() @property @safe const nothrow
3050     {
3051         immutable rawInd = threadIndex;
3052         return (rawInd >= instanceStartIndex && rawInd < instanceStartIndex + size) ?
3053                 (rawInd - instanceStartIndex + 1) : 0;
3054     }
3055 
3056     /**
3057     Struct for creating worker-local storage.  Worker-local storage is
3058     thread-local storage that exists only for worker threads in a given
3059     `TaskPool` plus a single thread outside the pool.  It is allocated on the
3060     garbage collected heap in a way that avoids _false sharing, and doesn't
3061     necessarily have global scope within any thread.  It can be accessed from
3062     any worker thread in the `TaskPool` that created it, and one thread
3063     outside this `TaskPool`.  All threads outside the pool that created a
3064     given instance of worker-local storage share a single slot.
3065 
3066     Since the underlying data for this struct is heap-allocated, this struct
3067     has reference semantics when passed between functions.
3068 
3069     The main uses cases for `WorkerLocalStorage` are:
3070 
3071     1.  Performing parallel reductions with an imperative, as opposed to
3072         functional, programming style.  In this case, it's useful to treat
3073         `WorkerLocalStorage` as local to each thread for only the parallel
3074         portion of an algorithm.
3075 
3076     2.  Recycling temporary buffers across iterations of a parallel foreach loop.
3077 
3078     Example:
3079     ---
3080     // Calculate pi as in our synopsis example, but
3081     // use an imperative instead of a functional style.
3082     immutable n = 1_000_000_000;
3083     immutable delta = 1.0L / n;
3084 
3085     auto sums = taskPool.workerLocalStorage(0.0L);
3086     foreach (i; parallel(iota(n)))
3087     {
3088         immutable x = ( i - 0.5L ) * delta;
3089         immutable toAdd = delta / ( 1.0 + x * x );
3090         sums.get += toAdd;
3091     }
3092 
3093     // Add up the results from each worker thread.
3094     real pi = 0;
3095     foreach (threadResult; sums.toRange)
3096     {
3097         pi += 4.0L * threadResult;
3098     }
3099     ---
3100      */
3101     static struct WorkerLocalStorage(T)
3102     {
3103     private:
3104         TaskPool pool;
3105         size_t size;
3106 
3107         size_t elemSize;
3108         bool* stillThreadLocal;
3109 
3110         static size_t roundToLine(size_t num) pure nothrow
3111         {
3112             if (num % cacheLineSize == 0)
3113             {
3114                 return num;
3115             }
3116             else
3117             {
3118                 return ((num / cacheLineSize) + 1) * cacheLineSize;
3119             }
3120         }
3121 
3122         void* data;
3123 
3124         void initialize(TaskPool pool)
3125         {
3126             this.pool = pool;
3127             size = pool.size + 1;
3128             stillThreadLocal = new bool;
3129             *stillThreadLocal = true;
3130 
3131             // Determines whether the GC should scan the array.
3132             auto blkInfo = (typeid(T).flags & 1) ?
3133                            cast(GC.BlkAttr) 0 :
3134                            GC.BlkAttr.NO_SCAN;
3135 
3136             immutable nElem = pool.size + 1;
3137             elemSize = roundToLine(T.sizeof);
3138 
3139             // The + 3 is to pad one full cache line worth of space on either side
3140             // of the data structure to make sure false sharing with completely
3141             // unrelated heap data is prevented, and to provide enough padding to
3142             // make sure that data is cache line-aligned.
3143             data = GC.malloc(elemSize * (nElem + 3), blkInfo) + elemSize;
3144 
3145             // Cache line align data ptr.
3146             data = cast(void*) roundToLine(cast(size_t) data);
3147 
3148             foreach (i; 0 .. nElem)
3149             {
3150                 this.opIndex(i) = T.init;
3151             }
3152         }
3153 
3154         ref opIndex(this Qualified)(size_t index)
3155         {
3156             import std.conv : text;
3157             assert(index < size, text(index, '\t', uint.max));
3158             return *(cast(CopyTypeQualifiers!(Qualified, T)*) (data + elemSize * index));
3159         }
3160 
3161         void opIndexAssign(T val, size_t index)
3162         {
3163             assert(index < size);
3164             *(cast(T*) (data + elemSize * index)) = val;
3165         }
3166 
3167     public:
3168         /**
3169         Get the current thread's instance.  Returns by ref.
3170         Note that calling `get` from any thread
3171         outside the `TaskPool` that created this instance will return the
3172         same reference, so an instance of worker-local storage should only be
3173         accessed from one thread outside the pool that created it.  If this
3174         rule is violated, undefined behavior will result.
3175 
3176         If assertions are enabled and `toRange` has been called, then this
3177         WorkerLocalStorage instance is no longer worker-local and an assertion
3178         failure will result when calling this method.  This is not checked
3179         when assertions are disabled for performance reasons.
3180          */
3181         ref get(this Qualified)() @property
3182         {
3183             assert(*stillThreadLocal,
3184                 "Cannot call get() on this instance of WorkerLocalStorage " ~
3185                 "because it is no longer worker-local."
3186             );
3187             return opIndex(pool.workerIndex);
3188         }
3189 
3190         /**
3191         Assign a value to the current thread's instance.  This function has
3192         the same caveats as its overload.
3193         */
3194         void get(T val) @property
3195         {
3196             assert(*stillThreadLocal,
3197                 "Cannot call get() on this instance of WorkerLocalStorage " ~
3198                 "because it is no longer worker-local."
3199             );
3200 
3201             opIndexAssign(val, pool.workerIndex);
3202         }
3203 
3204         /**
3205         Returns a range view of the values for all threads, which can be used
3206         to further process the results of each thread after running the parallel
3207         part of your algorithm.  Do not use this method in the parallel portion
3208         of your algorithm.
3209 
3210         Calling this function sets a flag indicating that this struct is no
3211         longer worker-local, and attempting to use the `get` method again
3212         will result in an assertion failure if assertions are enabled.
3213          */
3214         WorkerLocalStorageRange!T toRange() @property
3215         {
3216             if (*stillThreadLocal)
3217             {
3218                 *stillThreadLocal = false;
3219 
3220                 // Make absolutely sure results are visible to all threads.
3221                 // This is probably not necessary since some other
3222                 // synchronization primitive will be used to signal that the
3223                 // parallel part of the algorithm is done, but the
3224                 // performance impact should be negligible, so it's better
3225                 // to be safe.
3226                 ubyte barrierDummy;
3227                 atomicSetUbyte(barrierDummy, 1);
3228             }
3229 
3230             return WorkerLocalStorageRange!T(this);
3231         }
3232     }
3233 
3234     /**
3235     Range primitives for worker-local storage.  The purpose of this is to
3236     access results produced by each worker thread from a single thread once you
3237     are no longer using the worker-local storage from multiple threads.
3238     Do not use this struct in the parallel portion of your algorithm.
3239 
3240     The proper way to instantiate this object is to call
3241     `WorkerLocalStorage.toRange`.  Once instantiated, this object behaves
3242     as a finite random-access range with assignable, lvalue elements and
3243     a length equal to the number of worker threads in the `TaskPool` that
3244     created it plus 1.
3245      */
3246     static struct WorkerLocalStorageRange(T)
3247     {
3248     private:
3249         WorkerLocalStorage!T workerLocalStorage;
3250 
3251         size_t _length;
3252         size_t beginOffset;
3253 
3254         this(WorkerLocalStorage!T wl)
3255         {
3256             this.workerLocalStorage = wl;
3257             _length = wl.size;
3258         }
3259 
3260     public:
3261         ref front(this Qualified)() @property
3262         {
3263             return this[0];
3264         }
3265 
3266         ref back(this Qualified)() @property
3267         {
3268             return this[_length - 1];
3269         }
3270 
3271         void popFront()
3272         {
3273             if (_length > 0)
3274             {
3275                 beginOffset++;
3276                 _length--;
3277             }
3278         }
3279 
3280         void popBack()
3281         {
3282             if (_length > 0)
3283             {
3284                 _length--;
3285             }
3286         }
3287 
3288         typeof(this) save() @property
3289         {
3290             return this;
3291         }
3292 
3293         ref opIndex(this Qualified)(size_t index)
3294         {
3295             assert(index < _length);
3296             return workerLocalStorage[index + beginOffset];
3297         }
3298 
3299         void opIndexAssign(T val, size_t index)
3300         {
3301             assert(index < _length);
3302             workerLocalStorage[index] = val;
3303         }
3304 
3305         typeof(this) opSlice(size_t lower, size_t upper)
3306         {
3307             assert(upper <= _length);
3308             auto newWl = this.workerLocalStorage;
3309             newWl.data += lower * newWl.elemSize;
3310             newWl.size = upper - lower;
3311             return typeof(this)(newWl);
3312         }
3313 
3314         bool empty() const @property
3315         {
3316             return length == 0;
3317         }
3318 
3319         size_t length() const @property
3320         {
3321             return _length;
3322         }
3323     }
3324 
3325     /**
3326     Creates an instance of worker-local storage, initialized with a given
3327     value.  The value is `lazy` so that you can, for example, easily
3328     create one instance of a class for each worker.  For usage example,
3329     see the `WorkerLocalStorage` struct.
3330      */
3331     WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init)
3332     {
3333         WorkerLocalStorage!T ret;
3334         ret.initialize(this);
3335         foreach (i; 0 .. size + 1)
3336         {
3337             ret[i] = initialVal;
3338         }
3339 
3340         // Memory barrier to make absolutely sure that what we wrote is
3341         // visible to worker threads.
3342         ubyte barrierDummy;
3343         atomicSetUbyte(barrierDummy, 0);
3344 
3345         return ret;
3346     }
3347 
3348     /**
3349     Signals to all worker threads to terminate as soon as they are finished
3350     with their current `Task`, or immediately if they are not executing a
3351     `Task`.  `Task`s that were in queue will not be executed unless
3352     a call to `Task.workForce`, `Task.yieldForce` or `Task.spinForce`
3353     causes them to be executed.
3354 
3355     Use only if you have waited on every `Task` and therefore know the
3356     queue is empty, or if you speculatively executed some tasks and no longer
3357     need the results.
3358      */
3359     void stop() @trusted
3360     {
3361         queueLock();
3362         scope(exit) queueUnlock();
3363         atomicSetUbyte(status, PoolState.stopNow);
3364         notifyAll();
3365     }
3366 
3367     /**
3368     Signals worker threads to terminate when the queue becomes empty.
3369 
3370     If blocking argument is true, wait for all worker threads to terminate
3371     before returning.  This option might be used in applications where
3372     task results are never consumed-- e.g. when `TaskPool` is employed as a
3373     rudimentary scheduler for tasks which communicate by means other than
3374     return values.
3375 
3376     Warning:  Calling this function with $(D blocking = true) from a worker
3377               thread that is a member of the same `TaskPool` that
3378               `finish` is being called on will result in a deadlock.
3379      */
3380     void finish(bool blocking = false) @trusted
3381     {
3382         {
3383             queueLock();
3384             scope(exit) queueUnlock();
3385             atomicCasUbyte(status, PoolState.running, PoolState.finishing);
3386             notifyAll();
3387         }
3388         if (blocking)
3389         {
3390             // Use this thread as a worker until everything is finished.
3391             executeWorkLoop();
3392 
3393             foreach (t; pool)
3394             {
3395                 // Maybe there should be something here to prevent a thread
3396                 // from calling join() on itself if this function is called
3397                 // from a worker thread in the same pool, but:
3398                 //
3399                 // 1.  Using an if statement to skip join() would result in
3400                 //     finish() returning without all tasks being finished.
3401                 //
3402                 // 2.  If an exception were thrown, it would bubble up to the
3403                 //     Task from which finish() was called and likely be
3404                 //     swallowed.
3405                 t.join();
3406             }
3407         }
3408     }
3409 
3410     /// Returns the number of worker threads in the pool.
3411     @property size_t size() @safe const pure nothrow
3412     {
3413         return pool.length;
3414     }
3415 
3416     /**
3417     Put a `Task` object on the back of the task queue.  The `Task`
3418     object may be passed by pointer or reference.
3419 
3420     Example:
3421     ---
3422     import std.file;
3423 
3424     // Create a task.
3425     auto t = task!read("foo.txt");
3426 
3427     // Add it to the queue to be executed.
3428     taskPool.put(t);
3429     ---
3430 
3431     Notes:
3432 
3433     @trusted overloads of this function are called for `Task`s if
3434     $(REF hasUnsharedAliasing, std,traits) is false for the `Task`'s
3435     return type or the function the `Task` executes is `pure`.
3436     `Task` objects that meet all other requirements specified in the
3437     `@trusted` overloads of `task` and `scopedTask` may be created
3438     and executed from `@safe` code via `Task.executeInNewThread` but
3439     not via `TaskPool`.
3440 
3441     While this function takes the address of variables that may
3442     be on the stack, some overloads are marked as @trusted.
3443     `Task` includes a destructor that waits for the task to complete
3444     before destroying the stack frame it is allocated on.  Therefore,
3445     it is impossible for the stack frame to be destroyed before the task is
3446     complete and no longer referenced by a `TaskPool`.
3447     */
3448     void put(alias fun, Args...)(ref Task!(fun, Args) task)
3449     if (!isSafeReturn!(typeof(task)))
3450     {
3451         task.pool = this;
3452         abstractPut(task.basePtr);
3453     }
3454 
3455     /// Ditto
3456     void put(alias fun, Args...)(Task!(fun, Args)* task)
3457     if (!isSafeReturn!(typeof(*task)))
3458     {
3459         import std.exception : enforce;
3460         enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
3461         put(*task);
3462     }
3463 
3464     @trusted void put(alias fun, Args...)(ref Task!(fun, Args) task)
3465     if (isSafeReturn!(typeof(task)))
3466     {
3467         task.pool = this;
3468         abstractPut(task.basePtr);
3469     }
3470 
3471     @trusted void put(alias fun, Args...)(Task!(fun, Args)* task)
3472     if (isSafeReturn!(typeof(*task)))
3473     {
3474         import std.exception : enforce;
3475         enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
3476         put(*task);
3477     }
3478 
3479     /**
3480     These properties control whether the worker threads are daemon threads.
3481     A daemon thread is automatically terminated when all non-daemon threads
3482     have terminated.  A non-daemon thread will prevent a program from
3483     terminating as long as it has not terminated.
3484 
3485     If any `TaskPool` with non-daemon threads is active, either `stop`
3486     or `finish` must be called on it before the program can terminate.
3487 
3488     The worker treads in the `TaskPool` instance returned by the
3489     `taskPool` property are daemon by default.  The worker threads of
3490     manually instantiated task pools are non-daemon by default.
3491 
3492     Note:  For a size zero pool, the getter arbitrarily returns true and the
3493            setter has no effect.
3494     */
3495     bool isDaemon() @property @trusted
3496     {
3497         queueLock();
3498         scope(exit) queueUnlock();
3499         return (size == 0) ? true : pool[0].isDaemon;
3500     }
3501 
3502     /// Ditto
3503     void isDaemon(bool newVal) @property @trusted
3504     {
3505         queueLock();
3506         scope(exit) queueUnlock();
3507         foreach (thread; pool)
3508         {
3509             thread.isDaemon = newVal;
3510         }
3511     }
3512 
3513     /**
3514     These functions allow getting and setting the OS scheduling priority of
3515     the worker threads in this `TaskPool`.  They forward to
3516     `core.thread.Thread.priority`, so a given priority value here means the
3517     same thing as an identical priority value in `core.thread`.
3518 
3519     Note:  For a size zero pool, the getter arbitrarily returns
3520            `core.thread.Thread.PRIORITY_MIN` and the setter has no effect.
3521     */
3522     int priority() @property @trusted
3523     {
3524         return (size == 0) ? core.thread.Thread.PRIORITY_MIN :
3525         pool[0].priority;
3526     }
3527 
3528     /// Ditto
3529     void priority(int newPriority) @property @trusted
3530     {
3531         if (size > 0)
3532         {
3533             foreach (t; pool)
3534             {
3535                 t.priority = newPriority;
3536             }
3537         }
3538     }
3539 }
3540 
3541 @system unittest
3542 {
3543     import std.algorithm.iteration : sum;
3544     import std.range : iota;
3545     import std.typecons : tuple;
3546 
3547     enum N = 100;
3548     auto r = iota(1, N + 1);
3549     const expected = r.sum();
3550 
3551     // Just the range
3552     assert(taskPool.fold!"a + b"(r) == expected);
3553 
3554     // Range and seeds
3555     assert(taskPool.fold!"a + b"(r, 0) == expected);
3556     assert(taskPool.fold!("a + b", "a + b")(r, 0, 0) == tuple(expected, expected));
3557 
3558     // Range, seeds, and work unit size
3559     assert(taskPool.fold!"a + b"(r, 0, 42) == expected);
3560     assert(taskPool.fold!("a + b", "a + b")(r, 0, 0, 42) == tuple(expected, expected));
3561 }
3562 
3563 // Issue 16705
3564 @system unittest
3565 {
3566     struct MyIota
3567     {
3568         size_t front;
3569         void popFront()(){front++;}
3570         auto empty(){return front >= 25;}
3571         auto opIndex(size_t i){return front+i;}
3572         auto length(){return 25-front;}
3573     }
3574 
3575     auto mySum = taskPool.reduce!"a + b"(MyIota());
3576 }
3577 
3578 /**
3579 Returns a lazily initialized global instantiation of `TaskPool`.
3580 This function can safely be called concurrently from multiple non-worker
3581 threads.  The worker threads in this pool are daemon threads, meaning that it
3582 is not necessary to call `TaskPool.stop` or `TaskPool.finish` before
3583 terminating the main thread.
3584 */
3585 @property TaskPool taskPool() @trusted
3586 {
3587     import std.concurrency : initOnce;
3588     __gshared TaskPool pool;
3589     return initOnce!pool({
3590         auto p = new TaskPool(defaultPoolThreads);
3591         p.isDaemon = true;
3592         return p;
3593     }());
3594 }
3595 
3596 private shared uint _defaultPoolThreads = uint.max;
3597 
3598 /**
3599 These properties get and set the number of worker threads in the `TaskPool`
3600 instance returned by `taskPool`.  The default value is `totalCPUs` - 1.
3601 Calling the setter after the first call to `taskPool` does not changes
3602 number of worker threads in the instance returned by `taskPool`.
3603 */
3604 @property uint defaultPoolThreads() @trusted
3605 {
3606     const local = atomicLoad(_defaultPoolThreads);
3607     return local < uint.max ? local : totalCPUs - 1;
3608 }
3609 
3610 /// Ditto
3611 @property void defaultPoolThreads(uint newVal) @trusted
3612 {
3613     atomicStore(_defaultPoolThreads, newVal);
3614 }
3615 
3616 /**
3617 Convenience functions that forwards to `taskPool.parallel`.  The
3618 purpose of these is to make parallel foreach less verbose and more
3619 readable.
3620 
3621 Example:
3622 ---
3623 // Find the logarithm of every number from
3624 // 1 to 1_000_000 in parallel, using the
3625 // default TaskPool instance.
3626 auto logs = new double[1_000_000];
3627 
3628 foreach (i, ref elem; parallel(logs))
3629 {
3630     elem = log(i + 1.0);
3631 }
3632 ---
3633 
3634 */
3635 ParallelForeach!R parallel(R)(R range)
3636 {
3637     return taskPool.parallel(range);
3638 }
3639 
3640 /// Ditto
3641 ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
3642 {
3643     return taskPool.parallel(range, workUnitSize);
3644 }
3645 
3646 //  `each` should be usable with parallel
3647 // https://issues.dlang.org/show_bug.cgi?id=17019
3648 @system unittest
3649 {
3650     import std.algorithm.iteration : each, sum;
3651     import std.range : iota;
3652 
3653     // check behavior with parallel
3654     auto arr = new int[10];
3655     parallel(arr).each!((ref e) => e += 1);
3656     assert(arr.sum == 10);
3657 
3658     auto arrIndex = new int[10];
3659     parallel(arrIndex).each!((i, ref e) => e += i);
3660     assert(arrIndex.sum == 10.iota.sum);
3661 }
3662 
3663 // https://issues.dlang.org/show_bug.cgi?id=22745
3664 @system unittest
3665 {
3666     auto pool = new TaskPool(0);
3667     int[] empty;
3668     foreach (i; pool.parallel(empty)) {}
3669     pool.finish();
3670 }
3671 
3672 // Thrown when a parallel foreach loop is broken from.
3673 class ParallelForeachError : Error
3674 {
3675     this()
3676     {
3677         super("Cannot break from a parallel foreach loop using break, return, "
3678               ~ "labeled break/continue or goto statements.");
3679     }
3680 }
3681 
3682 /*------Structs that implement opApply for parallel foreach.------------------*/
3683 private template randLen(R)
3684 {
3685     enum randLen = isRandomAccessRange!R && hasLength!R;
3686 }
3687 
3688 private void submitAndExecute(
3689     TaskPool pool,
3690     scope void delegate() doIt
3691 )
3692 {
3693     import core.exception : OutOfMemoryError;
3694     immutable nThreads = pool.size + 1;
3695 
3696     alias PTask = typeof(scopedTask(doIt));
3697     import core.stdc.stdlib : malloc, free;
3698     import core.stdc.string : memcpy;
3699 
3700     // The logical thing to do would be to just use alloca() here, but that
3701     // causes problems on Windows for reasons that I don't understand
3702     // (tentatively a compiler bug) and definitely doesn't work on Posix due
3703     // to https://issues.dlang.org/show_bug.cgi?id=3753.
3704     // Therefore, allocate a fixed buffer and fall back to `malloc()` if
3705     // someone's using a ridiculous amount of threads.
3706     // Also, the using a byte array instead of a PTask array as the fixed buffer
3707     // is to prevent d'tors from being called on uninitialized excess PTask
3708     // instances.
3709     enum nBuf = 64;
3710     byte[nBuf * PTask.sizeof] buf = void;
3711     PTask[] tasks;
3712     if (nThreads <= nBuf)
3713     {
3714         tasks = (cast(PTask*) buf.ptr)[0 .. nThreads];
3715     }
3716     else
3717     {
3718         auto ptr = cast(PTask*) malloc(nThreads * PTask.sizeof);
3719         if (!ptr) throw new OutOfMemoryError("Out of memory in std.parallelism.");
3720         tasks = ptr[0 .. nThreads];
3721     }
3722 
3723     scope(exit)
3724     {
3725         if (nThreads > nBuf)
3726         {
3727             free(tasks.ptr);
3728         }
3729     }
3730 
3731     foreach (ref t; tasks)
3732     {
3733         import core.stdc.string : memcpy;
3734 
3735         // This silly looking code is necessary to prevent d'tors from being
3736         // called on uninitialized objects.
3737         auto temp = scopedTask(doIt);
3738         memcpy(&t, &temp, PTask.sizeof);
3739 
3740         // This has to be done to t after copying, not temp before copying.
3741         // Otherwise, temp's destructor will sit here and wait for the
3742         // task to finish.
3743         t.pool = pool;
3744     }
3745 
3746     foreach (i; 1 .. tasks.length - 1)
3747     {
3748         tasks[i].next = tasks[i + 1].basePtr;
3749         tasks[i + 1].prev = tasks[i].basePtr;
3750     }
3751 
3752     if (tasks.length > 1)
3753     {
3754         pool.queueLock();
3755         scope(exit) pool.queueUnlock();
3756 
3757         pool.abstractPutGroupNoSync(
3758             tasks[1].basePtr,
3759             tasks[$ - 1].basePtr
3760         );
3761     }
3762 
3763     if (tasks.length > 0)
3764     {
3765         try
3766         {
3767             tasks[0].job();
3768         }
3769         catch (Throwable e)
3770         {
3771             tasks[0].exception = e; // nocoverage
3772         }
3773         tasks[0].taskStatus = TaskStatus.done;
3774 
3775         // Try to execute each of these in the current thread
3776         foreach (ref task; tasks[1..$])
3777         {
3778             pool.tryDeleteExecute(task.basePtr);
3779         }
3780     }
3781 
3782     Throwable firstException;
3783 
3784     foreach (i, ref task; tasks)
3785     {
3786         try
3787         {
3788             task.yieldForce;
3789         }
3790         catch (Throwable e)
3791         {
3792             /* Chain e to front because order doesn't matter and because
3793              * e is not likely to be a chain itself (so fewer traversals)
3794              */
3795             firstException = Throwable.chainTogether(e, firstException);
3796             continue;
3797         }
3798     }
3799 
3800     if (firstException) throw firstException;
3801 }
3802 
3803 void foreachErr()
3804 {
3805     throw new ParallelForeachError();
3806 }
3807 
3808 int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg)
3809 {
3810     with(p)
3811     {
3812         int res = 0;
3813         size_t index = 0;
3814 
3815         // The explicit ElementType!R in the foreach loops is necessary for
3816         // correct behavior when iterating over strings.
3817         static if (hasLvalueElements!R)
3818         {
3819             foreach (ref ElementType!R elem; range)
3820             {
3821                 static if (Parameters!dg.length == 2)
3822                 {
3823                     res = dg(index, elem);
3824                 }
3825                 else
3826                 {
3827                     res = dg(elem);
3828                 }
3829                 if (res) break;
3830                 index++;
3831             }
3832         }
3833         else
3834         {
3835             foreach (ElementType!R elem; range)
3836             {
3837                 static if (Parameters!dg.length == 2)
3838                 {
3839                     res = dg(index, elem);
3840                 }
3841                 else
3842                 {
3843                     res = dg(elem);
3844                 }
3845                 if (res) break;
3846                 index++;
3847             }
3848         }
3849         if (res) foreachErr;
3850         return res;
3851     }
3852 }
3853 
3854 private enum string parallelApplyMixinRandomAccess = q{
3855     // Handle empty thread pool as special case.
3856     if (pool.size == 0)
3857     {
3858         return doSizeZeroCase(this, dg);
3859     }
3860 
3861     // Whether iteration is with or without an index variable.
3862     enum withIndex = Parameters!(typeof(dg)).length == 2;
3863 
3864     shared size_t workUnitIndex = size_t.max;  // Effectively -1:  chunkIndex + 1 == 0
3865     immutable len = range.length;
3866     if (!len) return 0;
3867 
3868     shared bool shouldContinue = true;
3869 
3870     void doIt()
3871     {
3872         import std.algorithm.comparison : min;
3873 
3874         scope(failure)
3875         {
3876             // If an exception is thrown, all threads should bail.
3877             atomicStore(shouldContinue, false);
3878         }
3879 
3880         while (atomicLoad(shouldContinue))
3881         {
3882             immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
3883             immutable start = workUnitSize * myUnitIndex;
3884             if (start >= len)
3885             {
3886                 atomicStore(shouldContinue, false);
3887                 break;
3888             }
3889 
3890             immutable end = min(len, start + workUnitSize);
3891 
3892             foreach (i; start .. end)
3893             {
3894                 static if (withIndex)
3895                 {
3896                     if (dg(i, range[i])) foreachErr();
3897                 }
3898                 else
3899                 {
3900                     if (dg(range[i])) foreachErr();
3901                 }
3902             }
3903         }
3904     }
3905 
3906     submitAndExecute(pool, &doIt);
3907 
3908     return 0;
3909 };
3910 
3911 enum string parallelApplyMixinInputRange = q{
3912     // Handle empty thread pool as special case.
3913     if (pool.size == 0)
3914     {
3915         return doSizeZeroCase(this, dg);
3916     }
3917 
3918     // Whether iteration is with or without an index variable.
3919     enum withIndex = Parameters!(typeof(dg)).length == 2;
3920 
3921     // This protects the range while copying it.
3922     auto rangeMutex = new Mutex();
3923 
3924     shared bool shouldContinue = true;
3925 
3926     // The total number of elements that have been popped off range.
3927     // This is updated only while protected by rangeMutex;
3928     size_t nPopped = 0;
3929 
3930     static if (
3931         is(typeof(range.buf1)) &&
3932         is(typeof(range.bufPos)) &&
3933         is(typeof(range.doBufSwap()))
3934     )
3935     {
3936         // Make sure we don't have the buffer recycling overload of
3937         // asyncBuf.
3938         static if (
3939             is(typeof(range.source)) &&
3940             isRoundRobin!(typeof(range.source))
3941         )
3942         {
3943             static assert(0, "Cannot execute a parallel foreach loop on " ~
3944             "the buffer recycling overload of asyncBuf.");
3945         }
3946 
3947         enum bool bufferTrick = true;
3948     }
3949     else
3950     {
3951         enum bool bufferTrick = false;
3952     }
3953 
3954     void doIt()
3955     {
3956         scope(failure)
3957         {
3958             // If an exception is thrown, all threads should bail.
3959             atomicStore(shouldContinue, false);
3960         }
3961 
3962         static if (hasLvalueElements!R)
3963         {
3964             alias Temp = ElementType!R*[];
3965             Temp temp;
3966 
3967             // Returns:  The previous value of nPopped.
3968             size_t makeTemp()
3969             {
3970                 import std.algorithm.internal : addressOf;
3971                 import std.array : uninitializedArray;
3972 
3973                 if (temp is null)
3974                 {
3975                     temp = uninitializedArray!Temp(workUnitSize);
3976                 }
3977 
3978                 rangeMutex.lock();
3979                 scope(exit) rangeMutex.unlock();
3980 
3981                 size_t i = 0;
3982                 for (; i < workUnitSize && !range.empty; range.popFront(), i++)
3983                 {
3984                     temp[i] = addressOf(range.front);
3985                 }
3986 
3987                 temp = temp[0 .. i];
3988                 auto ret = nPopped;
3989                 nPopped += temp.length;
3990                 return ret;
3991             }
3992 
3993         }
3994         else
3995         {
3996 
3997             alias Temp = ElementType!R[];
3998             Temp temp;
3999 
4000             // Returns:  The previous value of nPopped.
4001             static if (!bufferTrick) size_t makeTemp()
4002             {
4003                 import std.array : uninitializedArray;
4004 
4005                 if (temp is null)
4006                 {
4007                     temp = uninitializedArray!Temp(workUnitSize);
4008                 }
4009 
4010                 rangeMutex.lock();
4011                 scope(exit) rangeMutex.unlock();
4012 
4013                 size_t i = 0;
4014                 for (; i < workUnitSize && !range.empty; range.popFront(), i++)
4015                 {
4016                     temp[i] = range.front;
4017                 }
4018 
4019                 temp = temp[0 .. i];
4020                 auto ret = nPopped;
4021                 nPopped += temp.length;
4022                 return ret;
4023             }
4024 
4025             static if (bufferTrick) size_t makeTemp()
4026             {
4027                 import std.algorithm.mutation : swap;
4028                 rangeMutex.lock();
4029                 scope(exit) rangeMutex.unlock();
4030 
4031                 // Elide copying by just swapping buffers.
4032                 temp.length = range.buf1.length;
4033                 swap(range.buf1, temp);
4034 
4035                 // This is necessary in case popFront() has been called on
4036                 // range before entering the parallel foreach loop.
4037                 temp = temp[range.bufPos..$];
4038 
4039                 static if (is(typeof(range._length)))
4040                 {
4041                     range._length -= (temp.length - range.bufPos);
4042                 }
4043 
4044                 range.doBufSwap();
4045                 auto ret = nPopped;
4046                 nPopped += temp.length;
4047                 return ret;
4048             }
4049         }
4050 
4051         while (atomicLoad(shouldContinue))
4052         {
4053             auto overallIndex = makeTemp();
4054             if (temp.empty)
4055             {
4056                 atomicStore(shouldContinue, false);
4057                 break;
4058             }
4059 
4060             foreach (i; 0 .. temp.length)
4061             {
4062                 scope(success) overallIndex++;
4063 
4064                 static if (hasLvalueElements!R)
4065                 {
4066                     static if (withIndex)
4067                     {
4068                         if (dg(overallIndex, *temp[i])) foreachErr();
4069                     }
4070                     else
4071                     {
4072                         if (dg(*temp[i])) foreachErr();
4073                     }
4074                 }
4075                 else
4076                 {
4077                     static if (withIndex)
4078                     {
4079                         if (dg(overallIndex, temp[i])) foreachErr();
4080                     }
4081                     else
4082                     {
4083                         if (dg(temp[i])) foreachErr();
4084                     }
4085                 }
4086             }
4087         }
4088     }
4089 
4090     submitAndExecute(pool, &doIt);
4091 
4092     return 0;
4093 };
4094 
4095 
4096 private struct ParallelForeach(R)
4097 {
4098     TaskPool pool;
4099     R range;
4100     size_t workUnitSize;
4101     alias E = ElementType!R;
4102 
4103     static if (hasLvalueElements!R)
4104     {
4105         alias NoIndexDg = int delegate(ref E);
4106         alias IndexDg = int delegate(size_t, ref E);
4107     }
4108     else
4109     {
4110         alias NoIndexDg = int delegate(E);
4111         alias IndexDg = int delegate(size_t, E);
4112     }
4113 
4114     int opApply(scope NoIndexDg dg)
4115     {
4116         static if (randLen!R)
4117         {
4118             mixin(parallelApplyMixinRandomAccess);
4119         }
4120         else
4121         {
4122             mixin(parallelApplyMixinInputRange);
4123         }
4124     }
4125 
4126     int opApply(scope IndexDg dg)
4127     {
4128         static if (randLen!R)
4129         {
4130             mixin(parallelApplyMixinRandomAccess);
4131         }
4132         else
4133         {
4134             mixin(parallelApplyMixinInputRange);
4135         }
4136     }
4137 }
4138 
4139 /*
4140 This struct buffers the output of a callable that outputs data into a
4141 user-supplied buffer into a set of buffers of some fixed size.  It allows these
4142 buffers to be accessed with an input range interface.  This is used internally
4143 in the buffer-recycling overload of TaskPool.asyncBuf, which creates an
4144 instance and forwards it to the input range overload of asyncBuf.
4145 */
4146 private struct RoundRobinBuffer(C1, C2)
4147 {
4148     // No need for constraints because they're already checked for in asyncBuf.
4149 
4150     alias Array = Parameters!(C1.init)[0];
4151     alias T = typeof(Array.init[0]);
4152 
4153     T[][] bufs;
4154     size_t index;
4155     C1 nextDel;
4156     C2 emptyDel;
4157     bool _empty;
4158     bool primed;
4159 
4160     this(
4161         C1 nextDel,
4162         C2 emptyDel,
4163         size_t initialBufSize,
4164         size_t nBuffers
4165     ) {
4166         this.nextDel = nextDel;
4167         this.emptyDel = emptyDel;
4168         bufs.length = nBuffers;
4169 
4170         foreach (ref buf; bufs)
4171         {
4172             buf.length = initialBufSize;
4173         }
4174     }
4175 
4176     void prime()
4177     in
4178     {
4179         assert(!empty);
4180     }
4181     do
4182     {
4183         scope(success) primed = true;
4184         nextDel(bufs[index]);
4185     }
4186 
4187 
4188     T[] front() @property
4189     in
4190     {
4191         assert(!empty);
4192     }
4193     do
4194     {
4195         if (!primed) prime();
4196         return bufs[index];
4197     }
4198 
4199     void popFront()
4200     {
4201         if (empty || emptyDel())
4202         {
4203             _empty = true;
4204             return;
4205         }
4206 
4207         index = (index + 1) % bufs.length;
4208         primed = false;
4209     }
4210 
4211     bool empty() @property const @safe pure nothrow
4212     {
4213         return _empty;
4214     }
4215 }
4216 
4217 version (StdUnittest)
4218 {
4219     // This was the only way I could get nested maps to work.
4220     private __gshared TaskPool poolInstance;
4221 }
4222 
4223 // These test basic functionality but don't stress test for threading bugs.
4224 // These are the tests that should be run every time Phobos is compiled.
4225 @system unittest
4226 {
4227     import std.algorithm.comparison : equal, min, max;
4228     import std.algorithm.iteration : filter, map, reduce;
4229     import std.array : split;
4230     import std.conv : text;
4231     import std.exception : assertThrown;
4232     import std.math.operations : isClose;
4233     import std.math.algebraic : sqrt, abs;
4234     import std.math.exponential : log;
4235     import std.range : indexed, iota, join;
4236     import std.typecons : Tuple, tuple;
4237     import std.stdio;
4238 
4239     poolInstance = new TaskPool(2);
4240     scope(exit) poolInstance.stop();
4241 
4242     // The only way this can be verified is manually.
4243     debug(std_parallelism) stderr.writeln("totalCPUs = ", totalCPUs);
4244 
4245     auto oldPriority = poolInstance.priority;
4246     poolInstance.priority = Thread.PRIORITY_MAX;
4247     assert(poolInstance.priority == Thread.PRIORITY_MAX);
4248 
4249     poolInstance.priority = Thread.PRIORITY_MIN;
4250     assert(poolInstance.priority == Thread.PRIORITY_MIN);
4251 
4252     poolInstance.priority = oldPriority;
4253     assert(poolInstance.priority == oldPriority);
4254 
4255     static void refFun(ref uint num)
4256     {
4257         num++;
4258     }
4259 
4260     uint x;
4261 
4262     // Test task().
4263     auto t = task!refFun(x);
4264     poolInstance.put(t);
4265     t.yieldForce;
4266     assert(t.args[0] == 1);
4267 
4268     auto t2 = task(&refFun, x);
4269     poolInstance.put(t2);
4270     t2.yieldForce;
4271     assert(t2.args[0] == 1);
4272 
4273     // Test scopedTask().
4274     auto st = scopedTask!refFun(x);
4275     poolInstance.put(st);
4276     st.yieldForce;
4277     assert(st.args[0] == 1);
4278 
4279     auto st2 = scopedTask(&refFun, x);
4280     poolInstance.put(st2);
4281     st2.yieldForce;
4282     assert(st2.args[0] == 1);
4283 
4284     // Test executeInNewThread().
4285     auto ct = scopedTask!refFun(x);
4286     ct.executeInNewThread(Thread.PRIORITY_MAX);
4287     ct.yieldForce;
4288     assert(ct.args[0] == 1);
4289 
4290     // Test ref return.
4291     uint toInc = 0;
4292     static ref T makeRef(T)(ref T num)
4293     {
4294         return num;
4295     }
4296 
4297     auto t3 = task!makeRef(toInc);
4298     taskPool.put(t3);
4299     assert(t3.args[0] == 0);
4300     t3.spinForce++;
4301     assert(t3.args[0] == 1);
4302 
4303     static void testSafe() @safe {
4304         static int bump(int num)
4305         {
4306             return num + 1;
4307         }
4308 
4309         auto safePool = new TaskPool(0);
4310         auto t = task(&bump, 1);
4311         taskPool.put(t);
4312         assert(t.yieldForce == 2);
4313 
4314         auto st = scopedTask(&bump, 1);
4315         taskPool.put(st);
4316         assert(st.yieldForce == 2);
4317         safePool.stop();
4318     }
4319 
4320     auto arr = [1,2,3,4,5];
4321     auto nums = new uint[5];
4322     auto nums2 = new uint[5];
4323 
4324     foreach (i, ref elem; poolInstance.parallel(arr))
4325     {
4326         elem++;
4327         nums[i] = cast(uint) i + 2;
4328         nums2[i] = elem;
4329     }
4330 
4331     assert(nums == [2,3,4,5,6], text(nums));
4332     assert(nums2 == nums, text(nums2));
4333     assert(arr == nums, text(arr));
4334 
4335     // Test const/immutable arguments.
4336     static int add(int lhs, int rhs)
4337     {
4338         return lhs + rhs;
4339     }
4340     immutable addLhs = 1;
4341     immutable addRhs = 2;
4342     auto addTask = task(&add, addLhs, addRhs);
4343     auto addScopedTask = scopedTask(&add, addLhs, addRhs);
4344     poolInstance.put(addTask);
4345     poolInstance.put(addScopedTask);
4346     assert(addTask.yieldForce == 3);
4347     assert(addScopedTask.yieldForce == 3);
4348 
4349     // Test parallel foreach with non-random access range.
4350     auto range = filter!"a != 666"([0, 1, 2, 3, 4]);
4351 
4352     foreach (i, elem; poolInstance.parallel(range))
4353     {
4354         nums[i] = cast(uint) i;
4355     }
4356 
4357     assert(nums == [0,1,2,3,4]);
4358 
4359     auto logs = new double[1_000_000];
4360     foreach (i, ref elem; poolInstance.parallel(logs))
4361     {
4362         elem = log(i + 1.0);
4363     }
4364 
4365     foreach (i, elem; logs)
4366     {
4367         assert(isClose(elem, log(double(i + 1))));
4368     }
4369 
4370     assert(poolInstance.amap!"a * a"([1,2,3,4,5]) == [1,4,9,16,25]);
4371     assert(poolInstance.amap!"a * a"([1,2,3,4,5], new long[5]) == [1,4,9,16,25]);
4372     assert(poolInstance.amap!("a * a", "-a")([1,2,3]) ==
4373            [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
4374 
4375     auto tupleBuf = new Tuple!(int, int)[3];
4376     poolInstance.amap!("a * a", "-a")([1,2,3], tupleBuf);
4377     assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
4378     poolInstance.amap!("a * a", "-a")([1,2,3], 5, tupleBuf);
4379     assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
4380 
4381     // Test amap with a non-array buffer.
4382     auto toIndex = new int[5];
4383     auto ind = indexed(toIndex, [3, 1, 4, 0, 2]);
4384     poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], ind);
4385     assert(equal(ind, [2, 4, 6, 8, 10]));
4386     assert(equal(toIndex, [8, 4, 10, 2, 6]));
4387     poolInstance.amap!"a / 2"(ind, ind);
4388     assert(equal(ind, [1, 2, 3, 4, 5]));
4389     assert(equal(toIndex, [4, 2, 5, 1, 3]));
4390 
4391     auto buf = new int[5];
4392     poolInstance.amap!"a * a"([1,2,3,4,5], buf);
4393     assert(buf == [1,4,9,16,25]);
4394     poolInstance.amap!"a * a"([1,2,3,4,5], 4, buf);
4395     assert(buf == [1,4,9,16,25]);
4396 
4397     assert(poolInstance.reduce!"a + b"([1]) == 1);
4398     assert(poolInstance.reduce!"a + b"([1,2,3,4]) == 10);
4399     assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4]) == 10);
4400     assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4], 1) == 10);
4401     assert(poolInstance.reduce!(min, max)([1,2,3,4]) == tuple(1, 4));
4402     assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) ==
4403            tuple(10, 24));
4404 
4405     immutable serialAns = reduce!"a + b"(iota(1000));
4406     assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns);
4407     assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns);
4408 
4409     // Test worker-local storage.
4410     auto wl = poolInstance.workerLocalStorage(0);
4411     foreach (i; poolInstance.parallel(iota(1000), 1))
4412     {
4413         wl.get = wl.get + i;
4414     }
4415 
4416     auto wlRange = wl.toRange;
4417     auto parallelSum = poolInstance.reduce!"a + b"(wlRange);
4418     assert(parallelSum == 499500);
4419     assert(wlRange[0 .. 1][0] == wlRange[0]);
4420     assert(wlRange[1 .. 2][0] == wlRange[1]);
4421 
4422     // Test finish()
4423     {
4424         static void slowFun() { Thread.sleep(dur!"msecs"(1)); }
4425 
4426         auto pool1 = new TaskPool();
4427         auto tSlow = task!slowFun();
4428         pool1.put(tSlow);
4429         pool1.finish();
4430         tSlow.yieldForce;
4431         // Can't assert that pool1.status == PoolState.stopNow because status
4432         // doesn't change until after the "done" flag is set and the waiting
4433         // thread is woken up.
4434 
4435         auto pool2 = new TaskPool();
4436         auto tSlow2 = task!slowFun();
4437         pool2.put(tSlow2);
4438         pool2.finish(true); // blocking
4439         assert(tSlow2.done);
4440 
4441         // Test fix for https://issues.dlang.org/show_bug.cgi?id=8582 by making pool size zero.
4442         auto pool3 = new TaskPool(0);
4443         auto tSlow3 = task!slowFun();
4444         pool3.put(tSlow3);
4445         pool3.finish(true); // blocking
4446         assert(tSlow3.done);
4447 
4448         // This is correct because no thread will terminate unless pool2.status
4449         // and pool3.status have already been set to stopNow.
4450         assert(pool2.status == TaskPool.PoolState.stopNow);
4451         assert(pool3.status == TaskPool.PoolState.stopNow);
4452     }
4453 
4454     // Test default pool stuff.
4455     assert(taskPool.size == totalCPUs - 1);
4456 
4457     nums = new uint[1000];
4458     foreach (i; parallel(iota(1000)))
4459     {
4460         nums[i] = cast(uint) i;
4461     }
4462     assert(equal(nums, iota(1000)));
4463 
4464     assert(equal(
4465                poolInstance.map!"a * a"(iota(3_000_001), 10_000),
4466                map!"a * a"(iota(3_000_001))
4467            ));
4468 
4469     // The filter is to kill random access and test the non-random access
4470     // branch.
4471     assert(equal(
4472                poolInstance.map!"a * a"(
4473                    filter!"a == a"(iota(3_000_001)
4474                                   ), 10_000, 1000),
4475                map!"a * a"(iota(3_000_001))
4476            ));
4477 
4478     assert(
4479         reduce!"a + b"(0UL,
4480                        poolInstance.map!"a * a"(iota(300_001), 10_000)
4481                       ) ==
4482         reduce!"a + b"(0UL,
4483                        map!"a * a"(iota(300_001))
4484                       )
4485     );
4486 
4487     assert(equal(
4488                iota(1_000_002),
4489                poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002)))
4490            ));
4491 
4492     {
4493         import std.conv : to;
4494         import std.file : deleteme;
4495 
4496         string temp_file = deleteme ~ "-tempDelMe.txt";
4497         auto file = File(temp_file, "wb");
4498         scope(exit)
4499         {
4500             file.close();
4501             import std.file;
4502             remove(temp_file);
4503         }
4504 
4505         auto written = [[1.0, 2, 3], [4.0, 5, 6], [7.0, 8, 9]];
4506         foreach (row; written)
4507         {
4508             file.writeln(join(to!(string[])(row), "\t"));
4509         }
4510 
4511         file = File(temp_file);
4512 
4513         void next(ref char[] buf)
4514         {
4515             file.readln(buf);
4516             import std.string : chomp;
4517             buf = chomp(buf);
4518         }
4519 
4520         double[][] read;
4521         auto asyncReader = taskPool.asyncBuf(&next, &file.eof);
4522 
4523         foreach (line; asyncReader)
4524         {
4525             if (line.length == 0) continue;
4526             auto ls = line.split("\t");
4527             read ~= to!(double[])(ls);
4528         }
4529 
4530         assert(read == written);
4531         file.close();
4532     }
4533 
4534     // Test Map/AsyncBuf chaining.
4535 
4536     auto abuf = poolInstance.asyncBuf(iota(-1.0, 3_000_000), 100);
4537     auto temp = poolInstance.map!sqrt(
4538                     abuf, 100, 5
4539                 );
4540     auto lmchain = poolInstance.map!"a * a"(temp, 100, 5);
4541     lmchain.popFront();
4542 
4543     int ii;
4544     foreach ( elem; (lmchain))
4545     {
4546         if (!isClose(elem, ii))
4547         {
4548             stderr.writeln(ii, '\t', elem);
4549         }
4550         ii++;
4551     }
4552 
4553     // Test buffer trick in parallel foreach.
4554     abuf = poolInstance.asyncBuf(iota(-1.0, 1_000_000), 100);
4555     abuf.popFront();
4556     auto bufTrickTest = new size_t[abuf.length];
4557     foreach (i, elem; parallel(abuf))
4558     {
4559         bufTrickTest[i] = i;
4560     }
4561 
4562     assert(equal(iota(1_000_000), bufTrickTest));
4563 
4564     auto myTask = task!(abs)(-1);
4565     taskPool.put(myTask);
4566     assert(myTask.spinForce == 1);
4567 
4568     // Test that worker local storage from one pool receives an index of 0
4569     // when the index is queried w.r.t. another pool.  The only way to do this
4570     // is non-deterministically.
4571     foreach (i; parallel(iota(1000), 1))
4572     {
4573         assert(poolInstance.workerIndex == 0);
4574     }
4575 
4576     foreach (i; poolInstance.parallel(iota(1000), 1))
4577     {
4578         assert(taskPool.workerIndex == 0);
4579     }
4580 
4581     // Test exception handling.
4582     static void parallelForeachThrow()
4583     {
4584         foreach (elem; parallel(iota(10)))
4585         {
4586             throw new Exception("");
4587         }
4588     }
4589 
4590     assertThrown!Exception(parallelForeachThrow());
4591 
4592     static int reduceException(int a, int b)
4593     {
4594         throw new Exception("");
4595     }
4596 
4597     assertThrown!Exception(
4598         poolInstance.reduce!reduceException(iota(3))
4599     );
4600 
4601     static int mapException(int a)
4602     {
4603         throw new Exception("");
4604     }
4605 
4606     assertThrown!Exception(
4607         poolInstance.amap!mapException(iota(3))
4608     );
4609 
4610     static void mapThrow()
4611     {
4612         auto m = poolInstance.map!mapException(iota(3));
4613         m.popFront();
4614     }
4615 
4616     assertThrown!Exception(mapThrow());
4617 
4618     struct ThrowingRange
4619     {
4620         @property int front()
4621         {
4622             return 1;
4623         }
4624         void popFront()
4625         {
4626             throw new Exception("");
4627         }
4628         enum bool empty = false;
4629     }
4630 
4631     assertThrown!Exception(poolInstance.asyncBuf(ThrowingRange.init));
4632 }
4633 
4634 //version = parallelismStressTest;
4635 
4636 // These are more like stress tests than real unit tests.  They print out
4637 // tons of stuff and should not be run every time make unittest is run.
4638 version (parallelismStressTest)
4639 {
4640     @system unittest
4641     {
4642         import std.stdio : stderr, writeln, readln;
4643         import std.range : iota;
4644         import std.algorithm.iteration : filter, reduce;
4645 
4646         size_t attempt;
4647         for (; attempt < 10; attempt++)
4648             foreach (poolSize; [0, 4])
4649         {
4650 
4651             poolInstance = new TaskPool(poolSize);
4652 
4653             uint[] numbers = new uint[1_000];
4654 
4655             foreach (i; poolInstance.parallel( iota(0, numbers.length)) )
4656             {
4657                 numbers[i] = cast(uint) i;
4658             }
4659 
4660             // Make sure it works.
4661             foreach (i; 0 .. numbers.length)
4662             {
4663                 assert(numbers[i] == i);
4664             }
4665 
4666             stderr.writeln("Done creating nums.");
4667 
4668 
4669             auto myNumbers = filter!"a % 7 > 0"( iota(0, 1000));
4670             foreach (num; poolInstance.parallel(myNumbers))
4671             {
4672                 assert(num % 7 > 0 && num < 1000);
4673             }
4674             stderr.writeln("Done modulus test.");
4675 
4676             uint[] squares = poolInstance.amap!"a * a"(numbers, 100);
4677             assert(squares.length == numbers.length);
4678             foreach (i, number; numbers)
4679             {
4680                 assert(squares[i] == number * number);
4681             }
4682             stderr.writeln("Done squares.");
4683 
4684             auto sumFuture = task!( reduce!"a + b" )(numbers);
4685             poolInstance.put(sumFuture);
4686 
4687             ulong sumSquares = 0;
4688             foreach (elem; numbers)
4689             {
4690                 sumSquares += elem * elem;
4691             }
4692 
4693             uint mySum = sumFuture.spinForce();
4694             assert(mySum == 999 * 1000 / 2);
4695 
4696             auto mySumParallel = poolInstance.reduce!"a + b"(numbers);
4697             assert(mySum == mySumParallel);
4698             stderr.writeln("Done sums.");
4699 
4700             auto myTask = task(
4701             {
4702                 synchronized writeln("Our lives are parallel...Our lives are parallel.");
4703             });
4704             poolInstance.put(myTask);
4705 
4706             auto nestedOuter = "abcd";
4707             auto nestedInner =  iota(0, 10, 2);
4708 
4709             foreach (i, letter; poolInstance.parallel(nestedOuter, 1))
4710             {
4711                 foreach (j, number; poolInstance.parallel(nestedInner, 1))
4712                 {
4713                     synchronized writeln(i, ": ", letter, "  ", j, ": ", number);
4714                 }
4715             }
4716 
4717             poolInstance.stop();
4718         }
4719 
4720         assert(attempt == 10);
4721         writeln("Press enter to go to next round of unittests.");
4722         readln();
4723     }
4724 
4725     // These unittests are intended more for actual testing and not so much
4726     // as examples.
4727     @system unittest
4728     {
4729         import std.stdio : stderr;
4730         import std.range : iota;
4731         import std.algorithm.iteration : filter, reduce;
4732         import std.math.algebraic : sqrt;
4733         import std.math.operations : isClose;
4734         import std.math.traits : isNaN;
4735         import std.conv : text;
4736 
4737         foreach (attempt; 0 .. 10)
4738         foreach (poolSize; [0, 4])
4739         {
4740             poolInstance = new TaskPool(poolSize);
4741 
4742             // Test indexing.
4743             stderr.writeln("Creator Raw Index:  ", poolInstance.threadIndex);
4744             assert(poolInstance.workerIndex() == 0);
4745 
4746             // Test worker-local storage.
4747             auto workerLocalStorage = poolInstance.workerLocalStorage!uint(1);
4748             foreach (i; poolInstance.parallel(iota(0U, 1_000_000)))
4749             {
4750                 workerLocalStorage.get++;
4751             }
4752             assert(reduce!"a + b"(workerLocalStorage.toRange) ==
4753             1_000_000 + poolInstance.size + 1);
4754 
4755             // Make sure work is reasonably balanced among threads.  This test is
4756             // non-deterministic and is more of a sanity check than something that
4757             // has an absolute pass/fail.
4758             shared(uint)[void*] nJobsByThread;
4759             foreach (thread; poolInstance.pool)
4760             {
4761                 nJobsByThread[cast(void*) thread] = 0;
4762             }
4763             nJobsByThread[ cast(void*) Thread.getThis()] = 0;
4764 
4765             foreach (i; poolInstance.parallel( iota(0, 1_000_000), 100 ))
4766             {
4767                 atomicOp!"+="( nJobsByThread[ cast(void*) Thread.getThis() ], 1);
4768             }
4769 
4770             stderr.writeln("\nCurrent thread is:  ",
4771             cast(void*) Thread.getThis());
4772             stderr.writeln("Workload distribution:  ");
4773             foreach (k, v; nJobsByThread)
4774             {
4775                 stderr.writeln(k, '\t', v);
4776             }
4777 
4778             // Test whether amap can be nested.
4779             real[][] matrix = new real[][](1000, 1000);
4780             foreach (i; poolInstance.parallel( iota(0, matrix.length) ))
4781             {
4782                 foreach (j; poolInstance.parallel( iota(0, matrix[0].length) ))
4783                 {
4784                     matrix[i][j] = i * j;
4785                 }
4786             }
4787 
4788             // Get around weird bugs having to do w/ sqrt being an intrinsic:
4789             static real mySqrt(real num)
4790             {
4791                 return sqrt(num);
4792             }
4793 
4794             static real[] parallelSqrt(real[] nums)
4795             {
4796                 return poolInstance.amap!mySqrt(nums);
4797             }
4798 
4799             real[][] sqrtMatrix = poolInstance.amap!parallelSqrt(matrix);
4800 
4801             foreach (i, row; sqrtMatrix)
4802             {
4803                 foreach (j, elem; row)
4804                 {
4805                     real shouldBe = sqrt( cast(real) i * j);
4806                     assert(isClose(shouldBe, elem));
4807                     sqrtMatrix[i][j] = shouldBe;
4808                 }
4809             }
4810 
4811             auto saySuccess = task(
4812             {
4813                 stderr.writeln(
4814                     "Success doing matrix stuff that involves nested pool use.");
4815             });
4816             poolInstance.put(saySuccess);
4817             saySuccess.workForce();
4818 
4819             // A more thorough test of amap, reduce:  Find the sum of the square roots of
4820             // matrix.
4821 
4822             static real parallelSum(real[] input)
4823             {
4824                 return poolInstance.reduce!"a + b"(input);
4825             }
4826 
4827             auto sumSqrt = poolInstance.reduce!"a + b"(
4828                                poolInstance.amap!parallelSum(
4829                                    sqrtMatrix
4830                                )
4831                            );
4832 
4833             assert(isClose(sumSqrt, 4.437e8, 1e-2));
4834             stderr.writeln("Done sum of square roots.");
4835 
4836             // Test whether tasks work with function pointers.
4837             /+ // This part is buggy and needs to be fixed...
4838             auto nanTask = task(&isNaN, 1.0L);
4839             poolInstance.put(nanTask);
4840             assert(nanTask.spinForce == false);
4841 
4842             if (poolInstance.size > 0)
4843             {
4844                 // Test work waiting.
4845                 static void uselessFun()
4846                 {
4847                     foreach (i; 0 .. 1_000_000) {}
4848                 }
4849 
4850                 auto uselessTasks = new typeof(task(&uselessFun))[1000];
4851                 foreach (ref uselessTask; uselessTasks)
4852                 {
4853                     uselessTask = task(&uselessFun);
4854                 }
4855                 foreach (ref uselessTask; uselessTasks)
4856                 {
4857                     poolInstance.put(uselessTask);
4858                 }
4859                 foreach (ref uselessTask; uselessTasks)
4860                 {
4861                     uselessTask.workForce();
4862                 }
4863             }
4864              +/
4865 
4866             // Test the case of non-random access + ref returns.
4867             int[] nums = [1,2,3,4,5];
4868             static struct RemoveRandom
4869             {
4870                 int[] arr;
4871 
4872                 ref int front()
4873                 {
4874                     return arr.front;
4875                 }
4876                 void popFront()
4877                 {
4878                     arr.popFront();
4879                 }
4880                 bool empty()
4881                 {
4882                     return arr.empty;
4883                 }
4884             }
4885 
4886             auto refRange = RemoveRandom(nums);
4887             foreach (ref elem; poolInstance.parallel(refRange))
4888             {
4889                 elem++;
4890             }
4891             assert(nums == [2,3,4,5,6], text(nums));
4892             stderr.writeln("Nums:  ", nums);
4893 
4894             poolInstance.stop();
4895         }
4896     }
4897 }
4898 
4899 @system unittest
4900 {
4901     static struct __S_12733
4902     {
4903         invariant() { assert(checksum == 1_234_567_890); }
4904         this(ulong u){n = u;}
4905         void opAssign(__S_12733 s){this.n = s.n;}
4906         ulong n;
4907         ulong checksum = 1_234_567_890;
4908     }
4909 
4910     static auto __genPair_12733(ulong n) { return __S_12733(n); }
4911     immutable ulong[] data = [ 2UL^^59-1, 2UL^^59-1, 2UL^^59-1, 112_272_537_195_293UL ];
4912 
4913     auto result = taskPool.amap!__genPair_12733(data);
4914 }
4915 
4916 @safe unittest
4917 {
4918     import std.range : iota;
4919 
4920     // this test was in std.range, but caused cycles.
4921     assert(__traits(compiles, { foreach (i; iota(0, 100UL).parallel) {} }));
4922 }
4923 
4924 @safe unittest
4925 {
4926     import std.algorithm.iteration : each;
4927 
4928     long[] arr;
4929     static assert(is(typeof({
4930         arr.parallel.each!"a++";
4931     })));
4932 }
4933 
4934 // https://issues.dlang.org/show_bug.cgi?id=17539
4935 @system unittest
4936 {
4937     import std.random : rndGen;
4938     // ensure compilation
4939     try foreach (rnd; rndGen.parallel) break;
4940     catch (ParallelForeachError e) {}
4941 }