/**
`std.parallelism` implements high-level primitives for SMP parallelism.
These include parallel foreach, parallel reduce, parallel eager map, pipelining
and future/promise parallelism. `std.parallelism` is recommended when the
same operation is to be executed in parallel on different data, or when a
function is to be executed in a background thread and its result returned to a
well-defined main thread. For communication between arbitrary threads, see
`std.concurrency`.

`std.parallelism` is based on the concept of a `Task`. A `Task` is an
object that represents the fundamental unit of work in this library and may be
executed in parallel with any other `Task`. Using `Task`
directly allows programming with a future/promise paradigm. All other
supported parallelism paradigms (parallel foreach, map, reduce, pipelining)
represent an additional level of abstraction over `Task`. They
automatically create one or more `Task` objects, or closely related types
that are conceptually identical but not part of the public API.

After creation, a `Task` may be executed in a new thread, or submitted
to a `TaskPool` for execution. A `TaskPool` encapsulates a task queue
and its worker threads. Its purpose is to efficiently map a large
number of `Task`s onto a smaller number of threads. A task queue is a
FIFO queue of `Task` objects that have been submitted to the
`TaskPool` and are awaiting execution. A worker thread is a thread that
is associated with exactly one task queue. It executes the `Task` at the
front of its queue when the queue has work available, or sleeps when
no work is available. Each task queue is associated with zero or
more worker threads. If the result of a `Task` is needed before execution
by a worker thread has begun, the `Task` can be removed from the task queue
and executed immediately in the thread where the result is needed.

Warning: Unless marked as `@trusted` or `@safe`, artifacts in
this module allow implicit data sharing between threads and cannot
guarantee that client code is free from low level data races.

Source: $(PHOBOSSRC std/parallelism.d)
Author: David Simcha
Copyright: Copyright (c) 2009-2011, David Simcha.
License: $(HTTP boost.org/LICENSE_1_0.txt, Boost License 1.0)
*/
module std.parallelism;

version (OSX)
    version = Darwin;
else version (iOS)
    version = Darwin;
else version (TVOS)
    version = Darwin;
else version (WatchOS)
    version = Darwin;

///
@system unittest
{
    import std.algorithm.iteration : map;
    import std.math.operations : isClose;
    import std.parallelism : taskPool;
    import std.range : iota;

    // Parallel reduce can be combined with
    // std.algorithm.iteration.map to interesting effect.
    // The following example (thanks to Russel Winder)
    // calculates pi by quadrature using
    // std.algorithm.map and TaskPool.reduce.
    // getTerm is evaluated in parallel as needed by
    // TaskPool.reduce.
    //
    // Timings on an Intel i5-3450 quad core machine
    // for n = 1_000_000_000:
    //
    // TaskPool.reduce:       1.067 s
    // std.algorithm.reduce:  4.011 s

    enum n = 1_000_000;
    enum delta = 1.0 / n;

    alias getTerm = (int i)
    {
        immutable x = (i - 0.5) * delta;
        return delta / (1.0 + x * x);
    };

    immutable pi = 4.0 * taskPool.reduce!"a + b"(n.iota.map!getTerm);

    assert(pi.isClose(3.14159, 1e-5));
}

import core.atomic;
import core.memory;
import core.sync.condition;
import core.thread;

import std.functional;
import std.meta;
import std.range.primitives;
import std.traits;

/*
(For now, public but undocumented, with a reserved name.)

A lazily initialized global constant. The underlying value is a shared global
statically initialized to `outOfBandValue`, which must not be a legitimate value
of the constant. Upon the first call, this situation is detected and the global
is initialized by calling `initializer`. The initializer is assumed to be pure
(even if not marked as such), i.e. to return the same value upon repeated calls.
For that reason, no special precautions are taken, so `initializer` may be
called more than once, leading to benign races on the cached value.

In the quiescent state, the cost of the function is an atomic load from a
global.

Params:
T = The type of the pseudo-constant (may be qualified)
outOfBandValue = A value that cannot be valid; it is used for initialization
initializer = The function performing initialization; must be `nothrow`

Returns:
The lazily initialized value
*/
@property pure
T __lazilyInitializedConstant(T, alias outOfBandValue, alias initializer)()
if (is(Unqual!T : T)
    && is(typeof(initializer()) : T)
    && is(typeof(outOfBandValue) : T))
{
    static T impl() nothrow
    {
        // Thread-local cache
        static Unqual!T tls = outOfBandValue;
        auto local = tls;
        // Shortest path, no atomic operations
        if (local != outOfBandValue) return local;
        // Process-level cache
        static shared Unqual!T result = outOfBandValue;
        // Initialize both process-level cache and tls
        local = atomicLoad(result);
        if (local == outOfBandValue)
        {
            local = initializer();
            atomicStore(result, local);
        }
        tls = local;
        return local;
    }

    import std.traits : SetFunctionAttributes;
    alias Fun = SetFunctionAttributes!(typeof(&impl), "D",
        functionAttributes!(typeof(&impl)) | FunctionAttribute.pure_);
    auto purified = (() @trusted => cast(Fun) &impl)();
    return purified();
}

// Returns the size of a cache line.
alias cacheLineSize =
    __lazilyInitializedConstant!(immutable(size_t), size_t.max, cacheLineSizeImpl);

private size_t cacheLineSizeImpl() @nogc nothrow @trusted
{
    size_t result = 0;
    import core.cpuid : datacache;
    foreach (ref const cachelevel; datacache)
    {
        if (cachelevel.lineSize > result && cachelevel.lineSize < uint.max)
        {
            result = cachelevel.lineSize;
        }
    }
    return result;
}

@nogc @safe nothrow unittest
{
    assert(cacheLineSize == cacheLineSizeImpl);
}

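// Usage sketch: defining another cached global with the same helper, in the
// style of `cacheLineSize` above and `totalCPUs` below. `fortyTwoImpl` is a
// hypothetical initializer; the out-of-band marker (uint.max) must never be
// a legitimate result of the initializer.
unittest
{
    static uint fortyTwoImpl() @nogc nothrow @safe { return 42; }

    alias fortyTwo =
        __lazilyInitializedConstant!(immutable(uint), uint.max, fortyTwoImpl);

    assert(fortyTwo == 42);
    assert(fortyTwo == 42); // second read takes the fast thread-local path
}
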
/* Atomics code. These forward to core.atomic, but are written like this
for two reasons:

1. They used to actually contain ASM code and I don't want to have to change
   to directly calling core.atomic in a zillion different places.

2. core.atomic has some misc. issues that make my use cases difficult
   without wrapping it. If I didn't wrap it, casts would be required
   basically everywhere.
*/
private void atomicSetUbyte(T)(ref T stuff, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    //core.atomic.cas(cast(shared) &stuff, stuff, newVal);
    atomicStore(*(cast(shared) &stuff), newVal);
}

private ubyte atomicReadUbyte(T)(ref T val)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    return atomicLoad(*(cast(shared) &val));
}

// This gets rid of the need for a lot of annoying casts in other parts of the
// code, when enums are involved.
private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    return core.atomic.cas(cast(shared) &stuff, testVal, newVal);
}

/*--------------------- Generic helper functions, etc. -----------------------*/
private template MapType(R, functions...)
{
    static assert(functions.length);

    ElementType!R e = void;
    alias MapType =
        typeof(adjoin!(staticMap!(unaryFun, functions))(e));
}

private template ReduceType(alias fun, R, E)
{
    alias ReduceType = typeof(binaryFun!fun(E.init, ElementType!R.init));
}

private template noUnsharedAliasing(T)
{
    enum bool noUnsharedAliasing = !hasUnsharedAliasing!T;
}

// This template tests whether a function may be executed in parallel from
// @safe code via Task.executeInNewThread(). There is an additional
// requirement for executing it via a TaskPool. (See isSafeReturn).
private template isSafeTask(F)
{
    enum bool isSafeTask =
        (functionAttributes!F & (FunctionAttribute.safe | FunctionAttribute.trusted)) != 0 &&
        (functionAttributes!F & FunctionAttribute.ref_) == 0 &&
        (isFunctionPointer!F || !hasUnsharedAliasing!F) &&
        allSatisfy!(noUnsharedAliasing, Parameters!F);
}

@safe unittest
{
    alias F1 = void function() @safe;
    alias F2 = void function();
    alias F3 = void function(uint, string) @trusted;
    alias F4 = void function(uint, char[]);

    static assert( isSafeTask!F1);
    static assert(!isSafeTask!F2);
    static assert( isSafeTask!F3);
    static assert(!isSafeTask!F4);

    alias F5 = uint[] function(uint, string) pure @trusted;
    static assert( isSafeTask!F5);
}

// This function decides whether Tasks that meet all of the other requirements
// for being executed from @safe code can be executed on a TaskPool.
// When executing via TaskPool, it's theoretically possible
// to return a value that is also pointed to by a worker thread's thread local
// storage. When executing from executeInNewThread(), the thread that executed
// the Task is terminated by the time the return value is visible in the calling
// thread, so this is a non-issue. It's also a non-issue for pure functions
// since they can't read global state.
private template isSafeReturn(T)
{
    static if (!hasUnsharedAliasing!(T.ReturnType))
    {
        enum isSafeReturn = true;
    }
    else static if (T.isPure)
    {
        enum isSafeReturn = true;
    }
    else
    {
        enum isSafeReturn = false;
    }
}

private template randAssignable(R)
{
    enum randAssignable = isRandomAccessRange!R && hasAssignableElements!R;
}

private enum TaskStatus : ubyte
{
    notStarted,
    inProgress,
    done
}

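// Sketch of how isSafeReturn is used: it inspects a Task type rather than a
// raw function. A pure function may safely return a freshly allocated slice
// because purity rules out aliasing a worker thread's thread-local state.
// `impureFn` and `pureFn` are hypothetical examples.
@safe unittest
{
    static int[] impureFn(int n) @safe { return new int[](n); }
    static int[] pureFn(int n) @safe pure { return new int[](n); }

    static assert(!isSafeReturn!(Task!(run, typeof(&impureFn), int)));
    static assert( isSafeReturn!(Task!(run, typeof(&pureFn), int)));
}
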
private template AliasReturn(alias fun, T...)
{
    alias AliasReturn = typeof({ T args; return fun(args); });
}

// Should be private, but std.algorithm.reduce is used in the zero-thread case
// and won't work w/ private.
template reduceAdjoin(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceAdjoin = binaryFun!(functions[0]);
    }
    else
    {
        T reduceAdjoin(T, U)(T lhs, U rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs);
            }

            return lhs;
        }
    }
}

private template reduceFinish(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceFinish = binaryFun!(functions[0]);
    }
    else
    {
        T reduceFinish(T)(T lhs, T rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs.expand[i]);
            }

            return lhs;
        }
    }
}

private template isRoundRobin(R : RoundRobinBuffer!(C1, C2), C1, C2)
{
    enum isRoundRobin = true;
}

private template isRoundRobin(T)
{
    enum isRoundRobin = false;
}

@safe unittest
{
    static assert( isRoundRobin!(RoundRobinBuffer!(void delegate(char[]), bool delegate())));
    static assert(!isRoundRobin!(uint));
}

// This is the base "class" for all of the other tasks. Using C-style
// polymorphism to allow more direct control over memory allocation, etc.
private struct AbstractTask
{
    AbstractTask* prev;
    AbstractTask* next;

    // Pointer to a function that executes this task.
    void function(void*) runTask;

    Throwable exception;
    ubyte taskStatus = TaskStatus.notStarted;

    bool done() @property
    {
        if (atomicReadUbyte(taskStatus) == TaskStatus.done)
        {
            if (exception)
            {
                throw exception;
            }

            return true;
        }

        return false;
    }

    void job()
    {
        runTask(&this);
    }
}

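// Illustration of the C-style polymorphism used by AbstractTask: a concrete
// task type embeds AbstractTask as its first member, and runTask casts the
// base pointer back to the derived type. `CountTask` is a hypothetical type;
// the real derived type in this module is Task!(fun, Args) below.
@system unittest
{
    static struct CountTask
    {
        AbstractTask base;
        int result;

        static void execute(void* p)
        {
            // Safe because base is the first member of CountTask.
            auto self = cast(CountTask*) p;
            self.result = 42;
        }
    }

    CountTask t;
    t.base.runTask = &CountTask.execute;
    t.base.job(); // dispatches through the stored function pointer
    assert(t.result == 42);
}
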
/**
`Task` represents the fundamental unit of work. A `Task` may be
executed in parallel with any other `Task`. Using this struct directly
allows future/promise parallelism. In this paradigm, a function (or delegate
or other callable) is executed in a thread other than the one it was called
from. The calling thread does not block while the function is being executed.
A call to `workForce`, `yieldForce`, or `spinForce` is used to
ensure that the `Task` has finished executing and to obtain the return
value, if any. These functions and `done` also act as full memory barriers,
meaning that any memory writes made in the thread that executed the `Task`
are guaranteed to be visible in the calling thread after one of these functions
returns.

The $(REF task, std,parallelism) and $(REF scopedTask, std,parallelism) functions can
be used to create an instance of this struct. See `task` for usage examples.

Function results are returned from `yieldForce`, `spinForce` and
`workForce` by ref. If `fun` returns by ref, the reference will point
to the returned reference of `fun`. Otherwise it will point to a
field in this struct.

Copying of this struct is disabled, since it would provide no useful semantics.
If you want to pass this struct around, you should do so by reference or
pointer.

Bugs: Changes to `ref` and `out` arguments are not propagated to the
call site, only to `args` in this struct.
*/
struct Task(alias fun, Args...)
{
    private AbstractTask base = {runTask : &impl};
    private alias base this;

    private @property AbstractTask* basePtr()
    {
        return &base;
    }

    private static void impl(void* myTask)
    {
        import std.algorithm.internal : addressOf;

        Task* myCastedTask = cast(typeof(this)*) myTask;
        static if (is(ReturnType == void))
        {
            fun(myCastedTask._args);
        }
        else static if (is(typeof(&(fun(myCastedTask._args)))))
        {
            myCastedTask.returnVal = addressOf(fun(myCastedTask._args));
        }
        else
        {
            myCastedTask.returnVal = fun(myCastedTask._args);
        }
    }

    private TaskPool pool;
    private bool isScoped; // True if created with scopedTask.

    Args _args;

    /**
    The arguments the function was called with. Changes to `out` and
    `ref` arguments will be visible here.
    */
    static if (__traits(isSame, fun, run))
    {
        alias args = _args[1..$];
    }
    else
    {
        alias args = _args;
    }


    // The purpose of this code is to decide whether functions whose
    // return values have unshared aliasing can be executed via
    // TaskPool from @safe code. See isSafeReturn.
    static if (__traits(isSame, fun, run))
    {
        static if (isFunctionPointer!(_args[0]))
        {
            private enum bool isPure =
                (functionAttributes!(Args[0]) & FunctionAttribute.pure_) != 0;
        }
        else
        {
            // BUG: Should check this for delegates too, but std.traits
            // apparently doesn't allow this. isPure is irrelevant
            // for delegates, at least for now since shared delegates
            // don't work.
            private enum bool isPure = false;
        }

    }
    else
    {
        // We already know that we can't execute aliases in @safe code, so
        // just put a dummy value here.
        private enum bool isPure = false;
    }


    /**
    The return type of the function called by this `Task`. This can be
    `void`.
    */
    alias ReturnType = typeof(fun(_args));

    static if (!is(ReturnType == void))
    {
        static if (is(typeof(&fun(_args))))
        {
            // Ref return.
            ReturnType* returnVal;

            ref ReturnType fixRef(ReturnType* val)
            {
                return *val;
            }

        }
        else
        {
            ReturnType returnVal;

            ref ReturnType fixRef(ref ReturnType val)
            {
                return val;
            }
        }
    }

    private void enforcePool()
    {
        import std.exception : enforce;
        enforce(this.pool !is null, "Job not submitted yet.");
    }

    static if (Args.length > 0)
    {
        private this(Args args)
        {
            _args = args;
        }
    }

    // Work around DMD bug https://issues.dlang.org/show_bug.cgi?id=6588,
    // allow immutable elements.
    static if (allSatisfy!(isAssignable, Args))
    {
        typeof(this) opAssign(typeof(this) rhs)
        {
            foreach (i, Type; typeof(this.tupleof))
            {
                this.tupleof[i] = rhs.tupleof[i];
            }
            return this;
        }
    }
    else
    {
        @disable typeof(this) opAssign(typeof(this) rhs);
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any. If it's in progress,
    busy spin until it's done, then return the return value. If it threw
    an exception, rethrow that exception.

    This function should be used when you expect the result of the
    `Task` to be available on a timescale shorter than that of an OS
    context switch.
    */
    @property ref ReturnType spinForce() @trusted
    {
        enforcePool();

        this.pool.tryDeleteExecute(basePtr);

        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done) {}

        if (exception)
        {
            throw exception;
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any. If it's in progress,
    wait on a condition variable. If it threw an exception, rethrow that
    exception.

    This function should be used for expensive functions, as waiting on a
    condition variable introduces latency, but avoids wasted CPU cycles.
    */
    @property ref ReturnType yieldForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        if (done)
        {
            static if (is(ReturnType == void))
            {
                return;
            }
            else
            {
                return fixRef(this.returnVal);
            }
        }

        pool.waiterLock();
        scope(exit) pool.waiterUnlock();

        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done)
        {
            pool.waitUntilCompletion();
        }

        if (exception)
        {
            throw exception; // nocoverage
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If this `Task` was not started yet, execute it in the current
    thread. If it is finished, return its result. If it is in progress,
    execute any other `Task` from the `TaskPool` instance that
    this `Task` was submitted to until this one
    is finished. If it threw an exception, rethrow that exception.
    If no other tasks are available or this `Task` was executed using
    `executeInNewThread`, wait on a condition variable.
    */
    @property ref ReturnType workForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        while (true)
        {
            if (done) // done() implicitly checks for exceptions.
            {
                static if (is(ReturnType == void))
                {
                    return;
                }
                else
                {
                    return fixRef(this.returnVal);
                }
            }

            AbstractTask* job;
            {
                // Locking explicitly and calling popNoSync() because
                // pop() waits on a condition variable if there are no Tasks
                // in the queue.

                pool.queueLock();
                scope(exit) pool.queueUnlock();
                job = pool.popNoSync();
            }


            if (job !is null)
            {

                version (verboseUnittest)
                {
                    stderr.writeln("Doing workForce work.");
                }

                pool.doJob(job);

                if (done)
                {
                    static if (is(ReturnType == void))
                    {
                        return;
                    }
                    else
                    {
                        return fixRef(this.returnVal);
                    }
                }
            }
            else
            {
                version (verboseUnittest)
                {
                    stderr.writeln("Yield from workForce.");
                }

                return yieldForce;
            }
        }
    }

    /**
    Returns `true` if the `Task` is finished executing.

    Throws: Rethrows any exception thrown during the execution of the
    `Task`.
    */
    @property bool done() @trusted
    {
        // Explicitly forwarded for documentation purposes.
        return base.done;
    }

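    /*
    Guidance sketch for choosing among the force functions; `cheap` and
    `pricey` are hypothetical callables.
    ---
    auto quick = task!cheap();
    taskPool.put(quick);
    auto r1 = quick.spinForce;   // expected to finish within ~a context switch

    auto slow = task!pricey();
    taskPool.put(slow);
    auto r2 = slow.yieldForce;   // sleep on a condition variable until done

    auto helper = task!pricey();
    taskPool.put(helper);
    auto r3 = helper.workForce;  // help drain the pool's queue while waiting
    ---
    */
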
    /**
    Create a new thread for executing this `Task`, execute it in the
    newly created thread, then terminate the thread. This can be used for
    future/promise parallelism. An explicit priority may be given
    to the `Task`. If one is provided, its value is forwarded to
    `core.thread.Thread.priority`. See $(REF task, std,parallelism) for
    usage examples.
    */
    void executeInNewThread() @trusted
    {
        pool = new TaskPool(basePtr);
    }

    /// Ditto
    void executeInNewThread(int priority) @trusted
    {
        pool = new TaskPool(basePtr, priority);
    }

    @safe ~this()
    {
        if (isScoped && pool !is null && taskStatus != TaskStatus.done)
        {
            yieldForce;
        }
    }

    // When this is uncommented, it somehow gets called on returning from
    // scopedTask even though the struct shouldn't be getting copied.
    //@disable this(this) {}
}

// Calls `fpOrDelegate` with `args`. This is an
// adapter that makes `Task` work with delegates, function pointers and
// functors instead of just aliases.
ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args)
{
    return fpOrDelegate(args);
}

/**
Creates a `Task` on the GC heap that calls an alias. This may be executed
via `Task.executeInNewThread` or by submitting to a
$(REF TaskPool, std,parallelism). A globally accessible instance of
`TaskPool` is provided by $(REF taskPool, std,parallelism).

Returns: A pointer to the `Task`.

Example:
---
// Read two files into memory at the same time.
import std.file;

void main()
{
    // Create and execute a Task for reading
    // foo.txt.
    auto file1Task = task!read("foo.txt");
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}
---

---
// Sorts an array using a parallel quick sort algorithm.
// The first partition is done serially. Both recursion
// branches are then executed in parallel.
//
// Timings for sorting an array of 1,000,000 doubles on
// an Athlon 64 X2 dual core machine:
//
// This implementation: 176 milliseconds.
// Equivalent serial implementation: 280 milliseconds
void parallelSort(T)(T[] data)
{
    // Sort small subarrays serially.
    if (data.length < 100)
    {
        std.algorithm.sort(data);
        return;
    }

    // Partition the array.
    swap(data[$ / 2], data[$ - 1]);
    auto pivot = data[$ - 1];
    bool lessThanPivot(T elem) { return elem < pivot; }

    auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]);
    swap(data[$ - greaterEqual.length - 1], data[$ - 1]);

    auto less = data[0..$ - greaterEqual.length - 1];
    greaterEqual = data[$ - greaterEqual.length..$];

    // Execute both recursion branches in parallel.
    auto recurseTask = task!parallelSort(greaterEqual);
    taskPool.put(recurseTask);
    parallelSort(less);
    recurseTask.yieldForce;
}
---
*/
auto task(alias fun, Args...)(Args args)
{
    return new Task!(fun, Args)(args);
}

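// Usage sketch: the future/promise pattern described above, with a plain
// function and the alias overload of `task`.
@system unittest
{
    static int triple(int x) { return 3 * x; }

    auto fut = task!triple(14);
    fut.executeInNewThread();

    // The calling thread is free to do other work here while `triple` runs.

    assert(fut.yieldForce == 42);
}
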
/**
Creates a `Task` on the GC heap that calls a function pointer, delegate, or
class/struct with overloaded opCall.

Example:
---
// Read two files in at the same time again,
// but this time use a function pointer instead
// of an alias to represent std.file.read.
import std.file;

void main()
{
    // Create and execute a Task for reading
    // foo.txt.
    auto file1Task = task(&read!string, "foo.txt", size_t.max);
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}
---

Notes: This function takes a non-scope delegate, meaning it can be
used with closures. If you can't allocate a closure due to objects
on the stack that have scoped destruction, see `scopedTask`, which
takes a scope delegate.
*/
auto task(F, Args...)(F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    return new Task!(run, F, Args)(delegateOrFp, args);
}

/**
Version of `task` usable from `@safe` code. Usage mechanics are
identical to the non-@safe case, but safety introduces some restrictions:

1. `fun` must be @safe or @trusted.

2. `F` must not have any unshared aliasing as defined by
   $(REF hasUnsharedAliasing, std,traits). This means it
   may not be an unshared delegate or a non-shared class or struct
   with overloaded `opCall`. This also precludes accepting template
   alias parameters.

3. `Args` must not have unshared aliasing.

4. `fun` must not return by reference.

5. The return type must not have unshared aliasing unless `fun` is
   `pure` or the `Task` is executed via `executeInNewThread` instead
   of using a `TaskPool`.

*/
@trusted auto task(F, Args...)(F fun, Args args)
if (__traits(compiles, () @safe => fun(args)) && isSafeTask!F)
{
    return new Task!(run, F, Args)(fun, args);
}

@safe unittest
{
    static struct Oops
    {
        int convert()
        {
            *cast(int*) 0xcafebabe = 0xdeadbeef;
            return 0;
        }
        alias convert this;
    }
    static void foo(int) @safe {}

    static assert(!__traits(compiles, task(&foo, Oops.init)));
    static assert(!__traits(compiles, scopedTask(&foo, Oops.init)));
}

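// Sketch of the @safe overload: a @safe function pointer taking and
// returning only value types satisfies all five restrictions above.
@safe unittest
{
    static int addSafe(int a, int b) @safe { return a + b; }

    auto t = task(&addSafe, 3, 4);
    t.executeInNewThread();
    assert(t.yieldForce == 7);
}
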
/**
These functions allow the creation of `Task` objects on the stack rather
than the GC heap. The lifetime of a `Task` created by `scopedTask`
cannot exceed the lifetime of the scope it was created in.

`scopedTask` might be preferred over `task`:

1. When a `Task` that calls a delegate is being created and a closure
   cannot be allocated due to objects on the stack that have scoped
   destruction. The delegate overload of `scopedTask` takes a `scope`
   delegate.

2. As a micro-optimization, to avoid the heap allocation associated with
   `task` or with the creation of a closure.

Usage is otherwise identical to `task`.

Notes: `Task` objects created using `scopedTask` will automatically
call `Task.yieldForce` in their destructor if necessary to ensure
the `Task` is complete before the stack frame they reside on is destroyed.
*/
auto scopedTask(alias fun, Args...)(Args args)
{
    auto ret = Task!(fun, Args)(args);
    ret.isScoped = true;
    return ret;
}

/// Ditto
auto scopedTask(F, Args...)(scope F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    auto ret = Task!(run, F, Args)(delegateOrFp, args);
    ret.isScoped = true;
    return ret;
}

/// Ditto
@trusted auto scopedTask(F, Args...)(F fun, Args args)
if (__traits(compiles, () @safe => fun(args)) && isSafeTask!F)
{
    auto ret = Task!(run, F, Args)(fun, args);
    ret.isScoped = true;
    return ret;
}

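// Usage sketch: a stack-allocated Task via scopedTask. The destructor
// yieldForces an unfinished scoped Task, so it cannot outlive this frame.
@system unittest
{
    static int twice(int x) { return 2 * x; }

    auto t = scopedTask!twice(21);
    t.executeInNewThread();
    assert(t.yieldForce == 42);
}
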
/**
The total number of CPU cores available on the current machine, as reported by
the operating system.
*/
alias totalCPUs =
    __lazilyInitializedConstant!(immutable(uint), uint.max, totalCPUsImpl);

uint totalCPUsImpl() @nogc nothrow @trusted
{
    version (Windows)
    {
        // BUGS: Only works on Windows 2000 and above.
        import core.sys.windows.winbase : SYSTEM_INFO, GetSystemInfo;
        import std.algorithm.comparison : max;
        SYSTEM_INFO si;
        GetSystemInfo(&si);
        return max(1, cast(uint) si.dwNumberOfProcessors);
    }
    else version (linux)
    {
        import core.stdc.stdlib : calloc;
        import core.stdc.string : memset;
        import core.sys.linux.sched : CPU_ALLOC_SIZE, CPU_FREE, CPU_COUNT, CPU_COUNT_S, cpu_set_t, sched_getaffinity;
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;

        int count = 0;

        /**
         * According to ruby's source code, CPU_ALLOC() doesn't work as expected.
         * see: https://github.com/ruby/ruby/commit/7d9e04de496915dd9e4544ee18b1a0026dc79242
         *
         * The hardcoded number also comes from ruby's source code.
         * see: https://github.com/ruby/ruby/commit/0fa75e813ecf0f5f3dd01f89aa76d0e25ab4fcd4
         */
        for (int n = 64; n <= 16384; n *= 2)
        {
            size_t size = CPU_ALLOC_SIZE(count);
            if (size >= 0x400)
            {
                auto cpuset = cast(cpu_set_t*) calloc(1, size);
                if (cpuset is null) break;
                if (sched_getaffinity(0, size, cpuset) == 0)
                {
                    count = CPU_COUNT_S(size, cpuset);
                }
                CPU_FREE(cpuset);
            }
            else
            {
                cpu_set_t cpuset;
                if (sched_getaffinity(0, cpu_set_t.sizeof, &cpuset) == 0)
                {
                    count = CPU_COUNT(&cpuset);
                }
            }

            if (count > 0)
                return cast(uint) count;
        }

        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (Darwin)
    {
        import core.sys.darwin.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.physicalcpu", &result, &len, null, 0);
        return result;
    }
    else version (DragonFlyBSD)
    {
        import core.sys.dragonflybsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (FreeBSD)
    {
        import core.sys.freebsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (NetBSD)
    {
        import core.sys.netbsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (Solaris)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (OpenBSD)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (Hurd)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else
    {
        static assert(0, "Don't know how to get N CPUs on this OS.");
    }
}

/*
This class serves two purposes:

1. It distinguishes std.parallelism threads from other threads so that
   the std.parallelism daemon threads can be terminated.

2. It adds a reference to the pool that the thread is a member of,
   which is also necessary to allow the daemon threads to be properly
   terminated.
*/
private final class ParallelismThread : Thread
{
    this(void delegate() dg)
    {
        super(dg);
    }

    TaskPool pool;
}

// Kill daemon threads.
shared static ~this()
{
    foreach (ref thread; Thread)
    {
        auto pthread = cast(ParallelismThread) thread;
        if (pthread is null) continue;
        auto pool = pthread.pool;
        if (!pool.isDaemon) continue;
        pool.stop();
        pthread.join();
    }
}

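/*
Shutdown sketch: the destructor above tears down only pools whose `isDaemon`
flag is set. A non-daemon pool keeps the process alive until `stop` or
`finish` is called.
---
auto pool = new TaskPool();
pool.isDaemon = true;  // workers no longer keep the program running
---
*/
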
/**
This class encapsulates a task queue and a set of worker threads. Its purpose
is to efficiently map a large number of `Task`s onto a smaller number of
threads. A task queue is a FIFO queue of `Task` objects that have been
submitted to the `TaskPool` and are awaiting execution. A worker thread is a
thread that executes the `Task` at the front of the queue when one is
available and sleeps when the queue is empty.

This class should usually be used via the global instantiation
available through the $(REF taskPool, std,parallelism) property.
Occasionally it is useful to explicitly instantiate a `TaskPool`:

1. When you want `TaskPool` instances with multiple priorities, for example
   a low priority pool and a high priority pool.

2. When the threads in the global task pool are waiting on a synchronization
   primitive (for example a mutex), and you want to parallelize the code that
   needs to run before these threads can be resumed.

Note: The worker threads in this pool will not stop until
`stop` or `finish` is called, even if the main thread
has finished already. This may lead to programs that
never end. If you do not want this behaviour, you can set `isDaemon`
to true.
*/
final class TaskPool
{
private:

    // A pool can either be a regular pool or a single-task pool. A
    // single-task pool is a dummy pool that's fired up for
    // Task.executeInNewThread().
    bool isSingleTask;

    ParallelismThread[] pool;
    Thread singleTaskThread;

    AbstractTask* head;
    AbstractTask* tail;
    PoolState status = PoolState.running;
    Condition workerCondition;
    Condition waiterCondition;
    Mutex queueMutex;
    Mutex waiterMutex; // For waiterCondition

    // The instanceStartIndex of the next instance that will be created.
    __gshared size_t nextInstanceIndex = 1;

    // The index of the current thread.
    static size_t threadIndex;

    // The index of the first thread in this instance.
    immutable size_t instanceStartIndex;

    // The index that the next thread to be initialized in this pool will have.
    size_t nextThreadIndex;

    enum PoolState : ubyte
    {
        running,
        finishing,
        stopNow
    }

    void doJob(AbstractTask* job)
    {
        assert(job.taskStatus == TaskStatus.inProgress);
        assert(job.next is null);
        assert(job.prev is null);

        scope(exit)
        {
            if (!isSingleTask)
            {
                waiterLock();
                scope(exit) waiterUnlock();
                notifyWaiters();
            }
        }

        try
        {
            job.job();
        }
        catch (Throwable e)
        {
            job.exception = e;
        }

        atomicSetUbyte(job.taskStatus, TaskStatus.done);
    }

    // This function is used for dummy pools created by Task.executeInNewThread().
    void doSingleTask()
    {
        // No synchronization. Pool is guaranteed to only have one thread,
        // and the queue is submitted to before this thread is created.
        assert(head);
        auto t = head;
        t.next = t.prev = head = null;
        doJob(t);
    }

    // This function performs initialization for each thread that affects
    // thread local storage and therefore must be done from within the
    // worker thread. It then calls executeWorkLoop().
    void startWorkLoop()
    {
        // Initialize thread index.
        {
            queueLock();
            scope(exit) queueUnlock();
            threadIndex = nextThreadIndex;
            nextThreadIndex++;
        }

        executeWorkLoop();
    }

    // This is the main work loop that worker threads spend their time in
    // until they terminate. It's also entered by non-worker threads when
    // finish() is called with the blocking variable set to true.
    void executeWorkLoop()
    {
        while (atomicReadUbyte(status) != PoolState.stopNow)
        {
            AbstractTask* task = pop();
            if (task is null)
            {
                if (atomicReadUbyte(status) == PoolState.finishing)
                {
                    atomicSetUbyte(status, PoolState.stopNow);
                    return;
                }
            }
            else
            {
                doJob(task);
            }
        }
    }

    // Pop a task off the queue.
    AbstractTask* pop()
    {
        queueLock();
        scope(exit) queueUnlock();
        auto ret = popNoSync();
        while (ret is null && status == PoolState.running)
        {
            wait();
            ret = popNoSync();
        }
        return ret;
    }

    AbstractTask* popNoSync()
    out(returned)
    {
        /* If task.prev and task.next aren't null, then another thread
         * can try to delete this task from the pool after it's
         * already been deleted/popped.
         */
        if (returned !is null)
        {
            assert(returned.next is null);
            assert(returned.prev is null);
        }
    }
    do
    {
        if (isSingleTask) return null;

        AbstractTask* returned = head;
        if (head !is null)
        {
            head = head.next;
            returned.prev = null;
            returned.next = null;
            returned.taskStatus = TaskStatus.inProgress;
        }
        if (head !is null)
        {
            head.prev = null;
        }

        return returned;
    }

    // Push a task onto the queue.
    void abstractPut(AbstractTask* task)
    {
        queueLock();
        scope(exit) queueUnlock();
        abstractPutNoSync(task);
    }

    void abstractPutNoSync(AbstractTask* task)
    in
    {
        assert(task);
    }
    out
    {
        import std.conv : text;

        assert(tail.prev !is tail);
        assert(tail.next is null, text(tail.prev, '\t', tail.next));
        if (tail.prev !is null)
        {
            assert(tail.prev.next is tail, text(tail.prev, '\t', tail.next));
        }
    }
    do
    {
        // Not using enforce() to save on function call overhead since this
        // is a performance critical function.
        if (status != PoolState.running)
        {
            throw new Error(
                "Cannot submit a new task to a pool after calling " ~
                "finish() or stop()."
            );
        }

        task.next = null;
        if (head is null) //Queue is empty.
        {
            head = task;
            tail = task;
            tail.prev = null;
        }
        else
        {
            assert(tail);
            task.prev = tail;
            tail.next = task;
            tail = task;
        }
        notify();
    }

    void abstractPutGroupNoSync(AbstractTask* h, AbstractTask* t)
    {
        if (status != PoolState.running)
        {
            throw new Error(
                "Cannot submit a new task to a pool after calling " ~
                "finish() or stop()."
            );
        }

        if (head is null)
        {
            head = h;
            tail = t;
        }
        else
        {
            h.prev = tail;
            tail.next = h;
            tail = t;
        }

        notifyAll();
    }

    void tryDeleteExecute(AbstractTask* toExecute)
    {
        if (isSingleTask) return;

        if (!deleteItem(toExecute))
        {
            return;
        }

        try
        {
            toExecute.job();
        }
        catch (Exception e)
        {
            toExecute.exception = e;
        }

        atomicSetUbyte(toExecute.taskStatus, TaskStatus.done);
    }

    bool deleteItem(AbstractTask* item)
    {
        queueLock();
        scope(exit) queueUnlock();
        return deleteItemNoSync(item);
    }

    bool deleteItemNoSync(AbstractTask* item)
    {
        if (item.taskStatus != TaskStatus.notStarted)
        {
            return false;
        }
        item.taskStatus = TaskStatus.inProgress;

        if (item is head)
        {
            // Make sure head gets set properly.
            popNoSync();
            return true;
        }
        if (item is tail)
        {
            tail = tail.prev;
            if (tail !is null)
            {
                tail.next = null;
            }
            item.next = null;
            item.prev = null;
            return true;
        }
        if (item.next !is null)
        {
            assert(item.next.prev is item); // Check queue consistency.
            item.next.prev = item.prev;
        }
        if (item.prev !is null)
        {
            assert(item.prev.next is item); // Check queue consistency.
            item.prev.next = item.next;
        }
        item.next = null;
        item.prev = null;
        return true;
    }

    void queueLock()
    {
        assert(queueMutex);
        if (!isSingleTask) queueMutex.lock();
    }

    void queueUnlock()
    {
        assert(queueMutex);
        if (!isSingleTask) queueMutex.unlock();
    }

    void waiterLock()
    {
        if (!isSingleTask) waiterMutex.lock();
    }

    void waiterUnlock()
    {
        if (!isSingleTask) waiterMutex.unlock();
    }

    void wait()
    {
        if (!isSingleTask) workerCondition.wait();
    }

    void notify()
    {
        if (!isSingleTask) workerCondition.notify();
    }

    void notifyAll()
    {
        if (!isSingleTask) workerCondition.notifyAll();
    }

    void waitUntilCompletion()
    {
        if (isSingleTask)
        {
            singleTaskThread.join();
        }
        else
        {
            waiterCondition.wait();
        }
    }

    void notifyWaiters()
    {
        if (!isSingleTask) waiterCondition.notifyAll();
    }

    // Private constructor for creating dummy pools that only have one thread,
    // only execute one Task, and then terminate. This is used for
    // Task.executeInNewThread().
    this(AbstractTask* task, int priority = int.max)
    {
        assert(task);

        // Dummy value, not used.
        instanceStartIndex = 0;

        this.isSingleTask = true;
        task.taskStatus = TaskStatus.inProgress;
        this.head = task;
        singleTaskThread = new Thread(&doSingleTask);
        singleTaskThread.start();

        // Disabled until writing code to support
        // running thread with specified priority
        // See https://issues.dlang.org/show_bug.cgi?id=8960

        /*if (priority != int.max)
        {
            singleTaskThread.priority = priority;
        }*/
    }

public:
    // This is used in parallel_algorithm but is too unstable to document
    // as public API.
    size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow
    {
        import std.algorithm.comparison : max;

        if (this.size == 0)
        {
            return max(rangeLen, 1);
        }

        immutable size_t eightSize = 4 * (this.size + 1);
        auto ret = (rangeLen / eightSize) + ((rangeLen % eightSize == 0) ? 0 : 1);
        return max(ret, 1);
    }

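    /*
    Worked example for defaultWorkUnitSize: with this.size == 7 (seven
    workers), eightSize = 4 * (7 + 1) = 32, so rangeLen == 1000 gives a work
    unit size of ceil(1000 / 32) = 32 elements, about 32 work units in total,
    i.e. roughly four per thread (the workers plus the main thread).
    */
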
    /**
    Default constructor that initializes a `TaskPool` with
    `totalCPUs` - 1 worker threads. The minus 1 is included because the
    main thread will also be available to do work.

    Note: On single-core machines, the primitives provided by `TaskPool`
    operate transparently in single-threaded mode.
    */
    this() @trusted
    {
        this(totalCPUs - 1);
    }

    /**
    Allows for a custom number of worker threads.
    */
    this(size_t nWorkers) @trusted
    {
        synchronized(typeid(TaskPool))
        {
            instanceStartIndex = nextInstanceIndex;

            // The first worker thread to be initialized will have this index,
            // and will increment it. The second worker to be initialized will
            // have this index plus 1.
            nextThreadIndex = instanceStartIndex;
            nextInstanceIndex += nWorkers;
        }

        queueMutex = new Mutex(this);
        waiterMutex = new Mutex();
        workerCondition = new Condition(queueMutex);
        waiterCondition = new Condition(waiterMutex);

        pool = new ParallelismThread[nWorkers];
        foreach (ref poolThread; pool)
        {
            poolThread = new ParallelismThread(&startWorkLoop);
            poolThread.pool = this;
            poolThread.start();
        }
    }

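    /*
    Usage sketch for the sized constructor (illustration only):
    ---
    auto pool = new TaskPool(4);  // exactly four worker threads
    scope(exit) pool.finish();    // let queued work finish, then shut down
    ---
    */
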
    /**
    Implements a parallel foreach loop over a range. This works by implicitly
    creating and submitting one `Task` to the `TaskPool` for each worker
    thread. A work unit is a set of consecutive elements of `range` to
    be processed by a worker thread between communication with any other
    thread. The number of elements processed per work unit is controlled by the
    `workUnitSize` parameter. Smaller work units provide better load
    balancing, but larger work units avoid the overhead of communicating
    with other threads frequently to fetch the next work unit. Large work
    units also avoid false sharing in cases where the range is being modified.
    The less time a single iteration of the loop takes, the larger
    `workUnitSize` should be. For very expensive loop bodies, `workUnitSize`
    should be 1. An overload that chooses a default work unit size is also
    available.

    Example:
    ---
    // Find the logarithm of every number from 1 to
    // 10_000_000 in parallel.
    auto logs = new double[10_000_000];

    // Parallel foreach works with or without an index
    // variable. It can iterate by ref if range.front
    // returns by ref.

    // Iterate over logs using work units of size 100.
    foreach (i, ref elem; taskPool.parallel(logs, 100))
    {
        elem = log(i + 1.0);
    }

    // Same thing, but use the default work unit size.
    //
    // Timings on an Athlon 64 X2 dual core machine:
    //
    // Parallel foreach: 388 milliseconds
    // Regular foreach: 619 milliseconds
    foreach (i, ref elem; taskPool.parallel(logs))
    {
        elem = log(i + 1.0);
    }
    ---

    Notes:

    The memory usage of this implementation is guaranteed to be constant
    in `range.length`.

    Breaking from a parallel foreach loop via a break, labeled break,
    labeled continue, return or goto statement throws a
    `ParallelForeachError`.

    In the case of non-random access ranges, parallel foreach buffers lazily
    to an array of size `workUnitSize` before executing the parallel portion
    of the loop. The exception is that, if a parallel foreach is executed
    over a range returned by `asyncBuf` or `map`, the copying is elided
    and the buffers are simply swapped. In this case `workUnitSize` is
    ignored and the work unit size is set to the buffer size of `range`.

    A memory barrier is guaranteed to be executed on exit from the loop,
    so that results produced by all threads are visible in the calling thread.

    $(B Exception Handling):

    When at least one exception is thrown from inside a parallel foreach loop,
    the submission of additional `Task` objects is terminated as soon as
    possible, in a non-deterministic manner. All executing or
    enqueued work units are allowed to complete. Then, all exceptions that
    were thrown by any work unit are chained using `Throwable.next` and
    rethrown. The order of the exception chaining is non-deterministic.
    */
    ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
    {
        import std.exception : enforce;
        enforce(workUnitSize > 0, "workUnitSize must be > 0.");
        alias RetType = ParallelForeach!R;
        return RetType(this, range, workUnitSize);
    }


    /// Ditto
    ParallelForeach!R parallel(R)(R range)
    {
        static if (hasLength!R)
        {
            // The default work unit size yields roughly 4x as many work units
            // as there are threads available (the workers plus the main
            // thread), for load balancing.
            size_t workUnitSize = defaultWorkUnitSize(range.length);
            return parallel(range, workUnitSize);
        }
        else
        {
            // Just use a really, really dumb guess if the user is too lazy to
            // specify.
            return parallel(range, 512);
        }
    }

    ///
    template amap(functions...)
    {
        /**
        Eager parallel map. The eagerness of this function means it has less
        overhead than the lazily evaluated `TaskPool.map` and should be
        preferred where the memory requirements of eagerness are acceptable.
        `functions` are the functions to be evaluated, passed as template
        alias parameters in a style similar to
        $(REF map, std,algorithm,iteration).
        The first argument must be a random access range. For performance
        reasons, amap will assume the range elements have not yet been
        initialized. Elements will be overwritten without calling a destructor
        nor doing an assignment. As such, the range must not contain meaningful
        data$(DDOC_COMMENT not a section): either un-initialized objects, or
        objects in their `.init` state.

        ---
        auto numbers = iota(100_000_000.0);

        // Find the square roots of numbers.
        //
        // Timings on an Athlon 64 X2 dual core machine:
        //
        // Parallel eager map: 0.802 s
        // Equivalent serial implementation: 1.768 s
        auto squareRoots = taskPool.amap!sqrt(numbers);
        ---

        Immediately after the range argument, an optional work unit size argument
        may be provided. Work units as used by `amap` are identical to those
        defined for parallel foreach. If no work unit size is provided, the
        default work unit size is used.

        ---
        // Same thing, but make work unit size 100.
        auto squareRoots = taskPool.amap!sqrt(numbers, 100);
        ---

        An output range for returning the results may be provided as the last
        argument. If one is not provided, an array of the proper type will be
        allocated on the garbage collected heap. If one is provided, it must be a
        random access range with assignable elements, must have reference
        semantics with respect to assignment to its elements, and must have the
        same length as the input range. Writing to adjacent elements from
        different threads must be safe.

        ---
        // Same thing, but explicitly allocate an array
        // to return the results in. The element type
        // of the array may be either the exact type
        // returned by functions or an implicit conversion
        // target.
        auto squareRoots = new float[numbers.length];
        taskPool.amap!sqrt(numbers, squareRoots);

        // Multiple functions, explicit output range, and
        // explicit work unit size.
        auto results = new Tuple!(float, real)[numbers.length];
        taskPool.amap!(sqrt, log)(numbers, 100, results);
        ---

        Note:

        A memory barrier is guaranteed to be executed after all results are written
        but before returning so that results produced by all threads are visible
        in the calling thread.

        Tips:

        To perform the mapping operation in place, provide the same range for the
        input and output range.

        To parallelize the copying of a range with expensive to evaluate elements
        to an array, pass an identity function (a function that just returns
        whatever argument is provided to it) to `amap`.

        $(B Exception Handling):

        When at least one exception is thrown from inside the map functions,
        the submission of additional `Task` objects is terminated as soon as
        possible, in a non-deterministic manner. All currently executing or
        enqueued work units are allowed to complete. Then, all exceptions that
        were thrown from any work unit are chained using `Throwable.next` and
        rethrown. The order of the exception chaining is non-deterministic.
        */
        auto amap(Args...)(Args args)
        if (isRandomAccessRange!(Args[0]))
        {
            import core.internal.lifetime : emplaceRef;

            alias fun = adjoin!(staticMap!(unaryFun, functions));

            alias range = args[0];
            immutable len = range.length;

            static if (
                Args.length > 1 &&
                randAssignable!(Args[$ - 1]) &&
                is(MapType!(Args[0], functions) : ElementType!(Args[$ - 1]))
                )
            {
                import std.conv : text;
                import std.exception : enforce;

                alias buf = args[$ - 1];
                alias args2 = args[0..$ - 1];
                alias Args2 = Args[0..$ - 1];
                enforce(buf.length == len,
                    text("Can't use a user supplied buffer that's the wrong ",
                        "size. (Expected: ", len, ", Got: ", buf.length, ")"));
            }
            else static if (randAssignable!(Args[$ - 1]) && Args.length > 1)
            {
                static assert(0, "Wrong buffer type.");
            }
            else
            {
                import std.array : uninitializedArray;

                auto buf = uninitializedArray!(MapType!(Args[0], functions)[])(len);
                alias args2 = args;
                alias Args2 = Args;
            }

            if (!len) return buf;

            static if (isIntegral!(Args2[$ - 1]))
            {
                static assert(args2.length == 2);
                auto workUnitSize = cast(size_t) args2[1];
            }
            else
            {
                static assert(args2.length == 1, Args);
                auto workUnitSize = defaultWorkUnitSize(range.length);
            }

            alias R = typeof(range);

            if (workUnitSize > len)
            {
                workUnitSize = len;
            }

            // Handle as a special case:
            if (size == 0)
            {
                size_t index = 0;
                foreach (elem; range)
                {
                    emplaceRef(buf[index++], fun(elem));
                }
                return buf;
            }

            // Effectively -1: chunkIndex + 1 == 0:
            shared size_t workUnitIndex = size_t.max;
            shared bool shouldContinue = true;

            void doIt()
            {
                import std.algorithm.comparison : min;

                scope(failure)
                {
                    // If an exception is thrown, all threads should bail.
                    atomicStore(shouldContinue, false);
                }

                while (atomicLoad(shouldContinue))
                {
                    immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
                    immutable start = workUnitSize * myUnitIndex;
                    if (start >= len)
                    {
                        atomicStore(shouldContinue, false);
                        break;
                    }

                    immutable end = min(len, start + workUnitSize);

                    static if (hasSlicing!R)
                    {
                        auto subrange = range[start .. end];
                        foreach (i; start .. end)
                        {
                            emplaceRef(buf[i], fun(subrange.front));
                            subrange.popFront();
                        }
                    }
                    else
                    {
                        foreach (i; start .. end)
                        {
                            emplaceRef(buf[i], fun(range[i]));
                        }
                    }
                }
            }

            submitAndExecute(this, &doIt);
            return buf;
        }
    }

    ///
    template map(functions...)
    {
        /**
        A semi-lazy parallel map that can be used for pipelining. The map
        functions are evaluated for the first `bufSize` elements and stored in a
        buffer and made available to `popFront`. Meanwhile, in the
        background a second buffer of the same size is filled. When the first
        buffer is exhausted, it is swapped with the second buffer and filled while
        the values from what was originally the second buffer are read. This
        implementation allows for elements to be written to the buffer without
        the need for atomic operations or synchronization for each write, and
        enables the mapping function to be evaluated efficiently in parallel.

        `map` has more overhead than the simpler procedure used by `amap`
        but avoids the need to keep all results in memory simultaneously and works
        with non-random access ranges.

        Params:

        source = The $(REF_ALTTEXT input range, isInputRange, std,range,primitives)
        to be mapped. If `source` is not random
        access it will be lazily buffered to an array of size `bufSize` before
        the map function is evaluated. (For an exception to this rule, see Notes.)

        bufSize = The size of the buffer to store the evaluated elements.

        workUnitSize = The number of elements to evaluate in a single
        `Task`. Must be less than or equal to `bufSize`, and
        should be a fraction of `bufSize` such that all worker threads can be
        used. If the default of size_t.max is used, workUnitSize will be set to
        the pool-wide default.

        Returns: An input range representing the results of the map. This range
        has a length iff `source` has a length.

        Notes:

        If a range returned by `map` or `asyncBuf` is used as an input to
        `map`, then as an optimization the copying from the output buffer
        of the first range to the input buffer of the second range is elided, even
        though the ranges returned by `map` and `asyncBuf` are non-random
        access ranges. This means that the `bufSize` parameter passed to the
        current call to `map` will be ignored and the size of the buffer
        will be the buffer size of `source`.

        Example:
        ---
        // Pipeline reading a file, converting each line
        // to a number, taking the logarithms of the numbers,
        // and performing the additions necessary to find
        // the sum of the logarithms.

        auto lineRange = File("numberList.txt").byLine();
        auto dupedLines = std.algorithm.map!"a.idup"(lineRange);
        auto nums = taskPool.map!(to!double)(dupedLines);
        auto logs = taskPool.map!log10(nums);

        double sum = 0;
        foreach (elem; logs)
        {
            sum += elem;
        }
        ---

        $(B Exception Handling):

        Any exceptions thrown while iterating over `source`
        or computing the map function are re-thrown on a call to `popFront` or,
        if thrown during construction, are simply allowed to propagate to the
        caller. In the case of exceptions thrown while computing the map function,
        the exceptions are chained as in `TaskPool.amap`.
        */
        auto map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max)
        if (isInputRange!S)
        {
            import std.exception : enforce;

            enforce(workUnitSize == size_t.max || workUnitSize <= bufSize,
                "Work unit size must be less than or equal to buffer size.");
            alias fun = adjoin!(staticMap!(unaryFun, functions));

            static final class Map
            {
                // This is a class because the task needs to be located on the
                // heap and in the non-random access case source needs to be on
                // the heap, too.

            private:
                enum bufferTrick = is(typeof(source.buf1)) &&
                    is(typeof(source.bufPos)) &&
                    is(typeof(source.doBufSwap()));

                alias E = MapType!(S, functions);
                E[] buf1, buf2;
                S source;
                TaskPool pool;
                Task!(run, E[] delegate(E[]), E[]) nextBufTask;
                size_t workUnitSize;
                size_t bufPos;
                bool lastTaskWaited;

                static if (isRandomAccessRange!S)
                {
                    alias FromType = S;

                    void popSource()
                    {
                        import std.algorithm.comparison : min;

                        static if (__traits(compiles, source[0 .. source.length]))
                        {
                            source = source[min(buf1.length, source.length)..source.length];
                        }
                        else static if (__traits(compiles, source[0..$]))
                        {
                            source = source[min(buf1.length, source.length)..$];
                        }
                        else
                        {
                            static assert(0, "S must have slicing for Map."
                                ~ " " ~ S.stringof ~ " doesn't.");
                        }
                    }
                }
                else static if (bufferTrick)
                {
                    // Make sure we don't have the buffer recycling overload of
                    // asyncBuf.
                    static if (
                        is(typeof(source.source)) &&
                        isRoundRobin!(typeof(source.source))
                        )
                    {
                        static assert(0, "Cannot execute a parallel map on " ~
                            "the buffer recycling overload of asyncBuf."
                        );
                    }

                    alias FromType = typeof(source.buf1);
                    FromType from;

                    // Just swap our input buffer with source's output buffer.
                    // No need to copy element by element.
                    FromType dumpToFrom()
                    {
                        import std.algorithm.mutation : swap;

                        assert(source.buf1.length <= from.length);
                        from.length = source.buf1.length;
                        swap(source.buf1, from);

                        // Just in case this source has been popped before
                        // being sent to map:
                        from = from[source.bufPos..$];

                        static if (is(typeof(source._length)))
                        {
                            source._length -= (from.length - source.bufPos);
                        }

                        source.doBufSwap();

                        return from;
                    }
                }
                else
                {
                    alias FromType = ElementType!S[];

                    // The temporary array that data is copied to before being
                    // mapped.
2054 FromType from; 2055 2056 FromType dumpToFrom() 2057 { 2058 assert(from !is null); 2059 2060 size_t i; 2061 for (; !source.empty && i < from.length; source.popFront()) 2062 { 2063 from[i++] = source.front; 2064 } 2065 2066 from = from[0 .. i]; 2067 return from; 2068 } 2069 } 2070 2071 static if (hasLength!S) 2072 { 2073 size_t _length; 2074 2075 public @property size_t length() const @safe pure nothrow 2076 { 2077 return _length; 2078 } 2079 } 2080 2081 this(S source, size_t bufSize, size_t workUnitSize, TaskPool pool) 2082 { 2083 static if (bufferTrick) 2084 { 2085 bufSize = source.buf1.length; 2086 } 2087 2088 buf1.length = bufSize; 2089 buf2.length = bufSize; 2090 2091 static if (!isRandomAccessRange!S) 2092 { 2093 from.length = bufSize; 2094 } 2095 2096 this.workUnitSize = (workUnitSize == size_t.max) ? 2097 pool.defaultWorkUnitSize(bufSize) : workUnitSize; 2098 this.source = source; 2099 this.pool = pool; 2100 2101 static if (hasLength!S) 2102 { 2103 _length = source.length; 2104 } 2105 2106 buf1 = fillBuf(buf1); 2107 submitBuf2(); 2108 } 2109 2110 // The from parameter is a dummy and ignored in the random access 2111 // case. 2112 E[] fillBuf(E[] buf) 2113 { 2114 import std.algorithm.comparison : min; 2115 2116 static if (isRandomAccessRange!S) 2117 { 2118 import std.range : take; 2119 auto toMap = take(source, buf.length); 2120 scope(success) popSource(); 2121 } 2122 else 2123 { 2124 auto toMap = dumpToFrom(); 2125 } 2126 2127 buf = buf[0 .. min(buf.length, toMap.length)]; 2128 2129 // Handle as a special case: 2130 if (pool.size == 0) 2131 { 2132 size_t index = 0; 2133 foreach (elem; toMap) 2134 { 2135 buf[index++] = fun(elem); 2136 } 2137 return buf; 2138 } 2139 2140 pool.amap!functions(toMap, workUnitSize, buf); 2141 2142 return buf; 2143 } 2144 2145 void submitBuf2() 2146 in 2147 { 2148 assert(nextBufTask.prev is null); 2149 assert(nextBufTask.next is null); 2150 } 2151 do 2152 { 2153 // Hack to reuse the task object. 2154 2155 nextBufTask = typeof(nextBufTask).init; 2156 nextBufTask._args[0] = &fillBuf; 2157 nextBufTask._args[1] = buf2; 2158 pool.put(nextBufTask); 2159 } 2160 2161 void doBufSwap() 2162 { 2163 if (lastTaskWaited) 2164 { 2165 // Then the source is empty. Signal it here. 
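// (empty() then reports the range as exhausted, since it
// checks buf1.length == 0.)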
2166 buf1 = null; 2167 buf2 = null; 2168 2169 static if (!isRandomAccessRange!S) 2170 { 2171 from = null; 2172 } 2173 2174 return; 2175 } 2176 2177 buf2 = buf1; 2178 buf1 = nextBufTask.yieldForce; 2179 bufPos = 0; 2180 2181 if (source.empty) 2182 { 2183 lastTaskWaited = true; 2184 } 2185 else 2186 { 2187 submitBuf2(); 2188 } 2189 } 2190 2191 public: 2192 @property auto front() 2193 { 2194 return buf1[bufPos]; 2195 } 2196 2197 void popFront() 2198 { 2199 static if (hasLength!S) 2200 { 2201 _length--; 2202 } 2203 2204 bufPos++; 2205 if (bufPos >= buf1.length) 2206 { 2207 doBufSwap(); 2208 } 2209 } 2210 2211 static if (isInfinite!S) 2212 { 2213 enum bool empty = false; 2214 } 2215 else 2216 { 2217 2218 bool empty() const @property 2219 { 2220 // popFront() sets this when source is empty 2221 return buf1.length == 0; 2222 } 2223 } 2224 } 2225 return new Map(source, bufSize, workUnitSize, this); 2226 } 2227 } 2228 2229 /** 2230 Given a `source` range that is expensive to iterate over, returns an 2231 $(REF_ALTTEXT input range, isInputRange, std,range,primitives) that 2232 asynchronously buffers the contents of `source` into a buffer of `bufSize` elements in a worker thread, 2233 while making previously buffered elements from a second buffer, also of size 2234 `bufSize`, available via the range interface of the returned 2235 object. The returned range has a length iff `hasLength!S`. 2236 `asyncBuf` is useful, for example, when performing expensive operations 2237 on the elements of ranges that represent data on a disk or network. 2238 2239 Example: 2240 --- 2241 import std.conv, std.stdio; 2242 2243 void main() 2244 { 2245 // Fetch lines of a file in a background thread 2246 // while processing previously fetched lines, 2247 // dealing with byLine's buffer recycling by 2248 // eagerly duplicating every line. 2249 auto lines = File("foo.txt").byLine(); 2250 auto duped = std.algorithm.map!"a.idup"(lines); 2251 2252 // Fetch more lines in the background while we 2253 // process the lines already read into memory 2254 // into a matrix of doubles. 2255 double[][] matrix; 2256 auto asyncReader = taskPool.asyncBuf(duped); 2257 2258 foreach (line; asyncReader) 2259 { 2260 auto ls = line.split("\t"); 2261 matrix ~= to!(double[])(ls); 2262 } 2263 } 2264 --- 2265 2266 $(B Exception Handling): 2267 2268 Any exceptions thrown while iterating over `source` are re-thrown on a 2269 call to `popFront` or, if thrown during construction, simply 2270 allowed to propagate to the caller. 2271 */ 2272 auto asyncBuf(S)(S source, size_t bufSize = 100) 2273 if (isInputRange!S) 2274 { 2275 static final class AsyncBuf 2276 { 2277 // This is a class because the task and source both need to be on 2278 // the heap. 2279 2280 // The element type of S. 2281 alias E = ElementType!S; // Needs to be here b/c of forward ref bugs. 2282 2283 private: 2284 E[] buf1, buf2; 2285 S source; 2286 TaskPool pool; 2287 Task!(run, E[] delegate(E[]), E[]) nextBufTask; 2288 size_t bufPos; 2289 bool lastTaskWaited; 2290 2291 static if (hasLength!S) 2292 { 2293 size_t _length; 2294 2295 // Available if hasLength!S. 
2296 public @property size_t length() const @safe pure nothrow 2297 { 2298 return _length; 2299 } 2300 } 2301 2302 this(S source, size_t bufSize, TaskPool pool) 2303 { 2304 buf1.length = bufSize; 2305 buf2.length = bufSize; 2306 2307 this.source = source; 2308 this.pool = pool; 2309 2310 static if (hasLength!S) 2311 { 2312 _length = source.length; 2313 } 2314 2315 buf1 = fillBuf(buf1); 2316 submitBuf2(); 2317 } 2318 2319 E[] fillBuf(E[] buf) 2320 { 2321 assert(buf !is null); 2322 2323 size_t i; 2324 for (; !source.empty && i < buf.length; source.popFront()) 2325 { 2326 buf[i++] = source.front; 2327 } 2328 2329 buf = buf[0 .. i]; 2330 return buf; 2331 } 2332 2333 void submitBuf2() 2334 in 2335 { 2336 assert(nextBufTask.prev is null); 2337 assert(nextBufTask.next is null); 2338 } 2339 do 2340 { 2341 // Hack to reuse the task object. 2342 2343 nextBufTask = typeof(nextBufTask).init; 2344 nextBufTask._args[0] = &fillBuf; 2345 nextBufTask._args[1] = buf2; 2346 pool.put(nextBufTask); 2347 } 2348 2349 void doBufSwap() 2350 { 2351 if (lastTaskWaited) 2352 { 2353 // Then source is empty. Signal it here. 2354 buf1 = null; 2355 buf2 = null; 2356 return; 2357 } 2358 2359 buf2 = buf1; 2360 buf1 = nextBufTask.yieldForce; 2361 bufPos = 0; 2362 2363 if (source.empty) 2364 { 2365 lastTaskWaited = true; 2366 } 2367 else 2368 { 2369 submitBuf2(); 2370 } 2371 } 2372 2373 public: 2374 E front() @property 2375 { 2376 return buf1[bufPos]; 2377 } 2378 2379 void popFront() 2380 { 2381 static if (hasLength!S) 2382 { 2383 _length--; 2384 } 2385 2386 bufPos++; 2387 if (bufPos >= buf1.length) 2388 { 2389 doBufSwap(); 2390 } 2391 } 2392 2393 static if (isInfinite!S) 2394 { 2395 enum bool empty = false; 2396 } 2397 2398 else 2399 { 2400 /// 2401 bool empty() @property 2402 { 2403 // popFront() sets this when source is empty: 2404 return buf1.length == 0; 2405 } 2406 } 2407 } 2408 return new AsyncBuf(source, bufSize, this); 2409 } 2410 2411 /** 2412 Given a callable object `next` that writes to a user-provided buffer and 2413 a second callable object `empty` that determines whether more data is 2414 available to write via `next`, returns an input range that 2415 asynchronously calls `next` with a set of size `nBuffers` of buffers 2416 and makes the results available in the order they were obtained via the 2417 input range interface of the returned object. Similarly to the 2418 input range overload of `asyncBuf`, the first half of the buffers 2419 are made available via the range interface while the second half are 2420 filled and vice-versa. 2421 2422 Params: 2423 2424 next = A callable object that takes a single argument that must be an array 2425 with mutable elements. When called, `next` writes data to 2426 the array provided by the caller. 2427 2428 empty = A callable object that takes no arguments and returns a type 2429 implicitly convertible to `bool`. This is used to signify 2430 that no more data is available to be obtained by calling `next`. 2431 2432 initialBufSize = The initial size of each buffer. If `next` takes its 2433 array by reference, it may resize the buffers. 2434 2435 nBuffers = The number of buffers to cycle through when calling `next`. 2436 2437 Example: 2438 --- 2439 // Fetch lines of a file in a background 2440 // thread while processing previously fetched 2441 // lines, without duplicating any lines. 
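// (A sketch: assumes std.array, std.conv and std.stdio are
// imported; "foo.txt" is a placeholder file of
// tab-separated numbers.)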
2442 auto file = File("foo.txt");
2443 
2444 void next(ref char[] buf)
2445 {
2446 file.readln(buf);
2447 }
2448 
2449 // Fetch more lines in the background while we
2450 // process the lines already read into memory
2451 // into a matrix of doubles.
2452 double[][] matrix;
2453 auto asyncReader = taskPool.asyncBuf(&next, &file.eof);
2454 
2455 foreach (line; asyncReader)
2456 {
2457 auto ls = line.split("\t");
2458 matrix ~= to!(double[])(ls);
2459 }
2460 ---
2461 
2462 $(B Exception Handling):
2463 
2464 Any exceptions thrown while calling `next` or `empty` are re-thrown on a
2465 call to `popFront`.
2466 
2467 Warning:
2468 
2469 Using the range returned by this function in a parallel foreach loop
2470 will not work because buffers may be overwritten while the task that
2471 processes them is in queue. This is checked for at compile time
2472 and will result in a static assertion failure.
2473 */
2474 auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100)
2475 if (is(typeof(C2.init()) : bool) &&
2476 Parameters!C1.length == 1 &&
2477 Parameters!C2.length == 0 &&
2478 isArray!(Parameters!C1[0])
2479 ) {
2480 auto roundRobin = RoundRobinBuffer!(C1, C2)(next, empty, initialBufSize, nBuffers);
2481 return asyncBuf(roundRobin, nBuffers / 2);
2482 }
2483 
2484 ///
2485 template reduce(functions...)
2486 {
2487 /**
2488 Parallel reduce on a random access range. Except as otherwise noted,
2489 usage is similar to $(REF _reduce, std,algorithm,iteration). There is
2490 also $(LREF fold) which does the same thing with a different parameter
2491 order.
2492 
2493 This function works by splitting the range to be reduced into work
2494 units, which are slices to be reduced in parallel. Once the results
2495 from all work units are computed, a final serial reduction is performed
2496 on these results to compute the final answer. Therefore, care must be
2497 taken to choose the seed value appropriately.
2498 
2499 Because the reduction is being performed in parallel, `functions`
2500 must be associative. For notational simplicity, let # be an
2501 infix operator representing `functions`. Then, (a # b) # c must equal
2502 a # (b # c). Floating point addition is not associative
2503 even though addition in exact arithmetic is. Summing floating
2504 point numbers using this function may give different results than summing
2505 serially. However, for many practical purposes floating point addition
2506 can be treated as associative.
2507 
2508 Note that, since `functions` are assumed to be associative,
2509 additional optimizations are made to the serial portion of the reduction
2510 algorithm. These take advantage of the instruction level parallelism of
2511 modern CPUs, in addition to the thread-level parallelism that the rest
2512 of this module exploits. This can lead to better than linear speedups
2513 relative to $(REF _reduce, std,algorithm,iteration), especially for
2514 fine-grained benchmarks like dot products.
2515 
2516 An explicit seed may be provided as the first argument. If
2517 provided, it is used as the seed for all work units and for the final
2518 reduction of results from all work units. Therefore, if it is not the
2519 identity value for the operation being performed, results may differ
2520 from those generated by $(REF _reduce, std,algorithm,iteration) and
2521 may vary depending on how many work units are used. The next argument
2522 must be the range to be reduced.
2523 ---
2524 // Find the sum of squares of a range in parallel, using
2525 // an explicit seed.
2526 // 2527 // Timings on an Athlon 64 X2 dual core machine: 2528 // 2529 // Parallel reduce: 72 milliseconds 2530 // Using std.algorithm.reduce instead: 181 milliseconds 2531 auto nums = iota(10_000_000.0f); 2532 auto sumSquares = taskPool.reduce!"a + b"( 2533 0.0, std.algorithm.map!"a * a"(nums) 2534 ); 2535 --- 2536 2537 If no explicit seed is provided, the first element of each work unit 2538 is used as a seed. For the final reduction, the result from the first 2539 work unit is used as the seed. 2540 --- 2541 // Find the sum of a range in parallel, using the first 2542 // element of each work unit as the seed. 2543 auto sum = taskPool.reduce!"a + b"(nums); 2544 --- 2545 2546 An explicit work unit size may be specified as the last argument. 2547 Specifying too small a work unit size will effectively serialize the 2548 reduction, as the final reduction of the result of each work unit will 2549 dominate computation time. If `TaskPool.size` for this instance 2550 is zero, this parameter is ignored and one work unit is used. 2551 --- 2552 // Use a work unit size of 100. 2553 auto sum2 = taskPool.reduce!"a + b"(nums, 100); 2554 2555 // Work unit size of 100 and explicit seed. 2556 auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100); 2557 --- 2558 2559 Parallel reduce supports multiple functions, like 2560 `std.algorithm.reduce`. 2561 --- 2562 // Find both the min and max of nums. 2563 auto minMax = taskPool.reduce!(min, max)(nums); 2564 assert(minMax[0] == reduce!min(nums)); 2565 assert(minMax[1] == reduce!max(nums)); 2566 --- 2567 2568 $(B Exception Handling): 2569 2570 After this function is finished executing, any exceptions thrown 2571 are chained together via `Throwable.next` and rethrown. The chaining 2572 order is non-deterministic. 2573 2574 See_Also: 2575 2576 $(LREF fold) is functionally equivalent to $(LREF _reduce) except the 2577 range parameter comes first and there is no need to use 2578 $(REF_ALTTEXT `tuple`,tuple,std,typecons) for multiple seeds. 
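
A sketch of the seed caveat described above: a non-identity seed is
folded in once per work unit (and once per internal accumulator within
each work unit), so the result depends on how the range is split.
---
// With an explicit seed of 1.0, the result exceeds
// sum(iota(1_000)) by an implementation-dependent multiple
// of the seed. Use the additive identity, 0.0, instead.
auto biased = taskPool.reduce!"a + b"(1.0, iota(1_000), 250);
---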
2579 */ 2580 auto reduce(Args...)(Args args) 2581 { 2582 import core.exception : OutOfMemoryError; 2583 import core.internal.lifetime : emplaceRef; 2584 import std.exception : enforce; 2585 2586 alias fun = reduceAdjoin!functions; 2587 alias finishFun = reduceFinish!functions; 2588 2589 static if (isIntegral!(Args[$ - 1])) 2590 { 2591 size_t workUnitSize = cast(size_t) args[$ - 1]; 2592 alias args2 = args[0..$ - 1]; 2593 alias Args2 = Args[0..$ - 1]; 2594 } 2595 else 2596 { 2597 alias args2 = args; 2598 alias Args2 = Args; 2599 } 2600 2601 auto makeStartValue(Type)(Type e) 2602 { 2603 static if (functions.length == 1) 2604 { 2605 return e; 2606 } 2607 else 2608 { 2609 typeof(adjoin!(staticMap!(binaryFun, functions))(e, e)) seed = void; 2610 foreach (i, T; seed.Types) 2611 { 2612 emplaceRef(seed.expand[i], e); 2613 } 2614 2615 return seed; 2616 } 2617 } 2618 2619 static if (args2.length == 2) 2620 { 2621 static assert(isInputRange!(Args2[1])); 2622 alias range = args2[1]; 2623 alias seed = args2[0]; 2624 enum explicitSeed = true; 2625 2626 static if (!is(typeof(workUnitSize))) 2627 { 2628 size_t workUnitSize = defaultWorkUnitSize(range.length); 2629 } 2630 } 2631 else 2632 { 2633 static assert(args2.length == 1); 2634 alias range = args2[0]; 2635 2636 static if (!is(typeof(workUnitSize))) 2637 { 2638 size_t workUnitSize = defaultWorkUnitSize(range.length); 2639 } 2640 2641 enforce(!range.empty, 2642 "Cannot reduce an empty range with first element as start value."); 2643 2644 auto seed = makeStartValue(range.front); 2645 enum explicitSeed = false; 2646 range.popFront(); 2647 } 2648 2649 alias E = typeof(seed); 2650 alias R = typeof(range); 2651 2652 E reduceOnRange(R range, size_t lowerBound, size_t upperBound) 2653 { 2654 // This is for exploiting instruction level parallelism by 2655 // using multiple accumulator variables within each thread, 2656 // since we're assuming functions are associative anyhow. 2657 2658 // This is so that loops can be unrolled automatically. 2659 enum ilpTuple = AliasSeq!(0, 1, 2, 3, 4, 5); 2660 enum nILP = ilpTuple.length; 2661 immutable subSize = (upperBound - lowerBound) / nILP; 2662 2663 if (subSize <= 1) 2664 { 2665 // Handle as a special case. 2666 static if (explicitSeed) 2667 { 2668 E result = seed; 2669 } 2670 else 2671 { 2672 E result = makeStartValue(range[lowerBound]); 2673 lowerBound++; 2674 } 2675 2676 foreach (i; lowerBound .. upperBound) 2677 { 2678 result = fun(result, range[i]); 2679 } 2680 2681 return result; 2682 } 2683 2684 assert(subSize > 1); 2685 E[nILP] results; 2686 size_t[nILP] offsets; 2687 2688 foreach (i; ilpTuple) 2689 { 2690 offsets[i] = lowerBound + subSize * i; 2691 2692 static if (explicitSeed) 2693 { 2694 results[i] = seed; 2695 } 2696 else 2697 { 2698 results[i] = makeStartValue(range[offsets[i]]); 2699 offsets[i]++; 2700 } 2701 } 2702 2703 immutable nLoop = subSize - (!explicitSeed); 2704 foreach (i; 0 .. nLoop) 2705 { 2706 foreach (j; ilpTuple) 2707 { 2708 results[j] = fun(results[j], range[offsets[j]]); 2709 offsets[j]++; 2710 } 2711 } 2712 2713 // Finish the remainder. 2714 foreach (i; nILP * subSize + lowerBound .. 
upperBound)
2715 {
2716 results[$ - 1] = fun(results[$ - 1], range[i]);
2717 }
2718 
2719 foreach (i; ilpTuple[1..$])
2720 {
2721 results[0] = finishFun(results[0], results[i]);
2722 }
2723 
2724 return results[0];
2725 }
2726 
2727 immutable len = range.length;
2728 if (len == 0)
2729 {
2730 return seed;
2731 }
2732 
2733 if (this.size == 0)
2734 {
2735 return finishFun(seed, reduceOnRange(range, 0, len));
2736 }
2737 
2738 // Unlike the rest of the functions here, I can't use the Task object
2739 // recycling trick here because this has to work on non-commutative
2740 // operations. After all the tasks are done executing, fun() has to
2741 // be applied on the results of these to get a final result, but
2742 // it can't be evaluated out of order.
2743 
2744 if (workUnitSize > len)
2745 {
2746 workUnitSize = len;
2747 }
2748 
2749 immutable size_t nWorkUnits = (len / workUnitSize) + ((len % workUnitSize == 0) ? 0 : 1);
2750 assert(nWorkUnits * workUnitSize >= len);
2751 
2752 alias RTask = Task!(run, typeof(&reduceOnRange), R, size_t, size_t);
2753 RTask[] tasks;
2754 
2755 // Can't use alloca() due to https://issues.dlang.org/show_bug.cgi?id=3753
2756 // Use a fixed stack buffer, falling back to malloc() when it is too small.
2757 enum maxStack = 2_048;
2758 byte[maxStack] buf = void;
2759 immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof;
2760 
2761 import core.stdc.stdlib : malloc, free;
2762 if (nBytesNeeded <= maxStack)
2763 {
2764 tasks = (cast(RTask*) buf.ptr)[0 .. nWorkUnits];
2765 }
2766 else
2767 {
2768 auto ptr = cast(RTask*) malloc(nBytesNeeded);
2769 if (!ptr)
2770 {
2771 throw new OutOfMemoryError(
2772 "Out of memory in std.parallelism."
2773 );
2774 }
2775 
2776 tasks = ptr[0 .. nWorkUnits];
2777 }
2778 
2779 scope(exit)
2780 {
2781 if (nBytesNeeded > maxStack)
2782 {
2783 free(tasks.ptr);
2784 }
2785 }
2786 
2787 // Hack to take the address of a nested function w/o
2788 // making a closure.
2789 static auto scopedAddress(D)(scope D del) @system
2790 {
2791 auto tmp = del;
2792 return tmp;
2793 }
2794 
2795 size_t curPos = 0;
2796 void useTask(ref RTask task)
2797 {
2798 import std.algorithm.comparison : min;
2799 import core.lifetime : emplace;
2800 
2801 // Private constructor, so can't feed its arguments directly
2802 // to emplace
2803 emplace(&task, RTask
2804 (
2805 scopedAddress(&reduceOnRange),
2806 range,
2807 curPos, // lower bound.
2808 cast() min(len, curPos + workUnitSize) // upper bound.
2809 ));
2810 
2811 task.pool = this;
2812 
2813 curPos += workUnitSize;
2814 }
2815 
2816 foreach (ref task; tasks)
2817 {
2818 useTask(task);
2819 }
2820 
2821 foreach (i; 1 .. tasks.length - 1)
2822 {
2823 tasks[i].next = tasks[i + 1].basePtr;
2824 tasks[i + 1].prev = tasks[i].basePtr;
2825 }
2826 
2827 if (tasks.length > 1)
2828 {
2829 queueLock();
2830 scope(exit) queueUnlock();
2831 
2832 abstractPutGroupNoSync(
2833 tasks[1].basePtr,
2834 tasks[$ - 1].basePtr
2835 );
2836 }
2837 
2838 if (tasks.length > 0)
2839 {
2840 try
2841 {
2842 tasks[0].job();
2843 }
2844 catch (Throwable e)
2845 {
2846 tasks[0].exception = e;
2847 }
2848 tasks[0].taskStatus = TaskStatus.done;
2849 
2850 // Try to execute each of these in the current thread
2851 foreach (ref task; tasks[1..$])
2852 {
2853 tryDeleteExecute(task.basePtr);
2854 }
2855 }
2856 
2857 // Now that we've tried to execute every task, they're all either
2858 // done or in progress. Force all of them.
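// yieldForce rethrows any exception a task recorded. The
// exceptions are caught and chained so that every task is
// still waited on before the chain is finally thrown.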
2859 E result = seed; 2860 2861 Throwable firstException; 2862 2863 foreach (ref task; tasks) 2864 { 2865 try 2866 { 2867 task.yieldForce; 2868 } 2869 catch (Throwable e) 2870 { 2871 /* Chain e to front because order doesn't matter and because 2872 * e is not likely to be a chain itself (so fewer traversals) 2873 */ 2874 firstException = Throwable.chainTogether(e, firstException); 2875 continue; 2876 } 2877 2878 if (!firstException) result = finishFun(result, task.returnVal); 2879 } 2880 2881 if (firstException) throw firstException; 2882 2883 return result; 2884 } 2885 } 2886 2887 /// 2888 template fold(functions...) 2889 { 2890 /** Implements the homonym function (also known as `accumulate`, `compress`, 2891 `inject`, or `foldl`) present in various programming languages of 2892 functional flavor. 2893 2894 `fold` is functionally equivalent to $(LREF reduce) except the range 2895 parameter comes first and there is no need to use $(REF_ALTTEXT 2896 `tuple`,tuple,std,typecons) for multiple seeds. 2897 2898 There may be one or more callable entities (`functions` argument) to 2899 apply. 2900 2901 Params: 2902 args = Just the range to _fold over; or the range and one seed 2903 per function; or the range, one seed per function, and 2904 the work unit size 2905 2906 Returns: 2907 The accumulated result as a single value for single function and 2908 as a tuple of values for multiple functions 2909 2910 See_Also: 2911 Similar to $(REF _fold, std,algorithm,iteration), `fold` is a wrapper around $(LREF reduce). 2912 2913 Example: 2914 --- 2915 static int adder(int a, int b) 2916 { 2917 return a + b; 2918 } 2919 static int multiplier(int a, int b) 2920 { 2921 return a * b; 2922 } 2923 2924 // Just the range 2925 auto x = taskPool.fold!adder([1, 2, 3, 4]); 2926 assert(x == 10); 2927 2928 // The range and the seeds (0 and 1 below; also note multiple 2929 // functions in this example) 2930 auto y = taskPool.fold!(adder, multiplier)([1, 2, 3, 4], 0, 1); 2931 assert(y[0] == 10); 2932 assert(y[1] == 24); 2933 2934 // The range, the seed (0), and the work unit size (20) 2935 auto z = taskPool.fold!adder([1, 2, 3, 4], 0, 20); 2936 assert(z == 10); 2937 --- 2938 */ 2939 auto fold(Args...)(Args args) 2940 { 2941 static assert(isInputRange!(Args[0]), "First argument must be an InputRange"); 2942 2943 alias range = args[0]; 2944 2945 static if (Args.length == 1) 2946 { 2947 // Just the range 2948 return reduce!functions(range); 2949 } 2950 else static if (Args.length == 1 + functions.length || 2951 Args.length == 1 + functions.length + 1) 2952 { 2953 static if (functions.length == 1) 2954 { 2955 alias seeds = args[1]; 2956 } 2957 else 2958 { 2959 auto seeds() 2960 { 2961 import std.typecons : tuple; 2962 return tuple(args[1 .. 
functions.length+1]);
2963 }
2964 }
2965 
2966 static if (Args.length == 1 + functions.length)
2967 {
2968 // The range and the seeds
2969 return reduce!functions(seeds, range);
2970 }
2971 else static if (Args.length == 1 + functions.length + 1)
2972 {
2973 // The range, the seeds, and the work unit size
2974 static assert(isIntegral!(Args[$-1]), "Work unit size must be an integral type");
2975 return reduce!functions(seeds, range, args[$-1]);
2976 }
2977 }
2978 else
2979 {
2980 import std.conv : text;
2981 static assert(0, "Invalid number of arguments (" ~ Args.length.text ~ "): Should be an input range, "
2982 ~ functions.length.text ~ " optional seed(s), and an optional work unit size.");
2983 }
2984 }
2985 }
2986 
2987 // This test is not included in the documentation because even though these
2988 // examples are for the inner fold() template, with their current location,
2989 // they would appear under the outer one. (We can't move this inside the
2990 // outer fold() template because then dmd runs out of memory possibly due to
2991 // recursive template instantiation, which is surprisingly not caught.)
2992 @system unittest
2993 {
2994 // Just the range
2995 auto x = taskPool.fold!"a + b"([1, 2, 3, 4]);
2996 assert(x == 10);
2997 
2998 // The range and the seeds (0 and 1 below; also note multiple
2999 // functions in this example)
3000 auto y = taskPool.fold!("a + b", "a * b")([1, 2, 3, 4], 0, 1);
3001 assert(y[0] == 10);
3002 assert(y[1] == 24);
3003 
3004 // The range, the seed (0), and the work unit size (20)
3005 auto z = taskPool.fold!"a + b"([1, 2, 3, 4], 0, 20);
3006 assert(z == 10);
3007 }
3008 
3009 /**
3010 Gets the index of the current thread relative to this `TaskPool`. Any
3011 thread not in this pool will receive an index of 0. The worker threads in
3012 this pool receive unique indices of 1 through `this.size`.
3013 
3014 This function is useful for maintaining worker-local resources.
3015 
3016 Example:
3017 ---
3018 // Execute a loop that computes the greatest common
3019 // divisor of every number from 0 through 999 with
3020 // 42 in parallel. Write the results out to
3021 // a set of files, one for each thread. This allows
3022 // results to be written out without any synchronization.
3023 
3024 import std.conv, std.range, std.numeric, std.stdio;
3025 
3026 void main()
3027 {
3028 auto fileHandles = new File[taskPool.size + 1];
3029 scope(exit) {
3030 foreach (ref handle; fileHandles)
3031 {
3032 handle.close();
3033 }
3034 }
3035 
3036 foreach (i, ref handle; fileHandles)
3037 {
3038 handle = File("workerResults" ~ to!string(i) ~ ".txt", "w");
3039 }
3040 
3041 foreach (num; parallel(iota(1_000)))
3042 {
3043 auto outHandle = fileHandles[taskPool.workerIndex];
3044 outHandle.writeln(num, '\t', gcd(num, 42));
3045 }
3046 }
3047 ---
3048 */
3049 size_t workerIndex() @property @safe const nothrow
3050 {
3051 immutable rawInd = threadIndex;
3052 return (rawInd >= instanceStartIndex && rawInd < instanceStartIndex + size) ?
3053 (rawInd - instanceStartIndex + 1) : 0;
3054 }
3055 
3056 /**
3057 Struct for creating worker-local storage. Worker-local storage is
3058 thread-local storage that exists only for worker threads in a given
3059 `TaskPool` plus a single thread outside the pool. It is allocated on the
3060 garbage collected heap in a way that avoids _false sharing, and doesn't
3061 necessarily have global scope within any thread. It can be accessed from
3062 any worker thread in the `TaskPool` that created it, and one thread
3063 outside this `TaskPool`.
All threads outside the pool that created a
3064 given instance of worker-local storage share a single slot.
3065 
3066 Since the underlying data for this struct is heap-allocated, this struct
3067 has reference semantics when passed between functions.
3068 
3069 The main use cases for `WorkerLocalStorage` are:
3070 
3071 1. Performing parallel reductions with an imperative, as opposed to
3072 functional, programming style. In this case, it's useful to treat
3073 `WorkerLocalStorage` as local to each thread for only the parallel
3074 portion of an algorithm.
3075 
3076 2. Recycling temporary buffers across iterations of a parallel foreach loop.
3077 
3078 Example:
3079 ---
3080 // Calculate pi as in our synopsis example, but
3081 // use an imperative instead of a functional style.
3082 immutable n = 1_000_000_000;
3083 immutable delta = 1.0L / n;
3084 
3085 auto sums = taskPool.workerLocalStorage(0.0L);
3086 foreach (i; parallel(iota(n)))
3087 {
3088 immutable x = ( i - 0.5L ) * delta;
3089 immutable toAdd = delta / ( 1.0 + x * x );
3090 sums.get += toAdd;
3091 }
3092 
3093 // Add up the results from each worker thread.
3094 real pi = 0;
3095 foreach (threadResult; sums.toRange)
3096 {
3097 pi += 4.0L * threadResult;
3098 }
3099 ---
3100 */
3101 static struct WorkerLocalStorage(T)
3102 {
3103 private:
3104 TaskPool pool;
3105 size_t size;
3106 
3107 size_t elemSize;
3108 bool* stillThreadLocal;
3109 
3110 static size_t roundToLine(size_t num) pure nothrow
3111 {
3112 if (num % cacheLineSize == 0)
3113 {
3114 return num;
3115 }
3116 else
3117 {
3118 return ((num / cacheLineSize) + 1) * cacheLineSize;
3119 }
3120 }
3121 
3122 void* data;
3123 
3124 void initialize(TaskPool pool)
3125 {
3126 this.pool = pool;
3127 size = pool.size + 1;
3128 stillThreadLocal = new bool;
3129 *stillThreadLocal = true;
3130 
3131 // Determines whether the GC should scan the array.
3132 auto blkInfo = (typeid(T).flags & 1) ?
3133 cast(GC.BlkAttr) 0 :
3134 GC.BlkAttr.NO_SCAN;
3135 
3136 immutable nElem = pool.size + 1;
3137 elemSize = roundToLine(T.sizeof);
3138 
3139 // The + 3 is to pad one full cache line worth of space on either side
3140 // of the data structure to make sure false sharing with completely
3141 // unrelated heap data is prevented, and to provide enough padding to
3142 // make sure that data is cache line-aligned.
3143 data = GC.malloc(elemSize * (nElem + 3), blkInfo) + elemSize;
3144 
3145 // Cache line align data ptr.
3146 data = cast(void*) roundToLine(cast(size_t) data);
3147 
3148 foreach (i; 0 .. nElem)
3149 {
3150 this.opIndex(i) = T.init;
3151 }
3152 }
3153 
3154 ref opIndex(this Qualified)(size_t index)
3155 {
3156 import std.conv : text;
3157 assert(index < size, text(index, '\t', size));
3158 return *(cast(CopyTypeQualifiers!(Qualified, T)*) (data + elemSize * index));
3159 }
3160 
3161 void opIndexAssign(T val, size_t index)
3162 {
3163 assert(index < size);
3164 *(cast(T*) (data + elemSize * index)) = val;
3165 }
3166 
3167 public:
3168 /**
3169 Get the current thread's instance. Returns by ref.
3170 Note that calling `get` from any thread
3171 outside the `TaskPool` that created this instance will return the
3172 same reference, so an instance of worker-local storage should only be
3173 accessed from one thread outside the pool that created it. If this
3174 rule is violated, undefined behavior will result.
3175 3176 If assertions are enabled and `toRange` has been called, then this 3177 WorkerLocalStorage instance is no longer worker-local and an assertion 3178 failure will result when calling this method. This is not checked 3179 when assertions are disabled for performance reasons. 3180 */ 3181 ref get(this Qualified)() @property 3182 { 3183 assert(*stillThreadLocal, 3184 "Cannot call get() on this instance of WorkerLocalStorage " ~ 3185 "because it is no longer worker-local." 3186 ); 3187 return opIndex(pool.workerIndex); 3188 } 3189 3190 /** 3191 Assign a value to the current thread's instance. This function has 3192 the same caveats as its overload. 3193 */ 3194 void get(T val) @property 3195 { 3196 assert(*stillThreadLocal, 3197 "Cannot call get() on this instance of WorkerLocalStorage " ~ 3198 "because it is no longer worker-local." 3199 ); 3200 3201 opIndexAssign(val, pool.workerIndex); 3202 } 3203 3204 /** 3205 Returns a range view of the values for all threads, which can be used 3206 to further process the results of each thread after running the parallel 3207 part of your algorithm. Do not use this method in the parallel portion 3208 of your algorithm. 3209 3210 Calling this function sets a flag indicating that this struct is no 3211 longer worker-local, and attempting to use the `get` method again 3212 will result in an assertion failure if assertions are enabled. 3213 */ 3214 WorkerLocalStorageRange!T toRange() @property 3215 { 3216 if (*stillThreadLocal) 3217 { 3218 *stillThreadLocal = false; 3219 3220 // Make absolutely sure results are visible to all threads. 3221 // This is probably not necessary since some other 3222 // synchronization primitive will be used to signal that the 3223 // parallel part of the algorithm is done, but the 3224 // performance impact should be negligible, so it's better 3225 // to be safe. 3226 ubyte barrierDummy; 3227 atomicSetUbyte(barrierDummy, 1); 3228 } 3229 3230 return WorkerLocalStorageRange!T(this); 3231 } 3232 } 3233 3234 /** 3235 Range primitives for worker-local storage. The purpose of this is to 3236 access results produced by each worker thread from a single thread once you 3237 are no longer using the worker-local storage from multiple threads. 3238 Do not use this struct in the parallel portion of your algorithm. 3239 3240 The proper way to instantiate this object is to call 3241 `WorkerLocalStorage.toRange`. Once instantiated, this object behaves 3242 as a finite random-access range with assignable, lvalue elements and 3243 a length equal to the number of worker threads in the `TaskPool` that 3244 created it plus 1. 
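
Example:
---
// A minimal sketch: accumulate per-thread partial sums in
// worker-local storage, then combine them through the range
// view once the parallel portion of the algorithm is done.
auto partials = taskPool.workerLocalStorage(0);
foreach (i; parallel(iota(1_000)))
{
    partials.get = partials.get + i;
}

int total = 0;
foreach (p; partials.toRange)
{
    total += p;
}
assert(total == 499_500);
---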
3245 */
3246 static struct WorkerLocalStorageRange(T)
3247 {
3248 private:
3249 WorkerLocalStorage!T workerLocalStorage;
3250 
3251 size_t _length;
3252 size_t beginOffset;
3253 
3254 this(WorkerLocalStorage!T wl)
3255 {
3256 this.workerLocalStorage = wl;
3257 _length = wl.size;
3258 }
3259 
3260 public:
3261 ref front(this Qualified)() @property
3262 {
3263 return this[0];
3264 }
3265 
3266 ref back(this Qualified)() @property
3267 {
3268 return this[_length - 1];
3269 }
3270 
3271 void popFront()
3272 {
3273 if (_length > 0)
3274 {
3275 beginOffset++;
3276 _length--;
3277 }
3278 }
3279 
3280 void popBack()
3281 {
3282 if (_length > 0)
3283 {
3284 _length--;
3285 }
3286 }
3287 
3288 typeof(this) save() @property
3289 {
3290 return this;
3291 }
3292 
3293 ref opIndex(this Qualified)(size_t index)
3294 {
3295 assert(index < _length);
3296 return workerLocalStorage[index + beginOffset];
3297 }
3298 
3299 void opIndexAssign(T val, size_t index)
3300 {
3301 assert(index < _length);
3302 workerLocalStorage[index + beginOffset] = val;
3303 }
3304 
3305 typeof(this) opSlice(size_t lower, size_t upper)
3306 {
3307 assert(upper <= _length);
3308 auto newWl = this.workerLocalStorage;
3309 newWl.data += (lower + beginOffset) * newWl.elemSize;
3310 newWl.size = upper - lower;
3311 return typeof(this)(newWl);
3312 }
3313 
3314 bool empty() const @property
3315 {
3316 return length == 0;
3317 }
3318 
3319 size_t length() const @property
3320 {
3321 return _length;
3322 }
3323 }
3324 
3325 /**
3326 Creates an instance of worker-local storage, initialized with a given
3327 value. The value is `lazy` so that you can, for example, easily
3328 create one instance of a class for each worker. For usage example,
3329 see the `WorkerLocalStorage` struct.
3330 */
3331 WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init)
3332 {
3333 WorkerLocalStorage!T ret;
3334 ret.initialize(this);
3335 foreach (i; 0 .. size + 1)
3336 {
3337 ret[i] = initialVal;
3338 }
3339 
3340 // Memory barrier to make absolutely sure that what we wrote is
3341 // visible to worker threads.
3342 ubyte barrierDummy;
3343 atomicSetUbyte(barrierDummy, 0);
3344 
3345 return ret;
3346 }
3347 
3348 /**
3349 Signals all worker threads to terminate as soon as they are finished
3350 with their current `Task`, or immediately if they are not executing a
3351 `Task`. `Task`s that were in queue will not be executed unless
3352 a call to `Task.workForce`, `Task.yieldForce` or `Task.spinForce`
3353 causes them to be executed.
3354 
3355 Use only if you have waited on every `Task` and therefore know the
3356 queue is empty, or if you speculatively executed some tasks and no longer
3357 need the results.
3358 */
3359 void stop() @trusted
3360 {
3361 queueLock();
3362 scope(exit) queueUnlock();
3363 atomicSetUbyte(status, PoolState.stopNow);
3364 notifyAll();
3365 }
3366 
3367 /**
3368 Signals worker threads to terminate when the queue becomes empty.
3369 
3370 If the blocking argument is true, wait for all worker threads to terminate
3371 before returning. This option might be used in applications where
3372 task results are never consumed; e.g., when `TaskPool` is employed as a
3373 rudimentary scheduler for tasks which communicate by means other than
3374 return values.
3375 
3376 Warning: Calling this function with $(D blocking = true) from a worker
3377 thread that is a member of the same `TaskPool` that
3378 `finish` is being called on will result in a deadlock.
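
Example:
---
// A minimal sketch: submit a task, then shut the pool down
// once its queue drains, blocking until the worker threads
// have terminated. ("foo.txt" is a placeholder.)
import std.file;

auto pool = new TaskPool();
auto t = task!read("foo.txt");
pool.put(t);
pool.finish(true); // blocking
assert(t.done);
---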
3379 */ 3380 void finish(bool blocking = false) @trusted 3381 { 3382 { 3383 queueLock(); 3384 scope(exit) queueUnlock(); 3385 atomicCasUbyte(status, PoolState.running, PoolState.finishing); 3386 notifyAll(); 3387 } 3388 if (blocking) 3389 { 3390 // Use this thread as a worker until everything is finished. 3391 executeWorkLoop(); 3392 3393 foreach (t; pool) 3394 { 3395 // Maybe there should be something here to prevent a thread 3396 // from calling join() on itself if this function is called 3397 // from a worker thread in the same pool, but: 3398 // 3399 // 1. Using an if statement to skip join() would result in 3400 // finish() returning without all tasks being finished. 3401 // 3402 // 2. If an exception were thrown, it would bubble up to the 3403 // Task from which finish() was called and likely be 3404 // swallowed. 3405 t.join(); 3406 } 3407 } 3408 } 3409 3410 /// Returns the number of worker threads in the pool. 3411 @property size_t size() @safe const pure nothrow 3412 { 3413 return pool.length; 3414 } 3415 3416 /** 3417 Put a `Task` object on the back of the task queue. The `Task` 3418 object may be passed by pointer or reference. 3419 3420 Example: 3421 --- 3422 import std.file; 3423 3424 // Create a task. 3425 auto t = task!read("foo.txt"); 3426 3427 // Add it to the queue to be executed. 3428 taskPool.put(t); 3429 --- 3430 3431 Notes: 3432 3433 @trusted overloads of this function are called for `Task`s if 3434 $(REF hasUnsharedAliasing, std,traits) is false for the `Task`'s 3435 return type or the function the `Task` executes is `pure`. 3436 `Task` objects that meet all other requirements specified in the 3437 `@trusted` overloads of `task` and `scopedTask` may be created 3438 and executed from `@safe` code via `Task.executeInNewThread` but 3439 not via `TaskPool`. 3440 3441 While this function takes the address of variables that may 3442 be on the stack, some overloads are marked as @trusted. 3443 `Task` includes a destructor that waits for the task to complete 3444 before destroying the stack frame it is allocated on. Therefore, 3445 it is impossible for the stack frame to be destroyed before the task is 3446 complete and no longer referenced by a `TaskPool`. 3447 */ 3448 void put(alias fun, Args...)(ref Task!(fun, Args) task) 3449 if (!isSafeReturn!(typeof(task))) 3450 { 3451 task.pool = this; 3452 abstractPut(task.basePtr); 3453 } 3454 3455 /// Ditto 3456 void put(alias fun, Args...)(Task!(fun, Args)* task) 3457 if (!isSafeReturn!(typeof(*task))) 3458 { 3459 import std.exception : enforce; 3460 enforce(task !is null, "Cannot put a null Task on a TaskPool queue."); 3461 put(*task); 3462 } 3463 3464 @trusted void put(alias fun, Args...)(ref Task!(fun, Args) task) 3465 if (isSafeReturn!(typeof(task))) 3466 { 3467 task.pool = this; 3468 abstractPut(task.basePtr); 3469 } 3470 3471 @trusted void put(alias fun, Args...)(Task!(fun, Args)* task) 3472 if (isSafeReturn!(typeof(*task))) 3473 { 3474 import std.exception : enforce; 3475 enforce(task !is null, "Cannot put a null Task on a TaskPool queue."); 3476 put(*task); 3477 } 3478 3479 /** 3480 These properties control whether the worker threads are daemon threads. 3481 A daemon thread is automatically terminated when all non-daemon threads 3482 have terminated. A non-daemon thread will prevent a program from 3483 terminating as long as it has not terminated. 3484 3485 If any `TaskPool` with non-daemon threads is active, either `stop` 3486 or `finish` must be called on it before the program can terminate. 
3487 
3488 The worker threads in the `TaskPool` instance returned by the
3489 `taskPool` property are daemon by default. The worker threads of
3490 manually instantiated task pools are non-daemon by default.
3491 
3492 Note: For a size zero pool, the getter arbitrarily returns true and the
3493 setter has no effect.
3494 */
3495 bool isDaemon() @property @trusted
3496 {
3497 queueLock();
3498 scope(exit) queueUnlock();
3499 return (size == 0) ? true : pool[0].isDaemon;
3500 }
3501 
3502 /// Ditto
3503 void isDaemon(bool newVal) @property @trusted
3504 {
3505 queueLock();
3506 scope(exit) queueUnlock();
3507 foreach (thread; pool)
3508 {
3509 thread.isDaemon = newVal;
3510 }
3511 }
3512 
3513 /**
3514 These functions allow getting and setting the OS scheduling priority of
3515 the worker threads in this `TaskPool`. They forward to
3516 `core.thread.Thread.priority`, so a given priority value here means the
3517 same thing as an identical priority value in `core.thread`.
3518 
3519 Note: For a size zero pool, the getter arbitrarily returns
3520 `core.thread.Thread.PRIORITY_MIN` and the setter has no effect.
3521 */
3522 int priority() @property @trusted
3523 {
3524 return (size == 0) ? core.thread.Thread.PRIORITY_MIN :
3525 pool[0].priority;
3526 }
3527 
3528 /// Ditto
3529 void priority(int newPriority) @property @trusted
3530 {
3531 if (size > 0)
3532 {
3533 foreach (t; pool)
3534 {
3535 t.priority = newPriority;
3536 }
3537 }
3538 }
3539 }
3540 
3541 @system unittest
3542 {
3543 import std.algorithm.iteration : sum;
3544 import std.range : iota;
3545 import std.typecons : tuple;
3546 
3547 enum N = 100;
3548 auto r = iota(1, N + 1);
3549 const expected = r.sum();
3550 
3551 // Just the range
3552 assert(taskPool.fold!"a + b"(r) == expected);
3553 
3554 // Range and seeds
3555 assert(taskPool.fold!"a + b"(r, 0) == expected);
3556 assert(taskPool.fold!("a + b", "a + b")(r, 0, 0) == tuple(expected, expected));
3557 
3558 // Range, seeds, and work unit size
3559 assert(taskPool.fold!"a + b"(r, 0, 42) == expected);
3560 assert(taskPool.fold!("a + b", "a + b")(r, 0, 0, 42) == tuple(expected, expected));
3561 }
3562 
3563 // Issue 16705
3564 @system unittest
3565 {
3566 struct MyIota
3567 {
3568 size_t front;
3569 void popFront()(){front++;}
3570 auto empty(){return front >= 25;}
3571 auto opIndex(size_t i){return front+i;}
3572 auto length(){return 25-front;}
3573 }
3574 
3575 auto mySum = taskPool.reduce!"a + b"(MyIota());
3576 }
3577 
3578 /**
3579 Returns a lazily initialized global instantiation of `TaskPool`.
3580 This function can safely be called concurrently from multiple non-worker
3581 threads. The worker threads in this pool are daemon threads, meaning that it
3582 is not necessary to call `TaskPool.stop` or `TaskPool.finish` before
3583 terminating the main thread.
3584 */
3585 @property TaskPool taskPool() @trusted
3586 {
3587 import std.concurrency : initOnce;
3588 __gshared TaskPool pool;
3589 return initOnce!pool({
3590 auto p = new TaskPool(defaultPoolThreads);
3591 p.isDaemon = true;
3592 return p;
3593 }());
3594 }
3595 
3596 private shared uint _defaultPoolThreads = uint.max;
3597 
3598 /**
3599 These properties get and set the number of worker threads in the `TaskPool`
3600 instance returned by `taskPool`. The default value is `totalCPUs` - 1.
3601 Calling the setter after the first call to `taskPool` does not change the
3602 number of worker threads in the instance returned by `taskPool`.
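
Example:
---
// A sketch: the setter affects the shared pool only if it
// runs before the first use of taskPool.
defaultPoolThreads = 4;
assert(taskPool.size == 4);
---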
3603 */
3604 @property uint defaultPoolThreads() @trusted
3605 {
3606 const local = atomicLoad(_defaultPoolThreads);
3607 return local < uint.max ? local : totalCPUs - 1;
3608 }
3609 
3610 /// Ditto
3611 @property void defaultPoolThreads(uint newVal) @trusted
3612 {
3613 atomicStore(_defaultPoolThreads, newVal);
3614 }
3615 
3616 /**
3617 Convenience functions that forward to `taskPool.parallel`. The
3618 purpose of these is to make parallel foreach less verbose and more
3619 readable.
3620 
3621 Example:
3622 ---
3623 // Find the logarithm of every number from
3624 // 1 to 1_000_000 in parallel, using the
3625 // default TaskPool instance.
3626 auto logs = new double[1_000_000];
3627 
3628 foreach (i, ref elem; parallel(logs))
3629 {
3630 elem = log(i + 1.0);
3631 }
3632 ---
3633 
3634 */
3635 ParallelForeach!R parallel(R)(R range)
3636 {
3637 return taskPool.parallel(range);
3638 }
3639 
3640 /// Ditto
3641 ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
3642 {
3643 return taskPool.parallel(range, workUnitSize);
3644 }
3645 
3646 // `each` should be usable with parallel
3647 // https://issues.dlang.org/show_bug.cgi?id=17019
3648 @system unittest
3649 {
3650 import std.algorithm.iteration : each, sum;
3651 import std.range : iota;
3652 
3653 // check behavior with parallel
3654 auto arr = new int[10];
3655 parallel(arr).each!((ref e) => e += 1);
3656 assert(arr.sum == 10);
3657 
3658 auto arrIndex = new int[10];
3659 parallel(arrIndex).each!((i, ref e) => e += i);
3660 assert(arrIndex.sum == 10.iota.sum);
3661 }
3662 
3663 // https://issues.dlang.org/show_bug.cgi?id=22745
3664 @system unittest
3665 {
3666 auto pool = new TaskPool(0);
3667 int[] empty;
3668 foreach (i; pool.parallel(empty)) {}
3669 pool.finish();
3670 }
3671 
3672 // Thrown when a parallel foreach loop is broken from.
3673 class ParallelForeachError : Error
3674 {
3675 this()
3676 {
3677 super("Cannot break from a parallel foreach loop using break, return, "
3678 ~ "labeled break/continue or goto statements.");
3679 }
3680 }
3681 
3682 /*------Structs that implement opApply for parallel foreach.------------------*/
3683 private template randLen(R)
3684 {
3685 enum randLen = isRandomAccessRange!R && hasLength!R;
3686 }
3687 
3688 private void submitAndExecute(
3689 TaskPool pool,
3690 scope void delegate() doIt
3691 )
3692 {
3693 import core.exception : OutOfMemoryError;
3694 immutable nThreads = pool.size + 1;
3695 
3696 alias PTask = typeof(scopedTask(doIt));
3697 import core.stdc.stdlib : malloc, free;
3698 import core.stdc.string : memcpy;
3699 
3700 // The logical thing to do would be to just use alloca() here, but that
3701 // causes problems on Windows for reasons that I don't understand
3702 // (tentatively a compiler bug) and definitely doesn't work on Posix due
3703 // to https://issues.dlang.org/show_bug.cgi?id=3753.
3704 // Therefore, allocate a fixed buffer and fall back to `malloc()` if
3705 // someone's using a ridiculous amount of threads.
3706 // Also, using a byte array instead of a PTask array as the fixed buffer
3707 // is to prevent d'tors from being called on uninitialized excess PTask
3708 // instances.
3709 enum nBuf = 64;
3710 byte[nBuf * PTask.sizeof] buf = void;
3711 PTask[] tasks;
3712 if (nThreads <= nBuf)
3713 {
3714 tasks = (cast(PTask*) buf.ptr)[0 .. nThreads];
3715 }
3716 else
3717 {
3718 auto ptr = cast(PTask*) malloc(nThreads * PTask.sizeof);
3719 if (!ptr) throw new OutOfMemoryError("Out of memory in std.parallelism.");
3720 tasks = ptr[0 ..
nThreads]; 3721 } 3722 3723 scope(exit) 3724 { 3725 if (nThreads > nBuf) 3726 { 3727 free(tasks.ptr); 3728 } 3729 } 3730 3731 foreach (ref t; tasks) 3732 { 3733 import core.stdc.string : memcpy; 3734 3735 // This silly looking code is necessary to prevent d'tors from being 3736 // called on uninitialized objects. 3737 auto temp = scopedTask(doIt); 3738 memcpy(&t, &temp, PTask.sizeof); 3739 3740 // This has to be done to t after copying, not temp before copying. 3741 // Otherwise, temp's destructor will sit here and wait for the 3742 // task to finish. 3743 t.pool = pool; 3744 } 3745 3746 foreach (i; 1 .. tasks.length - 1) 3747 { 3748 tasks[i].next = tasks[i + 1].basePtr; 3749 tasks[i + 1].prev = tasks[i].basePtr; 3750 } 3751 3752 if (tasks.length > 1) 3753 { 3754 pool.queueLock(); 3755 scope(exit) pool.queueUnlock(); 3756 3757 pool.abstractPutGroupNoSync( 3758 tasks[1].basePtr, 3759 tasks[$ - 1].basePtr 3760 ); 3761 } 3762 3763 if (tasks.length > 0) 3764 { 3765 try 3766 { 3767 tasks[0].job(); 3768 } 3769 catch (Throwable e) 3770 { 3771 tasks[0].exception = e; // nocoverage 3772 } 3773 tasks[0].taskStatus = TaskStatus.done; 3774 3775 // Try to execute each of these in the current thread 3776 foreach (ref task; tasks[1..$]) 3777 { 3778 pool.tryDeleteExecute(task.basePtr); 3779 } 3780 } 3781 3782 Throwable firstException; 3783 3784 foreach (i, ref task; tasks) 3785 { 3786 try 3787 { 3788 task.yieldForce; 3789 } 3790 catch (Throwable e) 3791 { 3792 /* Chain e to front because order doesn't matter and because 3793 * e is not likely to be a chain itself (so fewer traversals) 3794 */ 3795 firstException = Throwable.chainTogether(e, firstException); 3796 continue; 3797 } 3798 } 3799 3800 if (firstException) throw firstException; 3801 } 3802 3803 void foreachErr() 3804 { 3805 throw new ParallelForeachError(); 3806 } 3807 3808 int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg) 3809 { 3810 with(p) 3811 { 3812 int res = 0; 3813 size_t index = 0; 3814 3815 // The explicit ElementType!R in the foreach loops is necessary for 3816 // correct behavior when iterating over strings. 3817 static if (hasLvalueElements!R) 3818 { 3819 foreach (ref ElementType!R elem; range) 3820 { 3821 static if (Parameters!dg.length == 2) 3822 { 3823 res = dg(index, elem); 3824 } 3825 else 3826 { 3827 res = dg(elem); 3828 } 3829 if (res) break; 3830 index++; 3831 } 3832 } 3833 else 3834 { 3835 foreach (ElementType!R elem; range) 3836 { 3837 static if (Parameters!dg.length == 2) 3838 { 3839 res = dg(index, elem); 3840 } 3841 else 3842 { 3843 res = dg(elem); 3844 } 3845 if (res) break; 3846 index++; 3847 } 3848 } 3849 if (res) foreachErr; 3850 return res; 3851 } 3852 } 3853 3854 private enum string parallelApplyMixinRandomAccess = q{ 3855 // Handle empty thread pool as special case. 3856 if (pool.size == 0) 3857 { 3858 return doSizeZeroCase(this, dg); 3859 } 3860 3861 // Whether iteration is with or without an index variable. 3862 enum withIndex = Parameters!(typeof(dg)).length == 2; 3863 3864 shared size_t workUnitIndex = size_t.max; // Effectively -1: chunkIndex + 1 == 0 3865 immutable len = range.length; 3866 if (!len) return 0; 3867 3868 shared bool shouldContinue = true; 3869 3870 void doIt() 3871 { 3872 import std.algorithm.comparison : min; 3873 3874 scope(failure) 3875 { 3876 // If an exception is thrown, all threads should bail. 
3877 atomicStore(shouldContinue, false); 3878 } 3879 3880 while (atomicLoad(shouldContinue)) 3881 { 3882 immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1); 3883 immutable start = workUnitSize * myUnitIndex; 3884 if (start >= len) 3885 { 3886 atomicStore(shouldContinue, false); 3887 break; 3888 } 3889 3890 immutable end = min(len, start + workUnitSize); 3891 3892 foreach (i; start .. end) 3893 { 3894 static if (withIndex) 3895 { 3896 if (dg(i, range[i])) foreachErr(); 3897 } 3898 else 3899 { 3900 if (dg(range[i])) foreachErr(); 3901 } 3902 } 3903 } 3904 } 3905 3906 submitAndExecute(pool, &doIt); 3907 3908 return 0; 3909 }; 3910 3911 enum string parallelApplyMixinInputRange = q{ 3912 // Handle empty thread pool as special case. 3913 if (pool.size == 0) 3914 { 3915 return doSizeZeroCase(this, dg); 3916 } 3917 3918 // Whether iteration is with or without an index variable. 3919 enum withIndex = Parameters!(typeof(dg)).length == 2; 3920 3921 // This protects the range while copying it. 3922 auto rangeMutex = new Mutex(); 3923 3924 shared bool shouldContinue = true; 3925 3926 // The total number of elements that have been popped off range. 3927 // This is updated only while protected by rangeMutex; 3928 size_t nPopped = 0; 3929 3930 static if ( 3931 is(typeof(range.buf1)) && 3932 is(typeof(range.bufPos)) && 3933 is(typeof(range.doBufSwap())) 3934 ) 3935 { 3936 // Make sure we don't have the buffer recycling overload of 3937 // asyncBuf. 3938 static if ( 3939 is(typeof(range.source)) && 3940 isRoundRobin!(typeof(range.source)) 3941 ) 3942 { 3943 static assert(0, "Cannot execute a parallel foreach loop on " ~ 3944 "the buffer recycling overload of asyncBuf."); 3945 } 3946 3947 enum bool bufferTrick = true; 3948 } 3949 else 3950 { 3951 enum bool bufferTrick = false; 3952 } 3953 3954 void doIt() 3955 { 3956 scope(failure) 3957 { 3958 // If an exception is thrown, all threads should bail. 3959 atomicStore(shouldContinue, false); 3960 } 3961 3962 static if (hasLvalueElements!R) 3963 { 3964 alias Temp = ElementType!R*[]; 3965 Temp temp; 3966 3967 // Returns: The previous value of nPopped. 3968 size_t makeTemp() 3969 { 3970 import std.algorithm.internal : addressOf; 3971 import std.array : uninitializedArray; 3972 3973 if (temp is null) 3974 { 3975 temp = uninitializedArray!Temp(workUnitSize); 3976 } 3977 3978 rangeMutex.lock(); 3979 scope(exit) rangeMutex.unlock(); 3980 3981 size_t i = 0; 3982 for (; i < workUnitSize && !range.empty; range.popFront(), i++) 3983 { 3984 temp[i] = addressOf(range.front); 3985 } 3986 3987 temp = temp[0 .. i]; 3988 auto ret = nPopped; 3989 nPopped += temp.length; 3990 return ret; 3991 } 3992 3993 } 3994 else 3995 { 3996 3997 alias Temp = ElementType!R[]; 3998 Temp temp; 3999 4000 // Returns: The previous value of nPopped. 4001 static if (!bufferTrick) size_t makeTemp() 4002 { 4003 import std.array : uninitializedArray; 4004 4005 if (temp is null) 4006 { 4007 temp = uninitializedArray!Temp(workUnitSize); 4008 } 4009 4010 rangeMutex.lock(); 4011 scope(exit) rangeMutex.unlock(); 4012 4013 size_t i = 0; 4014 for (; i < workUnitSize && !range.empty; range.popFront(), i++) 4015 { 4016 temp[i] = range.front; 4017 } 4018 4019 temp = temp[0 .. i]; 4020 auto ret = nPopped; 4021 nPopped += temp.length; 4022 return ret; 4023 } 4024 4025 static if (bufferTrick) size_t makeTemp() 4026 { 4027 import std.algorithm.mutation : swap; 4028 rangeMutex.lock(); 4029 scope(exit) rangeMutex.unlock(); 4030 4031 // Elide copying by just swapping buffers. 
4032 temp.length = range.buf1.length; 4033 swap(range.buf1, temp); 4034 4035 // This is necessary in case popFront() has been called on 4036 // range before entering the parallel foreach loop. 4037 temp = temp[range.bufPos..$]; 4038 4039 static if (is(typeof(range._length))) 4040 { 4041 range._length -= (temp.length - range.bufPos); 4042 } 4043 4044 range.doBufSwap(); 4045 auto ret = nPopped; 4046 nPopped += temp.length; 4047 return ret; 4048 } 4049 } 4050 4051 while (atomicLoad(shouldContinue)) 4052 { 4053 auto overallIndex = makeTemp(); 4054 if (temp.empty) 4055 { 4056 atomicStore(shouldContinue, false); 4057 break; 4058 } 4059 4060 foreach (i; 0 .. temp.length) 4061 { 4062 scope(success) overallIndex++; 4063 4064 static if (hasLvalueElements!R) 4065 { 4066 static if (withIndex) 4067 { 4068 if (dg(overallIndex, *temp[i])) foreachErr(); 4069 } 4070 else 4071 { 4072 if (dg(*temp[i])) foreachErr(); 4073 } 4074 } 4075 else 4076 { 4077 static if (withIndex) 4078 { 4079 if (dg(overallIndex, temp[i])) foreachErr(); 4080 } 4081 else 4082 { 4083 if (dg(temp[i])) foreachErr(); 4084 } 4085 } 4086 } 4087 } 4088 } 4089 4090 submitAndExecute(pool, &doIt); 4091 4092 return 0; 4093 }; 4094 4095 4096 private struct ParallelForeach(R) 4097 { 4098 TaskPool pool; 4099 R range; 4100 size_t workUnitSize; 4101 alias E = ElementType!R; 4102 4103 static if (hasLvalueElements!R) 4104 { 4105 alias NoIndexDg = int delegate(ref E); 4106 alias IndexDg = int delegate(size_t, ref E); 4107 } 4108 else 4109 { 4110 alias NoIndexDg = int delegate(E); 4111 alias IndexDg = int delegate(size_t, E); 4112 } 4113 4114 int opApply(scope NoIndexDg dg) 4115 { 4116 static if (randLen!R) 4117 { 4118 mixin(parallelApplyMixinRandomAccess); 4119 } 4120 else 4121 { 4122 mixin(parallelApplyMixinInputRange); 4123 } 4124 } 4125 4126 int opApply(scope IndexDg dg) 4127 { 4128 static if (randLen!R) 4129 { 4130 mixin(parallelApplyMixinRandomAccess); 4131 } 4132 else 4133 { 4134 mixin(parallelApplyMixinInputRange); 4135 } 4136 } 4137 } 4138 4139 /* 4140 This struct buffers the output of a callable that outputs data into a 4141 user-supplied buffer into a set of buffers of some fixed size. It allows these 4142 buffers to be accessed with an input range interface. This is used internally 4143 in the buffer-recycling overload of TaskPool.asyncBuf, which creates an 4144 instance and forwards it to the input range overload of asyncBuf. 4145 */ 4146 private struct RoundRobinBuffer(C1, C2) 4147 { 4148 // No need for constraints because they're already checked for in asyncBuf. 
4149 4150 alias Array = Parameters!(C1.init)[0]; 4151 alias T = typeof(Array.init[0]); 4152 4153 T[][] bufs; 4154 size_t index; 4155 C1 nextDel; 4156 C2 emptyDel; 4157 bool _empty; 4158 bool primed; 4159 4160 this( 4161 C1 nextDel, 4162 C2 emptyDel, 4163 size_t initialBufSize, 4164 size_t nBuffers 4165 ) { 4166 this.nextDel = nextDel; 4167 this.emptyDel = emptyDel; 4168 bufs.length = nBuffers; 4169 4170 foreach (ref buf; bufs) 4171 { 4172 buf.length = initialBufSize; 4173 } 4174 } 4175 4176 void prime() 4177 in 4178 { 4179 assert(!empty); 4180 } 4181 do 4182 { 4183 scope(success) primed = true; 4184 nextDel(bufs[index]); 4185 } 4186 4187 4188 T[] front() @property 4189 in 4190 { 4191 assert(!empty); 4192 } 4193 do 4194 { 4195 if (!primed) prime(); 4196 return bufs[index]; 4197 } 4198 4199 void popFront() 4200 { 4201 if (empty || emptyDel()) 4202 { 4203 _empty = true; 4204 return; 4205 } 4206 4207 index = (index + 1) % bufs.length; 4208 primed = false; 4209 } 4210 4211 bool empty() @property const @safe pure nothrow 4212 { 4213 return _empty; 4214 } 4215 } 4216 4217 version (StdUnittest) 4218 { 4219 // This was the only way I could get nested maps to work. 4220 private __gshared TaskPool poolInstance; 4221 } 4222 4223 // These test basic functionality but don't stress test for threading bugs. 4224 // These are the tests that should be run every time Phobos is compiled. 4225 @system unittest 4226 { 4227 import std.algorithm.comparison : equal, min, max; 4228 import std.algorithm.iteration : filter, map, reduce; 4229 import std.array : split; 4230 import std.conv : text; 4231 import std.exception : assertThrown; 4232 import std.math.operations : isClose; 4233 import std.math.algebraic : sqrt, abs; 4234 import std.math.exponential : log; 4235 import std.range : indexed, iota, join; 4236 import std.typecons : Tuple, tuple; 4237 import std.stdio; 4238 4239 poolInstance = new TaskPool(2); 4240 scope(exit) poolInstance.stop(); 4241 4242 // The only way this can be verified is manually. 4243 debug(std_parallelism) stderr.writeln("totalCPUs = ", totalCPUs); 4244 4245 auto oldPriority = poolInstance.priority; 4246 poolInstance.priority = Thread.PRIORITY_MAX; 4247 assert(poolInstance.priority == Thread.PRIORITY_MAX); 4248 4249 poolInstance.priority = Thread.PRIORITY_MIN; 4250 assert(poolInstance.priority == Thread.PRIORITY_MIN); 4251 4252 poolInstance.priority = oldPriority; 4253 assert(poolInstance.priority == oldPriority); 4254 4255 static void refFun(ref uint num) 4256 { 4257 num++; 4258 } 4259 4260 uint x; 4261 4262 // Test task(). 4263 auto t = task!refFun(x); 4264 poolInstance.put(t); 4265 t.yieldForce; 4266 assert(t.args[0] == 1); 4267 4268 auto t2 = task(&refFun, x); 4269 poolInstance.put(t2); 4270 t2.yieldForce; 4271 assert(t2.args[0] == 1); 4272 4273 // Test scopedTask(). 4274 auto st = scopedTask!refFun(x); 4275 poolInstance.put(st); 4276 st.yieldForce; 4277 assert(st.args[0] == 1); 4278 4279 auto st2 = scopedTask(&refFun, x); 4280 poolInstance.put(st2); 4281 st2.yieldForce; 4282 assert(st2.args[0] == 1); 4283 4284 // Test executeInNewThread(). 4285 auto ct = scopedTask!refFun(x); 4286 ct.executeInNewThread(Thread.PRIORITY_MAX); 4287 ct.yieldForce; 4288 assert(ct.args[0] == 1); 4289 4290 // Test ref return. 
    uint toInc = 0;
    static ref T makeRef(T)(ref T num)
    {
        return num;
    }

    auto t3 = task!makeRef(toInc);
    taskPool.put(t3);
    assert(t3.args[0] == 0);
    t3.spinForce++;
    assert(t3.args[0] == 1);

    static void testSafe() @safe
    {
        static int bump(int num)
        {
            return num + 1;
        }

        auto safePool = new TaskPool(0);
        auto t = task(&bump, 1);
        taskPool.put(t);
        assert(t.yieldForce == 2);

        auto st = scopedTask(&bump, 1);
        taskPool.put(st);
        assert(st.yieldForce == 2);
        safePool.stop();
    }

    auto arr = [1,2,3,4,5];
    auto nums = new uint[5];
    auto nums2 = new uint[5];

    foreach (i, ref elem; poolInstance.parallel(arr))
    {
        elem++;
        nums[i] = cast(uint) i + 2;
        nums2[i] = elem;
    }

    assert(nums == [2,3,4,5,6], text(nums));
    assert(nums2 == nums, text(nums2));
    assert(arr == nums, text(arr));

    // Test const/immutable arguments.
    static int add(int lhs, int rhs)
    {
        return lhs + rhs;
    }
    immutable addLhs = 1;
    immutable addRhs = 2;
    auto addTask = task(&add, addLhs, addRhs);
    auto addScopedTask = scopedTask(&add, addLhs, addRhs);
    poolInstance.put(addTask);
    poolInstance.put(addScopedTask);
    assert(addTask.yieldForce == 3);
    assert(addScopedTask.yieldForce == 3);

    // Test parallel foreach with non-random access range.
    auto range = filter!"a != 666"([0, 1, 2, 3, 4]);

    foreach (i, elem; poolInstance.parallel(range))
    {
        nums[i] = cast(uint) i;
    }

    assert(nums == [0,1,2,3,4]);

    auto logs = new double[1_000_000];
    foreach (i, ref elem; poolInstance.parallel(logs))
    {
        elem = log(i + 1.0);
    }

    foreach (i, elem; logs)
    {
        assert(isClose(elem, log(double(i + 1))));
    }

    assert(poolInstance.amap!"a * a"([1,2,3,4,5]) == [1,4,9,16,25]);
    assert(poolInstance.amap!"a * a"([1,2,3,4,5], new long[5]) == [1,4,9,16,25]);
    assert(poolInstance.amap!("a * a", "-a")([1,2,3]) ==
           [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);

    auto tupleBuf = new Tuple!(int, int)[3];
    poolInstance.amap!("a * a", "-a")([1,2,3], tupleBuf);
    assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
    poolInstance.amap!("a * a", "-a")([1,2,3], 5, tupleBuf);
    assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);

    // Test amap with a non-array buffer.
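    // (amap accepts any random-access range with assignable elements as its
    // output buffer, not just arrays; here an indexed view serves as one.)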
    auto toIndex = new int[5];
    auto ind = indexed(toIndex, [3, 1, 4, 0, 2]);
    poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], ind);
    assert(equal(ind, [2, 4, 6, 8, 10]));
    assert(equal(toIndex, [8, 4, 10, 2, 6]));
    poolInstance.amap!"a / 2"(ind, ind);
    assert(equal(ind, [1, 2, 3, 4, 5]));
    assert(equal(toIndex, [4, 2, 5, 1, 3]));

    auto buf = new int[5];
    poolInstance.amap!"a * a"([1,2,3,4,5], buf);
    assert(buf == [1,4,9,16,25]);
    poolInstance.amap!"a * a"([1,2,3,4,5], 4, buf);
    assert(buf == [1,4,9,16,25]);

    assert(poolInstance.reduce!"a + b"([1]) == 1);
    assert(poolInstance.reduce!"a + b"([1,2,3,4]) == 10);
    assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4]) == 10);
    assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4], 1) == 10);
    assert(poolInstance.reduce!(min, max)([1,2,3,4]) == tuple(1, 4));
    assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) ==
           tuple(10, 24));

    immutable serialAns = reduce!"a + b"(iota(1000));
    assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns);
    assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns);

    // Test worker-local storage.
    auto wl = poolInstance.workerLocalStorage(0);
    foreach (i; poolInstance.parallel(iota(1000), 1))
    {
        wl.get = wl.get + i;
    }

    auto wlRange = wl.toRange;
    auto parallelSum = poolInstance.reduce!"a + b"(wlRange);
    assert(parallelSum == 499500);
    assert(wlRange[0 .. 1][0] == wlRange[0]);
    assert(wlRange[1 .. 2][0] == wlRange[1]);

    // Test finish().
    {
        static void slowFun() { Thread.sleep(dur!"msecs"(1)); }

        auto pool1 = new TaskPool();
        auto tSlow = task!slowFun();
        pool1.put(tSlow);
        pool1.finish();
        tSlow.yieldForce;
        // Can't assert that pool1.status == PoolState.stopNow because status
        // doesn't change until after the "done" flag is set and the waiting
        // thread is woken up.

        auto pool2 = new TaskPool();
        auto tSlow2 = task!slowFun();
        pool2.put(tSlow2);
        pool2.finish(true); // blocking
        assert(tSlow2.done);

        // Test fix for https://issues.dlang.org/show_bug.cgi?id=8582 by
        // making pool size zero.
        auto pool3 = new TaskPool(0);
        auto tSlow3 = task!slowFun();
        pool3.put(tSlow3);
        pool3.finish(true); // blocking
        assert(tSlow3.done);

        // This is correct because no thread will terminate unless pool2.status
        // and pool3.status have already been set to stopNow.
        assert(pool2.status == TaskPool.PoolState.stopNow);
        assert(pool3.status == TaskPool.PoolState.stopNow);
    }

    // Test default pool stuff.
    assert(taskPool.size == totalCPUs - 1);

    nums = new uint[1000];
    foreach (i; parallel(iota(1000)))
    {
        nums[i] = cast(uint) i;
    }
    assert(equal(nums, iota(1000)));

    assert(equal(
        poolInstance.map!"a * a"(iota(3_000_001), 10_000),
        map!"a * a"(iota(3_000_001))
    ));

    // The filter is to kill random access and test the non-random access
    // branch.
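    // (filter!"a == a" keeps every element; its only purpose is to strip the
    // random-access interface so map's input-range code path gets exercised.)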
    assert(equal(
        poolInstance.map!"a * a"(
            filter!"a == a"(iota(3_000_001)),
            10_000, 1000),
        map!"a * a"(iota(3_000_001))
    ));

    assert(
        reduce!"a + b"(0UL,
            poolInstance.map!"a * a"(iota(300_001), 10_000)
        ) ==
        reduce!"a + b"(0UL,
            map!"a * a"(iota(300_001))
        )
    );

    assert(equal(
        iota(1_000_002),
        poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002)))
    ));

    {
        import std.conv : to;
        import std.file : deleteme;

        string temp_file = deleteme ~ "-tempDelMe.txt";
        auto file = File(temp_file, "wb");
        scope(exit)
        {
            file.close();
            import std.file : remove;
            remove(temp_file);
        }

        auto written = [[1.0, 2, 3], [4.0, 5, 6], [7.0, 8, 9]];
        foreach (row; written)
        {
            file.writeln(join(to!(string[])(row), "\t"));
        }

        file = File(temp_file);

        void next(ref char[] buf)
        {
            file.readln(buf);
            import std.string : chomp;
            buf = chomp(buf);
        }

        double[][] read;
        auto asyncReader = taskPool.asyncBuf(&next, &file.eof);

        foreach (line; asyncReader)
        {
            if (line.length == 0) continue;
            auto ls = line.split("\t");
            read ~= to!(double[])(ls);
        }

        assert(read == written);
        file.close();
    }

    // Test map/asyncBuf chaining.
    auto abuf = poolInstance.asyncBuf(iota(-1.0, 3_000_000), 100);
    auto temp = poolInstance.map!sqrt(
        abuf, 100, 5
    );
    auto lmchain = poolInstance.map!"a * a"(temp, 100, 5);
    lmchain.popFront();

    int ii;
    foreach (elem; lmchain)
    {
        if (!isClose(elem, ii))
        {
            stderr.writeln(ii, '\t', elem);
        }
        ii++;
    }

    // Test the buffer-swapping trick in parallel foreach.
    abuf = poolInstance.asyncBuf(iota(-1.0, 1_000_000), 100);
    abuf.popFront();
    auto bufTrickTest = new size_t[abuf.length];
    foreach (i, elem; parallel(abuf))
    {
        bufTrickTest[i] = i;
    }

    assert(equal(iota(1_000_000), bufTrickTest));

    auto myTask = task!(abs)(-1);
    taskPool.put(myTask);
    assert(myTask.spinForce == 1);

    // Test that a thread not belonging to a pool receives a workerIndex of 0
    // when the index is queried w.r.t. that pool. The only way to do this
    // is non-deterministically.
    foreach (i; parallel(iota(1000), 1))
    {
        assert(poolInstance.workerIndex == 0);
    }

    foreach (i; poolInstance.parallel(iota(1000), 1))
    {
        assert(taskPool.workerIndex == 0);
    }

    // Test exception handling.
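    // (Exceptions thrown inside a parallel construct are caught, chained via
    // Throwable.next if several work units throw, and rethrown in the thread
    // that initiated the operation, which is what assertThrown observes below.)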
    static void parallelForeachThrow()
    {
        foreach (elem; parallel(iota(10)))
        {
            throw new Exception("");
        }
    }

    assertThrown!Exception(parallelForeachThrow());

    static int reduceException(int a, int b)
    {
        throw new Exception("");
    }

    assertThrown!Exception(
        poolInstance.reduce!reduceException(iota(3))
    );

    static int mapException(int a)
    {
        throw new Exception("");
    }

    assertThrown!Exception(
        poolInstance.amap!mapException(iota(3))
    );

    static void mapThrow()
    {
        auto m = poolInstance.map!mapException(iota(3));
        m.popFront();
    }

    assertThrown!Exception(mapThrow());

    struct ThrowingRange
    {
        @property int front()
        {
            return 1;
        }
        void popFront()
        {
            throw new Exception("");
        }
        enum bool empty = false;
    }

    assertThrown!Exception(poolInstance.asyncBuf(ThrowingRange.init));
}

//version = parallelismStressTest;

// These are more like stress tests than real unit tests. They print out
// tons of stuff and should not be run every time make unittest is run.
version (parallelismStressTest)
{
    @system unittest
    {
        import std.stdio : stderr, writeln, readln;
        import std.range : iota;
        import std.algorithm.iteration : filter, reduce;

        size_t attempt;
        for (; attempt < 10; attempt++)
        foreach (poolSize; [0, 4])
        {
            poolInstance = new TaskPool(poolSize);

            uint[] numbers = new uint[1_000];

            foreach (i; poolInstance.parallel(iota(0, numbers.length)))
            {
                numbers[i] = cast(uint) i;
            }

            // Make sure it works.
            foreach (i; 0 .. numbers.length)
            {
                assert(numbers[i] == i);
            }

            stderr.writeln("Done creating nums.");

            auto myNumbers = filter!"a % 7 > 0"(iota(0, 1000));
            foreach (num; poolInstance.parallel(myNumbers))
            {
                assert(num % 7 > 0 && num < 1000);
            }
            stderr.writeln("Done modulus test.");

            uint[] squares = poolInstance.amap!"a * a"(numbers, 100);
            assert(squares.length == numbers.length);
            foreach (i, number; numbers)
            {
                assert(squares[i] == number * number);
            }
            stderr.writeln("Done squares.");

            auto sumFuture = task!(reduce!"a + b")(numbers);
            poolInstance.put(sumFuture);

            ulong sumSquares = 0;
            foreach (elem; numbers)
            {
                sumSquares += elem * elem;
            }

            uint mySum = sumFuture.spinForce();
            assert(mySum == 999 * 1000 / 2);

            auto mySumParallel = poolInstance.reduce!"a + b"(numbers);
            assert(mySum == mySumParallel);
            stderr.writeln("Done sums.");

            auto myTask = task(
            {
                synchronized writeln("Our lives are parallel...Our lives are parallel.");
            });
            poolInstance.put(myTask);

            auto nestedOuter = "abcd";
            auto nestedInner = iota(0, 10, 2);

            foreach (i, letter; poolInstance.parallel(nestedOuter, 1))
            {
                foreach (j, number; poolInstance.parallel(nestedInner, 1))
                {
                    synchronized writeln(i, ": ", letter, " ", j, ": ", number);
                }
            }

            poolInstance.stop();
        }

        assert(attempt == 10);
        writeln("Press enter to go to next round of unittests.");
        readln();
    }

    // These unittests are intended more for actual testing and not so much
    // as examples.
    @system unittest
    {
        import std.stdio : stderr;
        import std.range : iota;
        import std.algorithm.iteration : filter, reduce;
        import std.math.algebraic : sqrt;
        import std.math.operations : isClose;
        import std.math.traits : isNaN;
        import std.conv : text;

        foreach (attempt; 0 .. 10)
        foreach (poolSize; [0, 4])
        {
            poolInstance = new TaskPool(poolSize);

            // Test indexing.
            stderr.writeln("Creator Raw Index: ", poolInstance.threadIndex);
            assert(poolInstance.workerIndex() == 0);

            // Test worker-local storage.
            auto workerLocalStorage = poolInstance.workerLocalStorage!uint(1);
            foreach (i; poolInstance.parallel(iota(0U, 1_000_000)))
            {
                workerLocalStorage.get++;
            }
            assert(reduce!"a + b"(workerLocalStorage.toRange) ==
                   1_000_000 + poolInstance.size + 1);

            // Make sure work is reasonably balanced among threads. This test
            // is non-deterministic and is more of a sanity check than
            // something that has an absolute pass/fail.
            shared(uint)[void*] nJobsByThread;
            foreach (thread; poolInstance.pool)
            {
                nJobsByThread[cast(void*) thread] = 0;
            }
            nJobsByThread[cast(void*) Thread.getThis()] = 0;

            foreach (i; poolInstance.parallel(iota(0, 1_000_000), 100))
            {
                atomicOp!"+="(nJobsByThread[cast(void*) Thread.getThis()], 1);
            }

            stderr.writeln("\nCurrent thread is: ",
                cast(void*) Thread.getThis());
            stderr.writeln("Workload distribution: ");
            foreach (k, v; nJobsByThread)
            {
                stderr.writeln(k, '\t', v);
            }

            // Test whether amap can be nested.
            real[][] matrix = new real[][](1000, 1000);
            foreach (i; poolInstance.parallel(iota(0, matrix.length)))
            {
                foreach (j; poolInstance.parallel(iota(0, matrix[0].length)))
                {
                    matrix[i][j] = i * j;
                }
            }

            // Get around weird bugs having to do w/ sqrt being an intrinsic:
            static real mySqrt(real num)
            {
                return sqrt(num);
            }

            static real[] parallelSqrt(real[] nums)
            {
                return poolInstance.amap!mySqrt(nums);
            }

            real[][] sqrtMatrix = poolInstance.amap!parallelSqrt(matrix);

            foreach (i, row; sqrtMatrix)
            {
                foreach (j, elem; row)
                {
                    real shouldBe = sqrt(cast(real) i * j);
                    assert(isClose(shouldBe, elem));
                    sqrtMatrix[i][j] = shouldBe;
                }
            }

            auto saySuccess = task(
            {
                stderr.writeln(
                    "Success doing matrix stuff that involves nested pool use.");
            });
            poolInstance.put(saySuccess);
            saySuccess.workForce();

            // A more thorough test of amap and reduce: find the sum of the
            // square roots of the matrix.
            static real parallelSum(real[] input)
            {
                return poolInstance.reduce!"a + b"(input);
            }

            auto sumSqrt = poolInstance.reduce!"a + b"(
                poolInstance.amap!parallelSum(
                    sqrtMatrix
                )
            );

            assert(isClose(sumSqrt, 4.437e8, 1e-2));
            stderr.writeln("Done sum of square roots.");

            // Test whether tasks work with function pointers.
            /+ // This part is buggy and needs to be fixed...
            auto nanTask = task(&isNaN, 1.0L);
            poolInstance.put(nanTask);
            assert(nanTask.spinForce == false);

            if (poolInstance.size > 0)
            {
                // Test work waiting.
                static void uselessFun()
                {
                    foreach (i; 0 .. 1_000_000) {}
                }
                auto uselessTasks = new typeof(task(&uselessFun))[1000];
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask = task(&uselessFun);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    poolInstance.put(uselessTask);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask.workForce();
                }
            }
            +/

            // Test the case of non-random access + ref returns.
            int[] nums = [1,2,3,4,5];
            static struct RemoveRandom
            {
                int[] arr;

                ref int front()
                {
                    return arr.front;
                }
                void popFront()
                {
                    arr.popFront();
                }
                bool empty()
                {
                    return arr.empty;
                }
            }

            auto refRange = RemoveRandom(nums);
            foreach (ref elem; poolInstance.parallel(refRange))
            {
                elem++;
            }
            assert(nums == [2,3,4,5,6], text(nums));
            stderr.writeln("Nums: ", nums);

            poolInstance.stop();
        }
    }
}

@system unittest
{
    static struct __S_12733
    {
        invariant() { assert(checksum == 1_234_567_890); }
        this(ulong u) { n = u; }
        void opAssign(__S_12733 s) { this.n = s.n; }
        ulong n;
        ulong checksum = 1_234_567_890;
    }

    static auto __genPair_12733(ulong n) { return __S_12733(n); }
    immutable ulong[] data =
        [2UL ^^ 59 - 1, 2UL ^^ 59 - 1, 2UL ^^ 59 - 1, 112_272_537_195_293UL];

    auto result = taskPool.amap!__genPair_12733(data);
}

@safe unittest
{
    import std.range : iota;

    // This test was in std.range, but caused cycles.
    assert(__traits(compiles, { foreach (i; iota(0, 100UL).parallel) {} }));
}

@safe unittest
{
    import std.algorithm.iteration : each;

    long[] arr;
    static assert(is(typeof({
        arr.parallel.each!"a++";
    })));
}

// https://issues.dlang.org/show_bug.cgi?id=17539
@system unittest
{
    import std.random : rndGen;
    // ensure compilation
    try foreach (rnd; rndGen.parallel) break;
    catch (ParallelForeachError e) {}
}
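// A minimal sketch, not part of the original test suite: breaking out of a
// parallel foreach loop is unsupported, and the documented behavior is that
// it is reported by throwing ParallelForeachError (the same type the
// compilation check above catches), rethrown in the thread that started the
// loop. This assumes that documented behavior holds at run time.
@system unittest
{
    import std.range : iota;

    bool caught = false;
    try
        foreach (i; parallel(iota(100)))
            break;
    catch (ParallelForeachError e)
        caught = true;

    assert(caught);
}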