1 /** 2 * $(SCRIPT inhibitQuickIndex = 1;) 3 * $(DIVC quickindex, 4 * $(BOOKTABLE, 5 * $(TR $(TH Category) $(TH Symbols)) 6 * $(TR $(TD Tid) $(TD 7 * $(MYREF locate) 8 * $(MYREF ownerTid) 9 * $(MYREF register) 10 * $(MYREF spawn) 11 * $(MYREF spawnLinked) 12 * $(MYREF thisTid) 13 * $(MYREF Tid) 14 * $(MYREF TidMissingException) 15 * $(MYREF unregister) 16 * )) 17 * $(TR $(TD Message passing) $(TD 18 * $(MYREF prioritySend) 19 * $(MYREF receive) 20 * $(MYREF receiveOnly) 21 * $(MYREF receiveTimeout) 22 * $(MYREF send) 23 * $(MYREF setMaxMailboxSize) 24 * )) 25 * $(TR $(TD Message-related types) $(TD 26 * $(MYREF LinkTerminated) 27 * $(MYREF MailboxFull) 28 * $(MYREF MessageMismatch) 29 * $(MYREF OnCrowding) 30 * $(MYREF OwnerTerminated) 31 * $(MYREF PriorityMessageException) 32 * )) 33 * $(TR $(TD Scheduler) $(TD 34 * $(MYREF FiberScheduler) 35 * $(MYREF Generator) 36 * $(MYREF Scheduler) 37 * $(MYREF scheduler) 38 * $(MYREF ThreadInfo) 39 * $(MYREF ThreadScheduler) 40 * $(MYREF yield) 41 * )) 42 * $(TR $(TD Misc) $(TD 43 * $(MYREF initOnce) 44 * )) 45 * )) 46 * 47 * This is a low-level messaging API upon which more structured or restrictive 48 * APIs may be built. The general idea is that every messageable entity is 49 * represented by a common handle type called a `Tid`, which allows messages to 50 * be sent to logical threads that are executing in both the current process 51 * and in external processes using the same interface. This is an important 52 * aspect of scalability because it allows the components of a program to be 53 * spread across available resources with few to no changes to the actual 54 * implementation. 55 * 56 * A logical thread is an execution context that has its own stack and which 57 * runs asynchronously to other logical threads. These may be preemptively 58 * scheduled kernel threads, $(MREF_ALTTEXT fibers, core, thread, fiber) 59 * (cooperative user-space threads), or some other concept with similar behavior. 
*
 * The type of concurrency used when logical threads are created is determined
 * by the $(LREF Scheduler) selected at initialization time. The default behavior is
 * currently to create a new kernel thread per call to spawn, but other
 * schedulers are available that multiplex fibers across the main thread or
 * use some combination of the two approaches.
 *
 * Copyright: Copyright Sean Kelly 2009 - 2014.
 * License: <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
 * Authors: Sean Kelly, Alex Rønne Petersen, Martin Nowak
 * Source: $(PHOBOSSRC std/concurrency.d)
 */
/* Copyright Sean Kelly 2009 - 2014.
 * Distributed under the Boost Software License, Version 1.0.
 * (See accompanying file LICENSE_1_0.txt or copy at
 * http://www.boost.org/LICENSE_1_0.txt)
 */
module std.concurrency;

public import std.variant;

import core.atomic;
import core.sync.condition;
import core.sync.mutex;
import core.thread;
import std.range.primitives;
import std.range.interfaces : InputRange;
import std.traits;

///
@system unittest
{
    __gshared string received;
    static void spawnedFunc(Tid ownerTid)
    {
        import std.conv : text;
        // Receive a message from the owner thread.
        receive((int i){
            received = text("Received the number ", i);

            // Send a message back to the owner thread
            // indicating success.
            send(ownerTid, true);
        });
    }

    // Start spawnedFunc in a new thread.
    auto childTid = spawn(&spawnedFunc, thisTid);

    // Send the number 42 to this new thread.
    send(childTid, 42);

    // Receive the result code.
    auto wasSuccessful = receiveOnly!(bool);
    assert(wasSuccessful);
    assert(received == "Received the number 42");
}

private
{
    /*
     * Compile-time check used by spawn/send: yields true if any type in
     * `Types` carries unshared aliasing.
     *
     * `Tid` is always allowed, `Rebindable!R` is judged by `R`, structs are
     * judged by the types of their fields, and everything else is delegated
     * to `std.traits.hasUnsharedAliasing`.
     */
    bool hasLocalAliasing(Types...)()
    {
        import std.typecons : Rebindable;

        // Works around "statement is not reachable"
        bool doesIt = false;
        static foreach (T; Types)
        {
            static if (is(T == Tid))
            { /* Allowed */ }
            else static if (is(T : Rebindable!R, R))
                doesIt |= hasLocalAliasing!R;
            else static if (is(T == struct))
                doesIt |= hasLocalAliasing!(typeof(T.tupleof));
            else
                doesIt |= std.traits.hasUnsharedAliasing!(T);
        }
        return doesIt;
    }

    // A struct that only wraps a Tid must be accepted.
    @safe unittest
    {
        static struct Container { Tid t; }
        static assert(!hasLocalAliasing!(Tid, Container, int));
    }

    // https://issues.dlang.org/show_bug.cgi?id=20097
    @safe unittest
    {
        import std.datetime.systime : SysTime;
        static struct Container { SysTime time; }
        static assert(!hasLocalAliasing!(SysTime, Container));
    }

    // Tag stored in every Message. In this file `standard` is used by send,
    // `priority` by prioritySend and `linkDead` by ThreadInfo.cleanup.
    enum MsgType
    {
        standard,
        priority,
        linkDead,
    }

    /*
     * A message as stored in a mailbox: a MsgType tag plus the payload.
     * A single value is stored in the Variant directly; several values are
     * packed into one std.typecons.Tuple.
     */
    struct Message
    {
        MsgType type;
        Variant data;

        // Builds a message of kind `t` from one or more values.
        this(T...)(MsgType t, T vals)
        if (T.length > 0)
        {
            static if (T.length == 1)
            {
                type = t;
                data = vals[0];
            }
            else
            {
                import std.typecons : Tuple;

                type = t;
                data = Tuple!(T)(vals);
            }
        }

        // Whether the payload can be read as `T` (as `Tuple!T` for several
        // types). A request for `Variant` itself always succeeds.
        @property auto convertsTo(T...)()
        {
            static if (T.length == 1)
            {
                return is(T[0] == Variant) || data.convertsTo!(T);
            }
            else
            {
                import std.typecons : Tuple;
                return data.convertsTo!(Tuple!(T));
            }
        }

        // Extracts the payload as `T`, as `Tuple!T` for several types, or as
        // the raw Variant when `Variant` is requested.
        @property auto get(T...)()
        {
            static if (T.length == 1)
            {
                static if (is(T[0] == Variant))
                    return data;
                else
                    return data.get!(T);
            }
            else
            {
                import std.typecons : Tuple;
                return data.get!(Tuple!(T));
            }
        }

        // Calls `op` with the payload converted to op's parameter types; a
        // multi-parameter op receives the expanded tuple.
        auto map(Op)(Op op)
        {
            alias Args = Parameters!(Op);

            static if (Args.length == 1)
            {
                static if (is(Args[0] == Variant))
                    return op(data);
                else
                    return op(data.get!(Args));
            }
            else
            {
                import std.typecons : Tuple;
                return op(data.get!(Tuple!(Args)).expand);
            }
        }
    }

    /*
     * Compile-time validation of a handler list passed to receive and
     * receiveTimeout.
     *
     * Every entry must be a function pointer or a delegate. A void-returning
     * handler that is not the last entry must neither take a single Variant
     * nor have the same parameter list as any later handler, because it would
     * hide ("occlude") the handlers that follow it.
     */
    void checkops(T...)(T ops)
    {
        import std.format : format;

        foreach (i, t1; T)
        {
            static assert(isFunctionPointer!t1 || isDelegate!t1,
                    format!"T %d is not a function pointer or delegates"(i));
            alias a1 = Parameters!(t1);
            alias r1 = ReturnType!(t1);

            // Only non-final, void-returning handlers are checked for occlusion.
            static if (i < T.length - 1 && is(r1 == void))
            {
                static assert(a1.length != 1 || !is(a1[0] == Variant),
                              "function with arguments " ~ a1.stringof ~
                              " occludes successive function");

                foreach (t2; T[i + 1 .. $])
                {
                    alias a2 = Parameters!(t2);

                    static assert(!is(a1 == a2),
                        "function with arguments " ~ a1.stringof ~ " occludes successive function");
                }
            }
        }
    }

    // ThreadInfo of the calling logical thread: taken from the installed
    // scheduler if there is one, otherwise ThreadInfo's own default instance.
    @property ref ThreadInfo thisInfo() nothrow
    {
        import core.atomic : atomicLoad;

        auto localScheduler = atomicLoad(scheduler);
        if (localScheduler is null)
            return ThreadInfo.thisInfo;
        return localScheduler.thisInfo;
    }
}

// Module destructor: runs ThreadInfo.cleanup for the terminating thread.
static ~this()
{
    thisInfo.cleanup();
}

// Exceptions

/**
 * Thrown on calls to $(LREF receiveOnly) if a message other than the type
 * the receiving thread expected is sent.
 */
class MessageMismatch : Exception
{
    ///
    this(string msg = "Unexpected message type") @safe pure nothrow @nogc
    {
        super(msg);
    }
}

/**
 * Thrown on calls to $(LREF receive) if the thread that spawned the receiving
 * thread has terminated and no more messages exist.
*/
class OwnerTerminated : Exception
{
    ///
    this(Tid t, string msg = "Owner terminated") @safe pure nothrow @nogc
    {
        super(msg);
        this.tid = t;
    }

    // The Tid handed to the constructor.
    Tid tid;
}

/**
 * Thrown if a linked thread has terminated.
 */
class LinkTerminated : Exception
{
    ///
    this(Tid t, string msg = "Link terminated") @safe pure nothrow @nogc
    {
        super(msg);
        this.tid = t;
    }

    // The Tid handed to the constructor.
    Tid tid;
}

