1 /**
2 `std.parallelism` implements high-level primitives for SMP parallelism.
3 These include parallel foreach, parallel reduce, parallel eager map, pipelining
4 and future/promise parallelism.  `std.parallelism` is recommended when the
5 same operation is to be executed in parallel on different data, or when a
6 function is to be executed in a background thread and its result returned to a
7 well-defined main thread.  For communication between arbitrary threads, see
8 `std.concurrency`.
9 
10 `std.parallelism` is based on the concept of a `Task`.  A `Task` is an
11 object that represents the fundamental unit of work in this library and may be
12 executed in parallel with any other `Task`.  Using `Task`
13 directly allows programming with a future/promise paradigm.  All other
14 supported parallelism paradigms (parallel foreach, map, reduce, pipelining)
15 represent an additional level of abstraction over `Task`.  They
16 automatically create one or more `Task` objects, or closely related types
17 that are conceptually identical but not part of the public API.
18 
19 After creation, a `Task` may be executed in a new thread, or submitted
20 to a `TaskPool` for execution.  A `TaskPool` encapsulates a task queue
21 and its worker threads.  Its purpose is to efficiently map a large
22 number of `Task`s onto a smaller number of threads.  A task queue is a
23 FIFO queue of `Task` objects that have been submitted to the
24 `TaskPool` and are awaiting execution.  A worker thread is a thread that
25 is associated with exactly one task queue.  It executes the `Task` at the
26 front of its queue when the queue has work available, or sleeps when
27 no work is available.  Each task queue is associated with zero or
28 more worker threads.  If the result of a `Task` is needed before execution
29 by a worker thread has begun, the `Task` can be removed from the task queue
30 and executed immediately in the thread where the result is needed.
31 
32 Warning:  Unless marked as `@trusted` or `@safe`, artifacts in
33           this module allow implicit data sharing between threads and cannot
34           guarantee that client code is free from low level data races.
35 
36 Source:    $(PHOBOSSRC std/parallelism.d)
37 Author:  David Simcha
38 Copyright:  Copyright (c) 2009-2011, David Simcha.
39 License:    $(HTTP boost.org/LICENSE_1_0.txt, Boost License 1.0)
40 */
41 module std.parallelism;
42 
43 version (OSX)
44     version = Darwin;
45 else version (iOS)
46     version = Darwin;
47 else version (TVOS)
48     version = Darwin;
49 else version (WatchOS)
50     version = Darwin;
51 
///
@system unittest
{
    import std.algorithm.iteration : map;
    import std.math.operations : isClose;
    import std.parallelism : taskPool;
    import std.range : iota;

    // Parallel reduce can be combined with
    // std.algorithm.iteration.map to interesting effect.
    // The following example (thanks to Russel Winder)
    // calculates pi by quadrature  using
    // std.algorithm.map and TaskPool.reduce.
    // getTerm is evaluated in parallel as needed by
    // TaskPool.reduce.
    //
    // Timings on an Intel i5-3450 quad core machine
    // for n = 1_000_000_000:
    //
    // TaskPool.reduce:       1.067 s
    // std.algorithm.reduce:  4.011 s

    // Midpoint-rule quadrature of 4 / (1 + x^2) on [0, 1], which
    // integrates to pi; n rectangles of width delta.
    enum n = 1_000_000;
    enum delta = 1.0 / n;

    alias getTerm = (int i)
    {
        immutable x = ( i - 0.5 ) * delta;
        return delta / ( 1.0 + x * x ) ;
    };

    immutable pi = 4.0 * taskPool.reduce!"a + b"(n.iota.map!getTerm);

    assert(pi.isClose(3.14159, 1e-5));
}
87 
88 import core.atomic;
89 import core.memory;
90 import core.sync.condition;
91 import core.thread;
92 
93 import std.functional;
94 import std.meta;
95 import std.range.primitives;
96 import std.traits;
97 
98 /*
99 (For now public undocumented with reserved name.)
100 
101 A lazily initialized global constant. The underlying value is a shared global
102 statically initialized to `outOfBandValue` which must not be a legit value of
103 the constant. Upon the first call the situation is detected and the global is
104 initialized by calling `initializer`. The initializer is assumed to be pure
105 (even if not marked as such), i.e. return the same value upon repeated calls.
106 For that reason, no special precautions are taken so `initializer` may be called
107 more than one time leading to benign races on the cached value.
108 
109 In the quiescent state the cost of the function is an atomic load from a global.
110 
111 Params:
112     T = The type of the pseudo-constant (may be qualified)
113     outOfBandValue = A value that cannot be valid, it is used for initialization
114     initializer = The function performing initialization; must be `nothrow`
115 
116 Returns:
117     The lazily initialized value
118 */
@property pure
T __lazilyInitializedConstant(T, alias outOfBandValue, alias initializer)()
if (is(Unqual!T : T)
    && is(typeof(initializer()) : T)
    && is(typeof(outOfBandValue) : T))
{
    static T impl() nothrow
    {
        // Thread-local cache
        static Unqual!T tls = outOfBandValue;
        auto local = tls;
        // Shortest path, no atomic operations
        if (local != outOfBandValue) return local;
        // Process-level cache
        static shared Unqual!T result = outOfBandValue;
        // Initialize both process-level cache and tls
        local = atomicLoad(result);
        if (local == outOfBandValue)
        {
            // May race with another thread doing the same; benign because
            // initializer() is assumed to always produce the same value.
            local = initializer();
            atomicStore(result, local);
        }
        tls = local;
        return local;
    }

    // impl() is de-facto pure (see the comment above); forge the pure_
    // attribute onto its type so this @property can itself be pure.
    import std.traits : SetFunctionAttributes;
    alias Fun = SetFunctionAttributes!(typeof(&impl), "D",
        functionAttributes!(typeof(&impl)) | FunctionAttribute.pure_);
    auto purified = (() @trusted => cast(Fun) &impl)();
    return purified();
}
151 
// Returns the size of a cache line.
// Lazily computed on first access via cacheLineSizeImpl; yields 0 when
// core.cpuid reports no usable cache-level information.
alias cacheLineSize =
    __lazilyInitializedConstant!(immutable(size_t), size_t.max, cacheLineSizeImpl);
155 
// Queries core.cpuid and reports the largest data-cache line size found,
// or 0 if no cache level supplied a usable value.
private size_t cacheLineSizeImpl() @nogc nothrow @trusted
{
    import core.cpuid : datacache;

    size_t best = 0;
    foreach (ref const level; datacache)
    {
        immutable line = level.lineSize;
        // uint.max marks an invalid/unknown level; skip those sentinels.
        if (line < uint.max && line > best)
            best = line;
    }
    return best;
}
169 
@nogc @safe nothrow unittest
{
    // The lazily cached constant must agree with a direct recomputation.
    assert(cacheLineSize == cacheLineSizeImpl);
}
174 
175 /* Atomics code.  These forward to core.atomic, but are written like this
176    for two reasons:
177 
   1.  They used to actually contain ASM code and I don't want to have to change
179        to directly calling core.atomic in a zillion different places.
180 
181    2.  core.atomic has some misc. issues that make my use cases difficult
182        without wrapping it.  If I didn't wrap it, casts would be required
183        basically everywhere.
184 */
private void atomicSetUbyte(T)(ref T stuff, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    // Publish newVal atomically; the cast is needed because callers keep
    // these status bytes in unshared structs.
    auto target = cast(shared(T)*) &stuff;
    atomicStore(*target, newVal);
}
191 
private ubyte atomicReadUbyte(T)(ref T val)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    // Atomic load of a status byte stored in unshared memory.
    auto target = cast(shared(T)*) &val;
    return atomicLoad(*target);
}
197 
198 // This gets rid of the need for a lot of annoying casts in other parts of the
199 // code, when enums are involved.
private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    // Compare-and-swap on the raw byte; true iff the exchange happened.
    auto target = cast(shared(T)*) &stuff;
    return core.atomic.cas(target, testVal, newVal);
}
205 
206 /*--------------------- Generic helper functions, etc.------------------------*/
// Element type produced by applying `functions` (adjoined into one callable)
// to an element of range R.
private template MapType(R, functions...)
{
    static assert(functions.length);

    // Dummy element used only inside typeof below; never read at run time.
    ElementType!R e = void;
    alias MapType =
        typeof(adjoin!(staticMap!(unaryFun, functions))(e));
}
215 
// Type produced by folding one element of R into a seed of type E with fun.
private template ReduceType(alias fun, R, E)
{
    alias ReduceType = typeof(binaryFun!fun(E.init, ElementType!R.init));
}
220 
// allSatisfy-friendly predicate form of !hasUnsharedAliasing.
private template noUnsharedAliasing(T)
{
    enum bool noUnsharedAliasing = !hasUnsharedAliasing!T;
}
225 
226 // This template tests whether a function may be executed in parallel from
227 // @safe code via Task.executeInNewThread().  There is an additional
228 // requirement for executing it via a TaskPool.  (See isSafeReturn).
private template isSafeTask(F)
{
    enum bool isSafeTask =
        // The callable must be @safe or @trusted...
        (functionAttributes!F & (FunctionAttribute.safe | FunctionAttribute.trusted)) != 0 &&
        // ...must not return by ref...
        (functionAttributes!F & FunctionAttribute.ref_) == 0 &&
        // ...must not itself carry unshared state (a bare function pointer
        // cannot, so it is exempt from the check)...
        (isFunctionPointer!F || !hasUnsharedAliasing!F) &&
        // ...and none of its parameters may carry unshared references.
        allSatisfy!(noUnsharedAliasing, Parameters!F);
}
237 
@safe unittest
{
    alias F1 = void function() @safe;
    alias F2 = void function();          // @system: rejected
    alias F3 = void function(uint, string) @trusted;
    alias F4 = void function(uint, char[]); // @system + unshared param: rejected

    static assert( isSafeTask!F1);
    static assert(!isSafeTask!F2);
    static assert( isSafeTask!F3);
    static assert(!isSafeTask!F4);

    // Unshared aliasing in the *return* type alone does not disqualify a
    // task here; that is isSafeReturn's concern.
    alias F5 = uint[] function(uint, string) pure @trusted;
    static assert( isSafeTask!F5);
}
253 
254 // This function decides whether Tasks that meet all of the other requirements
255 // for being executed from @safe code can be executed on a TaskPool.
256 // When executing via TaskPool, it's theoretically possible
257 // to return a value that is also pointed to by a worker thread's thread local
258 // storage.  When executing from executeInNewThread(), the thread that executed
259 // the Task is terminated by the time the return value is visible in the calling
260 // thread, so this is a non-issue.  It's also a non-issue for pure functions
261 // since they can't read global state.
private template isSafeReturn(T)
{
    // Safe iff the returned value cannot alias unshared data, or the
    // wrapped callable is pure and therefore cannot have read any.
    enum bool isSafeReturn = !hasUnsharedAliasing!(T.ReturnType) || T.isPure;
}
277 
// True for random-access ranges whose elements can be assigned to.
private template randAssignable(R)
{
    enum randAssignable = isRandomAccessRange!R && hasAssignableElements!R;
}
282 
// Lifecycle states of an AbstractTask; stored in a single ubyte and
// manipulated via the atomic*Ubyte helpers.
private enum TaskStatus : ubyte
{
    notStarted, // Created/queued; execution has not begun.
    inProgress, // Currently being executed by some thread.
    done        // Finished; result or exception is available.
}
289 
// NOTE(review): this evaluates to the type of the lambda literal itself,
// not of the call fun(args) — presumably only used in is()/compilability
// checks; confirm before changing.
private template AliasReturn(alias fun, T...)
{
    alias AliasReturn = typeof({ T args; return fun(args); });
}
294 
295 // Should be private, but std.algorithm.reduce is used in the zero-thread case
296 // and won't work w/ private.
template reduceAdjoin(functions...)
{
    static if (functions.length == 1)
    {
        // Single reduction function: no tuple wrapping needed.
        alias reduceAdjoin = binaryFun!(functions[0]);
    }
    else
    {
        // Multiple reduction functions: lhs is a tuple holding one running
        // value per function; fold rhs into each component with its own
        // function.
        T reduceAdjoin(T, U)(T lhs, U rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs);
            }

            return lhs;
        }
    }
}
318 
// Merges two partial reduction results (as produced by reduceAdjoin),
// combining them componentwise when multiple functions are involved.
private template reduceFinish(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceFinish = binaryFun!(functions[0]);
    }
    else
    {
        T reduceFinish(T)(T lhs, T rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs.expand[i]);
            }

            return lhs;
        }
    }
}
340 
// Matches only instantiations of RoundRobinBuffer.
private template isRoundRobin(R : RoundRobinBuffer!(C1, C2), C1, C2)
{
    enum isRoundRobin = true;
}

// Fallback: any other type is not a RoundRobinBuffer.
private template isRoundRobin(T)
{
    enum isRoundRobin = false;
}
350 
@safe unittest
{
    // Only RoundRobinBuffer instantiations satisfy the trait.
    static assert( isRoundRobin!(RoundRobinBuffer!(void delegate(char[]), bool delegate())));
    static assert(!isRoundRobin!(uint));
}
356 
357 // This is the base "class" for all of the other tasks.  Using C-style
358 // polymorphism to allow more direct control over memory allocation, etc.
private struct AbstractTask
{
    // Intrusive doubly-linked-list pointers used by the task queue.
    AbstractTask* prev;
    AbstractTask* next;

    // Pointer to a function that executes this task.
    void function(void*) runTask;

    // Throwable captured while running the task; rethrown by done().
    Throwable exception;

    // One of TaskStatus; read/written via the atomic*Ubyte helpers.
    ubyte taskStatus = TaskStatus.notStarted;

    // True once the task has finished.  Rethrows any Throwable the task
    // itself threw.
    bool done() @property
    {
        if (atomicReadUbyte(taskStatus) == TaskStatus.done)
        {
            if (exception)
            {
                throw exception;
            }

            return true;
        }

        return false;
    }

    // Executes this task in the current thread via the stored trampoline.
    void job()
    {
        runTask(&this);
    }
}
390 
391 /**
392 `Task` represents the fundamental unit of work.  A `Task` may be
393 executed in parallel with any other `Task`.  Using this struct directly
394 allows future/promise parallelism.  In this paradigm, a function (or delegate
395 or other callable) is executed in a thread other than the one it was called
396 from.  The calling thread does not block while the function is being executed.
397 A call to `workForce`, `yieldForce`, or `spinForce` is used to
398 ensure that the `Task` has finished executing and to obtain the return
399 value, if any.  These functions and `done` also act as full memory barriers,
400 meaning that any memory writes made in the thread that executed the `Task`
401 are guaranteed to be visible in the calling thread after one of these functions
402 returns.
403 
404 The $(REF task, std,parallelism) and $(REF scopedTask, std,parallelism) functions can
405 be used to create an instance of this struct.  See `task` for usage examples.
406 
407 Function results are returned from `yieldForce`, `spinForce` and
408 `workForce` by ref.  If `fun` returns by ref, the reference will point
409 to the returned reference of `fun`.  Otherwise it will point to a
410 field in this struct.
411 
412 Copying of this struct is disabled, since it would provide no useful semantics.
413 If you want to pass this struct around, you should do so by reference or
414 pointer.
415 
416 Bugs:  Changes to `ref` and `out` arguments are not propagated to the
417        call site, only to `args` in this struct.
418 */
struct Task(alias fun, Args...)
{
    // C-style "base class" (see AbstractTask); runTask is wired to the
    // type-erased trampoline below.
    private AbstractTask base = {runTask : &impl};
    private alias base this;

    private @property AbstractTask* basePtr()
    {
        return &base;
    }

    // Type-erased entry point stored in AbstractTask.runTask: recovers the
    // concrete Task type and dispatches to fun, storing the result.
    private static void impl(void* myTask)
    {
        import std.algorithm.internal : addressOf;

        Task* myCastedTask = cast(typeof(this)*) myTask;
        static if (is(ReturnType == void))
        {
            fun(myCastedTask._args);
        }
        else static if (is(typeof(&(fun(myCastedTask._args)))))
        {
            // fun returns by ref: store the address of the referenced value.
            myCastedTask.returnVal = addressOf(fun(myCastedTask._args));
        }
        else
        {
            myCastedTask.returnVal = fun(myCastedTask._args);
        }
    }

    // The pool this Task was submitted to (or the single-thread pool made
    // by executeInNewThread); null until submitted.
    private TaskPool pool;
    private bool isScoped;  // True if created with scopedTask.

    Args _args;

    /**
    The arguments the function was called with.  Changes to `out` and
    `ref` arguments will be visible here.
    */
    static if (__traits(isSame, fun, run))
    {
        // fun is the `run` adapter: _args[0] is the callable itself, so the
        // user-visible arguments start at index 1.
        alias args = _args[1..$];
    }
    else
    {
        alias args = _args;
    }


    // The purpose of this code is to decide whether functions whose
    // return values have unshared aliasing can be executed via
    // TaskPool from @safe code.  See isSafeReturn.
    static if (__traits(isSame, fun, run))
    {
        static if (isFunctionPointer!(_args[0]))
        {
            private enum bool isPure =
            (functionAttributes!(Args[0]) & FunctionAttribute.pure_) != 0;
        }
        else
        {
            // BUG:  Should check this for delegates too, but std.traits
            //       apparently doesn't allow this.  isPure is irrelevant
            //       for delegates, at least for now since shared delegates
            //       don't work.
            private enum bool isPure = false;
        }

    }
    else
    {
        // We already know that we can't execute aliases in @safe code, so
        // just put a dummy value here.
        private enum bool isPure = false;
    }


    /**
    The return type of the function called by this `Task`.  This can be
    `void`.
    */
    alias ReturnType = typeof(fun(_args));

    static if (!is(ReturnType == void))
    {
        static if (is(typeof(&fun(_args))))
        {
            // Ref return.  Only a pointer to the callee's result is stored;
            // fixRef turns it back into a reference for the force functions.
            ReturnType* returnVal;

            ref ReturnType fixRef(ReturnType* val)
            {
                return *val;
            }

        }
        else
        {
            // Value return: the result lives in this struct.
            ReturnType returnVal;

            ref ReturnType fixRef(ref ReturnType val)
            {
                return val;
            }
        }
    }

    // Guards the force functions: they only make sense after submission.
    private void enforcePool()
    {
        import std.exception : enforce;
        enforce(this.pool !is null, "Job not submitted yet.");
    }

    static if (Args.length > 0)
    {
        private this(Args args)
        {
            _args = args;
        }
    }

