1 /**
2 `std.parallelism` implements high-level primitives for SMP parallelism.
3 These include parallel foreach, parallel reduce, parallel eager map, pipelining
4 and future/promise parallelism.  `std.parallelism` is recommended when the
5 same operation is to be executed in parallel on different data, or when a
6 function is to be executed in a background thread and its result returned to a
7 well-defined main thread.  For communication between arbitrary threads, see
8 `std.concurrency`.
9 
10 `std.parallelism` is based on the concept of a `Task`.  A `Task` is an
11 object that represents the fundamental unit of work in this library and may be
12 executed in parallel with any other `Task`.  Using `Task`
13 directly allows programming with a future/promise paradigm.  All other
14 supported parallelism paradigms (parallel foreach, map, reduce, pipelining)
15 represent an additional level of abstraction over `Task`.  They
16 automatically create one or more `Task` objects, or closely related types
17 that are conceptually identical but not part of the public API.
18 
19 After creation, a `Task` may be executed in a new thread, or submitted
20 to a `TaskPool` for execution.  A `TaskPool` encapsulates a task queue
21 and its worker threads.  Its purpose is to efficiently map a large
22 number of `Task`s onto a smaller number of threads.  A task queue is a
23 FIFO queue of `Task` objects that have been submitted to the
24 `TaskPool` and are awaiting execution.  A worker thread is a thread that
25 is associated with exactly one task queue.  It executes the `Task` at the
26 front of its queue when the queue has work available, or sleeps when
27 no work is available.  Each task queue is associated with zero or
28 more worker threads.  If the result of a `Task` is needed before execution
29 by a worker thread has begun, the `Task` can be removed from the task queue
30 and executed immediately in the thread where the result is needed.
31 
32 Warning:  Unless marked as `@trusted` or `@safe`, artifacts in
33           this module allow implicit data sharing between threads and cannot
34           guarantee that client code is free from low level data races.
35 
36 Source:    $(PHOBOSSRC std/parallelism.d)
37 Author:  David Simcha
38 Copyright:  Copyright (c) 2009-2011, David Simcha.
39 License:    $(HTTP boost.org/LICENSE_1_0.txt, Boost License 1.0)
40 */
41 module std.parallelism;
42 
43 version (OSX)
44     version = Darwin;
45 else version (iOS)
46     version = Darwin;
47 else version (TVOS)
48     version = Darwin;
49 else version (WatchOS)
50     version = Darwin;
51 
52 ///
53 @system unittest
54 {
55     import std.algorithm.iteration : map;
56     import std.math.operations : isClose;
57     import std.parallelism : taskPool;
58     import std.range : iota;
59 
60     // Parallel reduce can be combined with
61     // std.algorithm.iteration.map to interesting effect.
62     // The following example (thanks to Russel Winder)
63     // calculates pi by quadrature  using
64     // std.algorithm.map and TaskPool.reduce.
65     // getTerm is evaluated in parallel as needed by
66     // TaskPool.reduce.
67     //
68     // Timings on an Intel i5-3450 quad core machine
69     // for n = 1_000_000_000:
70     //
71     // TaskPool.reduce:       1.067 s
72     // std.algorithm.reduce:  4.011 s
73 
74     enum n = 1_000_000;
75     enum delta = 1.0 / n;
76 
77     alias getTerm = (int i)
78     {
79         immutable x = ( i - 0.5 ) * delta;
80         return delta / ( 1.0 + x * x ) ;
81     };
82 
83     immutable pi = 4.0 * taskPool.reduce!"a + b"(n.iota.map!getTerm);
84 
85     assert(pi.isClose(3.14159, 1e-5));
86 }
87 
88 import core.atomic;
89 import core.memory;
90 import core.sync.condition;
91 import core.thread;
92 
93 import std.functional;
94 import std.meta;
95 import std.range.primitives;
96 import std.traits;
97 
98 /*
99 (For now public undocumented with reserved name.)
100 
101 A lazily initialized global constant. The underlying value is a shared global
102 statically initialized to `outOfBandValue` which must not be a legit value of
103 the constant. Upon the first call the situation is detected and the global is
104 initialized by calling `initializer`. The initializer is assumed to be pure
105 (even if not marked as such), i.e. return the same value upon repeated calls.
106 For that reason, no special precautions are taken so `initializer` may be called
107 more than one time leading to benign races on the cached value.
108 
109 In the quiescent state the cost of the function is an atomic load from a global.
110 
111 Params:
112     T = The type of the pseudo-constant (may be qualified)
113     outOfBandValue = A value that cannot be valid, it is used for initialization
114     initializer = The function performing initialization; must be `nothrow`
115 
116 Returns:
117     The lazily initialized value
118 */
119 @property pure
120 T __lazilyInitializedConstant(T, alias outOfBandValue, alias initializer)()
121 if (is(Unqual!T : T)
122     && is(typeof(initializer()) : T)
123     && is(typeof(outOfBandValue) : T))
124 {
125     static T impl() nothrow
126     {
127         // Thread-local cache
128         static Unqual!T tls = outOfBandValue;
129         auto local = tls;
130         // Shortest path, no atomic operations
131         if (local != outOfBandValue) return local;
132         // Process-level cache
133         static shared Unqual!T result = outOfBandValue;
134         // Initialize both process-level cache and tls
135         local = atomicLoad(result);
136         if (local == outOfBandValue)
137         {
138             local = initializer();
139             atomicStore(result, local);
140         }
141         tls = local;
142         return local;
143     }
144 
145     import std.traits : SetFunctionAttributes;
146     alias Fun = SetFunctionAttributes!(typeof(&impl), "D",
147         functionAttributes!(typeof(&impl)) | FunctionAttribute.pure_);
148     auto purified = (() @trusted => cast(Fun) &impl)();
149     return purified();
150 }
151 
152 // Returns the size of a cache line.
153 alias cacheLineSize =
154     __lazilyInitializedConstant!(immutable(size_t), size_t.max, cacheLineSizeImpl);
155 
156 private size_t cacheLineSizeImpl() @nogc nothrow @trusted
157 {
158     size_t result = 0;
159     import core.cpuid : datacache;
160     foreach (ref const cachelevel; datacache)
161     {
162         if (cachelevel.lineSize > result && cachelevel.lineSize < uint.max)
163         {
164             result = cachelevel.lineSize;
165         }
166     }
167     return result;
168 }
169 
170 @nogc @safe nothrow unittest
171 {
172     assert(cacheLineSize == cacheLineSizeImpl);
173 }
174 
175 /* Atomics code.  These forward to core.atomic, but are written like this
176    for two reasons:
177 
178    1.  They used to actually contain ASM code and I don' want to have to change
179        to directly calling core.atomic in a zillion different places.
180 
181    2.  core.atomic has some misc. issues that make my use cases difficult
182        without wrapping it.  If I didn't wrap it, casts would be required
183        basically everywhere.
184 */
185 private void atomicSetUbyte(T)(ref T stuff, T newVal)
186 if (__traits(isIntegral, T) && is(T : ubyte))
187 {
188     //core.atomic.cas(cast(shared) &stuff, stuff, newVal);
189     atomicStore(*(cast(shared) &stuff), newVal);
190 }
191 
192 private ubyte atomicReadUbyte(T)(ref T val)
193 if (__traits(isIntegral, T) && is(T : ubyte))
194 {
195     return atomicLoad(*(cast(shared) &val));
196 }
197 
198 // This gets rid of the need for a lot of annoying casts in other parts of the
199 // code, when enums are involved.
200 private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal)
201 if (__traits(isIntegral, T) && is(T : ubyte))
202 {
203     return core.atomic.cas(cast(shared) &stuff, testVal, newVal);
204 }
205 
206 /*--------------------- Generic helper functions, etc.------------------------*/
207 private template MapType(R, functions...)
208 {
209     static assert(functions.length);
210 
211     ElementType!R e = void;
212     alias MapType =
213         typeof(adjoin!(staticMap!(unaryFun, functions))(e));
214 }
215 
216 private template ReduceType(alias fun, R, E)
217 {
218     alias ReduceType = typeof(binaryFun!fun(E.init, ElementType!R.init));
219 }
220 
221 private template noUnsharedAliasing(T)
222 {
223     enum bool noUnsharedAliasing = !hasUnsharedAliasing!T;
224 }
225 
226 // This template tests whether a function may be executed in parallel from
227 // @safe code via Task.executeInNewThread().  There is an additional
228 // requirement for executing it via a TaskPool.  (See isSafeReturn).
229 private template isSafeTask(F)
230 {
231     enum bool isSafeTask =
232         (functionAttributes!F & (FunctionAttribute.safe | FunctionAttribute.trusted)) != 0 &&
233         (functionAttributes!F & FunctionAttribute.ref_) == 0 &&
234         (isFunctionPointer!F || !hasUnsharedAliasing!F) &&
235         allSatisfy!(noUnsharedAliasing, Parameters!F);
236 }
237 
238 @safe unittest
239 {
240     alias F1 = void function() @safe;
241     alias F2 = void function();
242     alias F3 = void function(uint, string) @trusted;
243     alias F4 = void function(uint, char[]);
244 
245     static assert( isSafeTask!F1);
246     static assert(!isSafeTask!F2);
247     static assert( isSafeTask!F3);
248     static assert(!isSafeTask!F4);
249 
250     alias F5 = uint[] function(uint, string) pure @trusted;
251     static assert( isSafeTask!F5);
252 }
253 
254 // This function decides whether Tasks that meet all of the other requirements
255 // for being executed from @safe code can be executed on a TaskPool.
256 // When executing via TaskPool, it's theoretically possible
257 // to return a value that is also pointed to by a worker thread's thread local
258 // storage.  When executing from executeInNewThread(), the thread that executed
259 // the Task is terminated by the time the return value is visible in the calling
260 // thread, so this is a non-issue.  It's also a non-issue for pure functions
261 // since they can't read global state.
262 private template isSafeReturn(T)
263 {
264     static if (!hasUnsharedAliasing!(T.ReturnType))
265     {
266         enum isSafeReturn = true;
267     }
268     else static if (T.isPure)
269     {
270         enum isSafeReturn = true;
271     }
272     else
273     {
274         enum isSafeReturn = false;
275     }
276 }
277 
278 private template randAssignable(R)
279 {
280     enum randAssignable = isRandomAccessRange!R && hasAssignableElements!R;
281 }
282 
283 private enum TaskStatus : ubyte
284 {
285     notStarted,
286     inProgress,
287     done
288 }
289 
290 private template AliasReturn(alias fun, T...)
291 {
292     alias AliasReturn = typeof({ T args; return fun(args); });
293 }
294 
295 // Should be private, but std.algorithm.reduce is used in the zero-thread case
296 // and won't work w/ private.
297 template reduceAdjoin(functions...)
298 {
299     static if (functions.length == 1)
300     {
301         alias reduceAdjoin = binaryFun!(functions[0]);
302     }
303     else
304     {
305         T reduceAdjoin(T, U)(T lhs, U rhs)
306         {
307             alias funs = staticMap!(binaryFun, functions);
308 
309             foreach (i, Unused; typeof(lhs.expand))
310             {
311                 lhs.expand[i] = funs[i](lhs.expand[i], rhs);
312             }
313 
314             return lhs;
315         }
316     }
317 }
318 
319 private template reduceFinish(functions...)
320 {
321     static if (functions.length == 1)
322     {
323         alias reduceFinish = binaryFun!(functions[0]);
324     }
325     else
326     {
327         T reduceFinish(T)(T lhs, T rhs)
328         {
329             alias funs = staticMap!(binaryFun, functions);
330 
331             foreach (i, Unused; typeof(lhs.expand))
332             {
333                 lhs.expand[i] = funs[i](lhs.expand[i], rhs.expand[i]);
334             }
335 
336             return lhs;
337         }
338     }
339 }
340 
341 private template isRoundRobin(R : RoundRobinBuffer!(C1, C2), C1, C2)
342 {
343     enum isRoundRobin = true;
344 }
345 
346 private template isRoundRobin(T)
347 {
348     enum isRoundRobin = false;
349 }
350 
351 @safe unittest
352 {
353     static assert( isRoundRobin!(RoundRobinBuffer!(void delegate(char[]), bool delegate())));
354     static assert(!isRoundRobin!(uint));
355 }
356 
357 // This is the base "class" for all of the other tasks.  Using C-style
358 // polymorphism to allow more direct control over memory allocation, etc.
359 private struct AbstractTask
360 {
361     AbstractTask* prev;
362     AbstractTask* next;
363 
364     // Pointer to a function that executes this task.
365     void function(void*) runTask;
366 
367     Throwable exception;
368     ubyte taskStatus = TaskStatus.notStarted;
369 
370     bool done() @property
371     {
372         if (atomicReadUbyte(taskStatus) == TaskStatus.done)
373         {
374             if (exception)
375             {
376                 throw exception;
377             }
378 
379             return true;
380         }
381 
382         return false;
383     }
384 
385     void job()
386     {
387         runTask(&this);
388     }
389 }
390 
391 /**
392 `Task` represents the fundamental unit of work.  A `Task` may be
393 executed in parallel with any other `Task`.  Using this struct directly
394 allows future/promise parallelism.  In this paradigm, a function (or delegate
395 or other callable) is executed in a thread other than the one it was called
396 from.  The calling thread does not block while the function is being executed.
397 A call to `workForce`, `yieldForce`, or `spinForce` is used to
398 ensure that the `Task` has finished executing and to obtain the return
399 value, if any.  These functions and `done` also act as full memory barriers,
400 meaning that any memory writes made in the thread that executed the `Task`
401 are guaranteed to be visible in the calling thread after one of these functions
402 returns.
403 
404 The $(REF task, std,parallelism) and $(REF scopedTask, std,parallelism) functions can
405 be used to create an instance of this struct.  See `task` for usage examples.
406 
407 Function results are returned from `yieldForce`, `spinForce` and
408 `workForce` by ref.  If `fun` returns by ref, the reference will point
409 to the returned reference of `fun`.  Otherwise it will point to a
410 field in this struct.
411 
412 Copying of this struct is disabled, since it would provide no useful semantics.
413 If you want to pass this struct around, you should do so by reference or
414 pointer.
415 
416 Bugs:  Changes to `ref` and `out` arguments are not propagated to the
417        call site, only to `args` in this struct.
418 */
419 struct Task(alias fun, Args...)
420 {
421     private AbstractTask base = {runTask : &impl};
422     private alias base this;
423 
424     private @property AbstractTask* basePtr()
425     {
426         return &base;
427     }
428 
429     private static void impl(void* myTask)
430     {
431         import std.algorithm.internal : addressOf;
432 
433         Task* myCastedTask = cast(typeof(this)*) myTask;
434         static if (is(ReturnType == void))
435         {
436             fun(myCastedTask._args);
437         }
438         else static if (is(typeof(&(fun(myCastedTask._args)))))
439         {
440             myCastedTask.returnVal = addressOf(fun(myCastedTask._args));
441         }
442         else
443         {
444             myCastedTask.returnVal = fun(myCastedTask._args);
445         }
446     }
447 
448     private TaskPool pool;
449     private bool isScoped;  // True if created with scopedTask.
450 
451     Args _args;
452 
453     /**
454     The arguments the function was called with.  Changes to `out` and
455     `ref` arguments will be visible here.
456     */
457     static if (__traits(isSame, fun, run))
458     {
459         alias args = _args[1..$];
460     }
461     else
462     {
463         alias args = _args;
464     }
465 
466 
467     // The purpose of this code is to decide whether functions whose
468     // return values have unshared aliasing can be executed via
469     // TaskPool from @safe code.  See isSafeReturn.
470     static if (__traits(isSame, fun, run))
471     {
472         static if (isFunctionPointer!(_args[0]))
473         {
474             private enum bool isPure =
475             (functionAttributes!(Args[0]) & FunctionAttribute.pure_) != 0;
476         }
477         else
478         {
479             // BUG:  Should check this for delegates too, but std.traits
480             //       apparently doesn't allow this.  isPure is irrelevant
481             //       for delegates, at least for now since shared delegates
482             //       don't work.
483             private enum bool isPure = false;
484         }
485 
486     }
487     else
488     {
489         // We already know that we can't execute aliases in @safe code, so
490         // just put a dummy value here.
491         private enum bool isPure = false;
492     }
493 
494 
495     /**
496     The return type of the function called by this `Task`.  This can be
497     `void`.
498     */
499     alias ReturnType = typeof(fun(_args));
500 
501     static if (!is(ReturnType == void))
502     {
503         static if (is(typeof(&fun(_args))))
504         {
505             // Ref return.
506             ReturnType* returnVal;
507 
508             ref ReturnType fixRef(ReturnType* val)
509             {
510                 return *val;
511             }
512 
513         }
514         else
515         {
516             ReturnType returnVal;
517 
518             ref ReturnType fixRef(ref ReturnType val)
519             {
520                 return val;
521             }
522         }
523     }
524 
525     private void enforcePool()
526     {
527         import std.exception : enforce;
528         enforce(this.pool !is null, "Job not submitted yet.");
529     }
530 
531     static if (Args.length > 0)
532     {
533         private this(Args args)
534         {
535             _args = args;
536         }
537     }
538 
539     // Work around DMD bug https://issues.dlang.org/show_bug.cgi?id=6588,
540     // allow immutable elements.
541     static if (allSatisfy!(isAssignable, Args))
542     {
543         typeof(this) opAssign(typeof(this) rhs)
544         {
545             foreach (i, Type; typeof(this.tupleof))
546             {
547                 this.tupleof[i] = rhs.tupleof[i];
548             }
549             return this;
550         }
551     }
552     else
553     {
554         @disable typeof(this) opAssign(typeof(this) rhs);
555     }
556 
557     /**
558     If the `Task` isn't started yet, execute it in the current thread.
559     If it's done, return its return value, if any.  If it's in progress,
560     busy spin until it's done, then return the return value.  If it threw
561     an exception, rethrow that exception.
562 
563     This function should be used when you expect the result of the
564     `Task` to be available on a timescale shorter than that of an OS
565     context switch.
566      */
567     @property ref ReturnType spinForce() @trusted
568     {
569         enforcePool();
570 
571         this.pool.tryDeleteExecute(basePtr);
572 
573         while (atomicReadUbyte(this.taskStatus) != TaskStatus.done) {}
574 
575         if (exception)
576         {
577             throw exception;
578         }
579 
580         static if (!is(ReturnType == void))
581         {
582             return fixRef(this.returnVal);
583         }
584     }
585 
586     /**
587     If the `Task` isn't started yet, execute it in the current thread.
588     If it's done, return its return value, if any.  If it's in progress,
589     wait on a condition variable.  If it threw an exception, rethrow that
590     exception.
591 
592     This function should be used for expensive functions, as waiting on a
593     condition variable introduces latency, but avoids wasted CPU cycles.
594      */
595     @property ref ReturnType yieldForce() @trusted
596     {
597         enforcePool();
598         this.pool.tryDeleteExecute(basePtr);
599 
600         if (done)
601         {
602             static if (is(ReturnType == void))
603             {
604                 return;
605             }
606             else
607             {
608                 return fixRef(this.returnVal);
609             }
610         }
611 
612         pool.waiterLock();
613         scope(exit) pool.waiterUnlock();
614 
615         while (atomicReadUbyte(this.taskStatus) != TaskStatus.done)
616         {
617             pool.waitUntilCompletion();
618         }
619 
620         if (exception)
621         {
622             throw exception; // nocoverage
623         }
624 
625         static if (!is(ReturnType == void))
626         {
627             return fixRef(this.returnVal);
628         }
629     }
630 
631     /**
632     If this `Task` was not started yet, execute it in the current
633     thread.  If it is finished, return its result.  If it is in progress,
634     execute any other `Task` from the `TaskPool` instance that
635     this `Task` was submitted to until this one
636     is finished.  If it threw an exception, rethrow that exception.
637     If no other tasks are available or this `Task` was executed using
638     `executeInNewThread`, wait on a condition variable.
639      */
640     @property ref ReturnType workForce() @trusted
641     {
642         enforcePool();
643         this.pool.tryDeleteExecute(basePtr);
644 
645         while (true)
646         {
647             if (done)    // done() implicitly checks for exceptions.
648             {
649                 static if (is(ReturnType == void))
650                 {
651                     return;
652                 }
653                 else
654                 {
655                     return fixRef(this.returnVal);
656                 }
657             }
658 
659             AbstractTask* job;
660             {
661                 // Locking explicitly and calling popNoSync() because
662                 // pop() waits on a condition variable if there are no Tasks
663                 // in the queue.
664 
665                 pool.queueLock();
666                 scope(exit) pool.queueUnlock();
667                 job = pool.popNoSync();
668             }
669 
670 
671             if (job !is null)
672             {
673 
674                 version (verboseUnittest)
675                 {
676                     stderr.writeln("Doing workForce work.");
677                 }
678 
679                 pool.doJob(job);
680 
681                 if (done)
682                 {
683                     static if (is(ReturnType == void))
684                     {
685                         return;
686                     }
687                     else
688                     {
689                         return fixRef(this.returnVal);
690                     }
691                 }
692             }
693             else
694             {
695                 version (verboseUnittest)
696                 {
697                     stderr.writeln("Yield from workForce.");
698                 }
699 
700                 return yieldForce;
701             }
702         }
703     }
704 
705     /**
706     Returns `true` if the `Task` is finished executing.
707 
708     Throws:  Rethrows any exception thrown during the execution of the
709              `Task`.
710     */
711     @property bool done() @trusted
712     {
713         // Explicitly forwarded for documentation purposes.
714         return base.done;
715     }
716 
717     /**
718     Create a new thread for executing this `Task`, execute it in the
719     newly created thread, then terminate the thread.  This can be used for
720     future/promise parallelism.  An explicit priority may be given
721     to the `Task`.  If one is provided, its value is forwarded to
722     `core.thread.Thread.priority`. See $(REF task, std,parallelism) for
723     usage example.
724     */
725     void executeInNewThread() @trusted
726     {
727         pool = new TaskPool(basePtr);
728     }
729 
730     /// Ditto
731     void executeInNewThread(int priority) @trusted
732     {
733         pool = new TaskPool(basePtr, priority);
734     }
735 
736     @safe ~this()
737     {
738         if (isScoped && pool !is null && taskStatus != TaskStatus.done)
739         {
740             yieldForce;
741         }
742     }
743 
744     // When this is uncommented, it somehow gets called on returning from
745     // scopedTask even though the struct shouldn't be getting copied.
746     //@disable this(this) {}
747 }
748 
749 // Calls `fpOrDelegate` with `args`.  This is an
750 // adapter that makes `Task` work with delegates, function pointers and
751 // functors instead of just aliases.
752 ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args)
753 {
754     return fpOrDelegate(args);
755 }
756 
757 /**
758 Creates a `Task` on the GC heap that calls an alias.  This may be executed
759 via `Task.executeInNewThread` or by submitting to a
760 $(REF TaskPool, std,parallelism).  A globally accessible instance of
761 `TaskPool` is provided by $(REF taskPool, std,parallelism).
762 
763 Returns:  A pointer to the `Task`.
764 
765 Example:
766 ---
767 // Read two files into memory at the same time.
768 import std.file;
769 
770 void main()
771 {
772     // Create and execute a Task for reading
773     // foo.txt.
774     auto file1Task = task!read("foo.txt");
775     file1Task.executeInNewThread();
776 
777     // Read bar.txt in parallel.
778     auto file2Data = read("bar.txt");
779 
780     // Get the results of reading foo.txt.
781     auto file1Data = file1Task.yieldForce;
782 }
783 ---
784 
785 ---
786 // Sorts an array using a parallel quick sort algorithm.
787 // The first partition is done serially.  Both recursion
788 // branches are then executed in parallel.
789 //
790 // Timings for sorting an array of 1,000,000 doubles on
791 // an Athlon 64 X2 dual core machine:
792 //
793 // This implementation:               176 milliseconds.
794 // Equivalent serial implementation:  280 milliseconds
795 void parallelSort(T)(T[] data)
796 {
797     // Sort small subarrays serially.
798     if (data.length < 100)
799     {
800          std.algorithm.sort(data);
801          return;
802     }
803 
804     // Partition the array.
805     swap(data[$ / 2], data[$ - 1]);
806     auto pivot = data[$ - 1];
807     bool lessThanPivot(T elem) { return elem < pivot; }
808 
809     auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]);
810     swap(data[$ - greaterEqual.length - 1], data[$ - 1]);
811 
812     auto less = data[0..$ - greaterEqual.length - 1];
813     greaterEqual = data[$ - greaterEqual.length..$];
814 
815     // Execute both recursion branches in parallel.
816     auto recurseTask = task!parallelSort(greaterEqual);
817     taskPool.put(recurseTask);
818     parallelSort(less);
819     recurseTask.yieldForce;
820 }
821 ---
822 */
823 auto task(alias fun, Args...)(Args args)
824 {
825     return new Task!(fun, Args)(args);
826 }
827 
828 /**
829 Creates a `Task` on the GC heap that calls a function pointer, delegate, or
830 class/struct with overloaded opCall.
831 
832 Example:
833 ---
834 // Read two files in at the same time again,
835 // but this time use a function pointer instead
836 // of an alias to represent std.file.read.
837 import std.file;
838 
839 void main()
840 {
841     // Create and execute a Task for reading
842     // foo.txt.
843     auto file1Task = task(&read!string, "foo.txt", size_t.max);
844     file1Task.executeInNewThread();
845 
846     // Read bar.txt in parallel.
847     auto file2Data = read("bar.txt");
848 
849     // Get the results of reading foo.txt.
850     auto file1Data = file1Task.yieldForce;
851 }
852 ---
853 
854 Notes: This function takes a non-scope delegate, meaning it can be
855        used with closures.  If you can't allocate a closure due to objects
856        on the stack that have scoped destruction, see `scopedTask`, which
857        takes a scope delegate.
858  */
859 auto task(F, Args...)(F delegateOrFp, Args args)
860 if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
861 {
862     return new Task!(run, F, Args)(delegateOrFp, args);
863 }
864 
865 /**
866 Version of `task` usable from `@safe` code.  Usage mechanics are
867 identical to the non-@safe case, but safety introduces some restrictions:
868 
869 1.  `fun` must be @safe or @trusted.
870 
871 2.  `F` must not have any unshared aliasing as defined by
872     $(REF hasUnsharedAliasing, std,traits).  This means it
873     may not be an unshared delegate or a non-shared class or struct
874     with overloaded `opCall`.  This also precludes accepting template
875     alias parameters.
876 
877 3.  `Args` must not have unshared aliasing.
878 
879 4.  `fun` must not return by reference.
880 
881 5.  The return type must not have unshared aliasing unless `fun` is
882     `pure` or the `Task` is executed via `executeInNewThread` instead
883     of using a `TaskPool`.
884 
885 */
886 @trusted auto task(F, Args...)(F fun, Args args)
887 if (is(typeof(fun(args))) && isSafeTask!F)
888 {
889     return new Task!(run, F, Args)(fun, args);
890 }
891 
892 /**
893 These functions allow the creation of `Task` objects on the stack rather
894 than the GC heap.  The lifetime of a `Task` created by `scopedTask`
895 cannot exceed the lifetime of the scope it was created in.
896 
897 `scopedTask` might be preferred over `task`:
898 
899 1.  When a `Task` that calls a delegate is being created and a closure
900     cannot be allocated due to objects on the stack that have scoped
901     destruction.  The delegate overload of `scopedTask` takes a `scope`
902     delegate.
903 
904 2.  As a micro-optimization, to avoid the heap allocation associated with
905     `task` or with the creation of a closure.
906 
907 Usage is otherwise identical to `task`.
908 
909 Notes:  `Task` objects created using `scopedTask` will automatically
910 call `Task.yieldForce` in their destructor if necessary to ensure
911 the `Task` is complete before the stack frame they reside on is destroyed.
912 */
913 auto scopedTask(alias fun, Args...)(Args args)
914 {
915     auto ret = Task!(fun, Args)(args);
916     ret.isScoped = true;
917     return ret;
918 }
919 
920 /// Ditto
921 auto scopedTask(F, Args...)(scope F delegateOrFp, Args args)
922 if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
923 {
924     auto ret = Task!(run, F, Args)(delegateOrFp, args);
925     ret.isScoped = true;
926     return ret;
927 }
928 
929 /// Ditto
930 @trusted auto scopedTask(F, Args...)(F fun, Args args)
931 if (is(typeof(fun(args))) && isSafeTask!F)
932 {
933     auto ret = Task!(run, F, Args)(fun, args);
934     ret.isScoped = true;
935     return ret;
936 }
937 
938 /**
939 The total number of CPU cores available on the current machine, as reported by
940 the operating system.
941 */
942 alias totalCPUs =
943     __lazilyInitializedConstant!(immutable(uint), uint.max, totalCPUsImpl);
944 
945 uint totalCPUsImpl() @nogc nothrow @trusted
946 {
947     version (Windows)
948     {
949         // BUGS:  Only works on Windows 2000 and above.
950         import core.sys.windows.winbase : SYSTEM_INFO, GetSystemInfo;
951         import std.algorithm.comparison : max;
952         SYSTEM_INFO si;
953         GetSystemInfo(&si);
954         return max(1, cast(uint) si.dwNumberOfProcessors);
955     }
956     else version (linux)
957     {
958         import core.stdc.stdlib : calloc;
959         import core.stdc.string : memset;
960         import core.sys.linux.sched : CPU_ALLOC_SIZE, CPU_FREE, CPU_COUNT, CPU_COUNT_S, cpu_set_t, sched_getaffinity;
961         import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
962 
963         int count = 0;
964 
965         /**
966          * According to ruby's source code, CPU_ALLOC() doesn't work as expected.
967          *  see: https://github.com/ruby/ruby/commit/7d9e04de496915dd9e4544ee18b1a0026dc79242
968          *
969          *  The hardcode number also comes from ruby's source code.
970          *  see: https://github.com/ruby/ruby/commit/0fa75e813ecf0f5f3dd01f89aa76d0e25ab4fcd4
971          */
972         for (int n = 64; n <= 16384; n *= 2)
973         {
974             size_t size = CPU_ALLOC_SIZE(count);
975             if (size >= 0x400)
976             {
977                 auto cpuset = cast(cpu_set_t*) calloc(1, size);
978                 if (cpuset is null) break;
979                 if (sched_getaffinity(0, size, cpuset) == 0)
980                 {
981                     count = CPU_COUNT_S(size, cpuset);
982                 }
983                 CPU_FREE(cpuset);
984             }
985             else
986             {
987                 cpu_set_t cpuset;
988                 if (sched_getaffinity(0, cpu_set_t.sizeof, &cpuset) == 0)
989                 {
990                     count = CPU_COUNT(&cpuset);
991                 }
992             }
993 
994             if (count > 0)
995                 return cast(uint) count;
996         }
997 
998         return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
999     }
1000     else version (Darwin)
1001     {
1002         import core.sys.darwin.sys.sysctl : sysctlbyname;
1003         uint result;
1004         size_t len = result.sizeof;
1005         sysctlbyname("hw.physicalcpu", &result, &len, null, 0);
1006         return result;
1007     }
1008     else version (DragonFlyBSD)
1009     {
1010         import core.sys.dragonflybsd.sys.sysctl : sysctlbyname;
1011         uint result;
1012         size_t len = result.sizeof;
1013         sysctlbyname("hw.ncpu", &result, &len, null, 0);
1014         return result;
1015     }
1016     else version (FreeBSD)
1017     {
1018         import core.sys.freebsd.sys.sysctl : sysctlbyname;
1019         uint result;
1020         size_t len = result.sizeof;
1021         sysctlbyname("hw.ncpu", &result, &len, null, 0);
1022         return result;
1023     }
1024     else version (NetBSD)
1025     {
1026         import core.sys.netbsd.sys.sysctl : sysctlbyname;
1027         uint result;
1028         size_t len = result.sizeof;
1029         sysctlbyname("hw.ncpu", &result, &len, null, 0);
1030         return result;
1031     }
1032     else version (Solaris)
1033     {
1034         import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
1035         return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
1036     }
1037     else version (OpenBSD)
1038     {
1039         import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
1040         return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
1041     }
1042     else version (Hurd)
1043     {
1044         import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
1045         return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
1046     }
1047     else
1048     {
1049         static assert(0, "Don't know how to get N CPUs on this OS.");
1050     }
1051 }
1052 
1053 /*
1054 This class serves two purposes:
1055 
1056 1.  It distinguishes std.parallelism threads from other threads so that
1057     the std.parallelism daemon threads can be terminated.
1058 
1059 2.  It adds a reference to the pool that the thread is a member of,
1060     which is also necessary to allow the daemon threads to be properly
1061     terminated.
1062 */
1063 private final class ParallelismThread : Thread
1064 {
1065     this(void delegate() dg)
1066     {
1067         super(dg);
1068     }
1069 
1070     TaskPool pool;
1071 }
1072 
1073 // Kill daemon threads.
1074 shared static ~this()
1075 {
1076     foreach (ref thread; Thread)
1077     {
1078         auto pthread = cast(ParallelismThread) thread;
1079         if (pthread is null) continue;
1080         auto pool = pthread.pool;
1081         if (!pool.isDaemon) continue;
1082         pool.stop();
1083         pthread.join();
1084     }
1085 }
1086 
1087 /**
1088 This class encapsulates a task queue and a set of worker threads.  Its purpose
1089 is to efficiently map a large number of `Task`s onto a smaller number of
1090 threads.  A task queue is a FIFO queue of `Task` objects that have been
1091 submitted to the `TaskPool` and are awaiting execution.  A worker thread is a
1092 thread that executes the `Task` at the front of the queue when one is
1093 available and sleeps when the queue is empty.
1094 
1095 This class should usually be used via the global instantiation
1096 available via the $(REF taskPool, std,parallelism) property.
1097 Occasionally it is useful to explicitly instantiate a `TaskPool`:
1098 
1099 1.  When you want `TaskPool` instances with multiple priorities, for example
1100     a low priority pool and a high priority pool.
1101 
1102 2.  When the threads in the global task pool are waiting on a synchronization
1103     primitive (for example a mutex), and you want to parallelize the code that
1104     needs to run before these threads can be resumed.
1105 
1106 Note: The worker threads in this pool will not stop until
1107       `stop` or `finish` is called, even if the main thread
1108       has finished already. This may lead to programs that
1109       never end. If you do not want this behaviour, you can set `isDaemon`
1110       to true.
1111  */
1112 final class TaskPool
1113 {
1114 private:
1115 
1116     // A pool can either be a regular pool or a single-task pool.  A
1117     // single-task pool is a dummy pool that's fired up for
1118     // Task.executeInNewThread().
1119     bool isSingleTask;
1120 
1121     ParallelismThread[] pool;
1122     Thread singleTaskThread;
1123 
1124     AbstractTask* head;
1125     AbstractTask* tail;
1126     PoolState status = PoolState.running;
1127     Condition workerCondition;
1128     Condition waiterCondition;
1129     Mutex queueMutex;
1130     Mutex waiterMutex;  // For waiterCondition
1131 
1132     // The instanceStartIndex of the next instance that will be created.
1133     __gshared size_t nextInstanceIndex = 1;
1134 
1135     // The index of the current thread.
1136     static size_t threadIndex;
1137 
1138     // The index of the first thread in this instance.
1139     immutable size_t instanceStartIndex;
1140 
1141     // The index that the next thread to be initialized in this pool will have.
1142     size_t nextThreadIndex;
1143 
1144     enum PoolState : ubyte
1145     {
1146         running,
1147         finishing,
1148         stopNow
1149     }
1150 
1151     void doJob(AbstractTask* job)
1152     {
1153         assert(job.taskStatus == TaskStatus.inProgress);
1154         assert(job.next is null);
1155         assert(job.prev is null);
1156 
1157         scope(exit)
1158         {
1159             if (!isSingleTask)
1160             {
1161                 waiterLock();
1162                 scope(exit) waiterUnlock();
1163                 notifyWaiters();
1164             }
1165         }
1166 
1167         try
1168         {
1169             job.job();
1170         }
1171         catch (Throwable e)
1172         {
1173             job.exception = e;
1174         }
1175 
1176         atomicSetUbyte(job.taskStatus, TaskStatus.done);
1177     }
1178 
1179     // This function is used for dummy pools created by Task.executeInNewThread().
1180     void doSingleTask()
1181     {
1182         // No synchronization.  Pool is guaranteed to only have one thread,
1183         // and the queue is submitted to before this thread is created.
1184         assert(head);
1185         auto t = head;
1186         t.next = t.prev = head = null;
1187         doJob(t);
1188     }
1189 
1190     // This function performs initialization for each thread that affects
1191     // thread local storage and therefore must be done from within the
1192     // worker thread.  It then calls executeWorkLoop().
1193     void startWorkLoop()
1194     {
1195         // Initialize thread index.
1196         {
1197             queueLock();
1198             scope(exit) queueUnlock();
1199             threadIndex = nextThreadIndex;
1200             nextThreadIndex++;
1201         }
1202 
1203         executeWorkLoop();
1204     }
1205 
1206     // This is the main work loop that worker threads spend their time in
1207     // until they terminate.  It's also entered by non-worker threads when
1208     // finish() is called with the blocking variable set to true.
1209     void executeWorkLoop()
1210     {
1211         while (atomicReadUbyte(status) != PoolState.stopNow)
1212         {
1213             AbstractTask* task = pop();
1214             if (task is null)
1215             {
1216                 if (atomicReadUbyte(status) == PoolState.finishing)
1217                 {
1218                     atomicSetUbyte(status, PoolState.stopNow);
1219                     return;
1220                 }
1221             }
1222             else
1223             {
1224                 doJob(task);
1225             }
1226         }
1227     }
1228 
1229     // Pop a task off the queue.
1230     AbstractTask* pop()
1231     {
1232         queueLock();
1233         scope(exit) queueUnlock();
1234         auto ret = popNoSync();
1235         while (ret is null && status == PoolState.running)
1236         {
1237             wait();
1238             ret = popNoSync();
1239         }
1240         return ret;
1241     }
1242 
1243     AbstractTask* popNoSync()
1244     out(returned)
1245     {
1246         /* If task.prev and task.next aren't null, then another thread
1247          * can try to delete this task from the pool after it's
1248          * alreadly been deleted/popped.
1249          */
1250         if (returned !is null)
1251         {
1252             assert(returned.next is null);
1253             assert(returned.prev is null);
1254         }
1255     }
1256     do
1257     {
1258         if (isSingleTask) return null;
1259 
1260         AbstractTask* returned = head;
1261         if (head !is null)
1262         {
1263             head = head.next;
1264             returned.prev = null;
1265             returned.next = null;
1266             returned.taskStatus = TaskStatus.inProgress;
1267         }
1268         if (head !is null)
1269         {
1270             head.prev = null;
1271         }
1272 
1273         return returned;
1274     }
1275 
1276     // Push a task onto the queue.
1277     void abstractPut(AbstractTask* task)
1278     {
1279         queueLock();
1280         scope(exit) queueUnlock();
1281         abstractPutNoSync(task);
1282     }
1283 
1284     void abstractPutNoSync(AbstractTask* task)
1285     in
1286     {
1287         assert(task);
1288     }
1289     out
1290     {
1291         import std.conv : text;
1292 
1293         assert(tail.prev !is tail);
1294         assert(tail.next is null, text(tail.prev, '\t', tail.next));
1295         if (tail.prev !is null)
1296         {
1297             assert(tail.prev.next is tail, text(tail.prev, '\t', tail.next));
1298         }
1299     }
1300     do
1301     {
1302         // Not using enforce() to save on function call overhead since this
1303         // is a performance critical function.
1304         if (status != PoolState.running)
1305         {
1306             throw new Error(
1307                 "Cannot submit a new task to a pool after calling " ~
1308                 "finish() or stop()."
1309             );
1310         }
1311 
1312         task.next = null;
1313         if (head is null)   //Queue is empty.
1314         {
1315             head = task;
1316             tail = task;
1317             tail.prev = null;
1318         }
1319         else
1320         {
1321             assert(tail);
1322             task.prev = tail;
1323             tail.next = task;
1324             tail = task;
1325         }
1326         notify();
1327     }
1328 
1329     void abstractPutGroupNoSync(AbstractTask* h, AbstractTask* t)
1330     {
1331         if (status != PoolState.running)
1332         {
1333             throw new Error(
1334                 "Cannot submit a new task to a pool after calling " ~
1335                 "finish() or stop()."
1336             );
1337         }
1338 
1339         if (head is null)
1340         {
1341             head = h;
1342             tail = t;
1343         }
1344         else
1345         {
1346             h.prev = tail;
1347             tail.next = h;
1348             tail = t;
1349         }
1350 
1351         notifyAll();
1352     }
1353 
1354     void tryDeleteExecute(AbstractTask* toExecute)
1355     {
1356         if (isSingleTask) return;
1357 
1358         if ( !deleteItem(toExecute) )
1359         {
1360             return;
1361         }
1362 
1363         try
1364         {
1365             toExecute.job();
1366         }
1367         catch (Exception e)
1368         {
1369             toExecute.exception = e;
1370         }
1371 
1372         atomicSetUbyte(toExecute.taskStatus, TaskStatus.done);
1373     }
1374 
1375     bool deleteItem(AbstractTask* item)
1376     {
1377         queueLock();
1378         scope(exit) queueUnlock();
1379         return deleteItemNoSync(item);
1380     }
1381 
1382     bool deleteItemNoSync(AbstractTask* item)
1383     {
1384         if (item.taskStatus != TaskStatus.notStarted)
1385         {
1386             return false;
1387         }
1388         item.taskStatus = TaskStatus.inProgress;
1389 
1390         if (item is head)
1391         {
1392             // Make sure head gets set properly.
1393             popNoSync();
1394             return true;
1395         }
1396         if (item is tail)
1397         {
1398             tail = tail.prev;
1399             if (tail !is null)
1400             {
1401                 tail.next = null;
1402             }
1403             item.next = null;
1404             item.prev = null;
1405             return true;
1406         }
1407         if (item.next !is null)
1408         {
1409             assert(item.next.prev is item);  // Check queue consistency.
1410             item.next.prev = item.prev;
1411         }
1412         if (item.prev !is null)
1413         {
1414             assert(item.prev.next is item);  // Check queue consistency.
1415             item.prev.next = item.next;
1416         }
1417         item.next = null;
1418         item.prev = null;
1419         return true;
1420     }
1421 
1422     void queueLock()
1423     {
1424         assert(queueMutex);
1425         if (!isSingleTask) queueMutex.lock();
1426     }
1427 
1428     void queueUnlock()
1429     {
1430         assert(queueMutex);
1431         if (!isSingleTask) queueMutex.unlock();
1432     }
1433 
1434     void waiterLock()
1435     {
1436         if (!isSingleTask) waiterMutex.lock();
1437     }
1438 
1439     void waiterUnlock()
1440     {
1441         if (!isSingleTask) waiterMutex.unlock();
1442     }
1443 
1444     void wait()
1445     {
1446         if (!isSingleTask) workerCondition.wait();
1447     }
1448 
1449     void notify()
1450     {
1451         if (!isSingleTask) workerCondition.notify();
1452     }
1453 
1454     void notifyAll()
1455     {
1456         if (!isSingleTask) workerCondition.notifyAll();
1457     }
1458 
1459     void waitUntilCompletion()
1460     {
1461         if (isSingleTask)
1462         {
1463             singleTaskThread.join();
1464         }
1465         else
1466         {
1467             waiterCondition.wait();
1468         }
1469     }
1470 
1471     void notifyWaiters()
1472     {
1473         if (!isSingleTask) waiterCondition.notifyAll();
1474     }
1475 
1476     // Private constructor for creating dummy pools that only have one thread,
1477     // only execute one Task, and then terminate.  This is used for
1478     // Task.executeInNewThread().
1479     this(AbstractTask* task, int priority = int.max)
1480     {
1481         assert(task);
1482 
1483         // Dummy value, not used.
1484         instanceStartIndex = 0;
1485 
1486         this.isSingleTask = true;
1487         task.taskStatus = TaskStatus.inProgress;
1488         this.head = task;
1489         singleTaskThread = new Thread(&doSingleTask);
1490         singleTaskThread.start();
1491 
1492         // Disabled until writing code to support
1493         // running thread with specified priority
1494         // See https://issues.dlang.org/show_bug.cgi?id=8960
1495 
1496         /*if (priority != int.max)
1497         {
1498             singleTaskThread.priority = priority;
1499         }*/
1500     }
1501 
1502 public:
1503     // This is used in parallel_algorithm but is too unstable to document
1504     // as public API.
1505     size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow
1506     {
1507         import std.algorithm.comparison : max;
1508 
1509         if (this.size == 0)
1510         {
1511             return max(rangeLen, 1);
1512         }
1513 
1514         immutable size_t eightSize = 4 * (this.size + 1);
1515         auto ret = (rangeLen / eightSize) + ((rangeLen % eightSize == 0) ? 0 : 1);
1516         return max(ret, 1);
1517     }
1518 
1519     /**
1520     Default constructor that initializes a `TaskPool` with
1521     `totalCPUs` - 1 worker threads.  The minus 1 is included because the
1522     main thread will also be available to do work.
1523 
1524     Note:  On single-core machines, the primitives provided by `TaskPool`
1525            operate transparently in single-threaded mode.
1526      */
1527     this() @trusted
1528     {
1529         this(totalCPUs - 1);
1530     }
1531 
1532     /**
1533     Allows for custom number of worker threads.
1534     */
1535     this(size_t nWorkers) @trusted
1536     {
1537         synchronized(typeid(TaskPool))
1538         {
1539             instanceStartIndex = nextInstanceIndex;
1540 
1541             // The first worker thread to be initialized will have this index,
1542             // and will increment it.  The second worker to be initialized will
1543             // have this index plus 1.
1544             nextThreadIndex = instanceStartIndex;
1545             nextInstanceIndex += nWorkers;
1546         }
1547 
1548         queueMutex = new Mutex(this);
1549         waiterMutex = new Mutex();
1550         workerCondition = new Condition(queueMutex);
1551         waiterCondition = new Condition(waiterMutex);
1552 
1553         pool = new ParallelismThread[nWorkers];
1554         foreach (ref poolThread; pool)
1555         {
1556             poolThread = new ParallelismThread(&startWorkLoop);
1557             poolThread.pool = this;
1558             poolThread.start();
1559         }
1560     }
1561 
1562     /**
1563     Implements a parallel foreach loop over a range.  This works by implicitly
1564     creating and submitting one `Task` to the `TaskPool` for each worker
1565     thread.  A work unit is a set of consecutive elements of `range` to
1566     be processed by a worker thread between communication with any other
1567     thread.  The number of elements processed per work unit is controlled by the
1568     `workUnitSize` parameter.  Smaller work units provide better load
1569     balancing, but larger work units avoid the overhead of communicating
1570     with other threads frequently to fetch the next work unit.  Large work
1571     units also avoid false sharing in cases where the range is being modified.
1572     The less time a single iteration of the loop takes, the larger
1573     `workUnitSize` should be.  For very expensive loop bodies,
1574     `workUnitSize` should  be 1.  An overload that chooses a default work
1575     unit size is also available.
1576 
1577     Example:
1578     ---
1579     // Find the logarithm of every number from 1 to
1580     // 10_000_000 in parallel.
1581     auto logs = new double[10_000_000];
1582 
1583     // Parallel foreach works with or without an index
1584     // variable.  It can iterate by ref if range.front
1585     // returns by ref.
1586 
1587     // Iterate over logs using work units of size 100.
1588     foreach (i, ref elem; taskPool.parallel(logs, 100))
1589     {
1590         elem = log(i + 1.0);
1591     }
1592 
1593     // Same thing, but use the default work unit size.
1594     //
1595     // Timings on an Athlon 64 X2 dual core machine:
1596     //
1597     // Parallel foreach:  388 milliseconds
1598     // Regular foreach:   619 milliseconds
1599     foreach (i, ref elem; taskPool.parallel(logs))
1600     {
1601         elem = log(i + 1.0);
1602     }
1603     ---
1604 
1605     Notes:
1606 
1607     The memory usage of this implementation is guaranteed to be constant
1608     in `range.length`.
1609 
1610     Breaking from a parallel foreach loop via a break, labeled break,
1611     labeled continue, return or goto statement throws a
1612     `ParallelForeachError`.
1613 
1614     In the case of non-random access ranges, parallel foreach buffers lazily
1615     to an array of size `workUnitSize` before executing the parallel portion
1616     of the loop.  The exception is that, if a parallel foreach is executed
1617     over a range returned by `asyncBuf` or `map`, the copying is elided
1618     and the buffers are simply swapped.  In this case `workUnitSize` is
1619     ignored and the work unit size is set to the  buffer size of `range`.
1620 
1621     A memory barrier is guaranteed to be executed on exit from the loop,
1622     so that results produced by all threads are visible in the calling thread.
1623 
1624     $(B Exception Handling):
1625 
1626     When at least one exception is thrown from inside a parallel foreach loop,
1627     the submission of additional `Task` objects is terminated as soon as
1628     possible, in a non-deterministic manner.  All executing or
1629     enqueued work units are allowed to complete.  Then, all exceptions that
1630     were thrown by any work unit are chained using `Throwable.next` and
1631     rethrown.  The order of the exception chaining is non-deterministic.
1632     */
1633     ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
1634     {
1635         import std.exception : enforce;
1636         enforce(workUnitSize > 0, "workUnitSize must be > 0.");
1637         alias RetType = ParallelForeach!R;
1638         return RetType(this, range, workUnitSize);
1639     }
1640 
1641 
1642     /// Ditto
1643     ParallelForeach!R parallel(R)(R range)
1644     {
1645         static if (hasLength!R)
1646         {
1647             // Default work unit size is such that we would use 4x as many
1648             // slots as are in this thread pool.
1649             size_t workUnitSize = defaultWorkUnitSize(range.length);
1650             return parallel(range, workUnitSize);
1651         }
1652         else
1653         {
1654             // Just use a really, really dumb guess if the user is too lazy to
1655             // specify.
1656             return parallel(range, 512);
1657         }
1658     }
1659 
1660     ///
1661     template amap(functions...)
1662     {
1663         /**
1664         Eager parallel map.  The eagerness of this function means it has less
1665         overhead than the lazily evaluated `TaskPool.map` and should be
1666         preferred where the memory requirements of eagerness are acceptable.
1667         `functions` are the functions to be evaluated, passed as template
1668         alias parameters in a style similar to
1669         $(REF map, std,algorithm,iteration).
1670         The first argument must be a random access range. For performance
1671         reasons, amap will assume the range elements have not yet been
1672         initialized. Elements will be overwritten without calling a destructor
1673         nor doing an assignment. As such, the range must not contain meaningful
1674         data$(DDOC_COMMENT not a section): either un-initialized objects, or
1675         objects in their `.init` state.
1676 
1677         ---
1678         auto numbers = iota(100_000_000.0);
1679 
1680         // Find the square roots of numbers.
1681         //
1682         // Timings on an Athlon 64 X2 dual core machine:
1683         //
1684         // Parallel eager map:                   0.802 s
1685         // Equivalent serial implementation:     1.768 s
1686         auto squareRoots = taskPool.amap!sqrt(numbers);
1687         ---
1688 
1689         Immediately after the range argument, an optional work unit size argument
1690         may be provided.  Work units as used by `amap` are identical to those
1691         defined for parallel foreach.  If no work unit size is provided, the
1692         default work unit size is used.
1693 
1694         ---
1695         // Same thing, but make work unit size 100.
1696         auto squareRoots = taskPool.amap!sqrt(numbers, 100);
1697         ---
1698 
1699         An output range for returning the results may be provided as the last
1700         argument.  If one is not provided, an array of the proper type will be
1701         allocated on the garbage collected heap.  If one is provided, it must be a
1702         random access range with assignable elements, must have reference
1703         semantics with respect to assignment to its elements, and must have the
1704         same length as the input range.  Writing to adjacent elements from
1705         different threads must be safe.
1706 
1707         ---
1708         // Same thing, but explicitly allocate an array
1709         // to return the results in.  The element type
1710         // of the array may be either the exact type
1711         // returned by functions or an implicit conversion
1712         // target.
1713         auto squareRoots = new float[numbers.length];
1714         taskPool.amap!sqrt(numbers, squareRoots);
1715 
1716         // Multiple functions, explicit output range, and
1717         // explicit work unit size.
1718         auto results = new Tuple!(float, real)[numbers.length];
1719         taskPool.amap!(sqrt, log)(numbers, 100, results);
1720         ---
1721 
1722         Note:
1723 
1724         A memory barrier is guaranteed to be executed after all results are written
1725         but before returning so that results produced by all threads are visible
1726         in the calling thread.
1727 
1728         Tips:
1729 
1730         To perform the mapping operation in place, provide the same range for the
1731         input and output range.
1732 
1733         To parallelize the copying of a range with expensive to evaluate elements
1734         to an array, pass an identity function (a function that just returns
1735         whatever argument is provided to it) to `amap`.
1736 
1737         $(B Exception Handling):
1738 
1739         When at least one exception is thrown from inside the map functions,
1740         the submission of additional `Task` objects is terminated as soon as
1741         possible, in a non-deterministic manner.  All currently executing or
1742         enqueued work units are allowed to complete.  Then, all exceptions that
1743         were thrown from any work unit are chained using `Throwable.next` and
1744         rethrown.  The order of the exception chaining is non-deterministic.
1745         */
1746         auto amap(Args...)(Args args)
1747         if (isRandomAccessRange!(Args[0]))
1748         {
1749             import core.internal.lifetime : emplaceRef;
1750 
1751             alias fun = adjoin!(staticMap!(unaryFun, functions));
1752 
1753             alias range = args[0];
1754             immutable len = range.length;
1755 
1756             static if (
1757                 Args.length > 1 &&
1758                 randAssignable!(Args[$ - 1]) &&
1759                 is(MapType!(Args[0], functions) : ElementType!(Args[$ - 1]))
1760                 )
1761             {
1762                 import std.conv : text;
1763                 import std.exception : enforce;
1764 
1765                 alias buf = args[$ - 1];
1766                 alias args2 = args[0..$ - 1];
1767                 alias Args2 = Args[0..$ - 1];
1768                 enforce(buf.length == len,
1769                         text("Can't use a user supplied buffer that's the wrong ",
1770                              "size.  (Expected  :", len, " Got:  ", buf.length));
1771             }
1772             else static if (randAssignable!(Args[$ - 1]) && Args.length > 1)
1773             {
1774                 static assert(0, "Wrong buffer type.");
1775             }
1776             else
1777             {
1778                 import std.array : uninitializedArray;
1779 
1780                 auto buf = uninitializedArray!(MapType!(Args[0], functions)[])(len);
1781                 alias args2 = args;
1782                 alias Args2 = Args;
1783             }
1784 
1785             if (!len) return buf;
1786 
1787             static if (isIntegral!(Args2[$ - 1]))
1788             {
1789                 static assert(args2.length == 2);
1790                 auto workUnitSize = cast(size_t) args2[1];
1791             }
1792             else
1793             {
1794                 static assert(args2.length == 1, Args);
1795                 auto workUnitSize = defaultWorkUnitSize(range.length);
1796             }
1797 
1798             alias R = typeof(range);
1799 
1800             if (workUnitSize > len)
1801             {
1802                 workUnitSize = len;
1803             }
1804 
1805             // Handle as a special case:
1806             if (size == 0)
1807             {
1808                 size_t index = 0;
1809                 foreach (elem; range)
1810                 {
1811                     emplaceRef(buf[index++], fun(elem));
1812                 }
1813                 return buf;
1814             }
1815 
1816             // Effectively -1:  chunkIndex + 1 == 0:
1817             shared size_t workUnitIndex = size_t.max;
1818             shared bool shouldContinue = true;
1819 
1820             void doIt()
1821             {
1822                 import std.algorithm.comparison : min;
1823 
1824                 scope(failure)
1825                 {
1826                     // If an exception is thrown, all threads should bail.
1827                     atomicStore(shouldContinue, false);
1828                 }
1829 
1830                 while (atomicLoad(shouldContinue))
1831                 {
1832                     immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
1833                     immutable start = workUnitSize * myUnitIndex;
1834                     if (start >= len)
1835                     {
1836                         atomicStore(shouldContinue, false);
1837                         break;
1838                     }
1839 
1840                     immutable end = min(len, start + workUnitSize);
1841 
1842                     static if (hasSlicing!R)
1843                     {
1844                         auto subrange = range[start .. end];
1845                         foreach (i; start .. end)
1846                         {
1847                             emplaceRef(buf[i], fun(subrange.front));
1848                             subrange.popFront();
1849                         }
1850                     }
1851                     else
1852                     {
1853                         foreach (i; start .. end)
1854                         {
1855                             emplaceRef(buf[i], fun(range[i]));
1856                         }
1857                     }
1858                 }
1859             }
1860 
1861             submitAndExecute(this, &doIt);
1862             return buf;
1863         }
1864     }
1865 
1866     ///
1867     template map(functions...)
1868     {
1869         /**
1870         A semi-lazy parallel map that can be used for pipelining.  The map
1871         functions are evaluated for the first `bufSize` elements and stored in a
1872         buffer and made available to `popFront`.  Meanwhile, in the
1873         background a second buffer of the same size is filled.  When the first
1874         buffer is exhausted, it is swapped with the second buffer and filled while
1875         the values from what was originally the second buffer are read.  This
1876         implementation allows for elements to be written to the buffer without
1877         the need for atomic operations or synchronization for each write, and
1878         enables the mapping function to be evaluated efficiently in parallel.
1879 
1880         `map` has more overhead than the simpler procedure used by `amap`
1881         but avoids the need to keep all results in memory simultaneously and works
1882         with non-random access ranges.
1883 
1884         Params:
1885 
1886         source = The $(REF_ALTTEXT input range, isInputRange, std,range,primitives)
1887         to be mapped.  If `source` is not random
1888         access it will be lazily buffered to an array of size `bufSize` before
1889         the map function is evaluated.  (For an exception to this rule, see Notes.)
1890 
1891         bufSize = The size of the buffer to store the evaluated elements.
1892 
1893         workUnitSize = The number of elements to evaluate in a single
1894         `Task`.  Must be less than or equal to `bufSize`, and
1895         should be a fraction of `bufSize` such that all worker threads can be
1896         used.  If the default of size_t.max is used, workUnitSize will be set to
1897         the pool-wide default.
1898 
1899         Returns:  An input range representing the results of the map.  This range
1900                   has a length iff `source` has a length.
1901 
1902         Notes:
1903 
1904         If a range returned by `map` or `asyncBuf` is used as an input to
1905         `map`, then as an optimization the copying from the output buffer
1906         of the first range to the input buffer of the second range is elided, even
1907         though the ranges returned by `map` and `asyncBuf` are non-random
1908         access ranges.  This means that the `bufSize` parameter passed to the
1909         current call to `map` will be ignored and the size of the buffer
1910         will be the buffer size of `source`.
1911 
1912         Example:
1913         ---
1914         // Pipeline reading a file, converting each line
1915         // to a number, taking the logarithms of the numbers,
1916         // and performing the additions necessary to find
1917         // the sum of the logarithms.
1918 
1919         auto lineRange = File("numberList.txt").byLine();
1920         auto dupedLines = std.algorithm.map!"a.idup"(lineRange);
1921         auto nums = taskPool.map!(to!double)(dupedLines);
1922         auto logs = taskPool.map!log10(nums);
1923 
1924         double sum = 0;
1925         foreach (elem; logs)
1926         {
1927             sum += elem;
1928         }
1929         ---
1930 
1931         $(B Exception Handling):
1932 
1933         Any exceptions thrown while iterating over `source`
1934         or computing the map function are re-thrown on a call to `popFront` or,
1935         if thrown during construction, are simply allowed to propagate to the
1936         caller.  In the case of exceptions thrown while computing the map function,
1937         the exceptions are chained as in `TaskPool.amap`.
1938         */
1939         auto
1940         map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max)
1941         if (isInputRange!S)
1942         {
1943             import std.exception : enforce;
1944 
1945             enforce(workUnitSize == size_t.max || workUnitSize <= bufSize,
1946                     "Work unit size must be smaller than buffer size.");
1947             alias fun = adjoin!(staticMap!(unaryFun, functions));
1948 
1949             static final class Map
1950             {
1951                 // This is a class because the task needs to be located on the
1952                 // heap and in the non-random access case source needs to be on
1953                 // the heap, too.
1954 
1955             private:
1956                 enum bufferTrick = is(typeof(source.buf1)) &&
1957                 is(typeof(source.bufPos)) &&
1958                 is(typeof(source.doBufSwap()));
1959 
1960                 alias E = MapType!(S, functions);
1961                 E[] buf1, buf2;
1962                 S source;
1963                 TaskPool pool;
1964                 Task!(run, E[] delegate(E[]), E[]) nextBufTask;
1965                 size_t workUnitSize;
1966                 size_t bufPos;
1967                 bool lastTaskWaited;
1968 
1969             static if (isRandomAccessRange!S)
1970             {
1971                 alias FromType = S;
1972 
1973                 void popSource()
1974                 {
1975                     import std.algorithm.comparison : min;
1976 
1977                     static if (__traits(compiles, source[0 .. source.length]))
1978                     {
1979                         source = source[min(buf1.length, source.length)..source.length];
1980                     }
1981                     else static if (__traits(compiles, source[0..$]))
1982                     {
1983                         source = source[min(buf1.length, source.length)..$];
1984                     }
1985                     else
1986                     {
1987                         static assert(0, "S must have slicing for Map."
1988                                       ~ "  " ~ S.stringof ~ " doesn't.");
1989                     }
1990                 }
1991             }
1992             else static if (bufferTrick)
1993             {
1994                 // Make sure we don't have the buffer recycling overload of
1995                 // asyncBuf.
1996                 static if (
1997                     is(typeof(source.source)) &&
1998                     isRoundRobin!(typeof(source.source))
1999                 )
2000                 {
2001                     static assert(0, "Cannot execute a parallel map on " ~
2002                                   "the buffer recycling overload of asyncBuf."
2003                                  );
2004                 }
2005 
2006                 alias FromType = typeof(source.buf1);
2007                 FromType from;
2008 
2009                 // Just swap our input buffer with source's output buffer.
2010                 // No need to copy element by element.
2011                 FromType dumpToFrom()
2012                 {
2013                     import std.algorithm.mutation : swap;
2014 
2015                     assert(source.buf1.length <= from.length);
2016                     from.length = source.buf1.length;
2017                     swap(source.buf1, from);
2018 
2019                     // Just in case this source has been popped before
2020                     // being sent to map:
2021                     from = from[source.bufPos..$];
2022 
2023                     static if (is(typeof(source._length)))
2024                     {
2025                         source._length -= (from.length - source.bufPos);
2026                     }
2027 
2028                     source.doBufSwap();
2029 
2030                     return from;
2031                 }
2032             }
2033             else
2034             {
2035                 alias FromType = ElementType!S[];
2036 
2037                 // The temporary array that data is copied to before being
2038                 // mapped.
2039                 FromType from;
2040 
2041                 FromType dumpToFrom()
2042                 {
2043                     assert(from !is null);
2044 
2045                     size_t i;
2046                     for (; !source.empty && i < from.length; source.popFront())
2047                     {
2048                         from[i++] = source.front;
2049                     }
2050 
2051                     from = from[0 .. i];
2052                     return from;
2053                 }
2054             }
2055 
2056             static if (hasLength!S)
2057             {
2058                 size_t _length;
2059 
2060                 public @property size_t length() const @safe pure nothrow
2061                 {
2062                     return _length;
2063                 }
2064             }
2065 
2066                 this(S source, size_t bufSize, size_t workUnitSize, TaskPool pool)
2067                 {
2068                     static if (bufferTrick)
2069                     {
2070                         bufSize = source.buf1.length;
2071                     }
2072 
2073                     buf1.length = bufSize;
2074                     buf2.length = bufSize;
2075 
2076                     static if (!isRandomAccessRange!S)
2077                     {
2078                         from.length = bufSize;
2079                     }
2080 
2081                     this.workUnitSize = (workUnitSize == size_t.max) ?
2082                             pool.defaultWorkUnitSize(bufSize) : workUnitSize;
2083                     this.source = source;
2084                     this.pool = pool;
2085 
2086                     static if (hasLength!S)
2087                     {
2088                         _length = source.length;
2089                     }
2090 
2091                     buf1 = fillBuf(buf1);
2092                     submitBuf2();
2093                 }
2094 
2095                 // The from parameter is a dummy and ignored in the random access
2096                 // case.
2097                 E[] fillBuf(E[] buf)
2098                 {
2099                     import std.algorithm.comparison : min;
2100 
2101                     static if (isRandomAccessRange!S)
2102                     {
2103                         import std.range : take;
2104                         auto toMap = take(source, buf.length);
2105                         scope(success) popSource();
2106                     }
2107                     else
2108                     {
2109                         auto toMap = dumpToFrom();
2110                     }
2111 
2112                     buf = buf[0 .. min(buf.length, toMap.length)];
2113 
2114                     // Handle as a special case:
2115                     if (pool.size == 0)
2116                     {
2117                         size_t index = 0;
2118                         foreach (elem; toMap)
2119                         {
2120                             buf[index++] = fun(elem);
2121                         }
2122                         return buf;
2123                     }
2124 
2125                     pool.amap!functions(toMap, workUnitSize, buf);
2126 
2127                     return buf;
2128                 }
2129 
2130                 void submitBuf2()
2131                 in
2132                 {
2133                     assert(nextBufTask.prev is null);
2134                     assert(nextBufTask.next is null);
2135                 }
2136                 do
2137                 {
2138                     // Hack to reuse the task object.
2139 
2140                     nextBufTask = typeof(nextBufTask).init;
2141                     nextBufTask._args[0] = &fillBuf;
2142                     nextBufTask._args[1] = buf2;
2143                     pool.put(nextBufTask);
2144                 }
2145 
2146                 void doBufSwap()
2147                 {
2148                     if (lastTaskWaited)
2149                     {
2150                         // Then the source is empty.  Signal it here.
2151                         buf1 = null;
2152                         buf2 = null;
2153 
2154                         static if (!isRandomAccessRange!S)
2155                         {
2156                             from = null;
2157                         }
2158 
2159                         return;
2160                     }
2161 
2162                     buf2 = buf1;
2163                     buf1 = nextBufTask.yieldForce;
2164                     bufPos = 0;
2165 
2166                     if (source.empty)
2167                     {
2168                         lastTaskWaited = true;
2169                     }
2170                     else
2171                     {
2172                         submitBuf2();
2173                     }
2174                 }
2175 
2176             public:
2177                 @property auto front()
2178                 {
2179                     return buf1[bufPos];
2180                 }
2181 
2182                 void popFront()
2183                 {
2184                     static if (hasLength!S)
2185                     {
2186                         _length--;
2187                     }
2188 
2189                     bufPos++;
2190                     if (bufPos >= buf1.length)
2191                     {
2192                         doBufSwap();
2193                     }
2194                 }
2195 
2196                 static if (isInfinite!S)
2197                 {
2198                     enum bool empty = false;
2199                 }
2200                 else
2201                 {
2202 
2203                     bool empty() const @property
2204                     {
2205                         // popFront() sets this when source is empty
2206                         return buf1.length == 0;
2207                     }
2208                 }
2209             }
2210             return new Map(source, bufSize, workUnitSize, this);
2211         }
2212     }
2213 
2214     /**
2215     Given a `source` range that is expensive to iterate over, returns an
2216     $(REF_ALTTEXT input range, isInputRange, std,range,primitives) that
2217     asynchronously buffers the contents of `source` into a buffer of `bufSize` elements in a worker thread,
2218     while making previously buffered elements from a second buffer, also of size
2219     `bufSize`, available via the range interface of the returned
2220     object.  The returned range has a length iff `hasLength!S`.
2221     `asyncBuf` is useful, for example, when performing expensive operations
2222     on the elements of ranges that represent data on a disk or network.
2223 
2224     Example:
2225     ---
2226     import std.conv, std.stdio;
2227 
2228     void main()
2229     {
2230         // Fetch lines of a file in a background thread
2231         // while processing previously fetched lines,
2232         // dealing with byLine's buffer recycling by
2233         // eagerly duplicating every line.
2234         auto lines = File("foo.txt").byLine();
2235         auto duped = std.algorithm.map!"a.idup"(lines);
2236 
2237         // Fetch more lines in the background while we
2238         // process the lines already read into memory
2239         // into a matrix of doubles.
2240         double[][] matrix;
2241         auto asyncReader = taskPool.asyncBuf(duped);
2242 
2243         foreach (line; asyncReader)
2244         {
2245             auto ls = line.split("\t");
2246             matrix ~= to!(double[])(ls);
2247         }
2248     }
2249     ---
2250 
2251     $(B Exception Handling):
2252 
2253     Any exceptions thrown while iterating over `source` are re-thrown on a
2254     call to `popFront` or, if thrown during construction, simply
2255     allowed to propagate to the caller.
2256     */
2257     auto asyncBuf(S)(S source, size_t bufSize = 100) if (isInputRange!S)
2258     {
2259         static final class AsyncBuf
2260         {
2261             // This is a class because the task and source both need to be on
2262             // the heap.
2263 
2264             // The element type of S.
2265             alias E = ElementType!S;  // Needs to be here b/c of forward ref bugs.
2266 
2267         private:
2268             E[] buf1, buf2;
2269             S source;
2270             TaskPool pool;
2271             Task!(run, E[] delegate(E[]), E[]) nextBufTask;
2272             size_t bufPos;
2273             bool lastTaskWaited;
2274 
2275             static if (hasLength!S)
2276             {
2277                 size_t _length;
2278 
2279                 // Available if hasLength!S.
2280                 public @property size_t length() const @safe pure nothrow
2281                 {
2282                     return _length;
2283                 }
2284             }
2285 
2286             this(S source, size_t bufSize, TaskPool pool)
2287             {
2288                 buf1.length = bufSize;
2289                 buf2.length = bufSize;
2290 
2291                 this.source = source;
2292                 this.pool = pool;
2293 
2294                 static if (hasLength!S)
2295                 {
2296                     _length = source.length;
2297                 }
2298 
2299                 buf1 = fillBuf(buf1);
2300                 submitBuf2();
2301             }
2302 
2303             E[] fillBuf(E[] buf)
2304             {
2305                 assert(buf !is null);
2306 
2307                 size_t i;
2308                 for (; !source.empty && i < buf.length; source.popFront())
2309                 {
2310                     buf[i++] = source.front;
2311                 }
2312 
2313                 buf = buf[0 .. i];
2314                 return buf;
2315             }
2316 
2317             void submitBuf2()
2318             in
2319             {
2320                 assert(nextBufTask.prev is null);
2321                 assert(nextBufTask.next is null);
2322             }
2323             do
2324             {
2325                 // Hack to reuse the task object.
2326 
2327                 nextBufTask = typeof(nextBufTask).init;
2328                 nextBufTask._args[0] = &fillBuf;
2329                 nextBufTask._args[1] = buf2;
2330                 pool.put(nextBufTask);
2331             }
2332 
2333             void doBufSwap()
2334             {
2335                 if (lastTaskWaited)
2336                 {
2337                     // Then source is empty.  Signal it here.
2338                     buf1 = null;
2339                     buf2 = null;
2340                     return;
2341                 }
2342 
2343                 buf2 = buf1;
2344                 buf1 = nextBufTask.yieldForce;
2345                 bufPos = 0;
2346 
2347                 if (source.empty)
2348                 {
2349                     lastTaskWaited = true;
2350                 }
2351                 else
2352                 {
2353                     submitBuf2();
2354                 }
2355             }
2356 
2357         public:
2358             E front() @property
2359             {
2360                 return buf1[bufPos];
2361             }
2362 
2363             void popFront()
2364             {
2365                 static if (hasLength!S)
2366                 {
2367                     _length--;
2368                 }
2369 
2370                 bufPos++;
2371                 if (bufPos >= buf1.length)
2372                 {
2373                     doBufSwap();
2374                 }
2375             }
2376 
2377             static if (isInfinite!S)
2378             {
2379                 enum bool empty = false;
2380             }
2381 
2382             else
2383             {
2384                 ///
2385                 bool empty() @property
2386                 {
2387                     // popFront() sets this when source is empty:
2388                     return buf1.length == 0;
2389                 }
2390             }
2391         }
2392         return new AsyncBuf(source, bufSize, this);
2393     }
2394 
2395     /**
2396     Given a callable object `next` that writes to a user-provided buffer and
2397     a second callable object `empty` that determines whether more data is
2398     available to write via `next`, returns an input range that
2399     asynchronously calls `next` with a set of size `nBuffers` of buffers
2400     and makes the results available in the order they were obtained via the
2401     input range interface of the returned object.  Similarly to the
2402     input range overload of `asyncBuf`, the first half of the buffers
2403     are made available via the range interface while the second half are
2404     filled and vice-versa.
2405 
2406     Params:
2407 
2408     next = A callable object that takes a single argument that must be an array
2409            with mutable elements.  When called, `next` writes data to
2410            the array provided by the caller.
2411 
2412     empty = A callable object that takes no arguments and returns a type
2413             implicitly convertible to `bool`.  This is used to signify
2414             that no more data is available to be obtained by calling `next`.
2415 
2416     initialBufSize = The initial size of each buffer.  If `next` takes its
2417                      array by reference, it may resize the buffers.
2418 
2419     nBuffers = The number of buffers to cycle through when calling `next`.
2420 
2421     Example:
2422     ---
2423     // Fetch lines of a file in a background
2424     // thread while processing previously fetched
2425     // lines, without duplicating any lines.
2426     auto file = File("foo.txt");
2427 
2428     void next(ref char[] buf)
2429     {
2430         file.readln(buf);
2431     }
2432 
2433     // Fetch more lines in the background while we
2434     // process the lines already read into memory
2435     // into a matrix of doubles.
2436     double[][] matrix;
2437     auto asyncReader = taskPool.asyncBuf(&next, &file.eof);
2438 
2439     foreach (line; asyncReader)
2440     {
2441         auto ls = line.split("\t");
2442         matrix ~= to!(double[])(ls);
2443     }
2444     ---
2445 
2446     $(B Exception Handling):
2447 
2448     Any exceptions thrown while iterating over `range` are re-thrown on a
2449     call to `popFront`.
2450 
2451     Warning:
2452 
2453     Using the range returned by this function in a parallel foreach loop
2454     will not work because buffers may be overwritten while the task that
2455     processes them is in queue.  This is checked for at compile time
2456     and will result in a static assertion failure.
2457     */
2458     auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100)
2459     if (is(typeof(C2.init()) : bool) &&
2460         Parameters!C1.length == 1 &&
2461         Parameters!C2.length == 0 &&
2462         isArray!(Parameters!C1[0])
2463     ) {
2464         auto roundRobin = RoundRobinBuffer!(C1, C2)(next, empty, initialBufSize, nBuffers);
2465         return asyncBuf(roundRobin, nBuffers / 2);
2466     }
2467 
2468     ///
2469     template reduce(functions...)
2470     {
2471         /**
2472         Parallel reduce on a random access range.  Except as otherwise noted,
2473         usage is similar to $(REF _reduce, std,algorithm,iteration).  There is
2474         also $(LREF fold) which does the same thing with a different parameter
2475         order.
2476 
2477         This function works by splitting the range to be reduced into work
2478         units, which are slices to be reduced in parallel.  Once the results
2479         from all work units are computed, a final serial reduction is performed
2480         on these results to compute the final answer. Therefore, care must be
2481         taken to choose the seed value appropriately.
2482 
2483         Because the reduction is being performed in parallel, `functions`
2484         must be associative.  For notational simplicity, let # be an
2485         infix operator representing `functions`.  Then, (a # b) # c must equal
2486         a # (b # c).  Floating point addition is not associative
2487         even though addition in exact arithmetic is.  Summing floating
2488         point numbers using this function may give different results than summing
2489         serially.  However, for many practical purposes floating point addition
2490         can be treated as associative.
2491 
2492         Note that, since `functions` are assumed to be associative,
2493         additional optimizations are made to the serial portion of the reduction
2494         algorithm. These take advantage of the instruction level parallelism of
2495         modern CPUs, in addition to the thread-level parallelism that the rest
2496         of this module exploits.  This can lead to better than linear speedups
2497         relative to $(REF _reduce, std,algorithm,iteration), especially for
2498         fine-grained benchmarks like dot products.
2499 
2500         An explicit seed may be provided as the first argument.  If
2501         provided, it is used as the seed for all work units and for the final
2502         reduction of results from all work units.  Therefore, if it is not the
2503         identity value for the operation being performed, results may differ
2504         from those generated by $(REF _reduce, std,algorithm,iteration) or
2505         depending on how many work units are used.  The next argument must be
2506         the range to be reduced.
2507         ---
2508         // Find the sum of squares of a range in parallel, using
2509         // an explicit seed.
2510         //
2511         // Timings on an Athlon 64 X2 dual core machine:
2512         //
2513         // Parallel reduce:                     72 milliseconds
2514         // Using std.algorithm.reduce instead:  181 milliseconds
2515         auto nums = iota(10_000_000.0f);
2516         auto sumSquares = taskPool.reduce!"a + b"(
2517             0.0, std.algorithm.map!"a * a"(nums)
2518         );
2519         ---
2520 
2521         If no explicit seed is provided, the first element of each work unit
2522         is used as a seed.  For the final reduction, the result from the first
2523         work unit is used as the seed.
2524         ---
2525         // Find the sum of a range in parallel, using the first
2526         // element of each work unit as the seed.
2527         auto sum = taskPool.reduce!"a + b"(nums);
2528         ---
2529 
2530         An explicit work unit size may be specified as the last argument.
2531         Specifying too small a work unit size will effectively serialize the
2532         reduction, as the final reduction of the result of each work unit will
2533         dominate computation time.  If `TaskPool.size` for this instance
2534         is zero, this parameter is ignored and one work unit is used.
2535         ---
2536         // Use a work unit size of 100.
2537         auto sum2 = taskPool.reduce!"a + b"(nums, 100);
2538 
2539         // Work unit size of 100 and explicit seed.
2540         auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100);
2541         ---
2542 
2543         Parallel reduce supports multiple functions, like
2544         `std.algorithm.reduce`.
2545         ---
2546         // Find both the min and max of nums.
2547         auto minMax = taskPool.reduce!(min, max)(nums);
2548         assert(minMax[0] == reduce!min(nums));
2549         assert(minMax[1] == reduce!max(nums));
2550         ---
2551 
2552         $(B Exception Handling):
2553 
2554         After this function is finished executing, any exceptions thrown
2555         are chained together via `Throwable.next` and rethrown.  The chaining
2556         order is non-deterministic.
2557 
2558         See_Also:
2559 
2560             $(LREF fold) is functionally equivalent to $(LREF _reduce) except the
2561             range parameter comes first and there is no need to use
2562             $(REF_ALTTEXT `tuple`,tuple,std,typecons) for multiple seeds.
2563          */
2564         auto reduce(Args...)(Args args)
2565         {
2566             import core.exception : OutOfMemoryError;
2567             import core.internal.lifetime : emplaceRef;
2568             import std.exception : enforce;
2569 
2570             alias fun = reduceAdjoin!functions;
2571             alias finishFun = reduceFinish!functions;
2572 
2573             static if (isIntegral!(Args[$ - 1]))
2574             {
2575                 size_t workUnitSize = cast(size_t) args[$ - 1];
2576                 alias args2 = args[0..$ - 1];
2577                 alias Args2 = Args[0..$ - 1];
2578             }
2579             else
2580             {
2581                 alias args2 = args;
2582                 alias Args2 = Args;
2583             }
2584 
2585             auto makeStartValue(Type)(Type e)
2586             {
2587                 static if (functions.length == 1)
2588                 {
2589                     return e;
2590                 }
2591                 else
2592                 {
2593                     typeof(adjoin!(staticMap!(binaryFun, functions))(e, e)) seed = void;
2594                     foreach (i, T; seed.Types)
2595                     {
2596                         emplaceRef(seed.expand[i], e);
2597                     }
2598 
2599                     return seed;
2600                 }
2601             }
2602 
2603             static if (args2.length == 2)
2604             {
2605                 static assert(isInputRange!(Args2[1]));
2606                 alias range = args2[1];
2607                 alias seed = args2[0];
2608                 enum explicitSeed = true;
2609 
2610                 static if (!is(typeof(workUnitSize)))
2611                 {
2612                     size_t workUnitSize = defaultWorkUnitSize(range.length);
2613                 }
2614             }
2615             else
2616             {
2617                 static assert(args2.length == 1);
2618                 alias range = args2[0];
2619 
2620                 static if (!is(typeof(workUnitSize)))
2621                 {
2622                     size_t workUnitSize = defaultWorkUnitSize(range.length);
2623                 }
2624 
2625                 enforce(!range.empty,
2626                     "Cannot reduce an empty range with first element as start value.");
2627 
2628                 auto seed = makeStartValue(range.front);
2629                 enum explicitSeed = false;
2630                 range.popFront();
2631             }
2632 
2633             alias E = typeof(seed);
2634             alias R = typeof(range);
2635 
2636             E reduceOnRange(R range, size_t lowerBound, size_t upperBound)
2637             {
2638                 // This is for exploiting instruction level parallelism by
2639                 // using multiple accumulator variables within each thread,
2640                 // since we're assuming functions are associative anyhow.
2641 
2642                 // This is so that loops can be unrolled automatically.
2643                 enum ilpTuple = AliasSeq!(0, 1, 2, 3, 4, 5);
2644                 enum nILP = ilpTuple.length;
2645                 immutable subSize = (upperBound - lowerBound) / nILP;
2646 
2647                 if (subSize <= 1)
2648                 {
2649                     // Handle as a special case.
2650                     static if (explicitSeed)
2651                     {
2652                         E result = seed;
2653                     }
2654                     else
2655                     {
2656                         E result = makeStartValue(range[lowerBound]);
2657                         lowerBound++;
2658                     }
2659 
2660                     foreach (i; lowerBound .. upperBound)
2661                     {
2662                         result = fun(result, range[i]);
2663                     }
2664 
2665                     return result;
2666                 }
2667 
2668                 assert(subSize > 1);
2669                 E[nILP] results;
2670                 size_t[nILP] offsets;
2671 
2672                 foreach (i; ilpTuple)
2673                 {
2674                     offsets[i] = lowerBound + subSize * i;
2675 
2676                     static if (explicitSeed)
2677                     {
2678                         results[i] = seed;
2679                     }
2680                     else
2681                     {
2682                         results[i] = makeStartValue(range[offsets[i]]);
2683                         offsets[i]++;
2684                     }
2685                 }
2686 
2687                 immutable nLoop = subSize - (!explicitSeed);
2688                 foreach (i; 0 .. nLoop)
2689                 {
2690                     foreach (j; ilpTuple)
2691                     {
2692                         results[j] = fun(results[j], range[offsets[j]]);
2693                         offsets[j]++;
2694                     }
2695                 }
2696 
2697                 // Finish the remainder.
2698                 foreach (i; nILP * subSize + lowerBound .. upperBound)
2699                 {
2700                     results[$ - 1] = fun(results[$ - 1], range[i]);
2701                 }
2702 
2703                 foreach (i; ilpTuple[1..$])
2704                 {
2705                     results[0] = finishFun(results[0], results[i]);
2706                 }
2707 
2708                 return results[0];
2709             }
2710 
2711             immutable len = range.length;
2712             if (len == 0)
2713             {
2714                 return seed;
2715             }
2716 
2717             if (this.size == 0)
2718             {
2719                 return finishFun(seed, reduceOnRange(range, 0, len));
2720             }
2721 
2722             // Unlike the rest of the functions here, I can't use the Task object
2723             // recycling trick here because this has to work on non-commutative
2724             // operations.  After all the tasks are done executing, fun() has to
2725             // be applied on the results of these to get a final result, but
2726             // it can't be evaluated out of order.
2727 
2728             if (workUnitSize > len)
2729             {
2730                 workUnitSize = len;
2731             }
2732 
2733             immutable size_t nWorkUnits = (len / workUnitSize) + ((len % workUnitSize == 0) ? 0 : 1);
2734             assert(nWorkUnits * workUnitSize >= len);
2735 
2736             alias RTask = Task!(run, typeof(&reduceOnRange), R, size_t, size_t);
2737             RTask[] tasks;
2738 
2739             // Can't use alloca() due to https://issues.dlang.org/show_bug.cgi?id=3753
2740             // Use a fixed buffer backed by malloc().
2741             enum maxStack = 2_048;
2742             byte[maxStack] buf = void;
2743             immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof;
2744 
2745             import core.stdc.stdlib : malloc, free;
2746             if (nBytesNeeded <= maxStack)
2747             {
2748                 tasks = (cast(RTask*) buf.ptr)[0 .. nWorkUnits];
2749             }
2750             else
2751             {
2752                 auto ptr = cast(RTask*) malloc(nBytesNeeded);
2753                 if (!ptr)
2754                 {
2755                     throw new OutOfMemoryError(
2756                         "Out of memory in std.parallelism."
2757                     );
2758                 }
2759 
2760                 tasks = ptr[0 .. nWorkUnits];
2761             }
2762 
2763             scope(exit)
2764             {
2765                 if (nBytesNeeded > maxStack)
2766                 {
2767                     free(tasks.ptr);
2768                 }
2769             }
2770 
2771             // Hack to take the address of a nested function w/o
2772             // making a closure.
2773             static auto scopedAddress(D)(scope D del) @system
2774             {
2775                 auto tmp = del;
2776                 return tmp;
2777             }
2778 
2779             size_t curPos = 0;
2780             void useTask(ref RTask task)
2781             {
2782                 import std.algorithm.comparison : min;
2783                 import core.lifetime : emplace;
2784 
2785                 // Private constructor, so can't feed it's arguments directly
2786                 // to emplace
2787                 emplace(&task, RTask
2788                 (
2789                     scopedAddress(&reduceOnRange),
2790                     range,
2791                     curPos, // lower bound.
2792                     cast() min(len, curPos + workUnitSize)  // upper bound.
2793                 ));
2794 
2795                 task.pool = this;
2796 
2797                 curPos += workUnitSize;
2798             }
2799 
2800             foreach (ref task; tasks)
2801             {
2802                 useTask(task);
2803             }
2804 
2805             foreach (i; 1 .. tasks.length - 1)
2806             {
2807                 tasks[i].next = tasks[i + 1].basePtr;
2808                 tasks[i + 1].prev = tasks[i].basePtr;
2809             }
2810 
2811             if (tasks.length > 1)
2812             {
2813                 queueLock();
2814                 scope(exit) queueUnlock();
2815 
2816                 abstractPutGroupNoSync(
2817                     tasks[1].basePtr,
2818                     tasks[$ - 1].basePtr
2819                 );
2820             }
2821 
2822             if (tasks.length > 0)
2823             {
2824                 try
2825                 {
2826                     tasks[0].job();
2827                 }
2828                 catch (Throwable e)
2829                 {
2830                     tasks[0].exception = e;
2831                 }
2832                 tasks[0].taskStatus = TaskStatus.done;
2833 
2834                 // Try to execute each of these in the current thread
2835                 foreach (ref task; tasks[1..$])
2836                 {
2837                     tryDeleteExecute(task.basePtr);
2838                 }
2839             }
2840 
2841             // Now that we've tried to execute every task, they're all either
2842             // done or in progress.  Force all of them.
2843             E result = seed;
2844 
2845             Throwable firstException;
2846 
2847             foreach (ref task; tasks)
2848             {
2849                 try
2850                 {
2851                     task.yieldForce;
2852                 }
2853                 catch (Throwable e)
2854                 {
2855                     /* Chain e to front because order doesn't matter and because
2856                      * e is not likely to be a chain itself (so fewer traversals)
2857                      */
2858                     firstException = Throwable.chainTogether(e, firstException);
2859                     continue;
2860                 }
2861 
2862                 if (!firstException) result = finishFun(result, task.returnVal);
2863             }
2864 
2865             if (firstException) throw firstException;
2866 
2867             return result;
2868         }
2869     }
2870 
2871     ///
2872     template fold(functions...)
2873     {
2874         /** Implements the homonym function (also known as `accumulate`, `compress`,
2875             `inject`, or `foldl`) present in various programming languages of
2876             functional flavor.
2877 
2878             `fold` is functionally equivalent to $(LREF reduce) except the range
2879             parameter comes first and there is no need to use $(REF_ALTTEXT
2880             `tuple`,tuple,std,typecons) for multiple seeds.
2881 
2882             There may be one or more callable entities (`functions` argument) to
2883             apply.
2884 
2885             Params:
2886                 args = Just the range to _fold over; or the range and one seed
2887                        per function; or the range, one seed per function, and
2888                        the work unit size
2889 
2890             Returns:
2891                 The accumulated result as a single value for single function and
2892                 as a tuple of values for multiple functions
2893 
2894             See_Also:
2895             Similar to $(REF _fold, std,algorithm,iteration), `fold` is a wrapper around $(LREF reduce).
2896 
2897             Example:
2898             ---
2899             static int adder(int a, int b)
2900             {
2901                 return a + b;
2902             }
2903             static int multiplier(int a, int b)
2904             {
2905                 return a * b;
2906             }
2907 
2908             // Just the range
2909             auto x = taskPool.fold!adder([1, 2, 3, 4]);
2910             assert(x == 10);
2911 
2912             // The range and the seeds (0 and 1 below; also note multiple
2913             // functions in this example)
2914             auto y = taskPool.fold!(adder, multiplier)([1, 2, 3, 4], 0, 1);
2915             assert(y[0] == 10);
2916             assert(y[1] == 24);
2917 
2918             // The range, the seed (0), and the work unit size (20)
2919             auto z = taskPool.fold!adder([1, 2, 3, 4], 0, 20);
2920             assert(z == 10);
2921             ---
2922         */
2923         auto fold(Args...)(Args args)
2924         {
2925             static assert(isInputRange!(Args[0]), "First argument must be an InputRange");
2926 
2927             alias range = args[0];
2928 
2929             static if (Args.length == 1)
2930             {
2931                 // Just the range
2932                 return reduce!functions(range);
2933             }
2934             else static if (Args.length == 1 + functions.length ||
2935                             Args.length == 1 + functions.length + 1)
2936             {
2937                 static if (functions.length == 1)
2938                 {
2939                     alias seeds = args[1];
2940                 }
2941                 else
2942                 {
2943                     auto seeds()
2944                     {
2945                         import std.typecons : tuple;
2946                         return tuple(args[1 .. functions.length+1]);
2947                     }
2948                 }
2949 
2950                 static if (Args.length == 1 + functions.length)
2951                 {
2952                     // The range and the seeds
2953                     return reduce!functions(seeds, range);
2954                 }
2955                 else static if (Args.length == 1 + functions.length + 1)
2956                 {
2957                     // The range, the seeds, and the work unit size
2958                     static assert(isIntegral!(Args[$-1]), "Work unit size must be an integral type");
2959                     return reduce!functions(seeds, range, args[$-1]);
2960                 }
2961             }
2962             else
2963             {
2964                 import std.conv : text;
2965                 static assert(0, "Invalid number of arguments (" ~ Args.length.text ~ "): Should be an input range, "
2966                               ~ functions.length.text ~ " optional seed(s), and an optional work unit size.");
2967             }
2968         }
2969     }
2970 
2971     // This test is not included in the documentation because even though these
2972     // examples are for the inner fold() template, with their current location,
2973     // they would appear under the outer one. (We can't move this inside the
2974     // outer fold() template because then dmd runs out of memory possibly due to
2975     // recursive template instantiation, which is surprisingly not caught.)
2976     @system unittest
2977     {
2978         // Just the range
2979         auto x = taskPool.fold!"a + b"([1, 2, 3, 4]);
2980         assert(x == 10);
2981 
2982         // The range and the seeds (0 and 1 below; also note multiple
2983         // functions in this example)
2984         auto y = taskPool.fold!("a + b", "a * b")([1, 2, 3, 4], 0, 1);
2985         assert(y[0] == 10);
2986         assert(y[1] == 24);
2987 
2988         // The range, the seed (0), and the work unit size (20)
2989         auto z = taskPool.fold!"a + b"([1, 2, 3, 4], 0, 20);
2990         assert(z == 10);
2991     }
2992 
2993     /**
2994     Gets the index of the current thread relative to this `TaskPool`.  Any
2995     thread not in this pool will receive an index of 0.  The worker threads in
2996     this pool receive unique indices of 1 through `this.size`.
2997 
2998     This function is useful for maintaining worker-local resources.
2999 
3000     Example:
3001     ---
3002     // Execute a loop that computes the greatest common
3003     // divisor of every number from 0 through 999 with
3004     // 42 in parallel.  Write the results out to
3005     // a set of files, one for each thread.  This allows
3006     // results to be written out without any synchronization.
3007 
3008     import std.conv, std.range, std.numeric, std.stdio;
3009 
3010     void main()
3011     {
3012         auto filesHandles = new File[taskPool.size + 1];
3013         scope(exit) {
3014             foreach (ref handle; fileHandles)
3015             {
3016                 handle.close();
3017             }
3018         }
3019 
3020         foreach (i, ref handle; fileHandles)
3021         {
3022             handle = File("workerResults" ~ to!string(i) ~ ".txt");
3023         }
3024 
3025         foreach (num; parallel(iota(1_000)))
3026         {
3027             auto outHandle = fileHandles[taskPool.workerIndex];
3028             outHandle.writeln(num, '\t', gcd(num, 42));
3029         }
3030     }
3031     ---
3032     */
3033     size_t workerIndex() @property @safe const nothrow
3034     {
3035         immutable rawInd = threadIndex;
3036         return (rawInd >= instanceStartIndex && rawInd < instanceStartIndex + size) ?
3037                 (rawInd - instanceStartIndex + 1) : 0;
3038     }
3039 
3040     /**
3041     Struct for creating worker-local storage.  Worker-local storage is
3042     thread-local storage that exists only for worker threads in a given
3043     `TaskPool` plus a single thread outside the pool.  It is allocated on the
3044     garbage collected heap in a way that avoids _false sharing, and doesn't
3045     necessarily have global scope within any thread.  It can be accessed from
3046     any worker thread in the `TaskPool` that created it, and one thread
3047     outside this `TaskPool`.  All threads outside the pool that created a
3048     given instance of worker-local storage share a single slot.
3049 
3050     Since the underlying data for this struct is heap-allocated, this struct
3051     has reference semantics when passed between functions.
3052 
3053     The main uses cases for `WorkerLocalStorage` are:
3054 
3055     1.  Performing parallel reductions with an imperative, as opposed to
3056         functional, programming style.  In this case, it's useful to treat
3057         `WorkerLocalStorage` as local to each thread for only the parallel
3058         portion of an algorithm.
3059 
3060     2.  Recycling temporary buffers across iterations of a parallel foreach loop.
3061 
3062     Example:
3063     ---
3064     // Calculate pi as in our synopsis example, but
3065     // use an imperative instead of a functional style.
3066     immutable n = 1_000_000_000;
3067     immutable delta = 1.0L / n;
3068 
3069     auto sums = taskPool.workerLocalStorage(0.0L);
3070     foreach (i; parallel(iota(n)))
3071     {
3072         immutable x = ( i - 0.5L ) * delta;
3073         immutable toAdd = delta / ( 1.0 + x * x );
3074         sums.get += toAdd;
3075     }
3076 
3077     // Add up the results from each worker thread.
3078     real pi = 0;
3079     foreach (threadResult; sums.toRange)
3080     {
3081         pi += 4.0L * threadResult;
3082     }
3083     ---
3084      */
3085     static struct WorkerLocalStorage(T)
3086     {
3087     private:
3088         TaskPool pool;
3089         size_t size;
3090 
3091         size_t elemSize;
3092         bool* stillThreadLocal;
3093 
3094         static size_t roundToLine(size_t num) pure nothrow
3095         {
3096             if (num % cacheLineSize == 0)
3097             {
3098                 return num;
3099             }
3100             else
3101             {
3102                 return ((num / cacheLineSize) + 1) * cacheLineSize;
3103             }
3104         }
3105 
3106         void* data;
3107 
3108         void initialize(TaskPool pool)
3109         {
3110             this.pool = pool;
3111             size = pool.size + 1;
3112             stillThreadLocal = new bool;
3113             *stillThreadLocal = true;
3114 
3115             // Determines whether the GC should scan the array.
3116             auto blkInfo = (typeid(T).flags & 1) ?
3117                            cast(GC.BlkAttr) 0 :
3118                            GC.BlkAttr.NO_SCAN;
3119 
3120             immutable nElem = pool.size + 1;
3121             elemSize = roundToLine(T.sizeof);
3122 
3123             // The + 3 is to pad one full cache line worth of space on either side
3124             // of the data structure to make sure false sharing with completely
3125             // unrelated heap data is prevented, and to provide enough padding to
3126             // make sure that data is cache line-aligned.
3127             data = GC.malloc(elemSize * (nElem + 3), blkInfo) + elemSize;
3128 
3129             // Cache line align data ptr.
3130             data = cast(void*) roundToLine(cast(size_t) data);
3131 
3132             foreach (i; 0 .. nElem)
3133             {
3134                 this.opIndex(i) = T.init;
3135             }
3136         }
3137 
3138         ref opIndex(this Qualified)(size_t index)
3139         {
3140             import std.conv : text;
3141             assert(index < size, text(index, '\t', uint.max));
3142             return *(cast(CopyTypeQualifiers!(Qualified, T)*) (data + elemSize * index));
3143         }
3144 
3145         void opIndexAssign(T val, size_t index)
3146         {
3147             assert(index < size);
3148             *(cast(T*) (data + elemSize * index)) = val;
3149         }
3150 
3151     public:
3152         /**
3153         Get the current thread's instance.  Returns by ref.
3154         Note that calling `get` from any thread
3155         outside the `TaskPool` that created this instance will return the
3156         same reference, so an instance of worker-local storage should only be
3157         accessed from one thread outside the pool that created it.  If this
3158         rule is violated, undefined behavior will result.
3159 
3160         If assertions are enabled and `toRange` has been called, then this
3161         WorkerLocalStorage instance is no longer worker-local and an assertion
3162         failure will result when calling this method.  This is not checked
3163         when assertions are disabled for performance reasons.
3164          */
3165         ref get(this Qualified)() @property
3166         {
3167             assert(*stillThreadLocal,
3168                 "Cannot call get() on this instance of WorkerLocalStorage " ~
3169                 "because it is no longer worker-local."
3170             );
3171             return opIndex(pool.workerIndex);
3172         }
3173 
3174         /**
3175         Assign a value to the current thread's instance.  This function has
3176         the same caveats as its overload.
3177         */
3178         void get(T val) @property
3179         {
3180             assert(*stillThreadLocal,
3181                 "Cannot call get() on this instance of WorkerLocalStorage " ~
3182                 "because it is no longer worker-local."
3183             );
3184 
3185             opIndexAssign(val, pool.workerIndex);
3186         }
3187 
3188         /**
3189         Returns a range view of the values for all threads, which can be used
3190         to further process the results of each thread after running the parallel
3191         part of your algorithm.  Do not use this method in the parallel portion
3192         of your algorithm.
3193 
3194         Calling this function sets a flag indicating that this struct is no
3195         longer worker-local, and attempting to use the `get` method again
3196         will result in an assertion failure if assertions are enabled.
3197          */
3198         WorkerLocalStorageRange!T toRange() @property
3199         {
3200             if (*stillThreadLocal)
3201             {
3202                 *stillThreadLocal = false;
3203 
3204                 // Make absolutely sure results are visible to all threads.
3205                 // This is probably not necessary since some other
3206                 // synchronization primitive will be used to signal that the
3207                 // parallel part of the algorithm is done, but the
3208                 // performance impact should be negligible, so it's better
3209                 // to be safe.
3210                 ubyte barrierDummy;
3211                 atomicSetUbyte(barrierDummy, 1);
3212             }
3213 
3214             return WorkerLocalStorageRange!T(this);
3215         }
3216     }
3217 
3218     /**
3219     Range primitives for worker-local storage.  The purpose of this is to
3220     access results produced by each worker thread from a single thread once you
3221     are no longer using the worker-local storage from multiple threads.
3222     Do not use this struct in the parallel portion of your algorithm.
3223 
3224     The proper way to instantiate this object is to call
3225     `WorkerLocalStorage.toRange`.  Once instantiated, this object behaves
3226     as a finite random-access range with assignable, lvalue elements and
3227     a length equal to the number of worker threads in the `TaskPool` that
3228     created it plus 1.
3229      */
3230     static struct WorkerLocalStorageRange(T)
3231     {
3232     private:
3233         WorkerLocalStorage!T workerLocalStorage;
3234 
3235         size_t _length;
3236         size_t beginOffset;
3237 
3238         this(WorkerLocalStorage!T wl)
3239         {
3240             this.workerLocalStorage = wl;
3241             _length = wl.size;
3242         }
3243 
3244     public:
3245         ref front(this Qualified)() @property
3246         {
3247             return this[0];
3248         }
3249 
3250         ref back(this Qualified)() @property
3251         {
3252             return this[_length - 1];
3253         }
3254 
3255         void popFront()
3256         {
3257             if (_length > 0)
3258             {
3259                 beginOffset++;
3260                 _length--;
3261             }
3262         }
3263 
3264         void popBack()
3265         {
3266             if (_length > 0)
3267             {
3268                 _length--;
3269             }
3270         }
3271 
3272         typeof(this) save() @property
3273         {
3274             return this;
3275         }
3276 
3277         ref opIndex(this Qualified)(size_t index)
3278         {
3279             assert(index < _length);
3280             return workerLocalStorage[index + beginOffset];
3281         }
3282 
3283         void opIndexAssign(T val, size_t index)
3284         {
3285             assert(index < _length);
3286             workerLocalStorage[index] = val;
3287         }
3288 
3289         typeof(this) opSlice(size_t lower, size_t upper)
3290         {
3291             assert(upper <= _length);
3292             auto newWl = this.workerLocalStorage;
3293             newWl.data += lower * newWl.elemSize;
3294             newWl.size = upper - lower;
3295             return typeof(this)(newWl);
3296         }
3297 
3298         bool empty() const @property
3299         {
3300             return length == 0;
3301         }
3302 
3303         size_t length() const @property
3304         {
3305             return _length;
3306         }
3307     }
3308 
3309     /**
3310     Creates an instance of worker-local storage, initialized with a given
3311     value.  The value is `lazy` so that you can, for example, easily
3312     create one instance of a class for each worker.  For usage example,
3313     see the `WorkerLocalStorage` struct.
3314      */
3315     WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init)
3316     {
3317         WorkerLocalStorage!T ret;
3318         ret.initialize(this);
3319         foreach (i; 0 .. size + 1)
3320         {
3321             ret[i] = initialVal;
3322         }
3323 
3324         // Memory barrier to make absolutely sure that what we wrote is
3325         // visible to worker threads.
3326         ubyte barrierDummy;
3327         atomicSetUbyte(barrierDummy, 0);
3328 
3329         return ret;
3330     }
3331 
3332     /**
3333     Signals to all worker threads to terminate as soon as they are finished
3334     with their current `Task`, or immediately if they are not executing a
3335     `Task`.  `Task`s that were in queue will not be executed unless
3336     a call to `Task.workForce`, `Task.yieldForce` or `Task.spinForce`
3337     causes them to be executed.
3338 
3339     Use only if you have waited on every `Task` and therefore know the
3340     queue is empty, or if you speculatively executed some tasks and no longer
3341     need the results.
3342      */
3343     void stop() @trusted
3344     {
3345         queueLock();
3346         scope(exit) queueUnlock();
3347         atomicSetUbyte(status, PoolState.stopNow);
3348         notifyAll();
3349     }
3350 
3351     /**
3352     Signals worker threads to terminate when the queue becomes empty.
3353 
3354     If blocking argument is true, wait for all worker threads to terminate
3355     before returning.  This option might be used in applications where
3356     task results are never consumed-- e.g. when `TaskPool` is employed as a
3357     rudimentary scheduler for tasks which communicate by means other than
3358     return values.
3359 
3360     Warning:  Calling this function with $(D blocking = true) from a worker
3361               thread that is a member of the same `TaskPool` that
3362               `finish` is being called on will result in a deadlock.
3363      */
3364     void finish(bool blocking = false) @trusted
3365     {
3366         {
3367             queueLock();
3368             scope(exit) queueUnlock();
3369             atomicCasUbyte(status, PoolState.running, PoolState.finishing);
3370             notifyAll();
3371         }
3372         if (blocking)
3373         {
3374             // Use this thread as a worker until everything is finished.
3375             executeWorkLoop();
3376 
3377             foreach (t; pool)
3378             {
3379                 // Maybe there should be something here to prevent a thread
3380                 // from calling join() on itself if this function is called
3381                 // from a worker thread in the same pool, but:
3382                 //
3383                 // 1.  Using an if statement to skip join() would result in
3384                 //     finish() returning without all tasks being finished.
3385                 //
3386                 // 2.  If an exception were thrown, it would bubble up to the
3387                 //     Task from which finish() was called and likely be
3388                 //     swallowed.
3389                 t.join();
3390             }
3391         }
3392     }
3393 
3394     /// Returns the number of worker threads in the pool.
3395     @property size_t size() @safe const pure nothrow
3396     {
3397         return pool.length;
3398     }
3399 
3400     /**
3401     Put a `Task` object on the back of the task queue.  The `Task`
3402     object may be passed by pointer or reference.
3403 
3404     Example:
3405     ---
3406     import std.file;
3407 
3408     // Create a task.
3409     auto t = task!read("foo.txt");
3410 
3411     // Add it to the queue to be executed.
3412     taskPool.put(t);
3413     ---
3414 
3415     Notes:
3416 
3417     @trusted overloads of this function are called for `Task`s if
3418     $(REF hasUnsharedAliasing, std,traits) is false for the `Task`'s
3419     return type or the function the `Task` executes is `pure`.
3420     `Task` objects that meet all other requirements specified in the
3421     `@trusted` overloads of `task` and `scopedTask` may be created
3422     and executed from `@safe` code via `Task.executeInNewThread` but
3423     not via `TaskPool`.
3424 
3425     While this function takes the address of variables that may
3426     be on the stack, some overloads are marked as @trusted.
3427     `Task` includes a destructor that waits for the task to complete
3428     before destroying the stack frame it is allocated on.  Therefore,
3429     it is impossible for the stack frame to be destroyed before the task is
3430     complete and no longer referenced by a `TaskPool`.
3431     */
3432     void put(alias fun, Args...)(ref Task!(fun, Args) task)
3433     if (!isSafeReturn!(typeof(task)))
3434     {
3435         task.pool = this;
3436         abstractPut(task.basePtr);
3437     }
3438 
3439     /// Ditto
3440     void put(alias fun, Args...)(Task!(fun, Args)* task)
3441     if (!isSafeReturn!(typeof(*task)))
3442     {
3443         import std.exception : enforce;
3444         enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
3445         put(*task);
3446     }
3447 
3448     @trusted void put(alias fun, Args...)(ref Task!(fun, Args) task)
3449     if (isSafeReturn!(typeof(task)))
3450     {
3451         task.pool = this;
3452         abstractPut(task.basePtr);
3453     }
3454 
3455     @trusted void put(alias fun, Args...)(Task!(fun, Args)* task)
3456     if (isSafeReturn!(typeof(*task)))
3457     {
3458         import std.exception : enforce;
3459         enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
3460         put(*task);
3461     }
3462 
3463     /**
3464     These properties control whether the worker threads are daemon threads.
3465     A daemon thread is automatically terminated when all non-daemon threads
3466     have terminated.  A non-daemon thread will prevent a program from
3467     terminating as long as it has not terminated.
3468 
3469     If any `TaskPool` with non-daemon threads is active, either `stop`
3470     or `finish` must be called on it before the program can terminate.
3471 
3472     The worker treads in the `TaskPool` instance returned by the
3473     `taskPool` property are daemon by default.  The worker threads of
3474     manually instantiated task pools are non-daemon by default.
3475 
3476     Note:  For a size zero pool, the getter arbitrarily returns true and the
3477            setter has no effect.
3478     */
3479     bool isDaemon() @property @trusted
3480     {
3481         queueLock();
3482         scope(exit) queueUnlock();
3483         return (size == 0) ? true : pool[0].isDaemon;
3484     }
3485 
3486     /// Ditto
3487     void isDaemon(bool newVal) @property @trusted
3488     {
3489         queueLock();
3490         scope(exit) queueUnlock();
3491         foreach (thread; pool)
3492         {
3493             thread.isDaemon = newVal;
3494         }
3495     }
3496 
3497     /**
3498     These functions allow getting and setting the OS scheduling priority of
3499     the worker threads in this `TaskPool`.  They forward to
3500     `core.thread.Thread.priority`, so a given priority value here means the
3501     same thing as an identical priority value in `core.thread`.
3502 
3503     Note:  For a size zero pool, the getter arbitrarily returns
3504            `core.thread.Thread.PRIORITY_MIN` and the setter has no effect.
3505     */
3506     int priority() @property @trusted
3507     {
3508         return (size == 0) ? core.thread.Thread.PRIORITY_MIN :
3509         pool[0].priority;
3510     }
3511 
3512     /// Ditto
3513     void priority(int newPriority) @property @trusted
3514     {
3515         if (size > 0)
3516         {
3517             foreach (t; pool)
3518             {
3519                 t.priority = newPriority;
3520             }
3521         }
3522     }
3523 }
3524 
3525 @system unittest
3526 {
3527     import std.algorithm.iteration : sum;
3528     import std.range : iota;
3529     import std.typecons : tuple;
3530 
3531     enum N = 100;
3532     auto r = iota(1, N + 1);
3533     const expected = r.sum();
3534 
3535     // Just the range
3536     assert(taskPool.fold!"a + b"(r) == expected);
3537 
3538     // Range and seeds
3539     assert(taskPool.fold!"a + b"(r, 0) == expected);
3540     assert(taskPool.fold!("a + b", "a + b")(r, 0, 0) == tuple(expected, expected));
3541 
3542     // Range, seeds, and work unit size
3543     assert(taskPool.fold!"a + b"(r, 0, 42) == expected);
3544     assert(taskPool.fold!("a + b", "a + b")(r, 0, 0, 42) == tuple(expected, expected));
3545 }
3546 
3547 // Issue 16705
3548 @system unittest
3549 {
3550     struct MyIota
3551     {
3552         size_t front;
3553         void popFront()(){front++;}
3554         auto empty(){return front >= 25;}
3555         auto opIndex(size_t i){return front+i;}
3556         auto length(){return 25-front;}
3557     }
3558 
3559     auto mySum = taskPool.reduce!"a + b"(MyIota());
3560 }
3561 
3562 /**
3563 Returns a lazily initialized global instantiation of `TaskPool`.
3564 This function can safely be called concurrently from multiple non-worker
3565 threads.  The worker threads in this pool are daemon threads, meaning that it
3566 is not necessary to call `TaskPool.stop` or `TaskPool.finish` before
3567 terminating the main thread.
3568 */
3569 @property TaskPool taskPool() @trusted
3570 {
3571     import std.concurrency : initOnce;
3572     __gshared TaskPool pool;
3573     return initOnce!pool({
3574         auto p = new TaskPool(defaultPoolThreads);
3575         p.isDaemon = true;
3576         return p;
3577     }());
3578 }
3579 
3580 private shared uint _defaultPoolThreads = uint.max;
3581 
3582 /**
3583 These properties get and set the number of worker threads in the `TaskPool`
3584 instance returned by `taskPool`.  The default value is `totalCPUs` - 1.
3585 Calling the setter after the first call to `taskPool` does not changes
3586 number of worker threads in the instance returned by `taskPool`.
3587 */
3588 @property uint defaultPoolThreads() @trusted
3589 {
3590     const local = atomicLoad(_defaultPoolThreads);
3591     return local < uint.max ? local : totalCPUs - 1;
3592 }
3593 
3594 /// Ditto
3595 @property void defaultPoolThreads(uint newVal) @trusted
3596 {
3597     atomicStore(_defaultPoolThreads, newVal);
3598 }
3599 
3600 /**
3601 Convenience functions that forwards to `taskPool.parallel`.  The
3602 purpose of these is to make parallel foreach less verbose and more
3603 readable.
3604 
3605 Example:
3606 ---
3607 // Find the logarithm of every number from
3608 // 1 to 1_000_000 in parallel, using the
3609 // default TaskPool instance.
3610 auto logs = new double[1_000_000];
3611 
3612 foreach (i, ref elem; parallel(logs))
3613 {
3614     elem = log(i + 1.0);
3615 }
3616 ---
3617 
3618 */
3619 ParallelForeach!R parallel(R)(R range)
3620 {
3621     return taskPool.parallel(range);
3622 }
3623 
3624 /// Ditto
3625 ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
3626 {
3627     return taskPool.parallel(range, workUnitSize);
3628 }
3629 
3630 //  `each` should be usable with parallel
3631 // https://issues.dlang.org/show_bug.cgi?id=17019
3632 @system unittest
3633 {
3634     import std.algorithm.iteration : each, sum;
3635     import std.range : iota;
3636 
3637     // check behavior with parallel
3638     auto arr = new int[10];
3639     parallel(arr).each!((ref e) => e += 1);
3640     assert(arr.sum == 10);
3641 
3642     auto arrIndex = new int[10];
3643     parallel(arrIndex).each!((i, ref e) => e += i);
3644     assert(arrIndex.sum == 10.iota.sum);
3645 }
3646 
3647 // https://issues.dlang.org/show_bug.cgi?id=22745
3648 @system unittest
3649 {
3650     auto pool = new TaskPool(0);
3651     int[] empty;
3652     foreach (i; pool.parallel(empty)) {}
3653     pool.finish();
3654 }
3655 
3656 // Thrown when a parallel foreach loop is broken from.
3657 class ParallelForeachError : Error
3658 {
3659     this()
3660     {
3661         super("Cannot break from a parallel foreach loop using break, return, "
3662               ~ "labeled break/continue or goto statements.");
3663     }
3664 }
3665 
3666 /*------Structs that implement opApply for parallel foreach.------------------*/
3667 private template randLen(R)
3668 {
3669     enum randLen = isRandomAccessRange!R && hasLength!R;
3670 }
3671 
3672 private void submitAndExecute(
3673     TaskPool pool,
3674     scope void delegate() doIt
3675 )
3676 {
3677     import core.exception : OutOfMemoryError;
3678     immutable nThreads = pool.size + 1;
3679 
3680     alias PTask = typeof(scopedTask(doIt));
3681     import core.stdc.stdlib : malloc, free;
3682     import core.stdc.string : memcpy;
3683 
3684     // The logical thing to do would be to just use alloca() here, but that
3685     // causes problems on Windows for reasons that I don't understand
3686     // (tentatively a compiler bug) and definitely doesn't work on Posix due
3687     // to https://issues.dlang.org/show_bug.cgi?id=3753.
3688     // Therefore, allocate a fixed buffer and fall back to `malloc()` if
3689     // someone's using a ridiculous amount of threads.
3690     // Also, the using a byte array instead of a PTask array as the fixed buffer
3691     // is to prevent d'tors from being called on uninitialized excess PTask
3692     // instances.
3693     enum nBuf = 64;
3694     byte[nBuf * PTask.sizeof] buf = void;
3695     PTask[] tasks;
3696     if (nThreads <= nBuf)
3697     {
3698         tasks = (cast(PTask*) buf.ptr)[0 .. nThreads];
3699     }
3700     else
3701     {
3702         auto ptr = cast(PTask*) malloc(nThreads * PTask.sizeof);
3703         if (!ptr) throw new OutOfMemoryError("Out of memory in std.parallelism.");
3704         tasks = ptr[0 .. nThreads];
3705     }
3706 
3707     scope(exit)
3708     {
3709         if (nThreads > nBuf)
3710         {
3711             free(tasks.ptr);
3712         }
3713     }
3714 
3715     foreach (ref t; tasks)
3716     {
3717         import core.stdc.string : memcpy;
3718 
3719         // This silly looking code is necessary to prevent d'tors from being
3720         // called on uninitialized objects.
3721         auto temp = scopedTask(doIt);
3722         memcpy(&t, &temp, PTask.sizeof);
3723 
3724         // This has to be done to t after copying, not temp before copying.
3725         // Otherwise, temp's destructor will sit here and wait for the
3726         // task to finish.
3727         t.pool = pool;
3728     }
3729 
3730     foreach (i; 1 .. tasks.length - 1)
3731     {
3732         tasks[i].next = tasks[i + 1].basePtr;
3733         tasks[i + 1].prev = tasks[i].basePtr;
3734     }
3735 
3736     if (tasks.length > 1)
3737     {
3738         pool.queueLock();
3739         scope(exit) pool.queueUnlock();
3740 
3741         pool.abstractPutGroupNoSync(
3742             tasks[1].basePtr,
3743             tasks[$ - 1].basePtr
3744         );
3745     }
3746 
3747     if (tasks.length > 0)
3748     {
3749         try
3750         {
3751             tasks[0].job();
3752         }
3753         catch (Throwable e)
3754         {
3755             tasks[0].exception = e; // nocoverage
3756         }
3757         tasks[0].taskStatus = TaskStatus.done;
3758 
3759         // Try to execute each of these in the current thread
3760         foreach (ref task; tasks[1..$])
3761         {
3762             pool.tryDeleteExecute(task.basePtr);
3763         }
3764     }
3765 
3766     Throwable firstException;
3767 
3768     foreach (i, ref task; tasks)
3769     {
3770         try
3771         {
3772             task.yieldForce;
3773         }
3774         catch (Throwable e)
3775         {
3776             /* Chain e to front because order doesn't matter and because
3777              * e is not likely to be a chain itself (so fewer traversals)
3778              */
3779             firstException = Throwable.chainTogether(e, firstException);
3780             continue;
3781         }
3782     }
3783 
3784     if (firstException) throw firstException;
3785 }
3786 
3787 void foreachErr()
3788 {
3789     throw new ParallelForeachError();
3790 }
3791 
3792 int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg)
3793 {
3794     with(p)
3795     {
3796         int res = 0;
3797         size_t index = 0;
3798 
3799         // The explicit ElementType!R in the foreach loops is necessary for
3800         // correct behavior when iterating over strings.
3801         static if (hasLvalueElements!R)
3802         {
3803             foreach (ref ElementType!R elem; range)
3804             {
3805                 static if (Parameters!dg.length == 2)
3806                 {
3807                     res = dg(index, elem);
3808                 }
3809                 else
3810                 {
3811                     res = dg(elem);
3812                 }
3813                 if (res) break;
3814                 index++;
3815             }
3816         }
3817         else
3818         {
3819             foreach (ElementType!R elem; range)
3820             {
3821                 static if (Parameters!dg.length == 2)
3822                 {
3823                     res = dg(index, elem);
3824                 }
3825                 else
3826                 {
3827                     res = dg(elem);
3828                 }
3829                 if (res) break;
3830                 index++;
3831             }
3832         }
3833         if (res) foreachErr;
3834         return res;
3835     }
3836 }
3837 
3838 private enum string parallelApplyMixinRandomAccess = q{
3839     // Handle empty thread pool as special case.
3840     if (pool.size == 0)
3841     {
3842         return doSizeZeroCase(this, dg);
3843     }
3844 
3845     // Whether iteration is with or without an index variable.
3846     enum withIndex = Parameters!(typeof(dg)).length == 2;
3847 
3848     shared size_t workUnitIndex = size_t.max;  // Effectively -1:  chunkIndex + 1 == 0
3849     immutable len = range.length;
3850     if (!len) return 0;
3851 
3852     shared bool shouldContinue = true;
3853 
3854     void doIt()
3855     {
3856         import std.algorithm.comparison : min;
3857 
3858         scope(failure)
3859         {
3860             // If an exception is thrown, all threads should bail.
3861             atomicStore(shouldContinue, false);
3862         }
3863 
3864         while (atomicLoad(shouldContinue))
3865         {
3866             immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
3867             immutable start = workUnitSize * myUnitIndex;
3868             if (start >= len)
3869             {
3870                 atomicStore(shouldContinue, false);
3871                 break;
3872             }
3873 
3874             immutable end = min(len, start + workUnitSize);
3875 
3876             foreach (i; start .. end)
3877             {
3878                 static if (withIndex)
3879                 {
3880                     if (dg(i, range[i])) foreachErr();
3881                 }
3882                 else
3883                 {
3884                     if (dg(range[i])) foreachErr();
3885                 }
3886             }
3887         }
3888     }
3889 
3890     submitAndExecute(pool, &doIt);
3891 
3892     return 0;
3893 };
3894 
3895 enum string parallelApplyMixinInputRange = q{
3896     // Handle empty thread pool as special case.
3897     if (pool.size == 0)
3898     {
3899         return doSizeZeroCase(this, dg);
3900     }
3901 
3902     // Whether iteration is with or without an index variable.
3903     enum withIndex = Parameters!(typeof(dg)).length == 2;
3904 
3905     // This protects the range while copying it.
3906     auto rangeMutex = new Mutex();
3907 
3908     shared bool shouldContinue = true;
3909 
3910     // The total number of elements that have been popped off range.
3911     // This is updated only while protected by rangeMutex;
3912     size_t nPopped = 0;
3913 
3914     static if (
3915         is(typeof(range.buf1)) &&
3916         is(typeof(range.bufPos)) &&
3917         is(typeof(range.doBufSwap()))
3918     )
3919     {
3920         // Make sure we don't have the buffer recycling overload of
3921         // asyncBuf.
3922         static if (
3923             is(typeof(range.source)) &&
3924             isRoundRobin!(typeof(range.source))
3925         )
3926         {
3927             static assert(0, "Cannot execute a parallel foreach loop on " ~
3928             "the buffer recycling overload of asyncBuf.");
3929         }
3930 
3931         enum bool bufferTrick = true;
3932     }
3933     else
3934     {
3935         enum bool bufferTrick = false;
3936     }
3937 
3938     void doIt()
3939     {
3940         scope(failure)
3941         {
3942             // If an exception is thrown, all threads should bail.
3943             atomicStore(shouldContinue, false);
3944         }
3945 
3946         static if (hasLvalueElements!R)
3947         {
3948             alias Temp = ElementType!R*[];
3949             Temp temp;
3950 
3951             // Returns:  The previous value of nPopped.
3952             size_t makeTemp()
3953             {
3954                 import std.algorithm.internal : addressOf;
3955                 import std.array : uninitializedArray;
3956 
3957                 if (temp is null)
3958                 {
3959                     temp = uninitializedArray!Temp(workUnitSize);
3960                 }
3961 
3962                 rangeMutex.lock();
3963                 scope(exit) rangeMutex.unlock();
3964 
3965                 size_t i = 0;
3966                 for (; i < workUnitSize && !range.empty; range.popFront(), i++)
3967                 {
3968                     temp[i] = addressOf(range.front);
3969                 }
3970 
3971                 temp = temp[0 .. i];
3972                 auto ret = nPopped;
3973                 nPopped += temp.length;
3974                 return ret;
3975             }
3976 
3977         }
3978         else
3979         {
3980 
3981             alias Temp = ElementType!R[];
3982             Temp temp;
3983 
3984             // Returns:  The previous value of nPopped.
3985             static if (!bufferTrick) size_t makeTemp()
3986             {
3987                 import std.array : uninitializedArray;
3988 
3989                 if (temp is null)
3990                 {
3991                     temp = uninitializedArray!Temp(workUnitSize);
3992                 }
3993 
3994                 rangeMutex.lock();
3995                 scope(exit) rangeMutex.unlock();
3996 
3997                 size_t i = 0;
3998                 for (; i < workUnitSize && !range.empty; range.popFront(), i++)
3999                 {
4000                     temp[i] = range.front;
4001                 }
4002 
4003                 temp = temp[0 .. i];
4004                 auto ret = nPopped;
4005                 nPopped += temp.length;
4006                 return ret;
4007             }
4008 
4009             static if (bufferTrick) size_t makeTemp()
4010             {
4011                 import std.algorithm.mutation : swap;
4012                 rangeMutex.lock();
4013                 scope(exit) rangeMutex.unlock();
4014 
4015                 // Elide copying by just swapping buffers.
4016                 temp.length = range.buf1.length;
4017                 swap(range.buf1, temp);
4018 
4019                 // This is necessary in case popFront() has been called on
4020                 // range before entering the parallel foreach loop.
4021                 temp = temp[range.bufPos..$];
4022 
4023                 static if (is(typeof(range._length)))
4024                 {
4025                     range._length -= (temp.length - range.bufPos);
4026                 }
4027 
4028                 range.doBufSwap();
4029                 auto ret = nPopped;
4030                 nPopped += temp.length;
4031                 return ret;
4032             }
4033         }
4034 
4035         while (atomicLoad(shouldContinue))
4036         {
4037             auto overallIndex = makeTemp();
4038             if (temp.empty)
4039             {
4040                 atomicStore(shouldContinue, false);
4041                 break;
4042             }
4043 
4044             foreach (i; 0 .. temp.length)
4045             {
4046                 scope(success) overallIndex++;
4047 
4048                 static if (hasLvalueElements!R)
4049                 {
4050                     static if (withIndex)
4051                     {
4052                         if (dg(overallIndex, *temp[i])) foreachErr();
4053                     }
4054                     else
4055                     {
4056                         if (dg(*temp[i])) foreachErr();
4057                     }
4058                 }
4059                 else
4060                 {
4061                     static if (withIndex)
4062                     {
4063                         if (dg(overallIndex, temp[i])) foreachErr();
4064                     }
4065                     else
4066                     {
4067                         if (dg(temp[i])) foreachErr();
4068                     }
4069                 }
4070             }
4071         }
4072     }
4073 
4074     submitAndExecute(pool, &doIt);
4075 
4076     return 0;
4077 };
4078 
4079 
4080 private struct ParallelForeach(R)
4081 {
4082     TaskPool pool;
4083     R range;
4084     size_t workUnitSize;
4085     alias E = ElementType!R;
4086 
4087     static if (hasLvalueElements!R)
4088     {
4089         alias NoIndexDg = int delegate(ref E);
4090         alias IndexDg = int delegate(size_t, ref E);
4091     }
4092     else
4093     {
4094         alias NoIndexDg = int delegate(E);
4095         alias IndexDg = int delegate(size_t, E);
4096     }
4097 
4098     int opApply(scope NoIndexDg dg)
4099     {
4100         static if (randLen!R)
4101         {
4102             mixin(parallelApplyMixinRandomAccess);
4103         }
4104         else
4105         {
4106             mixin(parallelApplyMixinInputRange);
4107         }
4108     }
4109 
4110     int opApply(scope IndexDg dg)
4111     {
4112         static if (randLen!R)
4113         {
4114             mixin(parallelApplyMixinRandomAccess);
4115         }
4116         else
4117         {
4118             mixin(parallelApplyMixinInputRange);
4119         }
4120     }
4121 }
4122 
4123 /*
4124 This struct buffers the output of a callable that outputs data into a
4125 user-supplied buffer into a set of buffers of some fixed size.  It allows these
4126 buffers to be accessed with an input range interface.  This is used internally
4127 in the buffer-recycling overload of TaskPool.asyncBuf, which creates an
4128 instance and forwards it to the input range overload of asyncBuf.
4129 */
4130 private struct RoundRobinBuffer(C1, C2)
4131 {
4132     // No need for constraints because they're already checked for in asyncBuf.
4133 
4134     alias Array = Parameters!(C1.init)[0];
4135     alias T = typeof(Array.init[0]);
4136 
4137     T[][] bufs;
4138     size_t index;
4139     C1 nextDel;
4140     C2 emptyDel;
4141     bool _empty;
4142     bool primed;
4143 
4144     this(
4145         C1 nextDel,
4146         C2 emptyDel,
4147         size_t initialBufSize,
4148         size_t nBuffers
4149     ) {
4150         this.nextDel = nextDel;
4151         this.emptyDel = emptyDel;
4152         bufs.length = nBuffers;
4153 
4154         foreach (ref buf; bufs)
4155         {
4156             buf.length = initialBufSize;
4157         }
4158     }
4159 
4160     void prime()
4161     in
4162     {
4163         assert(!empty);
4164     }
4165     do
4166     {
4167         scope(success) primed = true;
4168         nextDel(bufs[index]);
4169     }
4170 
4171 
4172     T[] front() @property
4173     in
4174     {
4175         assert(!empty);
4176     }
4177     do
4178     {
4179         if (!primed) prime();
4180         return bufs[index];
4181     }
4182 
4183     void popFront()
4184     {
4185         if (empty || emptyDel())
4186         {
4187             _empty = true;
4188             return;
4189         }
4190 
4191         index = (index + 1) % bufs.length;
4192         primed = false;
4193     }
4194 
4195     bool empty() @property const @safe pure nothrow
4196     {
4197         return _empty;
4198     }
4199 }
4200 
4201 version (StdUnittest)
4202 {
4203     // This was the only way I could get nested maps to work.
4204     private __gshared TaskPool poolInstance;
4205 }
4206 
4207 // These test basic functionality but don't stress test for threading bugs.
4208 // These are the tests that should be run every time Phobos is compiled.
4209 @system unittest
4210 {
4211     import std.algorithm.comparison : equal, min, max;
4212     import std.algorithm.iteration : filter, map, reduce;
4213     import std.array : split;
4214     import std.conv : text;
4215     import std.exception : assertThrown;
4216     import std.math.operations : isClose;
4217     import std.math.algebraic : sqrt, abs;
4218     import std.math.exponential : log;
4219     import std.range : indexed, iota, join;
4220     import std.typecons : Tuple, tuple;
4221     import std.stdio;
4222 
4223     poolInstance = new TaskPool(2);
4224     scope(exit) poolInstance.stop();
4225 
4226     // The only way this can be verified is manually.
4227     debug(std_parallelism) stderr.writeln("totalCPUs = ", totalCPUs);
4228 
4229     auto oldPriority = poolInstance.priority;
4230     poolInstance.priority = Thread.PRIORITY_MAX;
4231     assert(poolInstance.priority == Thread.PRIORITY_MAX);
4232 
4233     poolInstance.priority = Thread.PRIORITY_MIN;
4234     assert(poolInstance.priority == Thread.PRIORITY_MIN);
4235 
4236     poolInstance.priority = oldPriority;
4237     assert(poolInstance.priority == oldPriority);
4238 
4239     static void refFun(ref uint num)
4240     {
4241         num++;
4242     }
4243 
4244     uint x;
4245 
4246     // Test task().
4247     auto t = task!refFun(x);
4248     poolInstance.put(t);
4249     t.yieldForce;
4250     assert(t.args[0] == 1);
4251 
4252     auto t2 = task(&refFun, x);
4253     poolInstance.put(t2);
4254     t2.yieldForce;
4255     assert(t2.args[0] == 1);
4256 
4257     // Test scopedTask().
4258     auto st = scopedTask!refFun(x);
4259     poolInstance.put(st);
4260     st.yieldForce;
4261     assert(st.args[0] == 1);
4262 
4263     auto st2 = scopedTask(&refFun, x);
4264     poolInstance.put(st2);
4265     st2.yieldForce;
4266     assert(st2.args[0] == 1);
4267 
4268     // Test executeInNewThread().
4269     auto ct = scopedTask!refFun(x);
4270     ct.executeInNewThread(Thread.PRIORITY_MAX);
4271     ct.yieldForce;
4272     assert(ct.args[0] == 1);
4273 
4274     // Test ref return.
4275     uint toInc = 0;
4276     static ref T makeRef(T)(ref T num)
4277     {
4278         return num;
4279     }
4280 
4281     auto t3 = task!makeRef(toInc);
4282     taskPool.put(t3);
4283     assert(t3.args[0] == 0);
4284     t3.spinForce++;
4285     assert(t3.args[0] == 1);
4286 
4287     static void testSafe() @safe {
4288         static int bump(int num)
4289         {
4290             return num + 1;
4291         }
4292 
4293         auto safePool = new TaskPool(0);
4294         auto t = task(&bump, 1);
4295         taskPool.put(t);
4296         assert(t.yieldForce == 2);
4297 
4298         auto st = scopedTask(&bump, 1);
4299         taskPool.put(st);
4300         assert(st.yieldForce == 2);
4301         safePool.stop();
4302     }
4303 
4304     auto arr = [1,2,3,4,5];
4305     auto nums = new uint[5];
4306     auto nums2 = new uint[5];
4307 
4308     foreach (i, ref elem; poolInstance.parallel(arr))
4309     {
4310         elem++;
4311         nums[i] = cast(uint) i + 2;
4312         nums2[i] = elem;
4313     }
4314 
4315     assert(nums == [2,3,4,5,6], text(nums));
4316     assert(nums2 == nums, text(nums2));
4317     assert(arr == nums, text(arr));
4318 
4319     // Test const/immutable arguments.
4320     static int add(int lhs, int rhs)
4321     {
4322         return lhs + rhs;
4323     }
4324     immutable addLhs = 1;
4325     immutable addRhs = 2;
4326     auto addTask = task(&add, addLhs, addRhs);
4327     auto addScopedTask = scopedTask(&add, addLhs, addRhs);
4328     poolInstance.put(addTask);
4329     poolInstance.put(addScopedTask);
4330     assert(addTask.yieldForce == 3);
4331     assert(addScopedTask.yieldForce == 3);
4332 
4333     // Test parallel foreach with non-random access range.
4334     auto range = filter!"a != 666"([0, 1, 2, 3, 4]);
4335 
4336     foreach (i, elem; poolInstance.parallel(range))
4337     {
4338         nums[i] = cast(uint) i;
4339     }
4340 
4341     assert(nums == [0,1,2,3,4]);
4342 
4343     auto logs = new double[1_000_000];
4344     foreach (i, ref elem; poolInstance.parallel(logs))
4345     {
4346         elem = log(i + 1.0);
4347     }
4348 
4349     foreach (i, elem; logs)
4350     {
4351         assert(isClose(elem, log(double(i + 1))));
4352     }
4353 
4354     assert(poolInstance.amap!"a * a"([1,2,3,4,5]) == [1,4,9,16,25]);
4355     assert(poolInstance.amap!"a * a"([1,2,3,4,5], new long[5]) == [1,4,9,16,25]);
4356     assert(poolInstance.amap!("a * a", "-a")([1,2,3]) ==
4357            [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
4358 
4359     auto tupleBuf = new Tuple!(int, int)[3];
4360     poolInstance.amap!("a * a", "-a")([1,2,3], tupleBuf);
4361     assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
4362     poolInstance.amap!("a * a", "-a")([1,2,3], 5, tupleBuf);
4363     assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
4364 
4365     // Test amap with a non-array buffer.
4366     auto toIndex = new int[5];
4367     auto ind = indexed(toIndex, [3, 1, 4, 0, 2]);
4368     poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], ind);
4369     assert(equal(ind, [2, 4, 6, 8, 10]));
4370     assert(equal(toIndex, [8, 4, 10, 2, 6]));
4371     poolInstance.amap!"a / 2"(ind, ind);
4372     assert(equal(ind, [1, 2, 3, 4, 5]));
4373     assert(equal(toIndex, [4, 2, 5, 1, 3]));
4374 
4375     auto buf = new int[5];
4376     poolInstance.amap!"a * a"([1,2,3,4,5], buf);
4377     assert(buf == [1,4,9,16,25]);
4378     poolInstance.amap!"a * a"([1,2,3,4,5], 4, buf);
4379     assert(buf == [1,4,9,16,25]);
4380 
4381     assert(poolInstance.reduce!"a + b"([1]) == 1);
4382     assert(poolInstance.reduce!"a + b"([1,2,3,4]) == 10);
4383     assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4]) == 10);
4384     assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4], 1) == 10);
4385     assert(poolInstance.reduce!(min, max)([1,2,3,4]) == tuple(1, 4));
4386     assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) ==
4387            tuple(10, 24));
4388 
4389     immutable serialAns = reduce!"a + b"(iota(1000));
4390     assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns);
4391     assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns);
4392 
4393     // Test worker-local storage.
4394     auto wl = poolInstance.workerLocalStorage(0);
4395     foreach (i; poolInstance.parallel(iota(1000), 1))
4396     {
4397         wl.get = wl.get + i;
4398     }
4399 
4400     auto wlRange = wl.toRange;
4401     auto parallelSum = poolInstance.reduce!"a + b"(wlRange);
4402     assert(parallelSum == 499500);
4403     assert(wlRange[0 .. 1][0] == wlRange[0]);
4404     assert(wlRange[1 .. 2][0] == wlRange[1]);
4405 
4406     // Test finish()
4407     {
4408         static void slowFun() { Thread.sleep(dur!"msecs"(1)); }
4409 
4410         auto pool1 = new TaskPool();
4411         auto tSlow = task!slowFun();
4412         pool1.put(tSlow);
4413         pool1.finish();
4414         tSlow.yieldForce;
4415         // Can't assert that pool1.status == PoolState.stopNow because status
4416         // doesn't change until after the "done" flag is set and the waiting
4417         // thread is woken up.
4418 
4419         auto pool2 = new TaskPool();
4420         auto tSlow2 = task!slowFun();
4421         pool2.put(tSlow2);
4422         pool2.finish(true); // blocking
4423         assert(tSlow2.done);
4424 
4425         // Test fix for https://issues.dlang.org/show_bug.cgi?id=8582 by making pool size zero.
4426         auto pool3 = new TaskPool(0);
4427         auto tSlow3 = task!slowFun();
4428         pool3.put(tSlow3);
4429         pool3.finish(true); // blocking
4430         assert(tSlow3.done);
4431 
4432         // This is correct because no thread will terminate unless pool2.status
4433         // and pool3.status have already been set to stopNow.
4434         assert(pool2.status == TaskPool.PoolState.stopNow);
4435         assert(pool3.status == TaskPool.PoolState.stopNow);
4436     }
4437 
4438     // Test default pool stuff.
4439     assert(taskPool.size == totalCPUs - 1);
4440 
4441     nums = new uint[1000];
4442     foreach (i; parallel(iota(1000)))
4443     {
4444         nums[i] = cast(uint) i;
4445     }
4446     assert(equal(nums, iota(1000)));
4447 
4448     assert(equal(
4449                poolInstance.map!"a * a"(iota(3_000_001), 10_000),
4450                map!"a * a"(iota(3_000_001))
4451            ));
4452 
4453     // The filter is to kill random access and test the non-random access
4454     // branch.
4455     assert(equal(
4456                poolInstance.map!"a * a"(
4457                    filter!"a == a"(iota(3_000_001)
4458                                   ), 10_000, 1000),
4459                map!"a * a"(iota(3_000_001))
4460            ));
4461 
4462     assert(
4463         reduce!"a + b"(0UL,
4464                        poolInstance.map!"a * a"(iota(300_001), 10_000)
4465                       ) ==
4466         reduce!"a + b"(0UL,
4467                        map!"a * a"(iota(300_001))
4468                       )
4469     );
4470 
4471     assert(equal(
4472                iota(1_000_002),
4473                poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002)))
4474            ));
4475 
4476     {
4477         import std.conv : to;
4478         import std.file : deleteme;
4479 
4480         string temp_file = deleteme ~ "-tempDelMe.txt";
4481         auto file = File(temp_file, "wb");
4482         scope(exit)
4483         {
4484             file.close();
4485             import std.file;
4486             remove(temp_file);
4487         }
4488 
4489         auto written = [[1.0, 2, 3], [4.0, 5, 6], [7.0, 8, 9]];
4490         foreach (row; written)
4491         {
4492             file.writeln(join(to!(string[])(row), "\t"));
4493         }
4494 
4495         file = File(temp_file);
4496 
4497         void next(ref char[] buf)
4498         {
4499             file.readln(buf);
4500             import std.string : chomp;
4501             buf = chomp(buf);
4502         }
4503 
4504         double[][] read;
4505         auto asyncReader = taskPool.asyncBuf(&next, &file.eof);
4506 
4507         foreach (line; asyncReader)
4508         {
4509             if (line.length == 0) continue;
4510             auto ls = line.split("\t");
4511             read ~= to!(double[])(ls);
4512         }
4513 
4514         assert(read == written);
4515         file.close();
4516     }
4517 
4518     // Test Map/AsyncBuf chaining.
4519 
4520     auto abuf = poolInstance.asyncBuf(iota(-1.0, 3_000_000), 100);
4521     auto temp = poolInstance.map!sqrt(
4522                     abuf, 100, 5
4523                 );
4524     auto lmchain = poolInstance.map!"a * a"(temp, 100, 5);
4525     lmchain.popFront();
4526 
4527     int ii;
4528     foreach ( elem; (lmchain))
4529     {
4530         if (!isClose(elem, ii))
4531         {
4532             stderr.writeln(ii, '\t', elem);
4533         }
4534         ii++;
4535     }
4536 
4537     // Test buffer trick in parallel foreach.
4538     abuf = poolInstance.asyncBuf(iota(-1.0, 1_000_000), 100);
4539     abuf.popFront();
4540     auto bufTrickTest = new size_t[abuf.length];
4541     foreach (i, elem; parallel(abuf))
4542     {
4543         bufTrickTest[i] = i;
4544     }
4545 
4546     assert(equal(iota(1_000_000), bufTrickTest));
4547 
4548     auto myTask = task!(abs)(-1);
4549     taskPool.put(myTask);
4550     assert(myTask.spinForce == 1);
4551 
4552     // Test that worker local storage from one pool receives an index of 0
4553     // when the index is queried w.r.t. another pool.  The only way to do this
4554     // is non-deterministically.
4555     foreach (i; parallel(iota(1000), 1))
4556     {
4557         assert(poolInstance.workerIndex == 0);
4558     }
4559 
4560     foreach (i; poolInstance.parallel(iota(1000), 1))
4561     {
4562         assert(taskPool.workerIndex == 0);
4563     }
4564 
4565     // Test exception handling.
4566     static void parallelForeachThrow()
4567     {
4568         foreach (elem; parallel(iota(10)))
4569         {
4570             throw new Exception("");
4571         }
4572     }
4573 
4574     assertThrown!Exception(parallelForeachThrow());
4575 
4576     static int reduceException(int a, int b)
4577     {
4578         throw new Exception("");
4579     }
4580 
4581     assertThrown!Exception(
4582         poolInstance.reduce!reduceException(iota(3))
4583     );
4584 
4585     static int mapException(int a)
4586     {
4587         throw new Exception("");
4588     }
4589 
4590     assertThrown!Exception(
4591         poolInstance.amap!mapException(iota(3))
4592     );
4593 
4594     static void mapThrow()
4595     {
4596         auto m = poolInstance.map!mapException(iota(3));
4597         m.popFront();
4598     }
4599 
4600     assertThrown!Exception(mapThrow());
4601 
4602     struct ThrowingRange
4603     {
4604         @property int front()
4605         {
4606             return 1;
4607         }
4608         void popFront()
4609         {
4610             throw new Exception("");
4611         }
4612         enum bool empty = false;
4613     }
4614 
4615     assertThrown!Exception(poolInstance.asyncBuf(ThrowingRange.init));
4616 }
4617 
4618 //version = parallelismStressTest;
4619 
4620 // These are more like stress tests than real unit tests.  They print out
4621 // tons of stuff and should not be run every time make unittest is run.
4622 version (parallelismStressTest)
4623 {
4624     @system unittest
4625     {
4626         import std.stdio : stderr, writeln, readln;
4627         import std.range : iota;
4628         import std.algorithm.iteration : filter, reduce;
4629 
4630         size_t attempt;
4631         for (; attempt < 10; attempt++)
4632             foreach (poolSize; [0, 4])
4633         {
4634 
4635             poolInstance = new TaskPool(poolSize);
4636 
4637             uint[] numbers = new uint[1_000];
4638 
4639             foreach (i; poolInstance.parallel( iota(0, numbers.length)) )
4640             {
4641                 numbers[i] = cast(uint) i;
4642             }
4643 
4644             // Make sure it works.
4645             foreach (i; 0 .. numbers.length)
4646             {
4647                 assert(numbers[i] == i);
4648             }
4649 
4650             stderr.writeln("Done creating nums.");
4651 
4652 
4653             auto myNumbers = filter!"a % 7 > 0"( iota(0, 1000));
4654             foreach (num; poolInstance.parallel(myNumbers))
4655             {
4656                 assert(num % 7 > 0 && num < 1000);
4657             }
4658             stderr.writeln("Done modulus test.");
4659 
4660             uint[] squares = poolInstance.amap!"a * a"(numbers, 100);
4661             assert(squares.length == numbers.length);
4662             foreach (i, number; numbers)
4663             {
4664                 assert(squares[i] == number * number);
4665             }
4666             stderr.writeln("Done squares.");
4667 
4668             auto sumFuture = task!( reduce!"a + b" )(numbers);
4669             poolInstance.put(sumFuture);
4670 
4671             ulong sumSquares = 0;
4672             foreach (elem; numbers)
4673             {
4674                 sumSquares += elem * elem;
4675             }
4676 
4677             uint mySum = sumFuture.spinForce();
4678             assert(mySum == 999 * 1000 / 2);
4679 
4680             auto mySumParallel = poolInstance.reduce!"a + b"(numbers);
4681             assert(mySum == mySumParallel);
4682             stderr.writeln("Done sums.");
4683 
4684             auto myTask = task(
4685             {
4686                 synchronized writeln("Our lives are parallel...Our lives are parallel.");
4687             });
4688             poolInstance.put(myTask);
4689 
4690             auto nestedOuter = "abcd";
4691             auto nestedInner =  iota(0, 10, 2);
4692 
4693             foreach (i, letter; poolInstance.parallel(nestedOuter, 1))
4694             {
4695                 foreach (j, number; poolInstance.parallel(nestedInner, 1))
4696                 {
4697                     synchronized writeln(i, ": ", letter, "  ", j, ": ", number);
4698                 }
4699             }
4700 
4701             poolInstance.stop();
4702         }
4703 
4704         assert(attempt == 10);
4705         writeln("Press enter to go to next round of unittests.");
4706         readln();
4707     }
4708 
4709     // These unittests are intended more for actual testing and not so much
4710     // as examples.
4711     @system unittest
4712     {
4713         import std.stdio : stderr;
4714         import std.range : iota;
4715         import std.algorithm.iteration : filter, reduce;
4716         import std.math.algebraic : sqrt;
4717         import std.math.operations : isClose;
4718         import std.math.traits : isNaN;
4719         import std.conv : text;
4720 
4721         foreach (attempt; 0 .. 10)
4722         foreach (poolSize; [0, 4])
4723         {
4724             poolInstance = new TaskPool(poolSize);
4725 
4726             // Test indexing.
4727             stderr.writeln("Creator Raw Index:  ", poolInstance.threadIndex);
4728             assert(poolInstance.workerIndex() == 0);
4729 
4730             // Test worker-local storage.
4731             auto workerLocalStorage = poolInstance.workerLocalStorage!uint(1);
4732             foreach (i; poolInstance.parallel(iota(0U, 1_000_000)))
4733             {
4734                 workerLocalStorage.get++;
4735             }
4736             assert(reduce!"a + b"(workerLocalStorage.toRange) ==
4737             1_000_000 + poolInstance.size + 1);
4738 
4739             // Make sure work is reasonably balanced among threads.  This test is
4740             // non-deterministic and is more of a sanity check than something that
4741             // has an absolute pass/fail.
4742             shared(uint)[void*] nJobsByThread;
4743             foreach (thread; poolInstance.pool)
4744             {
4745                 nJobsByThread[cast(void*) thread] = 0;
4746             }
4747             nJobsByThread[ cast(void*) Thread.getThis()] = 0;
4748 
4749             foreach (i; poolInstance.parallel( iota(0, 1_000_000), 100 ))
4750             {
4751                 atomicOp!"+="( nJobsByThread[ cast(void*) Thread.getThis() ], 1);
4752             }
4753 
4754             stderr.writeln("\nCurrent thread is:  ",
4755             cast(void*) Thread.getThis());
4756             stderr.writeln("Workload distribution:  ");
4757             foreach (k, v; nJobsByThread)
4758             {
4759                 stderr.writeln(k, '\t', v);
4760             }
4761 
4762             // Test whether amap can be nested.
4763             real[][] matrix = new real[][](1000, 1000);
4764             foreach (i; poolInstance.parallel( iota(0, matrix.length) ))
4765             {
4766                 foreach (j; poolInstance.parallel( iota(0, matrix[0].length) ))
4767                 {
4768                     matrix[i][j] = i * j;
4769                 }
4770             }
4771 
4772             // Get around weird bugs having to do w/ sqrt being an intrinsic:
4773             static real mySqrt(real num)
4774             {
4775                 return sqrt(num);
4776             }
4777 
4778             static real[] parallelSqrt(real[] nums)
4779             {
4780                 return poolInstance.amap!mySqrt(nums);
4781             }
4782 
4783             real[][] sqrtMatrix = poolInstance.amap!parallelSqrt(matrix);
4784 
4785             foreach (i, row; sqrtMatrix)
4786             {
4787                 foreach (j, elem; row)
4788                 {
4789                     real shouldBe = sqrt( cast(real) i * j);
4790                     assert(isClose(shouldBe, elem));
4791                     sqrtMatrix[i][j] = shouldBe;
4792                 }
4793             }
4794 
4795             auto saySuccess = task(
4796             {
4797                 stderr.writeln(
4798                     "Success doing matrix stuff that involves nested pool use.");
4799             });
4800             poolInstance.put(saySuccess);
4801             saySuccess.workForce();
4802 
4803             // A more thorough test of amap, reduce:  Find the sum of the square roots of
4804             // matrix.
4805 
4806             static real parallelSum(real[] input)
4807             {
4808                 return poolInstance.reduce!"a + b"(input);
4809             }
4810 
4811             auto sumSqrt = poolInstance.reduce!"a + b"(
4812                                poolInstance.amap!parallelSum(
4813                                    sqrtMatrix
4814                                )
4815                            );
4816 
4817             assert(isClose(sumSqrt, 4.437e8, 1e-2));
4818             stderr.writeln("Done sum of square roots.");
4819 
4820             // Test whether tasks work with function pointers.
4821             /+ // This part is buggy and needs to be fixed...
4822             auto nanTask = task(&isNaN, 1.0L);
4823             poolInstance.put(nanTask);
4824             assert(nanTask.spinForce == false);
4825 
4826             if (poolInstance.size > 0)
4827             {
4828                 // Test work waiting.
4829                 static void uselessFun()
4830                 {
4831                     foreach (i; 0 .. 1_000_000) {}
4832                 }
4833 
4834                 auto uselessTasks = new typeof(task(&uselessFun))[1000];
4835                 foreach (ref uselessTask; uselessTasks)
4836                 {
4837                     uselessTask = task(&uselessFun);
4838                 }
4839                 foreach (ref uselessTask; uselessTasks)
4840                 {
4841                     poolInstance.put(uselessTask);
4842                 }
4843                 foreach (ref uselessTask; uselessTasks)
4844                 {
4845                     uselessTask.workForce();
4846                 }
4847             }
4848              +/
4849 
4850             // Test the case of non-random access + ref returns.
4851             int[] nums = [1,2,3,4,5];
4852             static struct RemoveRandom
4853             {
4854                 int[] arr;
4855 
4856                 ref int front()
4857                 {
4858                     return arr.front;
4859                 }
4860                 void popFront()
4861                 {
4862                     arr.popFront();
4863                 }
4864                 bool empty()
4865                 {
4866                     return arr.empty;
4867                 }
4868             }
4869 
4870             auto refRange = RemoveRandom(nums);
4871             foreach (ref elem; poolInstance.parallel(refRange))
4872             {
4873                 elem++;
4874             }
4875             assert(nums == [2,3,4,5,6], text(nums));
4876             stderr.writeln("Nums:  ", nums);
4877 
4878             poolInstance.stop();
4879         }
4880     }
4881 }
4882 
4883 @system unittest
4884 {
4885     static struct __S_12733
4886     {
4887         invariant() { assert(checksum == 1_234_567_890); }
4888         this(ulong u){n = u;}
4889         void opAssign(__S_12733 s){this.n = s.n;}
4890         ulong n;
4891         ulong checksum = 1_234_567_890;
4892     }
4893 
4894     static auto __genPair_12733(ulong n) { return __S_12733(n); }
4895     immutable ulong[] data = [ 2UL^^59-1, 2UL^^59-1, 2UL^^59-1, 112_272_537_195_293UL ];
4896 
4897     auto result = taskPool.amap!__genPair_12733(data);
4898 }
4899 
4900 @safe unittest
4901 {
4902     import std.range : iota;
4903 
4904     // this test was in std.range, but caused cycles.
4905     assert(__traits(compiles, { foreach (i; iota(0, 100UL).parallel) {} }));
4906 }
4907 
4908 @safe unittest
4909 {
4910     import std.algorithm.iteration : each;
4911 
4912     long[] arr;
4913     static assert(is(typeof({
4914         arr.parallel.each!"a++";
4915     })));
4916 }
4917 
4918 // https://issues.dlang.org/show_bug.cgi?id=17539
4919 @system unittest
4920 {
4921     import std.random : rndGen;
4922     // ensure compilation
4923     try foreach (rnd; rndGen.parallel) break;
4924     catch (ParallelForeachError e) {}
4925 }