/**
 * Thrown if a message was sent to a thread via
 * $(REF prioritySend, std,concurrency) and the receiver does not have a handler
 * for a message of this type.
 */
class PriorityMessageException : Exception
{
    ///
    this(Variant vals)
    {
        super("Priority message");
        this.message = vals;
    }

    /**
     * The message that was sent.
     */
    Variant message;
}

/**
 * Thrown on mailbox crowding if the mailbox is configured with
 * `OnCrowding.throwException`.
 */
class MailboxFull : Exception
{
    ///
    this(Tid t, string msg = "Mailbox full") @safe pure nothrow @nogc
    {
        super(msg);
        this.tid = t;
    }

    // The Tid handed to the constructor.
    Tid tid;
}

/**
 * Thrown when a `Tid` is missing, e.g. when $(LREF ownerTid) doesn't
 * find an owner thread.
 */
class TidMissingException : Exception
{
    import std.exception : basicExceptionCtors;
    ///
    mixin basicExceptionCtors;
}


// Thread ID


/**
 * An opaque type used to represent a logical thread.
 */
struct Tid
{
private:
    // Wraps an existing mailbox; only this module can create non-empty Tids.
    this(MessageBox m) @safe pure nothrow @nogc
    {
        this.mbox = m;
    }

    MessageBox mbox;

public:

    /**
     * Generate a convenient string for identifying this `Tid`. This is only
     * useful to see if `Tid`'s that are currently executing are the same or
     * different, e.g. for logging and debugging. It is potentially possible
     * that a `Tid` executed in the future will have the same `toString()` output
     * as another `Tid` that has already terminated.
     */
    void toString(W)(ref W w) const
    {
        import std.format.write : formattedWrite;

        // The mailbox reference, viewed as a raw pointer, is what gets printed.
        auto address = () @trusted { return cast(void*) mbox; }();
        formattedWrite(w, "Tid(%x)", address);
    }

}

@safe unittest
{
    import std.conv : text;
    Tid tid;
    assert(text(tid) == "Tid(0)");
    auto tid2 = thisTid;
    assert(text(tid2) != "Tid(0)");
    auto tid3 = tid2;
    assert(text(tid2) == text(tid3));
}

// https://issues.dlang.org/show_bug.cgi?id=21512
@system unittest
{
    import std.format : format;

    const(Tid) b = spawn(() {});
    assert(format!"%s"(b)[0 .. 4] == "Tid(");
}

/**
 * Returns: The `Tid` of the caller's thread.
 */
@property Tid thisTid() @safe
{
    // TODO: remove when concurrency is safe
    static auto trus() @trusted
    {
        // Create the mailbox on first use; afterwards hand back the stored Tid.
        if (thisInfo.ident == Tid.init)
            thisInfo.ident = Tid(new MessageBox);
        return thisInfo.ident;
    }

    return trus();
}

/**
 * Return the `Tid` of the thread which spawned the caller's thread.
 *
 * Throws: A `TidMissingException` exception if
 * there is no owner thread.
*/
@property Tid ownerTid()
{
    import std.exception : enforce;

    // Look the owner up once, validate it, then hand the same value back.
    auto parent = thisInfo.owner;
    enforce!TidMissingException(parent.mbox !is null, "Error: Thread has no owner thread.");
    return parent;
}

@system unittest
{
    import std.exception : assertThrown;

    static void fun()
    {
        string res = receiveOnly!string();
        assert(res == "Main calling");
        ownerTid.send("Child responding");
    }

    assertThrown!TidMissingException(ownerTid);
    auto child = spawn(&fun);
    child.send("Main calling");
    string res = receiveOnly!string();
    assert(res == "Child responding");
}

// Thread Creation

// True when `F` can be started by spawn with arguments of types `T`: it must
// be callable, return void, accept `T` by implicit conversion, and be either a
// function pointer or free of unshared aliasing.
private template isSpawnable(F, T...)
{
    // Walks both parameter lists in step, starting at index `i`, and checks
    // that every parameter of F2 converts implicitly to the one of F1.
    template isParamsImplicitlyConvertible(F1, F2, int i = 0)
    {
        alias expected = Parameters!F1;
        alias supplied = Parameters!F2;
        static if (expected.length != supplied.length)
            enum isParamsImplicitlyConvertible = false;
        else static if (expected.length == i)
            enum isParamsImplicitlyConvertible = true;
        else static if (is(supplied[i] : expected[i]))
            enum isParamsImplicitlyConvertible = isParamsImplicitlyConvertible!(F1,
                    F2, i + 1);
        else
            enum isParamsImplicitlyConvertible = false;
    }

    enum isSpawnable = isCallable!F && is(ReturnType!F : void)
            && isParamsImplicitlyConvertible!(F, void function(T))
            && (isFunctionPointer!F || !hasUnsharedAliasing!F);
}

/**
 * Starts `fn(args)` in a new logical thread.
 *
 * Executes the supplied function in a new logical thread represented by
 * `Tid`. The calling thread is designated as the owner of the new thread.
 * When the owner thread terminates an `OwnerTerminated` message will be
 * sent to the new thread, causing an `OwnerTerminated` exception to be
 * thrown on `receive()`.
 *
 * Params:
 *  fn   = The function to execute.
 *  args = Arguments to the function.
 *
 * Returns:
 *  A `Tid` representing the new logical thread.
*
 * Notes:
 *  `args` must not have unshared aliasing. In other words, all arguments
 *  to `fn` must either be `shared` or `immutable` or have no
 *  pointer indirection. This is necessary for enforcing isolation among
 *  threads.
 *
 * Similarly, if `fn` is a delegate, it must not have unshared aliases, meaning
 * `fn` must be either `shared` or `immutable`. */
Tid spawn(F, T...)(F fn, T args)
if (isSpawnable!(F, T))
{
    // Argument types are checked at compile time; `false` asks _spawn for an
    // unlinked thread.
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
    return _spawn(false, fn, args);
}

///
@system unittest
{
    static void f(string msg)
    {
        assert(msg == "Hello World");
    }

    auto tid = spawn(&f, "Hello World");
}

/// Fails: char[] has mutable aliasing.
@system unittest
{
    string msg = "Hello, World!";

    static void f1(string msg) {}
    static assert(!__traits(compiles, spawn(&f1, msg.dup)));
    static assert( __traits(compiles, spawn(&f1, msg.idup)));

    static void f2(char[] msg) {}
    static assert(!__traits(compiles, spawn(&f2, msg.dup)));
    static assert(!__traits(compiles, spawn(&f2, msg.idup)));
}

/// New thread with anonymous function
@system unittest
{
    spawn({
        ownerTid.send("This is so great!");
    });
    assert(receiveOnly!string == "This is so great!");
}

// The spawned function really runs: its side effect is visible once all
// threads have been joined.
@system unittest
{
    import core.thread : thread_joinAll;

    __gshared string receivedMessage;
    static void f1(string msg)
    {
        receivedMessage = msg;
    }

    auto tid1 = spawn(&f1, "Hello World");
    thread_joinAll;
    assert(receivedMessage == "Hello World");
}

/**
 * Starts `fn(args)` in a logical thread and will receive a `LinkTerminated`
 * message when the operation terminates.
 *
 * Executes the supplied function in a new logical thread represented by
 * `Tid`.
This new thread is linked to the calling thread so that if either
 * it or the calling thread terminates a `LinkTerminated` message will be sent
 * to the other, causing a `LinkTerminated` exception to be thrown on `receive()`.
 * The owner relationship from `spawn()` is preserved as well, so if the link
 * between threads is broken, owner termination will still result in an
 * `OwnerTerminated` exception to be thrown on `receive()`.
 *
 * Params:
 *  fn   = The function to execute.
 *  args = Arguments to the function.
 *
 * Returns:
 *  A Tid representing the new thread.
 */
Tid spawnLinked(F, T...)(F fn, T args)
if (isSpawnable!(F, T))
{
    // Same compile-time aliasing check as spawn; `true` asks _spawn for a
    // linked thread.
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
    return _spawn(true, fn, args);
}

/*
 * Shared implementation of spawn and spawnLinked.
 *
 * Creates a fresh mailbox and Tid for the new logical thread and starts
 * `fn(args)` through the installed scheduler, or on a new core.thread.Thread
 * when no scheduler is set. Before calling `fn`, the new thread stores its own
 * Tid and the caller's Tid (as owner) in its ThreadInfo. The caller records
 * the new Tid in its `links` table together with the `linked` flag.
 *
 * Returns: the Tid of the new logical thread.
 */
private Tid _spawn(F, T...)(bool linked, F fn, T args)
if (isSpawnable!(F, T))
{
    // TODO: MessageList and &exec should be shared.
    auto spawnTid = Tid(new MessageBox);
    auto ownerTid = thisTid;

    // Runs in the new logical thread: publish identity and owner, then run fn.
    void exec()
    {
        thisInfo.ident = spawnTid;
        thisInfo.owner = ownerTid;
        fn(args);
    }

    // TODO: MessageList and &exec should be shared.
    if (scheduler !is null)
        scheduler.spawn(&exec);
    else
    {
        auto t = new Thread(&exec);
        t.start();
    }
    // Recorded for every child; the value tells whether it was spawnLinked.
    thisInfo.links[spawnTid] = linked;
    return spawnTid;
}