    // Work around DMD bug https://issues.dlang.org/show_bug.cgi?id=6588,
    // allow immutable elements.
    static if (allSatisfy!(isAssignable, Args))
    {
        typeof(this) opAssign(typeof(this) rhs)
        {
            foreach (i, Type; typeof(this.tupleof))
            {
                this.tupleof[i] = rhs.tupleof[i];
            }
            return this;
        }
    }
    else
    {
        @disable typeof(this) opAssign(typeof(this) rhs);
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any.  If it's in progress,
    busy spin until it's done, then return the return value.  If it threw
    an exception, rethrow that exception.

    This function should be used when you expect the result of the
    `Task` to be available on a timescale shorter than that of an OS
    context switch.
     */
    @property ref ReturnType spinForce() @trusted
    {
        enforcePool();

        // Steal the task from the queue and run it here if it hasn't
        // started yet.
        this.pool.tryDeleteExecute(basePtr);

        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done) {}

        if (exception)
        {
            throw exception;
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any.  If it's in progress,
    wait on a condition variable.  If it threw an exception, rethrow that
    exception.

    This function should be used for expensive functions, as waiting on a
    condition variable introduces latency, but avoids wasted CPU cycles.
     */
    @property ref ReturnType yieldForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        // done also rethrows any exception from the task.
        if (done)
        {
            static if (is(ReturnType == void))
            {
                return;
            }
            else
            {
                return fixRef(this.returnVal);
            }
        }

        pool.waiterLock();
        scope(exit) pool.waiterUnlock();

        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done)
        {
            pool.waitUntilCompletion();
        }

        if (exception)
        {
            throw exception; // nocoverage
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If this `Task` was not started yet, execute it in the current
    thread.  If it is finished, return its result.  If it is in progress,
    execute any other `Task` from the `TaskPool` instance that
    this `Task` was submitted to until this one
    is finished.  If it threw an exception, rethrow that exception.
    If no other tasks are available or this `Task` was executed using
    `executeInNewThread`, wait on a condition variable.
     */
    @property ref ReturnType workForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        while (true)
        {
            if (done)    // done() implicitly checks for exceptions.
            {
                static if (is(ReturnType == void))
                {
                    return;
                }
                else
                {
                    return fixRef(this.returnVal);
                }
            }

            AbstractTask* job;
            {
                // Locking explicitly and calling popNoSync() because
                // pop() waits on a condition variable if there are no Tasks
                // in the queue.

                pool.queueLock();
                scope(exit) pool.queueUnlock();
                job = pool.popNoSync();
            }


            if (job !is null)
            {

                version (verboseUnittest)
                {
                    stderr.writeln("Doing workForce work.");
                }

                pool.doJob(job);

                if (done)
                {
                    static if (is(ReturnType == void))
                    {
                        return;
                    }
                    else
                    {
                        return fixRef(this.returnVal);
                    }
                }
            }
            else
            {
                version (verboseUnittest)
                {
                    stderr.writeln("Yield from workForce.");
                }

                // Queue empty: fall back to sleeping on the condition
                // variable until this Task completes.
                return yieldForce;
            }
        }
    }

    /**
    Returns `true` if the `Task` is finished executing.

    Throws:  Rethrows any exception thrown during the execution of the
             `Task`.
    */
    @property bool done() @trusted
    {
        // Explicitly forwarded for documentation purposes.
        return base.done;
    }

    /**
    Create a new thread for executing this `Task`, execute it in the
    newly created thread, then terminate the thread.  This can be used for
    future/promise parallelism.  An explicit priority may be given
    to the `Task`.  If one is provided, its value is forwarded to
    `core.thread.Thread.priority`. See $(REF task, std,parallelism) for
    usage example.
    */
    void executeInNewThread() @trusted
    {
        pool = new TaskPool(basePtr);
    }

    /// Ditto
    void executeInNewThread(int priority) @trusted
    {
        pool = new TaskPool(basePtr, priority);
    }

    @safe ~this()
    {
        // A scoped Task must not leave the stack while a worker may still
        // be touching it; block until it is finished.
        if (isScoped && pool !is null && taskStatus != TaskStatus.done)
        {
            yieldForce;
        }
    }

    // When this is uncommented, it somehow gets called on returning from
    // scopedTask even though the struct shouldn't be getting copied.
    //@disable this(this) {}
}
748 
749 // Calls `fpOrDelegate` with `args`.  This is an
750 // adapter that makes `Task` work with delegates, function pointers and
751 // functors instead of just aliases.
ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args)
{
    // args is taken by ref so that out/ref parameter updates made by the
    // callable land back in Task._args.
    return fpOrDelegate(args);
}
756 
757 /**
758 Creates a `Task` on the GC heap that calls an alias.  This may be executed
759 via `Task.executeInNewThread` or by submitting to a
760 $(REF TaskPool, std,parallelism).  A globally accessible instance of
761 `TaskPool` is provided by $(REF taskPool, std,parallelism).
762 
763 Returns:  A pointer to the `Task`.
764 
765 Example:
766 ---
767 // Read two files into memory at the same time.
768 import std.file;
769 
770 void main()
771 {
772     // Create and execute a Task for reading
773     // foo.txt.
774     auto file1Task = task!read("foo.txt");
775     file1Task.executeInNewThread();
776 
777     // Read bar.txt in parallel.
778     auto file2Data = read("bar.txt");
779 
780     // Get the results of reading foo.txt.
781     auto file1Data = file1Task.yieldForce;
782 }
783 ---
784 
785 ---
786 // Sorts an array using a parallel quick sort algorithm.
787 // The first partition is done serially.  Both recursion
788 // branches are then executed in parallel.
789 //
790 // Timings for sorting an array of 1,000,000 doubles on
791 // an Athlon 64 X2 dual core machine:
792 //
793 // This implementation:               176 milliseconds.
794 // Equivalent serial implementation:  280 milliseconds
795 void parallelSort(T)(T[] data)
796 {
797     // Sort small subarrays serially.
798     if (data.length < 100)
799     {
800          std.algorithm.sort(data);
801          return;
802     }
803 
804     // Partition the array.
805     swap(data[$ / 2], data[$ - 1]);
806     auto pivot = data[$ - 1];
807     bool lessThanPivot(T elem) { return elem < pivot; }
808 
809     auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]);
810     swap(data[$ - greaterEqual.length - 1], data[$ - 1]);
811 
812     auto less = data[0..$ - greaterEqual.length - 1];
813     greaterEqual = data[$ - greaterEqual.length..$];
814 
815     // Execute both recursion branches in parallel.
816     auto recurseTask = task!parallelSort(greaterEqual);
817     taskPool.put(recurseTask);
818     parallelSort(less);
819     recurseTask.yieldForce;
820 }
821 ---
822 */
auto task(alias fun, Args...)(Args args)
{
    // GC-allocated so the Task may safely outlive the caller's stack frame.
    return new Task!(fun, Args)(args);
}
827 
828 /**
829 Creates a `Task` on the GC heap that calls a function pointer, delegate, or
830 class/struct with overloaded opCall.
831 
832 Example:
833 ---
834 // Read two files in at the same time again,
835 // but this time use a function pointer instead
836 // of an alias to represent std.file.read.
837 import std.file;
838 
839 void main()
840 {
841     // Create and execute a Task for reading
842     // foo.txt.
843     auto file1Task = task(&read!string, "foo.txt", size_t.max);
844     file1Task.executeInNewThread();
845 
846     // Read bar.txt in parallel.
847     auto file2Data = read("bar.txt");
848 
849     // Get the results of reading foo.txt.
850     auto file1Data = file1Task.yieldForce;
851 }
852 ---
853 
854 Notes: This function takes a non-scope delegate, meaning it can be
855        used with closures.  If you can't allocate a closure due to objects
856        on the stack that have scoped destruction, see `scopedTask`, which
857        takes a scope delegate.
858  */
auto task(F, Args...)(F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    // The callable is stored as the first element of _args and invoked
    // through the `run` adapter.
    return new Task!(run, F, Args)(delegateOrFp, args);
}
864 
865 /**
866 Version of `task` usable from `@safe` code.  Usage mechanics are
867 identical to the non-@safe case, but safety introduces some restrictions:
868 
869 1.  `fun` must be @safe or @trusted.
870 
871 2.  `F` must not have any unshared aliasing as defined by
872     $(REF hasUnsharedAliasing, std,traits).  This means it
873     may not be an unshared delegate or a non-shared class or struct
874     with overloaded `opCall`.  This also precludes accepting template
875     alias parameters.
876 
877 3.  `Args` must not have unshared aliasing.
878 
879 4.  `fun` must not return by reference.
880 
881 5.  The return type must not have unshared aliasing unless `fun` is
882     `pure` or the `Task` is executed via `executeInNewThread` instead
883     of using a `TaskPool`.
884 
885 */
@trusted auto task(F, Args...)(F fun, Args args)
if (__traits(compiles, () @safe => fun(args)) && isSafeTask!F)
{
    // @trusted is justified by the isSafeTask constraints documented above.
    return new Task!(run, F, Args)(fun, args);
}
891 
@safe unittest
{
    // A type whose implicit conversion (alias this) would run @system code
    // must not be accepted by the @safe task/scopedTask overloads.
    static struct Oops {
        int convert() {
            *cast(int*) 0xcafebabe = 0xdeadbeef;
            return 0;
        }
        alias convert this;
    }
    static void foo(int) @safe {}

    static assert(!__traits(compiles, task(&foo, Oops.init)));
    static assert(!__traits(compiles, scopedTask(&foo, Oops.init)));
}
906 
907 /**
908 These functions allow the creation of `Task` objects on the stack rather
909 than the GC heap.  The lifetime of a `Task` created by `scopedTask`
910 cannot exceed the lifetime of the scope it was created in.
911 
912 `scopedTask` might be preferred over `task`:
913 
914 1.  When a `Task` that calls a delegate is being created and a closure
915     cannot be allocated due to objects on the stack that have scoped
916     destruction.  The delegate overload of `scopedTask` takes a `scope`
917     delegate.
918 
919 2.  As a micro-optimization, to avoid the heap allocation associated with
920     `task` or with the creation of a closure.
921 
922 Usage is otherwise identical to `task`.
923 
924 Notes:  `Task` objects created using `scopedTask` will automatically
925 call `Task.yieldForce` in their destructor if necessary to ensure
926 the `Task` is complete before the stack frame they reside on is destroyed.
927 */
auto scopedTask(alias fun, Args...)(Args args)
{
    // Returned by value (stack-allocated); isScoped makes the destructor
    // yieldForce if the Task is still running when it goes out of scope.
    auto ret = Task!(fun, Args)(args);
    ret.isScoped = true;
    return ret;
}

/// Ditto
auto scopedTask(F, Args...)(scope F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    auto ret = Task!(run, F, Args)(delegateOrFp, args);
    ret.isScoped = true;
    return ret;
}

/// Ditto
@trusted auto scopedTask(F, Args...)(F fun, Args args)
if (__traits(compiles, () @safe => fun(args)) && isSafeTask!F)
{
    auto ret = Task!(run, F, Args)(fun, args);
    ret.isScoped = true;
    return ret;
}
952 
953 /**
954 The total number of CPU cores available on the current machine, as reported by
955 the operating system.
956 */
// Computed once on first access via totalCPUsImpl; uint.max is the
// uninitialized sentinel.
alias totalCPUs =
    __lazilyInitializedConstant!(immutable(uint), uint.max, totalCPUsImpl);
