1 /** 2 * $(SCRIPT inhibitQuickIndex = 1;) 3 * $(DIVC quickindex, 4 * $(BOOKTABLE, 5 * $(TR $(TH Category) $(TH Symbols)) 6 * $(TR $(TD Tid) $(TD 7 * $(MYREF join) 8 * $(MYREF locate) 9 * $(MYREF ownerTid) 10 * $(MYREF register) 11 * $(MYREF spawn) 12 * $(MYREF spawnLinked) 13 * $(MYREF thisTid) 14 * $(MYREF Tid) 15 * $(MYREF TidMissingException) 16 * $(MYREF unregister) 17 * )) 18 * $(TR $(TD Message passing) $(TD 19 * $(MYREF prioritySend) 20 * $(MYREF receive) 21 * $(MYREF receiveOnly) 22 * $(MYREF receiveTimeout) 23 * $(MYREF send) 24 * $(MYREF setMaxMailboxSize) 25 * )) 26 * $(TR $(TD Message-related types) $(TD 27 * $(MYREF LinkTerminated) 28 * $(MYREF MailboxFull) 29 * $(MYREF MessageMismatch) 30 * $(MYREF OnCrowding) 31 * $(MYREF OwnerTerminated) 32 * $(MYREF PriorityMessageException) 33 * )) 34 * $(TR $(TD Scheduler) $(TD 35 * $(MYREF FiberScheduler) 36 * $(MYREF Generator) 37 * $(MYREF Scheduler) 38 * $(MYREF scheduler) 39 * $(MYREF ThreadInfo) 40 * $(MYREF ThreadScheduler) 41 * $(MYREF yield) 42 * )) 43 * $(TR $(TD Misc) $(TD 44 * $(MYREF initOnce) 45 * )) 46 * )) 47 * 48 * This is a low-level messaging API upon which more structured or restrictive 49 * APIs may be built. The general idea is that every messageable entity is 50 * represented by a common handle type called a `Tid`, which allows messages to 51 * be sent to logical threads that are executing in both the current process 52 * and in external processes using the same interface. This is an important 53 * aspect of scalability because it allows the components of a program to be 54 * spread across available resources with few to no changes to the actual 55 * implementation. 56 * 57 * A logical thread is an execution context that has its own stack and which 58 * runs asynchronously to other logical threads. These may be preemptively 59 * scheduled kernel threads, $(MREF_ALTTEXT fibers, core, thread, fiber) 60 * (cooperative user-space threads), or some other concept with similar behavior. 61 * 62 * The type of concurrency used when logical threads are created is determined 63 * by the $(LREF Scheduler) selected at initialization time. The default behavior is 64 * currently to create a new kernel thread per call to spawn, but other 65 * schedulers are available that multiplex fibers across the main thread or 66 * use some combination of the two approaches. 67 * 68 * Copyright: Copyright Sean Kelly 2009 - 2014. 69 * License: <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>. 70 * Authors: Sean Kelly, Alex Rønne Petersen, Martin Nowak 71 * Source: $(PHOBOSSRC std/concurrency.d) 72 */ 73 /* Copyright Sean Kelly 2009 - 2014. 74 * Distributed under the Boost Software License, Version 1.0. 75 * (See accompanying file LICENSE_1_0.txt or copy at 76 * http://www.boost.org/LICENSE_1_0.txt) 77 */ 78 module std.concurrency; 79 80 public import std.variant; 81 82 import core.atomic; 83 import core.sync.condition; 84 import core.sync.mutex; 85 import core.thread; 86 import std.range.primitives; 87 import std.range.interfaces : InputRange; 88 import std.traits; 89 90 /// 91 @system unittest 92 { 93 __gshared string received; 94 static void spawnedFunc(Tid ownerTid) 95 { 96 import std.conv : text; 97 // Receive a message from the owner thread. 98 receive((int i){ 99 received = text("Received the number ", i); 100 101 // Send a message back to the owner thread 102 // indicating success. 103 send(ownerTid, true); 104 }); 105 } 106 107 // Start spawnedFunc in a new thread. 108 auto childTid = spawn(&spawnedFunc, thisTid); 109 110 // Send the number 42 to this new thread. 111 send(childTid, 42); 112 113 // Receive the result code. 114 auto wasSuccessful = receiveOnly!(bool); 115 assert(wasSuccessful); 116 assert(received == "Received the number 42"); 117 } 118 119 private 120 { 121 bool hasLocalAliasing(Types...)() 122 { 123 import std.typecons : Rebindable; 124 125 // Works around "statement is not reachable" 126 bool doesIt = false; 127 static foreach (T; Types) 128 { 129 static if (is(T == Tid)) 130 { /* Allowed */ } 131 else static if (is(T : Rebindable!R, R)) 132 doesIt |= hasLocalAliasing!R; 133 else static if (is(T == struct)) 134 doesIt |= hasLocalAliasing!(typeof(T.tupleof)); 135 else 136 doesIt |= std.traits.hasUnsharedAliasing!(T); 137 } 138 return doesIt; 139 } 140 141 @safe unittest 142 { 143 static struct Container { Tid t; } 144 static assert(!hasLocalAliasing!(Tid, Container, int)); 145 } 146 147 // https://issues.dlang.org/show_bug.cgi?id=20097 148 @safe unittest 149 { 150 import std.datetime.systime : SysTime; 151 static struct Container { SysTime time; } 152 static assert(!hasLocalAliasing!(SysTime, Container)); 153 } 154 155 enum MsgType 156 { 157 standard, 158 priority, 159 linkDead, 160 } 161 162 struct Message 163 { 164 MsgType type; 165 Variant data; 166 167 this(T...)(MsgType t, T vals) 168 if (T.length > 0) 169 { 170 static if (T.length == 1) 171 { 172 type = t; 173 data = vals[0]; 174 } 175 else 176 { 177 import std.typecons : Tuple; 178 179 type = t; 180 data = Tuple!(T)(vals); 181 } 182 } 183 184 @property auto convertsTo(T...)() 185 { 186 static if (T.length == 1) 187 { 188 return is(T[0] == Variant) || data.convertsTo!(T); 189 } 190 else 191 { 192 import std.typecons : Tuple; 193 return data.convertsTo!(Tuple!(T)); 194 } 195 } 196 197 @property auto get(T...)() 198 { 199 static if (T.length == 1) 200 { 201 static if (is(T[0] == Variant)) 202 return data; 203 else 204 return data.get!(T); 205 } 206 else 207 { 208 import std.typecons : Tuple; 209 return data.get!(Tuple!(T)); 210 } 211 } 212 213 auto map(Op)(Op op) 214 { 215 alias Args = Parameters!(Op); 216 217 static if (Args.length == 1) 218 { 219 static if (is(Args[0] == Variant)) 220 return op(data); 221 else 222 return op(data.get!(Args)); 223 } 224 else 225 { 226 import std.typecons : Tuple; 227 return op(data.get!(Tuple!(Args)).expand); 228 } 229 } 230 } 231 232 void checkops(T...)(T ops) 233 { 234 import std.format : format; 235 236 foreach (i, t1; T) 237 { 238 static assert(isFunctionPointer!t1 || isDelegate!t1, 239 format!"T %d is not a function pointer or delegates"(i)); 240 alias a1 = Parameters!(t1); 241 alias r1 = ReturnType!(t1); 242 243 static if (i < T.length - 1 && is(r1 == void)) 244 { 245 static assert(a1.length != 1 || !is(a1[0] == Variant), 246 "function with arguments " ~ a1.stringof ~ 247 " occludes successive function"); 248 249 foreach (t2; T[i + 1 .. $]) 250 { 251 alias a2 = Parameters!(t2); 252 253 static assert(!is(a1 == a2), 254 "function with arguments " ~ a1.stringof ~ " occludes successive function"); 255 } 256 } 257 } 258 } 259 260 @property ref ThreadInfo thisInfo() nothrow 261 { 262 import core.atomic : atomicLoad; 263 264 auto localScheduler = atomicLoad(scheduler); 265 if (localScheduler is null) 266 return ThreadInfo.thisInfo; 267 return localScheduler.thisInfo; 268 } 269 } 270 271 static ~this() 272 { 273 thisInfo.cleanup(); 274 } 275 276 // Exceptions 277 278 /** 279 * Thrown on calls to $(LREF receiveOnly) if a message other than the type 280 * the receiving thread expected is sent. 281 */ 282 class MessageMismatch : Exception 283 { 284 /// 285 this(string msg = "Unexpected message type") @safe pure nothrow @nogc 286 { 287 super(msg); 288 } 289 } 290 291 /** 292 * Thrown on calls to $(LREF receive) if the thread that spawned the receiving 293 * thread has terminated and no more messages exist. 294 */ 295 class OwnerTerminated : Exception 296 { 297 /// 298 this(Tid t, string msg = "Owner terminated") @safe pure nothrow @nogc 299 { 300 super(msg); 301 tid = t; 302 } 303 304 Tid tid; 305 } 306 307 /** 308 * Thrown if a linked thread has terminated. 309 */ 310 class LinkTerminated : Exception 311 { 312 /// 313 this(Tid t, string msg = "Link terminated") @safe pure nothrow @nogc 314 { 315 super(msg); 316 tid = t; 317 } 318 319 Tid tid; 320 } 321 322 /** 323 * Thrown if a message was sent to a thread via 324 * $(REF prioritySend, std,concurrency) and the receiver does not have a handler 325 * for a message of this type. 326 */ 327 class PriorityMessageException : Exception 328 { 329 /// 330 this(Variant vals) 331 { 332 super("Priority message"); 333 message = vals; 334 } 335 336 /** 337 * The message that was sent. 338 */ 339 Variant message; 340 } 341 342 /** 343 * Thrown on mailbox crowding if the mailbox is configured with 344 * `OnCrowding.throwException`. 345 */ 346 class MailboxFull : Exception 347 { 348 /// 349 this(Tid t, string msg = "Mailbox full") @safe pure nothrow @nogc 350 { 351 super(msg); 352 tid = t; 353 } 354 355 Tid tid; 356 } 357 358 /** 359 * Thrown when a `Tid` is missing, e.g. when $(LREF ownerTid) doesn't 360 * find an owner thread. 361 */ 362 class TidMissingException : Exception 363 { 364 import std.exception : basicExceptionCtors; 365 /// 366 mixin basicExceptionCtors; 367 } 368 369 370 // Thread ID 371 372 373 /** 374 * An opaque type used to represent a logical thread. 375 */ 376 struct Tid 377 { 378 private: 379 this(MessageBox m) @safe pure nothrow @nogc 380 { 381 mbox = m; 382 } 383 384 MessageBox mbox; 385 386 public: 387 388 /** 389 * Generate a convenient string for identifying this `Tid`. This is only 390 * useful to see if `Tid`'s that are currently executing are the same or 391 * different, e.g. for logging and debugging. It is potentially possible 392 * that a `Tid` executed in the future will have the same `toString()` output 393 * as another `Tid` that has already terminated. 394 */ 395 void toString(W)(ref W w) const 396 { 397 import std.format.write : formattedWrite; 398 auto p = () @trusted { return cast(void*) mbox; }(); 399 formattedWrite(w, "Tid(%x)", p); 400 } 401 402 } 403 404 @safe unittest 405 { 406 import std.conv : text; 407 Tid tid; 408 assert(text(tid) == "Tid(0)"); 409 auto tid2 = thisTid; 410 assert(text(tid2) != "Tid(0)"); 411 auto tid3 = tid2; 412 assert(text(tid2) == text(tid3)); 413 } 414 415 // https://issues.dlang.org/show_bug.cgi?id=21512 416 @system unittest 417 { 418 import std.format : format; 419 420 const(Tid) b = spawn(() {}); 421 assert(format!"%s"(b)[0 .. 4] == "Tid("); 422 } 423 424 /** 425 * Returns: The `Tid` of the caller's thread. 426 */ 427 @property Tid thisTid() @safe 428 { 429 // TODO: remove when concurrency is safe 430 static auto trus() @trusted 431 { 432 if (thisInfo.ident != Tid.init) 433 return thisInfo.ident; 434 thisInfo.ident = Tid(new MessageBox); 435 return thisInfo.ident; 436 } 437 438 return trus(); 439 } 440 441 /** 442 * Return the `Tid` of the thread which spawned the caller's thread. 443 * 444 * Throws: A `TidMissingException` exception if 445 * there is no owner thread. 446 */ 447 @property Tid ownerTid() 448 { 449 import std.exception : enforce; 450 451 enforce!TidMissingException(thisInfo.owner.mbox !is null, "Error: Thread has no owner thread."); 452 return thisInfo.owner; 453 } 454 455 @system unittest 456 { 457 import std.exception : assertThrown; 458 459 static void fun() 460 { 461 string res = receiveOnly!string(); 462 assert(res == "Main calling"); 463 ownerTid.send("Child responding"); 464 } 465 466 assertThrown!TidMissingException(ownerTid); 467 auto child = spawn(&fun); 468 child.send("Main calling"); 469 string res = receiveOnly!string(); 470 assert(res == "Child responding"); 471 } 472 473 // Thread Creation 474 475 private template isSpawnable(F, T...) 476 { 477 template isParamsImplicitlyConvertible(F1, F2, int i = 0) 478 { 479 alias param1 = Parameters!F1; 480 alias param2 = Parameters!F2; 481 static if (param1.length != param2.length) 482 enum isParamsImplicitlyConvertible = false; 483 else static if (param1.length == i) 484 enum isParamsImplicitlyConvertible = true; 485 else static if (is(param2[i] : param1[i])) 486 enum isParamsImplicitlyConvertible = isParamsImplicitlyConvertible!(F1, 487 F2, i + 1); 488 else 489 enum isParamsImplicitlyConvertible = false; 490 } 491 492 enum isSpawnable = isCallable!F && is(ReturnType!F : void) 493 && isParamsImplicitlyConvertible!(F, void function(T)) 494 && (isFunctionPointer!F || !hasUnsharedAliasing!F); 495 } 496 497 /** 498 * Starts `fn(args)` in a new logical thread. 499 * 500 * Executes the supplied function in a new logical thread represented by 501 * `Tid`. The calling thread is designated as the owner of the new thread. 502 * When the owner thread terminates an `OwnerTerminated` message will be 503 * sent to the new thread, causing an `OwnerTerminated` exception to be 504 * thrown on `receive()`. 505 * 506 * Params: 507 * fn = The function to execute. 508 * args = Arguments to the function. 509 * 510 * Returns: 511 * A `Tid` representing the new logical thread. 512 * 513 * Notes: 514 * `args` must not have unshared aliasing. In other words, all arguments 515 * to `fn` must either be `shared` or `immutable` or have no 516 * pointer indirection. This is necessary for enforcing isolation among 517 * threads. 518 * 519 * Similarly, if `fn` is a delegate, it must not have unshared aliases, meaning 520 * `fn` must be either `shared` or `immutable`. */ 521 Tid spawn(F, T...)(F fn, T args) 522 if (isSpawnable!(F, T)) 523 { 524 static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); 525 return _spawn(false, fn, args); 526 } 527 528 /// 529 @system unittest 530 { 531 static void f(string msg) 532 { 533 assert(msg == "Hello World"); 534 } 535 536 auto tid = spawn(&f, "Hello World"); 537 } 538 539 /// Fails: char[] has mutable aliasing. 540 @system unittest 541 { 542 string msg = "Hello, World!"; 543 544 static void f1(string msg) {} 545 static assert(!__traits(compiles, spawn(&f1, msg.dup))); 546 static assert( __traits(compiles, spawn(&f1, msg.idup))); 547 548 static void f2(char[] msg) {} 549 static assert(!__traits(compiles, spawn(&f2, msg.dup))); 550 static assert(!__traits(compiles, spawn(&f2, msg.idup))); 551 } 552 553 /// New thread with anonymous function 554 @system unittest 555 { 556 spawn({ 557 ownerTid.send("This is so great!"); 558 }); 559 assert(receiveOnly!string == "This is so great!"); 560 } 561 562 @system unittest 563 { 564 import core.thread : thread_joinAll; 565 566 __gshared string receivedMessage; 567 static void f1(string msg) 568 { 569 receivedMessage = msg; 570 } 571 572 auto tid1 = spawn(&f1, "Hello World"); 573 thread_joinAll; 574 assert(receivedMessage == "Hello World"); 575 } 576 577 /** 578 * Starts `fn(args)` in a logical thread and will receive a `LinkTerminated` 579 * message when the operation terminates. 580 * 581 * Executes the supplied function in a new logical thread represented by 582 * `Tid`. This new thread is linked to the calling thread so that if either 583 * it or the calling thread terminates a `LinkTerminated` message will be sent 584 * to the other, causing a `LinkTerminated` exception to be thrown on `receive()`. 585 * The owner relationship from `spawn()` is preserved as well, so if the link 586 * between threads is broken, owner termination will still result in an 587 * `OwnerTerminated` exception to be thrown on `receive()`. 588 * 589 * Params: 590 * fn = The function to execute. 591 * args = Arguments to the function. 592 * 593 * Returns: 594 * A Tid representing the new thread. 595 */ 596 Tid spawnLinked(F, T...)(F fn, T args) 597 if (isSpawnable!(F, T)) 598 { 599 static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); 600 return _spawn(true, fn, args); 601 } 602 603 /* 604 * 605 */ 606 private Tid _spawn(F, T...)(bool linked, F fn, T args) 607 if (isSpawnable!(F, T)) 608 { 609 // TODO: MessageList and &exec should be shared. 610 auto spawnTid = Tid(new MessageBox); 611 auto ownerTid = thisTid; 612 613 void exec() 614 { 615 thisInfo.ident = spawnTid; 616 thisInfo.owner = ownerTid; 617 fn(args); 618 } 619 620 // TODO: MessageList and &exec should be shared. 621 if (scheduler !is null) 622 scheduler.spawn(&exec); 623 else 624 { 625 auto t = new Thread(&exec); 626 spawnTid.mbox.m_thread = t; 627 t.start(); 628 } 629 thisInfo.links[spawnTid] = linked; 630 return spawnTid; 631 } 632 633 @system unittest 634 { 635 void function() fn1; 636 void function(int) fn2; 637 static assert(__traits(compiles, spawn(fn1))); 638 static assert(__traits(compiles, spawn(fn2, 2))); 639 static assert(!__traits(compiles, spawn(fn1, 1))); 640 static assert(!__traits(compiles, spawn(fn2))); 641 642 void delegate(int) shared dg1; 643 shared(void delegate(int)) dg2; 644 shared(void delegate(long) shared) dg3; 645 shared(void delegate(real, int, long) shared) dg4; 646 void delegate(int) immutable dg5; 647 void delegate(int) dg6; 648 static assert(__traits(compiles, spawn(dg1, 1))); 649 static assert(__traits(compiles, spawn(dg2, 2))); 650 static assert(__traits(compiles, spawn(dg3, 3))); 651 static assert(__traits(compiles, spawn(dg4, 4, 4, 4))); 652 static assert(__traits(compiles, spawn(dg5, 5))); 653 static assert(!__traits(compiles, spawn(dg6, 6))); 654 655 auto callable1 = new class{ void opCall(int) shared {} }; 656 auto callable2 = cast(shared) new class{ void opCall(int) shared {} }; 657 auto callable3 = new class{ void opCall(int) immutable {} }; 658 auto callable4 = cast(immutable) new class{ void opCall(int) immutable {} }; 659 auto callable5 = new class{ void opCall(int) {} }; 660 auto callable6 = cast(shared) new class{ void opCall(int) immutable {} }; 661 auto callable7 = cast(immutable) new class{ void opCall(int) shared {} }; 662 auto callable8 = cast(shared) new class{ void opCall(int) const shared {} }; 663 auto callable9 = cast(const shared) new class{ void opCall(int) shared {} }; 664 auto callable10 = cast(const shared) new class{ void opCall(int) const shared {} }; 665 auto callable11 = cast(immutable) new class{ void opCall(int) const shared {} }; 666 static assert(!__traits(compiles, spawn(callable1, 1))); 667 static assert( __traits(compiles, spawn(callable2, 2))); 668 static assert(!__traits(compiles, spawn(callable3, 3))); 669 static assert( __traits(compiles, spawn(callable4, 4))); 670 static assert(!__traits(compiles, spawn(callable5, 5))); 671 static assert(!__traits(compiles, spawn(callable6, 6))); 672 static assert(!__traits(compiles, spawn(callable7, 7))); 673 static assert( __traits(compiles, spawn(callable8, 8))); 674 static assert(!__traits(compiles, spawn(callable9, 9))); 675 static assert( __traits(compiles, spawn(callable10, 10))); 676 static assert( __traits(compiles, spawn(callable11, 11))); 677 } 678 679 /** 680 * Places the values as a message at the back of tid's message queue. 681 * 682 * Sends the supplied value to the thread represented by tid. As with 683 * $(REF spawn, std,concurrency), `T` must not have unshared aliasing. 684 */ 685 void send(T...)(Tid tid, T vals) 686 in (tid.mbox !is null) 687 { 688 static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); 689 _send(tid, vals); 690 } 691 692 /** 693 * Places the values as a message on the front of tid's message queue. 694 * 695 * Send a message to `tid` but place it at the front of `tid`'s message 696 * queue instead of at the back. This function is typically used for 697 * out-of-band communication, to signal exceptional conditions, etc. 698 */ 699 void prioritySend(T...)(Tid tid, T vals) 700 in (tid.mbox !is null) 701 { 702 static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); 703 _send(MsgType.priority, tid, vals); 704 } 705 706 /* 707 * ditto 708 */ 709 private void _send(T...)(Tid tid, T vals) 710 in (tid.mbox !is null) 711 { 712 _send(MsgType.standard, tid, vals); 713 } 714 715 /* 716 * Implementation of send. This allows parameter checking to be different for 717 * both Tid.send() and .send(). 718 */ 719 private void _send(T...)(MsgType type, Tid tid, T vals) 720 in (tid.mbox !is null) 721 { 722 auto msg = Message(type, vals); 723 tid.mbox.put(msg); 724 } 725 726 /** 727 * Receives a message from another thread. 728 * 729 * Receive a message from another thread, or block if no messages of the 730 * specified types are available. This function works by pattern matching 731 * a message against a set of delegates and executing the first match found. 732 * 733 * If a delegate that accepts a $(REF Variant, std,variant) is included as 734 * the last argument to `receive`, it will match any message that was not 735 * matched by an earlier delegate. If more than one argument is sent, 736 * the `Variant` will contain a $(REF Tuple, std,typecons) of all values 737 * sent. 738 * 739 * Params: 740 * ops = Variadic list of function pointers and delegates. Entries 741 * in this list must not occlude later entries. 742 * 743 * Throws: $(LREF OwnerTerminated) when the sending thread was terminated. 744 */ 745 void receive(T...)( T ops ) 746 in 747 { 748 assert(thisInfo.ident.mbox !is null, 749 "Cannot receive a message until a thread was spawned " 750 ~ "or thisTid was passed to a running thread."); 751 } 752 do 753 { 754 checkops( ops ); 755 756 thisInfo.ident.mbox.get( ops ); 757 } 758 759 /// 760 @system unittest 761 { 762 import std.variant : Variant; 763 764 auto process = () 765 { 766 receive( 767 (int i) { ownerTid.send(1); }, 768 (double f) { ownerTid.send(2); }, 769 (Variant v) { ownerTid.send(3); } 770 ); 771 }; 772 773 { 774 auto tid = spawn(process); 775 send(tid, 42); 776 assert(receiveOnly!int == 1); 777 } 778 779 { 780 auto tid = spawn(process); 781 send(tid, 3.14); 782 assert(receiveOnly!int == 2); 783 } 784 785 { 786 auto tid = spawn(process); 787 send(tid, "something else"); 788 assert(receiveOnly!int == 3); 789 } 790 } 791 792 @safe unittest 793 { 794 static assert( __traits( compiles, 795 { 796 receive( (Variant x) {} ); 797 receive( (int x) {}, (Variant x) {} ); 798 } ) ); 799 800 static assert( !__traits( compiles, 801 { 802 receive( (Variant x) {}, (int x) {} ); 803 } ) ); 804 805 static assert( !__traits( compiles, 806 { 807 receive( (int x) {}, (int x) {} ); 808 } ) ); 809 } 810 811 // Make sure receive() works with free functions as well. 812 version (StdUnittest) 813 { 814 private void receiveFunction(int x) {} 815 } 816 @safe unittest 817 { 818 static assert( __traits( compiles, 819 { 820 receive( &receiveFunction ); 821 receive( &receiveFunction, (Variant x) {} ); 822 } ) ); 823 } 824 825 826 private template receiveOnlyRet(T...) 827 { 828 static if ( T.length == 1 ) 829 { 830 alias receiveOnlyRet = T[0]; 831 } 832 else 833 { 834 import std.typecons : Tuple; 835 alias receiveOnlyRet = Tuple!(T); 836 } 837 } 838 839 /** 840 * Receives only messages with arguments of the specified types. 841 * 842 * Params: 843 * T = Variadic list of types to be received. 844 * 845 * Returns: The received message. If `T` has more than one entry, 846 * the message will be packed into a $(REF Tuple, std,typecons). 847 * 848 * Throws: $(LREF MessageMismatch) if a message of types other than `T` 849 * is received, 850 * $(LREF OwnerTerminated) when the sending thread was terminated. 851 */ 852 receiveOnlyRet!(T) receiveOnly(T...)() 853 in 854 { 855 assert(thisInfo.ident.mbox !is null, 856 "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread."); 857 } 858 do 859 { 860 import std.format : format; 861 import std.meta : allSatisfy; 862 import std.typecons : Tuple; 863 864 Tuple!(T) ret; 865 866 thisInfo.ident.mbox.get((T val) { 867 static if (T.length) 868 { 869 static if (allSatisfy!(isAssignable, T)) 870 { 871 ret.field = val; 872 } 873 else 874 { 875 import core.lifetime : emplace; 876 emplace(&ret, val); 877 } 878 } 879 }, 880 (LinkTerminated e) { throw e; }, 881 (OwnerTerminated e) { throw e; }, 882 (Variant val) { 883 static if (T.length > 1) 884 string exp = T.stringof; 885 else 886 string exp = T[0].stringof; 887 888 throw new MessageMismatch( 889 format("Unexpected message type: expected '%s', got '%s'", exp, val.type.toString())); 890 }); 891 static if (T.length == 1) 892 return ret[0]; 893 else 894 return ret; 895 } 896 897 /// 898 @system unittest 899 { 900 auto tid = spawn( 901 { 902 assert(receiveOnly!int == 42); 903 }); 904 send(tid, 42); 905 } 906 907 /// 908 @system unittest 909 { 910 auto tid = spawn( 911 { 912 assert(receiveOnly!string == "text"); 913 }); 914 send(tid, "text"); 915 } 916 917 /// 918 @system unittest 919 { 920 struct Record { string name; int age; } 921 922 auto tid = spawn( 923 { 924 auto msg = receiveOnly!(double, Record); 925 assert(msg[0] == 0.5); 926 assert(msg[1].name == "Alice"); 927 assert(msg[1].age == 31); 928 }); 929 930 send(tid, 0.5, Record("Alice", 31)); 931 } 932 933 @system unittest 934 { 935 static void t1(Tid mainTid) 936 { 937 try 938 { 939 receiveOnly!string(); 940 mainTid.send(""); 941 } 942 catch (Throwable th) 943 { 944 mainTid.send(th.msg); 945 } 946 } 947 948 auto tid = spawn(&t1, thisTid); 949 tid.send(1); 950 string result = receiveOnly!string(); 951 assert(result == "Unexpected message type: expected 'string', got 'int'"); 952 } 953 954 // https://issues.dlang.org/show_bug.cgi?id=21663 955 @safe unittest 956 { 957 alias test = receiveOnly!(string, bool, bool); 958 } 959 960 /** 961 * Receives a message from another thread and gives up if no match 962 * arrives within a specified duration. 963 * 964 * Receive a message from another thread, or block until `duration` exceeds, 965 * if no messages of the specified types are available. This function works 966 * by pattern matching a message against a set of delegates and executing 967 * the first match found. 968 * 969 * If a delegate that accepts a $(REF Variant, std,variant) is included as 970 * the last argument, it will match any message that was not 971 * matched by an earlier delegate. If more than one argument is sent, 972 * the `Variant` will contain a $(REF Tuple, std,typecons) of all values 973 * sent. 974 * 975 * Params: 976 * duration = Duration, how long to wait. If `duration` is negative, 977 * won't wait at all. 978 * ops = Variadic list of function pointers and delegates. Entries 979 * in this list must not occlude later entries. 980 * 981 * Returns: `true` if it received a message and `false` if it timed out waiting 982 * for one. 983 * 984 * Throws: $(LREF OwnerTerminated) when the sending thread was terminated. 985 */ 986 bool receiveTimeout(T...)(Duration duration, T ops) 987 in 988 { 989 assert(thisInfo.ident.mbox !is null, 990 "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread."); 991 } 992 do 993 { 994 checkops(ops); 995 996 return thisInfo.ident.mbox.get(duration, ops); 997 } 998 999 @safe unittest 1000 { 1001 static assert(__traits(compiles, { 1002 receiveTimeout(msecs(0), (Variant x) {}); 1003 receiveTimeout(msecs(0), (int x) {}, (Variant x) {}); 1004 })); 1005 1006 static assert(!__traits(compiles, { 1007 receiveTimeout(msecs(0), (Variant x) {}, (int x) {}); 1008 })); 1009 1010 static assert(!__traits(compiles, { 1011 receiveTimeout(msecs(0), (int x) {}, (int x) {}); 1012 })); 1013 1014 static assert(__traits(compiles, { 1015 receiveTimeout(msecs(10), (int x) {}, (Variant x) {}); 1016 })); 1017 } 1018 1019 // MessageBox Limits 1020 1021 /** 1022 * These behaviors may be specified when a mailbox is full. 1023 */ 1024 enum OnCrowding 1025 { 1026 block, /// Wait until room is available. 1027 throwException, /// Throw a $(LREF MailboxFull) exception. 1028 ignore /// Abort the send and return. 1029 } 1030 1031 private 1032 { 1033 bool onCrowdingBlock(Tid tid) @safe pure nothrow @nogc 1034 { 1035 return true; 1036 } 1037 1038 bool onCrowdingThrow(Tid tid) @safe pure 1039 { 1040 throw new MailboxFull(tid); 1041 } 1042 1043 bool onCrowdingIgnore(Tid tid) @safe pure nothrow @nogc 1044 { 1045 return false; 1046 } 1047 } 1048 1049 /** 1050 * Sets a maximum mailbox size. 1051 * 1052 * Sets a limit on the maximum number of user messages allowed in the mailbox. 1053 * If this limit is reached, the caller attempting to add a new message will 1054 * execute the behavior specified by doThis. If messages is zero, the mailbox 1055 * is unbounded. 1056 * 1057 * Params: 1058 * tid = The Tid of the thread for which this limit should be set. 1059 * messages = The maximum number of messages or zero if no limit. 1060 * doThis = The behavior executed when a message is sent to a full 1061 * mailbox. 1062 */ 1063 void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure 1064 in (tid.mbox !is null) 1065 { 1066 final switch (doThis) 1067 { 1068 case OnCrowding.block: 1069 return tid.mbox.setMaxMsgs(messages, &onCrowdingBlock); 1070 case OnCrowding.throwException: 1071 return tid.mbox.setMaxMsgs(messages, &onCrowdingThrow); 1072 case OnCrowding.ignore: 1073 return tid.mbox.setMaxMsgs(messages, &onCrowdingIgnore); 1074 } 1075 } 1076 1077 /** 1078 * Sets a maximum mailbox size. 1079 * 1080 * Sets a limit on the maximum number of user messages allowed in the mailbox. 1081 * If this limit is reached, the caller attempting to add a new message will 1082 * execute onCrowdingDoThis. If messages is zero, the mailbox is unbounded. 1083 * 1084 * Params: 1085 * tid = The Tid of the thread for which this limit should be set. 1086 * messages = The maximum number of messages or zero if no limit. 1087 * onCrowdingDoThis = The routine called when a message is sent to a full 1088 * mailbox. 1089 */ 1090 void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis) 1091 in (tid.mbox !is null) 1092 { 1093 tid.mbox.setMaxMsgs(messages, onCrowdingDoThis); 1094 } 1095 1096 private 1097 { 1098 __gshared Tid[string] tidByName; 1099 __gshared string[][Tid] namesByTid; 1100 } 1101 1102 private @property Mutex registryLock() 1103 { 1104 __gshared Mutex impl; 1105 initOnce!impl(new Mutex); 1106 return impl; 1107 } 1108 1109 private void unregisterMe(ref ThreadInfo me) 1110 { 1111 if (me.ident != Tid.init) 1112 { 1113 synchronized (registryLock) 1114 { 1115 if (auto allNames = me.ident in namesByTid) 1116 { 1117 foreach (name; *allNames) 1118 tidByName.remove(name); 1119 namesByTid.remove(me.ident); 1120 } 1121 } 1122 } 1123 } 1124 1125 /** 1126 * Associates name with tid. 1127 * 1128 * Associates name with tid in a process-local map. When the thread 1129 * represented by tid terminates, any names associated with it will be 1130 * automatically unregistered. 1131 * 1132 * Params: 1133 * name = The name to associate with tid. 1134 * tid = The tid register by name. 1135 * 1136 * Returns: 1137 * true if the name is available and tid is not known to represent a 1138 * defunct thread. 1139 */ 1140 bool register(string name, Tid tid) 1141 in (tid.mbox !is null) 1142 { 1143 synchronized (registryLock) 1144 { 1145 if (name in tidByName) 1146 return false; 1147 if (tid.mbox.isClosed) 1148 return false; 1149 namesByTid[tid] ~= name; 1150 tidByName[name] = tid; 1151 return true; 1152 } 1153 } 1154 1155 /** 1156 * Removes the registered name associated with a tid. 1157 * 1158 * Params: 1159 * name = The name to unregister. 1160 * 1161 * Returns: 1162 * true if the name is registered, false if not. 1163 */ 1164 bool unregister(string name) 1165 { 1166 import std.algorithm.mutation : remove, SwapStrategy; 1167 import std.algorithm.searching : countUntil; 1168 1169 synchronized (registryLock) 1170 { 1171 if (auto tid = name in tidByName) 1172 { 1173 auto allNames = *tid in namesByTid; 1174 auto pos = countUntil(*allNames, name); 1175 remove!(SwapStrategy.unstable)(*allNames, pos); 1176 tidByName.remove(name); 1177 return true; 1178 } 1179 return false; 1180 } 1181 } 1182 1183 /** 1184 * Gets the `Tid` associated with name. 1185 * 1186 * Params: 1187 * name = The name to locate within the registry. 1188 * 1189 * Returns: 1190 * The associated `Tid` or `Tid.init` if name is not registered. 1191 */ 1192 Tid locate(string name) 1193 { 1194 synchronized (registryLock) 1195 { 1196 if (auto tid = name in tidByName) 1197 return *tid; 1198 return Tid.init; 1199 } 1200 } 1201 1202 /** 1203 * Waits for the thread associated with `tid` to complete. 1204 * 1205 * This function blocks until the thread represented by `tid` finishes executing 1206 * and then releases all OS resources associated with it (thread stack, thread-local 1207 * storage, etc.). This is essential for preventing resource leaks in long-running 1208 * applications that create many short-lived threads. 1209 * 1210 * For threads created by $(LREF spawn), this function must be called to properly 1211 * free OS resources. Without calling `join`, thread stacks (~8 MB each on typical 1212 * systems) will accumulate in virtual memory for the lifetime of the process. 1213 * 1214 * Params: 1215 * tid = The $(LREF Tid) of the thread to join. 1216 * 1217 * Throws: 1218 * $(REF ThreadError, core,thread,osthread) if the thread has already been joined 1219 * or if the thread was created by a custom $(LREF Scheduler). 1220 * 1221 * Example: 1222 * --- 1223 * auto tid = spawn(&someFunction); 1224 * // ... do other work ... 1225 * join(tid); // Wait for thread to complete and free resources 1226 * --- 1227 * 1228 * Note: 1229 * It is an error to call `join` on the same `Tid` more than once. 1230 * The function will throw if the thread was created by a $(LREF Scheduler) 1231 * rather than directly as a system thread. 1232 */ 1233 void join(Tid tid) 1234 { 1235 import core.thread.threadbase : ThreadError; 1236 1237 if (tid.mbox is null) 1238 throw new ThreadError("Cannot join Tid with null MessageBox"); 1239 1240 if (tid.mbox.m_thread is null) 1241 throw new ThreadError("Cannot join Tid created by a Scheduler"); 1242 1243 tid.mbox.m_thread.join(); 1244 } 1245 1246 /// 1247 @system unittest 1248 { 1249 import core.time : msecs; 1250 import core.thread : Thread; 1251 1252 auto tid = spawn((int x) { 1253 Thread.sleep(10.msecs); 1254 ownerTid.send(x * 2); 1255 }, 21); 1256 1257 join(tid); // Wait for thread to finish 1258 auto result = receiveOnly!int(); 1259 assert(result == 42); 1260 } 1261 1262 /** 1263 * Encapsulates all implementation-level data needed for scheduling. 1264 * 1265 * When defining a $(LREF Scheduler), an instance of this struct must be associated 1266 * with each logical thread. It contains all implementation-level information 1267 * needed by the internal API. 1268 */ 1269 struct ThreadInfo 1270 { 1271 Tid ident; 1272 bool[Tid] links; 1273 Tid owner; 1274 1275 /** 1276 * Gets a thread-local instance of `ThreadInfo`. 1277 * 1278 * Gets a thread-local instance of `ThreadInfo`, which should be used as the 1279 * default instance when info is requested for a thread not created by the 1280 * `Scheduler`. 1281 */ 1282 static @property ref thisInfo() nothrow 1283 { 1284 static ThreadInfo val; 1285 return val; 1286 } 1287 1288 /** 1289 * Cleans up this ThreadInfo. 1290 * 1291 * This must be called when a scheduled thread terminates. It tears down 1292 * the messaging system for the thread and notifies interested parties of 1293 * the thread's termination. 1294 */ 1295 void cleanup() 1296 { 1297 if (ident.mbox !is null) 1298 ident.mbox.close(); 1299 foreach (tid; links.keys) 1300 _send(MsgType.linkDead, tid, ident); 1301 if (owner != Tid.init) 1302 _send(MsgType.linkDead, owner, ident); 1303 unregisterMe(this); // clean up registry entries 1304 } 1305 1306 // https://issues.dlang.org/show_bug.cgi?id=20160 1307 @system unittest 1308 { 1309 register("main_thread", thisTid()); 1310 1311 ThreadInfo t; 1312 t.cleanup(); 1313 1314 assert(locate("main_thread") == thisTid()); 1315 } 1316 } 1317 1318 /** 1319 * A `Scheduler` controls how threading is performed by spawn. 1320 * 1321 * Implementing a `Scheduler` allows the concurrency mechanism used by this 1322 * module to be customized according to different needs. By default, a call 1323 * to spawn will create a new kernel thread that executes the supplied routine 1324 * and terminates when finished. But it is possible to create `Scheduler`s that 1325 * reuse threads, that multiplex `Fiber`s (coroutines) across a single thread, 1326 * or any number of other approaches. By making the choice of `Scheduler` a 1327 * user-level option, `std.concurrency` may be used for far more types of 1328 * application than if this behavior were predefined. 1329 * 1330 * Example: 1331 * --- 1332 * import std.concurrency; 1333 * import std.stdio; 1334 * 1335 * void main() 1336 * { 1337 * scheduler = new FiberScheduler; 1338 * scheduler.start( 1339 * { 1340 * writeln("the rest of main goes here"); 1341 * }); 1342 * } 1343 * --- 1344 * 1345 * Some schedulers have a dispatching loop that must run if they are to work 1346 * properly, so for the sake of consistency, when using a scheduler, `start()` 1347 * must be called within `main()`. This yields control to the scheduler and 1348 * will ensure that any spawned threads are executed in an expected manner. 1349 */ 1350 interface Scheduler 1351 { 1352 /** 1353 * Spawns the supplied op and starts the `Scheduler`. 1354 * 1355 * This is intended to be called at the start of the program to yield all 1356 * scheduling to the active `Scheduler` instance. This is necessary for 1357 * schedulers that explicitly dispatch threads rather than simply relying 1358 * on the operating system to do so, and so start should always be called 1359 * within `main()` to begin normal program execution. 1360 * 1361 * Params: 1362 * op = A wrapper for whatever the main thread would have done in the 1363 * absence of a custom scheduler. It will be automatically executed 1364 * via a call to spawn by the `Scheduler`. 1365 */ 1366 void start(void delegate() op); 1367 1368 /** 1369 * Assigns a logical thread to execute the supplied op. 1370 * 1371 * This routine is called by spawn. It is expected to instantiate a new 1372 * logical thread and run the supplied operation. This thread must call 1373 * `thisInfo.cleanup()` when the thread terminates if the scheduled thread 1374 * is not a kernel thread--all kernel threads will have their `ThreadInfo` 1375 * cleaned up automatically by a thread-local destructor. 1376 * 1377 * Params: 1378 * op = The function to execute. This may be the actual function passed 1379 * by the user to spawn itself, or may be a wrapper function. 1380 */ 1381 void spawn(void delegate() op); 1382 1383 /** 1384 * Yields execution to another logical thread. 1385 * 1386 * This routine is called at various points within concurrency-aware APIs 1387 * to provide a scheduler a chance to yield execution when using some sort 1388 * of cooperative multithreading model. If this is not appropriate, such 1389 * as when each logical thread is backed by a dedicated kernel thread, 1390 * this routine may be a no-op. 1391 */ 1392 void yield() nothrow; 1393 1394 /** 1395 * Returns an appropriate `ThreadInfo` instance. 1396 * 1397 * Returns an instance of `ThreadInfo` specific to the logical thread that 1398 * is calling this routine or, if the calling thread was not create by 1399 * this scheduler, returns `ThreadInfo.thisInfo` instead. 1400 */ 1401 @property ref ThreadInfo thisInfo() nothrow; 1402 1403 /** 1404 * Creates a `Condition` variable analog for signaling. 1405 * 1406 * Creates a new `Condition` variable analog which is used to check for and 1407 * to signal the addition of messages to a thread's message queue. Like 1408 * yield, some schedulers may need to define custom behavior so that calls 1409 * to `Condition.wait()` yield to another thread when no new messages are 1410 * available instead of blocking. 1411 * 1412 * Params: 1413 * m = The `Mutex` that will be associated with this condition. It will be 1414 * locked prior to any operation on the condition, and so in some 1415 * cases a `Scheduler` may need to hold this reference and unlock the 1416 * mutex before yielding execution to another logical thread. 1417 */ 1418 Condition newCondition(Mutex m) nothrow; 1419 } 1420 1421 /** 1422 * An example `Scheduler` using kernel threads. 1423 * 1424 * This is an example `Scheduler` that mirrors the default scheduling behavior 1425 * of creating one kernel thread per call to spawn. It is fully functional 1426 * and may be instantiated and used, but is not a necessary part of the 1427 * default functioning of this module. 1428 */ 1429 class ThreadScheduler : Scheduler 1430 { 1431 /** 1432 * This simply runs op directly, since no real scheduling is needed by 1433 * this approach. 1434 */ 1435 void start(void delegate() op) 1436 { 1437 op(); 1438 } 1439 1440 /** 1441 * Creates a new kernel thread and assigns it to run the supplied op. 1442 */ 1443 void spawn(void delegate() op) 1444 { 1445 auto t = new Thread(op); 1446 t.start(); 1447 } 1448 1449 /** 1450 * This scheduler does no explicit multiplexing, so this is a no-op. 1451 */ 1452 void yield() nothrow 1453 { 1454 // no explicit yield needed 1455 } 1456 1457 /** 1458 * Returns `ThreadInfo.thisInfo`, since it is a thread-local instance of 1459 * `ThreadInfo`, which is the correct behavior for this scheduler. 1460 */ 1461 @property ref ThreadInfo thisInfo() nothrow 1462 { 1463 return ThreadInfo.thisInfo; 1464 } 1465 1466 /** 1467 * Creates a new `Condition` variable. No custom behavior is needed here. 1468 */ 1469 Condition newCondition(Mutex m) nothrow 1470 { 1471 return new Condition(m); 1472 } 1473 } 1474 1475 /** 1476 * An example `Scheduler` using $(MREF_ALTTEXT `Fiber`s, core, thread, fiber). 1477 * 1478 * This is an example scheduler that creates a new `Fiber` per call to spawn 1479 * and multiplexes the execution of all fibers within the main thread. 1480 */ 1481 class FiberScheduler : Scheduler 1482 { 1483 /** 1484 * This creates a new `Fiber` for the supplied op and then starts the 1485 * dispatcher. 1486 */ 1487 void start(void delegate() op) 1488 { 1489 create(op); 1490 dispatch(); 1491 } 1492 1493 /** 1494 * This created a new `Fiber` for the supplied op and adds it to the 1495 * dispatch list. 1496 */ 1497 void spawn(void delegate() op) nothrow 1498 { 1499 create(op); 1500 yield(); 1501 } 1502 1503 /** 1504 * If the caller is a scheduled `Fiber`, this yields execution to another 1505 * scheduled `Fiber`. 1506 */ 1507 void yield() nothrow 1508 { 1509 // NOTE: It's possible that we should test whether the calling Fiber 1510 // is an InfoFiber before yielding, but I think it's reasonable 1511 // that any (non-Generator) fiber should yield here. 1512 if (Fiber.getThis()) 1513 Fiber.yield(); 1514 } 1515 1516 /** 1517 * Returns an appropriate `ThreadInfo` instance. 1518 * 1519 * Returns a `ThreadInfo` instance specific to the calling `Fiber` if the 1520 * `Fiber` was created by this dispatcher, otherwise it returns 1521 * `ThreadInfo.thisInfo`. 1522 */ 1523 @property ref ThreadInfo thisInfo() nothrow 1524 { 1525 auto f = cast(InfoFiber) Fiber.getThis(); 1526 1527 if (f !is null) 1528 return f.info; 1529 return ThreadInfo.thisInfo; 1530 } 1531 1532 /** 1533 * Returns a `Condition` analog that yields when wait or notify is called. 1534 * 1535 * Bug: 1536 * For the default implementation, `notifyAll` will behave like `notify`. 1537 * 1538 * Params: 1539 * m = A `Mutex` to use for locking if the condition needs to be waited on 1540 * or notified from multiple `Thread`s. 1541 * If `null`, no `Mutex` will be used and it is assumed that the 1542 * `Condition` is only waited on/notified from one `Thread`. 1543 */ 1544 Condition newCondition(Mutex m) nothrow 1545 { 1546 return new FiberCondition(m); 1547 } 1548 1549 protected: 1550 /** 1551 * Creates a new `Fiber` which calls the given delegate. 1552 * 1553 * Params: 1554 * op = The delegate the fiber should call 1555 */ 1556 void create(void delegate() op) nothrow 1557 { 1558 void wrap() 1559 { 1560 scope (exit) 1561 { 1562 thisInfo.cleanup(); 1563 } 1564 op(); 1565 } 1566 1567 m_fibers ~= new InfoFiber(&wrap); 1568 } 1569 1570 /** 1571 * `Fiber` which embeds a `ThreadInfo` 1572 */ 1573 static class InfoFiber : Fiber 1574 { 1575 ThreadInfo info; 1576 1577 this(void delegate() op) nothrow 1578 { 1579 super(op); 1580 } 1581 1582 this(void delegate() op, size_t sz) nothrow 1583 { 1584 super(op, sz); 1585 } 1586 } 1587 1588 private: 1589 class FiberCondition : Condition 1590 { 1591 this(Mutex m) nothrow 1592 { 1593 super(m); 1594 notified = false; 1595 } 1596 1597 override void wait() nothrow 1598 { 1599 scope (exit) notified = false; 1600 1601 while (!notified) 1602 switchContext(); 1603 } 1604 1605 override bool wait(Duration period) nothrow 1606 { 1607 import core.time : MonoTime; 1608 1609 scope (exit) notified = false; 1610 1611 for (auto limit = MonoTime.currTime + period; 1612 !notified && !period.isNegative; 1613 period = limit - MonoTime.currTime) 1614 { 1615 this.outer.yield(); 1616 } 1617 return notified; 1618 } 1619 1620 override void notify() nothrow 1621 { 1622 notified = true; 1623 switchContext(); 1624 } 1625 1626 override void notifyAll() nothrow 1627 { 1628 notified = true; 1629 switchContext(); 1630 } 1631 1632 private: 1633 void switchContext() nothrow 1634 { 1635 if (mutex_nothrow) mutex_nothrow.unlock_nothrow(); 1636 scope (exit) 1637 if (mutex_nothrow) 1638 mutex_nothrow.lock_nothrow(); 1639 this.outer.yield(); 1640 } 1641 1642 bool notified; 1643 } 1644 1645 void dispatch() 1646 { 1647 import std.algorithm.mutation : remove; 1648 1649 while (m_fibers.length > 0) 1650 { 1651 auto t = m_fibers[m_pos].call(Fiber.Rethrow.no); 1652 if (t !is null && !(cast(OwnerTerminated) t)) 1653 { 1654 throw t; 1655 } 1656 if (m_fibers[m_pos].state == Fiber.State.TERM) 1657 { 1658 if (m_pos >= (m_fibers = remove(m_fibers, m_pos)).length) 1659 m_pos = 0; 1660 } 1661 else if (m_pos++ >= m_fibers.length - 1) 1662 { 1663 m_pos = 0; 1664 } 1665 } 1666 } 1667 1668 Fiber[] m_fibers; 1669 size_t m_pos; 1670 } 1671 1672 @system unittest 1673 { 1674 static void receive(Condition cond, ref size_t received) 1675 { 1676 while (true) 1677 { 1678 synchronized (cond.mutex) 1679 { 1680 cond.wait(); 1681 ++received; 1682 } 1683 } 1684 } 1685 1686 static void send(Condition cond, ref size_t sent) 1687 { 1688 while (true) 1689 { 1690 synchronized (cond.mutex) 1691 { 1692 ++sent; 1693 cond.notify(); 1694 } 1695 } 1696 } 1697 1698 auto fs = new FiberScheduler; 1699 auto mtx = new Mutex; 1700 auto cond = fs.newCondition(mtx); 1701 1702 size_t received, sent; 1703 auto waiter = new Fiber({ receive(cond, received); }), notifier = new Fiber({ send(cond, sent); }); 1704 waiter.call(); 1705 assert(received == 0); 1706 notifier.call(); 1707 assert(sent == 1); 1708 assert(received == 0); 1709 waiter.call(); 1710 assert(received == 1); 1711 waiter.call(); 1712 assert(received == 1); 1713 } 1714 1715 /** 1716 * Sets the `Scheduler` behavior within the program. 1717 * 1718 * This variable sets the `Scheduler` behavior within this program. Typically, 1719 * when setting a `Scheduler`, `scheduler.start()` should be called in `main`. This 1720 * routine will not return until program execution is complete. 1721 */ 1722 __gshared Scheduler scheduler; 1723 1724 // Generator 1725 1726 /** 1727 * If the caller is a `Fiber` and is not a $(LREF Generator), this function will call 1728 * `scheduler.yield()` or `Fiber.yield()`, as appropriate. 1729 */ 1730 void yield() nothrow 1731 { 1732 auto fiber = Fiber.getThis(); 1733 if (!(cast(IsGenerator) fiber)) 1734 { 1735 if (scheduler is null) 1736 { 1737 if (fiber) 1738 return Fiber.yield(); 1739 } 1740 else 1741 scheduler.yield(); 1742 } 1743 } 1744 1745 /// Used to determine whether a Generator is running. 1746 private interface IsGenerator {} 1747 1748 1749 /** 1750 * A Generator is a $(MREF_ALTTEXT Fiber, core, thread, fiber) 1751 * that periodically returns values of type `T` to the 1752 * caller via `yield`. This is represented as an InputRange. 1753 */ 1754 class Generator(T) : 1755 Fiber, IsGenerator, InputRange!T 1756 { 1757 /** 1758 * Initializes a generator object which is associated with a static 1759 * D function. The function will be called once to prepare the range 1760 * for iteration. 1761 * 1762 * Params: 1763 * fn = The fiber function. 1764 * 1765 * In: 1766 * fn must not be null. 1767 */ 1768 this(void function() fn) 1769 { 1770 super(fn); 1771 call(); 1772 } 1773 1774 /** 1775 * Initializes a generator object which is associated with a static 1776 * D function. The function will be called once to prepare the range 1777 * for iteration. 1778 * 1779 * Params: 1780 * fn = The fiber function. 1781 * sz = The stack size for this fiber. 1782 * 1783 * In: 1784 * fn must not be null. 1785 */ 1786 this(void function() fn, size_t sz) 1787 { 1788 super(fn, sz); 1789 call(); 1790 } 1791 1792 /** 1793 * Initializes a generator object which is associated with a static 1794 * D function. The function will be called once to prepare the range 1795 * for iteration. 1796 * 1797 * Params: 1798 * fn = The fiber function. 1799 * sz = The stack size for this fiber. 1800 * guardPageSize = size of the guard page to trap fiber's stack 1801 * overflows. Refer to $(REF Fiber, core,thread)'s 1802 * documentation for more details. 1803 * 1804 * In: 1805 * fn must not be null. 1806 */ 1807 this(void function() fn, size_t sz, size_t guardPageSize) 1808 { 1809 super(fn, sz, guardPageSize); 1810 call(); 1811 } 1812 1813 /** 1814 * Initializes a generator object which is associated with a dynamic 1815 * D function. The function will be called once to prepare the range 1816 * for iteration. 1817 * 1818 * Params: 1819 * dg = The fiber function. 1820 * 1821 * In: 1822 * dg must not be null. 1823 */ 1824 this(void delegate() dg) 1825 { 1826 super(dg); 1827 call(); 1828 } 1829 1830 /** 1831 * Initializes a generator object which is associated with a dynamic 1832 * D function. The function will be called once to prepare the range 1833 * for iteration. 1834 * 1835 * Params: 1836 * dg = The fiber function. 1837 * sz = The stack size for this fiber. 1838 * 1839 * In: 1840 * dg must not be null. 1841 */ 1842 this(void delegate() dg, size_t sz) 1843 { 1844 super(dg, sz); 1845 call(); 1846 } 1847 1848 /** 1849 * Initializes a generator object which is associated with a dynamic 1850 * D function. The function will be called once to prepare the range 1851 * for iteration. 1852 * 1853 * Params: 1854 * dg = The fiber function. 1855 * sz = The stack size for this fiber. 1856 * guardPageSize = size of the guard page to trap fiber's stack 1857 * overflows. Refer to $(REF Fiber, core,thread)'s 1858 * documentation for more details. 1859 * 1860 * In: 1861 * dg must not be null. 1862 */ 1863 this(void delegate() dg, size_t sz, size_t guardPageSize) 1864 { 1865 super(dg, sz, guardPageSize); 1866 call(); 1867 } 1868 1869 /** 1870 * Returns true if the generator is empty. 1871 */ 1872 final bool empty() @property 1873 { 1874 return m_value is null || state == State.TERM; 1875 } 1876 1877 /** 1878 * Obtains the next value from the underlying function. 1879 */ 1880 final void popFront() 1881 { 1882 call(); 1883 } 1884 1885 /** 1886 * Returns the most recently generated value by shallow copy. 1887 */ 1888 final T front() @property 1889 { 1890 return *m_value; 1891 } 1892 1893 /** 1894 * Returns the most recently generated value without executing a 1895 * copy contructor. Will not compile for element types defining a 1896 * postblit, because `Generator` does not return by reference. 1897 */ 1898 final T moveFront() 1899 { 1900 static if (!hasElaborateCopyConstructor!T) 1901 { 1902 return front; 1903 } 1904 else 1905 { 1906 static assert(0, 1907 "Fiber front is always rvalue and thus cannot be moved since it defines a postblit."); 1908 } 1909 } 1910 1911 final int opApply(scope int delegate(T) loopBody) 1912 { 1913 int broken; 1914 for (; !empty; popFront()) 1915 { 1916 broken = loopBody(front); 1917 if (broken) break; 1918 } 1919 return broken; 1920 } 1921 1922 final int opApply(scope int delegate(size_t, T) loopBody) 1923 { 1924 int broken; 1925 for (size_t i; !empty; ++i, popFront()) 1926 { 1927 broken = loopBody(i, front); 1928 if (broken) break; 1929 } 1930 return broken; 1931 } 1932 private: 1933 T* m_value; 1934 } 1935 1936 /// 1937 @system unittest 1938 { 1939 auto tid = spawn({ 1940 int i; 1941 while (i < 9) 1942 i = receiveOnly!int; 1943 1944 ownerTid.send(i * 2); 1945 }); 1946 1947 auto r = new Generator!int({ 1948 foreach (i; 1 .. 10) 1949 yield(i); 1950 }); 1951 1952 foreach (e; r) 1953 tid.send(e); 1954 1955 assert(receiveOnly!int == 18); 1956 } 1957 1958 /** 1959 * Yields a value of type T to the caller of the currently executing 1960 * generator. 1961 * 1962 * Params: 1963 * value = The value to yield. 1964 */ 1965 void yield(T)(ref T value) 1966 { 1967 Generator!T cur = cast(Generator!T) Fiber.getThis(); 1968 if (cur !is null && cur.state == Fiber.State.EXEC) 1969 { 1970 cur.m_value = &value; 1971 return Fiber.yield(); 1972 } 1973 throw new Exception("yield(T) called with no active generator for the supplied type"); 1974 } 1975 1976 /// ditto 1977 void yield(T)(T value) 1978 { 1979 yield(value); 1980 } 1981 1982 @system unittest 1983 { 1984 import core.exception; 1985 import std.exception; 1986 1987 auto mainTid = thisTid; 1988 alias testdg = () { 1989 auto tid = spawn( 1990 (Tid mainTid) { 1991 int i; 1992 scope (failure) mainTid.send(false); 1993 try 1994 { 1995 for (i = 1; i < 10; i++) 1996 { 1997 if (receiveOnly!int() != i) 1998 { 1999 mainTid.send(false); 2000 break; 2001 } 2002 } 2003 } 2004 catch (OwnerTerminated e) 2005 { 2006 // i will advance 1 past the last value expected 2007 mainTid.send(i == 4); 2008 } 2009 }, mainTid); 2010 auto r = new Generator!int( 2011 { 2012 assertThrown!Exception(yield(2.0)); 2013 yield(); // ensure this is a no-op 2014 yield(1); 2015 yield(); // also once something has been yielded 2016 yield(2); 2017 yield(3); 2018 }); 2019 2020 foreach (e; r) 2021 { 2022 tid.send(e); 2023 } 2024 }; 2025 2026 scheduler = new ThreadScheduler; 2027 scheduler.spawn(testdg); 2028 assert(receiveOnly!bool()); 2029 2030 scheduler = new FiberScheduler; 2031 scheduler.start(testdg); 2032 assert(receiveOnly!bool()); 2033 scheduler = null; 2034 } 2035 /// 2036 @system unittest 2037 { 2038 import std.range; 2039 2040 InputRange!int myIota = iota(10).inputRangeObject; 2041 2042 myIota.popFront(); 2043 myIota.popFront(); 2044 assert(myIota.moveFront == 2); 2045 assert(myIota.front == 2); 2046 myIota.popFront(); 2047 assert(myIota.front == 3); 2048 2049 //can be assigned to std.range.interfaces.InputRange directly 2050 myIota = new Generator!int( 2051 { 2052 foreach (i; 0 .. 10) yield(i); 2053 }); 2054 2055 myIota.popFront(); 2056 myIota.popFront(); 2057 assert(myIota.moveFront == 2); 2058 assert(myIota.front == 2); 2059 myIota.popFront(); 2060 assert(myIota.front == 3); 2061 2062 size_t[2] counter = [0, 0]; 2063 foreach (i, unused; myIota) counter[] += [1, i]; 2064 2065 assert(myIota.empty); 2066 assert(counter == [7, 21]); 2067 } 2068 2069 private 2070 { 2071 /* 2072 * A MessageBox is a message queue for one thread. Other threads may send 2073 * messages to this owner by calling put(), and the owner receives them by 2074 * calling get(). The put() call is therefore effectively shared and the 2075 * get() call is effectively local. setMaxMsgs may be used by any thread 2076 * to limit the size of the message queue. 2077 */ 2078 class MessageBox 2079 { 2080 this() @trusted nothrow /* TODO: make @safe after relevant druntime PR gets merged */ 2081 { 2082 m_lock = new Mutex; 2083 m_closed = false; 2084 2085 if (scheduler is null) 2086 { 2087 m_putMsg = new Condition(m_lock); 2088 m_notFull = new Condition(m_lock); 2089 } 2090 else 2091 { 2092 m_putMsg = scheduler.newCondition(m_lock); 2093 m_notFull = scheduler.newCondition(m_lock); 2094 } 2095 } 2096 2097 /// 2098 final @property bool isClosed() @safe @nogc pure 2099 { 2100 synchronized (m_lock) 2101 { 2102 return m_closed; 2103 } 2104 } 2105 2106 /* 2107 * Sets a limit on the maximum number of user messages allowed in the 2108 * mailbox. If this limit is reached, the caller attempting to add 2109 * a new message will execute call. If num is zero, there is no limit 2110 * on the message queue. 2111 * 2112 * Params: 2113 * num = The maximum size of the queue or zero if the queue is 2114 * unbounded. 2115 * call = The routine to call when the queue is full. 2116 */ 2117 final void setMaxMsgs(size_t num, bool function(Tid) call) @safe @nogc pure 2118 { 2119 synchronized (m_lock) 2120 { 2121 m_maxMsgs = num; 2122 m_onMaxMsgs = call; 2123 } 2124 } 2125 2126 /* 2127 * If maxMsgs is not set, the message is added to the queue and the 2128 * owner is notified. If the queue is full, the message will still be 2129 * accepted if it is a control message, otherwise onCrowdingDoThis is 2130 * called. If the routine returns true, this call will block until 2131 * the owner has made space available in the queue. If it returns 2132 * false, this call will abort. 2133 * 2134 * Params: 2135 * msg = The message to put in the queue. 2136 * 2137 * Throws: 2138 * An exception if the queue is full and onCrowdingDoThis throws. 2139 */ 2140 final void put(ref Message msg) 2141 { 2142 synchronized (m_lock) 2143 { 2144 // TODO: Generate an error here if m_closed is true, or maybe 2145 // put a message in the caller's queue? 2146 if (!m_closed) 2147 { 2148 while (true) 2149 { 2150 if (isPriorityMsg(msg)) 2151 { 2152 m_sharedPty.put(msg); 2153 m_putMsg.notify(); 2154 return; 2155 } 2156 if (!mboxFull() || isControlMsg(msg)) 2157 { 2158 m_sharedBox.put(msg); 2159 m_putMsg.notify(); 2160 return; 2161 } 2162 if (m_onMaxMsgs !is null && !m_onMaxMsgs(thisTid)) 2163 { 2164 return; 2165 } 2166 m_putQueue++; 2167 m_notFull.wait(); 2168 m_putQueue--; 2169 } 2170 } 2171 } 2172 } 2173 2174 /* 2175 * Matches ops against each message in turn until a match is found. 2176 * 2177 * Params: 2178 * ops = The operations to match. Each may return a bool to indicate 2179 * whether a message with a matching type is truly a match. 2180 * 2181 * Returns: 2182 * true if a message was retrieved and false if not (such as if a 2183 * timeout occurred). 2184 * 2185 * Throws: 2186 * LinkTerminated if a linked thread terminated, or OwnerTerminated 2187 * if the owner thread terminates and no existing messages match the 2188 * supplied ops. 2189 */ 2190 bool get(T...)(scope T vals) 2191 { 2192 import std.meta : AliasSeq; 2193 2194 static assert(T.length, "T must not be empty"); 2195 2196 static if (is(T[0] : Duration)) 2197 { 2198 alias Ops = AliasSeq!(T[1 .. $]); 2199 alias ops = vals[1 .. $]; 2200 enum timedWait = true; 2201 Duration period = vals[0]; 2202 } 2203 else 2204 { 2205 alias Ops = AliasSeq!(T); 2206 alias ops = vals[0 .. $]; 2207 enum timedWait = false; 2208 } 2209 2210 bool onStandardMsg(ref Message msg) 2211 { 2212 foreach (i, t; Ops) 2213 { 2214 alias Args = Parameters!(t); 2215 auto op = ops[i]; 2216 2217 if (msg.convertsTo!(Args)) 2218 { 2219 alias RT = ReturnType!(t); 2220 static if (is(RT == bool)) 2221 { 2222 return msg.map(op); 2223 } 2224 else 2225 { 2226 msg.map(op); 2227 static if (!is(immutable RT == immutable noreturn)) 2228 return true; 2229 } 2230 } 2231 } 2232 return false; 2233 } 2234 2235 bool onLinkDeadMsg(ref Message msg) 2236 { 2237 assert(msg.convertsTo!(Tid), 2238 "Message could be converted to Tid"); 2239 auto tid = msg.get!(Tid); 2240 2241 if (bool* pDepends = tid in thisInfo.links) 2242 { 2243 auto depends = *pDepends; 2244 thisInfo.links.remove(tid); 2245 // Give the owner relationship precedence. 2246 if (depends && tid != thisInfo.owner) 2247 { 2248 auto e = new LinkTerminated(tid); 2249 auto m = Message(MsgType.standard, e); 2250 if (onStandardMsg(m)) 2251 return true; 2252 throw e; 2253 } 2254 } 2255 if (tid == thisInfo.owner) 2256 { 2257 thisInfo.owner = Tid.init; 2258 auto e = new OwnerTerminated(tid); 2259 auto m = Message(MsgType.standard, e); 2260 if (onStandardMsg(m)) 2261 return true; 2262 throw e; 2263 } 2264 return false; 2265 } 2266 2267 bool onControlMsg(ref Message msg) 2268 { 2269 switch (msg.type) 2270 { 2271 case MsgType.linkDead: 2272 return onLinkDeadMsg(msg); 2273 default: 2274 return false; 2275 } 2276 } 2277 2278 bool scan(ref ListT list) 2279 { 2280 for (auto range = list[]; !range.empty;) 2281 { 2282 // Only the message handler will throw, so if this occurs 2283 // we can be certain that the message was handled. 2284 scope (failure) 2285 list.removeAt(range); 2286 2287 if (isControlMsg(range.front)) 2288 { 2289 if (onControlMsg(range.front)) 2290 { 2291 // Although the linkDead message is a control message, 2292 // it can be handled by the user. Since the linkDead 2293 // message throws if not handled, if we get here then 2294 // it has been handled and we can return from receive. 2295 // This is a weird special case that will have to be 2296 // handled in a more general way if more are added. 2297 if (!isLinkDeadMsg(range.front)) 2298 { 2299 list.removeAt(range); 2300 continue; 2301 } 2302 list.removeAt(range); 2303 return true; 2304 } 2305 range.popFront(); 2306 continue; 2307 } 2308 else 2309 { 2310 if (onStandardMsg(range.front)) 2311 { 2312 list.removeAt(range); 2313 return true; 2314 } 2315 range.popFront(); 2316 continue; 2317 } 2318 } 2319 return false; 2320 } 2321 2322 bool pty(ref ListT list) 2323 { 2324 if (!list.empty) 2325 { 2326 auto range = list[]; 2327 2328 if (onStandardMsg(range.front)) 2329 { 2330 list.removeAt(range); 2331 return true; 2332 } 2333 if (range.front.convertsTo!(Throwable)) 2334 throw range.front.get!(Throwable); 2335 else if (range.front.convertsTo!(shared(Throwable))) 2336 /* Note: a shared type can be caught without the shared qualifier 2337 * so throwing shared will be an error */ 2338 throw cast() range.front.get!(shared(Throwable)); 2339 else 2340 throw new PriorityMessageException(range.front.data); 2341 } 2342 return false; 2343 } 2344 2345 static if (timedWait) 2346 { 2347 import core.time : MonoTime; 2348 auto limit = MonoTime.currTime + period; 2349 } 2350 2351 while (true) 2352 { 2353 ListT arrived; 2354 2355 if (pty(m_localPty) || scan(m_localBox)) 2356 { 2357 return true; 2358 } 2359 yield(); 2360 synchronized (m_lock) 2361 { 2362 updateMsgCount(); 2363 while (m_sharedPty.empty && m_sharedBox.empty) 2364 { 2365 // NOTE: We're notifying all waiters here instead of just 2366 // a few because the onCrowding behavior may have 2367 // changed and we don't want to block sender threads 2368 // unnecessarily if the new behavior is not to block. 2369 // This will admittedly result in spurious wakeups 2370 // in other situations, but what can you do? 2371 if (m_putQueue && !mboxFull()) 2372 m_notFull.notifyAll(); 2373 static if (timedWait) 2374 { 2375 if (period <= Duration.zero || !m_putMsg.wait(period)) 2376 return false; 2377 } 2378 else 2379 { 2380 m_putMsg.wait(); 2381 } 2382 } 2383 m_localPty.put(m_sharedPty); 2384 arrived.put(m_sharedBox); 2385 } 2386 if (m_localPty.empty) 2387 { 2388 scope (exit) m_localBox.put(arrived); 2389 if (scan(arrived)) 2390 { 2391 return true; 2392 } 2393 else 2394 { 2395 static if (timedWait) 2396 { 2397 period = limit - MonoTime.currTime; 2398 } 2399 continue; 2400 } 2401 } 2402 m_localBox.put(arrived); 2403 pty(m_localPty); 2404 return true; 2405 } 2406 } 2407 2408 /* 2409 * Called on thread termination. This routine processes any remaining 2410 * control messages, clears out message queues, and sets a flag to 2411 * reject any future messages. 2412 */ 2413 final void close() 2414 { 2415 static void onLinkDeadMsg(ref Message msg) 2416 { 2417 assert(msg.convertsTo!(Tid), 2418 "Message could be converted to Tid"); 2419 auto tid = msg.get!(Tid); 2420 2421 thisInfo.links.remove(tid); 2422 if (tid == thisInfo.owner) 2423 thisInfo.owner = Tid.init; 2424 } 2425 2426 static void sweep(ref ListT list) 2427 { 2428 for (auto range = list[]; !range.empty; range.popFront()) 2429 { 2430 if (range.front.type == MsgType.linkDead) 2431 onLinkDeadMsg(range.front); 2432 } 2433 } 2434 2435 ListT arrived; 2436 2437 sweep(m_localBox); 2438 synchronized (m_lock) 2439 { 2440 arrived.put(m_sharedBox); 2441 m_closed = true; 2442 } 2443 m_localBox.clear(); 2444 sweep(arrived); 2445 } 2446 2447 private: 2448 // Routines involving local data only, no lock needed. 2449 2450 bool mboxFull() @safe @nogc pure nothrow 2451 { 2452 return m_maxMsgs && m_maxMsgs <= m_localMsgs + m_sharedBox.length; 2453 } 2454 2455 void updateMsgCount() @safe @nogc pure nothrow 2456 { 2457 m_localMsgs = m_localBox.length; 2458 } 2459 2460 bool isControlMsg(ref Message msg) @safe @nogc pure nothrow 2461 { 2462 return msg.type != MsgType.standard && msg.type != MsgType.priority; 2463 } 2464 2465 bool isPriorityMsg(ref Message msg) @safe @nogc pure nothrow 2466 { 2467 return msg.type == MsgType.priority; 2468 } 2469 2470 bool isLinkDeadMsg(ref Message msg) @safe @nogc pure nothrow 2471 { 2472 return msg.type == MsgType.linkDead; 2473 } 2474 2475 alias OnMaxFn = bool function(Tid); 2476 alias ListT = List!(Message); 2477 2478 ListT m_localBox; 2479 ListT m_localPty; 2480 2481 Mutex m_lock; 2482 Condition m_putMsg; 2483 Condition m_notFull; 2484 size_t m_putQueue; 2485 ListT m_sharedBox; 2486 ListT m_sharedPty; 2487 OnMaxFn m_onMaxMsgs; 2488 size_t m_localMsgs; 2489 size_t m_maxMsgs; 2490 bool m_closed; 2491 package Thread m_thread; // For join(Tid) 2492 } 2493 2494 /* 2495 * 2496 */ 2497 struct List(T) 2498 { 2499 struct Range 2500 { 2501 import std.exception : enforce; 2502 2503 @property bool empty() const 2504 { 2505 return !m_prev.next; 2506 } 2507 2508 @property ref T front() 2509 { 2510 enforce(m_prev.next, "invalid list node"); 2511 return m_prev.next.val; 2512 } 2513 2514 @property void front(T val) 2515 { 2516 enforce(m_prev.next, "invalid list node"); 2517 m_prev.next.val = val; 2518 } 2519 2520 void popFront() 2521 { 2522 enforce(m_prev.next, "invalid list node"); 2523 m_prev = m_prev.next; 2524 } 2525 2526 private this(Node* p) 2527 { 2528 m_prev = p; 2529 } 2530 2531 private Node* m_prev; 2532 } 2533 2534 void put(T val) 2535 { 2536 put(newNode(val)); 2537 } 2538 2539 void put(ref List!(T) rhs) 2540 { 2541 if (!rhs.empty) 2542 { 2543 put(rhs.m_first); 2544 while (m_last.next !is null) 2545 { 2546 m_last = m_last.next; 2547 m_count++; 2548 } 2549 rhs.m_first = null; 2550 rhs.m_last = null; 2551 rhs.m_count = 0; 2552 } 2553 } 2554 2555 Range opSlice() 2556 { 2557 return Range(cast(Node*)&m_first); 2558 } 2559 2560 void removeAt(Range r) 2561 { 2562 import std.exception : enforce; 2563 2564 assert(m_count, "Can not remove from empty Range"); 2565 Node* n = r.m_prev; 2566 enforce(n && n.next, "attempting to remove invalid list node"); 2567 2568 if (m_last is m_first) 2569 m_last = null; 2570 else if (m_last is n.next) 2571 m_last = n; // nocoverage 2572 Node* to_free = n.next; 2573 n.next = n.next.next; 2574 freeNode(to_free); 2575 m_count--; 2576 } 2577 2578 @property size_t length() 2579 { 2580 return m_count; 2581 } 2582 2583 void clear() 2584 { 2585 m_first = m_last = null; 2586 m_count = 0; 2587 } 2588 2589 @property bool empty() 2590 { 2591 return m_first is null; 2592 } 2593 2594 private: 2595 struct Node 2596 { 2597 Node* next; 2598 T val; 2599 2600 this(T v) 2601 { 2602 val = v; 2603 } 2604 } 2605 2606 static shared struct SpinLock 2607 { 2608 void lock() { while (!cas(&locked, false, true)) { Thread.yield(); } } 2609 void unlock() { atomicStore!(MemoryOrder.rel)(locked, false); } 2610 bool locked; 2611 } 2612 2613 static shared SpinLock sm_lock; 2614 static shared Node* sm_head; 2615 2616 Node* newNode(T v) 2617 { 2618 Node* n; 2619 { 2620 sm_lock.lock(); 2621 scope (exit) sm_lock.unlock(); 2622 2623 if (sm_head) 2624 { 2625 n = cast(Node*) sm_head; 2626 sm_head = sm_head.next; 2627 } 2628 } 2629 if (n) 2630 { 2631 import core.lifetime : emplace; 2632 emplace!Node(n, v); 2633 } 2634 else 2635 { 2636 n = new Node(v); 2637 } 2638 return n; 2639 } 2640 2641 void freeNode(Node* n) 2642 { 2643 // destroy val to free any owned GC memory 2644 destroy(n.val); 2645 2646 sm_lock.lock(); 2647 scope (exit) sm_lock.unlock(); 2648 2649 auto sn = cast(shared(Node)*) n; 2650 sn.next = sm_head; 2651 sm_head = sn; 2652 } 2653 2654 void put(Node* n) 2655 { 2656 m_count++; 2657 if (!empty) 2658 { 2659 m_last.next = n; 2660 m_last = n; 2661 return; 2662 } 2663 m_first = n; 2664 m_last = n; 2665 } 2666 2667 Node* m_first; 2668 Node* m_last; 2669 size_t m_count; 2670 } 2671 } 2672 2673 @system unittest 2674 { 2675 import std.typecons : tuple, Tuple; 2676 2677 static void testfn(Tid tid) 2678 { 2679 receive((float val) { assert(0); }, (int val, int val2) { 2680 assert(val == 42 && val2 == 86); 2681 }); 2682 receive((Tuple!(int, int) val) { assert(val[0] == 42 && val[1] == 86); }); 2683 receive((Variant val) { }); 2684 receive((string val) { 2685 if ("the quick brown fox" != val) 2686 return false; 2687 return true; 2688 }, (string val) { assert(false); }); 2689 prioritySend(tid, "done"); 2690 } 2691 2692 static void runTest(Tid tid) 2693 { 2694 send(tid, 42, 86); 2695 send(tid, tuple(42, 86)); 2696 send(tid, "hello", "there"); 2697 send(tid, "the quick brown fox"); 2698 receive((string val) { assert(val == "done"); }); 2699 } 2700 2701 static void simpleTest() 2702 { 2703 auto tid = spawn(&testfn, thisTid); 2704 runTest(tid); 2705 2706 // Run the test again with a limited mailbox size. 2707 tid = spawn(&testfn, thisTid); 2708 setMaxMailboxSize(tid, 2, OnCrowding.block); 2709 runTest(tid); 2710 } 2711 2712 simpleTest(); 2713 2714 scheduler = new ThreadScheduler; 2715 simpleTest(); 2716 scheduler = null; 2717 } 2718 2719 private @property shared(Mutex) initOnceLock() 2720 { 2721 static shared Mutex lock; 2722 if (auto mtx = atomicLoad!(MemoryOrder.acq)(lock)) 2723 return mtx; 2724 auto mtx = new shared Mutex; 2725 if (cas(&lock, cast(shared) null, mtx)) 2726 return mtx; 2727 return atomicLoad!(MemoryOrder.acq)(lock); 2728 } 2729 2730 /** 2731 * Initializes $(D_PARAM var) with the lazy $(D_PARAM init) value in a 2732 * thread-safe manner. 2733 * 2734 * The implementation guarantees that all threads simultaneously calling 2735 * initOnce with the same $(D_PARAM var) argument block until $(D_PARAM var) is 2736 * fully initialized. All side-effects of $(D_PARAM init) are globally visible 2737 * afterwards. 2738 * 2739 * Params: 2740 * var = The variable to initialize 2741 * init = The lazy initializer value 2742 * 2743 * Returns: 2744 * A reference to the initialized variable 2745 */ 2746 auto ref initOnce(alias var)(lazy typeof(var) init) 2747 { 2748 return initOnce!var(init, initOnceLock); 2749 } 2750 2751 /// A typical use-case is to perform lazy but thread-safe initialization. 2752 @system unittest 2753 { 2754 static class MySingleton 2755 { 2756 static MySingleton instance() 2757 { 2758 __gshared MySingleton inst; 2759 return initOnce!inst(new MySingleton); 2760 } 2761 } 2762 2763 assert(MySingleton.instance !is null); 2764 } 2765 2766 @system unittest 2767 { 2768 static class MySingleton 2769 { 2770 static MySingleton instance() 2771 { 2772 __gshared MySingleton inst; 2773 return initOnce!inst(new MySingleton); 2774 } 2775 2776 private: 2777 this() { val = ++cnt; } 2778 size_t val; 2779 __gshared size_t cnt; 2780 } 2781 2782 foreach (_; 0 .. 10) 2783 spawn({ ownerTid.send(MySingleton.instance.val); }); 2784 foreach (_; 0 .. 10) 2785 assert(receiveOnly!size_t == MySingleton.instance.val); 2786 assert(MySingleton.cnt == 1); 2787 } 2788 2789 /** 2790 * Same as above, but takes a separate mutex instead of sharing one among 2791 * all initOnce instances. 2792 * 2793 * This should be used to avoid dead-locks when the $(D_PARAM init) 2794 * expression waits for the result of another thread that might also 2795 * call initOnce. Use with care. 2796 * 2797 * Params: 2798 * var = The variable to initialize 2799 * init = The lazy initializer value 2800 * mutex = A mutex to prevent race conditions 2801 * 2802 * Returns: 2803 * A reference to the initialized variable 2804 */ 2805 auto ref initOnce(alias var)(lazy typeof(var) init, shared Mutex mutex) 2806 { 2807 // check that var is global, can't take address of a TLS variable 2808 static assert(is(typeof({ __gshared p = &var; })), 2809 "var must be 'static shared' or '__gshared'."); 2810 import core.atomic : atomicLoad, MemoryOrder, atomicStore; 2811 2812 static shared bool flag; 2813 if (!atomicLoad!(MemoryOrder.acq)(flag)) 2814 { 2815 synchronized (mutex) 2816 { 2817 if (!atomicLoad!(MemoryOrder.raw)(flag)) 2818 { 2819 var = init; 2820 static if (!is(immutable typeof(var) == immutable noreturn)) 2821 atomicStore!(MemoryOrder.rel)(flag, true); 2822 } 2823 } 2824 } 2825 return var; 2826 } 2827 2828 /// ditto 2829 auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex) 2830 { 2831 return initOnce!var(init, cast(shared) mutex); 2832 } 2833 2834 /// Use a separate mutex when init blocks on another thread that might also call initOnce. 2835 @system unittest 2836 { 2837 import core.sync.mutex : Mutex; 2838 2839 static shared bool varA, varB; 2840 static shared Mutex m; 2841 m = new shared Mutex; 2842 2843 spawn({ 2844 // use a different mutex for varB to avoid a dead-lock 2845 initOnce!varB(true, m); 2846 ownerTid.send(true); 2847 }); 2848 // init depends on the result of the spawned thread 2849 initOnce!varA(receiveOnly!bool); 2850 assert(varA == true); 2851 assert(varB == true); 2852 } 2853 2854 @system unittest 2855 { 2856 static shared bool a; 2857 __gshared bool b; 2858 static bool c; 2859 bool d; 2860 initOnce!a(true); 2861 initOnce!b(true); 2862 static assert(!__traits(compiles, initOnce!c(true))); // TLS 2863 static assert(!__traits(compiles, initOnce!d(true))); // local variable 2864 } 2865 2866 // test ability to send shared arrays 2867 @system unittest 2868 { 2869 static shared int[] x = new shared(int)[1]; 2870 auto tid = spawn({ 2871 auto arr = receiveOnly!(shared(int)[]); 2872 arr[0] = 5; 2873 ownerTid.send(true); 2874 }); 2875 tid.send(x); 2876 receiveOnly!(bool); 2877 assert(x[0] == 5); 2878 } 2879 2880 // https://issues.dlang.org/show_bug.cgi?id=13930 2881 @system unittest 2882 { 2883 immutable aa = ["0":0]; 2884 thisTid.send(aa); 2885 receiveOnly!(immutable int[string]); // compile error 2886 } 2887 2888 // https://issues.dlang.org/show_bug.cgi?id=19345 2889 @system unittest 2890 { 2891 static struct Aggregate { const int a; const int[5] b; } 2892 static void t1(Tid mainTid) 2893 { 2894 const sendMe = Aggregate(42, [1, 2, 3, 4, 5]); 2895 mainTid.send(sendMe); 2896 } 2897 2898 spawn(&t1, thisTid); 2899 auto result1 = receiveOnly!(const Aggregate)(); 2900 immutable expected = Aggregate(42, [1, 2, 3, 4, 5]); 2901 assert(result1 == expected); 2902 } 2903 2904 // Noreturn support 2905 @system unittest 2906 { 2907 static noreturn foo(int) { throw new Exception(""); } 2908 2909 if (false) spawn(&foo, 1); 2910 if (false) spawnLinked(&foo, 1); 2911 2912 if (false) receive(&foo); 2913 if (false) receiveTimeout(Duration.init, &foo); 2914 2915 // Wrapped in __traits(compiles) to skip codegen which crashes dmd's backend 2916 static assert(__traits(compiles, receiveOnly!noreturn() )); 2917 static assert(__traits(compiles, send(Tid.init, noreturn.init) )); 2918 static assert(__traits(compiles, prioritySend(Tid.init, noreturn.init) )); 2919 static assert(__traits(compiles, yield(noreturn.init) )); 2920 2921 static assert(__traits(compiles, { 2922 __gshared noreturn n; 2923 initOnce!n(noreturn.init); 2924 })); 2925 }