// Compile-time matrix of what spawn accepts: matching argument lists, and
// delegates/callables only when they are shared or immutable.
@system unittest
{
    void function() fn1;
    void function(int) fn2;
    static assert(__traits(compiles, spawn(fn1)));
    static assert(__traits(compiles, spawn(fn2, 2)));
    static assert(!__traits(compiles, spawn(fn1, 1)));
    static assert(!__traits(compiles, spawn(fn2)));

    void delegate(int) shared dg1;
    shared(void delegate(int)) dg2;
    shared(void delegate(long) shared) dg3;
    shared(void delegate(real, int, long) shared) dg4;
    void delegate(int) immutable dg5;
    void delegate(int) dg6;
    static assert(__traits(compiles, spawn(dg1, 1)));
    static assert(__traits(compiles, spawn(dg2, 2)));
    static assert(__traits(compiles, spawn(dg3, 3)));
    static assert(__traits(compiles, spawn(dg4, 4, 4, 4)));
    static assert(__traits(compiles, spawn(dg5, 5)));
    static assert(!__traits(compiles, spawn(dg6, 6)));

    auto callable1  = new class{ void opCall(int) shared {} };
    auto callable2  = cast(shared) new class{ void opCall(int) shared {} };
    auto callable3  = new class{ void opCall(int) immutable {} };
    auto callable4  = cast(immutable) new class{ void opCall(int) immutable {} };
    auto callable5  = new class{ void opCall(int) {} };
    auto callable6  = cast(shared) new class{ void opCall(int) immutable {} };
    auto callable7  = cast(immutable) new class{ void opCall(int) shared {} };
    auto callable8  = cast(shared) new class{ void opCall(int) const shared {} };
    auto callable9  = cast(const shared) new class{ void opCall(int) shared {} };
    auto callable10 = cast(const shared) new class{ void opCall(int) const shared {} };
    auto callable11 = cast(immutable) new class{ void opCall(int) const shared {} };
    static assert(!__traits(compiles, spawn(callable1,  1)));
    static assert( __traits(compiles, spawn(callable2,  2)));
    static assert(!__traits(compiles, spawn(callable3,  3)));
    static assert( __traits(compiles, spawn(callable4,  4)));
    static assert(!__traits(compiles, spawn(callable5,  5)));
    static assert(!__traits(compiles, spawn(callable6,  6)));
    static assert(!__traits(compiles, spawn(callable7,  7)));
    static assert( __traits(compiles, spawn(callable8,  8)));
    static assert(!__traits(compiles, spawn(callable9,  9)));
    static assert( __traits(compiles, spawn(callable10, 10)));
    static assert( __traits(compiles, spawn(callable11, 11)));
}

/**
 * Places the values as a message at the back of tid's message queue.
 *
 * Sends the supplied value to the thread represented by tid.  As with
 * $(REF spawn, std,concurrency), `T` must not have unshared aliasing.
 */
void send(T...)(Tid tid, T vals)
in (tid.mbox !is null)
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
    _send(tid, vals);
}

/**
 * Places the values as a message on the front of tid's message queue.
 *
 * Send a message to `tid` but place it at the front of `tid`'s message
 * queue instead of at the back.  This function is typically used for
 * out-of-band communication, to signal exceptional conditions, etc.
 */
void prioritySend(T...)(Tid tid, T vals)
in (tid.mbox !is null)
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
    _send(MsgType.priority, tid, vals);
}

/*
 * Unchecked counterpart of send: forwards `vals` as a MsgType.standard
 * message without the compile-time aliasing check.
 */
private void _send(T...)(Tid tid, T vals)
in (tid.mbox !is null)
{
    _send(MsgType.standard, tid, vals);
}

/*
 * Implementation of send.  This allows parameter checking to be different for
 * both Tid.send() and .send().
*/
private void _send(T...)(MsgType type, Tid tid, T vals)
in (tid.mbox !is null)
{
    // Wrap the values in a tagged Message and hand it to the target mailbox.
    auto envelope = Message(type, vals);
    tid.mbox.put(envelope);
}

/**
 * Receives a message from another thread.
 *
 * Receive a message from another thread, or block if no messages of the
 * specified types are available.  This function works by pattern matching
 * a message against a set of delegates and executing the first match found.
 *
 * If a delegate that accepts a $(REF Variant, std,variant) is included as
 * the last argument to `receive`, it will match any message that was not
 * matched by an earlier delegate.  If more than one argument is sent,
 * the `Variant` will contain a $(REF Tuple, std,typecons) of all values
 * sent.
 *
 * Params:
 *  ops = Variadic list of function pointers and delegates. Entries
 *        in this list must not occlude later entries.
 *
 * Throws: $(LREF OwnerTerminated) when the sending thread was terminated.
*/
void receive(T...)(T ops)
in (thisInfo.ident.mbox !is null,
    "Cannot receive a message until a thread was spawned "
    ~ "or thisTid was passed to a running thread.")
{
    // Reject ill-formed handler lists at compile time, then block on the
    // caller's own mailbox.
    checkops(ops);
    thisInfo.ident.mbox.get(ops);
}

///
@system unittest
{
    import std.variant : Variant;

    auto process = ()
    {
        receive(
            (int i) { ownerTid.send(1); },
            (double f) { ownerTid.send(2); },
            (Variant v) { ownerTid.send(3); }
        );
    };

    {
        auto tid = spawn(process);
        send(tid, 42);
        assert(receiveOnly!int == 1);
    }

    {
        auto tid = spawn(process);
        send(tid, 3.14);
        assert(receiveOnly!int == 2);
    }

    {
        auto tid = spawn(process);
        send(tid, "something else");
        assert(receiveOnly!int == 3);
    }
}

@safe unittest
{
    static assert( __traits( compiles,
                      {
                          receive( (Variant x) {} );
                          receive( (int x) {}, (Variant x) {} );
                      } ) );

    static assert( !__traits( compiles,
                       {
                           receive( (Variant x) {}, (int x) {} );
                       } ) );

    static assert( !__traits( compiles,
                       {
                           receive( (int x) {}, (int x) {} );
                       } ) );
}

// Make sure receive() works with free functions as well.
version (StdUnittest)
{
    private void receiveFunction(int x) {}
}
@safe unittest
{
    static assert( __traits( compiles,
                      {
                          receive( &receiveFunction );
                          receive( &receiveFunction, (Variant x) {} );
                      } ) );
}


// Result type of receiveOnly: the bare type for exactly one requested type,
// otherwise a Tuple of the requested types.
private template receiveOnlyRet(T...)
{
    static if (T.length != 1)
    {
        import std.typecons : Tuple;
        alias receiveOnlyRet = Tuple!(T);
    }
    else
    {
        alias receiveOnlyRet = T[0];
    }
}

/**
 * Receives only messages with arguments of the specified types.
 *
 * Params:
 *  T = Variadic list of types to be received.
 *
 * Returns: The received message.
If `T` has more than one entry,
 *          the message will be packed into a $(REF Tuple, std,typecons).
 *
 * Throws: $(LREF MessageMismatch) if a message of types other than `T`
 *         is received,
 *         $(LREF LinkTerminated) when a linked thread was terminated,
 *         $(LREF OwnerTerminated) when the sending thread was terminated.
 */
receiveOnlyRet!(T) receiveOnly(T...)()
in
{
    assert(thisInfo.ident.mbox !is null,
        "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
}
do
{
    import std.format : format;
    import std.meta : allSatisfy;
    import std.typecons : Tuple;

    // Filled in by the first handler below.
    Tuple!(T) ret;

    thisInfo.ident.mbox.get((T val) {
        static if (T.length)
        {
            // Plain assignment when every T is assignable; otherwise the
            // result is constructed in place.
            static if (allSatisfy!(isAssignable, T))
            {
                ret.field = val;
            }
            else
            {
                import core.lifetime : emplace;
                emplace(&ret, val);
            }
        }
    },
    // Termination notices are rethrown to the caller unchanged.
    (LinkTerminated e) { throw e; },
    (OwnerTerminated e) { throw e; },
    // Anything else is a type mismatch; report expected vs. actual type.
    (Variant val) {
        static if (T.length > 1)
            string exp = T.stringof;
        else
            string exp = T[0].stringof;

        throw new MessageMismatch(
            format("Unexpected message type: expected '%s', got '%s'", exp, val.type.toString()));
    });
    static if (T.length == 1)
        return ret[0];
    else
        return ret;
}

///
@system unittest
{
    auto tid = spawn(
    {
        assert(receiveOnly!int == 42);
    });
    send(tid, 42);
}

///
@system unittest
{
    auto tid = spawn(
    {
        assert(receiveOnly!string == "text");
    });
    send(tid, "text");
}

///
@system unittest
{
    struct Record { string name; int age; }

    auto tid = spawn(
    {
        auto msg = receiveOnly!(double, Record);
        assert(msg[0] == 0.5);
        assert(msg[1].name == "Alice");
        assert(msg[1].age == 31);
    });

    send(tid, 0.5, Record("Alice", 31));
}

// A message of the wrong type surfaces as MessageMismatch with a message
// naming both the expected and the received type.
@system unittest
{
    static void t1(Tid mainTid)
    {
        try
        {
            receiveOnly!string();
            mainTid.send("");
        }
        catch (Throwable th)
        {
            mainTid.send(th.msg);
        }
    }

    auto tid = spawn(&t1, thisTid);
    tid.send(1);
    string result = receiveOnly!string();
    assert(result == "Unexpected message type: expected 'string', got 'int'");
}

// https://issues.dlang.org/show_bug.cgi?id=21663
@safe unittest
{
    alias test = receiveOnly!(string, bool, bool);
}

/**
 * Receives a message from another thread and gives up if no match
 * arrives within a specified duration.
 *
 * Receive a message from another thread, or block until `duration` has
 * elapsed if no messages of the specified types are available. This function
 * works by pattern matching a message against a set of delegates and executing
 * the first match found.
 *
 * If a delegate that accepts a $(REF Variant, std,variant) is included as
 * the last argument, it will match any message that was not
 * matched by an earlier delegate.  If more than one argument is sent,
 * the `Variant` will contain a $(REF Tuple, std,typecons) of all values
 * sent.
 *
 * Params:
 *  duration = Duration, how long to wait. If `duration` is negative,
 *             won't wait at all.
 *  ops = Variadic list of function pointers and delegates. Entries
 *        in this list must not occlude later entries.
 *
 * Returns: `true` if it received a message and `false` if it timed out waiting
 *          for one.
 *
 * Throws: $(LREF OwnerTerminated) when the sending thread was terminated.
*/
bool receiveTimeout(T...)(Duration duration, T ops)
in (thisInfo.ident.mbox !is null,
    "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.")
{
    // Same compile-time handler validation as receive, then a timed wait on
    // the caller's own mailbox.
    checkops(ops);
    return thisInfo.ident.mbox.get(duration, ops);
}

@safe unittest
{
    static assert(__traits(compiles, {
        receiveTimeout(msecs(0), (Variant x) {});
        receiveTimeout(msecs(0), (int x) {}, (Variant x) {});
    }));

    static assert(!__traits(compiles, {
        receiveTimeout(msecs(0), (Variant x) {}, (int x) {});
    }));

    static assert(!__traits(compiles, {
        receiveTimeout(msecs(0), (int x) {}, (int x) {});
    }));

    static assert(__traits(compiles, {
        receiveTimeout(msecs(10), (int x) {}, (Variant x) {});
    }));
}

// MessageBox Limits

/**
 * These behaviors may be specified when a mailbox is full.
 */
enum OnCrowding
{
    block, /// Wait until room is available.
    throwException, /// Throw a $(LREF MailboxFull) exception.
    ignore /// Abort the send and return.
}

// One handler per OnCrowding policy; setMaxMailboxSize installs the matching
// one on the mailbox.
private
{
    // OnCrowding.block
    bool onCrowdingBlock(Tid tid) @safe pure nothrow @nogc
    {
        return true;
    }

    // OnCrowding.throwException
    bool onCrowdingThrow(Tid tid) @safe pure
    {
        throw new MailboxFull(tid);
    }

    // OnCrowding.ignore
    bool onCrowdingIgnore(Tid tid) @safe pure nothrow @nogc
    {
        return false;
    }
}

/**
 * Sets a maximum mailbox size.
 *
 * Sets a limit on the maximum number of user messages allowed in the mailbox.
 * If this limit is reached, the caller attempting to add a new message will
 * execute the behavior specified by doThis.  If messages is zero, the mailbox
 * is unbounded.
 *
 * Params:
 *  tid      = The Tid of the thread for which this limit should be set.
 *  messages = The maximum number of messages or zero if no limit.
*  doThis   = The behavior executed when a message is sent to a full
 *             mailbox.
 */
void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure
in (tid.mbox !is null)
{
    // Pick the handler for the requested policy, then install it in one place.
    bool function(Tid) handler;
    final switch (doThis)
    {
    case OnCrowding.block:
        handler = &onCrowdingBlock;
        break;
    case OnCrowding.throwException:
        handler = &onCrowdingThrow;
        break;
    case OnCrowding.ignore:
        handler = &onCrowdingIgnore;
        break;
    }
    tid.mbox.setMaxMsgs(messages, handler);
}

/**
 * Sets a maximum mailbox size.
 *
 * Sets a limit on the maximum number of user messages allowed in the mailbox.
 * If this limit is reached, the caller attempting to add a new message will
 * execute onCrowdingDoThis.  If messages is zero, the mailbox is unbounded.
 *
 * Params:
 *  tid              = The Tid of the thread for which this limit should be set.
 *  messages         = The maximum number of messages or zero if no limit.
 *  onCrowdingDoThis = The routine called when a message is sent to a full
 *                     mailbox.
 */
void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis)
in (tid.mbox !is null)
{
    tid.mbox.setMaxMsgs(messages, onCrowdingDoThis);
}