959 
// OS-specific query for the number of CPUs available to this process.
// Returns at least 1 on every supported platform with a successful query.
uint totalCPUsImpl() @nogc nothrow @trusted
{
    version (Windows)
    {
        // BUGS:  Only works on Windows 2000 and above.
        import core.sys.windows.winbase : SYSTEM_INFO, GetSystemInfo;
        import std.algorithm.comparison : max;
        SYSTEM_INFO si;
        GetSystemInfo(&si);
        // Guard against a bogus zero report; a machine always has >= 1 CPU.
        return max(1, cast(uint) si.dwNumberOfProcessors);
    }
    else version (linux)
    {
        import core.stdc.stdlib : calloc;
        import core.sys.linux.sched : CPU_ALLOC_SIZE, CPU_FREE, CPU_COUNT, CPU_COUNT_S, cpu_set_t, sched_getaffinity;
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;

        int count = 0;

        /**
         * According to ruby's source code, CPU_ALLOC() doesn't work as expected.
         *  see: https://github.com/ruby/ruby/commit/7d9e04de496915dd9e4544ee18b1a0026dc79242
         *
         *  The hardcode number also comes from ruby's source code.
         *  see: https://github.com/ruby/ruby/commit/0fa75e813ecf0f5f3dd01f89aa76d0e25ab4fcd4
         */
        for (int n = 64; n <= 16384; n *= 2)
        {
            // Size the set for n CPUs.  (Previously CPU_ALLOC_SIZE(count) was
            // used; with count starting at 0 that always yielded the size for
            // zero CPUs, so the growing-n retry loop never actually grew.)
            size_t size = CPU_ALLOC_SIZE(n);
            if (size >= 0x400)
            {
                // Large CPU counts need a dynamically allocated set.
                // calloc zeroes the set, so no separate CPU_ZERO_S is needed.
                auto cpuset = cast(cpu_set_t*) calloc(1, size);
                if (cpuset is null) break;
                if (sched_getaffinity(0, size, cpuset) == 0)
                {
                    count = CPU_COUNT_S(size, cpuset);
                }
                CPU_FREE(cpuset);
            }
            else
            {
                // Small counts fit in the fixed-size cpu_set_t on the stack.
                cpu_set_t cpuset;
                if (sched_getaffinity(0, cpu_set_t.sizeof, &cpuset) == 0)
                {
                    count = CPU_COUNT(&cpuset);
                }
            }

            if (count > 0)
                return cast(uint) count;
        }

        // Affinity queries failed; fall back to the count of online processors.
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (Darwin)
    {
        import core.sys.darwin.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        // NOTE(review): "hw.physicalcpu" counts physical cores, while the BSD
        // branches below use logical CPUs ("hw.ncpu") — confirm this asymmetry
        // is intended before changing it.
        sysctlbyname("hw.physicalcpu", &result, &len, null, 0);
        return result;
    }
    else version (DragonFlyBSD)
    {
        import core.sys.dragonflybsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (FreeBSD)
    {
        import core.sys.freebsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (NetBSD)
    {
        import core.sys.netbsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (Solaris)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (OpenBSD)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (Hurd)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else
    {
        static assert(0, "Don't know how to get N CPUs on this OS.");
    }
}
1067 
1068 /*
1069 This class serves two purposes:
1070 
1071 1.  It distinguishes std.parallelism threads from other threads so that
1072     the std.parallelism daemon threads can be terminated.
1073 
1074 2.  It adds a reference to the pool that the thread is a member of,
1075     which is also necessary to allow the daemon threads to be properly
1076     terminated.
1077 */
private final class ParallelismThread : Thread
{
    // Back-reference to the owning pool, assigned immediately after
    // construction; the module destructor uses it to stop daemon pools.
    TaskPool pool;

    this(void delegate() workerEntry)
    {
        super(workerEntry);
    }
}
1087 
// Kill daemon threads.
shared static ~this()
{
    // Walk every thread the runtime knows about (Thread's static opApply).
    foreach (ref thread; Thread)
    {
        // Only threads created by std.parallelism are ParallelismThreads.
        auto pthread = cast(ParallelismThread) thread;
        if (pthread is null) continue;
        auto pool = pthread.pool;
        // Non-daemon pools must be stopped explicitly by user code; only
        // daemon pools are torn down automatically at module shutdown.
        if (!pool.isDaemon) continue;
        pool.stop();
        pthread.join();
    }
}
1101 
1102 /**
1103 This class encapsulates a task queue and a set of worker threads.  Its purpose
1104 is to efficiently map a large number of `Task`s onto a smaller number of
1105 threads.  A task queue is a FIFO queue of `Task` objects that have been
1106 submitted to the `TaskPool` and are awaiting execution.  A worker thread is a
1107 thread that executes the `Task` at the front of the queue when one is
1108 available and sleeps when the queue is empty.
1109 
1110 This class should usually be used via the global instantiation
1111 available via the $(REF taskPool, std,parallelism) property.
1112 Occasionally it is useful to explicitly instantiate a `TaskPool`:
1113 
1114 1.  When you want `TaskPool` instances with multiple priorities, for example
1115     a low priority pool and a high priority pool.
1116 
1117 2.  When the threads in the global task pool are waiting on a synchronization
1118     primitive (for example a mutex), and you want to parallelize the code that
1119     needs to run before these threads can be resumed.
1120 
1121 Note: The worker threads in this pool will not stop until
1122       `stop` or `finish` is called, even if the main thread
1123       has finished already. This may lead to programs that
1124       never end. If you do not want this behaviour, you can set `isDaemon`
1125       to true.
1126  */
1127 final class TaskPool
1128 {
1129 private:
1130 
1131     // A pool can either be a regular pool or a single-task pool.  A
1132     // single-task pool is a dummy pool that's fired up for
1133     // Task.executeInNewThread().
1134     bool isSingleTask;
1135 
1136     ParallelismThread[] pool;
1137     Thread singleTaskThread;
1138 
1139     AbstractTask* head;
1140     AbstractTask* tail;
1141     PoolState status = PoolState.running;
1142     Condition workerCondition;
1143     Condition waiterCondition;
1144     Mutex queueMutex;
1145     Mutex waiterMutex;  // For waiterCondition
1146 
1147     // The instanceStartIndex of the next instance that will be created.
1148     __gshared size_t nextInstanceIndex = 1;
1149 
1150     // The index of the current thread.
1151     static size_t threadIndex;
1152 
1153     // The index of the first thread in this instance.
1154     immutable size_t instanceStartIndex;
1155 
1156     // The index that the next thread to be initialized in this pool will have.
1157     size_t nextThreadIndex;
1158 
    // Pool lifecycle, stored in `status` and read/written with the
    // atomic*Ubyte helpers.
    enum PoolState : ubyte
    {
        running,    // Accepting and executing tasks.
        finishing,  // Drain the remaining queue, then transition to stopNow.
        stopNow     // Workers exit their work loop as soon as they see this.
    }
1165 
    // Executes `job` in the current thread and marks it done.
    // Precondition: the task has already been claimed (inProgress) and
    // unlinked from the queue.
    void doJob(AbstractTask* job)
    {
        assert(job.taskStatus == TaskStatus.inProgress);
        assert(job.next is null);
        assert(job.prev is null);

        // Wake any threads blocked in waitUntilCompletion(), whether the job
        // succeeded or threw.  Runs after taskStatus is set to done below.
        scope(exit)
        {
            if (!isSingleTask)
            {
                waiterLock();
                scope(exit) waiterUnlock();
                notifyWaiters();
            }
        }

        try
        {
            job.job();
        }
        catch (Throwable e)
        {
            // Store even Errors so they are rethrown in the thread that
            // forces the task, instead of silently killing a worker.
            job.exception = e;
        }

        // Publish completion; readers poll this with atomicReadUbyte.
        atomicSetUbyte(job.taskStatus, TaskStatus.done);
    }
1193 
1194     // This function is used for dummy pools created by Task.executeInNewThread().
1195     void doSingleTask()
1196     {
1197         // No synchronization.  Pool is guaranteed to only have one thread,
1198         // and the queue is submitted to before this thread is created.
1199         assert(head);
1200         auto t = head;
1201         t.next = t.prev = head = null;
1202         doJob(t);
1203     }
1204 
1205     // This function performs initialization for each thread that affects
1206     // thread local storage and therefore must be done from within the
1207     // worker thread.  It then calls executeWorkLoop().
1208     void startWorkLoop()
1209     {
1210         // Initialize thread index.
1211         {
1212             queueLock();
1213             scope(exit) queueUnlock();
1214             threadIndex = nextThreadIndex;
1215             nextThreadIndex++;
1216         }
1217 
1218         executeWorkLoop();
1219     }
1220 
1221     // This is the main work loop that worker threads spend their time in
1222     // until they terminate.  It's also entered by non-worker threads when
1223     // finish() is called with the blocking variable set to true.
1224     void executeWorkLoop()
1225     {
1226         while (atomicReadUbyte(status) != PoolState.stopNow)
1227         {
1228             AbstractTask* task = pop();
1229             if (task is null)
1230             {
1231                 if (atomicReadUbyte(status) == PoolState.finishing)
1232                 {
1233                     atomicSetUbyte(status, PoolState.stopNow);
1234                     return;
1235                 }
1236             }
1237             else
1238             {
1239                 doJob(task);
1240             }
1241         }
1242     }
1243 
1244     // Pop a task off the queue.
1245     AbstractTask* pop()
1246     {
1247         queueLock();
1248         scope(exit) queueUnlock();
1249         auto ret = popNoSync();
1250         while (ret is null && status == PoolState.running)
1251         {
1252             wait();
1253             ret = popNoSync();
1254         }
1255         return ret;
1256     }
1257 
    // Unsynchronized pop from the queue head; returns null when the queue is
    // empty or this is a single-task pool.  Caller must hold the queue lock.
    AbstractTask* popNoSync()
    out(returned)
    {
        /* If task.prev and task.next aren't null, then another thread
         * can try to delete this task from the pool after it's
         * already been deleted/popped.
         */
        if (returned !is null)
        {
            assert(returned.next is null);
            assert(returned.prev is null);
        }
    }
    do
    {
        if (isSingleTask) return null;

        AbstractTask* returned = head;
        if (head !is null)
        {
            head = head.next;
            // Unlink and claim the task while still under the queue lock.
            returned.prev = null;
            returned.next = null;
            returned.taskStatus = TaskStatus.inProgress;
        }
        // The new head (if any) no longer has a predecessor.
        if (head !is null)
        {
            head.prev = null;
        }

        return returned;
    }
1290 
1291     // Push a task onto the queue.
1292     void abstractPut(AbstractTask* task)
1293     {
1294         queueLock();
1295         scope(exit) queueUnlock();
1296         abstractPutNoSync(task);
1297     }
1298 
    // Unsynchronized enqueue at the tail.  Caller must hold the queue lock
    // (or be constructing a single-task pool, which has no concurrency yet).
    void abstractPutNoSync(AbstractTask* task)
    in
    {
        assert(task);
    }
    out
    {
        import std.conv : text;

        // Post-insertion consistency checks on the tail links.
        assert(tail.prev !is tail);
        assert(tail.next is null, text(tail.prev, '\t', tail.next));
        if (tail.prev !is null)
        {
            assert(tail.prev.next is tail, text(tail.prev, '\t', tail.next));
        }
    }
    do
    {
        // Not using enforce() to save on function call overhead since this
        // is a performance critical function.
        if (status != PoolState.running)
        {
            throw new Error(
                "Cannot submit a new task to a pool after calling " ~
                "finish() or stop()."
            );
        }

        task.next = null;
        if (head is null)   //Queue is empty.
        {
            head = task;
            tail = task;
            tail.prev = null;
        }
        else
        {
            assert(tail);
            task.prev = tail;
            tail.next = task;
            tail = task;
        }
        // Wake a single worker: only one task became available.
        notify();
    }
1343 
    // Unsynchronized bulk enqueue: append an already-linked chain of tasks,
    // from chain head `h` to chain tail `t`.  Caller must hold the queue lock.
    void abstractPutGroupNoSync(AbstractTask* h, AbstractTask* t)
    {
        if (status != PoolState.running)
        {
            throw new Error(
                "Cannot submit a new task to a pool after calling " ~
                "finish() or stop()."
            );
        }

        if (head is null)
        {
            head = h;
            tail = t;
        }
        else
        {
            h.prev = tail;
            tail.next = h;
            tail = t;
        }

        // Multiple tasks became available at once, so wake every worker.
        notifyAll();
    }
1368 
1369     void tryDeleteExecute(AbstractTask* toExecute)
1370     {
1371         if (isSingleTask) return;
1372 
1373         if ( !deleteItem(toExecute) )
1374         {
1375             return;
1376         }
1377 
1378         try
1379         {
1380             toExecute.job();
1381         }
1382         catch (Exception e)
1383         {
1384             toExecute.exception = e;
1385         }
1386 
1387         atomicSetUbyte(toExecute.taskStatus, TaskStatus.done);
1388     }
1389 
1390     bool deleteItem(AbstractTask* item)
1391     {
1392         queueLock();
1393         scope(exit) queueUnlock();
1394         return deleteItemNoSync(item);
1395     }
1396 
    // Unsynchronized removal of `item` from the queue.  Returns true when the
    // item was still queued (it is then claimed as inProgress), false when a
    // worker has already claimed it.  Caller must hold the queue lock.
    bool deleteItemNoSync(AbstractTask* item)
    {
        if (item.taskStatus != TaskStatus.notStarted)
        {
            return false;
        }
        // Claim the task before unlinking so no worker can also claim it.
        item.taskStatus = TaskStatus.inProgress;

        if (item is head)
        {
            // Make sure head gets set properly.
            popNoSync();
            return true;
        }
        if (item is tail)
        {
            // Retreat the tail; the queue may become empty.
            tail = tail.prev;
            if (tail !is null)
            {
                tail.next = null;
            }
            item.next = null;
            item.prev = null;
            return true;
        }
        // Interior node: splice the neighbors around it.
        if (item.next !is null)
        {
            assert(item.next.prev is item);  // Check queue consistency.
            item.next.prev = item.prev;
        }
        if (item.prev !is null)
        {
            assert(item.prev.next is item);  // Check queue consistency.
            item.prev.next = item.next;
        }
        item.next = null;
        item.prev = null;
        return true;
    }
1436 
1437     void queueLock()
1438     {
1439         assert(queueMutex);
1440         if (!isSingleTask) queueMutex.lock();
1441     }
1442 
1443     void queueUnlock()
1444     {
1445         assert(queueMutex);
1446         if (!isSingleTask) queueMutex.unlock();
1447     }
1448 
1449     void waiterLock()
1450     {
1451         if (!isSingleTask) waiterMutex.lock();
1452     }
1453 
1454     void waiterUnlock()
1455     {
1456         if (!isSingleTask) waiterMutex.unlock();
1457     }
1458 
1459     void wait()
1460     {
1461         if (!isSingleTask) workerCondition.wait();
1462     }
1463 
1464     void notify()
1465     {
1466         if (!isSingleTask) workerCondition.notify();
1467     }
1468 
1469     void notifyAll()
1470     {
1471         if (!isSingleTask) workerCondition.notifyAll();
1472     }
1473 
1474     void waitUntilCompletion()
1475     {
1476         if (isSingleTask)
1477         {
1478             singleTaskThread.join();
1479         }
1480         else
1481         {
1482             waiterCondition.wait();
1483         }
1484     }
1485 
1486     void notifyWaiters()
1487     {
1488         if (!isSingleTask) waiterCondition.notifyAll();
1489     }
1490 
1491     // Private constructor for creating dummy pools that only have one thread,
1492     // only execute one Task, and then terminate.  This is used for
1493     // Task.executeInNewThread().
    // Builds a dummy single-task pool around `task` and immediately starts
    // one thread to run it.  `priority` is currently ignored (see below).
    this(AbstractTask* task, int priority = int.max)
    {
        assert(task);

        // Dummy value, not used.
        instanceStartIndex = 0;

        // Mark the pool as single-task: the queue/condition helpers all
        // become no-ops, since there is no shared queue or competing worker.
        this.isSingleTask = true;
        // The task skips the notStarted state — it is claimed up front by
        // the dedicated thread spawned below.
        task.taskStatus = TaskStatus.inProgress;
        this.head = task;
        singleTaskThread = new Thread(&doSingleTask);
        singleTaskThread.start();

        // Disabled until writing code to support
        // running thread with specified priority
        // See https://issues.dlang.org/show_bug.cgi?id=8960

        /*if (priority != int.max)
        {
            singleTaskThread.priority = priority;
        }*/
    }
1516 
1517 public:
1518     // This is used in parallel_algorithm but is too unstable to document
1519     // as public API.
1520     size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow
1521     {
1522         import std.algorithm.comparison : max;
1523 
1524         if (this.size == 0)
1525         {
1526             return max(rangeLen, 1);
1527         }
1528 
1529         immutable size_t eightSize = 4 * (this.size + 1);
1530         auto ret = (rangeLen / eightSize) + ((rangeLen % eightSize == 0) ? 0 : 1);
1531         return max(ret, 1);
1532     }
1533 
1534     /**
1535     Default constructor that initializes a `TaskPool` with
1536     `totalCPUs` - 1 worker threads.  The minus 1 is included because the
1537     main thread will also be available to do work.
1538 
1539     Note:  On single-core machines, the primitives provided by `TaskPool`
1540            operate transparently in single-threaded mode.
1541      */
1542     this() @trusted
1543     {
1544         this(totalCPUs - 1);
1545     }
1546 
1547     /**
1548     Allows for custom number of worker threads.
1549     */
    this(size_t nWorkers) @trusted
    {
        // Reserve a globally unique range of worker indices for this pool.
        // typeid(TaskPool) serves as a process-wide lock object so all
        // TaskPool constructions serialize on the same monitor.
        synchronized(typeid(TaskPool))
        {
            instanceStartIndex = nextInstanceIndex;

            // The first worker thread to be initialized will have this index,
            // and will increment it.  The second worker to be initialized will
            // have this index plus 1.
            nextThreadIndex = instanceStartIndex;
            nextInstanceIndex += nWorkers;
        }

        // queueMutex guards the task queue; waiterMutex backs the condition
        // used by threads blocked in waitUntilCompletion().
        queueMutex = new Mutex(this);
        waiterMutex = new Mutex();
        workerCondition = new Condition(queueMutex);
        waiterCondition = new Condition(waiterMutex);

        pool = new ParallelismThread[nWorkers];
        foreach (ref poolThread; pool)
        {
            poolThread = new ParallelismThread(&startWorkLoop);
            // Back-reference used by the module destructor to find and stop
            // daemon pools.
            poolThread.pool = this;
            poolThread.start();
        }
    }
1576 
1577     /**
1578     Implements a parallel foreach loop over a range.  This works by implicitly
1579     creating and submitting one `Task` to the `TaskPool` for each worker
1580     thread.  A work unit is a set of consecutive elements of `range` to
1581     be processed by a worker thread between communication with any other
1582     thread.  The number of elements processed per work unit is controlled by the
1583     `workUnitSize` parameter.  Smaller work units provide better load
1584     balancing, but larger work units avoid the overhead of communicating
1585     with other threads frequently to fetch the next work unit.  Large work
1586     units also avoid false sharing in cases where the range is being modified.
1587     The less time a single iteration of the loop takes, the larger
1588     `workUnitSize` should be.  For very expensive loop bodies,
    `workUnitSize` should be 1.  An overload that chooses a default work
1590     unit size is also available.
1591 
1592     Example:
1593     ---
1594     // Find the logarithm of every number from 1 to
1595     // 10_000_000 in parallel.
1596     auto logs = new double[10_000_000];
1597 
1598     // Parallel foreach works with or without an index
1599     // variable.  It can iterate by ref if range.front
1600     // returns by ref.
1601 
1602     // Iterate over logs using work units of size 100.
1603     foreach (i, ref elem; taskPool.parallel(logs, 100))
1604     {
1605         elem = log(i + 1.0);
1606     }
1607 
1608     // Same thing, but use the default work unit size.
1609     //
1610     // Timings on an Athlon 64 X2 dual core machine:
1611     //
1612     // Parallel foreach:  388 milliseconds
1613     // Regular foreach:   619 milliseconds
1614     foreach (i, ref elem; taskPool.parallel(logs))
1615     {
1616         elem = log(i + 1.0);
1617     }
1618     ---
1619 
1620     Notes:
1621 
1622     The memory usage of this implementation is guaranteed to be constant
1623     in `range.length`.
1624 
1625     Breaking from a parallel foreach loop via a break, labeled break,
1626     labeled continue, return or goto statement throws a
1627     `ParallelForeachError`.
1628 
1629     In the case of non-random access ranges, parallel foreach buffers lazily
1630     to an array of size `workUnitSize` before executing the parallel portion
1631     of the loop.  The exception is that, if a parallel foreach is executed
1632     over a range returned by `asyncBuf` or `map`, the copying is elided
1633     and the buffers are simply swapped.  In this case `workUnitSize` is
    ignored and the work unit size is set to the buffer size of `range`.
1635 
1636     A memory barrier is guaranteed to be executed on exit from the loop,
1637     so that results produced by all threads are visible in the calling thread.
1638 
1639     $(B Exception Handling):
1640 
1641     When at least one exception is thrown from inside a parallel foreach loop,
1642     the submission of additional `Task` objects is terminated as soon as
1643     possible, in a non-deterministic manner.  All executing or
1644     enqueued work units are allowed to complete.  Then, all exceptions that
1645     were thrown by any work unit are chained using `Throwable.next` and
1646     rethrown.  The order of the exception chaining is non-deterministic.
1647     */
1648     ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
1649     {
1650         import std.exception : enforce;
1651         enforce(workUnitSize > 0, "workUnitSize must be > 0.");
1652         alias RetType = ParallelForeach!R;
1653         return RetType(this, range, workUnitSize);
1654     }
1655 
1656 
1657     /// Ditto
1658     ParallelForeach!R parallel(R)(R range)
1659     {
1660         static if (hasLength!R)
1661         {
1662             // Default work unit size is such that we would use 4x as many
1663             // slots as are in this thread pool.
1664             size_t workUnitSize = defaultWorkUnitSize(range.length);
1665             return parallel(range, workUnitSize);
1666         }
1667         else
1668         {
1669             // Just use a really, really dumb guess if the user is too lazy to
1670             // specify.
1671             return parallel(range, 512);
1672         }
1673     }
1674 
1675     ///
1676     template amap(functions...)
1677     {
1678         /**
1679         Eager parallel map.  The eagerness of this function means it has less
1680         overhead than the lazily evaluated `TaskPool.map` and should be
1681         preferred where the memory requirements of eagerness are acceptable.
1682         `functions` are the functions to be evaluated, passed as template
1683         alias parameters in a style similar to
1684         $(REF map, std,algorithm,iteration).
1685         The first argument must be a random access range. For performance
1686         reasons, amap will assume the range elements have not yet been
1687         initialized. Elements will be overwritten without calling a destructor
1688         nor doing an assignment. As such, the range must not contain meaningful
1689         data$(DDOC_COMMENT not a section): either un-initialized objects, or
1690         objects in their `.init` state.
1691 
1692         ---
1693         auto numbers = iota(100_000_000.0);
1694 
1695         // Find the square roots of numbers.
1696         //
1697         // Timings on an Athlon 64 X2 dual core machine:
1698         //
1699         // Parallel eager map:                   0.802 s
1700         // Equivalent serial implementation:     1.768 s
1701         auto squareRoots = taskPool.amap!sqrt(numbers);
1702         ---
1703 
1704         Immediately after the range argument, an optional work unit size argument
1705         may be provided.  Work units as used by `amap` are identical to those
1706         defined for parallel foreach.  If no work unit size is provided, the
1707         default work unit size is used.
1708 
1709         ---
1710         // Same thing, but make work unit size 100.
1711         auto squareRoots = taskPool.amap!sqrt(numbers, 100);
1712         ---
1713 
1714         An output range for returning the results may be provided as the last
1715         argument.  If one is not provided, an array of the proper type will be
1716         allocated on the garbage collected heap.  If one is provided, it must be a
1717         random access range with assignable elements, must have reference
1718         semantics with respect to assignment to its elements, and must have the
1719         same length as the input range.  Writing to adjacent elements from
1720         different threads must be safe.
1721 
1722         ---
1723         // Same thing, but explicitly allocate an array
1724         // to return the results in.  The element type
1725         // of the array may be either the exact type
1726         // returned by functions or an implicit conversion
1727         // target.
1728         auto squareRoots = new float[numbers.length];
1729         taskPool.amap!sqrt(numbers, squareRoots);
1730 
1731         // Multiple functions, explicit output range, and
1732         // explicit work unit size.
1733         auto results = new Tuple!(float, real)[numbers.length];
1734         taskPool.amap!(sqrt, log)(numbers, 100, results);
1735         ---
1736 
1737         Note:
1738 
1739         A memory barrier is guaranteed to be executed after all results are written
1740         but before returning so that results produced by all threads are visible
1741         in the calling thread.
1742 
1743         Tips:
1744 
1745         To perform the mapping operation in place, provide the same range for the
1746         input and output range.
1747 
1748         To parallelize the copying of a range with expensive to evaluate elements
1749         to an array, pass an identity function (a function that just returns
1750         whatever argument is provided to it) to `amap`.
1751 
1752         $(B Exception Handling):
1753 
1754         When at least one exception is thrown from inside the map functions,
1755         the submission of additional `Task` objects is terminated as soon as
1756         possible, in a non-deterministic manner.  All currently executing or
1757         enqueued work units are allowed to complete.  Then, all exceptions that
1758         were thrown from any work unit are chained using `Throwable.next` and
1759         rethrown.  The order of the exception chaining is non-deterministic.
1760         */
        // Eagerly maps `functions` over args[0] (a random access range) into
        // an output buffer, distributing consecutive work units over the
        // pool's threads plus the calling thread.
        auto amap(Args...)(Args args)
        if (isRandomAccessRange!(Args[0]))
        {
            import core.internal.lifetime : emplaceRef;

            // Combined mapping function: adjoin packages multiple functions
            // into one; a single function maps through directly.
            alias fun = adjoin!(staticMap!(unaryFun, functions));

            alias range = args[0];
            immutable len = range.length;

            static if (
                Args.length > 1 &&
                randAssignable!(Args[$ - 1]) &&
                is(MapType!(Args[0], functions) : ElementType!(Args[$ - 1]))
                )
            {
                import std.conv : text;
                import std.exception : enforce;

                // Caller supplied a compatible output buffer as the last arg.
                alias buf = args[$ - 1];
                alias args2 = args[0..$ - 1];
                alias Args2 = Args[0..$ - 1];
                enforce(buf.length == len,
                        text("Can't use a user supplied buffer that's the wrong ",
                             "size.  (Expected  :", len, " Got:  ", buf.length));
            }
            else static if (randAssignable!(Args[$ - 1]) && Args.length > 1)
            {
                // Assignable but with the wrong element type: reject early.
                static assert(0, "Wrong buffer type.");
            }
            else
            {
                import std.array : uninitializedArray;

                // No output buffer given: allocate one.  It may stay
                // uninitialized because every element is emplaced below.
                auto buf = uninitializedArray!(MapType!(Args[0], functions)[])(len);
                alias args2 = args;
                alias Args2 = Args;
            }

            if (!len) return buf;

            static if (isIntegral!(Args2[$ - 1]))
            {
                static assert(args2.length == 2);
                // Explicit work unit size was passed after the range.
                auto workUnitSize = cast(size_t) args2[1];
            }
            else
            {
                static assert(args2.length == 1, Args);
                auto workUnitSize = defaultWorkUnitSize(range.length);
            }

            alias R = typeof(range);

            if (workUnitSize > len)
            {
                workUnitSize = len;
            }

            // Handle as a special case:
            if (size == 0)
            {
                // Zero workers: map serially in the calling thread.
                size_t index = 0;
                foreach (elem; range)
                {
                    emplaceRef(buf[index++], fun(elem));
                }
                return buf;
            }

            // Effectively -1:  chunkIndex + 1 == 0:
            shared size_t workUnitIndex = size_t.max;
            shared bool shouldContinue = true;

            // Worker body: repeatedly claims the next work unit index
            // atomically and fills the corresponding slice of buf.
            void doIt()
            {
                import std.algorithm.comparison : min;

                scope(failure)
                {
                    // If an exception is thrown, all threads should bail.
                    atomicStore(shouldContinue, false);
                }

                while (atomicLoad(shouldContinue))
                {
                    // First increment wraps size_t.max to unit index 0.
                    immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
                    immutable start = workUnitSize * myUnitIndex;
                    if (start >= len)
                    {
                        atomicStore(shouldContinue, false);
                        break;
                    }

                    immutable end = min(len, start + workUnitSize);

                    static if (hasSlicing!R)
                    {
                        // Iterate a slice to avoid repeated opIndex calls on
                        // the full range.
                        auto subrange = range[start .. end];
                        foreach (i; start .. end)
                        {
                            emplaceRef(buf[i], fun(subrange.front));
                            subrange.popFront();
                        }
                    }
                    else
                    {
                        foreach (i; start .. end)
                        {
                            emplaceRef(buf[i], fun(range[i]));
                        }
                    }
                }
            }

            // Runs doIt on the pool's workers and the calling thread, then
            // rethrows any exceptions collected from the work units.
            submitAndExecute(this, &doIt);
            return buf;
        }
1879     }
1880 
1881     ///
    template map(functions...)
    {
        /**
        A semi-lazy parallel map that can be used for pipelining.  The map
        functions are evaluated for the first `bufSize` elements and stored in a
        buffer and made available to `popFront`.  Meanwhile, in the
        background a second buffer of the same size is filled.  When the first
        buffer is exhausted, it is swapped with the second buffer and filled while
        the values from what was originally the second buffer are read.  This
        implementation allows for elements to be written to the buffer without
        the need for atomic operations or synchronization for each write, and
        enables the mapping function to be evaluated efficiently in parallel.

        `map` has more overhead than the simpler procedure used by `amap`
        but avoids the need to keep all results in memory simultaneously and works
        with non-random access ranges.

        Params:

        source = The $(REF_ALTTEXT input range, isInputRange, std,range,primitives)
        to be mapped.  If `source` is not random
        access it will be lazily buffered to an array of size `bufSize` before
        the map function is evaluated.  (For an exception to this rule, see Notes.)

        bufSize = The size of the buffer to store the evaluated elements.

        workUnitSize = The number of elements to evaluate in a single
        `Task`.  Must be less than or equal to `bufSize`, and
        should be a fraction of `bufSize` such that all worker threads can be
        used.  If the default of size_t.max is used, workUnitSize will be set to
        the pool-wide default.

        Returns:  An input range representing the results of the map.  This range
                  has a length iff `source` has a length.

        Notes:

        If a range returned by `map` or `asyncBuf` is used as an input to
        `map`, then as an optimization the copying from the output buffer
        of the first range to the input buffer of the second range is elided, even
        though the ranges returned by `map` and `asyncBuf` are non-random
        access ranges.  This means that the `bufSize` parameter passed to the
        current call to `map` will be ignored and the size of the buffer
        will be the buffer size of `source`.

        Example:
        ---
        // Pipeline reading a file, converting each line
        // to a number, taking the logarithms of the numbers,
        // and performing the additions necessary to find
        // the sum of the logarithms.

        auto lineRange = File("numberList.txt").byLine();
        auto dupedLines = std.algorithm.map!"a.idup"(lineRange);
        auto nums = taskPool.map!(to!double)(dupedLines);
        auto logs = taskPool.map!log10(nums);

        double sum = 0;
        foreach (elem; logs)
        {
            sum += elem;
        }
        ---

        $(B Exception Handling):

        Any exceptions thrown while iterating over `source`
        or computing the map function are re-thrown on a call to `popFront` or,
        if thrown during construction, are simply allowed to propagate to the
        caller.  In the case of exceptions thrown while computing the map function,
        the exceptions are chained as in `TaskPool.amap`.
        */
        auto
        map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max)
        if (isInputRange!S)
        {
            import std.exception : enforce;

            // size_t.max is the sentinel for "use the pool-wide default".
            // NOTE(review): the condition also accepts workUnitSize == bufSize,
            // so the message's "smaller than" is slightly imprecise.
            enforce(workUnitSize == size_t.max || workUnitSize <= bufSize,
                    "Work unit size must be smaller than buffer size.");
            // Fuse all supplied functions into a single function (returning a
            // tuple when more than one function is given), as amap does.
            alias fun = adjoin!(staticMap!(unaryFun, functions));

            static final class Map
            {
                // This is a class because the task needs to be located on the
                // heap and in the non-random access case source needs to be on
                // the heap, too.

            private:
                // True iff source exposes the internal buffer/swap protocol of
                // a range produced by map or asyncBuf.  In that case we can
                // swap source's output buffer with our input buffer wholesale
                // instead of copying element by element (see Notes in the ddoc).
                enum bufferTrick = is(typeof(source.buf1)) &&
                is(typeof(source.bufPos)) &&
                is(typeof(source.doBufSwap()));

                alias E = MapType!(S, functions);
                // buf1 is the buffer currently being consumed via the range
                // interface; buf2 is being filled in the background.
                E[] buf1, buf2;
                S source;
                TaskPool pool;
                // Reusable task object that runs fillBuf(buf2) in the pool.
                Task!(run, E[] delegate(E[]), E[]) nextBufTask;
                size_t workUnitSize;
                size_t bufPos;
                // Set once the final background task has been waited on,
                // i.e. the next doBufSwap() should signal end-of-range.
                bool lastTaskWaited;

            static if (isRandomAccessRange!S)
            {
                alias FromType = S;

                // Advance source past the elements just consumed into buf1,
                // using whichever slicing syntax S supports.
                void popSource()
                {
                    import std.algorithm.comparison : min;

                    static if (__traits(compiles, source[0 .. source.length]))
                    {
                        source = source[min(buf1.length, source.length)..source.length];
                    }
                    else static if (__traits(compiles, source[0..$]))
                    {
                        source = source[min(buf1.length, source.length)..$];
                    }
                    else
                    {
                        static assert(0, "S must have slicing for Map."
                                      ~ "  " ~ S.stringof ~ " doesn't.");
                    }
                }
            }
            else static if (bufferTrick)
            {
                // Make sure we don't have the buffer recycling overload of
                // asyncBuf.
                static if (
                    is(typeof(source.source)) &&
                    isRoundRobin!(typeof(source.source))
                )
                {
                    static assert(0, "Cannot execute a parallel map on " ~
                                  "the buffer recycling overload of asyncBuf."
                                 );
                }

                alias FromType = typeof(source.buf1);
                FromType from;

                // Just swap our input buffer with source's output buffer.
                // No need to copy element by element.
                FromType dumpToFrom()
                {
                    import std.algorithm.mutation : swap;

                    assert(source.buf1.length <= from.length);
                    from.length = source.buf1.length;
                    swap(source.buf1, from);

                    // Just in case this source has been popped before
                    // being sent to map:
                    from = from[source.bufPos..$];

                    static if (is(typeof(source._length)))
                    {
                        // NOTE(review): from has already been sliced by bufPos
                        // above, so this subtracts bufPos twice when
                        // source.bufPos != 0 — looks suspicious; verify the
                        // intended decrement is the number of elements taken.
                        source._length -= (from.length - source.bufPos);
                    }

                    source.doBufSwap();

                    return from;
                }
            }
            else
            {
                alias FromType = ElementType!S[];

                // The temporary array that data is copied to before being
                // mapped.
                FromType from;

                // Eagerly copy up to from.length elements out of source and
                // return the filled slice (shorter when source runs dry).
                FromType dumpToFrom()
                {
                    assert(from !is null);

                    size_t i;
                    for (; !source.empty && i < from.length; source.popFront())
                    {
                        from[i++] = source.front;
                    }

                    from = from[0 .. i];
                    return from;
                }
            }

            static if (hasLength!S)
            {
                // Remaining element count; decremented on every popFront.
                size_t _length;

                public @property size_t length() const @safe pure nothrow
                {
                    return _length;
                }
            }

                this(S source, size_t bufSize, size_t workUnitSize, TaskPool pool)
                {
                    static if (bufferTrick)
                    {
                        // The caller-supplied bufSize is ignored; we inherit
                        // the buffer size of source (documented in Notes).
                        bufSize = source.buf1.length;
                    }

                    buf1.length = bufSize;
                    buf2.length = bufSize;

                    static if (!isRandomAccessRange!S)
                    {
                        from.length = bufSize;
                    }

                    this.workUnitSize = (workUnitSize == size_t.max) ?
                            pool.defaultWorkUnitSize(bufSize) : workUnitSize;
                    this.source = source;
                    this.pool = pool;

                    static if (hasLength!S)
                    {
                        _length = source.length;
                    }

                    // Fill the first buffer synchronously, then start filling
                    // the second one in the background.
                    buf1 = fillBuf(buf1);
                    submitBuf2();
                }

                // The from parameter is a dummy and ignored in the random access
                // case.
                // Evaluates the map function over the next chunk of source into
                // buf, returning the (possibly shortened) filled slice.
                E[] fillBuf(E[] buf)
                {
                    import std.algorithm.comparison : min;

                    static if (isRandomAccessRange!S)
                    {
                        import std.range : take;
                        auto toMap = take(source, buf.length);
                        // Pop only on success so that, on exception, source is
                        // left unconsumed for re-throwing semantics.
                        scope(success) popSource();
                    }
                    else
                    {
                        auto toMap = dumpToFrom();
                    }

                    buf = buf[0 .. min(buf.length, toMap.length)];

                    // Handle as a special case:
                    if (pool.size == 0)
                    {
                        // No worker threads: evaluate serially in this thread.
                        size_t index = 0;
                        foreach (elem; toMap)
                        {
                            buf[index++] = fun(elem);
                        }
                        return buf;
                    }

                    pool.amap!functions(toMap, workUnitSize, buf);

                    return buf;
                }

                void submitBuf2()
                in
                {
                    // The task must not be linked into any queue.
                    assert(nextBufTask.prev is null);
                    assert(nextBufTask.next is null);
                }
                do
                {
                    // Hack to reuse the task object.

                    nextBufTask = typeof(nextBufTask).init;
                    nextBufTask._args[0] = &fillBuf;
                    nextBufTask._args[1] = buf2;
                    pool.put(nextBufTask);
                }

                // Swap the consumed buf1 with the background-filled buf2 and,
                // unless source is exhausted, kick off the next fill.
                void doBufSwap()
                {
                    if (lastTaskWaited)
                    {
                        // Then the source is empty.  Signal it here.
                        buf1 = null;
                        buf2 = null;

                        static if (!isRandomAccessRange!S)
                        {
                            from = null;
                        }

                        return;
                    }

                    buf2 = buf1;
                    // Blocks until the background fill is done, executing it
                    // in this thread if it hasn't started yet; also re-throws
                    // any exception from the map function.
                    buf1 = nextBufTask.yieldForce;
                    bufPos = 0;

                    if (source.empty)
                    {
                        lastTaskWaited = true;
                    }
                    else
                    {
                        submitBuf2();
                    }
                }

            public:
                @property auto front()
                {
                    return buf1[bufPos];
                }

                void popFront()
                {
                    static if (hasLength!S)
                    {
                        _length--;
                    }

                    bufPos++;
                    if (bufPos >= buf1.length)
                    {
                        doBufSwap();
                    }
                }

                static if (isInfinite!S)
                {
                    enum bool empty = false;
                }
                else
                {

                    bool empty() const @property
                    {
                        // doBufSwap() (called from popFront) nulls buf1 once
                        // the source is exhausted, which is what this detects.
                        return buf1.length == 0;
                    }
                }
            }
            return new Map(source, bufSize, workUnitSize, this);
        }
    }
2228 
2229     /**
2230     Given a `source` range that is expensive to iterate over, returns an
2231     $(REF_ALTTEXT input range, isInputRange, std,range,primitives) that
2232     asynchronously buffers the contents of `source` into a buffer of `bufSize` elements in a worker thread,
2233     while making previously buffered elements from a second buffer, also of size
2234     `bufSize`, available via the range interface of the returned
2235     object.  The returned range has a length iff `hasLength!S`.
2236     `asyncBuf` is useful, for example, when performing expensive operations
2237     on the elements of ranges that represent data on a disk or network.
2238 
2239     Example:
2240     ---
2241     import std.conv, std.stdio;
2242 
2243     void main()
2244     {
2245         // Fetch lines of a file in a background thread
2246         // while processing previously fetched lines,
2247         // dealing with byLine's buffer recycling by
2248         // eagerly duplicating every line.
2249         auto lines = File("foo.txt").byLine();
2250         auto duped = std.algorithm.map!"a.idup"(lines);
2251 
2252         // Fetch more lines in the background while we
2253         // process the lines already read into memory
2254         // into a matrix of doubles.
2255         double[][] matrix;
2256         auto asyncReader = taskPool.asyncBuf(duped);
2257 
2258         foreach (line; asyncReader)
2259         {
2260             auto ls = line.split("\t");
2261             matrix ~= to!(double[])(ls);
2262         }
2263     }
2264     ---
2265 
2266     $(B Exception Handling):
2267 
2268     Any exceptions thrown while iterating over `source` are re-thrown on a
2269     call to `popFront` or, if thrown during construction, simply
2270     allowed to propagate to the caller.
2271     */
    auto asyncBuf(S)(S source, size_t bufSize = 100) if (isInputRange!S)
    {
        static final class AsyncBuf
        {
            // This is a class because the task and source both need to be on
            // the heap.

            // The element type of S.
            alias E = ElementType!S;  // Needs to be here b/c of forward ref bugs.

        private:
            // buf1 is consumed via the range interface while buf2 is being
            // filled by a background task; doBufSwap() exchanges them.
            E[] buf1, buf2;
            S source;
            TaskPool pool;
            // Reusable task object that runs fillBuf(buf2) in the pool.
            Task!(run, E[] delegate(E[]), E[]) nextBufTask;
            size_t bufPos;
            // Set once the final background task has been waited on,
            // i.e. the next doBufSwap() should signal end-of-range.
            bool lastTaskWaited;

            static if (hasLength!S)
            {
                // Remaining element count; decremented on every popFront.
                size_t _length;

                // Available if hasLength!S.
                public @property size_t length() const @safe pure nothrow
                {
                    return _length;
                }
            }

            this(S source, size_t bufSize, TaskPool pool)
            {
                buf1.length = bufSize;
                buf2.length = bufSize;

                this.source = source;
                this.pool = pool;

                static if (hasLength!S)
                {
                    _length = source.length;
                }

                // Fill the first buffer synchronously, then start filling the
                // second one in the background.
                buf1 = fillBuf(buf1);
                submitBuf2();
            }

            // Copy up to buf.length elements out of source and return the
            // filled slice (shorter when source runs dry).
            E[] fillBuf(E[] buf)
            {
                assert(buf !is null);

                size_t i;
                for (; !source.empty && i < buf.length; source.popFront())
                {
                    buf[i++] = source.front;
                }

                buf = buf[0 .. i];
                return buf;
            }

            void submitBuf2()
            in
            {
                // The task must not be linked into any queue.
                assert(nextBufTask.prev is null);
                assert(nextBufTask.next is null);
            }
            do
            {
                // Hack to reuse the task object.

                nextBufTask = typeof(nextBufTask).init;
                nextBufTask._args[0] = &fillBuf;
                nextBufTask._args[1] = buf2;
                pool.put(nextBufTask);
            }

            // Swap the consumed buf1 with the background-filled buf2 and,
            // unless source is exhausted, kick off the next fill.
            void doBufSwap()
            {
                if (lastTaskWaited)
                {
                    // Then source is empty.  Signal it here.
                    buf1 = null;
                    buf2 = null;
                    return;
                }

                buf2 = buf1;
                // Blocks until the background fill completes, executing it in
                // this thread if it hasn't started yet; re-throws any
                // exception raised while iterating source.
                buf1 = nextBufTask.yieldForce;
                bufPos = 0;

                if (source.empty)
                {
                    lastTaskWaited = true;
                }
                else
                {
                    submitBuf2();
                }
            }

        public:
            E front() @property
            {
                return buf1[bufPos];
            }

            void popFront()
            {
                static if (hasLength!S)
                {
                    _length--;
                }

                bufPos++;
                if (bufPos >= buf1.length)
                {
                    doBufSwap();
                }
            }

            static if (isInfinite!S)
            {
                enum bool empty = false;
            }

            else
            {
                ///
                bool empty() @property
                {
                    // doBufSwap() (called from popFront) nulls buf1 once the
                    // source is exhausted, which is what this detects:
                    return buf1.length == 0;
                }
            }
        }
        return new AsyncBuf(source, bufSize, this);
    }
2409 
2410     /**
2411     Given a callable object `next` that writes to a user-provided buffer and
2412     a second callable object `empty` that determines whether more data is
2413     available to write via `next`, returns an input range that
2414     asynchronously calls `next` with a set of size `nBuffers` of buffers
2415     and makes the results available in the order they were obtained via the
2416     input range interface of the returned object.  Similarly to the
2417     input range overload of `asyncBuf`, the first half of the buffers
2418     are made available via the range interface while the second half are
2419     filled and vice-versa.
2420 
2421     Params:
2422 
2423     next = A callable object that takes a single argument that must be an array
2424            with mutable elements.  When called, `next` writes data to
2425            the array provided by the caller.
2426 
2427     empty = A callable object that takes no arguments and returns a type
2428             implicitly convertible to `bool`.  This is used to signify
2429             that no more data is available to be obtained by calling `next`.
2430 
2431     initialBufSize = The initial size of each buffer.  If `next` takes its
2432                      array by reference, it may resize the buffers.
2433 
2434     nBuffers = The number of buffers to cycle through when calling `next`.
2435 
2436     Example:
2437     ---
2438     // Fetch lines of a file in a background
2439     // thread while processing previously fetched
2440     // lines, without duplicating any lines.
2441     auto file = File("foo.txt");
2442 
2443     void next(ref char[] buf)
2444     {
2445         file.readln(buf);
2446     }
2447 
2448     // Fetch more lines in the background while we
2449     // process the lines already read into memory
2450     // into a matrix of doubles.
2451     double[][] matrix;
2452     auto asyncReader = taskPool.asyncBuf(&next, &file.eof);
2453 
2454     foreach (line; asyncReader)
2455     {
2456         auto ls = line.split("\t");
2457         matrix ~= to!(double[])(ls);
2458     }
2459     ---
2460 
2461     $(B Exception Handling):
2462 
    Any exceptions thrown while calling `next` or `empty` are re-thrown on a
    call to `popFront`.
2465 
2466     Warning:
2467 
2468     Using the range returned by this function in a parallel foreach loop
2469     will not work because buffers may be overwritten while the task that
2470     processes them is in queue.  This is checked for at compile time
2471     and will result in a static assertion failure.
2472     */
2473     auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100)
2474     if (is(typeof(C2.init()) : bool) &&
2475         Parameters!C1.length == 1 &&
2476         Parameters!C2.length == 0 &&
2477         isArray!(Parameters!C1[0])
2478     ) {
2479         auto roundRobin = RoundRobinBuffer!(C1, C2)(next, empty, initialBufSize, nBuffers);
2480         return asyncBuf(roundRobin, nBuffers / 2);
2481     }
2482 
2483     ///
2484     template reduce(functions...)
2485     {
2486         /**
2487         Parallel reduce on a random access range.  Except as otherwise noted,
2488         usage is similar to $(REF _reduce, std,algorithm,iteration).  There is
2489         also $(LREF fold) which does the same thing with a different parameter
2490         order.
2491 
2492         This function works by splitting the range to be reduced into work
2493         units, which are slices to be reduced in parallel.  Once the results
2494         from all work units are computed, a final serial reduction is performed
2495         on these results to compute the final answer. Therefore, care must be
2496         taken to choose the seed value appropriately.
2497 
2498         Because the reduction is being performed in parallel, `functions`
2499         must be associative.  For notational simplicity, let # be an
2500         infix operator representing `functions`.  Then, (a # b) # c must equal
2501         a # (b # c).  Floating point addition is not associative
2502         even though addition in exact arithmetic is.  Summing floating
2503         point numbers using this function may give different results than summing
2504         serially.  However, for many practical purposes floating point addition
2505         can be treated as associative.
2506 
2507         Note that, since `functions` are assumed to be associative,
2508         additional optimizations are made to the serial portion of the reduction
2509         algorithm. These take advantage of the instruction level parallelism of
2510         modern CPUs, in addition to the thread-level parallelism that the rest
2511         of this module exploits.  This can lead to better than linear speedups
2512         relative to $(REF _reduce, std,algorithm,iteration), especially for
2513         fine-grained benchmarks like dot products.
2514 
2515         An explicit seed may be provided as the first argument.  If
2516         provided, it is used as the seed for all work units and for the final
2517         reduction of results from all work units.  Therefore, if it is not the
2518         identity value for the operation being performed, results may differ
        from those generated by $(REF _reduce, std,algorithm,iteration), or may
        vary depending on how many work units are used.  The next argument must be
2521         the range to be reduced.
2522         ---
2523         // Find the sum of squares of a range in parallel, using
2524         // an explicit seed.
2525         //
2526         // Timings on an Athlon 64 X2 dual core machine:
2527         //
2528         // Parallel reduce:                     72 milliseconds
2529         // Using std.algorithm.reduce instead:  181 milliseconds
2530         auto nums = iota(10_000_000.0f);
2531         auto sumSquares = taskPool.reduce!"a + b"(
2532             0.0, std.algorithm.map!"a * a"(nums)
2533         );
2534         ---
2535 
2536         If no explicit seed is provided, the first element of each work unit
2537         is used as a seed.  For the final reduction, the result from the first
2538         work unit is used as the seed.
2539         ---
2540         // Find the sum of a range in parallel, using the first
2541         // element of each work unit as the seed.
2542         auto sum = taskPool.reduce!"a + b"(nums);
2543         ---
2544 
2545         An explicit work unit size may be specified as the last argument.
2546         Specifying too small a work unit size will effectively serialize the
2547         reduction, as the final reduction of the result of each work unit will
2548         dominate computation time.  If `TaskPool.size` for this instance
2549         is zero, this parameter is ignored and one work unit is used.
2550         ---
2551         // Use a work unit size of 100.
2552         auto sum2 = taskPool.reduce!"a + b"(nums, 100);
2553 
2554         // Work unit size of 100 and explicit seed.
2555         auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100);
2556         ---
2557 
2558         Parallel reduce supports multiple functions, like
2559         `std.algorithm.reduce`.
2560         ---
2561         // Find both the min and max of nums.
2562         auto minMax = taskPool.reduce!(min, max)(nums);
2563         assert(minMax[0] == reduce!min(nums));
2564         assert(minMax[1] == reduce!max(nums));
2565         ---
2566 
2567         $(B Exception Handling):
2568 
2569         After this function is finished executing, any exceptions thrown
2570         are chained together via `Throwable.next` and rethrown.  The chaining
2571         order is non-deterministic.
2572 
2573         See_Also:
2574 
2575             $(LREF fold) is functionally equivalent to $(LREF _reduce) except the
2576             range parameter comes first and there is no need to use
2577             $(REF_ALTTEXT `tuple`,tuple,std,typecons) for multiple seeds.
2578          */
2579         auto reduce(Args...)(Args args)
2580         {
2581             import core.exception : OutOfMemoryError;
2582             import core.internal.lifetime : emplaceRef;
2583             import std.exception : enforce;
2584 
2585             alias fun = reduceAdjoin!functions;
2586             alias finishFun = reduceFinish!functions;
2587 
2588             static if (isIntegral!(Args[$ - 1]))
2589             {
2590                 size_t workUnitSize = cast(size_t) args[$ - 1];
2591                 alias args2 = args[0..$ - 1];
2592                 alias Args2 = Args[0..$ - 1];
2593             }
2594             else
2595             {
2596                 alias args2 = args;
2597                 alias Args2 = Args;
2598             }
2599 
2600             auto makeStartValue(Type)(Type e)
2601             {
2602                 static if (functions.length == 1)
2603                 {
2604                     return e;
2605                 }
2606                 else
2607                 {
2608                     typeof(adjoin!(staticMap!(binaryFun, functions))(e, e)) seed = void;
2609                     foreach (i, T; seed.Types)
2610                     {
2611                         emplaceRef(seed.expand[i], e);
2612                     }
2613 
2614                     return seed;
2615                 }
2616             }
2617 
2618             static if (args2.length == 2)
2619             {
2620                 static assert(isInputRange!(Args2[1]));
2621                 alias range = args2[1];
2622                 alias seed = args2[0];
2623                 enum explicitSeed = true;
2624 
2625                 static if (!is(typeof(workUnitSize)))
2626                 {
2627                     size_t workUnitSize = defaultWorkUnitSize(range.length);
2628                 }
2629             }
2630             else
2631             {
2632                 static assert(args2.length == 1);
2633                 alias range = args2[0];
2634 
2635                 static if (!is(typeof(workUnitSize)))
2636                 {
2637                     size_t workUnitSize = defaultWorkUnitSize(range.length);
2638                 }
2639 
2640                 enforce(!range.empty,
2641                     "Cannot reduce an empty range with first element as start value.");
2642 
2643                 auto seed = makeStartValue(range.front);
2644                 enum explicitSeed = false;
2645                 range.popFront();
2646             }
2647 
2648             alias E = typeof(seed);
2649             alias R = typeof(range);
2650 
2651             E reduceOnRange(R range, size_t lowerBound, size_t upperBound)
2652             {
2653                 // This is for exploiting instruction level parallelism by
2654                 // using multiple accumulator variables within each thread,
2655                 // since we're assuming functions are associative anyhow.
2656 
2657                 // This is so that loops can be unrolled automatically.
2658                 enum ilpTuple = AliasSeq!(0, 1, 2, 3, 4, 5);
2659                 enum nILP = ilpTuple.length;
2660                 immutable subSize = (upperBound - lowerBound) / nILP;
2661 
2662                 if (subSize <= 1)
2663                 {
2664                     // Handle as a special case.
2665                     static if (explicitSeed)
2666                     {
2667                         E result = seed;
2668                     }
2669                     else
2670                     {
2671                         E result = makeStartValue(range[lowerBound]);
2672                         lowerBound++;
2673                     }
2674 
2675                     foreach (i; lowerBound .. upperBound)
2676                     {
2677                         result = fun(result, range[i]);
2678                     }
2679 
2680                     return result;
2681                 }
2682 
2683                 assert(subSize > 1);
2684                 E[nILP] results;
2685                 size_t[nILP] offsets;
2686 
2687                 foreach (i; ilpTuple)
2688                 {
2689                     offsets[i] = lowerBound + subSize * i;
2690 
2691                     static if (explicitSeed)
2692                     {
2693                         results[i] = seed;
2694                     }
2695                     else
2696                     {
2697                         results[i] = makeStartValue(range[offsets[i]]);
2698                         offsets[i]++;
2699                     }
2700                 }
2701 
2702                 immutable nLoop = subSize - (!explicitSeed);
2703                 foreach (i; 0 .. nLoop)
2704                 {
2705                     foreach (j; ilpTuple)
2706                     {
2707                         results[j] = fun(results[j], range[offsets[j]]);
2708                         offsets[j]++;
2709                     }
2710                 }
2711 
2712                 // Finish the remainder.
2713                 foreach (i; nILP * subSize + lowerBound .. upperBound)
2714                 {
2715                     results[$ - 1] = fun(results[$ - 1], range[i]);
2716                 }
2717 
2718                 foreach (i; ilpTuple[1..$])
2719                 {
2720                     results[0] = finishFun(results[0], results[i]);
2721                 }
2722 
2723                 return results[0];
2724             }
2725 
2726             immutable len = range.length;
2727             if (len == 0)
2728             {
2729                 return seed;
2730             }
2731 
2732             if (this.size == 0)
2733             {
2734                 return finishFun(seed, reduceOnRange(range, 0, len));
2735             }
2736 
2737             // Unlike the rest of the functions here, I can't use the Task object
2738             // recycling trick here because this has to work on non-commutative
2739             // operations.  After all the tasks are done executing, fun() has to
2740             // be applied on the results of these to get a final result, but
2741             // it can't be evaluated out of order.
2742 
2743             if (workUnitSize > len)
2744             {
2745                 workUnitSize = len;
2746             }
2747 
2748             immutable size_t nWorkUnits = (len / workUnitSize) + ((len % workUnitSize == 0) ? 0 : 1);
2749             assert(nWorkUnits * workUnitSize >= len);
2750 
2751             alias RTask = Task!(run, typeof(&reduceOnRange), R, size_t, size_t);
2752             RTask[] tasks;
2753 
2754             // Can't use alloca() due to https://issues.dlang.org/show_bug.cgi?id=3753
2755             // Use a fixed buffer backed by malloc().
2756             enum maxStack = 2_048;
2757             byte[maxStack] buf = void;
2758             immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof;
2759 
2760             import core.stdc.stdlib : malloc, free;
2761             if (nBytesNeeded <= maxStack)
2762             {
2763                 tasks = (cast(RTask*) buf.ptr)[0 .. nWorkUnits];
2764             }
2765             else
2766             {
2767                 auto ptr = cast(RTask*) malloc(nBytesNeeded);
2768                 if (!ptr)
2769                 {
2770                     throw new OutOfMemoryError(
2771                         "Out of memory in std.parallelism."
2772                     );
2773                 }
2774 
2775                 tasks = ptr[0 .. nWorkUnits];
2776             }
2777 
2778             scope(exit)
2779             {
2780                 if (nBytesNeeded > maxStack)
2781                 {
2782                     free(tasks.ptr);
2783                 }
2784             }
2785 
2786             // Hack to take the address of a nested function w/o
2787             // making a closure.
2788             static auto scopedAddress(D)(scope D del) @system
2789             {
2790                 auto tmp = del;
2791                 return tmp;
2792             }
2793 
2794             size_t curPos = 0;
2795             void useTask(ref RTask task)
2796             {
2797                 import std.algorithm.comparison : min;
2798                 import core.lifetime : emplace;
2799 
2800                 // Private constructor, so can't feed it's arguments directly
2801                 // to emplace
2802                 emplace(&task, RTask
2803                 (
2804                     scopedAddress(&reduceOnRange),
2805                     range,
2806                     curPos, // lower bound.
2807                     cast() min(len, curPos + workUnitSize)  // upper bound.
2808                 ));
2809 
2810                 task.pool = this;
2811 
2812                 curPos += workUnitSize;
2813             }
2814 
2815             foreach (ref task; tasks)
2816             {
2817                 useTask(task);
2818             }
2819 
2820             foreach (i; 1 .. tasks.length - 1)
2821             {
2822                 tasks[i].next = tasks[i + 1].basePtr;
2823                 tasks[i + 1].prev = tasks[i].basePtr;
2824             }
2825 
2826             if (tasks.length > 1)
2827             {
2828                 queueLock();
2829                 scope(exit) queueUnlock();
2830 
2831                 abstractPutGroupNoSync(
2832                     tasks[1].basePtr,
2833                     tasks[$ - 1].basePtr
2834                 );
2835             }
2836 
2837             if (tasks.length > 0)
2838             {
2839                 try
2840                 {
2841                     tasks[0].job();
2842                 }
2843                 catch (Throwable e)
2844                 {
2845                     tasks[0].exception = e;
2846                 }
2847                 tasks[0].taskStatus = TaskStatus.done;
2848 
2849                 // Try to execute each of these in the current thread
2850                 foreach (ref task; tasks[1..$])
2851                 {
2852                     tryDeleteExecute(task.basePtr);
2853                 }
2854             }
2855 
2856             // Now that we've tried to execute every task, they're all either
2857             // done or in progress.  Force all of them.
2858             E result = seed;
2859 
2860             Throwable firstException;
2861 
2862             foreach (ref task; tasks)
2863             {
2864                 try
2865                 {
2866                     task.yieldForce;
2867                 }
2868                 catch (Throwable e)
2869                 {
2870                     /* Chain e to front because order doesn't matter and because
2871                      * e is not likely to be a chain itself (so fewer traversals)
2872                      */
2873                     firstException = Throwable.chainTogether(e, firstException);
2874                     continue;
2875                 }
2876 
2877                 if (!firstException) result = finishFun(result, task.returnVal);
2878             }
2879 
2880             if (firstException) throw firstException;
2881 
2882             return result;
2883         }
2884     }
2885 
2886     ///
2887     template fold(functions...)
2888     {
2889         /** Implements the homonym function (also known as `accumulate`, `compress`,
2890             `inject`, or `foldl`) present in various programming languages of
2891             functional flavor.
2892 
2893             `fold` is functionally equivalent to $(LREF reduce) except the range
2894             parameter comes first and there is no need to use $(REF_ALTTEXT
2895             `tuple`,tuple,std,typecons) for multiple seeds.
2896 
2897             There may be one or more callable entities (`functions` argument) to
2898             apply.
2899 
2900             Params:
2901                 args = Just the range to _fold over; or the range and one seed
2902                        per function; or the range, one seed per function, and
2903                        the work unit size
2904 
2905             Returns:
2906                 The accumulated result as a single value for single function and
2907                 as a tuple of values for multiple functions
2908 
2909             See_Also:
2910             Similar to $(REF _fold, std,algorithm,iteration), `fold` is a wrapper around $(LREF reduce).
2911 
2912             Example:
2913             ---
2914             static int adder(int a, int b)
2915             {
2916                 return a + b;
2917             }
2918             static int multiplier(int a, int b)
2919             {
2920                 return a * b;
2921             }
2922 
2923             // Just the range
2924             auto x = taskPool.fold!adder([1, 2, 3, 4]);
2925             assert(x == 10);
2926 
2927             // The range and the seeds (0 and 1 below; also note multiple
2928             // functions in this example)
2929             auto y = taskPool.fold!(adder, multiplier)([1, 2, 3, 4], 0, 1);
2930             assert(y[0] == 10);
2931             assert(y[1] == 24);
2932 
2933             // The range, the seed (0), and the work unit size (20)
2934             auto z = taskPool.fold!adder([1, 2, 3, 4], 0, 20);
2935             assert(z == 10);
2936             ---
2937         */
        auto fold(Args...)(Args args)
        {
            static assert(isInputRange!(Args[0]), "First argument must be an InputRange");

            // The range is always the first runtime argument; any seeds and
            // the optional work unit size follow it.
            alias range = args[0];

            static if (Args.length == 1)
            {
                // Just the range
                return reduce!functions(range);
            }
            else static if (Args.length == 1 + functions.length ||
                            Args.length == 1 + functions.length + 1)
            {
                static if (functions.length == 1)
                {
                    // A single function takes its single seed as-is.
                    alias seeds = args[1];
                }
                else
                {
                    // Multiple functions: reduce() expects the seeds packed
                    // into a tuple.  A nested function is used so the tuple
                    // is built only where `seeds` is actually referenced.
                    auto seeds()
                    {
                        import std.typecons : tuple;
                        return tuple(args[1 .. functions.length+1]);
                    }
                }

                static if (Args.length == 1 + functions.length)
                {
                    // The range and the seeds
                    return reduce!functions(seeds, range);
                }
                else static if (Args.length == 1 + functions.length + 1)
                {
                    // The range, the seeds, and the work unit size
                    static assert(isIntegral!(Args[$-1]), "Work unit size must be an integral type");
                    return reduce!functions(seeds, range, args[$-1]);
                }
            }
            else
            {
                // Argument count matches no supported pattern; report the
                // expected shape at compile time.
                import std.conv : text;
                static assert(0, "Invalid number of arguments (" ~ Args.length.text ~ "): Should be an input range, "
                              ~ functions.length.text ~ " optional seed(s), and an optional work unit size.");
            }
        }
2984     }
2985 
2986     // This test is not included in the documentation because even though these
2987     // examples are for the inner fold() template, with their current location,
2988     // they would appear under the outer one. (We can't move this inside the
2989     // outer fold() template because then dmd runs out of memory possibly due to
2990     // recursive template instantiation, which is surprisingly not caught.)
2991     @system unittest
2992     {
2993         // Just the range
2994         auto x = taskPool.fold!"a + b"([1, 2, 3, 4]);
2995         assert(x == 10);
2996 
2997         // The range and the seeds (0 and 1 below; also note multiple
2998         // functions in this example)
2999         auto y = taskPool.fold!("a + b", "a * b")([1, 2, 3, 4], 0, 1);
3000         assert(y[0] == 10);
3001         assert(y[1] == 24);
3002 
3003         // The range, the seed (0), and the work unit size (20)
3004         auto z = taskPool.fold!"a + b"([1, 2, 3, 4], 0, 20);
3005         assert(z == 10);
3006     }
3007 
3008     /**
3009     Gets the index of the current thread relative to this `TaskPool`.  Any
3010     thread not in this pool will receive an index of 0.  The worker threads in
3011     this pool receive unique indices of 1 through `this.size`.
3012 
3013     This function is useful for maintaining worker-local resources.
3014 
3015     Example:
3016     ---
3017     // Execute a loop that computes the greatest common
3018     // divisor of every number from 0 through 999 with
3019     // 42 in parallel.  Write the results out to
3020     // a set of files, one for each thread.  This allows
3021     // results to be written out without any synchronization.
3022 
3023     import std.conv, std.range, std.numeric, std.stdio;
3024 
3025     void main()
3026     {
        auto fileHandles = new File[taskPool.size + 1];
3028         scope(exit) {
3029             foreach (ref handle; fileHandles)
3030             {
3031                 handle.close();
3032             }
3033         }
3034 
3035         foreach (i, ref handle; fileHandles)
3036         {
            handle = File("workerResults" ~ to!string(i) ~ ".txt", "w");
3038         }
3039 
3040         foreach (num; parallel(iota(1_000)))
3041         {
3042             auto outHandle = fileHandles[taskPool.workerIndex];
3043             outHandle.writeln(num, '\t', gcd(num, 42));
3044         }
3045     }
3046     ---
3047     */
3048     size_t workerIndex() @property @safe const nothrow
3049     {
3050         immutable rawInd = threadIndex;
3051         return (rawInd >= instanceStartIndex && rawInd < instanceStartIndex + size) ?
3052                 (rawInd - instanceStartIndex + 1) : 0;
3053     }
3054 
3055     /**
3056     Struct for creating worker-local storage.  Worker-local storage is
3057     thread-local storage that exists only for worker threads in a given
3058     `TaskPool` plus a single thread outside the pool.  It is allocated on the
3059     garbage collected heap in a way that avoids _false sharing, and doesn't
3060     necessarily have global scope within any thread.  It can be accessed from
3061     any worker thread in the `TaskPool` that created it, and one thread
3062     outside this `TaskPool`.  All threads outside the pool that created a
3063     given instance of worker-local storage share a single slot.
3064 
3065     Since the underlying data for this struct is heap-allocated, this struct
3066     has reference semantics when passed between functions.
3067 
    The main use cases for `WorkerLocalStorage` are:
3069 
3070     1.  Performing parallel reductions with an imperative, as opposed to
3071         functional, programming style.  In this case, it's useful to treat
3072         `WorkerLocalStorage` as local to each thread for only the parallel
3073         portion of an algorithm.
3074 
3075     2.  Recycling temporary buffers across iterations of a parallel foreach loop.
3076 
3077     Example:
3078     ---
3079     // Calculate pi as in our synopsis example, but
3080     // use an imperative instead of a functional style.
3081     immutable n = 1_000_000_000;
3082     immutable delta = 1.0L / n;
3083 
3084     auto sums = taskPool.workerLocalStorage(0.0L);
3085     foreach (i; parallel(iota(n)))
3086     {
3087         immutable x = ( i - 0.5L ) * delta;
3088         immutable toAdd = delta / ( 1.0 + x * x );
3089         sums.get += toAdd;
3090     }
3091 
3092     // Add up the results from each worker thread.
3093     real pi = 0;
3094     foreach (threadResult; sums.toRange)
3095     {
3096         pi += 4.0L * threadResult;
3097     }
3098     ---
3099      */
3100     static struct WorkerLocalStorage(T)
3101     {
3102     private:
3103         TaskPool pool;
3104         size_t size;
3105 
3106         size_t elemSize;
3107         bool* stillThreadLocal;
3108 
3109         static size_t roundToLine(size_t num) pure nothrow
3110         {
3111             if (num % cacheLineSize == 0)
3112             {
3113                 return num;
3114             }
3115             else
3116             {
3117                 return ((num / cacheLineSize) + 1) * cacheLineSize;
3118             }
3119         }
3120 
3121         void* data;
3122 
3123         void initialize(TaskPool pool)
3124         {
3125             this.pool = pool;
3126             size = pool.size + 1;
3127             stillThreadLocal = new bool;
3128             *stillThreadLocal = true;
3129 
3130             // Determines whether the GC should scan the array.
3131             auto blkInfo = (typeid(T).flags & 1) ?
3132                            cast(GC.BlkAttr) 0 :
3133                            GC.BlkAttr.NO_SCAN;
3134 
3135             immutable nElem = pool.size + 1;
3136             elemSize = roundToLine(T.sizeof);
3137 
3138             // The + 3 is to pad one full cache line worth of space on either side
3139             // of the data structure to make sure false sharing with completely
3140             // unrelated heap data is prevented, and to provide enough padding to
3141             // make sure that data is cache line-aligned.
3142             data = GC.malloc(elemSize * (nElem + 3), blkInfo) + elemSize;
3143 
3144             // Cache line align data ptr.
3145             data = cast(void*) roundToLine(cast(size_t) data);
3146 
3147             foreach (i; 0 .. nElem)
3148             {
3149                 this.opIndex(i) = T.init;
3150             }
3151         }
3152 
3153         ref opIndex(this Qualified)(size_t index)
3154         {
3155             import std.conv : text;
3156             assert(index < size, text(index, '\t', uint.max));
3157             return *(cast(CopyTypeQualifiers!(Qualified, T)*) (data + elemSize * index));
3158         }
3159 
3160         void opIndexAssign(T val, size_t index)
3161         {
3162             assert(index < size);
3163             *(cast(T*) (data + elemSize * index)) = val;
3164         }
3165 
3166     public:
3167         /**
3168         Get the current thread's instance.  Returns by ref.
3169         Note that calling `get` from any thread
3170         outside the `TaskPool` that created this instance will return the
3171         same reference, so an instance of worker-local storage should only be
3172         accessed from one thread outside the pool that created it.  If this
3173         rule is violated, undefined behavior will result.
3174 
3175         If assertions are enabled and `toRange` has been called, then this
3176         WorkerLocalStorage instance is no longer worker-local and an assertion
3177         failure will result when calling this method.  This is not checked
3178         when assertions are disabled for performance reasons.
3179          */
3180         ref get(this Qualified)() @property
3181         {
3182             assert(*stillThreadLocal,
3183                 "Cannot call get() on this instance of WorkerLocalStorage " ~
3184                 "because it is no longer worker-local."
3185             );
3186             return opIndex(pool.workerIndex);
3187         }
3188 
3189         /**
3190         Assign a value to the current thread's instance.  This function has
3191         the same caveats as its overload.
3192         */
3193         void get(T val) @property
3194         {
3195             assert(*stillThreadLocal,
3196                 "Cannot call get() on this instance of WorkerLocalStorage " ~
3197                 "because it is no longer worker-local."
3198             );
3199 
3200             opIndexAssign(val, pool.workerIndex);
3201         }
3202 
3203         /**
3204         Returns a range view of the values for all threads, which can be used
3205         to further process the results of each thread after running the parallel
3206         part of your algorithm.  Do not use this method in the parallel portion
3207         of your algorithm.
3208 
3209         Calling this function sets a flag indicating that this struct is no
3210         longer worker-local, and attempting to use the `get` method again
3211         will result in an assertion failure if assertions are enabled.
3212          */
3213         WorkerLocalStorageRange!T toRange() @property
3214         {
3215             if (*stillThreadLocal)
3216             {
3217                 *stillThreadLocal = false;
3218 
3219                 // Make absolutely sure results are visible to all threads.
3220                 // This is probably not necessary since some other
3221                 // synchronization primitive will be used to signal that the
3222                 // parallel part of the algorithm is done, but the
3223                 // performance impact should be negligible, so it's better
3224                 // to be safe.
3225                 ubyte barrierDummy;
3226                 atomicSetUbyte(barrierDummy, 1);
3227             }
3228 
3229             return WorkerLocalStorageRange!T(this);
3230         }
3231     }
3232 
3233     /**
3234     Range primitives for worker-local storage.  The purpose of this is to
3235     access results produced by each worker thread from a single thread once you
3236     are no longer using the worker-local storage from multiple threads.
3237     Do not use this struct in the parallel portion of your algorithm.
3238 
3239     The proper way to instantiate this object is to call
3240     `WorkerLocalStorage.toRange`.  Once instantiated, this object behaves
3241     as a finite random-access range with assignable, lvalue elements and
3242     a length equal to the number of worker threads in the `TaskPool` that
3243     created it plus 1.
3244      */
3245     static struct WorkerLocalStorageRange(T)
3246     {
3247     private:
3248         WorkerLocalStorage!T workerLocalStorage;
3249 
3250         size_t _length;
3251         size_t beginOffset;
3252 
3253         this(WorkerLocalStorage!T wl)
3254         {
3255             this.workerLocalStorage = wl;
3256             _length = wl.size;
3257         }
3258 
3259     public:
3260         ref front(this Qualified)() @property
3261         {
3262             return this[0];
3263         }
3264 
3265         ref back(this Qualified)() @property
3266         {
3267             return this[_length - 1];
3268         }
3269 
3270         void popFront()
3271         {
3272             if (_length > 0)
3273             {
3274                 beginOffset++;
3275                 _length--;
3276             }
3277         }
3278 
3279         void popBack()
3280         {
3281             if (_length > 0)
3282             {
3283                 _length--;
3284             }
3285         }
3286 
3287         typeof(this) save() @property
3288         {
3289             return this;
3290         }
3291 
3292         ref opIndex(this Qualified)(size_t index)
3293         {
3294             assert(index < _length);
3295             return workerLocalStorage[index + beginOffset];
3296         }
3297 
3298         void opIndexAssign(T val, size_t index)
3299         {
3300             assert(index < _length);
3301             workerLocalStorage[index] = val;
3302         }
3303 
3304         typeof(this) opSlice(size_t lower, size_t upper)
3305         {
3306             assert(upper <= _length);
3307             auto newWl = this.workerLocalStorage;
3308             newWl.data += lower * newWl.elemSize;
3309             newWl.size = upper - lower;
3310             return typeof(this)(newWl);
3311         }
3312 
3313         bool empty() const @property
3314         {
3315             return length == 0;
3316         }
3317 
3318         size_t length() const @property
3319         {
3320             return _length;
3321         }
3322     }
3323 
3324     /**
3325     Creates an instance of worker-local storage, initialized with a given
3326     value.  The value is `lazy` so that you can, for example, easily
3327     create one instance of a class for each worker.  For usage example,
3328     see the `WorkerLocalStorage` struct.
3329      */
    WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init)
    {
        WorkerLocalStorage!T ret;
        ret.initialize(this);
        // One slot per worker thread plus one slot shared by all threads
        // outside the pool.  initialVal is lazy, so it is re-evaluated for
        // every slot -- e.g. to construct a distinct class instance per
        // worker.
        foreach (i; 0 .. size + 1)
        {
            ret[i] = initialVal;
        }

        // Memory barrier to make absolutely sure that what we wrote is
        // visible to worker threads.
        ubyte barrierDummy;
        atomicSetUbyte(barrierDummy, 0);

        return ret;
    }
3346 
3347     /**
3348     Signals to all worker threads to terminate as soon as they are finished
3349     with their current `Task`, or immediately if they are not executing a
3350     `Task`.  `Task`s that were in queue will not be executed unless
3351     a call to `Task.workForce`, `Task.yieldForce` or `Task.spinForce`
3352     causes them to be executed.
3353 
3354     Use only if you have waited on every `Task` and therefore know the
3355     queue is empty, or if you speculatively executed some tasks and no longer
3356     need the results.
3357      */
    void stop() @trusted
    {
        queueLock();
        scope(exit) queueUnlock();
        // stopNow makes workers exit their work loops even if tasks
        // remain in the queue.
        atomicSetUbyte(status, PoolState.stopNow);
        // Wake any sleeping workers so they observe the new state.
        notifyAll();
    }
3365 
3366     /**
3367     Signals worker threads to terminate when the queue becomes empty.
3368 
3369     If blocking argument is true, wait for all worker threads to terminate
3370     before returning.  This option might be used in applications where
    task results are never consumed -- e.g. when `TaskPool` is employed as a
3372     rudimentary scheduler for tasks which communicate by means other than
3373     return values.
3374 
3375     Warning:  Calling this function with $(D blocking = true) from a worker
3376               thread that is a member of the same `TaskPool` that
3377               `finish` is being called on will result in a deadlock.
3378      */
    void finish(bool blocking = false) @trusted
    {
        {
            queueLock();
            scope(exit) queueUnlock();
            // Only transition running -> finishing; if the pool is in any
            // other state (e.g. already stopped), leave it alone.
            atomicCasUbyte(status, PoolState.running, PoolState.finishing);
            notifyAll();
        }
        if (blocking)
        {
            // Use this thread as a worker until everything is finished.
            executeWorkLoop();

            foreach (t; pool)
            {
                // Maybe there should be something here to prevent a thread
                // from calling join() on itself if this function is called
                // from a worker thread in the same pool, but:
                //
                // 1.  Using an if statement to skip join() would result in
                //     finish() returning without all tasks being finished.
                //
                // 2.  If an exception were thrown, it would bubble up to the
                //     Task from which finish() was called and likely be
                //     swallowed.
                t.join();
            }
        }
    }
3408 
    /// Returns the number of worker threads in the pool.
    @property size_t size() @safe const pure nothrow
    {
        // The pool array holds exactly one Thread per worker.
        return pool.length;
    }
3414 
3415     /**
3416     Put a `Task` object on the back of the task queue.  The `Task`
3417     object may be passed by pointer or reference.
3418 
3419     Example:
3420     ---
3421     import std.file;
3422 
3423     // Create a task.
3424     auto t = task!read("foo.txt");
3425 
3426     // Add it to the queue to be executed.
3427     taskPool.put(t);
3428     ---
3429 
3430     Notes:
3431 
3432     @trusted overloads of this function are called for `Task`s if
3433     $(REF hasUnsharedAliasing, std,traits) is false for the `Task`'s
3434     return type or the function the `Task` executes is `pure`.
3435     `Task` objects that meet all other requirements specified in the
3436     `@trusted` overloads of `task` and `scopedTask` may be created
3437     and executed from `@safe` code via `Task.executeInNewThread` but
3438     not via `TaskPool`.
3439 
3440     While this function takes the address of variables that may
3441     be on the stack, some overloads are marked as @trusted.
3442     `Task` includes a destructor that waits for the task to complete
3443     before destroying the stack frame it is allocated on.  Therefore,
3444     it is impossible for the stack frame to be destroyed before the task is
3445     complete and no longer referenced by a `TaskPool`.
3446     */
    void put(alias fun, Args...)(ref Task!(fun, Args) task)
    if (!isSafeReturn!(typeof(task)))
    {
        // Record the owning pool so the task's force() methods know where
        // to look for it, then enqueue the type-erased base of the task.
        task.pool = this;
        abstractPut(task.basePtr);
    }
3453 
    /// Ditto
    void put(alias fun, Args...)(Task!(fun, Args)* task)
    if (!isSafeReturn!(typeof(*task)))
    {
        import std.exception : enforce;
        // Reject null before dereferencing; then forward to the ref overload.
        enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
        put(*task);
    }
3462 
    // @trusted overload selected when isSafeReturn holds for the task's
    // type (see the Notes in the documented overload); same logic as the
    // @system ref overload.
    @trusted void put(alias fun, Args...)(ref Task!(fun, Args) task)
    if (isSafeReturn!(typeof(task)))
    {
        task.pool = this;
        abstractPut(task.basePtr);
    }
3469 
    // @trusted pointer overload; rejects null, then forwards to the
    // ref overload.
    @trusted void put(alias fun, Args...)(Task!(fun, Args)* task)
    if (isSafeReturn!(typeof(*task)))
    {
        import std.exception : enforce;
        enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
        put(*task);
    }
3477 
3478     /**
3479     These properties control whether the worker threads are daemon threads.
3480     A daemon thread is automatically terminated when all non-daemon threads
3481     have terminated.  A non-daemon thread will prevent a program from
3482     terminating as long as it has not terminated.
3483 
3484     If any `TaskPool` with non-daemon threads is active, either `stop`
3485     or `finish` must be called on it before the program can terminate.
3486 
    The worker threads in the `TaskPool` instance returned by the
3488     `taskPool` property are daemon by default.  The worker threads of
3489     manually instantiated task pools are non-daemon by default.
3490 
3491     Note:  For a size zero pool, the getter arbitrarily returns true and the
3492            setter has no effect.
3493     */
3494     bool isDaemon() @property @trusted
3495     {
3496         queueLock();
3497         scope(exit) queueUnlock();
3498         return (size == 0) ? true : pool[0].isDaemon;
3499     }
3500 
3501     /// Ditto
3502     void isDaemon(bool newVal) @property @trusted
3503     {
3504         queueLock();
3505         scope(exit) queueUnlock();
3506         foreach (thread; pool)
3507         {
3508             thread.isDaemon = newVal;
3509         }
3510     }
3511 
3512     /**
3513     These functions allow getting and setting the OS scheduling priority of
3514     the worker threads in this `TaskPool`.  They forward to
3515     `core.thread.Thread.priority`, so a given priority value here means the
3516     same thing as an identical priority value in `core.thread`.
3517 
3518     Note:  For a size zero pool, the getter arbitrarily returns
3519            `core.thread.Thread.PRIORITY_MIN` and the setter has no effect.
3520     */
3521     int priority() @property @trusted
3522     {
3523         return (size == 0) ? core.thread.Thread.PRIORITY_MIN :
3524         pool[0].priority;
3525     }
3526 
3527     /// Ditto
3528     void priority(int newPriority) @property @trusted
3529     {
3530         if (size > 0)
3531         {
3532             foreach (t; pool)
3533             {
3534                 t.priority = newPriority;
3535             }
3536         }
3537     }
3538 }
3539 
@system unittest
{
    import std.algorithm.iteration : sum;
    import std.range : iota;
    import std.typecons : tuple;

    // fold over 1 .. N must agree with a serial sum for every overload.
    enum N = 100;
    auto rng = iota(1, N + 1);
    const want = rng.sum();

    // Range only.
    assert(taskPool.fold!"a + b"(rng) == want);

    // Range plus explicit seed(s).
    assert(taskPool.fold!"a + b"(rng, 0) == want);
    assert(taskPool.fold!("a + b", "a + b")(rng, 0, 0) == tuple(want, want));

    // Range, seed(s), and an explicit work unit size.
    assert(taskPool.fold!"a + b"(rng, 0, 42) == want);
    assert(taskPool.fold!("a + b", "a + b")(rng, 0, 0, 42) == tuple(want, want));
}
3561 
// Issue 16705: https://issues.dlang.org/show_bug.cgi?id=16705
@system unittest
{
    // Minimal range whose popFront is a function *template*; regression
    // test that reduce still accepts such ranges.  The exact member shapes
    // below are the point of the test — do not "clean them up".
    struct MyIota
    {
        size_t front;
        void popFront()(){front++;}
        auto empty(){return front >= 25;}
        auto opIndex(size_t i){return front+i;}
        auto length(){return 25-front;}
    }

    auto mySum = taskPool.reduce!"a + b"(MyIota());
}
3576 
3577 /**
3578 Returns a lazily initialized global instantiation of `TaskPool`.
3579 This function can safely be called concurrently from multiple non-worker
3580 threads.  The worker threads in this pool are daemon threads, meaning that it
3581 is not necessary to call `TaskPool.stop` or `TaskPool.finish` before
3582 terminating the main thread.
3583 */
3584 @property TaskPool taskPool() @trusted
3585 {
3586     import std.concurrency : initOnce;
3587     __gshared TaskPool pool;
3588     return initOnce!pool({
3589         auto p = new TaskPool(defaultPoolThreads);
3590         p.isDaemon = true;
3591         return p;
3592     }());
3593 }
3594 
// Sentinel value uint.max means "never set"; the defaultPoolThreads getter
// then falls back to totalCPUs - 1.
private shared uint _defaultPoolThreads = uint.max;
3596 
3597 /**
3598 These properties get and set the number of worker threads in the `TaskPool`
3599 instance returned by `taskPool`.  The default value is `totalCPUs` - 1.
3600 Calling the setter after the first call to `taskPool` does not changes
3601 number of worker threads in the instance returned by `taskPool`.
3602 */
3603 @property uint defaultPoolThreads() @trusted
3604 {
3605     const local = atomicLoad(_defaultPoolThreads);
3606     return local < uint.max ? local : totalCPUs - 1;
3607 }
3608 
/// Ditto
@property void defaultPoolThreads(uint newVal) @trusted
{
    // Atomic store so a concurrent reader (the getter) never sees a torn
    // value.
    atomicStore(_defaultPoolThreads, newVal);
}
3614 
3615 /**
3616 Convenience functions that forwards to `taskPool.parallel`.  The
3617 purpose of these is to make parallel foreach less verbose and more
3618 readable.
3619 
3620 Example:
3621 ---
3622 // Find the logarithm of every number from
3623 // 1 to 1_000_000 in parallel, using the
3624 // default TaskPool instance.
3625 auto logs = new double[1_000_000];
3626 
3627 foreach (i, ref elem; parallel(logs))
3628 {
3629     elem = log(i + 1.0);
3630 }
3631 ---
3632 
3633 */
3634 ParallelForeach!R parallel(R)(R range)
3635 {
3636     return taskPool.parallel(range);
3637 }
3638 
/// Ditto
ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
{
    // Same as above, but with a caller-specified work unit size.
    return taskPool.parallel(range, workUnitSize);
}
3644 
//  `each` should be usable with parallel
// https://issues.dlang.org/show_bug.cgi?id=17019
@system unittest
{
    import std.algorithm.iteration : each, sum;
    import std.range : iota;

    // Element-only overload: increment every element through a ref lambda.
    auto values = new int[10];
    parallel(values).each!((ref e) => e += 1);
    assert(values.sum == 10);

    // Index-and-element overload: add each index into its slot.
    auto withIndex = new int[10];
    parallel(withIndex).each!((i, ref e) => e += i);
    assert(withIndex.sum == 10.iota.sum);
}
3661 
// https://issues.dlang.org/show_bug.cgi?id=22745
@system unittest
{
    // A zero-size pool iterating an empty range must neither hang nor crash.
    auto pool = new TaskPool(0);
    int[] empty;
    foreach (i; pool.parallel(empty)) {}
    pool.finish();
}
3670 
// Thrown when a parallel foreach loop is broken from.
// Breaking out is unsupported because other work units may already be
// running (or finished) on other threads and cannot be recalled.
class ParallelForeachError : Error
{
    this()
    {
        super("Cannot break from a parallel foreach loop using break, return, "
              ~ "labeled break/continue or goto statements.");
    }
}
3680 
/*------Structs that implement opApply for parallel foreach.------------------*/
// True for ranges supporting both random access and .length, which lets
// parallel foreach hand out index intervals instead of serially popping
// elements under a lock.
private template randLen(R)
{
    enum randLen = isRandomAccessRange!R && hasLength!R;
}
3686 
/*
Creates pool.size + 1 copies of a task wrapping `doIt`, runs one inline in
the calling thread, submits the rest to `pool` as a linked group, then joins
all of them, rethrowing any exceptions chained together.  This is the engine
behind parallel foreach: every worker plus the caller executes `doIt`.
*/
private void submitAndExecute(
    TaskPool pool,
    scope void delegate() doIt
)
{
    import core.exception : OutOfMemoryError;
    // One task per worker thread, plus one for the submitting thread itself.
    immutable nThreads = pool.size + 1;

    alias PTask = typeof(scopedTask(doIt));
    import core.stdc.stdlib : malloc, free;
    import core.stdc.string : memcpy;

    // The logical thing to do would be to just use alloca() here, but that
    // causes problems on Windows for reasons that I don't understand
    // (tentatively a compiler bug) and definitely doesn't work on Posix due
    // to https://issues.dlang.org/show_bug.cgi?id=3753.
    // Therefore, allocate a fixed buffer and fall back to `malloc()` if
    // someone's using a ridiculous amount of threads.
    // Also, the using a byte array instead of a PTask array as the fixed buffer
    // is to prevent d'tors from being called on uninitialized excess PTask
    // instances.
    // NOTE(review): raw byte storage assumes the stack slot's alignment is
    // sufficient for PTask — confirm PTask.alignof does not exceed it.
    enum nBuf = 64;
    byte[nBuf * PTask.sizeof] buf = void;
    PTask[] tasks;
    if (nThreads <= nBuf)
    {
        tasks = (cast(PTask*) buf.ptr)[0 .. nThreads];
    }
    else
    {
        auto ptr = cast(PTask*) malloc(nThreads * PTask.sizeof);
        if (!ptr) throw new OutOfMemoryError("Out of memory in std.parallelism.");
        tasks = ptr[0 .. nThreads];
    }

    scope(exit)
    {
        if (nThreads > nBuf)
        {
            free(tasks.ptr);
        }
    }

    foreach (ref t; tasks)
    {
        import core.stdc.string : memcpy;

        // This silly looking code is necessary to prevent d'tors from being
        // called on uninitialized objects.
        auto temp = scopedTask(doIt);
        memcpy(&t, &temp, PTask.sizeof);

        // This has to be done to t after copying, not temp before copying.
        // Otherwise, temp's destructor will sit here and wait for the
        // task to finish.
        t.pool = pool;
    }

    // Link tasks[1 .. $] into a doubly linked list so they can be put on the
    // queue as one group.  tasks[0] stays unlinked; it runs inline below.
    foreach (i; 1 .. tasks.length - 1)
    {
        tasks[i].next = tasks[i + 1].basePtr;
        tasks[i + 1].prev = tasks[i].basePtr;
    }

    if (tasks.length > 1)
    {
        pool.queueLock();
        scope(exit) pool.queueUnlock();

        pool.abstractPutGroupNoSync(
            tasks[1].basePtr,
            tasks[$ - 1].basePtr
        );
    }

    if (tasks.length > 0)
    {
        // Execute the first copy directly in the submitting thread, storing
        // any exception the same way a worker thread would.
        try
        {
            tasks[0].job();
        }
        catch (Throwable e)
        {
            tasks[0].exception = e; // nocoverage
        }
        tasks[0].taskStatus = TaskStatus.done;

        // Try to execute each of these in the current thread
        foreach (ref task; tasks[1..$])
        {
            pool.tryDeleteExecute(task.basePtr);
        }
    }

    // Join every copy, collecting any thrown exceptions into one chain.
    Throwable firstException;

    foreach (i, ref task; tasks)
    {
        try
        {
            task.yieldForce;
        }
        catch (Throwable e)
        {
            /* Chain e to front because order doesn't matter and because
             * e is not likely to be a chain itself (so fewer traversals)
             */
            firstException = Throwable.chainTogether(e, firstException);
            continue;
        }
    }

    if (firstException) throw firstException;
}
3801 
// Raises the error reported when user code tries to break out of a parallel
// foreach body (i.e. the foreach delegate returned nonzero).
void foreachErr()
{
    throw new ParallelForeachError();
}
3806 
// Serial fallback for parallel foreach over a zero-size pool: iterates
// p.range entirely in the calling thread, invoking dg with or without an
// index depending on its arity.  A nonzero return from dg (a `break`) is
// reported via foreachErr, which throws.
int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg)
{
    with(p)
    {
        int res = 0;
        size_t index = 0;

        // The explicit ElementType!R in the foreach loops is necessary for
        // correct behavior when iterating over strings.
        static if (hasLvalueElements!R)
        {
            // Elements can be handed to dg by reference.
            foreach (ref ElementType!R elem; range)
            {
                static if (Parameters!dg.length == 2)
                {
                    res = dg(index, elem);
                }
                else
                {
                    res = dg(elem);
                }
                if (res) break;
                index++;
            }
        }
        else
        {
            // Same loop, but elements are passed by value.
            foreach (ElementType!R elem; range)
            {
                static if (Parameters!dg.length == 2)
                {
                    res = dg(index, elem);
                }
                else
                {
                    res = dg(elem);
                }
                if (res) break;
                index++;
            }
        }
        // foreachErr throws, so the return below is reached only when the
        // loop ran to completion (res == 0).
        if (res) foreachErr;
        return res;
    }
}
3852 
// Body (as a mixin string) of ParallelForeach.opApply for random-access
// ranges with a length: work units are fixed index intervals claimed by
// atomically bumping a shared counter, so no locking is needed.
private enum string parallelApplyMixinRandomAccess = q{
    // Handle empty thread pool as special case.
    if (pool.size == 0)
    {
        return doSizeZeroCase(this, dg);
    }

