/**
`std.parallelism` implements high-level primitives for SMP parallelism.
These include parallel foreach, parallel reduce, parallel eager map, pipelining
and future/promise parallelism. `std.parallelism` is recommended when the
same operation is to be executed in parallel on different data, or when a
function is to be executed in a background thread and its result returned to a
well-defined main thread. For communication between arbitrary threads, see
`std.concurrency`.

`std.parallelism` is based on the concept of a `Task`. A `Task` is an
object that represents the fundamental unit of work in this library and may be
executed in parallel with any other `Task`. Using `Task`
directly allows programming with a future/promise paradigm. All other
supported parallelism paradigms (parallel foreach, map, reduce, pipelining)
represent an additional level of abstraction over `Task`. They
automatically create one or more `Task` objects, or closely related types
that are conceptually identical but not part of the public API.

After creation, a `Task` may be executed in a new thread, or submitted
to a `TaskPool` for execution. A `TaskPool` encapsulates a task queue
and its worker threads. Its purpose is to efficiently map a large
number of `Task`s onto a smaller number of threads. A task queue is a
FIFO queue of `Task` objects that have been submitted to the
`TaskPool` and are awaiting execution. A worker thread is a thread that
is associated with exactly one task queue. It executes the `Task` at the
front of its queue when the queue has work available, or sleeps when
no work is available. Each task queue is associated with zero or
more worker threads. If the result of a `Task` is needed before execution
by a worker thread has begun, the `Task` can be removed from the task queue
and executed immediately in the thread where the result is needed.

Warning: Unless marked as `@trusted` or `@safe`, artifacts in
this module allow implicit data sharing between threads and cannot
guarantee that client code is free from low-level data races.

Source: $(PHOBOSSRC std/parallelism.d)
Author: David Simcha
Copyright: Copyright (c) 2009-2011, David Simcha.
License: $(HTTP boost.org/LICENSE_1_0.txt, Boost License 1.0)
*/
module std.parallelism;

version (OSX)
    version = Darwin;
else version (iOS)
    version = Darwin;
else version (TVOS)
    version = Darwin;
else version (WatchOS)
    version = Darwin;

///
@system unittest
{
    import std.algorithm.iteration : map;
    import std.math.operations : isClose;
    import std.parallelism : taskPool;
    import std.range : iota;

    // Parallel reduce can be combined with
    // std.algorithm.iteration.map to interesting effect.
    // The following example (thanks to Russel Winder)
    // calculates pi by quadrature using
    // std.algorithm.map and TaskPool.reduce.
    // getTerm is evaluated in parallel as needed by
    // TaskPool.reduce.
    //
    // Timings on an Intel i5-3450 quad core machine
    // for n = 1_000_000_000:
    //
    // TaskPool.reduce:      1.067 s
    // std.algorithm.reduce: 4.011 s

    enum n = 1_000_000;
    enum delta = 1.0 / n;

    alias getTerm = (int i)
    {
        immutable x = (i - 0.5) * delta;
        return delta / (1.0 + x * x);
    };

    immutable pi = 4.0 * taskPool.reduce!"a + b"(n.iota.map!getTerm);

    assert(pi.isClose(3.14159, 1e-5));
}

import core.atomic;
import core.memory;
import core.sync.condition;
import core.thread;

import std.functional;
import std.meta;
import std.range.primitives;
import std.traits;

/*
(For now public undocumented with reserved name.)

A lazily initialized global constant. The underlying value is a shared global
statically initialized to `outOfBandValue`, which must not be a legitimate
value of the constant. Upon the first call the situation is detected and the
global is initialized by calling `initializer`. The initializer is assumed to
be pure (even if not marked as such), i.e. to return the same value upon
repeated calls. For that reason, no special precautions are taken, so
`initializer` may be called more than one time, leading to benign races on the
cached value.

In the quiescent state the cost of the function is an atomic load from a global.

Params:
    T = The type of the pseudo-constant (may be qualified)
    outOfBandValue = A value that cannot be valid, it is used for initialization
    initializer = The function performing initialization; must be `nothrow`

Returns:
    The lazily initialized value
*/
@property pure
T __lazilyInitializedConstant(T, alias outOfBandValue, alias initializer)()
if (is(Unqual!T : T)
    && is(typeof(initializer()) : T)
    && is(typeof(outOfBandValue) : T))
{
    static T impl() nothrow
    {
        // Thread-local cache
        static Unqual!T tls = outOfBandValue;
        auto local = tls;
        // Shortest path, no atomic operations
        if (local != outOfBandValue) return local;
        // Process-level cache
        static shared Unqual!T result = outOfBandValue;
        // Initialize both process-level cache and tls
        local = atomicLoad(result);
        if (local == outOfBandValue)
        {
            local = initializer();
            atomicStore(result, local);
        }
        tls = local;
        return local;
    }

    import std.traits : SetFunctionAttributes;
    alias Fun = SetFunctionAttributes!(typeof(&impl), "D",
        functionAttributes!(typeof(&impl)) | FunctionAttribute.pure_);
    auto purified = (() @trusted => cast(Fun) &impl)();
    return purified();
}

// Returns the size of a cache line.
alias cacheLineSize =
    __lazilyInitializedConstant!(immutable(size_t), size_t.max, cacheLineSizeImpl);

private size_t cacheLineSizeImpl() @nogc nothrow @trusted
{
    size_t result = 0;
    import core.cpuid : datacache;
    foreach (ref const cachelevel; datacache)
    {
        if (cachelevel.lineSize > result && cachelevel.lineSize < uint.max)
        {
            result = cachelevel.lineSize;
        }
    }
    return result;
}

@nogc @safe nothrow unittest
{
    assert(cacheLineSize == cacheLineSizeImpl);
}

/* Atomics code. These forward to core.atomic, but are written like this
   for two reasons:

   1. They used to actually contain ASM code and I don't want to have to change
      to directly calling core.atomic in a zillion different places.

   2. core.atomic has some misc.
      issues that make my use cases difficult
      without wrapping it. If I didn't wrap it, casts would be required
      basically everywhere.
*/
private void atomicSetUbyte(T)(ref T stuff, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    //core.atomic.cas(cast(shared) &stuff, stuff, newVal);
    atomicStore(*(cast(shared) &stuff), newVal);
}

private ubyte atomicReadUbyte(T)(ref T val)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    return atomicLoad(*(cast(shared) &val));
}

// This gets rid of the need for a lot of annoying casts in other parts of the
// code, when enums are involved.
private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    return core.atomic.cas(cast(shared) &stuff, testVal, newVal);
}

/*--------------------- Generic helper functions, etc.------------------------*/
private template MapType(R, functions...)
{
    static assert(functions.length);

    ElementType!R e = void;
    alias MapType =
        typeof(adjoin!(staticMap!(unaryFun, functions))(e));
}

private template ReduceType(alias fun, R, E)
{
    alias ReduceType = typeof(binaryFun!fun(E.init, ElementType!R.init));
}

private template noUnsharedAliasing(T)
{
    enum bool noUnsharedAliasing = !hasUnsharedAliasing!T;
}

// This template tests whether a function may be executed in parallel from
// @safe code via Task.executeInNewThread(). There is an additional
// requirement for executing it via a TaskPool. (See isSafeReturn).
private template isSafeTask(F)
{
    enum bool isSafeTask =
        (functionAttributes!F & (FunctionAttribute.safe | FunctionAttribute.trusted)) != 0 &&
        (functionAttributes!F & FunctionAttribute.ref_) == 0 &&
        (isFunctionPointer!F || !hasUnsharedAliasing!F) &&
        allSatisfy!(noUnsharedAliasing, Parameters!F);
}

@safe unittest
{
    alias F1 = void function() @safe;
    alias F2 = void function();
    alias F3 = void function(uint, string) @trusted;
    alias F4 = void function(uint, char[]);

    static assert( isSafeTask!F1);
    static assert(!isSafeTask!F2);
    static assert( isSafeTask!F3);
    static assert(!isSafeTask!F4);

    alias F5 = uint[] function(uint, string) pure @trusted;
    static assert( isSafeTask!F5);
}

// This function decides whether Tasks that meet all of the other requirements
// for being executed from @safe code can be executed on a TaskPool.
// When executing via TaskPool, it's theoretically possible
// to return a value that is also pointed to by a worker thread's thread local
// storage. When executing from executeInNewThread(), the thread that executed
// the Task is terminated by the time the return value is visible in the calling
// thread, so this is a non-issue. It's also a non-issue for pure functions
// since they can't read global state.
private template isSafeReturn(T)
{
    static if (!hasUnsharedAliasing!(T.ReturnType))
    {
        enum isSafeReturn = true;
    }
    else static if (T.isPure)
    {
        enum isSafeReturn = true;
    }
    else
    {
        enum isSafeReturn = false;
    }
}

private template randAssignable(R)
{
    enum randAssignable = isRandomAccessRange!R && hasAssignableElements!R;
}

private enum TaskStatus : ubyte
{
    notStarted,
    inProgress,
    done
}

private template AliasReturn(alias fun, T...)
{
    alias AliasReturn = typeof({ T args; return fun(args); });
}

// Should be private, but std.algorithm.reduce is used in the zero-thread case
// and won't work w/ private.
template reduceAdjoin(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceAdjoin = binaryFun!(functions[0]);
    }
    else
    {
        T reduceAdjoin(T, U)(T lhs, U rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs);
            }

            return lhs;
        }
    }
}

private template reduceFinish(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceFinish = binaryFun!(functions[0]);
    }
    else
    {
        T reduceFinish(T)(T lhs, T rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs.expand[i]);
            }

            return lhs;
        }
    }
}

private template isRoundRobin(R : RoundRobinBuffer!(C1, C2), C1, C2)
{
    enum isRoundRobin = true;
}

private template isRoundRobin(T)
{
    enum isRoundRobin = false;
}

@safe unittest
{
    static assert( isRoundRobin!(RoundRobinBuffer!(void delegate(char[]), bool delegate())));
    static assert(!isRoundRobin!(uint));
}

// This is the base "class" for all of the other tasks. Using C-style
// polymorphism to allow more direct control over memory allocation, etc.
private struct AbstractTask
{
    AbstractTask* prev;
    AbstractTask* next;

    // Pointer to a function that executes this task.
    void function(void*) runTask;

    Throwable exception;
    ubyte taskStatus = TaskStatus.notStarted;

    bool done() @property
    {
        if (atomicReadUbyte(taskStatus) == TaskStatus.done)
        {
            if (exception)
            {
                throw exception;
            }

            return true;
        }

        return false;
    }

    void job()
    {
        runTask(&this);
    }
}

/**
`Task` represents the fundamental unit of work. A `Task` may be
executed in parallel with any other `Task`. Using this struct directly
allows future/promise parallelism. In this paradigm, a function (or delegate
or other callable) is executed in a thread other than the one it was called
from. The calling thread does not block while the function is being executed.
A call to `workForce`, `yieldForce`, or `spinForce` is used to
ensure that the `Task` has finished executing and to obtain the return
value, if any. These functions and `done` also act as full memory barriers,
meaning that any memory writes made in the thread that executed the `Task`
are guaranteed to be visible in the calling thread after one of these functions
returns.

The $(REF task, std,parallelism) and $(REF scopedTask, std,parallelism) functions can
be used to create an instance of this struct. See `task` for usage examples.

Function results are returned from `yieldForce`, `spinForce` and
`workForce` by ref. If `fun` returns by ref, the reference will point
to the returned reference of `fun`. Otherwise it will point to a
field in this struct.

Copying of this struct is disabled, since it would provide no useful semantics.
If you want to pass this struct around, you should do so by reference or
pointer.
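
The following sketch illustrates the future/promise paradigm (illustrative
only, not part of the original documentation; `fib` stands in for any
expensive callable):
---
int fib(int n) { return n < 2 ? n : fib(n - 1) + fib(n - 2); }

void main()
{
    // Start fib(30) in a new thread.
    auto t = task!fib(30);
    t.executeInNewThread();

    // Do other useful work here...

    // yieldForce waits for completion, rethrows any exception
    // thrown by fib, and returns the result.
    int result = t.yieldForce;
}
---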

Bugs: Changes to `ref` and `out` arguments are not propagated to the
call site, only to `args` in this struct.
*/
struct Task(alias fun, Args...)
{
    private AbstractTask base = {runTask : &impl};
    private alias base this;

    private @property AbstractTask* basePtr()
    {
        return &base;
    }

    private static void impl(void* myTask)
    {
        import std.algorithm.internal : addressOf;

        Task* myCastedTask = cast(typeof(this)*) myTask;
        static if (is(ReturnType == void))
        {
            fun(myCastedTask._args);
        }
        else static if (is(typeof(&(fun(myCastedTask._args)))))
        {
            myCastedTask.returnVal = addressOf(fun(myCastedTask._args));
        }
        else
        {
            myCastedTask.returnVal = fun(myCastedTask._args);
        }
    }

    private TaskPool pool;
    private bool isScoped;  // True if created with scopedTask.

    Args _args;

    /**
    The arguments the function was called with. Changes to `out` and
    `ref` arguments will be visible here.
    */
    static if (__traits(isSame, fun, run))
    {
        alias args = _args[1..$];
    }
    else
    {
        alias args = _args;
    }


    // The purpose of this code is to decide whether functions whose
    // return values have unshared aliasing can be executed via
    // TaskPool from @safe code. See isSafeReturn.
    static if (__traits(isSame, fun, run))
    {
        static if (isFunctionPointer!(_args[0]))
        {
            private enum bool isPure =
                (functionAttributes!(Args[0]) & FunctionAttribute.pure_) != 0;
        }
        else
        {
            // BUG: Should check this for delegates too, but std.traits
            // apparently doesn't allow this. isPure is irrelevant
            // for delegates, at least for now since shared delegates
            // don't work.
            private enum bool isPure = false;
        }

    }
    else
    {
        // We already know that we can't execute aliases in @safe code, so
        // just put a dummy value here.
        private enum bool isPure = false;
    }


    /**
    The return type of the function called by this `Task`. This can be
    `void`.
    */
    alias ReturnType = typeof(fun(_args));

    static if (!is(ReturnType == void))
    {
        static if (is(typeof(&fun(_args))))
        {
            // Ref return.
            ReturnType* returnVal;

            ref ReturnType fixRef(ReturnType* val)
            {
                return *val;
            }

        }
        else
        {
            ReturnType returnVal;

            ref ReturnType fixRef(ref ReturnType val)
            {
                return val;
            }
        }
    }

    private void enforcePool()
    {
        import std.exception : enforce;
        enforce(this.pool !is null, "Job not submitted yet.");
    }

    static if (Args.length > 0)
    {
        private this(Args args)
        {
            _args = args;
        }
    }

    // Work around DMD bug https://issues.dlang.org/show_bug.cgi?id=6588,
    // allow immutable elements.
    static if (allSatisfy!(isAssignable, Args))
    {
        typeof(this) opAssign(typeof(this) rhs)
        {
            foreach (i, Type; typeof(this.tupleof))
            {
                this.tupleof[i] = rhs.tupleof[i];
            }
            return this;
        }
    }
    else
    {
        @disable typeof(this) opAssign(typeof(this) rhs);
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any. If it's in progress,
    busy spin until it's done, then return the return value. If it threw
    an exception, rethrow that exception.

    This function should be used when you expect the result of the
    `Task` to be available on a timescale shorter than that of an OS
    context switch.
    */
    @property ref ReturnType spinForce() @trusted
    {
        enforcePool();

        this.pool.tryDeleteExecute(basePtr);

        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done) {}

        if (exception)
        {
            throw exception;
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any. If it's in progress,
    wait on a condition variable. If it threw an exception, rethrow that
    exception.

    This function should be used for expensive functions, as waiting on a
    condition variable introduces latency, but avoids wasted CPU cycles.
    */
    @property ref ReturnType yieldForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        if (done)
        {
            static if (is(ReturnType == void))
            {
                return;
            }
            else
            {
                return fixRef(this.returnVal);
            }
        }

        pool.waiterLock();
        scope(exit) pool.waiterUnlock();

        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done)
        {
            pool.waitUntilCompletion();
        }

        if (exception)
        {
            throw exception; // nocoverage
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If this `Task` was not started yet, execute it in the current
    thread. If it is finished, return its result. If it is in progress,
    execute any other `Task` from the `TaskPool` instance that
    this `Task` was submitted to until this one
    is finished. If it threw an exception, rethrow that exception.
    If no other tasks are available or this `Task` was executed using
    `executeInNewThread`, wait on a condition variable.
    */
    @property ref ReturnType workForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        while (true)
        {
            if (done) // done() implicitly checks for exceptions.
            {
                static if (is(ReturnType == void))
                {
                    return;
                }
                else
                {
                    return fixRef(this.returnVal);
                }
            }

            AbstractTask* job;
            {
                // Locking explicitly and calling popNoSync() because
                // pop() waits on a condition variable if there are no Tasks
                // in the queue.

                pool.queueLock();
                scope(exit) pool.queueUnlock();
                job = pool.popNoSync();
            }

            if (job !is null)
            {
                version (verboseUnittest)
                {
                    stderr.writeln("Doing workForce work.");
                }

                pool.doJob(job);

                if (done)
                {
                    static if (is(ReturnType == void))
                    {
                        return;
                    }
                    else
                    {
                        return fixRef(this.returnVal);
                    }
                }
            }
            else
            {
                version (verboseUnittest)
                {
                    stderr.writeln("Yield from workForce.");
                }

                return yieldForce;
            }
        }
    }

    /**
    Returns `true` if the `Task` is finished executing.

    Throws: Rethrows any exception thrown during the execution of the
    `Task`.
    */
    @property bool done() @trusted
    {
        // Explicitly forwarded for documentation purposes.
        return base.done;
    }

    /**
    Create a new thread for executing this `Task`, execute it in the
    newly created thread, then terminate the thread. This can be used for
    future/promise parallelism.
    An explicit priority may be given
    to the `Task`. If one is provided, its value is forwarded to
    `core.thread.Thread.priority`. See $(REF task, std,parallelism) for
    a usage example.
    */
    void executeInNewThread() @trusted
    {
        pool = new TaskPool(basePtr);
    }

    /// Ditto
    void executeInNewThread(int priority) @trusted
    {
        pool = new TaskPool(basePtr, priority);
    }

    @safe ~this()
    {
        if (isScoped && pool !is null && taskStatus != TaskStatus.done)
        {
            yieldForce;
        }
    }

    // When this is uncommented, it somehow gets called on returning from
    // scopedTask even though the struct shouldn't be getting copied.
    //@disable this(this) {}
}

// Calls `fpOrDelegate` with `args`. This is an
// adapter that makes `Task` work with delegates, function pointers and
// functors instead of just aliases.
ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args)
{
    return fpOrDelegate(args);
}

/**
Creates a `Task` on the GC heap that calls an alias. This may be executed
via `Task.executeInNewThread` or by submitting to a
$(REF TaskPool, std,parallelism). A globally accessible instance of
`TaskPool` is provided by $(REF taskPool, std,parallelism).

Returns: A pointer to the `Task`.

Example:
---
// Read two files into memory at the same time.
import std.file;

void main()
{
    // Create and execute a Task for reading
    // foo.txt.
    auto file1Task = task!read("foo.txt");
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}
---

---
// Sorts an array using a parallel quick sort algorithm.
// The first partition is done serially. Both recursion
// branches are then executed in parallel.
//
// Timings for sorting an array of 1,000,000 doubles on
// an Athlon 64 X2 dual core machine:
//
// This implementation: 176 milliseconds.
// Equivalent serial implementation: 280 milliseconds
void parallelSort(T)(T[] data)
{
    // Sort small subarrays serially.
    if (data.length < 100)
    {
        std.algorithm.sort(data);
        return;
    }

    // Partition the array.
    swap(data[$ / 2], data[$ - 1]);
    auto pivot = data[$ - 1];
    bool lessThanPivot(T elem) { return elem < pivot; }

    auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]);
    swap(data[$ - greaterEqual.length - 1], data[$ - 1]);

    auto less = data[0..$ - greaterEqual.length - 1];
    greaterEqual = data[$ - greaterEqual.length..$];

    // Execute both recursion branches in parallel.
    auto recurseTask = task!parallelSort(greaterEqual);
    taskPool.put(recurseTask);
    parallelSort(less);
    recurseTask.yieldForce;
}
---
*/
auto task(alias fun, Args...)(Args args)
{
    return new Task!(fun, Args)(args);
}

/**
Creates a `Task` on the GC heap that calls a function pointer, delegate, or
class/struct with overloaded opCall.

Example:
---
// Read in two files at the same time again,
// but this time use a function pointer instead
// of an alias to represent std.file.read.
import std.file;

void main()
{
    // Create and execute a Task for reading
    // foo.txt.
    auto file1Task = task(&read!string, "foo.txt", size_t.max);
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}
---

Notes: This function takes a non-scope delegate, meaning it can be
used with closures. If you can't allocate a closure due to objects
on the stack that have scoped destruction, see `scopedTask`, which
takes a scope delegate.
*/
auto task(F, Args...)(F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    return new Task!(run, F, Args)(delegateOrFp, args);
}

/**
Version of `task` usable from `@safe` code. Usage mechanics are
identical to the non-@safe case, but safety introduces some restrictions:

1. `fun` must be @safe or @trusted.

2. `F` must not have any unshared aliasing as defined by
   $(REF hasUnsharedAliasing, std,traits). This means it
   may not be an unshared delegate or a non-shared class or struct
   with overloaded `opCall`. This also precludes accepting template
   alias parameters.

3. `Args` must not have unshared aliasing.

4. `fun` must not return by reference.

5. The return type must not have unshared aliasing unless `fun` is
   `pure` or the `Task` is executed via `executeInNewThread` instead
   of using a `TaskPool`.

*/
@trusted auto task(F, Args...)(F fun, Args args)
if (__traits(compiles, () @safe => fun(args)) && isSafeTask!F)
{
    return new Task!(run, F, Args)(fun, args);
}

@safe unittest
{
    static struct Oops
    {
        int convert()
        {
            *cast(int*) 0xcafebabe = 0xdeadbeef;
            return 0;
        }
        alias convert this;
    }
    static void foo(int) @safe {}

    static assert(!__traits(compiles, task(&foo, Oops.init)));
    static assert(!__traits(compiles, scopedTask(&foo, Oops.init)));
}

/**
These functions allow the creation of `Task` objects on the stack rather
than the GC heap. The lifetime of a `Task` created by `scopedTask`
cannot exceed the lifetime of the scope it was created in.

`scopedTask` might be preferred over `task`:

1. When a `Task` that calls a delegate is being created and a closure
   cannot be allocated due to objects on the stack that have scoped
   destruction. The delegate overload of `scopedTask` takes a `scope`
   delegate.

2. As a micro-optimization, to avoid the heap allocation associated with
   `task` or with the creation of a closure.

Usage is otherwise identical to `task`.

Notes: `Task` objects created using `scopedTask` will automatically
call `Task.yieldForce` in their destructor if necessary to ensure
the `Task` is complete before the stack frame they reside on is destroyed.
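
A minimal illustrative sketch (not part of the original documentation; the
delegate below is a placeholder for real work):
---
void caller()
{
    int x = 40;
    // The delegate closes over x on the stack. scopedTask accepts
    // it as a scope delegate, so no GC closure is allocated.
    auto t = scopedTask(() => x + 2);
    taskPool.put(t);
    // ... do other work ...
    assert(t.yieldForce == 42);
} // Had yieldForce not been called, the destructor would call it here.
---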
*/
auto scopedTask(alias fun, Args...)(Args args)
{
    auto ret = Task!(fun, Args)(args);
    ret.isScoped = true;
    return ret;
}

/// Ditto
auto scopedTask(F, Args...)(scope F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    auto ret = Task!(run, F, Args)(delegateOrFp, args);
    ret.isScoped = true;
    return ret;
}

/// Ditto
@trusted auto scopedTask(F, Args...)(F fun, Args args)
if (__traits(compiles, () @safe => fun(args)) && isSafeTask!F)
{
    auto ret = Task!(run, F, Args)(fun, args);
    ret.isScoped = true;
    return ret;
}

/**
The total number of CPU cores available on the current machine, as reported by
the operating system.
*/
alias totalCPUs =
    __lazilyInitializedConstant!(immutable(uint), uint.max, totalCPUsImpl);

uint totalCPUsImpl() @nogc nothrow @trusted
{
    version (Windows)
    {
        // BUGS: Only works on Windows 2000 and above.
        import core.sys.windows.winbase : SYSTEM_INFO, GetSystemInfo;
        import std.algorithm.comparison : max;
        SYSTEM_INFO si;
        GetSystemInfo(&si);
        return max(1, cast(uint) si.dwNumberOfProcessors);
    }
    else version (linux)
    {
        import core.stdc.stdlib : calloc;
        import core.stdc.string : memset;
        import core.sys.linux.sched : CPU_ALLOC_SIZE, CPU_FREE, CPU_COUNT, CPU_COUNT_S, cpu_set_t, sched_getaffinity;
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;

        int count = 0;

        /**
         * According to ruby's source code, CPU_ALLOC() doesn't work as expected.
         * see: https://github.com/ruby/ruby/commit/7d9e04de496915dd9e4544ee18b1a0026dc79242
         *
         * The hardcoded number also comes from ruby's source code.
         * see: https://github.com/ruby/ruby/commit/0fa75e813ecf0f5f3dd01f89aa76d0e25ab4fcd4
         */
        for (int n = 64; n <= 16384; n *= 2)
        {
            // Grow the set size with n; the original passed `count` here,
            // which never grows and made the loop variable unused.
            size_t size = CPU_ALLOC_SIZE(n);
            if (size >= 0x400)
            {
                auto cpuset = cast(cpu_set_t*) calloc(1, size);
                if (cpuset is null) break;
                if (sched_getaffinity(0, size, cpuset) == 0)
                {
                    count = CPU_COUNT_S(size, cpuset);
                }
                CPU_FREE(cpuset);
            }
            else
            {
                cpu_set_t cpuset;
                if (sched_getaffinity(0, cpu_set_t.sizeof, &cpuset) == 0)
                {
                    count = CPU_COUNT(&cpuset);
                }
            }

            if (count > 0)
                return cast(uint) count;
        }

        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (Darwin)
    {
        import core.sys.darwin.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.physicalcpu", &result, &len, null, 0);
        return result;
    }
    else version (DragonFlyBSD)
    {
        import core.sys.dragonflybsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (FreeBSD)
    {
        import core.sys.freebsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (NetBSD)
    {
        import core.sys.netbsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (Solaris)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (OpenBSD)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (Hurd)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else
    {
        static assert(0, "Don't know how to get N CPUs on this OS.");
    }
}

/*
This class serves two purposes:

1. It distinguishes std.parallelism threads from other threads so that
   the std.parallelism daemon threads can be terminated.

2. It adds a reference to the pool that the thread is a member of,
   which is also necessary to allow the daemon threads to be properly
   terminated.
*/
private final class ParallelismThread : Thread
{
    this(void delegate() dg)
    {
        super(dg);
    }

    TaskPool pool;
}

// Kill daemon threads.
shared static ~this()
{
    foreach (ref thread; Thread)
    {
        auto pthread = cast(ParallelismThread) thread;
        if (pthread is null) continue;
        auto pool = pthread.pool;
        if (!pool.isDaemon) continue;
        pool.stop();
        pthread.join();
    }
}

/**
This class encapsulates a task queue and a set of worker threads. Its purpose
is to efficiently map a large number of `Task`s onto a smaller number of
threads. A task queue is a FIFO queue of `Task` objects that have been
submitted to the `TaskPool` and are awaiting execution. A worker thread is a
thread that executes the `Task` at the front of the queue when one is
available and sleeps when the queue is empty.

This class should usually be used via the global instantiation
available via the $(REF taskPool, std,parallelism) property.
Occasionally it is useful to explicitly instantiate a `TaskPool`:

1. When you want `TaskPool` instances with multiple priorities, for example
   a low priority pool and a high priority pool.

2. When the threads in the global task pool are waiting on a synchronization
   primitive (for example a mutex), and you want to parallelize the code that
   needs to run before these threads can be resumed.

Note: The worker threads in this pool will not stop until
`stop` or `finish` is called, even if the main thread
has finished already. This may lead to programs that
never end. If you do not want this behaviour, you can set `isDaemon`
to true.
*/
final class TaskPool
{
private:

    // A pool can either be a regular pool or a single-task pool. A
    // single-task pool is a dummy pool that's fired up for
    // Task.executeInNewThread().
    bool isSingleTask;

    ParallelismThread[] pool;
    Thread singleTaskThread;

    AbstractTask* head;
    AbstractTask* tail;
    PoolState status = PoolState.running;
    Condition workerCondition;
    Condition waiterCondition;
    Mutex queueMutex;
    Mutex waiterMutex;  // For waiterCondition

    // The instanceStartIndex of the next instance that will be created.
    __gshared size_t nextInstanceIndex = 1;

    // The index of the current thread.
    static size_t threadIndex;

    // The index of the first thread in this instance.
    immutable size_t instanceStartIndex;

    // The index that the next thread to be initialized in this pool will have.
    size_t nextThreadIndex;

    enum PoolState : ubyte
    {
        running,
        finishing,
        stopNow
    }

    void doJob(AbstractTask* job)
    {
        assert(job.taskStatus == TaskStatus.inProgress);
        assert(job.next is null);
        assert(job.prev is null);

        scope(exit)
        {
            if (!isSingleTask)
            {
                waiterLock();
                scope(exit) waiterUnlock();
                notifyWaiters();
            }
        }

        try
        {
            job.job();
        }
        catch (Throwable e)
        {
            job.exception = e;
        }

        atomicSetUbyte(job.taskStatus, TaskStatus.done);
    }

    // This function is used for dummy pools created by Task.executeInNewThread().
    void doSingleTask()
    {
        // No synchronization. Pool is guaranteed to only have one thread,
        // and the queue is submitted to before this thread is created.
        assert(head);
        auto t = head;
        t.next = t.prev = head = null;
        doJob(t);
    }

    // This function performs initialization for each thread that affects
    // thread local storage and therefore must be done from within the
    // worker thread. It then calls executeWorkLoop().
    void startWorkLoop()
    {
        // Initialize thread index.
        {
            queueLock();
            scope(exit) queueUnlock();
            threadIndex = nextThreadIndex;
            nextThreadIndex++;
        }

        executeWorkLoop();
    }

    // This is the main work loop that worker threads spend their time in
    // until they terminate. It's also entered by non-worker threads when
    // finish() is called with the blocking variable set to true.
    void executeWorkLoop()
    {
        while (atomicReadUbyte(status) != PoolState.stopNow)
        {
            AbstractTask* task = pop();
            if (task is null)
            {
                if (atomicReadUbyte(status) == PoolState.finishing)
                {
                    atomicSetUbyte(status, PoolState.stopNow);
                    return;
                }
            }
            else
            {
                doJob(task);
            }
        }
    }

    // Pop a task off the queue.
    AbstractTask* pop()
    {
        queueLock();
        scope(exit) queueUnlock();
        auto ret = popNoSync();
        while (ret is null && status == PoolState.running)
        {
            wait();
            ret = popNoSync();
        }
        return ret;
    }

    AbstractTask* popNoSync()
    out(returned)
    {
        /* If task.prev and task.next aren't null, then another thread
         * can try to delete this task from the pool after it's
         * already been deleted/popped.
         */
        if (returned !is null)
        {
            assert(returned.next is null);
            assert(returned.prev is null);
        }
    }
    do
    {
        if (isSingleTask) return null;

        AbstractTask* returned = head;
        if (head !is null)
        {
            head = head.next;
            returned.prev = null;
            returned.next = null;
            returned.taskStatus = TaskStatus.inProgress;
        }
        if (head !is null)
        {
            head.prev = null;
        }

        return returned;
    }

    // Push a task onto the queue.
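    // The queue is an intrusive doubly-linked list threaded through
    // AbstractTask.prev/next: abstractPut appends at the tail and
    // popNoSync removes from the head, giving FIFO order.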
    void abstractPut(AbstractTask* task)
    {
        queueLock();
        scope(exit) queueUnlock();
        abstractPutNoSync(task);
    }

    void abstractPutNoSync(AbstractTask* task)
    in
    {
        assert(task);
    }
    out
    {
        import std.conv : text;

        assert(tail.prev !is tail);
        assert(tail.next is null, text(tail.prev, '\t', tail.next));
        if (tail.prev !is null)
        {
            assert(tail.prev.next is tail, text(tail.prev, '\t', tail.next));
        }
    }
    do
    {
        // Not using enforce() to save on function call overhead since this
        // is a performance critical function.
        if (status != PoolState.running)
        {
            throw new Error(
                "Cannot submit a new task to a pool after calling " ~
                "finish() or stop()."
            );
        }

        task.next = null;
        if (head is null)   // Queue is empty.
        {
            head = task;
            tail = task;
            tail.prev = null;
        }
        else
        {
            assert(tail);
            task.prev = tail;
            tail.next = task;
            tail = task;
        }
        notify();
    }

    void abstractPutGroupNoSync(AbstractTask* h, AbstractTask* t)
    {
        if (status != PoolState.running)
        {
            throw new Error(
                "Cannot submit a new task to a pool after calling " ~
                "finish() or stop()."
            );
        }

        if (head is null)
        {
            head = h;
            tail = t;
        }
        else
        {
            h.prev = tail;
            tail.next = h;
            tail = t;
        }

        notifyAll();
    }

    void tryDeleteExecute(AbstractTask* toExecute)
    {
        if (isSingleTask) return;

        if (!deleteItem(toExecute))
        {
            return;
        }

        try
        {
            toExecute.job();
        }
        catch (Exception e)
        {
            toExecute.exception = e;
        }

        atomicSetUbyte(toExecute.taskStatus, TaskStatus.done);
    }

    bool deleteItem(AbstractTask* item)
    {
        queueLock();
        scope(exit) queueUnlock();
        return deleteItemNoSync(item);
    }

    bool deleteItemNoSync(AbstractTask* item)
    {
        if (item.taskStatus != TaskStatus.notStarted)
        {
            return false;
        }
        item.taskStatus = TaskStatus.inProgress;

        if (item is head)
        {
            // Make sure head gets set properly.
            popNoSync();
            return true;
        }
        if (item is tail)
        {
            tail = tail.prev;
            if (tail !is null)
            {
                tail.next = null;
            }
            item.next = null;
            item.prev = null;
            return true;
        }
        if (item.next !is null)
        {
            assert(item.next.prev is item);  // Check queue consistency.
            item.next.prev = item.prev;
        }
        if (item.prev !is null)
        {
            assert(item.prev.next is item);  // Check queue consistency.
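            // Splice the item out of the interior of the queue.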
            item.prev.next = item.next;
        }
        item.next = null;
        item.prev = null;
        return true;
    }

    void queueLock()
    {
        assert(queueMutex);
        if (!isSingleTask) queueMutex.lock();
    }

    void queueUnlock()
    {
        assert(queueMutex);
        if (!isSingleTask) queueMutex.unlock();
    }

    void waiterLock()
    {
        if (!isSingleTask) waiterMutex.lock();
    }

    void waiterUnlock()
    {
        if (!isSingleTask) waiterMutex.unlock();
    }

    void wait()
    {
        if (!isSingleTask) workerCondition.wait();
    }

    void notify()
    {
        if (!isSingleTask) workerCondition.notify();
    }

    void notifyAll()
    {
        if (!isSingleTask) workerCondition.notifyAll();
    }

    void waitUntilCompletion()
    {
        if (isSingleTask)
        {
            singleTaskThread.join();
        }
        else
        {
            waiterCondition.wait();
        }
    }

    void notifyWaiters()
    {
        if (!isSingleTask) waiterCondition.notifyAll();
    }

    // Private constructor for creating dummy pools that only have one thread,
    // only execute one Task, and then terminate. This is used for
    // Task.executeInNewThread().
    this(AbstractTask* task, int priority = int.max)
    {
        assert(task);

        // Dummy value, not used.
        instanceStartIndex = 0;

        this.isSingleTask = true;
        task.taskStatus = TaskStatus.inProgress;
        this.head = task;
        singleTaskThread = new Thread(&doSingleTask);
        singleTaskThread.start();

        // Disabled until writing code to support
        // running thread with specified priority
        // See https://issues.dlang.org/show_bug.cgi?id=8960

        /*if (priority != int.max)
        {
            singleTaskThread.priority = priority;
        }*/
    }

public:
    // This is used in parallel_algorithm but is too unstable to document
    // as public API.
    size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow
    {
        import std.algorithm.comparison : max;

        if (this.size == 0)
        {
            return max(rangeLen, 1);
        }

        immutable size_t eightSize = 4 * (this.size + 1);
        auto ret = (rangeLen / eightSize) + ((rangeLen % eightSize == 0) ? 0 : 1);
        return max(ret, 1);
    }

    /**
    Default constructor that initializes a `TaskPool` with
    `totalCPUs` - 1 worker threads. The minus 1 is included because the
    main thread will also be available to do work.

    Note: On single-core machines, the primitives provided by `TaskPool`
    operate transparently in single-threaded mode.
    */
    this() @trusted
    {
        this(totalCPUs - 1);
    }

    /**
    Allows for a custom number of worker threads.
    */
    this(size_t nWorkers) @trusted
    {
        synchronized(typeid(TaskPool))
        {
            instanceStartIndex = nextInstanceIndex;

            // The first worker thread to be initialized will have this index,
            // and will increment it. The second worker to be initialized will
            // have this index plus 1.
            nextThreadIndex = instanceStartIndex;
            nextInstanceIndex += nWorkers;
        }

        queueMutex = new Mutex(this);
        waiterMutex = new Mutex();
        workerCondition = new Condition(queueMutex);
        waiterCondition = new Condition(waiterMutex);

        pool = new ParallelismThread[nWorkers];
        foreach (ref poolThread; pool)
        {
            poolThread = new ParallelismThread(&startWorkLoop);
            poolThread.pool = this;
            poolThread.start();
        }
    }

    /**
    Implements a parallel foreach loop over a range. This works by implicitly
    creating and submitting one `Task` to the `TaskPool` for each worker
    thread. A work unit is a set of consecutive elements of `range` to
    be processed by a worker thread between communication with any other
    thread. The number of elements processed per work unit is controlled by the
    `workUnitSize` parameter. Smaller work units provide better load
    balancing, but larger work units avoid the overhead of communicating
    with other threads frequently to fetch the next work unit. Large work
    units also avoid false sharing in cases where the range is being modified.
    The less time a single iteration of the loop takes, the larger
    `workUnitSize` should be. For very expensive loop bodies,
    `workUnitSize` should be 1. An overload that chooses a default work
    unit size is also available.

    Example:
    ---
    // Find the logarithm of every number from 1 to
    // 10_000_000 in parallel.
    auto logs = new double[10_000_000];

    // Parallel foreach works with or without an index
    // variable. It can iterate by ref if range.front
    // returns by ref.

    // Iterate over logs using work units of size 100.
    foreach (i, ref elem; taskPool.parallel(logs, 100))
    {
        elem = log(i + 1.0);
    }

    // Same thing, but use the default work unit size.
    //
    // Timings on an Athlon 64 X2 dual core machine:
    //
    // Parallel foreach: 388 milliseconds
    // Regular foreach: 619 milliseconds
    foreach (i, ref elem; taskPool.parallel(logs))
    {
        elem = log(i + 1.0);
    }
    ---

    Notes:

    The memory usage of this implementation is guaranteed to be constant
    in `range.length`.

    Breaking from a parallel foreach loop via a break, labeled break,
    labeled continue, return or goto statement throws a
    `ParallelForeachError`.

    In the case of non-random access ranges, parallel foreach buffers lazily
    to an array of size `workUnitSize` before executing the parallel portion
    of the loop. The exception is that, if a parallel foreach is executed
    over a range returned by `asyncBuf` or `map`, the copying is elided
    and the buffers are simply swapped. In this case `workUnitSize` is
    ignored and the work unit size is set to the buffer size of `range`.

    A memory barrier is guaranteed to be executed on exit from the loop,
    so that results produced by all threads are visible in the calling thread.

    $(B Exception Handling):

    When at least one exception is thrown from inside a parallel foreach loop,
    the submission of additional `Task` objects is terminated as soon as
    possible, in a non-deterministic manner. All executing or
    enqueued work units are allowed to complete. Then, all exceptions that
    were thrown by any work unit are chained using `Throwable.next` and
    rethrown. The order of the exception chaining is non-deterministic.
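
    An illustrative sketch of that chaining behaviour (not part of the
    original documentation; `data` and `mightThrow` are placeholders):
    ---
    try
    {
        foreach (i, ref elem; taskPool.parallel(data))
        {
            elem = mightThrow(i);
        }
    }
    catch (Exception e)
    {
        // Exceptions from other work units, if any, hang off
        // e.next in unspecified order.
        for (Throwable t = e; t !is null; t = t.next)
        {
            // Inspect or log t.msg here.
        }
    }
    ---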
    */
    ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
    {
        import std.exception : enforce;
        enforce(workUnitSize > 0, "workUnitSize must be > 0.");
        alias RetType = ParallelForeach!R;
        return RetType(this, range, workUnitSize);
    }


    /// Ditto
    ParallelForeach!R parallel(R)(R range)
    {
        static if (hasLength!R)
        {
            // Default work unit size is such that we would use 4x as many
            // slots as are in this thread pool.
            size_t workUnitSize = defaultWorkUnitSize(range.length);
            return parallel(range, workUnitSize);
        }
        else
        {
            // Just use a really, really dumb guess if the user is too lazy to
            // specify.
            return parallel(range, 512);
        }
    }

    ///
    template amap(functions...)
    {
        /**
        Eager parallel map. The eagerness of this function means it has less
        overhead than the lazily evaluated `TaskPool.map` and should be
        preferred where the memory requirements of eagerness are acceptable.
        `functions` are the functions to be evaluated, passed as template
        alias parameters in a style similar to
        $(REF map, std,algorithm,iteration).
        The first argument must be a random access range. For performance
        reasons, amap will assume the range elements have not yet been
        initialized. Elements will be overwritten without calling a destructor
        or doing an assignment. As such, the range must not contain meaningful
        data$(DDOC_COMMENT not a section): either uninitialized objects, or
        objects in their `.init` state.

        ---
        auto numbers = iota(100_000_000.0);

        // Find the square roots of numbers.
        //
        // Timings on an Athlon 64 X2 dual core machine:
        //
        // Parallel eager map: 0.802 s
        // Equivalent serial implementation: 1.768 s
        auto squareRoots = taskPool.amap!sqrt(numbers);
        ---

        Immediately after the range argument, an optional work unit size argument
        may be provided. Work units as used by `amap` are identical to those
        defined for parallel foreach. If no work unit size is provided, the
        default work unit size is used.

        ---
        // Same thing, but make work unit size 100.
        auto squareRoots = taskPool.amap!sqrt(numbers, 100);
        ---

        An output range for returning the results may be provided as the last
        argument. If one is not provided, an array of the proper type will be
        allocated on the garbage collected heap. If one is provided, it must be a
        random access range with assignable elements, must have reference
        semantics with respect to assignment to its elements, and must have the
        same length as the input range. Writing to adjacent elements from
        different threads must be safe.

        ---
        // Same thing, but explicitly allocate an array
        // to return the results in. The element type
        // of the array may be either the exact type
        // returned by functions or an implicit conversion
        // target.
        auto squareRoots = new float[numbers.length];
        taskPool.amap!sqrt(numbers, squareRoots);

        // Multiple functions, explicit output range, and
        // explicit work unit size.
        auto results = new Tuple!(float, real)[numbers.length];
        taskPool.amap!(sqrt, log)(numbers, 100, results);
        ---

        Note:

        A memory barrier is guaranteed to be executed after all results are written
        but before returning so that results produced by all threads are visible
        in the calling thread.

        Tips:

        To perform the mapping operation in place, provide the same range for the
        input and output range.

        To parallelize the copying of a range with expensive to evaluate elements
        to an array, pass an identity function (a function that just returns
        whatever argument is provided to it) to `amap`.

        $(B Exception Handling):

        When at least one exception is thrown from inside the map functions,
        the submission of additional `Task` objects is terminated as soon as
        possible, in a non-deterministic manner. All currently executing or
        enqueued work units are allowed to complete. Then, all exceptions that
        were thrown from any work unit are chained using `Throwable.next` and
        rethrown. The order of the exception chaining is non-deterministic.
        */
        auto amap(Args...)(Args args)
        if (isRandomAccessRange!(Args[0]))
        {
            import core.internal.lifetime : emplaceRef;

            alias fun = adjoin!(staticMap!(unaryFun, functions));

            alias range = args[0];
            immutable len = range.length;

            static if (
                Args.length > 1 &&
                randAssignable!(Args[$ - 1]) &&
                is(MapType!(Args[0], functions) : ElementType!(Args[$ - 1]))
                )
            {
                import std.conv : text;
                import std.exception : enforce;

                alias buf = args[$ - 1];
                alias args2 = args[0..$ - 1];
                alias Args2 = Args[0..$ - 1];
                enforce(buf.length == len,
                    text("Can't use a user supplied buffer that's the wrong ",
                         "size. (Expected: ", len, ", got: ", buf.length, ")"));
            }
            else static if (randAssignable!(Args[$ - 1]) && Args.length > 1)
            {
                static assert(0, "Wrong buffer type.");
            }
            else
            {
                import std.array : uninitializedArray;

                auto buf = uninitializedArray!(MapType!(Args[0], functions)[])(len);
                alias args2 = args;
                alias Args2 = Args;
            }

            if (!len) return buf;

            static if (isIntegral!(Args2[$ - 1]))
            {
                static assert(args2.length == 2);
                auto workUnitSize = cast(size_t) args2[1];
            }
            else
            {
                static assert(args2.length == 1, Args);
                auto workUnitSize = defaultWorkUnitSize(range.length);
            }

            alias R = typeof(range);

            if (workUnitSize > len)
            {
                workUnitSize = len;
            }

            // Handle as a special case:
            if (size == 0)
            {
                size_t index = 0;
                foreach (elem; range)
                {
                    emplaceRef(buf[index++], fun(elem));
                }
                return buf;
            }

            // Effectively -1: chunkIndex + 1 == 0:
            shared size_t workUnitIndex = size_t.max;
            shared bool shouldContinue = true;

            void doIt()
            {
                import std.algorithm.comparison : min;

                scope(failure)
                {
                    // If an exception is thrown, all threads should bail.
                    atomicStore(shouldContinue, false);
                }

                while (atomicLoad(shouldContinue))
                {
                    immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
                    immutable start = workUnitSize * myUnitIndex;
                    if (start >= len)
                    {
                        atomicStore(shouldContinue, false);
                        break;
                    }

                    immutable end = min(len, start + workUnitSize);

                    static if (hasSlicing!R)
                    {
                        auto subrange = range[start .. end];
                        foreach (i; start .. end)
                        {
                            emplaceRef(buf[i], fun(subrange.front));
                            subrange.popFront();
                        }
                    }
                    else
                    {
                        foreach (i; start .. end)
                        {
                            emplaceRef(buf[i], fun(range[i]));
                        }
                    }
                }
            }

            submitAndExecute(this, &doIt);
            return buf;
        }
    }

    ///
    template map(functions...)
    {
        /**
        A semi-lazy parallel map that can be used for pipelining. The map
        functions are evaluated for the first `bufSize` elements and stored in a
        buffer and made available to `popFront`. Meanwhile, in the
        background a second buffer of the same size is filled. When the first
        buffer is exhausted, it is swapped with the second buffer and filled while
        the values from what was originally the second buffer are read. This
        implementation allows for elements to be written to the buffer without
        the need for atomic operations or synchronization for each write, and
        enables the mapping function to be evaluated efficiently in parallel.

        `map` has more overhead than the simpler procedure used by `amap`
        but avoids the need to keep all results in memory simultaneously and works
        with non-random access ranges.

        Params:

        source = The $(REF_ALTTEXT input range, isInputRange, std,range,primitives)
        to be mapped. If `source` is not random
        access it will be lazily buffered to an array of size `bufSize` before
        the map function is evaluated. (For an exception to this rule, see Notes.)

        bufSize = The size of the buffer to store the evaluated elements.

        workUnitSize = The number of elements to evaluate in a single
        `Task`. Must be less than or equal to `bufSize`, and
        should be a fraction of `bufSize` such that all worker threads can be
        used. If the default of size_t.max is used, workUnitSize will be set to
        the pool-wide default.

        Returns: An input range representing the results of the map. This range
        has a length iff `source` has a length.

        Notes:

        If a range returned by `map` or `asyncBuf` is used as an input to
        `map`, then as an optimization the copying from the output buffer
        of the first range to the input buffer of the second range is elided, even
        though the ranges returned by `map` and `asyncBuf` are non-random
        access ranges. This means that the `bufSize` parameter passed to the
        current call to `map` will be ignored and the size of the buffer
        will be the buffer size of `source`.

        Example:
        ---
        // Pipeline reading a file, converting each line
        // to a number, taking the logarithms of the numbers,
        // and performing the additions necessary to find
        // the sum of the logarithms.

        auto lineRange = File("numberList.txt").byLine();
        auto dupedLines = std.algorithm.map!"a.idup"(lineRange);
        auto nums = taskPool.map!(to!double)(dupedLines);
        auto logs = taskPool.map!log10(nums);

        double sum = 0;
        foreach (elem; logs)
        {
            sum += elem;
        }
        ---

        $(B Exception Handling):

        Any exceptions thrown while iterating over `source`
        or computing the map function are re-thrown on a call to `popFront` or,
        if thrown during construction, are simply allowed to propagate to the
        caller. In the case of exceptions thrown while computing the map function,
        the exceptions are chained as in `TaskPool.amap`.
        */
        auto map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max)
        if (isInputRange!S)
        {
            import std.exception : enforce;

            enforce(workUnitSize == size_t.max || workUnitSize <= bufSize,
                "Work unit size must be smaller than or equal to buffer size.");
            alias fun = adjoin!(staticMap!(unaryFun, functions));

            static final class Map
            {
                // This is a class because the task needs to be located on the
                // heap and in the non-random access case source needs to be on
                // the heap, too.

            private:
                enum bufferTrick = is(typeof(source.buf1)) &&
                                   is(typeof(source.bufPos)) &&
                                   is(typeof(source.doBufSwap()));

                alias E = MapType!(S, functions);
                E[] buf1, buf2;
                S source;
                TaskPool pool;
                Task!(run, E[] delegate(E[]), E[]) nextBufTask;
                size_t workUnitSize;
                size_t bufPos;
                bool lastTaskWaited;

                static if (isRandomAccessRange!S)
                {
                    alias FromType = S;

                    void popSource()
                    {
                        import std.algorithm.comparison : min;

                        static if (__traits(compiles, source[0 .. source.length]))
                        {
                            source = source[min(buf1.length, source.length)..source.length];
                        }
                        else static if (__traits(compiles, source[0..$]))
                        {
                            source = source[min(buf1.length, source.length)..$];
                        }
                        else
                        {
                            static assert(0, "S must have slicing for Map."
                                ~ " " ~ S.stringof ~ " doesn't.");
                        }
                    }
                }
                else static if (bufferTrick)
                {
                    // Make sure we don't have the buffer recycling overload of
                    // asyncBuf.
                    static if (
                        is(typeof(source.source)) &&
                        isRoundRobin!(typeof(source.source))
                        )
                    {
                        static assert(0, "Cannot execute a parallel map on " ~
                            "the buffer recycling overload of asyncBuf."
                        );
                    }

                    alias FromType = typeof(source.buf1);
                    FromType from;

                    // Just swap our input buffer with source's output buffer.
                    // No need to copy element by element.
                    FromType dumpToFrom()
                    {
                        import std.algorithm.mutation : swap;

                        assert(source.buf1.length <= from.length);
                        from.length = source.buf1.length;
                        swap(source.buf1, from);

                        // Just in case this source has been popped before
                        // being sent to map:
                        from = from[source.bufPos..$];

                        static if (is(typeof(source._length)))
                        {
                            source._length -= (from.length - source.bufPos);
                        }

                        source.doBufSwap();

                        return from;
                    }
                }
                else
                {
                    alias FromType = ElementType!S[];

                    // The temporary array that data is copied to before being
                    // mapped.
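                    // (This branch is used only when S is neither random
                    // access nor eligible for the buffer-swap trick above.)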
2054 FromType from; 2055 2056 FromType dumpToFrom() 2057 { 2058 assert(from !is null); 2059 2060 size_t i; 2061 for (; !source.empty && i < from.length; source.popFront()) 2062 { 2063 from[i++] = source.front; 2064 } 2065 2066 from = from[0 .. i]; 2067 return from; 2068 } 2069 } 2070 2071 static if (hasLength!S) 2072 { 2073 size_t _length; 2074 2075 public @property size_t length() const @safe pure nothrow 2076 { 2077 return _length; 2078 } 2079 } 2080 2081 this(S source, size_t bufSize, size_t workUnitSize, TaskPool pool) 2082 { 2083 static if (bufferTrick) 2084 { 2085 bufSize = source.buf1.length; 2086 } 2087 2088 buf1.length = bufSize; 2089 buf2.length = bufSize; 2090 2091 static if (!isRandomAccessRange!S) 2092 { 2093 from.length = bufSize; 2094 } 2095 2096 this.workUnitSize = (workUnitSize == size_t.max) ? 2097 pool.defaultWorkUnitSize(bufSize) : workUnitSize; 2098 this.source = source; 2099 this.pool = pool; 2100 2101 static if (hasLength!S) 2102 { 2103 _length = source.length; 2104 } 2105 2106 buf1 = fillBuf(buf1); 2107 submitBuf2(); 2108 } 2109 2110 // The from parameter is a dummy and ignored in the random access 2111 // case. 2112 E[] fillBuf(E[] buf) 2113 { 2114 import std.algorithm.comparison : min; 2115 2116 static if (isRandomAccessRange!S) 2117 { 2118 import std.range : take; 2119 auto toMap = take(source, buf.length); 2120 scope(success) popSource(); 2121 } 2122 else 2123 { 2124 auto toMap = dumpToFrom(); 2125 } 2126 2127 buf = buf[0 .. min(buf.length, toMap.length)]; 2128 2129 // Handle as a special case: 2130 if (pool.size == 0) 2131 { 2132 size_t index = 0; 2133 foreach (elem; toMap) 2134 { 2135 buf[index++] = fun(elem); 2136 } 2137 return buf; 2138 } 2139 2140 pool.amap!functions(toMap, workUnitSize, buf); 2141 2142 return buf; 2143 } 2144 2145 void submitBuf2() 2146 in 2147 { 2148 assert(nextBufTask.prev is null); 2149 assert(nextBufTask.next is null); 2150 } 2151 do 2152 { 2153 // Hack to reuse the task object. 2154 2155 nextBufTask = typeof(nextBufTask).init; 2156 nextBufTask._args[0] = &fillBuf; 2157 nextBufTask._args[1] = buf2; 2158 pool.put(nextBufTask); 2159 } 2160 2161 void doBufSwap() 2162 { 2163 if (lastTaskWaited) 2164 { 2165 // Then the source is empty. Signal it here. 
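// An empty buf1 doubles as the termination flag: empty()
// reports the range as exhausted once buf1.length == 0.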
            buf1 = null;
            buf2 = null;

            static if (!isRandomAccessRange!S)
            {
                from = null;
            }

            return;
        }

        buf2 = buf1;
        buf1 = nextBufTask.yieldForce;
        bufPos = 0;

        if (source.empty)
        {
            lastTaskWaited = true;
        }
        else
        {
            submitBuf2();
        }
    }

    public:
    @property auto front()
    {
        return buf1[bufPos];
    }

    void popFront()
    {
        static if (hasLength!S)
        {
            _length--;
        }

        bufPos++;
        if (bufPos >= buf1.length)
        {
            doBufSwap();
        }
    }

    static if (isInfinite!S)
    {
        enum bool empty = false;
    }
    else
    {
        bool empty() const @property
        {
            // popFront() sets this when source is empty
            return buf1.length == 0;
        }
    }
    }
    return new Map(source, bufSize, workUnitSize, this);
}
}

/**
Given a `source` range that is expensive to iterate over, returns an
$(REF_ALTTEXT input range, isInputRange, std,range,primitives) that
asynchronously buffers the contents of `source` into a buffer of
`bufSize` elements in a worker thread, while making previously buffered
elements from a second buffer, also of size `bufSize`, available via the
range interface of the returned object. The returned range has a length
iff `hasLength!S`. `asyncBuf` is useful, for example, when performing
expensive operations on the elements of ranges that represent data on a
disk or network.

Example:
---
import std.algorithm, std.array, std.conv, std.stdio;

void main()
{
    // Fetch lines of a file in a background thread
    // while processing previously fetched lines,
    // dealing with byLine's buffer recycling by
    // eagerly duplicating every line.
    auto lines = File("foo.txt").byLine();
    auto duped = std.algorithm.map!"a.idup"(lines);

    // Fetch more lines in the background while we
    // process the lines already read into memory
    // into a matrix of doubles.
    double[][] matrix;
    auto asyncReader = taskPool.asyncBuf(duped);

    foreach (line; asyncReader)
    {
        auto ls = line.split("\t");
        matrix ~= to!(double[])(ls);
    }
}
---

$(B Exception Handling):

Any exceptions thrown while iterating over `source` are re-thrown on a
call to `popFront` or, if thrown during construction, simply
allowed to propagate to the caller.
*/
auto asyncBuf(S)(S source, size_t bufSize = 100) if (isInputRange!S)
{
    static final class AsyncBuf
    {
        // This is a class because the task and source both need to be on
        // the heap.

        // The element type of S.
        alias E = ElementType!S; // Needs to be here b/c of forward ref bugs.

    private:
        E[] buf1, buf2;
        S source;
        TaskPool pool;
        Task!(run, E[] delegate(E[]), E[]) nextBufTask;
        size_t bufPos;
        bool lastTaskWaited;

        static if (hasLength!S)
        {
            size_t _length;

            // Available if hasLength!S.
2295 public @property size_t length() const @safe pure nothrow 2296 { 2297 return _length; 2298 } 2299 } 2300 2301 this(S source, size_t bufSize, TaskPool pool) 2302 { 2303 buf1.length = bufSize; 2304 buf2.length = bufSize; 2305 2306 this.source = source; 2307 this.pool = pool; 2308 2309 static if (hasLength!S) 2310 { 2311 _length = source.length; 2312 } 2313 2314 buf1 = fillBuf(buf1); 2315 submitBuf2(); 2316 } 2317 2318 E[] fillBuf(E[] buf) 2319 { 2320 assert(buf !is null); 2321 2322 size_t i; 2323 for (; !source.empty && i < buf.length; source.popFront()) 2324 { 2325 buf[i++] = source.front; 2326 } 2327 2328 buf = buf[0 .. i]; 2329 return buf; 2330 } 2331 2332 void submitBuf2() 2333 in 2334 { 2335 assert(nextBufTask.prev is null); 2336 assert(nextBufTask.next is null); 2337 } 2338 do 2339 { 2340 // Hack to reuse the task object. 2341 2342 nextBufTask = typeof(nextBufTask).init; 2343 nextBufTask._args[0] = &fillBuf; 2344 nextBufTask._args[1] = buf2; 2345 pool.put(nextBufTask); 2346 } 2347 2348 void doBufSwap() 2349 { 2350 if (lastTaskWaited) 2351 { 2352 // Then source is empty. Signal it here. 2353 buf1 = null; 2354 buf2 = null; 2355 return; 2356 } 2357 2358 buf2 = buf1; 2359 buf1 = nextBufTask.yieldForce; 2360 bufPos = 0; 2361 2362 if (source.empty) 2363 { 2364 lastTaskWaited = true; 2365 } 2366 else 2367 { 2368 submitBuf2(); 2369 } 2370 } 2371 2372 public: 2373 E front() @property 2374 { 2375 return buf1[bufPos]; 2376 } 2377 2378 void popFront() 2379 { 2380 static if (hasLength!S) 2381 { 2382 _length--; 2383 } 2384 2385 bufPos++; 2386 if (bufPos >= buf1.length) 2387 { 2388 doBufSwap(); 2389 } 2390 } 2391 2392 static if (isInfinite!S) 2393 { 2394 enum bool empty = false; 2395 } 2396 2397 else 2398 { 2399 /// 2400 bool empty() @property 2401 { 2402 // popFront() sets this when source is empty: 2403 return buf1.length == 0; 2404 } 2405 } 2406 } 2407 return new AsyncBuf(source, bufSize, this); 2408 } 2409 2410 /** 2411 Given a callable object `next` that writes to a user-provided buffer and 2412 a second callable object `empty` that determines whether more data is 2413 available to write via `next`, returns an input range that 2414 asynchronously calls `next` with a set of size `nBuffers` of buffers 2415 and makes the results available in the order they were obtained via the 2416 input range interface of the returned object. Similarly to the 2417 input range overload of `asyncBuf`, the first half of the buffers 2418 are made available via the range interface while the second half are 2419 filled and vice-versa. 2420 2421 Params: 2422 2423 next = A callable object that takes a single argument that must be an array 2424 with mutable elements. When called, `next` writes data to 2425 the array provided by the caller. 2426 2427 empty = A callable object that takes no arguments and returns a type 2428 implicitly convertible to `bool`. This is used to signify 2429 that no more data is available to be obtained by calling `next`. 2430 2431 initialBufSize = The initial size of each buffer. If `next` takes its 2432 array by reference, it may resize the buffers. 2433 2434 nBuffers = The number of buffers to cycle through when calling `next`. 2435 2436 Example: 2437 --- 2438 // Fetch lines of a file in a background 2439 // thread while processing previously fetched 2440 // lines, without duplicating any lines. 
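// readln(buf) writes each line into the buffer supplied by
// asyncBuf, so a fixed set of buffers is recycled for the whole
// file instead of allocating a new line every iteration.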
auto file = File("foo.txt");

void next(ref char[] buf)
{
    file.readln(buf);
}

// Fetch more lines in the background while we
// process the lines already read into memory
// into a matrix of doubles.
double[][] matrix;
auto asyncReader = taskPool.asyncBuf(&next, &file.eof);

foreach (line; asyncReader)
{
    auto ls = line.split("\t");
    matrix ~= to!(double[])(ls);
}
---

$(B Exception Handling):

Any exceptions thrown while invoking `next` or `empty` are re-thrown on a
call to `popFront`.

Warning:

Using the range returned by this function in a parallel foreach loop
will not work because buffers may be overwritten while the task that
processes them is in queue. This is checked for at compile time
and will result in a static assertion failure.
*/
auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100)
if (is(typeof(C2.init()) : bool) &&
    Parameters!C1.length == 1 &&
    Parameters!C2.length == 0 &&
    isArray!(Parameters!C1[0])
)
{
    auto roundRobin = RoundRobinBuffer!(C1, C2)(next, empty, initialBufSize, nBuffers);
    return asyncBuf(roundRobin, nBuffers / 2);
}

///
template reduce(functions...)
{
/**
Parallel reduce on a random access range. Except as otherwise noted,
usage is similar to $(REF _reduce, std,algorithm,iteration). There is
also $(LREF fold) which does the same thing with a different parameter
order.

This function works by splitting the range to be reduced into work
units, which are slices to be reduced in parallel. Once the results
from all work units are computed, a final serial reduction is performed
on these results to compute the final answer. Therefore, care must be
taken to choose the seed value appropriately.

Because the reduction is being performed in parallel, `functions`
must be associative. For notational simplicity, let # be an
infix operator representing `functions`. Then, (a # b) # c must equal
a # (b # c). Floating point addition is not associative
even though addition in exact arithmetic is. Summing floating
point numbers using this function may give different results than summing
serially. However, for many practical purposes floating point addition
can be treated as associative.

Note that, since `functions` are assumed to be associative,
additional optimizations are made to the serial portion of the reduction
algorithm. These take advantage of the instruction level parallelism of
modern CPUs, in addition to the thread-level parallelism that the rest
of this module exploits. This can lead to better than linear speedups
relative to $(REF _reduce, std,algorithm,iteration), especially for
fine-grained benchmarks like dot products.

An explicit seed may be provided as the first argument. If
provided, it is used as the seed for all work units and for the final
reduction of results from all work units. Therefore, if it is not the
identity value for the operation being performed, results may differ
from those generated by $(REF _reduce, std,algorithm,iteration), or may
vary depending on how many work units are used. The next argument must be
the range to be reduced. (The sketch at the end of this section
illustrates this caveat.)
---
// Find the sum of squares of a range in parallel, using
// an explicit seed.
2525 // 2526 // Timings on an Athlon 64 X2 dual core machine: 2527 // 2528 // Parallel reduce: 72 milliseconds 2529 // Using std.algorithm.reduce instead: 181 milliseconds 2530 auto nums = iota(10_000_000.0f); 2531 auto sumSquares = taskPool.reduce!"a + b"( 2532 0.0, std.algorithm.map!"a * a"(nums) 2533 ); 2534 --- 2535 2536 If no explicit seed is provided, the first element of each work unit 2537 is used as a seed. For the final reduction, the result from the first 2538 work unit is used as the seed. 2539 --- 2540 // Find the sum of a range in parallel, using the first 2541 // element of each work unit as the seed. 2542 auto sum = taskPool.reduce!"a + b"(nums); 2543 --- 2544 2545 An explicit work unit size may be specified as the last argument. 2546 Specifying too small a work unit size will effectively serialize the 2547 reduction, as the final reduction of the result of each work unit will 2548 dominate computation time. If `TaskPool.size` for this instance 2549 is zero, this parameter is ignored and one work unit is used. 2550 --- 2551 // Use a work unit size of 100. 2552 auto sum2 = taskPool.reduce!"a + b"(nums, 100); 2553 2554 // Work unit size of 100 and explicit seed. 2555 auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100); 2556 --- 2557 2558 Parallel reduce supports multiple functions, like 2559 `std.algorithm.reduce`. 2560 --- 2561 // Find both the min and max of nums. 2562 auto minMax = taskPool.reduce!(min, max)(nums); 2563 assert(minMax[0] == reduce!min(nums)); 2564 assert(minMax[1] == reduce!max(nums)); 2565 --- 2566 2567 $(B Exception Handling): 2568 2569 After this function is finished executing, any exceptions thrown 2570 are chained together via `Throwable.next` and rethrown. The chaining 2571 order is non-deterministic. 2572 2573 See_Also: 2574 2575 $(LREF fold) is functionally equivalent to $(LREF _reduce) except the 2576 range parameter comes first and there is no need to use 2577 $(REF_ALTTEXT `tuple`,tuple,std,typecons) for multiple seeds. 
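The following sketch illustrates the explicit seed caveat. The exact
result depends on the number of work units; here there are exactly two,
assuming a pool with at least one worker thread:
---
// Work unit 1: 1 + 1 + 2 == 4
// Work unit 2: 1 + 3 + 4 == 8
// Final:       1 + 4 + 8 == 13
// Serial reduce with the same seed would yield 11.
auto skewed = taskPool.reduce!"a + b"(1, [1, 2, 3, 4], 2);
assert(skewed == 13);
---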
2578 */ 2579 auto reduce(Args...)(Args args) 2580 { 2581 import core.exception : OutOfMemoryError; 2582 import core.internal.lifetime : emplaceRef; 2583 import std.exception : enforce; 2584 2585 alias fun = reduceAdjoin!functions; 2586 alias finishFun = reduceFinish!functions; 2587 2588 static if (isIntegral!(Args[$ - 1])) 2589 { 2590 size_t workUnitSize = cast(size_t) args[$ - 1]; 2591 alias args2 = args[0..$ - 1]; 2592 alias Args2 = Args[0..$ - 1]; 2593 } 2594 else 2595 { 2596 alias args2 = args; 2597 alias Args2 = Args; 2598 } 2599 2600 auto makeStartValue(Type)(Type e) 2601 { 2602 static if (functions.length == 1) 2603 { 2604 return e; 2605 } 2606 else 2607 { 2608 typeof(adjoin!(staticMap!(binaryFun, functions))(e, e)) seed = void; 2609 foreach (i, T; seed.Types) 2610 { 2611 emplaceRef(seed.expand[i], e); 2612 } 2613 2614 return seed; 2615 } 2616 } 2617 2618 static if (args2.length == 2) 2619 { 2620 static assert(isInputRange!(Args2[1])); 2621 alias range = args2[1]; 2622 alias seed = args2[0]; 2623 enum explicitSeed = true; 2624 2625 static if (!is(typeof(workUnitSize))) 2626 { 2627 size_t workUnitSize = defaultWorkUnitSize(range.length); 2628 } 2629 } 2630 else 2631 { 2632 static assert(args2.length == 1); 2633 alias range = args2[0]; 2634 2635 static if (!is(typeof(workUnitSize))) 2636 { 2637 size_t workUnitSize = defaultWorkUnitSize(range.length); 2638 } 2639 2640 enforce(!range.empty, 2641 "Cannot reduce an empty range with first element as start value."); 2642 2643 auto seed = makeStartValue(range.front); 2644 enum explicitSeed = false; 2645 range.popFront(); 2646 } 2647 2648 alias E = typeof(seed); 2649 alias R = typeof(range); 2650 2651 E reduceOnRange(R range, size_t lowerBound, size_t upperBound) 2652 { 2653 // This is for exploiting instruction level parallelism by 2654 // using multiple accumulator variables within each thread, 2655 // since we're assuming functions are associative anyhow. 2656 2657 // This is so that loops can be unrolled automatically. 2658 enum ilpTuple = AliasSeq!(0, 1, 2, 3, 4, 5); 2659 enum nILP = ilpTuple.length; 2660 immutable subSize = (upperBound - lowerBound) / nILP; 2661 2662 if (subSize <= 1) 2663 { 2664 // Handle as a special case. 2665 static if (explicitSeed) 2666 { 2667 E result = seed; 2668 } 2669 else 2670 { 2671 E result = makeStartValue(range[lowerBound]); 2672 lowerBound++; 2673 } 2674 2675 foreach (i; lowerBound .. upperBound) 2676 { 2677 result = fun(result, range[i]); 2678 } 2679 2680 return result; 2681 } 2682 2683 assert(subSize > 1); 2684 E[nILP] results; 2685 size_t[nILP] offsets; 2686 2687 foreach (i; ilpTuple) 2688 { 2689 offsets[i] = lowerBound + subSize * i; 2690 2691 static if (explicitSeed) 2692 { 2693 results[i] = seed; 2694 } 2695 else 2696 { 2697 results[i] = makeStartValue(range[offsets[i]]); 2698 offsets[i]++; 2699 } 2700 } 2701 2702 immutable nLoop = subSize - (!explicitSeed); 2703 foreach (i; 0 .. nLoop) 2704 { 2705 foreach (j; ilpTuple) 2706 { 2707 results[j] = fun(results[j], range[offsets[j]]); 2708 offsets[j]++; 2709 } 2710 } 2711 2712 // Finish the remainder. 2713 foreach (i; nILP * subSize + lowerBound .. 
upperBound) 2714 { 2715 results[$ - 1] = fun(results[$ - 1], range[i]); 2716 } 2717 2718 foreach (i; ilpTuple[1..$]) 2719 { 2720 results[0] = finishFun(results[0], results[i]); 2721 } 2722 2723 return results[0]; 2724 } 2725 2726 immutable len = range.length; 2727 if (len == 0) 2728 { 2729 return seed; 2730 } 2731 2732 if (this.size == 0) 2733 { 2734 return finishFun(seed, reduceOnRange(range, 0, len)); 2735 } 2736 2737 // Unlike the rest of the functions here, I can't use the Task object 2738 // recycling trick here because this has to work on non-commutative 2739 // operations. After all the tasks are done executing, fun() has to 2740 // be applied on the results of these to get a final result, but 2741 // it can't be evaluated out of order. 2742 2743 if (workUnitSize > len) 2744 { 2745 workUnitSize = len; 2746 } 2747 2748 immutable size_t nWorkUnits = (len / workUnitSize) + ((len % workUnitSize == 0) ? 0 : 1); 2749 assert(nWorkUnits * workUnitSize >= len); 2750 2751 alias RTask = Task!(run, typeof(&reduceOnRange), R, size_t, size_t); 2752 RTask[] tasks; 2753 2754 // Can't use alloca() due to https://issues.dlang.org/show_bug.cgi?id=3753 2755 // Use a fixed buffer backed by malloc(). 2756 enum maxStack = 2_048; 2757 byte[maxStack] buf = void; 2758 immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof; 2759 2760 import core.stdc.stdlib : malloc, free; 2761 if (nBytesNeeded <= maxStack) 2762 { 2763 tasks = (cast(RTask*) buf.ptr)[0 .. nWorkUnits]; 2764 } 2765 else 2766 { 2767 auto ptr = cast(RTask*) malloc(nBytesNeeded); 2768 if (!ptr) 2769 { 2770 throw new OutOfMemoryError( 2771 "Out of memory in std.parallelism." 2772 ); 2773 } 2774 2775 tasks = ptr[0 .. nWorkUnits]; 2776 } 2777 2778 scope(exit) 2779 { 2780 if (nBytesNeeded > maxStack) 2781 { 2782 free(tasks.ptr); 2783 } 2784 } 2785 2786 // Hack to take the address of a nested function w/o 2787 // making a closure. 2788 static auto scopedAddress(D)(scope D del) @system 2789 { 2790 auto tmp = del; 2791 return tmp; 2792 } 2793 2794 size_t curPos = 0; 2795 void useTask(ref RTask task) 2796 { 2797 import std.algorithm.comparison : min; 2798 import core.lifetime : emplace; 2799 2800 // Private constructor, so can't feed it's arguments directly 2801 // to emplace 2802 emplace(&task, RTask 2803 ( 2804 scopedAddress(&reduceOnRange), 2805 range, 2806 curPos, // lower bound. 2807 cast() min(len, curPos + workUnitSize) // upper bound. 2808 )); 2809 2810 task.pool = this; 2811 2812 curPos += workUnitSize; 2813 } 2814 2815 foreach (ref task; tasks) 2816 { 2817 useTask(task); 2818 } 2819 2820 foreach (i; 1 .. tasks.length - 1) 2821 { 2822 tasks[i].next = tasks[i + 1].basePtr; 2823 tasks[i + 1].prev = tasks[i].basePtr; 2824 } 2825 2826 if (tasks.length > 1) 2827 { 2828 queueLock(); 2829 scope(exit) queueUnlock(); 2830 2831 abstractPutGroupNoSync( 2832 tasks[1].basePtr, 2833 tasks[$ - 1].basePtr 2834 ); 2835 } 2836 2837 if (tasks.length > 0) 2838 { 2839 try 2840 { 2841 tasks[0].job(); 2842 } 2843 catch (Throwable e) 2844 { 2845 tasks[0].exception = e; 2846 } 2847 tasks[0].taskStatus = TaskStatus.done; 2848 2849 // Try to execute each of these in the current thread 2850 foreach (ref task; tasks[1..$]) 2851 { 2852 tryDeleteExecute(task.basePtr); 2853 } 2854 } 2855 2856 // Now that we've tried to execute every task, they're all either 2857 // done or in progress. Force all of them. 
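// yieldForce waits for each task and re-surfaces any exception the
// worker recorded; exceptions from every work unit are chained rather
// than lost. Results are combined strictly in task order because
// finishFun is not assumed to be commutative.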
2858 E result = seed; 2859 2860 Throwable firstException; 2861 2862 foreach (ref task; tasks) 2863 { 2864 try 2865 { 2866 task.yieldForce; 2867 } 2868 catch (Throwable e) 2869 { 2870 /* Chain e to front because order doesn't matter and because 2871 * e is not likely to be a chain itself (so fewer traversals) 2872 */ 2873 firstException = Throwable.chainTogether(e, firstException); 2874 continue; 2875 } 2876 2877 if (!firstException) result = finishFun(result, task.returnVal); 2878 } 2879 2880 if (firstException) throw firstException; 2881 2882 return result; 2883 } 2884 } 2885 2886 /// 2887 template fold(functions...) 2888 { 2889 /** Implements the homonym function (also known as `accumulate`, `compress`, 2890 `inject`, or `foldl`) present in various programming languages of 2891 functional flavor. 2892 2893 `fold` is functionally equivalent to $(LREF reduce) except the range 2894 parameter comes first and there is no need to use $(REF_ALTTEXT 2895 `tuple`,tuple,std,typecons) for multiple seeds. 2896 2897 There may be one or more callable entities (`functions` argument) to 2898 apply. 2899 2900 Params: 2901 args = Just the range to _fold over; or the range and one seed 2902 per function; or the range, one seed per function, and 2903 the work unit size 2904 2905 Returns: 2906 The accumulated result as a single value for single function and 2907 as a tuple of values for multiple functions 2908 2909 See_Also: 2910 Similar to $(REF _fold, std,algorithm,iteration), `fold` is a wrapper around $(LREF reduce). 2911 2912 Example: 2913 --- 2914 static int adder(int a, int b) 2915 { 2916 return a + b; 2917 } 2918 static int multiplier(int a, int b) 2919 { 2920 return a * b; 2921 } 2922 2923 // Just the range 2924 auto x = taskPool.fold!adder([1, 2, 3, 4]); 2925 assert(x == 10); 2926 2927 // The range and the seeds (0 and 1 below; also note multiple 2928 // functions in this example) 2929 auto y = taskPool.fold!(adder, multiplier)([1, 2, 3, 4], 0, 1); 2930 assert(y[0] == 10); 2931 assert(y[1] == 24); 2932 2933 // The range, the seed (0), and the work unit size (20) 2934 auto z = taskPool.fold!adder([1, 2, 3, 4], 0, 20); 2935 assert(z == 10); 2936 --- 2937 */ 2938 auto fold(Args...)(Args args) 2939 { 2940 static assert(isInputRange!(Args[0]), "First argument must be an InputRange"); 2941 2942 alias range = args[0]; 2943 2944 static if (Args.length == 1) 2945 { 2946 // Just the range 2947 return reduce!functions(range); 2948 } 2949 else static if (Args.length == 1 + functions.length || 2950 Args.length == 1 + functions.length + 1) 2951 { 2952 static if (functions.length == 1) 2953 { 2954 alias seeds = args[1]; 2955 } 2956 else 2957 { 2958 auto seeds() 2959 { 2960 import std.typecons : tuple; 2961 return tuple(args[1 .. 
functions.length+1]);
            }
        }

        static if (Args.length == 1 + functions.length)
        {
            // The range and the seeds
            return reduce!functions(seeds, range);
        }
        else static if (Args.length == 1 + functions.length + 1)
        {
            // The range, the seeds, and the work unit size
            static assert(isIntegral!(Args[$-1]), "Work unit size must be an integral type");
            return reduce!functions(seeds, range, args[$-1]);
        }
    }
    else
    {
        import std.conv : text;
        static assert(0, "Invalid number of arguments (" ~ Args.length.text ~ "): Should be an input range, "
            ~ functions.length.text ~ " optional seed(s), and an optional work unit size.");
    }
}
}

// This test is not included in the documentation because even though these
// examples are for the inner fold() template, with their current location,
// they would appear under the outer one. (We can't move this inside the
// outer fold() template because then dmd runs out of memory possibly due to
// recursive template instantiation, which is surprisingly not caught.)
@system unittest
{
    // Just the range
    auto x = taskPool.fold!"a + b"([1, 2, 3, 4]);
    assert(x == 10);

    // The range and the seeds (0 and 1 below; also note multiple
    // functions in this example)
    auto y = taskPool.fold!("a + b", "a * b")([1, 2, 3, 4], 0, 1);
    assert(y[0] == 10);
    assert(y[1] == 24);

    // The range, the seed (0), and the work unit size (20)
    auto z = taskPool.fold!"a + b"([1, 2, 3, 4], 0, 20);
    assert(z == 10);
}

/**
Gets the index of the current thread relative to this `TaskPool`. Any
thread not in this pool will receive an index of 0. The worker threads in
this pool receive unique indices of 1 through `this.size`.

This function is useful for maintaining worker-local resources.

Example:
---
// Execute a loop that computes the greatest common
// divisor of every number from 0 through 999 with
// 42 in parallel. Write the results out to
// a set of files, one for each thread. This allows
// results to be written out without any synchronization.

import std.conv, std.range, std.numeric, std.stdio;

void main()
{
    auto fileHandles = new File[taskPool.size + 1];
    scope(exit) {
        foreach (ref handle; fileHandles)
        {
            handle.close();
        }
    }

    foreach (i, ref handle; fileHandles)
    {
        handle = File("workerResults" ~ to!string(i) ~ ".txt", "w");
    }

    foreach (num; parallel(iota(1_000)))
    {
        auto outHandle = fileHandles[taskPool.workerIndex];
        outHandle.writeln(num, '\t', gcd(num, 42));
    }
}
---
*/
size_t workerIndex() @property @safe const nothrow
{
    immutable rawInd = threadIndex;
    return (rawInd >= instanceStartIndex && rawInd < instanceStartIndex + size) ?
        (rawInd - instanceStartIndex + 1) : 0;
}

/**
Struct for creating worker-local storage. Worker-local storage is
thread-local storage that exists only for worker threads in a given
`TaskPool` plus a single thread outside the pool. It is allocated on the
garbage collected heap in a way that avoids _false sharing, and doesn't
necessarily have global scope within any thread. It can be accessed from
any worker thread in the `TaskPool` that created it, and one thread
outside this `TaskPool`.
All threads outside the pool that created a
given instance of worker-local storage share a single slot.

Since the underlying data for this struct is heap-allocated, this struct
has reference semantics when passed between functions.

The main use cases for `WorkerLocalStorage` are:

1. Performing parallel reductions with an imperative, as opposed to
functional, programming style. In this case, it's useful to treat
`WorkerLocalStorage` as local to each thread for only the parallel
portion of an algorithm.

2. Recycling temporary buffers across iterations of a parallel foreach loop.

Example:
---
// Calculate pi as in our synopsis example, but
// use an imperative instead of a functional style.
immutable n = 1_000_000_000;
immutable delta = 1.0L / n;

auto sums = taskPool.workerLocalStorage(0.0L);
foreach (i; parallel(iota(n)))
{
    immutable x = ( i - 0.5L ) * delta;
    immutable toAdd = delta / ( 1.0 + x * x );
    sums.get += toAdd;
}

// Add up the results from each worker thread.
real pi = 0;
foreach (threadResult; sums.toRange)
{
    pi += 4.0L * threadResult;
}
---
*/
static struct WorkerLocalStorage(T)
{
private:
    TaskPool pool;
    size_t size;

    size_t elemSize;
    bool* stillThreadLocal;

    static size_t roundToLine(size_t num) pure nothrow
    {
        if (num % cacheLineSize == 0)
        {
            return num;
        }
        else
        {
            return ((num / cacheLineSize) + 1) * cacheLineSize;
        }
    }

    void* data;

    void initialize(TaskPool pool)
    {
        this.pool = pool;
        size = pool.size + 1;
        stillThreadLocal = new bool;
        *stillThreadLocal = true;

        // Determines whether the GC should scan the array.
        auto blkInfo = (typeid(T).flags & 1) ?
                       cast(GC.BlkAttr) 0 :
                       GC.BlkAttr.NO_SCAN;

        immutable nElem = pool.size + 1;
        elemSize = roundToLine(T.sizeof);

        // The + 3 is to pad one full cache line worth of space on either side
        // of the data structure to make sure false sharing with completely
        // unrelated heap data is prevented, and to provide enough padding to
        // make sure that data is cache line-aligned.
        data = GC.malloc(elemSize * (nElem + 3), blkInfo) + elemSize;

        // Cache line align data ptr.
        data = cast(void*) roundToLine(cast(size_t) data);

        foreach (i; 0 .. nElem)
        {
            this.opIndex(i) = T.init;
        }
    }

    ref opIndex(this Qualified)(size_t index)
    {
        import std.conv : text;
        assert(index < size, text(index, '\t', uint.max));
        return *(cast(CopyTypeQualifiers!(Qualified, T)*) (data + elemSize * index));
    }

    void opIndexAssign(T val, size_t index)
    {
        assert(index < size);
        *(cast(T*) (data + elemSize * index)) = val;
    }

public:
    /**
    Get the current thread's instance. Returns by ref.
    Note that calling `get` from any thread
    outside the `TaskPool` that created this instance will return the
    same reference, so an instance of worker-local storage should only be
    accessed from one thread outside the pool that created it. If this
    rule is violated, undefined behavior will result.
3174 3175 If assertions are enabled and `toRange` has been called, then this 3176 WorkerLocalStorage instance is no longer worker-local and an assertion 3177 failure will result when calling this method. This is not checked 3178 when assertions are disabled for performance reasons. 3179 */ 3180 ref get(this Qualified)() @property 3181 { 3182 assert(*stillThreadLocal, 3183 "Cannot call get() on this instance of WorkerLocalStorage " ~ 3184 "because it is no longer worker-local." 3185 ); 3186 return opIndex(pool.workerIndex); 3187 } 3188 3189 /** 3190 Assign a value to the current thread's instance. This function has 3191 the same caveats as its overload. 3192 */ 3193 void get(T val) @property 3194 { 3195 assert(*stillThreadLocal, 3196 "Cannot call get() on this instance of WorkerLocalStorage " ~ 3197 "because it is no longer worker-local." 3198 ); 3199 3200 opIndexAssign(val, pool.workerIndex); 3201 } 3202 3203 /** 3204 Returns a range view of the values for all threads, which can be used 3205 to further process the results of each thread after running the parallel 3206 part of your algorithm. Do not use this method in the parallel portion 3207 of your algorithm. 3208 3209 Calling this function sets a flag indicating that this struct is no 3210 longer worker-local, and attempting to use the `get` method again 3211 will result in an assertion failure if assertions are enabled. 3212 */ 3213 WorkerLocalStorageRange!T toRange() @property 3214 { 3215 if (*stillThreadLocal) 3216 { 3217 *stillThreadLocal = false; 3218 3219 // Make absolutely sure results are visible to all threads. 3220 // This is probably not necessary since some other 3221 // synchronization primitive will be used to signal that the 3222 // parallel part of the algorithm is done, but the 3223 // performance impact should be negligible, so it's better 3224 // to be safe. 3225 ubyte barrierDummy; 3226 atomicSetUbyte(barrierDummy, 1); 3227 } 3228 3229 return WorkerLocalStorageRange!T(this); 3230 } 3231 } 3232 3233 /** 3234 Range primitives for worker-local storage. The purpose of this is to 3235 access results produced by each worker thread from a single thread once you 3236 are no longer using the worker-local storage from multiple threads. 3237 Do not use this struct in the parallel portion of your algorithm. 3238 3239 The proper way to instantiate this object is to call 3240 `WorkerLocalStorage.toRange`. Once instantiated, this object behaves 3241 as a finite random-access range with assignable, lvalue elements and 3242 a length equal to the number of worker threads in the `TaskPool` that 3243 created it plus 1. 
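Example:
---
// A minimal sketch: accumulate per-thread partial sums during the
// parallel loop, then combine them serially via toRange.
auto partials = taskPool.workerLocalStorage(0);
foreach (i; parallel(iota(1_000)))
{
    partials.get += i;
}

int total = 0;
foreach (p; partials.toRange)  // WorkerLocalStorageRange!int
{
    total += p;
}
assert(total == 499_500);
---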
3244 */ 3245 static struct WorkerLocalStorageRange(T) 3246 { 3247 private: 3248 WorkerLocalStorage!T workerLocalStorage; 3249 3250 size_t _length; 3251 size_t beginOffset; 3252 3253 this(WorkerLocalStorage!T wl) 3254 { 3255 this.workerLocalStorage = wl; 3256 _length = wl.size; 3257 } 3258 3259 public: 3260 ref front(this Qualified)() @property 3261 { 3262 return this[0]; 3263 } 3264 3265 ref back(this Qualified)() @property 3266 { 3267 return this[_length - 1]; 3268 } 3269 3270 void popFront() 3271 { 3272 if (_length > 0) 3273 { 3274 beginOffset++; 3275 _length--; 3276 } 3277 } 3278 3279 void popBack() 3280 { 3281 if (_length > 0) 3282 { 3283 _length--; 3284 } 3285 } 3286 3287 typeof(this) save() @property 3288 { 3289 return this; 3290 } 3291 3292 ref opIndex(this Qualified)(size_t index) 3293 { 3294 assert(index < _length); 3295 return workerLocalStorage[index + beginOffset]; 3296 } 3297 3298 void opIndexAssign(T val, size_t index) 3299 { 3300 assert(index < _length); 3301 workerLocalStorage[index] = val; 3302 } 3303 3304 typeof(this) opSlice(size_t lower, size_t upper) 3305 { 3306 assert(upper <= _length); 3307 auto newWl = this.workerLocalStorage; 3308 newWl.data += lower * newWl.elemSize; 3309 newWl.size = upper - lower; 3310 return typeof(this)(newWl); 3311 } 3312 3313 bool empty() const @property 3314 { 3315 return length == 0; 3316 } 3317 3318 size_t length() const @property 3319 { 3320 return _length; 3321 } 3322 } 3323 3324 /** 3325 Creates an instance of worker-local storage, initialized with a given 3326 value. The value is `lazy` so that you can, for example, easily 3327 create one instance of a class for each worker. For usage example, 3328 see the `WorkerLocalStorage` struct. 3329 */ 3330 WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init) 3331 { 3332 WorkerLocalStorage!T ret; 3333 ret.initialize(this); 3334 foreach (i; 0 .. size + 1) 3335 { 3336 ret[i] = initialVal; 3337 } 3338 3339 // Memory barrier to make absolutely sure that what we wrote is 3340 // visible to worker threads. 3341 ubyte barrierDummy; 3342 atomicSetUbyte(barrierDummy, 0); 3343 3344 return ret; 3345 } 3346 3347 /** 3348 Signals to all worker threads to terminate as soon as they are finished 3349 with their current `Task`, or immediately if they are not executing a 3350 `Task`. `Task`s that were in queue will not be executed unless 3351 a call to `Task.workForce`, `Task.yieldForce` or `Task.spinForce` 3352 causes them to be executed. 3353 3354 Use only if you have waited on every `Task` and therefore know the 3355 queue is empty, or if you speculatively executed some tasks and no longer 3356 need the results. 3357 */ 3358 void stop() @trusted 3359 { 3360 queueLock(); 3361 scope(exit) queueUnlock(); 3362 atomicSetUbyte(status, PoolState.stopNow); 3363 notifyAll(); 3364 } 3365 3366 /** 3367 Signals worker threads to terminate when the queue becomes empty. 3368 3369 If blocking argument is true, wait for all worker threads to terminate 3370 before returning. This option might be used in applications where 3371 task results are never consumed-- e.g. when `TaskPool` is employed as a 3372 rudimentary scheduler for tasks which communicate by means other than 3373 return values. 3374 3375 Warning: Calling this function with $(D blocking = true) from a worker 3376 thread that is a member of the same `TaskPool` that 3377 `finish` is being called on will result in a deadlock. 
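Example:
---
// A minimal sketch, assuming the tasks communicate by side effect
// and their return values are never consumed:
static void work(int i) { /* ... */ }

auto pool = new TaskPool();
foreach (i; 0 .. 10)
{
    pool.put(task!work(i));
}

// Let the queued tasks drain, then block until every worker
// thread has terminated.
pool.finish(true);
---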
3378 */ 3379 void finish(bool blocking = false) @trusted 3380 { 3381 { 3382 queueLock(); 3383 scope(exit) queueUnlock(); 3384 atomicCasUbyte(status, PoolState.running, PoolState.finishing); 3385 notifyAll(); 3386 } 3387 if (blocking) 3388 { 3389 // Use this thread as a worker until everything is finished. 3390 executeWorkLoop(); 3391 3392 foreach (t; pool) 3393 { 3394 // Maybe there should be something here to prevent a thread 3395 // from calling join() on itself if this function is called 3396 // from a worker thread in the same pool, but: 3397 // 3398 // 1. Using an if statement to skip join() would result in 3399 // finish() returning without all tasks being finished. 3400 // 3401 // 2. If an exception were thrown, it would bubble up to the 3402 // Task from which finish() was called and likely be 3403 // swallowed. 3404 t.join(); 3405 } 3406 } 3407 } 3408 3409 /// Returns the number of worker threads in the pool. 3410 @property size_t size() @safe const pure nothrow 3411 { 3412 return pool.length; 3413 } 3414 3415 /** 3416 Put a `Task` object on the back of the task queue. The `Task` 3417 object may be passed by pointer or reference. 3418 3419 Example: 3420 --- 3421 import std.file; 3422 3423 // Create a task. 3424 auto t = task!read("foo.txt"); 3425 3426 // Add it to the queue to be executed. 3427 taskPool.put(t); 3428 --- 3429 3430 Notes: 3431 3432 @trusted overloads of this function are called for `Task`s if 3433 $(REF hasUnsharedAliasing, std,traits) is false for the `Task`'s 3434 return type or the function the `Task` executes is `pure`. 3435 `Task` objects that meet all other requirements specified in the 3436 `@trusted` overloads of `task` and `scopedTask` may be created 3437 and executed from `@safe` code via `Task.executeInNewThread` but 3438 not via `TaskPool`. 3439 3440 While this function takes the address of variables that may 3441 be on the stack, some overloads are marked as @trusted. 3442 `Task` includes a destructor that waits for the task to complete 3443 before destroying the stack frame it is allocated on. Therefore, 3444 it is impossible for the stack frame to be destroyed before the task is 3445 complete and no longer referenced by a `TaskPool`. 3446 */ 3447 void put(alias fun, Args...)(ref Task!(fun, Args) task) 3448 if (!isSafeReturn!(typeof(task))) 3449 { 3450 task.pool = this; 3451 abstractPut(task.basePtr); 3452 } 3453 3454 /// Ditto 3455 void put(alias fun, Args...)(Task!(fun, Args)* task) 3456 if (!isSafeReturn!(typeof(*task))) 3457 { 3458 import std.exception : enforce; 3459 enforce(task !is null, "Cannot put a null Task on a TaskPool queue."); 3460 put(*task); 3461 } 3462 3463 @trusted void put(alias fun, Args...)(ref Task!(fun, Args) task) 3464 if (isSafeReturn!(typeof(task))) 3465 { 3466 task.pool = this; 3467 abstractPut(task.basePtr); 3468 } 3469 3470 @trusted void put(alias fun, Args...)(Task!(fun, Args)* task) 3471 if (isSafeReturn!(typeof(*task))) 3472 { 3473 import std.exception : enforce; 3474 enforce(task !is null, "Cannot put a null Task on a TaskPool queue."); 3475 put(*task); 3476 } 3477 3478 /** 3479 These properties control whether the worker threads are daemon threads. 3480 A daemon thread is automatically terminated when all non-daemon threads 3481 have terminated. A non-daemon thread will prevent a program from 3482 terminating as long as it has not terminated. 3483 3484 If any `TaskPool` with non-daemon threads is active, either `stop` 3485 or `finish` must be called on it before the program can terminate. 
The worker threads in the `TaskPool` instance returned by the
`taskPool` property are daemon by default. The worker threads of
manually instantiated task pools are non-daemon by default.

Note: For a size zero pool, the getter arbitrarily returns true and the
setter has no effect.
*/
bool isDaemon() @property @trusted
{
    queueLock();
    scope(exit) queueUnlock();
    return (size == 0) ? true : pool[0].isDaemon;
}

/// Ditto
void isDaemon(bool newVal) @property @trusted
{
    queueLock();
    scope(exit) queueUnlock();
    foreach (thread; pool)
    {
        thread.isDaemon = newVal;
    }
}

/**
These functions allow getting and setting the OS scheduling priority of
the worker threads in this `TaskPool`. They forward to
`core.thread.Thread.priority`, so a given priority value here means the
same thing as an identical priority value in `core.thread`.

Note: For a size zero pool, the getter arbitrarily returns
`core.thread.Thread.PRIORITY_MIN` and the setter has no effect.
*/
int priority() @property @trusted
{
    return (size == 0) ? core.thread.Thread.PRIORITY_MIN :
                         pool[0].priority;
}

/// Ditto
void priority(int newPriority) @property @trusted
{
    if (size > 0)
    {
        foreach (t; pool)
        {
            t.priority = newPriority;
        }
    }
}
}

@system unittest
{
    import std.algorithm.iteration : sum;
    import std.range : iota;
    import std.typecons : tuple;

    enum N = 100;
    auto r = iota(1, N + 1);
    const expected = r.sum();

    // Just the range
    assert(taskPool.fold!"a + b"(r) == expected);

    // Range and seeds
    assert(taskPool.fold!"a + b"(r, 0) == expected);
    assert(taskPool.fold!("a + b", "a + b")(r, 0, 0) == tuple(expected, expected));

    // Range, seeds, and work unit size
    assert(taskPool.fold!"a + b"(r, 0, 42) == expected);
    assert(taskPool.fold!("a + b", "a + b")(r, 0, 0, 42) == tuple(expected, expected));
}

// Issue 16705
@system unittest
{
    struct MyIota
    {
        size_t front;
        void popFront()(){front++;}
        auto empty(){return front >= 25;}
        auto opIndex(size_t i){return front+i;}
        auto length(){return 25-front;}
    }

    auto mySum = taskPool.reduce!"a + b"(MyIota());
}

/**
Returns a lazily initialized global instantiation of `TaskPool`.
This function can safely be called concurrently from multiple non-worker
threads. The worker threads in this pool are daemon threads, meaning that it
is not necessary to call `TaskPool.stop` or `TaskPool.finish` before
terminating the main thread.
*/
@property TaskPool taskPool() @trusted
{
    import std.concurrency : initOnce;
    __gshared TaskPool pool;
    return initOnce!pool({
        auto p = new TaskPool(defaultPoolThreads);
        p.isDaemon = true;
        return p;
    }());
}

private shared uint _defaultPoolThreads = uint.max;

/**
These properties get and set the number of worker threads in the `TaskPool`
instance returned by `taskPool`. The default value is `totalCPUs` - 1.
Calling the setter after the first call to `taskPool` does not change the
number of worker threads in the instance returned by `taskPool`.
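A minimal sketch of the intended usage, assuming it runs before
anything else touches `taskPool`:
---
void main()
{
    // Must happen before the first call to taskPool, or it has no
    // effect on the already-created instance.
    defaultPoolThreads = 2;
    assert(taskPool.size == 2);
}
---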
*/
@property uint defaultPoolThreads() @trusted
{
    const local = atomicLoad(_defaultPoolThreads);
    return local < uint.max ? local : totalCPUs - 1;
}

/// Ditto
@property void defaultPoolThreads(uint newVal) @trusted
{
    atomicStore(_defaultPoolThreads, newVal);
}

/**
Convenience functions that forward to `taskPool.parallel`. The
purpose of these is to make parallel foreach less verbose and more
readable.

Example:
---
// Find the logarithm of every number from
// 1 to 1_000_000 in parallel, using the
// default TaskPool instance.
auto logs = new double[1_000_000];

foreach (i, ref elem; parallel(logs))
{
    elem = log(i + 1.0);
}
---

*/
ParallelForeach!R parallel(R)(R range)
{
    return taskPool.parallel(range);
}

/// Ditto
ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
{
    return taskPool.parallel(range, workUnitSize);
}

// `each` should be usable with parallel
// https://issues.dlang.org/show_bug.cgi?id=17019
@system unittest
{
    import std.algorithm.iteration : each, sum;
    import std.range : iota;

    // check behavior with parallel
    auto arr = new int[10];
    parallel(arr).each!((ref e) => e += 1);
    assert(arr.sum == 10);

    auto arrIndex = new int[10];
    parallel(arrIndex).each!((i, ref e) => e += i);
    assert(arrIndex.sum == 10.iota.sum);
}

// https://issues.dlang.org/show_bug.cgi?id=22745
@system unittest
{
    auto pool = new TaskPool(0);
    int[] empty;
    foreach (i; pool.parallel(empty)) {}
    pool.finish();
}

// Thrown when a parallel foreach loop is broken from.
class ParallelForeachError : Error
{
    this()
    {
        super("Cannot break from a parallel foreach loop using break, return, "
            ~ "labeled break/continue or goto statements.");
    }
}

/*------Structs that implement opApply for parallel foreach.------------------*/
private template randLen(R)
{
    enum randLen = isRandomAccessRange!R && hasLength!R;
}

private void submitAndExecute(
    TaskPool pool,
    scope void delegate() doIt
)
{
    import core.exception : OutOfMemoryError;
    immutable nThreads = pool.size + 1;

    alias PTask = typeof(scopedTask(doIt));
    import core.stdc.stdlib : malloc, free;
    import core.stdc.string : memcpy;

    // The logical thing to do would be to just use alloca() here, but that
    // causes problems on Windows for reasons that I don't understand
    // (tentatively a compiler bug) and definitely doesn't work on Posix due
    // to https://issues.dlang.org/show_bug.cgi?id=3753.
    // Therefore, allocate a fixed buffer and fall back to `malloc()` if
    // someone's using a ridiculous amount of threads.
    // Also, using a byte array instead of a PTask array as the fixed buffer
    // is to prevent d'tors from being called on uninitialized excess PTask
    // instances.
    enum nBuf = 64;
    byte[nBuf * PTask.sizeof] buf = void;
    PTask[] tasks;
    if (nThreads <= nBuf)
    {
        tasks = (cast(PTask*) buf.ptr)[0 .. nThreads];
    }
    else
    {
        auto ptr = cast(PTask*) malloc(nThreads * PTask.sizeof);
        if (!ptr) throw new OutOfMemoryError("Out of memory in std.parallelism.");
        tasks = ptr[0 ..
nThreads]; 3720 } 3721 3722 scope(exit) 3723 { 3724 if (nThreads > nBuf) 3725 { 3726 free(tasks.ptr); 3727 } 3728 } 3729 3730 foreach (ref t; tasks) 3731 { 3732 import core.stdc.string : memcpy; 3733 3734 // This silly looking code is necessary to prevent d'tors from being 3735 // called on uninitialized objects. 3736 auto temp = scopedTask(doIt); 3737 memcpy(&t, &temp, PTask.sizeof); 3738 3739 // This has to be done to t after copying, not temp before copying. 3740 // Otherwise, temp's destructor will sit here and wait for the 3741 // task to finish. 3742 t.pool = pool; 3743 } 3744 3745 foreach (i; 1 .. tasks.length - 1) 3746 { 3747 tasks[i].next = tasks[i + 1].basePtr; 3748 tasks[i + 1].prev = tasks[i].basePtr; 3749 } 3750 3751 if (tasks.length > 1) 3752 { 3753 pool.queueLock(); 3754 scope(exit) pool.queueUnlock(); 3755 3756 pool.abstractPutGroupNoSync( 3757 tasks[1].basePtr, 3758 tasks[$ - 1].basePtr 3759 ); 3760 } 3761 3762 if (tasks.length > 0) 3763 { 3764 try 3765 { 3766 tasks[0].job(); 3767 } 3768 catch (Throwable e) 3769 { 3770 tasks[0].exception = e; // nocoverage 3771 } 3772 tasks[0].taskStatus = TaskStatus.done; 3773 3774 // Try to execute each of these in the current thread 3775 foreach (ref task; tasks[1..$]) 3776 { 3777 pool.tryDeleteExecute(task.basePtr); 3778 } 3779 } 3780 3781 Throwable firstException; 3782 3783 foreach (i, ref task; tasks) 3784 { 3785 try 3786 { 3787 task.yieldForce; 3788 } 3789 catch (Throwable e) 3790 { 3791 /* Chain e to front because order doesn't matter and because 3792 * e is not likely to be a chain itself (so fewer traversals) 3793 */ 3794 firstException = Throwable.chainTogether(e, firstException); 3795 continue; 3796 } 3797 } 3798 3799 if (firstException) throw firstException; 3800 } 3801 3802 void foreachErr() 3803 { 3804 throw new ParallelForeachError(); 3805 } 3806 3807 int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg) 3808 { 3809 with(p) 3810 { 3811 int res = 0; 3812 size_t index = 0; 3813 3814 // The explicit ElementType!R in the foreach loops is necessary for 3815 // correct behavior when iterating over strings. 3816 static if (hasLvalueElements!R) 3817 { 3818 foreach (ref ElementType!R elem; range) 3819 { 3820 static if (Parameters!dg.length == 2) 3821 { 3822 res = dg(index, elem); 3823 } 3824 else 3825 { 3826 res = dg(elem); 3827 } 3828 if (res) break; 3829 index++; 3830 } 3831 } 3832 else 3833 { 3834 foreach (ElementType!R elem; range) 3835 { 3836 static if (Parameters!dg.length == 2) 3837 { 3838 res = dg(index, elem); 3839 } 3840 else 3841 { 3842 res = dg(elem); 3843 } 3844 if (res) break; 3845 index++; 3846 } 3847 } 3848 if (res) foreachErr; 3849 return res; 3850 } 3851 } 3852 3853 private enum string parallelApplyMixinRandomAccess = q{ 3854 // Handle empty thread pool as special case. 3855 if (pool.size == 0) 3856 { 3857 return doSizeZeroCase(this, dg); 3858 } 3859 3860 // Whether iteration is with or without an index variable. 3861 enum withIndex = Parameters!(typeof(dg)).length == 2; 3862 3863 shared size_t workUnitIndex = size_t.max; // Effectively -1: chunkIndex + 1 == 0 3864 immutable len = range.length; 3865 if (!len) return 0; 3866 3867 shared bool shouldContinue = true; 3868 3869 void doIt() 3870 { 3871 import std.algorithm.comparison : min; 3872 3873 scope(failure) 3874 { 3875 // If an exception is thrown, all threads should bail. 
3876 atomicStore(shouldContinue, false); 3877 } 3878 3879 while (atomicLoad(shouldContinue)) 3880 { 3881 immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1); 3882 immutable start = workUnitSize * myUnitIndex; 3883 if (start >= len) 3884 { 3885 atomicStore(shouldContinue, false); 3886 break; 3887 } 3888 3889 immutable end = min(len, start + workUnitSize); 3890 3891 foreach (i; start .. end) 3892 { 3893 static if (withIndex) 3894 { 3895 if (dg(i, range[i])) foreachErr(); 3896 } 3897 else 3898 { 3899 if (dg(range[i])) foreachErr(); 3900 } 3901 } 3902 } 3903 } 3904 3905 submitAndExecute(pool, &doIt); 3906 3907 return 0; 3908 }; 3909 3910 enum string parallelApplyMixinInputRange = q{ 3911 // Handle empty thread pool as special case. 3912 if (pool.size == 0) 3913 { 3914 return doSizeZeroCase(this, dg); 3915 } 3916 3917 // Whether iteration is with or without an index variable. 3918 enum withIndex = Parameters!(typeof(dg)).length == 2; 3919 3920 // This protects the range while copying it. 3921 auto rangeMutex = new Mutex(); 3922 3923 shared bool shouldContinue = true; 3924 3925 // The total number of elements that have been popped off range. 3926 // This is updated only while protected by rangeMutex; 3927 size_t nPopped = 0; 3928 3929 static if ( 3930 is(typeof(range.buf1)) && 3931 is(typeof(range.bufPos)) && 3932 is(typeof(range.doBufSwap())) 3933 ) 3934 { 3935 // Make sure we don't have the buffer recycling overload of 3936 // asyncBuf. 3937 static if ( 3938 is(typeof(range.source)) && 3939 isRoundRobin!(typeof(range.source)) 3940 ) 3941 { 3942 static assert(0, "Cannot execute a parallel foreach loop on " ~ 3943 "the buffer recycling overload of asyncBuf."); 3944 } 3945 3946 enum bool bufferTrick = true; 3947 } 3948 else 3949 { 3950 enum bool bufferTrick = false; 3951 } 3952 3953 void doIt() 3954 { 3955 scope(failure) 3956 { 3957 // If an exception is thrown, all threads should bail. 3958 atomicStore(shouldContinue, false); 3959 } 3960 3961 static if (hasLvalueElements!R) 3962 { 3963 alias Temp = ElementType!R*[]; 3964 Temp temp; 3965 3966 // Returns: The previous value of nPopped. 3967 size_t makeTemp() 3968 { 3969 import std.algorithm.internal : addressOf; 3970 import std.array : uninitializedArray; 3971 3972 if (temp is null) 3973 { 3974 temp = uninitializedArray!Temp(workUnitSize); 3975 } 3976 3977 rangeMutex.lock(); 3978 scope(exit) rangeMutex.unlock(); 3979 3980 size_t i = 0; 3981 for (; i < workUnitSize && !range.empty; range.popFront(), i++) 3982 { 3983 temp[i] = addressOf(range.front); 3984 } 3985 3986 temp = temp[0 .. i]; 3987 auto ret = nPopped; 3988 nPopped += temp.length; 3989 return ret; 3990 } 3991 3992 } 3993 else 3994 { 3995 3996 alias Temp = ElementType!R[]; 3997 Temp temp; 3998 3999 // Returns: The previous value of nPopped. 4000 static if (!bufferTrick) size_t makeTemp() 4001 { 4002 import std.array : uninitializedArray; 4003 4004 if (temp is null) 4005 { 4006 temp = uninitializedArray!Temp(workUnitSize); 4007 } 4008 4009 rangeMutex.lock(); 4010 scope(exit) rangeMutex.unlock(); 4011 4012 size_t i = 0; 4013 for (; i < workUnitSize && !range.empty; range.popFront(), i++) 4014 { 4015 temp[i] = range.front; 4016 } 4017 4018 temp = temp[0 .. i]; 4019 auto ret = nPopped; 4020 nPopped += temp.length; 4021 return ret; 4022 } 4023 4024 static if (bufferTrick) size_t makeTemp() 4025 { 4026 import std.algorithm.mutation : swap; 4027 rangeMutex.lock(); 4028 scope(exit) rangeMutex.unlock(); 4029 4030 // Elide copying by just swapping buffers. 
4031 temp.length = range.buf1.length; 4032 swap(range.buf1, temp); 4033 4034 // This is necessary in case popFront() has been called on 4035 // range before entering the parallel foreach loop. 4036 temp = temp[range.bufPos..$]; 4037 4038 static if (is(typeof(range._length))) 4039 { 4040 range._length -= (temp.length - range.bufPos); 4041 } 4042 4043 range.doBufSwap(); 4044 auto ret = nPopped; 4045 nPopped += temp.length; 4046 return ret; 4047 } 4048 } 4049 4050 while (atomicLoad(shouldContinue)) 4051 { 4052 auto overallIndex = makeTemp(); 4053 if (temp.empty) 4054 { 4055 atomicStore(shouldContinue, false); 4056 break; 4057 } 4058 4059 foreach (i; 0 .. temp.length) 4060 { 4061 scope(success) overallIndex++; 4062 4063 static if (hasLvalueElements!R) 4064 { 4065 static if (withIndex) 4066 { 4067 if (dg(overallIndex, *temp[i])) foreachErr(); 4068 } 4069 else 4070 { 4071 if (dg(*temp[i])) foreachErr(); 4072 } 4073 } 4074 else 4075 { 4076 static if (withIndex) 4077 { 4078 if (dg(overallIndex, temp[i])) foreachErr(); 4079 } 4080 else 4081 { 4082 if (dg(temp[i])) foreachErr(); 4083 } 4084 } 4085 } 4086 } 4087 } 4088 4089 submitAndExecute(pool, &doIt); 4090 4091 return 0; 4092 }; 4093 4094 4095 private struct ParallelForeach(R) 4096 { 4097 TaskPool pool; 4098 R range; 4099 size_t workUnitSize; 4100 alias E = ElementType!R; 4101 4102 static if (hasLvalueElements!R) 4103 { 4104 alias NoIndexDg = int delegate(ref E); 4105 alias IndexDg = int delegate(size_t, ref E); 4106 } 4107 else 4108 { 4109 alias NoIndexDg = int delegate(E); 4110 alias IndexDg = int delegate(size_t, E); 4111 } 4112 4113 int opApply(scope NoIndexDg dg) 4114 { 4115 static if (randLen!R) 4116 { 4117 mixin(parallelApplyMixinRandomAccess); 4118 } 4119 else 4120 { 4121 mixin(parallelApplyMixinInputRange); 4122 } 4123 } 4124 4125 int opApply(scope IndexDg dg) 4126 { 4127 static if (randLen!R) 4128 { 4129 mixin(parallelApplyMixinRandomAccess); 4130 } 4131 else 4132 { 4133 mixin(parallelApplyMixinInputRange); 4134 } 4135 } 4136 } 4137 4138 /* 4139 This struct buffers the output of a callable that outputs data into a 4140 user-supplied buffer into a set of buffers of some fixed size. It allows these 4141 buffers to be accessed with an input range interface. This is used internally 4142 in the buffer-recycling overload of TaskPool.asyncBuf, which creates an 4143 instance and forwards it to the input range overload of asyncBuf. 4144 */ 4145 private struct RoundRobinBuffer(C1, C2) 4146 { 4147 // No need for constraints because they're already checked for in asyncBuf. 
4148 4149 alias Array = Parameters!(C1.init)[0]; 4150 alias T = typeof(Array.init[0]); 4151 4152 T[][] bufs; 4153 size_t index; 4154 C1 nextDel; 4155 C2 emptyDel; 4156 bool _empty; 4157 bool primed; 4158 4159 this( 4160 C1 nextDel, 4161 C2 emptyDel, 4162 size_t initialBufSize, 4163 size_t nBuffers 4164 ) { 4165 this.nextDel = nextDel; 4166 this.emptyDel = emptyDel; 4167 bufs.length = nBuffers; 4168 4169 foreach (ref buf; bufs) 4170 { 4171 buf.length = initialBufSize; 4172 } 4173 } 4174 4175 void prime() 4176 in 4177 { 4178 assert(!empty); 4179 } 4180 do 4181 { 4182 scope(success) primed = true; 4183 nextDel(bufs[index]); 4184 } 4185 4186 4187 T[] front() @property 4188 in 4189 { 4190 assert(!empty); 4191 } 4192 do 4193 { 4194 if (!primed) prime(); 4195 return bufs[index]; 4196 } 4197 4198 void popFront() 4199 { 4200 if (empty || emptyDel()) 4201 { 4202 _empty = true; 4203 return; 4204 } 4205 4206 index = (index + 1) % bufs.length; 4207 primed = false; 4208 } 4209 4210 bool empty() @property const @safe pure nothrow 4211 { 4212 return _empty; 4213 } 4214 } 4215 4216 version (StdUnittest) 4217 { 4218 // This was the only way I could get nested maps to work. 4219 private __gshared TaskPool poolInstance; 4220 } 4221 4222 // These test basic functionality but don't stress test for threading bugs. 4223 // These are the tests that should be run every time Phobos is compiled. 4224 @system unittest 4225 { 4226 import std.algorithm.comparison : equal, min, max; 4227 import std.algorithm.iteration : filter, map, reduce; 4228 import std.array : split; 4229 import std.conv : text; 4230 import std.exception : assertThrown; 4231 import std.math.operations : isClose; 4232 import std.math.algebraic : sqrt, abs; 4233 import std.math.exponential : log; 4234 import std.range : indexed, iota, join; 4235 import std.typecons : Tuple, tuple; 4236 import std.stdio; 4237 4238 poolInstance = new TaskPool(2); 4239 scope(exit) poolInstance.stop(); 4240 4241 // The only way this can be verified is manually. 4242 debug(std_parallelism) stderr.writeln("totalCPUs = ", totalCPUs); 4243 4244 auto oldPriority = poolInstance.priority; 4245 poolInstance.priority = Thread.PRIORITY_MAX; 4246 assert(poolInstance.priority == Thread.PRIORITY_MAX); 4247 4248 poolInstance.priority = Thread.PRIORITY_MIN; 4249 assert(poolInstance.priority == Thread.PRIORITY_MIN); 4250 4251 poolInstance.priority = oldPriority; 4252 assert(poolInstance.priority == oldPriority); 4253 4254 static void refFun(ref uint num) 4255 { 4256 num++; 4257 } 4258 4259 uint x; 4260 4261 // Test task(). 4262 auto t = task!refFun(x); 4263 poolInstance.put(t); 4264 t.yieldForce; 4265 assert(t.args[0] == 1); 4266 4267 auto t2 = task(&refFun, x); 4268 poolInstance.put(t2); 4269 t2.yieldForce; 4270 assert(t2.args[0] == 1); 4271 4272 // Test scopedTask(). 4273 auto st = scopedTask!refFun(x); 4274 poolInstance.put(st); 4275 st.yieldForce; 4276 assert(st.args[0] == 1); 4277 4278 auto st2 = scopedTask(&refFun, x); 4279 poolInstance.put(st2); 4280 st2.yieldForce; 4281 assert(st2.args[0] == 1); 4282 4283 // Test executeInNewThread(). 4284 auto ct = scopedTask!refFun(x); 4285 ct.executeInNewThread(Thread.PRIORITY_MAX); 4286 ct.yieldForce; 4287 assert(ct.args[0] == 1); 4288 4289 // Test ref return. 
    uint toInc = 0;
    static ref T makeRef(T)(ref T num)
    {
        return num;
    }

    auto t3 = task!makeRef(toInc);
    taskPool.put(t3);
    assert(t3.args[0] == 0);
    t3.spinForce++;
    assert(t3.args[0] == 1);

    // Compile-only check: basic Task and scopedTask usage should work in
    // @safe code.
    static void testSafe() @safe
    {
        static int bump(int num)
        {
            return num + 1;
        }

        auto safePool = new TaskPool(0);
        auto t = task(&bump, 1);
        taskPool.put(t);
        assert(t.yieldForce == 2);

        auto st = scopedTask(&bump, 1);
        taskPool.put(st);
        assert(st.yieldForce == 2);
        safePool.stop();
    }

    auto arr = [1,2,3,4,5];
    auto nums = new uint[5];
    auto nums2 = new uint[5];

    foreach (i, ref elem; poolInstance.parallel(arr))
    {
        elem++;
        nums[i] = cast(uint) i + 2;
        nums2[i] = elem;
    }

    assert(nums == [2,3,4,5,6], text(nums));
    assert(nums2 == nums, text(nums2));
    assert(arr == nums, text(arr));

    // Test const/immutable arguments.
    static int add(int lhs, int rhs)
    {
        return lhs + rhs;
    }
    immutable addLhs = 1;
    immutable addRhs = 2;
    auto addTask = task(&add, addLhs, addRhs);
    auto addScopedTask = scopedTask(&add, addLhs, addRhs);
    poolInstance.put(addTask);
    poolInstance.put(addScopedTask);
    assert(addTask.yieldForce == 3);
    assert(addScopedTask.yieldForce == 3);

    // Test parallel foreach with a non-random access range.
    auto range = filter!"a != 666"([0, 1, 2, 3, 4]);

    foreach (i, elem; poolInstance.parallel(range))
    {
        nums[i] = cast(uint) i;
    }

    assert(nums == [0,1,2,3,4]);

    auto logs = new double[1_000_000];
    foreach (i, ref elem; poolInstance.parallel(logs))
    {
        elem = log(i + 1.0);
    }

    foreach (i, elem; logs)
    {
        assert(isClose(elem, log(double(i + 1))));
    }

    assert(poolInstance.amap!"a * a"([1,2,3,4,5]) == [1,4,9,16,25]);
    assert(poolInstance.amap!"a * a"([1,2,3,4,5], new long[5]) == [1,4,9,16,25]);
    assert(poolInstance.amap!("a * a", "-a")([1,2,3]) ==
           [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);

    auto tupleBuf = new Tuple!(int, int)[3];
    poolInstance.amap!("a * a", "-a")([1,2,3], tupleBuf);
    assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
    poolInstance.amap!("a * a", "-a")([1,2,3], 5, tupleBuf);
    assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);

    // Test amap with a non-array buffer.
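    // std.range.indexed yields a random-access view with assignable
    // elements, which is all amap requires of an output buffer.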
    auto toIndex = new int[5];
    auto ind = indexed(toIndex, [3, 1, 4, 0, 2]);
    poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], ind);
    assert(equal(ind, [2, 4, 6, 8, 10]));
    assert(equal(toIndex, [8, 4, 10, 2, 6]));
    poolInstance.amap!"a / 2"(ind, ind);
    assert(equal(ind, [1, 2, 3, 4, 5]));
    assert(equal(toIndex, [4, 2, 5, 1, 3]));

    auto buf = new int[5];
    poolInstance.amap!"a * a"([1,2,3,4,5], buf);
    assert(buf == [1,4,9,16,25]);
    poolInstance.amap!"a * a"([1,2,3,4,5], 4, buf);
    assert(buf == [1,4,9,16,25]);

    assert(poolInstance.reduce!"a + b"([1]) == 1);
    assert(poolInstance.reduce!"a + b"([1,2,3,4]) == 10);
    assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4]) == 10);
    assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4], 1) == 10);
    assert(poolInstance.reduce!(min, max)([1,2,3,4]) == tuple(1, 4));
    assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) ==
           tuple(10, 24));

    immutable serialAns = reduce!"a + b"(iota(1000));
    assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns);
    assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns);

    // Test worker-local storage.
    auto wl = poolInstance.workerLocalStorage(0);
    foreach (i; poolInstance.parallel(iota(1000), 1))
    {
        wl.get = wl.get + i;
    }

    auto wlRange = wl.toRange;
    auto parallelSum = poolInstance.reduce!"a + b"(wlRange);
    assert(parallelSum == 499500);
    assert(wlRange[0 .. 1][0] == wlRange[0]);
    assert(wlRange[1 .. 2][0] == wlRange[1]);

    // Test finish().
    {
        static void slowFun() { Thread.sleep(dur!"msecs"(1)); }

        auto pool1 = new TaskPool();
        auto tSlow = task!slowFun();
        pool1.put(tSlow);
        pool1.finish();
        tSlow.yieldForce;
        // Can't assert that pool1.status == PoolState.stopNow because status
        // doesn't change until after the "done" flag is set and the waiting
        // thread is woken up.

        auto pool2 = new TaskPool();
        auto tSlow2 = task!slowFun();
        pool2.put(tSlow2);
        pool2.finish(true); // blocking
        assert(tSlow2.done);

        // Test fix for https://issues.dlang.org/show_bug.cgi?id=8582 by
        // making the pool size zero.
        auto pool3 = new TaskPool(0);
        auto tSlow3 = task!slowFun();
        pool3.put(tSlow3);
        pool3.finish(true); // blocking
        assert(tSlow3.done);

        // This is correct because no thread will terminate unless pool2.status
        // and pool3.status have already been set to stopNow.
        assert(pool2.status == TaskPool.PoolState.stopNow);
        assert(pool3.status == TaskPool.PoolState.stopNow);
    }

    // Test the default pool.
    assert(taskPool.size == totalCPUs - 1);

    nums = new uint[1000];
    foreach (i; parallel(iota(1000)))
    {
        nums[i] = cast(uint) i;
    }
    assert(equal(nums, iota(1000)));

    assert(equal(
        poolInstance.map!"a * a"(iota(3_000_001), 10_000),
        map!"a * a"(iota(3_000_001))
    ));

    // The filter is to kill random access and test the non-random access
    // branch.
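    // For TaskPool.map below, the extra arguments are the buffer size
    // (10_000) and the work unit size (1000).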
    assert(equal(
        poolInstance.map!"a * a"(
            filter!"a == a"(iota(3_000_001)),
            10_000, 1000
        ),
        map!"a * a"(iota(3_000_001))
    ));

    assert(
        reduce!"a + b"(0UL,
            poolInstance.map!"a * a"(iota(300_001), 10_000)
        ) ==
        reduce!"a + b"(0UL,
            map!"a * a"(iota(300_001))
        )
    );

    assert(equal(
        iota(1_000_002),
        poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002)))
    ));

    {
        import std.conv : to;
        import std.file : deleteme;

        string temp_file = deleteme ~ "-tempDelMe.txt";
        auto file = File(temp_file, "wb");
        scope(exit)
        {
            file.close();
            import std.file : remove;
            remove(temp_file);
        }

        auto written = [[1.0, 2, 3], [4.0, 5, 6], [7.0, 8, 9]];
        foreach (row; written)
        {
            file.writeln(join(to!(string[])(row), "\t"));
        }

        file = File(temp_file);

        void next(ref char[] buf)
        {
            file.readln(buf);
            import std.string : chomp;
            buf = chomp(buf);
        }

        double[][] read;
        auto asyncReader = taskPool.asyncBuf(&next, &file.eof);

        foreach (line; asyncReader)
        {
            if (line.length == 0) continue;
            auto ls = line.split("\t");
            read ~= to!(double[])(ls);
        }

        assert(read == written);
        file.close();
    }

    // Test map/asyncBuf chaining.

    auto abuf = poolInstance.asyncBuf(iota(-1.0, 3_000_000), 100);
    auto temp = poolInstance.map!sqrt(
        abuf, 100, 5
    );
    auto lmchain = poolInstance.map!"a * a"(temp, 100, 5);
    lmchain.popFront();

    int ii;
    foreach (elem; lmchain)
    {
        if (!isClose(elem, ii))
        {
            stderr.writeln(ii, '\t', elem);
        }
        ii++;
    }

    // Test buffer trick in parallel foreach.
    abuf = poolInstance.asyncBuf(iota(-1.0, 1_000_000), 100);
    abuf.popFront();
    auto bufTrickTest = new size_t[abuf.length];
    foreach (i, elem; parallel(abuf))
    {
        bufTrickTest[i] = i;
    }

    assert(equal(iota(1_000_000), bufTrickTest));

    auto myTask = task!(abs)(-1);
    taskPool.put(myTask);
    assert(myTask.spinForce == 1);

    // Test that a thread that is not a worker in a given pool sees a
    // workerIndex of 0 w.r.t. that pool (this is what worker-local storage
    // indexing relies on). The only way to test this is non-deterministically.
    foreach (i; parallel(iota(1000), 1))
    {
        assert(poolInstance.workerIndex == 0);
    }

    foreach (i; poolInstance.parallel(iota(1000), 1))
    {
        assert(taskPool.workerIndex == 0);
    }

    // Test exception handling.
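    // Exceptions thrown inside a parallel foreach, map, amap or reduce are
    // chained together and rethrown in the thread that initiated the
    // operation, which is why plain assertThrown works below.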
    static void parallelForeachThrow()
    {
        foreach (elem; parallel(iota(10)))
        {
            throw new Exception("");
        }
    }

    assertThrown!Exception(parallelForeachThrow());

    static int reduceException(int a, int b)
    {
        throw new Exception("");
    }

    assertThrown!Exception(
        poolInstance.reduce!reduceException(iota(3))
    );

    static int mapException(int a)
    {
        throw new Exception("");
    }

    assertThrown!Exception(
        poolInstance.amap!mapException(iota(3))
    );

    static void mapThrow()
    {
        auto m = poolInstance.map!mapException(iota(3));
        m.popFront();
    }

    assertThrown!Exception(mapThrow());

    struct ThrowingRange
    {
        @property int front()
        {
            return 1;
        }
        void popFront()
        {
            throw new Exception("");
        }
        enum bool empty = false;
    }

    assertThrown!Exception(poolInstance.asyncBuf(ThrowingRange.init));
}

//version = parallelismStressTest;

// These are more like stress tests than real unit tests. They print out
// tons of stuff and should not be run every time make unittest is run.
version (parallelismStressTest)
{
    @system unittest
    {
        import std.stdio : stderr, writeln, readln;
        import std.range : iota;
        import std.algorithm.iteration : filter, reduce;

        size_t attempt;
        for (; attempt < 10; attempt++)
        foreach (poolSize; [0, 4])
        {
            poolInstance = new TaskPool(poolSize);

            uint[] numbers = new uint[1_000];

            foreach (i; poolInstance.parallel(iota(0, numbers.length)))
            {
                numbers[i] = cast(uint) i;
            }

            // Make sure it works.
            foreach (i; 0 .. numbers.length)
            {
                assert(numbers[i] == i);
            }

            stderr.writeln("Done creating nums.");

            auto myNumbers = filter!"a % 7 > 0"(iota(0, 1000));
            foreach (num; poolInstance.parallel(myNumbers))
            {
                assert(num % 7 > 0 && num < 1000);
            }
            stderr.writeln("Done modulus test.");

            uint[] squares = poolInstance.amap!"a * a"(numbers, 100);
            assert(squares.length == numbers.length);
            foreach (i, number; numbers)
            {
                assert(squares[i] == number * number);
            }
            stderr.writeln("Done squares.");

            auto sumFuture = task!(reduce!"a + b")(numbers);
            poolInstance.put(sumFuture);

            ulong sumSquares = 0;
            foreach (elem; numbers)
            {
                sumSquares += elem * elem;
            }

            uint mySum = sumFuture.spinForce();
            assert(mySum == 999 * 1000 / 2);

            auto mySumParallel = poolInstance.reduce!"a + b"(numbers);
            assert(mySum == mySumParallel);
            stderr.writeln("Done sums.");

            auto myTask = task(
            {
                synchronized writeln("Our lives are parallel...Our lives are parallel.");
            });
            poolInstance.put(myTask);

            auto nestedOuter = "abcd";
            auto nestedInner = iota(0, 10, 2);

            foreach (i, letter; poolInstance.parallel(nestedOuter, 1))
            {
                foreach (j, number; poolInstance.parallel(nestedInner, 1))
                {
                    synchronized writeln(i, ": ", letter, " ", j, ": ", number);
                }
            }

            poolInstance.stop();
        }

        assert(attempt == 10);
        writeln("Press enter to go to next round of unittests.");
        readln();
    }

    // These unittests are intended more for actual testing and not so much
    // as examples.
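    // (They only compile and run when building with
    // -version=parallelismStressTest, or when the version declaration above
    // is uncommented.)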
    @system unittest
    {
        import std.stdio : stderr;
        import std.range : iota;
        import std.algorithm.iteration : filter, reduce;
        import std.math.algebraic : sqrt;
        import std.math.operations : isClose;
        import std.math.traits : isNaN;
        import std.conv : text;

        foreach (attempt; 0 .. 10)
        foreach (poolSize; [0, 4])
        {
            poolInstance = new TaskPool(poolSize);

            // Test indexing.
            stderr.writeln("Creator Raw Index: ", poolInstance.threadIndex);
            assert(poolInstance.workerIndex() == 0);

            // Test worker-local storage.
            auto workerLocalStorage = poolInstance.workerLocalStorage!uint(1);
            foreach (i; poolInstance.parallel(iota(0U, 1_000_000)))
            {
                workerLocalStorage.get++;
            }
            assert(reduce!"a + b"(workerLocalStorage.toRange) ==
                   1_000_000 + poolInstance.size + 1);

            // Make sure work is reasonably balanced among threads. This test
            // is non-deterministic and is more of a sanity check than
            // something that has an absolute pass/fail.
            shared(uint)[void*] nJobsByThread;
            foreach (thread; poolInstance.pool)
            {
                nJobsByThread[cast(void*) thread] = 0;
            }
            nJobsByThread[cast(void*) Thread.getThis()] = 0;

            foreach (i; poolInstance.parallel(iota(0, 1_000_000), 100))
            {
                atomicOp!"+="(nJobsByThread[cast(void*) Thread.getThis()], 1);
            }

            stderr.writeln("\nCurrent thread is: ",
                cast(void*) Thread.getThis());
            stderr.writeln("Workload distribution: ");
            foreach (k, v; nJobsByThread)
            {
                stderr.writeln(k, '\t', v);
            }

            // Test whether amap can be nested.
            real[][] matrix = new real[][](1000, 1000);
            foreach (i; poolInstance.parallel(iota(0, matrix.length)))
            {
                foreach (j; poolInstance.parallel(iota(0, matrix[0].length)))
                {
                    matrix[i][j] = i * j;
                }
            }

            // Get around weird bugs having to do with sqrt being an intrinsic:
            static real mySqrt(real num)
            {
                return sqrt(num);
            }

            static real[] parallelSqrt(real[] nums)
            {
                return poolInstance.amap!mySqrt(nums);
            }

            real[][] sqrtMatrix = poolInstance.amap!parallelSqrt(matrix);

            foreach (i, row; sqrtMatrix)
            {
                foreach (j, elem; row)
                {
                    real shouldBe = sqrt(cast(real) i * j);
                    assert(isClose(shouldBe, elem));
                    sqrtMatrix[i][j] = shouldBe;
                }
            }

            auto saySuccess = task(
            {
                stderr.writeln(
                    "Success doing matrix stuff that involves nested pool use.");
            });
            poolInstance.put(saySuccess);
            saySuccess.workForce();

            // A more thorough test of amap and reduce: find the sum of the
            // square roots of the matrix.

            static real parallelSum(real[] input)
            {
                return poolInstance.reduce!"a + b"(input);
            }

            auto sumSqrt = poolInstance.reduce!"a + b"(
                poolInstance.amap!parallelSum(
                    sqrtMatrix
                )
            );

            assert(isClose(sumSqrt, 4.437e8, 1e-2));
            stderr.writeln("Done sum of square roots.");

            // Test whether tasks work with function pointers.
            /+ // This part is buggy and needs to be fixed...
            auto nanTask = task(&isNaN, 1.0L);
            poolInstance.put(nanTask);
            assert(nanTask.spinForce == false);

            if (poolInstance.size > 0)
            {
                // Test work waiting.
                static void uselessFun()
                {
                    foreach (i; 0 .. 1_000_000) {}
                }

                auto uselessTasks = new typeof(task(&uselessFun))[1000];
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask = task(&uselessFun);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    poolInstance.put(uselessTask);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask.workForce();
                }
            }
            +/

            // Test the case of non-random access + ref returns.
            int[] nums = [1,2,3,4,5];
            static struct RemoveRandom
            {
                int[] arr;

                ref int front()
                {
                    return arr.front;
                }
                void popFront()
                {
                    arr.popFront();
                }
                bool empty()
                {
                    return arr.empty;
                }
            }

            auto refRange = RemoveRandom(nums);
            foreach (ref elem; poolInstance.parallel(refRange))
            {
                elem++;
            }
            assert(nums == [2,3,4,5,6], text(nums));
            stderr.writeln("Nums: ", nums);

            poolInstance.stop();
        }
    }
}

// Test case from https://issues.dlang.org/show_bug.cgi?id=12733
// (amap over a struct with an invariant and opAssign).
@system unittest
{
    static struct __S_12733
    {
        invariant() { assert(checksum == 1_234_567_890); }
        this(ulong u) { n = u; }
        void opAssign(__S_12733 s) { this.n = s.n; }
        ulong n;
        ulong checksum = 1_234_567_890;
    }

    static auto __genPair_12733(ulong n) { return __S_12733(n); }
    immutable ulong[] data = [2UL^^59-1, 2UL^^59-1, 2UL^^59-1, 112_272_537_195_293UL];

    auto result = taskPool.amap!__genPair_12733(data);
}

@safe unittest
{
    import std.range : iota;

    // This test was in std.range, but caused cycles.
    assert(__traits(compiles, { foreach (i; iota(0, 100UL).parallel) {} }));
}

@safe unittest
{
    import std.algorithm.iteration : each;

    long[] arr;
    static assert(is(typeof({
        arr.parallel.each!"a++";
    })));
}

// https://issues.dlang.org/show_bug.cgi?id=17539
@system unittest
{
    import std.random : rndGen;
    // ensure compilation
    try foreach (rnd; rndGen.parallel) break;
    catch (ParallelForeachError e) {}
}