// Name registry: name -> Tid, and the reverse map Tid -> all of its names.
// The functions below access both maps while holding registryLock.
private
{
    __gshared Tid[string] tidByName;
    __gshared string[][Tid] namesByTid;
}

// The mutex guarding the registry maps, created on first use via initOnce.
private @property Mutex registryLock()
{
    __gshared Mutex impl;
    initOnce!impl(new Mutex);
    return impl;
}

// Drops every registry entry belonging to `me`; a ThreadInfo without a Tid
// has nothing registered and is left alone.
private void unregisterMe(ref ThreadInfo me)
{
    if (me.ident == Tid.init)
        return;

    synchronized (registryLock)
    {
        if (auto allNames = me.ident in namesByTid)
        {
            foreach (name; *allNames)
                tidByName.remove(name);
            namesByTid.remove(me.ident);
        }
    }
}

/**
 * Associates name with tid.
 *
 * Associates name with tid in a process-local map.
When the thread
 * represented by tid terminates, any names associated with it will be
 * automatically unregistered.
 *
 * Params:
 *  name = The name to associate with tid.
 *  tid  = The tid to register by name.
 *
 * Returns:
 *  true if the name is available and tid is not known to represent a
 *  defunct thread.
 */
bool register(string name, Tid tid)
in (tid.mbox !is null)
{
    synchronized (registryLock)
    {
        // A name can belong to one Tid only.
        if (name in tidByName)
            return false;
        // Refuse Tids whose mailbox has already been closed.
        if (tid.mbox.isClosed)
            return false;
        namesByTid[tid] ~= name;
        tidByName[name] = tid;
        return true;
    }
}

/**
 * Removes the registered name associated with a tid.
 *
 * Params:
 *  name = The name to unregister.
 *
 * Returns:
 *  true if the name is registered, false if not.
 */
bool unregister(string name)
{
    import std.algorithm.mutation : remove, SwapStrategy;
    import std.algorithm.searching : countUntil;

    synchronized (registryLock)
    {
        if (auto tid = name in tidByName)
        {
            // Keep the reverse map in step with the forward map.
            if (auto allNames = *tid in namesByTid)
            {
                auto pos = countUntil(*allNames, name);
                if (pos >= 0)
                {
                    // `remove` returns the shortened slice and leaves the
                    // length of its argument untouched, so the result has to
                    // be stored back. Otherwise a stale or duplicated name
                    // stays in the list, and unregisterMe would later delete
                    // a name that another thread may have registered since.
                    *allNames = remove!(SwapStrategy.unstable)(*allNames, pos);
                }
            }
            tidByName.remove(name);
            return true;
        }
        return false;
    }
}

/**
 * Gets the `Tid` associated with name.
 *
 * Params:
 *  name = The name to locate within the registry.
 *
 * Returns:
 *  The associated `Tid` or `Tid.init` if name is not registered.
 */
Tid locate(string name)
{
    synchronized (registryLock)
    {
        if (auto tid = name in tidByName)
            return *tid;
        return Tid.init;
    }
}

/**
 * Encapsulates all implementation-level data needed for scheduling.
 *
 * When defining a $(LREF Scheduler), an instance of this struct must be associated
 * with each logical thread.
* It contains all implementation-level information needed by the internal
 * API.
 */
struct ThreadInfo
{
    Tid ident;
    bool[Tid] links;
    Tid owner;

    /**
     * Gets a thread-local instance of `ThreadInfo`.
     *
     * Gets a thread-local instance of `ThreadInfo`, which should be used as the
     * default instance when info is requested for a thread not created by the
     * `Scheduler`.
     */
    static @property ref thisInfo() nothrow
    {
        // `static` storage is thread-local in D, so each thread gets its own.
        static ThreadInfo perThread;
        return perThread;
    }

    /**
     * Cleans up this ThreadInfo.
     *
     * This must be called when a scheduled thread terminates.  It tears down
     * the messaging system for the thread and notifies interested parties of
     * the thread's termination.
     */
    void cleanup()
    {
        // Stop accepting messages before announcing the termination.
        if (auto box = ident.mbox)
            box.close();

        foreach (linked; links.keys)
            _send(MsgType.linkDead, linked, ident);

        if (owner != Tid.init)
            _send(MsgType.linkDead, owner, ident);

        // Finally drop any names still registered for this thread.
        unregisterMe(this);
    }

    // https://issues.dlang.org/show_bug.cgi?id=20160
    @system unittest
    {
        register("main_thread", thisTid());

        // Cleaning up an unrelated, empty ThreadInfo must leave the
        // registration of the current thread untouched.
        ThreadInfo unrelated;
        unrelated.cleanup();

        assert(locate("main_thread") == thisTid());
    }
}

/**
 * A `Scheduler` controls how threading is performed by spawn.
 *
 * Implementing a `Scheduler` allows the concurrency mechanism used by this
 * module to be customized according to different needs.  By default, a call
 * to spawn will create a new kernel thread that executes the supplied routine
 * and terminates when finished.  But it is possible to create `Scheduler`s that
 * reuse threads, that multiplex `Fiber`s (coroutines) across a single thread,
 * or any number of other approaches.
* By making the choice of `Scheduler` a
 * user-level option, `std.concurrency` may be used for far more types of
 * application than if this behavior were predefined.
 *
 * Example:
 * ---
 * import std.concurrency;
 * import std.stdio;
 *
 * void main()
 * {
 *     scheduler = new FiberScheduler;
 *     scheduler.start(
 *     {
 *         writeln("the rest of main goes here");
 *     });
 * }
 * ---
 *
 * Some schedulers have a dispatching loop that must run if they are to work
 * properly, so for the sake of consistency, when using a scheduler, `start()`
 * must be called within `main()`.  This yields control to the scheduler and
 * will ensure that any spawned threads are executed in an expected manner.
 */
interface Scheduler
{
    /**
     * Spawns the supplied op and starts the `Scheduler`.
     *
     * This is intended to be called at the start of the program to yield all
     * scheduling to the active `Scheduler` instance.  This is necessary for
     * schedulers that explicitly dispatch threads rather than simply relying
     * on the operating system to do so, and so start should always be called
     * within `main()` to begin normal program execution.
     *
     * Params:
     *  op = A wrapper for whatever the main thread would have done in the
     *       absence of a custom scheduler.  It will be automatically executed
     *       via a call to spawn by the `Scheduler`.
     */
    void start(void delegate() op);

    /**
     * Assigns a logical thread to execute the supplied op.
     *
     * This routine is called by spawn.  It is expected to instantiate a new
     * logical thread and run the supplied operation.  This thread must call
     * `thisInfo.cleanup()` when the thread terminates if the scheduled thread
     * is not a kernel thread--all kernel threads will have their `ThreadInfo`
     * cleaned up automatically by a thread-local destructor.
     *
     * Params:
     *  op = The function to execute.  This may be the actual function passed
     *       by the user to spawn itself, or may be a wrapper function.
     */
    void spawn(void delegate() op);

    /**
     * Yields execution to another logical thread.
     *
     * This routine is called at various points within concurrency-aware APIs
     * to provide a scheduler a chance to yield execution when using some sort
     * of cooperative multithreading model.  If this is not appropriate, such
     * as when each logical thread is backed by a dedicated kernel thread,
     * this routine may be a no-op.
     */
    void yield() nothrow;