    // Whether iteration is with or without an index variable.
    enum withIndex = Parameters!(typeof(dg)).length == 2;

    // Effectively -1:  workUnitIndex + 1 == 0 on the first increment.
    shared size_t workUnitIndex = size_t.max;
    immutable len = range.length;
    if (!len) return 0;

    shared bool shouldContinue = true;

    // Executed concurrently by every worker plus the caller via
    // submitAndExecute; each loop iteration claims one work unit.
    void doIt()
    {
        import std.algorithm.comparison : min;

        scope(failure)
        {
            // If an exception is thrown, all threads should bail.
            atomicStore(shouldContinue, false);
        }

        while (atomicLoad(shouldContinue))
        {
            // Claim the next work unit; the first claim wraps size_t.max
            // around to 0.
            immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
            immutable start = workUnitSize * myUnitIndex;
            if (start >= len)
            {
                atomicStore(shouldContinue, false);
                break;
            }

            immutable end = min(len, start + workUnitSize);

            foreach (i; start .. end)
            {
                static if (withIndex)
                {
                    if (dg(i, range[i])) foreachErr();
                }
                else
                {
                    if (dg(range[i])) foreachErr();
                }
            }
        }
    }

    submitAndExecute(pool, &doIt);

    return 0;
};
3909 
// Body (as a mixin string) of ParallelForeach.opApply for general input
// ranges: each thread serially pops a chunk of the range into a private
// buffer under a mutex, then processes that chunk without locking.
enum string parallelApplyMixinInputRange = q{
    // Handle empty thread pool as special case.
    if (pool.size == 0)
    {
        return doSizeZeroCase(this, dg);
    }