    /**
     * Returns an appropriate `ThreadInfo` instance.
     *
     * Returns an instance of `ThreadInfo` specific to the logical thread that
     * is calling this routine or, if the calling thread was not created by
     * this scheduler, returns `ThreadInfo.thisInfo` instead.
     */
    @property ref ThreadInfo thisInfo() nothrow;

    /**
     * Creates a `Condition` variable analog for signaling.
     *
     * Creates a new `Condition` variable analog which is used to check for and
     * to signal the addition of messages to a thread's message queue.  Like
     * yield, some schedulers may need to define custom behavior so that calls
     * to `Condition.wait()` yield to another thread when no new messages are
     * available instead of blocking.
     *
     * Params:
     *  m = The `Mutex` that will be associated with this condition.  It will be
     *      locked prior to any operation on the condition, and so in some
     *      cases a `Scheduler` may need to hold this reference and unlock the
     *      mutex before yielding execution to another logical thread.
     */
    Condition newCondition(Mutex m) nothrow;
}

/**
 * An example `Scheduler` using kernel threads.
*
 * This is an example `Scheduler` that mirrors the default scheduling behavior
 * of creating one kernel thread per call to spawn.  It is fully functional
 * and may be instantiated and used, but is not a necessary part of the
 * default functioning of this module.
 */
class ThreadScheduler : Scheduler
{
    /**
     * This simply runs op directly, since no real scheduling is needed by
     * this approach.
     */
    void start(void delegate() op)
    {
        op();
    }

    /**
     * Creates a new kernel thread and assigns it to run the supplied op.
     */
    void spawn(void delegate() op)
    {
        (new Thread(op)).start();
    }

    /**
     * This scheduler does no explicit multiplexing, so this is a no-op.
     */
    void yield() nothrow
    {
        // no explicit yield needed
    }

    /**
     * Returns `ThreadInfo.thisInfo`, since it is a thread-local instance of
     * `ThreadInfo`, which is the correct behavior for this scheduler.
     */
    @property ref ThreadInfo thisInfo() nothrow
    {
        return ThreadInfo.thisInfo;
    }

    /**
     * Creates a new `Condition` variable.  No custom behavior is needed here.
     */
    Condition newCondition(Mutex m) nothrow
    {
        return new Condition(m);
    }
}

/**
 * An example `Scheduler` using $(MREF_ALTTEXT `Fiber`s, core, thread, fiber).
 *
 * This is an example scheduler that creates a new `Fiber` per call to spawn
 * and multiplexes the execution of all fibers within the main thread.
 */
class FiberScheduler : Scheduler
{
    /**
     * This creates a new `Fiber` for the supplied op and then starts the
     * dispatcher.
     */
    void start(void delegate() op)
    {
        create(op);
        dispatch();
    }

    /**
     * This creates a new `Fiber` for the supplied op and adds it to the
     * dispatch list.
     */
    void spawn(void delegate() op) nothrow
    {
        create(op);
        yield();
    }

    /**
     * If the caller is a scheduled `Fiber`, this yields execution to another
     * scheduled `Fiber`.
     */
    void yield() nothrow
    {
        // NOTE: It's possible that we should test whether the calling Fiber
        //       is an InfoFiber before yielding, but I think it's reasonable
        //       that any (non-Generator) fiber should yield here.
        if (Fiber.getThis() !is null)
            Fiber.yield();
    }

    /**
     * Returns an appropriate `ThreadInfo` instance.
     *
     * Returns a `ThreadInfo` instance specific to the calling `Fiber` if the
     * `Fiber` was created by this dispatcher, otherwise it returns
     * `ThreadInfo.thisInfo`.
     */
    @property ref ThreadInfo thisInfo() nothrow
    {
        if (auto current = cast(InfoFiber) Fiber.getThis())
            return current.info;
        return ThreadInfo.thisInfo;
    }

    /**
     * Returns a `Condition` analog that yields when wait or notify is called.
     *
     * Bug:
     * For the default implementation, `notifyAll` will behave like `notify`.
     *
     * Params:
     *   m = A `Mutex` to use for locking if the condition needs to be waited on
     *       or notified from multiple `Thread`s.
     *       If `null`, no `Mutex` will be used and it is assumed that the
     *       `Condition` is only waited on/notified from one `Thread`.
     */
    Condition newCondition(Mutex m) nothrow
    {
        return new FiberCondition(m);
    }

protected:
    /**
     * Creates a new `Fiber` which calls the given delegate.
     *
     * Params:
     *   op = The delegate the fiber should call
     */
    void create(void delegate() op) nothrow
    {
        // Runs op and always cleans up the fiber's ThreadInfo afterwards,
        // whether op returns normally or throws.
        void runAndCleanup()
        {
            scope (exit) thisInfo.cleanup();
            op();
        }

        m_fibers ~= new InfoFiber(&runAndCleanup);
    }

    /**
     * `Fiber` which embeds a `ThreadInfo`
     */
    static class InfoFiber : Fiber
    {
        ThreadInfo info;

        this(void delegate() op) nothrow
        {
            super(op);
        }

        this(void delegate() op, size_t sz) nothrow
        {
            super(op, sz);
        }
    }

private:
    // Condition analog that yields to the fiber scheduler instead of
    // blocking; a notification is recorded in a plain flag.
    class FiberCondition : Condition
    {
        this(Mutex m) nothrow
        {
            super(m);
            notified = false;
        }

        override void wait() nothrow
        {
            // Consume the notification on the way out.
            scope (exit) notified = false;

            while (!notified)
                switchContext();
        }

        override bool wait(Duration period) nothrow
        {
            import core.time : MonoTime;

            // The return value is evaluated before this guard resets the flag.
            scope (exit) notified = false;

            auto limit = MonoTime.currTime + period;
            while (!notified && !period.isNegative)
            {
                this.outer.yield();
                period = limit - MonoTime.currTime;
            }
            return notified;
        }

        override void notify() nothrow
        {
            notified = true;
            switchContext();
        }

        override void notifyAll() nothrow
        {
            notified = true;
            switchContext();
        }

    private:
        // Releases the associated mutex (if any) around a yield and takes it
        // again before returning to the caller.
        void switchContext() nothrow
        {
            auto held = mutex_nothrow;
            if (held !is null)
                held.unlock_nothrow();
            scope (exit)
            {
                if (held !is null)
                    held.lock_nothrow();
            }
            this.outer.yield();
        }

        bool notified;
    }

    // Round-robin dispatcher: runs each fiber in turn until none are left.
    void dispatch()
    {
        import std.algorithm.mutation : remove;

        while (m_fibers.length > 0)
        {
            auto failure = m_fibers[m_pos].call(Fiber.Rethrow.no);

            // Anything but OwnerTerminated escaping a fiber is propagated.
            if (failure !is null && (cast(OwnerTerminated) failure) is null)
                throw failure;

            if (m_fibers[m_pos].state == Fiber.State.TERM)
            {
                // Drop the finished fiber; its slot now holds the next one.
                m_fibers = remove(m_fibers, m_pos);
                if (m_pos >= m_fibers.length)
                    m_pos = 0;
            }
            else
            {
                // Advance, wrapping around after the last fiber.
                immutable wasLast = m_pos >= m_fibers.length - 1;
                m_pos = wasLast ? 0 : m_pos + 1;
            }
        }
    }

    Fiber[] m_fibers;
    size_t m_pos;
}

@system unittest
{
    static void receive(Condition cond, ref size_t received)
    {
        for (;;)
        {
            synchronized (cond.mutex)
            {
                cond.wait();
                ++received;
            }
        }
    }

    static void send(Condition cond, ref size_t sent)
    {
        for (;;)
        {
            synchronized (cond.mutex)
            {
                ++sent;
                cond.notify();
            }
        }
    }

    auto fs = new FiberScheduler;
    auto mtx = new Mutex;
    auto cond = fs.newCondition(mtx);

    size_t received, sent;
    auto waiter = new Fiber({ receive(cond, received); });
    auto notifier = new Fiber({ send(cond, sent); });

    waiter.call();
    assert(received == 0);
    notifier.call();
    assert(sent == 1);
    assert(received == 0);
    waiter.call();
    assert(received == 1);
    waiter.call();
    assert(received == 1);
}

/**
 * Sets the `Scheduler` behavior within the program.
 *
 * This variable sets the `Scheduler` behavior within this program.  Typically,
 * when setting a `Scheduler`, `scheduler.start()` should be called in `main`.  This
 * routine will not return until program execution is complete.
 */
__gshared Scheduler scheduler;

// Generator

/**
 * If the caller is a `Fiber` and is not a $(LREF Generator), this function will call
 * `scheduler.yield()` or `Fiber.yield()`, as appropriate.
 */
void yield() nothrow
{
    auto current = Fiber.getThis();

    // Generators hand values to their consumer; a plain yield is a no-op.
    if ((cast(IsGenerator) current) !is null)
        return;

    if (scheduler !is null)
    {
        scheduler.yield();
        return;
    }

    // No scheduler installed: only yield when actually running in a fiber.
    if (current !is null)
        Fiber.yield();
}

/// Used to determine whether a Generator is running.
private interface IsGenerator {}


/**
 * A Generator is a $(MREF_ALTTEXT Fiber, core, thread, fiber)
 * that periodically returns values of type `T` to the
 * caller via `yield`.  This is represented as an InputRange.
 */
class Generator(T) :
    Fiber, IsGenerator, InputRange!T
{
    /**
     * Initializes a generator object which is associated with a static
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn)
    {
        super(fn);
        call();
    }

    /**
     * Initializes a generator object which is associated with a static
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *  sz = The stack size for this fiber.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn, size_t sz)
    {
        super(fn, sz);
        call();
    }

    /**
     * Initializes a generator object which is associated with a static
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *  sz = The stack size for this fiber.
     *  guardPageSize = size of the guard page to trap fiber's stack
     *                  overflows. Refer to $(REF Fiber, core,thread)'s
     *                  documentation for more details.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn, size_t sz, size_t guardPageSize)
    {
        super(fn, sz, guardPageSize);
        call();
    }

    /**
     * Initializes a generator object which is associated with a dynamic
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  dg = The fiber function.
     *
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg)
    {
        super(dg);
        call();
    }

    /**
     * Initializes a generator object which is associated with a dynamic
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  dg = The fiber function.
     *  sz = The stack size for this fiber.
     *
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg, size_t sz)
    {
        super(dg, sz);
        call();
    }

    /**
     * Initializes a generator object which is associated with a dynamic
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  dg = The fiber function.
     *  sz = The stack size for this fiber.
     *  guardPageSize = size of the guard page to trap fiber's stack
     *                  overflows. Refer to $(REF Fiber, core,thread)'s
     *                  documentation for more details.
     *
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg, size_t sz, size_t guardPageSize)
    {
        super(dg, sz, guardPageSize);
        call();
    }

    /**
     * Returns true if the generator is empty.
     */
    final bool empty() @property
    {
        // Nothing has been yielded yet, or the fiber function has finished.
        if (m_value is null)
            return true;
        return state == State.TERM;
    }

    /**
     * Obtains the next value from the underlying function.
     */
    final void popFront()
    {
        call();
    }

    /**
     * Returns the most recently generated value by shallow copy.
     */
    final T front() @property
    {
        return *m_value;
    }

    /**
     * Returns the most recently generated value without executing a
     * copy constructor. Will not compile for element types defining a
     * postblit, because `Generator` does not return by reference.
     */
    final T moveFront()
    {
        static if (hasElaborateCopyConstructor!T)
        {
            static assert(0,
                    "Fiber front is always rvalue and thus cannot be moved since it defines a postblit.");
        }
        else
        {
            return front;
        }
    }

    // foreach support: passes each generated value to the loop body.
    final int opApply(scope int delegate(T) loopBody)
    {
        int result;
        while (!empty)
        {
            result = loopBody(front);
            if (result)
                break;
            popFront();
        }
        return result;
    }

    // foreach support with a running index starting at zero.
    final int opApply(scope int delegate(size_t, T) loopBody)
    {
        int result;
        size_t index;
        while (!empty)
        {
            result = loopBody(index, front);
            if (result)
                break;
            ++index;
            popFront();
        }
        return result;
    }
private:
    // Points at the value most recently passed to yield(T); null until the
    // fiber function yields for the first time.
    T* m_value;
}

///
@system unittest
{
    auto tid = spawn({
        int i;
        while (i < 9)
            i = receiveOnly!int;

        ownerTid.send(i * 2);
    });

    auto r = new Generator!int({
        foreach (i; 1 .. 10)
            yield(i);
    });

    foreach (e; r)
        tid.send(e);

    assert(receiveOnly!int == 18);
}

/**
 * Yields a value of type T to the caller of the currently executing
 * generator.
 *
 * Params:
 *  value = The value to yield.
*/
void yield(T)(ref T value)
{
    auto generator = cast(Generator!T) Fiber.getThis();

    // Only a running Generator of exactly this element type may receive it.
    if (generator is null || generator.state != Fiber.State.EXEC)
        throw new Exception("yield(T) called with no active generator for the supplied type");

    // Publish the address of the caller's value, then hand control back to
    // whoever is iterating the generator.
    generator.m_value = &value;
    Fiber.yield();
}

/// ditto
void yield(T)(T value)
{
    // `value` is an lvalue here, so this forwards to the ref overload.
    yield(value);
}

@system unittest
{
    import core.exception;
    import std.exception;

    auto mainTid = thisTid;
    alias testdg = () {
        auto tid = spawn(
            (Tid mainTid) {
                int i;
                scope (failure) mainTid.send(false);
                try
                {
                    for (i = 1; i < 10; i++)
                    {
                        if (receiveOnly!int() != i)
                        {
                            mainTid.send(false);
                            break;
                        }
                    }
                }
                catch (OwnerTerminated e)
                {
                    // i will advance 1 past the last value expected
                    mainTid.send(i == 4);
                }
            }, mainTid);
        auto produced = new Generator!int(
            {
                assertThrown!Exception(yield(2.0));
                yield(); // ensure this is a no-op
                yield(1);
                yield(); // also once something has been yielded
                yield(2);
                yield(3);
            });

        foreach (item; produced)
        {
            tid.send(item);
        }
    };

    // Same scenario under both example schedulers.
    scheduler = new ThreadScheduler;
    scheduler.spawn(testdg);
    assert(receiveOnly!bool());

    scheduler = new FiberScheduler;
    scheduler.start(testdg);
    assert(receiveOnly!bool());
    scheduler = null;
}
///
@system unittest
{
    import std.range;

    InputRange!int myIota = iota(10).inputRangeObject;

    myIota.popFront();
    myIota.popFront();
    assert(myIota.moveFront == 2);
    assert(myIota.front == 2);
    myIota.popFront();
    assert(myIota.front == 3);

    //can be assigned to std.range.interfaces.InputRange directly
    myIota = new Generator!int(
    {
        foreach (i; 0 .. 10) yield(i);
    });

    myIota.popFront();
    myIota.popFront();
    assert(myIota.moveFront == 2);
    assert(myIota.front == 2);
    myIota.popFront();
    assert(myIota.front == 3);

    size_t[2] counter = [0, 0];
    foreach (i, unused; myIota) counter[] += [1, i];

    assert(myIota.empty);
    assert(counter == [7, 21]);
}