    // Whether iteration is with or without an index variable.
    enum withIndex = Parameters!(typeof(dg)).length == 2;

    // This protects the range while copying it.
    auto rangeMutex = new Mutex();

    shared bool shouldContinue = true;

    // The total number of elements that have been popped off range.
    // This is updated only while protected by rangeMutex;
    size_t nPopped = 0;

    // Detect the plain (non-recycling) asyncBuf result by its internal
    // members so its buffers can be swapped instead of copied element-wise.
    static if (
        is(typeof(range.buf1)) &&
        is(typeof(range.bufPos)) &&
        is(typeof(range.doBufSwap()))
    )
    {
        // Make sure we don't have the buffer recycling overload of
        // asyncBuf.
        static if (
            is(typeof(range.source)) &&
            isRoundRobin!(typeof(range.source))
        )
        {
            static assert(0, "Cannot execute a parallel foreach loop on " ~
            "the buffer recycling overload of asyncBuf.");
        }

        enum bool bufferTrick = true;
    }
    else
    {
        enum bool bufferTrick = false;
    }

    // Executed concurrently by every worker plus the caller via
    // submitAndExecute.
    void doIt()
    {
        scope(failure)
        {
            // If an exception is thrown, all threads should bail.
            atomicStore(shouldContinue, false);
        }

        static if (hasLvalueElements!R)
        {
            // Store element addresses so dg can receive them by reference.
            alias Temp = ElementType!R*[];
            Temp temp;

            // Returns:  The previous value of nPopped.
            size_t makeTemp()
            {
                import std.algorithm.internal : addressOf;
                import std.array : uninitializedArray;

                if (temp is null)
                {
                    temp = uninitializedArray!Temp(workUnitSize);
                }

                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                size_t i = 0;
                for (; i < workUnitSize && !range.empty; range.popFront(), i++)
                {
                    temp[i] = addressOf(range.front);
                }

                temp = temp[0 .. i];
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }

        }
        else
        {

            alias Temp = ElementType!R[];
            Temp temp;

            // Returns:  The previous value of nPopped.
            static if (!bufferTrick) size_t makeTemp()
            {
                import std.array : uninitializedArray;

                if (temp is null)
                {
                    temp = uninitializedArray!Temp(workUnitSize);
                }

                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                size_t i = 0;
                for (; i < workUnitSize && !range.empty; range.popFront(), i++)
                {
                    temp[i] = range.front;
                }

                temp = temp[0 .. i];
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }

            static if (bufferTrick) size_t makeTemp()
            {
                import std.algorithm.mutation : swap;
                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                // Elide copying by just swapping buffers.
                temp.length = range.buf1.length;
                swap(range.buf1, temp);

                // This is necessary in case popFront() has been called on
                // range before entering the parallel foreach loop.
                temp = temp[range.bufPos..$];

                static if (is(typeof(range._length)))
                {
                    // NOTE(review): temp was already sliced past bufPos just
                    // above, so this appears to subtract bufPos twice; it
                    // looks like it should be `range._length -= temp.length;`
                    // — confirm.
                    range._length -= (temp.length - range.bufPos);
                }

                range.doBufSwap();
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }
        }

        while (atomicLoad(shouldContinue))
        {
            // overallIndex is the global index of temp[0] for this chunk.
            auto overallIndex = makeTemp();
            if (temp.empty)
            {
                atomicStore(shouldContinue, false);
                break;
            }

            foreach (i; 0 .. temp.length)
            {
                scope(success) overallIndex++;

                static if (hasLvalueElements!R)
                {
                    static if (withIndex)
                    {
                        if (dg(overallIndex, *temp[i])) foreachErr();
                    }
                    else
                    {
                        if (dg(*temp[i])) foreachErr();
                    }
                }
                else
                {
                    static if (withIndex)
                    {
                        if (dg(overallIndex, temp[i])) foreachErr();
                    }
                    else
                    {
                        if (dg(temp[i])) foreachErr();
                    }
                }
            }
        }
    }