private
{
    /*
     * A MessageBox is a message queue for one thread.  Other threads may send
     * messages to this owner by calling put(), and the owner receives them by
     * calling get().  The put() call is therefore effectively shared and the
     * get() call is effectively local.  setMaxMsgs may be used by any thread
     * to limit the size of the message queue.
     */
    class MessageBox
    {
        this() @trusted nothrow /* TODO: make @safe after relevant druntime PR gets merged */
        {
            m_lock = new Mutex;
            m_closed = false;

            // Let an installed scheduler supply its own condition analogs;
            // otherwise fall back to plain kernel condition variables.
            if (scheduler !is null)
            {
                m_putMsg = scheduler.newCondition(m_lock);
                m_notFull = scheduler.newCondition(m_lock);
            }
            else
            {
                m_putMsg = new Condition(m_lock);
                m_notFull = new Condition(m_lock);
            }
        }

        ///
        final @property bool isClosed() @safe @nogc pure
        {
            synchronized (m_lock)
            {
                return m_closed;
            }
        }

        /*
         * Sets a limit on the maximum number of user messages allowed in the
         * mailbox.  If this limit is reached, the caller attempting to add
         * a new message will execute call.  If num is zero, there is no limit
         * on the message queue.
         *
         * Params:
         *  num  = The maximum size of the queue or zero if the queue is
         *         unbounded.
         *  call = The routine to call when the queue is full.
*/
        final void setMaxMsgs(size_t num, bool function(Tid) call) @safe @nogc pure
        {
            // Both values are read by put() under the same lock.
            synchronized (m_lock)
            {
                m_maxMsgs = num;
                m_onMaxMsgs = call;
            }
        }

        /*
         * If maxMsgs is not set, the message is added to the queue and the
         * owner is notified.  If the queue is full, the message will still be
         * accepted if it is a control message, otherwise onCrowdingDoThis is
         * called.  If the routine returns true, this call will block until
         * the owner has made space available in the queue.  If it returns
         * false, this call will abort.
         *
         * Priority messages are queued separately and are accepted regardless
         * of the size limit.  If the mailbox has already been closed the
         * message is dropped without notice.
         *
         * Params:
         *  msg = The message to put in the queue.
         *
         * Throws:
         *  An exception if the queue is full and onCrowdingDoThis throws.
         */
        final void put(ref Message msg)
        {
            synchronized (m_lock)
            {
                // TODO: Generate an error here if m_closed is true, or maybe
                //       put a message in the caller's queue?
                if (!m_closed)
                {
                    // Re-evaluated from the top every time the sender is
                    // woken from m_notFull.
                    while (true)
                    {
                        // Priority messages go to their own queue and are
                        // never subject to the size limit.
                        if (isPriorityMsg(msg))
                        {
                            m_sharedPty.put(msg);
                            m_putMsg.notify();
                            return;
                        }
                        // Room available, or a control message (always accepted).
                        if (!mboxFull() || isControlMsg(msg))
                        {
                            m_sharedBox.put(msg);
                            m_putMsg.notify();
                            return;
                        }
                        // Mailbox is full: a crowding handler returning false
                        // means "drop the message"; a null handler or a true
                        // result means "block".
                        if (m_onMaxMsgs !is null && !m_onMaxMsgs(thisTid))
                        {
                            return;
                        }
                        // m_putQueue counts blocked senders so that get()
                        // knows whether signalling m_notFull is worthwhile.
                        m_putQueue++;
                        m_notFull.wait();
                        m_putQueue--;
                    }
                }
            }
        }

        /*
         * Matches ops against each message in turn until a match is found.
         *
         * If the first argument converts to Duration it is taken as a timeout
         * and the remaining arguments are the operations; otherwise every
         * argument is an operation and the call waits indefinitely.
         *
         * Params:
         *  ops = The operations to match.  Each may return a bool to indicate
         *        whether a message with a matching type is truly a match.
         *
         * Returns:
         *  true if a message was retrieved and false if not (such as if a
         *  timeout occurred).
         *
         * Throws:
         *  LinkTerminated if a linked thread terminated, or OwnerTerminated
         * if the owner thread terminates and no existing messages match the
         * supplied ops.
         */
        bool get(T...)(scope T vals)
        {
            import std.meta : AliasSeq;

            static assert(T.length, "T must not be empty");

            // Split an optional leading timeout from the handler list.
            static if (is(T[0] : Duration))
            {
                alias Ops = AliasSeq!(T[1 .. $]);
                alias ops = vals[1 .. $];
                enum timedWait = true;
                Duration period = vals[0];
            }
            else
            {
                alias Ops = AliasSeq!(T);
                alias ops = vals[0 .. $];
                enum timedWait = false;
            }

            // Tries each handler in order against msg.  A handler returning
            // bool decides for itself whether the message was consumed; any
            // other handler counts as a match once it has been invoked.
            bool onStandardMsg(ref Message msg)
            {
                foreach (i, t; Ops)
                {
                    alias Args = Parameters!(t);
                    auto op = ops[i];

                    if (msg.convertsTo!(Args))
                    {
                        alias RT = ReturnType!(t);
                        static if (is(RT == bool))
                        {
                            return msg.map(op);
                        }
                        else
                        {
                            msg.map(op);
                            // A noreturn handler never gets here, so no
                            // return statement is emitted for it.
                            static if (!is(immutable RT == immutable noreturn))
                                return true;
                        }
                    }
                }
                return false;
            }

            // Handles the termination notice of another thread.  The notice
            // is offered to the handlers as a LinkTerminated/OwnerTerminated
            // object and thrown if none of them accepts it.
            bool onLinkDeadMsg(ref Message msg)
            {
                assert(msg.convertsTo!(Tid),
                        "Message could be converted to Tid");
                auto tid = msg.get!(Tid);

                if (bool* pDepends = tid in thisInfo.links)
                {
                    auto depends = *pDepends;
                    thisInfo.links.remove(tid);
                    // Give the owner relationship precedence.
                    if (depends && tid != thisInfo.owner)
                    {
                        auto e = new LinkTerminated(tid);
                        auto m = Message(MsgType.standard, e);
                        if (onStandardMsg(m))
                            return true;
                        throw e;
                    }
                }
                if (tid == thisInfo.owner)
                {
                    // The owner is gone; forget it before reporting.
                    thisInfo.owner = Tid.init;
                    auto e = new OwnerTerminated(tid);
                    auto m = Message(MsgType.standard, e);
                    if (onStandardMsg(m))
                        return true;
                    throw e;
                }
                return false;
            }

            // Dispatches control messages; linkDead is the only kind handled.
            bool onControlMsg(ref Message msg)
            {
                switch (msg.type)
                {
                case MsgType.linkDead:
                    return onLinkDeadMsg(msg);
                default:
                    return false;
                }
            }

            // Walks list front to back and removes the first message that is
            // consumed; returns true when a message was delivered to a handler.
            bool scan(ref ListT list)
            {
                for (auto range = list[]; !range.empty;)
                {
                    // Only the message handler will throw, so if this occurs
                    // we can be certain that the message was handled.
                    scope (failure)
                        list.removeAt(range);

                    if (isControlMsg(range.front))
                    {
                        if (onControlMsg(range.front))
                        {
                            // Although the linkDead message is a control message,
                            // it can be handled by the user.  Since the linkDead
                            // message throws if not handled, if we get here then
                            // it has been handled and we can return from receive.
                            // This is a weird special case that will have to be
                            // handled in a more general way if more are added.
                            if (!isLinkDeadMsg(range.front))
                            {
                                list.removeAt(range);
                                continue;
                            }
                            list.removeAt(range);
                            return true;
                        }
                        // Unhandled control message: leave it queued.
                        range.popFront();
                        continue;
                    }
                    else
                    {
                        if (onStandardMsg(range.front))
                        {
                            list.removeAt(range);
                            return true;
                        }
                        // No handler matched: leave it queued for a later receive.
                        range.popFront();
                        continue;
                    }
                }
                return false;
            }

            // Processes the head of the priority queue.  A priority message
            // no handler accepts is thrown: as itself when it carries a
            // Throwable, otherwise wrapped in PriorityMessageException.
            bool pty(ref ListT list)
            {
                if (!list.empty)
                {
                    auto range = list[];

                    if (onStandardMsg(range.front))
                    {
                        list.removeAt(range);
                        return true;
                    }
                    if (range.front.convertsTo!(Throwable))
                        throw range.front.get!(Throwable);
                    else if (range.front.convertsTo!(shared(Throwable)))
                        /* Note: a shared type can be caught without the shared qualifier
                         * so throwing shared will be an error */
                        throw cast() range.front.get!(shared(Throwable));
                    else
                        throw new PriorityMessageException(range.front.data);
                }
                return false;
            }

            static if (timedWait)
            {
                import core.time : MonoTime;
                // Absolute deadline, used to recompute the remaining period
                // after every unsuccessful pass.
                auto limit = MonoTime.currTime + period;
            }

            while (true)
            {
                ListT arrived;

                // First look at what is already in the local queues.
                if (pty(m_localPty) || scan(m_localBox))
                {
                    return true;
                }
                yield();
                synchronized (m_lock)
                {
                    updateMsgCount();
                    while (m_sharedPty.empty && m_sharedBox.empty)
                    {
                        // NOTE: We're notifying all waiters here instead of just
                        //       a few because the onCrowding behavior may have
                        //       changed and we don't want to block sender threads
                        //       unnecessarily if the new behavior is not to block.
                        //       This will admittedly result in spurious wakeups
                        //       in other situations, but what can you do?
                        if (m_putQueue && !mboxFull())
                            m_notFull.notifyAll();
                        static if (timedWait)
                        {
                            if (period <= Duration.zero || !m_putMsg.wait(period))
                                return false;
                        }
                        else
                        {
                            m_putMsg.wait();
                        }
                    }
                    // Move everything that arrived out of the shared queues
                    // while the lock is still held.
                    m_localPty.put(m_sharedPty);
                    arrived.put(m_sharedBox);
                }
                if (m_localPty.empty)
                {
                    // Whatever scan() leaves in `arrived` is appended to the
                    // local box on every exit path, including a throw.
                    scope (exit) m_localBox.put(arrived);
                    if (scan(arrived))
                    {
                        return true;
                    }
                    else
                    {
                        static if (timedWait)
                        {
                            period = limit - MonoTime.currTime;
                        }
                        continue;
                    }
                }
                // A priority message arrived: keep the regular messages for
                // later and deal with the priority message right away.
                m_localBox.put(arrived);
                pty(m_localPty);
                return true;
            }
        }

        /*
         * Called on thread termination.  This routine processes any remaining
         * control messages, clears out message queues, and sets a flag to
         * reject any future messages.
         *
         * NOTE(review): senders blocked in put() on m_notFull are not
         * signalled here, and put() does not re-check m_closed after waking
         * up -- confirm whether a blocked sender can hang once the owner has
         * terminated.
         */
        final void close()
        {
            // Forgets the link (and the owner, if applicable) named by a
            // linkDead message without raising anything.
            static void onLinkDeadMsg(ref Message msg)
            {
                assert(msg.convertsTo!(Tid),
                        "Message could be converted to Tid");
                auto tid = msg.get!(Tid);

                thisInfo.links.remove(tid);
                if (tid == thisInfo.owner)
                    thisInfo.owner = Tid.init;
            }

            // Applies onLinkDeadMsg to every linkDead message in list.
            static void sweep(ref ListT list)
            {
                for (auto range = list[]; !range.empty; range.popFront())
                {
                    if (range.front.type == MsgType.linkDead)
                        onLinkDeadMsg(range.front);
                }
            }

            ListT arrived;

            sweep(m_localBox);
            synchronized (m_lock)
            {
                // Take over pending shared messages and mark the box closed
                // in one step so that put() rejects anything sent later.
                arrived.put(m_sharedBox);
                m_closed = true;
            }
            m_localBox.clear();
            sweep(arrived);
        }

    private:
        // Routines involving local data only, no lock needed.
bool mboxFull() @safe @nogc pure nothrow
        {
            // A limit of zero means the mailbox is unbounded.
            if (m_maxMsgs == 0)
                return false;
            return m_localMsgs + m_sharedBox.length >= m_maxMsgs;
        }

        void updateMsgCount() @safe @nogc pure nothrow
        {
            m_localMsgs = m_localBox.length;
        }

        bool isControlMsg(ref Message msg) @safe @nogc pure nothrow
        {
            // Everything that is neither standard nor priority is control.
            return !(msg.type == MsgType.standard || msg.type == MsgType.priority);
        }

        bool isPriorityMsg(ref Message msg) @safe @nogc pure nothrow
        {
            return msg.type == MsgType.priority;
        }

        bool isLinkDeadMsg(ref Message msg) @safe @nogc pure nothrow
        {
            return msg.type == MsgType.linkDead;
        }

        alias OnMaxFn = bool function(Tid);
        alias ListT = List!(Message);

        ListT m_localBox;
        ListT m_localPty;

        Mutex m_lock;
        Condition m_putMsg;
        Condition m_notFull;
        size_t m_putQueue;
        ListT m_sharedBox;
        ListT m_sharedPty;
        OnMaxFn m_onMaxMsgs;
        size_t m_localMsgs;
        size_t m_maxMsgs;
        bool m_closed;
    }

    /*
     * Singly linked queue with a shared free list of nodes.  A Range refers
     * to the node *before* its current element, which is what allows the
     * current element to be unlinked by removeAt.
     */
    struct List(T)
    {
        struct Range
        {
            import std.exception : enforce;

            @property bool empty() const
            {
                return m_prev.next is null;
            }

            @property ref T front()
            {
                enforce(m_prev.next, "invalid list node");
                return m_prev.next.val;
            }

            @property void front(T val)
            {
                enforce(m_prev.next, "invalid list node");
                m_prev.next.val = val;
            }

            void popFront()
            {
                enforce(m_prev.next, "invalid list node");
                m_prev = m_prev.next;
            }

            private this(Node* p)
            {
                m_prev = p;
            }

            private Node* m_prev;
        }

        // Appends a single value.
        void put(T val)
        {
            put(newNode(val));
        }

        // Moves every element of rhs to the end of this list; rhs is left empty.
        void put(ref List!(T) rhs)
        {
            if (rhs.empty)
                return;

            // Link the whole chain in through its first node, then walk to
            // the new tail while counting the remaining nodes.
            put(rhs.m_first);
            while (m_last.next !is null)
            {
                m_last = m_last.next;
                m_count++;
            }

            rhs.m_first = null;
            rhs.m_last = null;
            rhs.m_count = 0;
        }

        Range opSlice()
        {
            // &m_first stands in for a node preceding the first element;
            // `next` is the first field of Node, so the layouts line up.
            return Range(cast(Node*)&m_first);
        }

        // Unlinks and recycles the element r currently refers to.
        void removeAt(Range r)
        {
            import std.exception : enforce;

            assert(m_count, "Can not remove from empty Range");
            Node* before = r.m_prev;
            enforce(before && before.next, "attempting to remove invalid list node");

            if (m_last is m_first)
                m_last = null;
            else if (m_last is before.next)
                m_last = before; // nocoverage

            Node* victim = before.next;
            before.next = victim.next;
            freeNode(victim);
            m_count--;
        }

        @property size_t length()
        {
            return m_count;
        }

        void clear()
        {
            m_first = null;
            m_last = null;
            m_count = 0;
        }

        @property bool empty()
        {
            return m_first is null;
        }

    private:
        struct Node
        {
            Node* next;
            T val;

            this(T v)
            {
                val = v;
            }
        }

        static shared struct SpinLock
        {
            void lock()
            {
                while (!cas(&locked, false, true))
                {
                    Thread.yield();
                }
            }

            void unlock()
            {
                atomicStore!(MemoryOrder.rel)(locked, false);
            }

            bool locked;
        }

        // Free list of recycled nodes shared by all instances, guarded by sm_lock.
        static shared SpinLock sm_lock;
        static shared Node* sm_head;

        // Returns a node holding v, reusing a recycled node when one is available.
        Node* newNode(T v)
        {
            Node* recycled;
            {
                sm_lock.lock();
                scope (exit) sm_lock.unlock();

                if (sm_head)
                {
                    recycled = cast(Node*) sm_head;
                    sm_head = sm_head.next;
                }
            }

            if (recycled is null)
                return new Node(v);

            import core.lifetime : emplace;
            emplace!Node(recycled, v);
            return recycled;
        }

        // Puts n onto the shared free list.
        void freeNode(Node* n)
        {
            // destroy val to free any owned GC memory
            destroy(n.val);

            sm_lock.lock();
            scope (exit) sm_lock.unlock();

            auto sharedNode = cast(shared(Node)*) n;
            sharedNode.next = sm_head;
            sm_head = sharedNode;
        }

        // Links n in as the new tail.
        void put(Node* n)
        {
            m_count++;
            if (empty)
                m_first = n;
            else
                m_last.next = n;
            m_last = n;
        }

        Node* m_first;
        Node* m_last;
        size_t m_count;
    }
}