    submitAndExecute(pool, &doIt);

    return 0;
};
4093 
4094 
// Range wrapper returned by TaskPool.parallel / the free parallel()
// functions.  Its opApply overloads drive the parallel iteration via the
// mixin strings above.
private struct ParallelForeach(R)
{
    TaskPool pool;          // Pool whose threads execute the loop body.
    R range;                // The range being iterated over.
    size_t workUnitSize;    // Number of elements per work unit.
    alias E = ElementType!R;

    // Delegate types accepted by foreach; ref when elements are lvalues.
    static if (hasLvalueElements!R)
    {
        alias NoIndexDg = int delegate(ref E);
        alias IndexDg = int delegate(size_t, ref E);
    }
    else
    {
        alias NoIndexDg = int delegate(E);
        alias IndexDg = int delegate(size_t, E);
    }

    // foreach without an index variable.
    int opApply(scope NoIndexDg dg)
    {
        static if (randLen!R)
        {
            mixin(parallelApplyMixinRandomAccess);
        }
        else
        {
            mixin(parallelApplyMixinInputRange);
        }
    }

    // foreach with an index variable.
    int opApply(scope IndexDg dg)
    {
        static if (randLen!R)
        {
            mixin(parallelApplyMixinRandomAccess);
        }
        else
        {
            mixin(parallelApplyMixinInputRange);
        }
    }
}
4137 
4138 /*
4139 This struct buffers the output of a callable that outputs data into a
4140 user-supplied buffer into a set of buffers of some fixed size.  It allows these
4141 buffers to be accessed with an input range interface.  This is used internally
4142 in the buffer-recycling overload of TaskPool.asyncBuf, which creates an
4143 instance and forwards it to the input range overload of asyncBuf.
4144 */
4145 private struct RoundRobinBuffer(C1, C2)
4146 {
4147     // No need for constraints because they're already checked for in asyncBuf.
4148 
4149     alias Array = Parameters!(C1.init)[0];
4150     alias T = typeof(Array.init[0]);
4151 
4152     T[][] bufs;
4153     size_t index;
4154     C1 nextDel;
4155     C2 emptyDel;
4156     bool _empty;
4157     bool primed;
4158 
4159     this(
4160         C1 nextDel,
4161         C2 emptyDel,
4162         size_t initialBufSize,
4163         size_t nBuffers
4164     ) {
4165         this.nextDel = nextDel;
4166         this.emptyDel = emptyDel;
4167         bufs.length = nBuffers;
4168 
4169         foreach (ref buf; bufs)
4170         {
4171             buf.length = initialBufSize;
4172         }
4173     }
4174 
4175     void prime()
4176     in
4177     {
4178         assert(!empty);
4179     }
4180     do
4181     {
4182         scope(success) primed = true;
4183         nextDel(bufs[index]);
4184     }
4185 
4186 
4187     T[] front() @property
4188     in
4189     {
4190         assert(!empty);
4191     }
4192     do
4193     {
4194         if (!primed) prime();
4195         return bufs[index];
4196     }
4197 
4198     void popFront()
4199     {
4200         if (empty || emptyDel())
4201         {
4202             _empty = true;
4203             return;
4204         }
4205 
4206         index = (index + 1) % bufs.length;
4207         primed = false;
4208     }
4209 
4210     bool empty() @property const @safe pure nothrow
4211     {
4212         return _empty;
4213     }
4214 }
4215 
version (StdUnittest)
{
    // Shared pool reused across the unittests below.
    // This was the only way I could get nested maps to work.
    private __gshared TaskPool poolInstance;
}
4221 
4222 // These test basic functionality but don't stress test for threading bugs.
4223 // These are the tests that should be run every time Phobos is compiled.
4224 @system unittest
4225 {
4226     import std.algorithm.comparison : equal, min, max;
4227     import std.algorithm.iteration : filter, map, reduce;
4228     import std.array : split;
4229     import std.conv : text;
4230     import std.exception : assertThrown;
4231     import std.math.operations : isClose;
4232     import std.math.algebraic : sqrt, abs;
4233     import std.math.exponential : log;
4234     import std.range : indexed, iota, join;
4235     import std.typecons : Tuple, tuple;
4236     import std.stdio;
4237 
4238     poolInstance = new TaskPool(2);
4239     scope(exit) poolInstance.stop();
4240 
4241     // The only way this can be verified is manually.
4242     debug(std_parallelism) stderr.writeln("totalCPUs = ", totalCPUs);
4243 
4244     auto oldPriority = poolInstance.priority;
4245     poolInstance.priority = Thread.PRIORITY_MAX;
4246     assert(poolInstance.priority == Thread.PRIORITY_MAX);
4247 
4248     poolInstance.priority = Thread.PRIORITY_MIN;
4249     assert(poolInstance.priority == Thread.PRIORITY_MIN);
4250 
4251     poolInstance.priority = oldPriority;
4252     assert(poolInstance.priority == oldPriority);
4253 
4254     static void refFun(ref uint num)
4255     {
4256         num++;
4257     }
4258 
4259     uint x;
4260 
4261     // Test task().
4262     auto t = task!refFun(x);
4263     poolInstance.put(t);
4264     t.yieldForce;
4265     assert(t.args[0] == 1);
4266 
4267     auto t2 = task(&refFun, x);
4268     poolInstance.put(t2);
4269     t2.yieldForce;
4270     assert(t2.args[0] == 1);
4271 
4272     // Test scopedTask().
4273     auto st = scopedTask!refFun(x);
4274     poolInstance.put(st);
4275     st.yieldForce;
4276     assert(st.args[0] == 1);
4277 
4278     auto st2 = scopedTask(&refFun, x);
4279     poolInstance.put(st2);
4280     st2.yieldForce;
4281     assert(st2.args[0] == 1);
4282 
4283     // Test executeInNewThread().
4284     auto ct = scopedTask!refFun(x);
4285     ct.executeInNewThread(Thread.PRIORITY_MAX);
4286     ct.yieldForce;
4287     assert(ct.args[0] == 1);
4288 
4289     // Test ref return.
4290     uint toInc = 0;
4291     static ref T makeRef(T)(ref T num)
4292     {
4293         return num;
4294     }
4295 
4296     auto t3 = task!makeRef(toInc);
4297     taskPool.put(t3);
4298     assert(t3.args[0] == 0);
4299     t3.spinForce++;
4300     assert(t3.args[0] == 1);
4301 
4302     static void testSafe() @safe {
4303         static int bump(int num)
4304         {
4305             return num + 1;
4306         }
4307 
4308         auto safePool = new TaskPool(0);
4309         auto t = task(&bump, 1);
4310         taskPool.put(t);
4311         assert(t.yieldForce == 2);
4312 
4313         auto st = scopedTask(&bump, 1);
4314         taskPool.put(st);
4315         assert(st.yieldForce == 2);
4316         safePool.stop();
4317     }
4318 
4319     auto arr = [1,2,3,4,5];
4320     auto nums = new uint[5];
4321     auto nums2 = new uint[5];
4322 
4323     foreach (i, ref elem; poolInstance.parallel(arr))
4324     {
4325         elem++;
4326         nums[i] = cast(uint) i + 2;
4327         nums2[i] = elem;
4328     }
4329 
4330     assert(nums == [2,3,4,5,6], text(nums));
4331     assert(nums2 == nums, text(nums2));
4332     assert(arr == nums, text(arr));
4333 
4334     // Test const/immutable arguments.
4335     static int add(int lhs, int rhs)
4336     {
4337         return lhs + rhs;
4338     }
4339     immutable addLhs = 1;
4340     immutable addRhs = 2;
4341     auto addTask = task(&add, addLhs, addRhs);
4342     auto addScopedTask = scopedTask(&add, addLhs, addRhs);
4343     poolInstance.put(addTask);
4344     poolInstance.put(addScopedTask);
4345     assert(addTask.yieldForce == 3);
4346     assert(addScopedTask.yieldForce == 3);
4347 
4348     // Test parallel foreach with non-random access range.
4349     auto range = filter!"a != 666"([0, 1, 2, 3, 4]);
4350 
4351     foreach (i, elem; poolInstance.parallel(range))
4352     {
4353         nums[i] = cast(uint) i;
4354     }
4355 
4356     assert(nums == [0,1,2,3,4]);
4357 
4358     auto logs = new double[1_000_000];
4359     foreach (i, ref elem; poolInstance.parallel(logs))
4360     {
4361         elem = log(i + 1.0);
4362     }
4363 
4364     foreach (i, elem; logs)
4365     {
4366         assert(isClose(elem, log(double(i + 1))));
4367     }
4368 
4369     assert(poolInstance.amap!"a * a"([1,2,3,4,5]) == [1,4,9,16,25]);
4370     assert(poolInstance.amap!"a * a"([1,2,3,4,5], new long[5]) == [1,4,9,16,25]);
4371     assert(poolInstance.amap!("a * a", "-a")([1,2,3]) ==
4372            [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
4373 
4374     auto tupleBuf = new Tuple!(int, int)[3];
4375     poolInstance.amap!("a * a", "-a")([1,2,3], tupleBuf);
4376     assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
4377     poolInstance.amap!("a * a", "-a")([1,2,3], 5, tupleBuf);
4378     assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
4379 
4380     // Test amap with a non-array buffer.
4381     auto toIndex = new int[5];
4382     auto ind = indexed(toIndex, [3, 1, 4, 0, 2]);
4383     poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], ind);
4384     assert(equal(ind, [2, 4, 6, 8, 10]));
4385     assert(equal(toIndex, [8, 4, 10, 2, 6]));
4386     poolInstance.amap!"a / 2"(ind, ind);
4387     assert(equal(ind, [1, 2, 3, 4, 5]));
4388     assert(equal(toIndex, [4, 2, 5, 1, 3]));
4389 
4390     auto buf = new int[5];
4391     poolInstance.amap!"a * a"([1,2,3,4,5], buf);
4392     assert(buf == [1,4,9,16,25]);
4393     poolInstance.amap!"a * a"([1,2,3,4,5], 4, buf);
4394     assert(buf == [1,4,9,16,25]);
4395 
4396     assert(poolInstance.reduce!"a + b"([1]) == 1);
4397     assert(poolInstance.reduce!"a + b"([1,2,3,4]) == 10);
4398     assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4]) == 10);
4399     assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4], 1) == 10);
4400     assert(poolInstance.reduce!(min, max)([1,2,3,4]) == tuple(1, 4));
4401     assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) ==
4402            tuple(10, 24));
4403 
4404     immutable serialAns = reduce!"a + b"(iota(1000));
4405     assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns);
4406     assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns);
4407 
4408     // Test worker-local storage.
4409     auto wl = poolInstance.workerLocalStorage(0);
4410     foreach (i; poolInstance.parallel(iota(1000), 1))
4411     {
4412         wl.get = wl.get + i;
4413     }
4414 
4415     auto wlRange = wl.toRange;
4416     auto parallelSum = poolInstance.reduce!"a + b"(wlRange);
4417     assert(parallelSum == 499500);
4418     assert(wlRange[0 .. 1][0] == wlRange[0]);
4419     assert(wlRange[1 .. 2][0] == wlRange[1]);
4420 
4421     // Test finish()
4422     {
4423         static void slowFun() { Thread.sleep(dur!"msecs"(1)); }
4424 
4425         auto pool1 = new TaskPool();
4426         auto tSlow = task!slowFun();
4427         pool1.put(tSlow);
4428         pool1.finish();
4429         tSlow.yieldForce;
4430         // Can't assert that pool1.status == PoolState.stopNow because status
4431         // doesn't change until after the "done" flag is set and the waiting
4432         // thread is woken up.
4433 
4434         auto pool2 = new TaskPool();
4435         auto tSlow2 = task!slowFun();
4436         pool2.put(tSlow2);
4437         pool2.finish(true); // blocking
4438         assert(tSlow2.done);
4439 
4440         // Test fix for https://issues.dlang.org/show_bug.cgi?id=8582 by making pool size zero.
4441         auto pool3 = new TaskPool(0);
4442         auto tSlow3 = task!slowFun();
4443         pool3.put(tSlow3);
4444         pool3.finish(true); // blocking
4445         assert(tSlow3.done);
4446 
4447         // This is correct because no thread will terminate unless pool2.status
4448         // and pool3.status have already been set to stopNow.
4449         assert(pool2.status == TaskPool.PoolState.stopNow);
4450         assert(pool3.status == TaskPool.PoolState.stopNow);
4451     }
4452 
4453     // Test default pool stuff.
4454     assert(taskPool.size == totalCPUs - 1);
4455 
4456     nums = new uint[1000];
4457     foreach (i; parallel(iota(1000)))
4458     {
4459         nums[i] = cast(uint) i;
4460     }
4461     assert(equal(nums, iota(1000)));
4462 
4463     assert(equal(
4464                poolInstance.map!"a * a"(iota(3_000_001), 10_000),
4465                map!"a * a"(iota(3_000_001))
4466            ));
4467 
4468     // The filter is to kill random access and test the non-random access
4469     // branch.
4470     assert(equal(
4471                poolInstance.map!"a * a"(
4472                    filter!"a == a"(iota(3_000_001)
4473                                   ), 10_000, 1000),
4474                map!"a * a"(iota(3_000_001))
4475            ));
4476 
4477     assert(
4478         reduce!"a + b"(0UL,
4479                        poolInstance.map!"a * a"(iota(300_001), 10_000)
4480                       ) ==
4481         reduce!"a + b"(0UL,
4482                        map!"a * a"(iota(300_001))
4483                       )
4484     );
4485 
4486     assert(equal(
4487                iota(1_000_002),
4488                poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002)))
4489            ));
4490 
4491     {
4492         import std.conv : to;
4493         import std.file : deleteme;
4494 
4495         string temp_file = deleteme ~ "-tempDelMe.txt";
4496         auto file = File(temp_file, "wb");
4497         scope(exit)
4498         {
4499             file.close();
4500             import std.file;
4501             remove(temp_file);
4502         }
4503 
4504         auto written = [[1.0, 2, 3], [4.0, 5, 6], [7.0, 8, 9]];
4505         foreach (row; written)
4506         {
4507             file.writeln(join(to!(string[])(row), "\t"));
4508         }
4509 
4510         file = File(temp_file);
4511 
4512         void next(ref char[] buf)
4513         {
4514             file.readln(buf);
4515             import std.string : chomp;
4516             buf = chomp(buf);
4517         }
4518 
4519         double[][] read;
4520         auto asyncReader = taskPool.asyncBuf(&next, &file.eof);
4521 
4522         foreach (line; asyncReader)
4523         {
4524             if (line.length == 0) continue;
4525             auto ls = line.split("\t");
4526             read ~= to!(double[])(ls);
4527         }
4528 
4529         assert(read == written);
4530         file.close();
4531     }
4532 
4533     // Test Map/AsyncBuf chaining.
4534 
4535     auto abuf = poolInstance.asyncBuf(iota(-1.0, 3_000_000), 100);
4536     auto temp = poolInstance.map!sqrt(
4537                     abuf, 100, 5
4538                 );
4539     auto lmchain = poolInstance.map!"a * a"(temp, 100, 5);
4540     lmchain.popFront();
4541 
4542     int ii;
4543     foreach ( elem; (lmchain))
4544     {
4545         if (!isClose(elem, ii))
4546         {
4547             stderr.writeln(ii, '\t', elem);
4548         }
4549         ii++;
4550     }
4551 
4552     // Test buffer trick in parallel foreach.
4553     abuf = poolInstance.asyncBuf(iota(-1.0, 1_000_000), 100);
4554     abuf.popFront();
4555     auto bufTrickTest = new size_t[abuf.length];
4556     foreach (i, elem; parallel(abuf))
4557     {
4558         bufTrickTest[i] = i;
4559     }
4560 
4561     assert(equal(iota(1_000_000), bufTrickTest));
4562 
4563     auto myTask = task!(abs)(-1);
4564     taskPool.put(myTask);
4565     assert(myTask.spinForce == 1);
4566 
4567     // Test that worker local storage from one pool receives an index of 0
4568     // when the index is queried w.r.t. another pool.  The only way to do this
4569     // is non-deterministically.
4570     foreach (i; parallel(iota(1000), 1))
4571     {
4572         assert(poolInstance.workerIndex == 0);
4573     }
4574 
4575     foreach (i; poolInstance.parallel(iota(1000), 1))
4576     {
4577         assert(taskPool.workerIndex == 0);
4578     }
4579 
4580     // Test exception handling.
4581     static void parallelForeachThrow()
4582     {
4583         foreach (elem; parallel(iota(10)))
4584         {
4585             throw new Exception("");
4586         }
4587     }
4588 
4589     assertThrown!Exception(parallelForeachThrow());
4590 
4591     static int reduceException(int a, int b)
4592     {
4593         throw new Exception("");
4594     }
4595 
4596     assertThrown!Exception(
4597         poolInstance.reduce!reduceException(iota(3))
4598     );
4599 
4600     static int mapException(int a)
4601     {
4602         throw new Exception("");
4603     }
4604 
4605     assertThrown!Exception(
4606         poolInstance.amap!mapException(iota(3))
4607     );
4608 
4609     static void mapThrow()
4610     {
4611         auto m = poolInstance.map!mapException(iota(3));
4612         m.popFront();
4613     }
4614 
4615     assertThrown!Exception(mapThrow());
4616 
4617     struct ThrowingRange
4618     {
4619         @property int front()
4620         {
4621             return 1;
4622         }
4623         void popFront()
4624         {
4625             throw new Exception("");
4626         }
4627         enum bool empty = false;
4628     }
4629 
4630     assertThrown!Exception(poolInstance.asyncBuf(ThrowingRange.init));
4631 }
4632 
4633 //version = parallelismStressTest;
4634 
// These are more like stress tests than real unit tests.  They print out
// a large amount of output and should not be run every time `make unittest`
// is executed.
4637 version (parallelismStressTest)
4638 {
4639     @system unittest
4640     {
4641         import std.stdio : stderr, writeln, readln;
4642         import std.range : iota;
4643         import std.algorithm.iteration : filter, reduce;
4644 
4645         size_t attempt;
4646         for (; attempt < 10; attempt++)
4647             foreach (poolSize; [0, 4])
4648         {
4649 
4650             poolInstance = new TaskPool(poolSize);
4651 
4652             uint[] numbers = new uint[1_000];
4653 
4654             foreach (i; poolInstance.parallel( iota(0, numbers.length)) )
4655             {
4656                 numbers[i] = cast(uint) i;
4657             }
4658 
4659             // Make sure it works.
4660             foreach (i; 0 .. numbers.length)
4661             {
4662                 assert(numbers[i] == i);
4663             }
4664 
4665             stderr.writeln("Done creating nums.");
4666 
4667 
4668             auto myNumbers = filter!"a % 7 > 0"( iota(0, 1000));
4669             foreach (num; poolInstance.parallel(myNumbers))
4670             {
4671                 assert(num % 7 > 0 && num < 1000);
4672             }
4673             stderr.writeln("Done modulus test.");
4674 
4675             uint[] squares = poolInstance.amap!"a * a"(numbers, 100);
4676             assert(squares.length == numbers.length);
4677             foreach (i, number; numbers)
4678             {
4679                 assert(squares[i] == number * number);
4680             }
4681             stderr.writeln("Done squares.");
4682 
4683             auto sumFuture = task!( reduce!"a + b" )(numbers);
4684             poolInstance.put(sumFuture);
4685 
4686             ulong sumSquares = 0;
4687             foreach (elem; numbers)
4688             {
4689                 sumSquares += elem * elem;
4690             }
4691 
4692             uint mySum = sumFuture.spinForce();
4693             assert(mySum == 999 * 1000 / 2);
4694 
4695             auto mySumParallel = poolInstance.reduce!"a + b"(numbers);
4696             assert(mySum == mySumParallel);
4697             stderr.writeln("Done sums.");
4698 
4699             auto myTask = task(
4700             {
4701                 synchronized writeln("Our lives are parallel...Our lives are parallel.");
4702             });
4703             poolInstance.put(myTask);
4704 
4705             auto nestedOuter = "abcd";
4706             auto nestedInner =  iota(0, 10, 2);
4707 
4708             foreach (i, letter; poolInstance.parallel(nestedOuter, 1))
4709             {
4710                 foreach (j, number; poolInstance.parallel(nestedInner, 1))
4711                 {
4712                     synchronized writeln(i, ": ", letter, "  ", j, ": ", number);
4713                 }
4714             }
4715 
4716             poolInstance.stop();
4717         }
4718 
4719         assert(attempt == 10);
4720         writeln("Press enter to go to next round of unittests.");
4721         readln();
4722     }
4723 
4724     // These unittests are intended more for actual testing and not so much
4725     // as examples.
    @system unittest
    {
        import std.stdio : stderr;
        import std.range : iota;
        import std.algorithm.iteration : filter, reduce;
        import std.math.algebraic : sqrt;
        import std.math.operations : isClose;
        import std.math.traits : isNaN;
        import std.conv : text;

        // Stress test covering worker indexing, worker-local storage,
        // cross-thread load balancing, nested pool use (amap/reduce/parallel
        // issued from inside parallel loops), and ref iteration over a
        // non-random-access range.  Each scenario is run 10 times for both a
        // zero-worker and a four-worker pool.
        foreach (attempt; 0 .. 10)
        foreach (poolSize; [0, 4])
        {
            poolInstance = new TaskPool(poolSize);

            // Test indexing.
            stderr.writeln("Creator Raw Index:  ", poolInstance.threadIndex);
            assert(poolInstance.workerIndex() == 0);

            // Test worker-local storage.
            auto workerLocalStorage = poolInstance.workerLocalStorage!uint(1);
            foreach (i; poolInstance.parallel(iota(0U, 1_000_000)))
            {
                workerLocalStorage.get++;
            }
            // Every slot starts at 1 and the loop performs 1_000_000
            // increments in total, spread across the slots — hence the
            // expected grand total of 1_000_000 + size + 1.
            assert(reduce!"a + b"(workerLocalStorage.toRange) ==
            1_000_000 + poolInstance.size + 1);

            // Make sure work is reasonably balanced among threads.  This test is
            // non-deterministic and is more of a sanity check than something that
            // has an absolute pass/fail.
            shared(uint)[void*] nJobsByThread;
            foreach (thread; poolInstance.pool)
            {
                nJobsByThread[cast(void*) thread] = 0;
            }
            nJobsByThread[ cast(void*) Thread.getThis()] = 0;

            foreach (i; poolInstance.parallel( iota(0, 1_000_000), 100 ))
            {
                // Count each work unit against the thread that executed it.
                atomicOp!"+="( nJobsByThread[ cast(void*) Thread.getThis() ], 1);
            }

            stderr.writeln("\nCurrent thread is:  ",
            cast(void*) Thread.getThis());
            stderr.writeln("Workload distribution:  ");
            foreach (k, v; nJobsByThread)
            {
                stderr.writeln(k, '\t', v);
            }

            // Test whether amap can be nested.
            real[][] matrix = new real[][](1000, 1000);
            foreach (i; poolInstance.parallel( iota(0, matrix.length) ))
            {
                foreach (j; poolInstance.parallel( iota(0, matrix[0].length) ))
                {
                    matrix[i][j] = i * j;
                }
            }

            // Get around weird bugs having to do w/ sqrt being an intrinsic:
            static real mySqrt(real num)
            {
                return sqrt(num);
            }

            static real[] parallelSqrt(real[] nums)
            {
                return poolInstance.amap!mySqrt(nums);
            }

            // amap whose mapped function itself calls amap on the same pool.
            real[][] sqrtMatrix = poolInstance.amap!parallelSqrt(matrix);

            foreach (i, row; sqrtMatrix)
            {
                foreach (j, elem; row)
                {
                    real shouldBe = sqrt( cast(real) i * j);
                    assert(isClose(shouldBe, elem));
                    sqrtMatrix[i][j] = shouldBe;
                }
            }

            auto saySuccess = task(
            {
                stderr.writeln(
                    "Success doing matrix stuff that involves nested pool use.");
            });
            poolInstance.put(saySuccess);
            saySuccess.workForce();

            // A more thorough test of amap, reduce:  Find the sum of the square roots of
            // matrix.

            static real parallelSum(real[] input)
            {
                return poolInstance.reduce!"a + b"(input);
            }

            auto sumSqrt = poolInstance.reduce!"a + b"(
                               poolInstance.amap!parallelSum(
                                   sqrtMatrix
                               )
                           );

            // Loose tolerance:  summation order (and thus rounding) varies
            // between runs.
            assert(isClose(sumSqrt, 4.437e8, 1e-2));
            stderr.writeln("Done sum of square roots.");

            // Test whether tasks work with function pointers.
            /+ // This part is buggy and needs to be fixed...
            auto nanTask = task(&isNaN, 1.0L);
            poolInstance.put(nanTask);
            assert(nanTask.spinForce == false);

            if (poolInstance.size > 0)
            {
                // Test work waiting.
                static void uselessFun()
                {
                    foreach (i; 0 .. 1_000_000) {}
                }

                auto uselessTasks = new typeof(task(&uselessFun))[1000];
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask = task(&uselessFun);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    poolInstance.put(uselessTask);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask.workForce();
                }
            }
             +/

            // Test the case of non-random access + ref returns.
            int[] nums = [1,2,3,4,5];
            // Wraps an array, exposing only the input-range primitives so
            // that random access is unavailable while front stays ref.
            static struct RemoveRandom
            {
                int[] arr;

                ref int front()
                {
                    return arr.front;
                }
                void popFront()
                {
                    arr.popFront();
                }
                bool empty()
                {
                    return arr.empty;
                }
            }

            auto refRange = RemoveRandom(nums);
            foreach (ref elem; poolInstance.parallel(refRange))
            {
                elem++;
            }
            // Mutations through the ref front must be visible in the
            // underlying array.
            assert(nums == [2,3,4,5,6], text(nums));
            stderr.writeln("Nums:  ", nums);

            poolInstance.stop();
        }
    }