@system unittest
{
    import std.typecons : tuple, Tuple;

    static void testfn(Tid tid)
    {
        receive((float val) { assert(0); }, (int val, int val2) {
            assert(val == 42 && val2 == 86);
        });
        receive((Tuple!(int, int) val) { assert(val[0] == 42 && val[1] == 86); });
        receive((Variant val) {  });
        receive((string val) {
            if ("the quick brown fox" != val)
                return false;
            return true;
        }, (string val) { assert(false); });
        prioritySend(tid, "done");
    }

    static void runTest(Tid tid)
    {
        send(tid, 42, 86);
        send(tid, tuple(42, 86));
        send(tid, "hello", "there");
        send(tid, "the quick brown fox");
        receive((string val) { assert(val == "done"); });
    }

    static void simpleTest()
    {
        auto worker = spawn(&testfn, thisTid);
        runTest(worker);

        // Run the test again with a limited mailbox size.
        worker = spawn(&testfn, thisTid);
        setMaxMailboxSize(worker, 2, OnCrowding.block);
        runTest(worker);
    }

    simpleTest();

    scheduler = new ThreadScheduler;
    simpleTest();
    scheduler = null;
}

// Returns the mutex used by the one-argument initOnce, creating it on first
// use.  Creation is lock-free: when several threads race, exactly one cas
// succeeds and every caller ends up with that same instance.
private @property shared(Mutex) initOnceLock()
{
    static shared Mutex lock;

    if (auto existing = atomicLoad!(MemoryOrder.acq)(lock))
        return existing;

    auto candidate = new shared Mutex;
    return cas(&lock, cast(shared) null, candidate)
        ? candidate
        : atomicLoad!(MemoryOrder.acq)(lock);
}

/**
 * Initializes $(D_PARAM var) with the lazy $(D_PARAM init) value in a
 * thread-safe manner.
 *
 * The implementation guarantees that all threads simultaneously calling
 * initOnce with the same $(D_PARAM var) argument block until $(D_PARAM var) is
 * fully initialized. All side-effects of $(D_PARAM init) are globally visible
 * afterwards.
2675 * 2676 * Params: 2677 * var = The variable to initialize 2678 * init = The lazy initializer value 2679 * 2680 * Returns: 2681 * A reference to the initialized variable 2682 */ 2683 auto ref initOnce(alias var)(lazy typeof(var) init) 2684 { 2685 return initOnce!var(init, initOnceLock); 2686 } 2687 2688 /// A typical use-case is to perform lazy but thread-safe initialization. 2689 @system unittest 2690 { 2691 static class MySingleton 2692 { 2693 static MySingleton instance() 2694 { 2695 __gshared MySingleton inst; 2696 return initOnce!inst(new MySingleton); 2697 } 2698 } 2699 2700 assert(MySingleton.instance !is null); 2701 } 2702 2703 @system unittest 2704 { 2705 static class MySingleton 2706 { 2707 static MySingleton instance() 2708 { 2709 __gshared MySingleton inst; 2710 return initOnce!inst(new MySingleton); 2711 } 2712 2713 private: 2714 this() { val = ++cnt; } 2715 size_t val; 2716 __gshared size_t cnt; 2717 } 2718 2719 foreach (_; 0 .. 10) 2720 spawn({ ownerTid.send(MySingleton.instance.val); }); 2721 foreach (_; 0 .. 10) 2722 assert(receiveOnly!size_t == MySingleton.instance.val); 2723 assert(MySingleton.cnt == 1); 2724 } 2725 2726 /** 2727 * Same as above, but takes a separate mutex instead of sharing one among 2728 * all initOnce instances. 2729 * 2730 * This should be used to avoid dead-locks when the $(D_PARAM init) 2731 * expression waits for the result of another thread that might also 2732 * call initOnce. Use with care. 
2733 * 2734 * Params: 2735 * var = The variable to initialize 2736 * init = The lazy initializer value 2737 * mutex = A mutex to prevent race conditions 2738 * 2739 * Returns: 2740 * A reference to the initialized variable 2741 */ 2742 auto ref initOnce(alias var)(lazy typeof(var) init, shared Mutex mutex) 2743 { 2744 // check that var is global, can't take address of a TLS variable 2745 static assert(is(typeof({ __gshared p = &var; })), 2746 "var must be 'static shared' or '__gshared'."); 2747 import core.atomic : atomicLoad, MemoryOrder, atomicStore; 2748 2749 static shared bool flag; 2750 if (!atomicLoad!(MemoryOrder.acq)(flag)) 2751 { 2752 synchronized (mutex) 2753 { 2754 if (!atomicLoad!(MemoryOrder.raw)(flag)) 2755 { 2756 var = init; 2757 static if (!is(immutable typeof(var) == immutable noreturn)) 2758 atomicStore!(MemoryOrder.rel)(flag, true); 2759 } 2760 } 2761 } 2762 return var; 2763 } 2764 2765 /// ditto 2766 auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex) 2767 { 2768 return initOnce!var(init, cast(shared) mutex); 2769 } 2770 2771 /// Use a separate mutex when init blocks on another thread that might also call initOnce. 
@system unittest
{
    import core.sync.mutex : Mutex;

    static shared bool varA, varB;
    static shared Mutex m;
    m = new shared Mutex;

    spawn({
        // use a different mutex for varB to avoid a dead-lock
        initOnce!varB(true, m);
        ownerTid.send(true);
    });
    // init depends on the result of the spawned thread
    // (this call uses the overload without an explicit mutex, so the spawned
    // thread must not need that same lock in order to make progress)
    initOnce!varA(receiveOnly!bool);
    assert(varA == true);
    assert(varB == true);
}

// initOnce accepts `static shared` and `__gshared` variables and rejects
// thread-local statics and function locals at compile time.
@system unittest
{
    static shared bool a;
    __gshared bool b;
    static bool c;
    bool d;
    initOnce!a(true);
    initOnce!b(true);
    static assert(!__traits(compiles, initOnce!c(true))); // TLS
    static assert(!__traits(compiles, initOnce!d(true))); // local variable
}

// test ability to send shared arrays
@system unittest
{
    static shared int[] x = new shared(int)[1];
    auto tid = spawn({
        auto arr = receiveOnly!(shared(int)[]);
        // The write must be visible through `x` in the owner thread, i.e.
        // the array is passed by reference rather than copied.
        arr[0] = 5;
        ownerTid.send(true);
    });
    tid.send(x);
    // Wait until the spawned thread has performed its write.
    receiveOnly!(bool);
    assert(x[0] == 5);
}

// https://issues.dlang.org/show_bug.cgi?id=13930
// Sending and receiving an immutable associative array must compile.
@system unittest
{
    immutable aa = ["0":0];
    thisTid.send(aa);
    receiveOnly!(immutable int[string]); // compile error (presumably: before the fix for the issue above)
}

// https://issues.dlang.org/show_bug.cgi?id=19345
// An aggregate with const members can be sent and received as const.
@system unittest
{
    static struct Aggregate { const int a; const int[5] b; }
    static void t1(Tid mainTid)
    {
        const sendMe = Aggregate(42, [1, 2, 3, 4, 5]);
        mainTid.send(sendMe);
    }

    spawn(&t1, thisTid);
    auto result1 = receiveOnly!(const Aggregate)();
    immutable expected = Aggregate(42, [1, 2, 3, 4, 5]);
    assert(result1 == expected);
}

// Noreturn support
@system unittest
{
    static noreturn foo(int) { throw new Exception(""); }

    // `if (false)`: these calls only need to compile; they are never run.
    if (false) spawn(&foo, 1);
    if (false) spawnLinked(&foo, 1);

    if (false) receive(&foo);
    if (false) receiveTimeout(Duration.init, &foo);

    // Wrapped in __traits(compiles) to skip codegen which crashes dmd's backend
    static assert(__traits(compiles, receiveOnly!noreturn() ));
    static assert(__traits(compiles, send(Tid.init, noreturn.init) ));
    static assert(__traits(compiles, prioritySend(Tid.init, noreturn.init) ));
    static assert(__traits(compiles, yield(noreturn.init) ));

    static assert(__traits(compiles, {
        __gshared noreturn n;
        initOnce!n(noreturn.init);
    }));
}