4896 }
4897 
// Regression test:  amap over a struct with an invariant and a user-defined
// opAssign must not trip the invariant while filling in the result buffer
// (the identifiers reference issue 12733 — TODO confirm against the tracker).
@system unittest
{
    static struct __S_12733
    {
        invariant() { assert(checksum == 1_234_567_890); }
        this(ulong u){n = u;}
        void opAssign(__S_12733 s){this.n = s.n;}
        ulong n;
        ulong checksum = 1_234_567_890;
    }

    static auto __genPair_12733(ulong n) { return __S_12733(n); }
    immutable ulong[] data = [ 2UL^^59-1, 2UL^^59-1, 2UL^^59-1, 112_272_537_195_293UL ];

    auto result = taskPool.amap!__genPair_12733(data);

    // Previously `result` was never inspected; also verify the mapping so
    // the test checks the produced values, not just invariant survival.
    assert(result.length == data.length);
    foreach (i, ref e; result)
        assert(e.n == data[i]);
}
4914 
@safe unittest
{
    import std.range : iota;

    // Moved here from std.range, where it created an import cycle.
    // Only compilability of a parallel loop over iota is being checked.
    enum worksWithIota = __traits(compiles, { foreach (i; iota(0, 100UL).parallel) {} });
    assert(worksWithIota);
}
4922 
@safe unittest
{
    import std.algorithm.iteration : each;

    // `each` must accept a parallel range in @safe code; this is a pure
    // compile-time check — nothing executes at run time.
    long[] xs;
    enum composes = is(typeof({ xs.parallel.each!"a++"; }));
    static assert(composes);
}
4932 
// https://issues.dlang.org/show_bug.cgi?id=17539
@system unittest
{
    import std.random : rndGen;

    // Compilation is the point here:  a parallel foreach over the infinite
    // RNG range must be accepted; we bail out after the first element.
    try
    {
        foreach (rnd; rndGen.parallel)
            break;
    }
    catch (ParallelForeachError)
    {
    }
}