1 /**
2  * $(SCRIPT inhibitQuickIndex = 1;)
3  * $(DIVC quickindex,
4  * $(BOOKTABLE,
5  * $(TR $(TH Category) $(TH Symbols))
6  * $(TR $(TD Tid) $(TD
7  *     $(MYREF locate)
8  *     $(MYREF ownerTid)
9  *     $(MYREF register)
10  *     $(MYREF spawn)
11  *     $(MYREF spawnLinked)
12  *     $(MYREF thisTid)
13  *     $(MYREF Tid)
14  *     $(MYREF TidMissingException)
15  *     $(MYREF unregister)
16  * ))
17  * $(TR $(TD Message passing) $(TD
18  *     $(MYREF prioritySend)
19  *     $(MYREF receive)
20  *     $(MYREF receiveOnly)
21  *     $(MYREF receiveTimeout)
22  *     $(MYREF send)
23  *     $(MYREF setMaxMailboxSize)
24  * ))
25  * $(TR $(TD Message-related types) $(TD
26  *     $(MYREF LinkTerminated)
27  *     $(MYREF MailboxFull)
28  *     $(MYREF MessageMismatch)
29  *     $(MYREF OnCrowding)
30  *     $(MYREF OwnerTerminated)
31  *     $(MYREF PriorityMessageException)
32  * ))
33  * $(TR $(TD Scheduler) $(TD
34  *     $(MYREF FiberScheduler)
35  *     $(MYREF Generator)
36  *     $(MYREF Scheduler)
37  *     $(MYREF scheduler)
38  *     $(MYREF ThreadInfo)
39  *     $(MYREF ThreadScheduler)
40  *     $(MYREF yield)
41  * ))
42  * $(TR $(TD Misc) $(TD
43  *     $(MYREF initOnce)
44  * ))
45  * ))
46  *
47  * This is a low-level messaging API upon which more structured or restrictive
48  * APIs may be built.  The general idea is that every messageable entity is
49  * represented by a common handle type called a `Tid`, which allows messages to
50  * be sent to logical threads that are executing in both the current process
51  * and in external processes using the same interface.  This is an important
52  * aspect of scalability because it allows the components of a program to be
53  * spread across available resources with few to no changes to the actual
54  * implementation.
55  *
56  * A logical thread is an execution context that has its own stack and which
57  * runs asynchronously to other logical threads.  These may be preemptively
58  * scheduled kernel threads, $(MREF_ALTTEXT fibers, core, thread, fiber)
59  * (cooperative user-space threads), or some other concept with similar behavior.
60  *
61  * The type of concurrency used when logical threads are created is determined
62  * by the $(LREF Scheduler) selected at initialization time.  The default behavior is
63  * currently to create a new kernel thread per call to spawn, but other
64  * schedulers are available that multiplex fibers across the main thread or
65  * use some combination of the two approaches.
66  *
67  * Copyright: Copyright Sean Kelly 2009 - 2014.
68  * License:   <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
69  * Authors:   Sean Kelly, Alex Rønne Petersen, Martin Nowak
70  * Source:    $(PHOBOSSRC std/concurrency.d)
71  */
72 /*          Copyright Sean Kelly 2009 - 2014.
73  * Distributed under the Boost Software License, Version 1.0.
74  *    (See accompanying file LICENSE_1_0.txt or copy at
75  *          http://www.boost.org/LICENSE_1_0.txt)
76  */
77 module std.concurrency;
78 
79 public import std.variant;
80 
81 import core.atomic;
82 import core.sync.condition;
83 import core.sync.mutex;
84 import core.thread;
85 import std.range.primitives;
86 import std.range.interfaces : InputRange;
87 import std.traits;
88 
///
@system unittest
{
    __gshared string received;

    // Worker: waits for a single int, records it, then acknowledges.
    static void spawnedFunc(Tid ownerTid)
    {
        import std.conv : text;

        receive((int number) {
            received = text("Received the number ", number);

            // Tell the owner thread that the message was handled.
            ownerTid.send(true);
        });
    }

    // Run spawnedFunc in a new thread and hand it our own Tid.
    auto childTid = spawn(&spawnedFunc, thisTid);

    // Deliver the number 42 to the new thread.
    childTid.send(42);

    // Wait for the acknowledgement before inspecting the result.
    immutable wasSuccessful = receiveOnly!bool;
    assert(wasSuccessful);
    assert(received == "Received the number 42");
}
117 
118 private
119 {
120     bool hasLocalAliasing(Types...)()
121     {
122         import std.typecons : Rebindable;
123 
124         // Works around "statement is not reachable"
125         bool doesIt = false;
126         static foreach (T; Types)
127         {
128             static if (is(T == Tid))
129             { /* Allowed */ }
130             else static if (is(T : Rebindable!R, R))
131                 doesIt |= hasLocalAliasing!R;
132             else static if (is(T == struct))
133                 doesIt |= hasLocalAliasing!(typeof(T.tupleof));
134             else
135                 doesIt |= std.traits.hasUnsharedAliasing!(T);
136         }
137         return doesIt;
138     }
139 
140     @safe unittest
141     {
142         static struct Container { Tid t; }
143         static assert(!hasLocalAliasing!(Tid, Container, int));
144     }
145 
146     // https://issues.dlang.org/show_bug.cgi?id=20097
147     @safe unittest
148     {
149         import std.datetime.systime : SysTime;
150         static struct Container { SysTime time; }
151         static assert(!hasLocalAliasing!(SysTime, Container));
152     }
153 
    // Tag stored in every Message that says how it was submitted.
    enum MsgType
    {
        standard, // used by send() via _send
        priority, // used by prioritySend()
        linkDead, // NOTE(review): producer is not in this part of the file; presumably link/owner termination
    }
160 
161     struct Message
162     {
163         MsgType type;
164         Variant data;
165 
166         this(T...)(MsgType t, T vals) if (T.length > 0)
167         {
168             static if (T.length == 1)
169             {
170                 type = t;
171                 data = vals[0];
172             }
173             else
174             {
175                 import std.typecons : Tuple;
176 
177                 type = t;
178                 data = Tuple!(T)(vals);
179             }
180         }
181 
182         @property auto convertsTo(T...)()
183         {
184             static if (T.length == 1)
185             {
186                 return is(T[0] == Variant) || data.convertsTo!(T);
187             }
188             else
189             {
190                 import std.typecons : Tuple;
191                 return data.convertsTo!(Tuple!(T));
192             }
193         }
194 
195         @property auto get(T...)()
196         {
197             static if (T.length == 1)
198             {
199                 static if (is(T[0] == Variant))
200                     return data;
201                 else
202                     return data.get!(T);
203             }
204             else
205             {
206                 import std.typecons : Tuple;
207                 return data.get!(Tuple!(T));
208             }
209         }
210 
211         auto map(Op)(Op op)
212         {
213             alias Args = Parameters!(Op);
214 
215             static if (Args.length == 1)
216             {
217                 static if (is(Args[0] == Variant))
218                     return op(data);
219                 else
220                     return op(data.get!(Args));
221             }
222             else
223             {
224                 import std.typecons : Tuple;
225                 return op(data.get!(Tuple!(Args)).expand);
226             }
227         }
228     }
229 
230     void checkops(T...)(T ops)
231     {
232         import std.format : format;
233 
234         foreach (i, t1; T)
235         {
236             static assert(isFunctionPointer!t1 || isDelegate!t1,
237                     format!"T %d is not a function pointer or delegates"(i));
238             alias a1 = Parameters!(t1);
239             alias r1 = ReturnType!(t1);
240 
241             static if (i < T.length - 1 && is(r1 == void))
242             {
243                 static assert(a1.length != 1 || !is(a1[0] == Variant),
244                               "function with arguments " ~ a1.stringof ~
245                               " occludes successive function");
246 
247                 foreach (t2; T[i + 1 .. $])
248                 {
249                     alias a2 = Parameters!(t2);
250 
251                     static assert(!is(a1 == a2),
252                         "function with arguments " ~ a1.stringof ~ " occludes successive function");
253                 }
254             }
255         }
256     }
257 
258     @property ref ThreadInfo thisInfo() nothrow
259     {
260         import core.atomic : atomicLoad;
261 
262         auto localScheduler = atomicLoad(scheduler);
263         if (localScheduler is null)
264             return ThreadInfo.thisInfo;
265         return localScheduler.thisInfo;
266     }
267 }
268 
// Module destructor: invokes `cleanup` on the ThreadInfo returned by `thisInfo`.
static ~this()
{
    thisInfo.cleanup();
}
273 
274 // Exceptions
275 
/**
 * Thrown on calls to $(LREF receiveOnly) if a message other than the type
 * the receiving thread expected is sent.
 */
class MessageMismatch : Exception
{
    /**
     * Params:
     *  msg = The error message; defaults to `"Unexpected message type"`.
     */
    this(string msg = "Unexpected message type") @safe pure nothrow @nogc
    {
        super(msg);
    }
}
288 
/**
 * Thrown on calls to $(LREF receive) if the thread that spawned the receiving
 * thread has terminated and no more messages exist.
 */
class OwnerTerminated : Exception
{
    /// The `Tid` that was passed to the constructor.
    Tid tid;

    /**
     * Params:
     *  t   = The `Tid` to store in `tid`.
     *  msg = The error message; defaults to `"Owner terminated"`.
     */
    this(Tid t, string msg = "Owner terminated") @safe pure nothrow @nogc
    {
        super(msg);
        tid = t;
    }
}
304 
/**
 * Thrown if a linked thread has terminated.
 */
class LinkTerminated : Exception
{
    /// The `Tid` that was passed to the constructor.
    Tid tid;

    /**
     * Params:
     *  t   = The `Tid` to store in `tid`.
     *  msg = The error message; defaults to `"Link terminated"`.
     */
    this(Tid t, string msg = "Link terminated") @safe pure nothrow @nogc
    {
        super(msg);
        tid = t;
    }
}
319 
/**
 * Thrown if a message was sent to a thread via
 * $(REF prioritySend, std,concurrency) and the receiver does not have a handler
 * for a message of this type.
 */
class PriorityMessageException : Exception
{
    /**
     * The message that was sent.
     */
    Variant message;

    /**
     * Params:
     *  vals = The payload to keep in `message`.
     */
    this(Variant vals)
    {
        super("Priority message");
        message = vals;
    }
}
339 
/**
 * Thrown on mailbox crowding if the mailbox is configured with
 * `OnCrowding.throwException`.
 */
class MailboxFull : Exception
{
    /// The `Tid` that was passed to the constructor.
    Tid tid;

    /**
     * Params:
     *  t   = The `Tid` to store in `tid`.
     *  msg = The error message; defaults to `"Mailbox full"`.
     */
    this(Tid t, string msg = "Mailbox full") @safe pure nothrow @nogc
    {
        super(msg);
        tid = t;
    }
}
355 
/**
 * Thrown when a `Tid` is missing, e.g. when $(LREF ownerTid) doesn't
 * find an owner thread.
 */
class TidMissingException : Exception
{
    import std.exception : basicExceptionCtors;
    /// Constructors are supplied by `std.exception.basicExceptionCtors`.
    mixin basicExceptionCtors;
}
366 
367 
368 // Thread ID
369 
370 
/**
 * An opaque type used to represent a logical thread.
 */
struct Tid
{
private:
    // The mailbox this handle refers to; null for a default-initialized Tid.
    MessageBox mbox;

    this(MessageBox m) @safe pure nothrow @nogc
    {
        mbox = m;
    }

public:

    /**
     * Generate a convenient string for identifying this `Tid`.  This is only
     * useful to see if `Tid`'s that are currently executing are the same or
     * different, e.g. for logging and debugging.  It is potentially possible
     * that a `Tid` executed in the future will have the same `toString()` output
     * as another `Tid` that has already terminated.
     *
     * Params:
     *  w = The output sink that receives `Tid(<mailbox address in hex>)`.
     */
    void toString(W)(ref W w) const
    {
        import std.format.write : formattedWrite;

        // Only the address is printed; the cast is confined to a trusted lambda.
        auto address = () @trusted { return cast(void*) mbox; }();
        formattedWrite(w, "Tid(%x)", address);
    }

}
401 
@safe unittest
{
    import std.conv : text;

    // A default-initialized Tid prints a zero address.
    Tid blank;
    assert(text(blank) == "Tid(0)");

    // thisTid prints a non-zero address.
    auto mine = thisTid;
    assert(text(mine) != "Tid(0)");

    // A copy prints exactly like the Tid it was copied from.
    auto duplicate = mine;
    assert(text(mine) == text(duplicate));
}

// https://issues.dlang.org/show_bug.cgi?id=21512
// A const Tid must be formattable.
@system unittest
{
    import std.format : format;

    const(Tid) spawned = spawn(() {});
    const string rendered = format!"%s"(spawned);
    assert(rendered[0 .. 4] == "Tid(");
}
421 
/**
 * Returns: The `Tid` of the caller's thread.  If the caller does not have one
 * yet, a `Tid` backed by a new `MessageBox` is created and stored first.
 */
@property Tid thisTid() @safe
{
    // TODO: remove when concurrency is safe
    static auto fetchOrCreate() @trusted
    {
        if (thisInfo.ident == Tid.init)
            thisInfo.ident = Tid(new MessageBox);
        return thisInfo.ident;
    }

    return fetchOrCreate();
}
438 
/**
 * Return the `Tid` of the thread which spawned the caller's thread.
 *
 * Returns: The owner `Tid` recorded for the calling thread.
 *
 * Throws: A `TidMissingException` exception if
 * there is no owner thread.
 */
@property Tid ownerTid()
{
    import std.exception : enforce;

    auto owner = thisInfo.owner;
    // A Tid without a mailbox means no owner was ever recorded.
    enforce!TidMissingException(owner.mbox !is null, "Error: Thread has no owner thread.");
    return owner;
}
452 
@system unittest
{
    import std.exception : assertThrown;

    // Child: expects one greeting, then answers through ownerTid.
    static void responder()
    {
        string request = receiveOnly!string();
        assert(request == "Main calling");
        ownerTid.send("Child responding");
    }

    // The thread running this test has no owner.
    assertThrown!TidMissingException(ownerTid);

    auto child = spawn(&responder);
    child.send("Main calling");

    string reply = receiveOnly!string();
    assert(reply == "Child responding");
}
470 
471 // Thread Creation
472 
/*
 * True if `F` can be started by `spawn` with arguments of types `T`:
 * `F` is callable, its return type converts to `void`, each `T[i]` converts
 * implicitly to the matching parameter of `F`, and `F` is either a function
 * pointer or has no unshared aliasing.
 */
private template isSpawnable(F, T...)
{
    // True if F1 and F2 have the same number of parameters and every
    // parameter of F2, from index `i` on, converts to the one of F1.
    template isParamsImplicitlyConvertible(F1, F2, int i = 0)
    {
        alias wanted = Parameters!F1;
        alias given = Parameters!F2;

        static if (wanted.length != given.length)
            enum isParamsImplicitlyConvertible = false;
        else static if (i == wanted.length)
            enum isParamsImplicitlyConvertible = true;
        else
            enum isParamsImplicitlyConvertible = is(given[i] : wanted[i])
                && isParamsImplicitlyConvertible!(F1, F2, i + 1);
    }

    enum isSpawnable = isCallable!F && is(ReturnType!F : void)
            && isParamsImplicitlyConvertible!(F, void function(T))
            && (isFunctionPointer!F || !hasUnsharedAliasing!F);
}
494 
/**
 * Starts `fn(args)` in a new logical thread.
 *
 * Executes the supplied function in a new logical thread represented by
 * `Tid`.  The calling thread is designated as the owner of the new thread.
 * When the owner thread terminates an `OwnerTerminated` message will be
 * sent to the new thread, causing an `OwnerTerminated` exception to be
 * thrown on `receive()`.
 *
 * Params:
 *  fn   = The function to execute.
 *  args = Arguments to the function.
 *
 * Returns:
 *  A `Tid` representing the new logical thread.
 *
 * Notes:
 *  `args` must not have unshared aliasing.  In other words, all arguments
 *  to `fn` must either be `shared` or `immutable` or have no
 *  pointer indirection.  This is necessary for enforcing isolation among
 *  threads.
 *
 *  Similarly, if `fn` is a delegate, it must not have unshared aliases, meaning
 *  `fn` must be either `shared` or `immutable`.
 */
Tid spawn(F, T...)(F fn, T args)
if (isSpawnable!(F, T))
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");

    enum linked = false;
    return _spawn(linked, fn, args);
}
525 
///
@system unittest
{
    static void greet(string msg)
    {
        assert(msg == "Hello World");
    }

    auto tid = spawn(&greet, "Hello World");
}

/// Fails: char[] has mutable aliasing.
@system unittest
{
    string msg = "Hello, World!";

    // An immutable string may be passed; a mutable copy may not.
    static void takesString(string msg) {}
    static assert(!__traits(compiles, spawn(&takesString, msg.dup)));
    static assert( __traits(compiles, spawn(&takesString, msg.idup)));

    // A function taking char[] can never be spawned with either argument.
    static void takesBuffer(char[] msg) {}
    static assert(!__traits(compiles, spawn(&takesBuffer, msg.dup)));
    static assert(!__traits(compiles, spawn(&takesBuffer, msg.idup)));
}

/// New thread with anonymous function
@system unittest
{
    spawn({ ownerTid.send("This is so great!"); });

    assert(receiveOnly!string == "This is so great!");
}

@system unittest
{
    import core.thread : thread_joinAll;

    __gshared string receivedMessage;

    static void record(string msg)
    {
        receivedMessage = msg;
    }

    auto tid1 = spawn(&record, "Hello World");

    // Wait for the spawned thread to finish before checking what it stored.
    thread_joinAll;
    assert(receivedMessage == "Hello World");
}
574 
/**
 * Starts `fn(args)` in a logical thread and will receive a `LinkTerminated`
 * message when the operation terminates.
 *
 * Executes the supplied function in a new logical thread represented by
 * `Tid`.  This new thread is linked to the calling thread so that if either
 * it or the calling thread terminates a `LinkTerminated` message will be sent
 * to the other, causing a `LinkTerminated` exception to be thrown on `receive()`.
 * The owner relationship from `spawn()` is preserved as well, so if the link
 * between threads is broken, owner termination will still result in an
 * `OwnerTerminated` exception to be thrown on `receive()`.
 *
 * Params:
 *  fn   = The function to execute.
 *  args = Arguments to the function.
 *
 * Returns:
 *  A `Tid` representing the new thread.
 */
Tid spawnLinked(F, T...)(F fn, T args)
if (isSpawnable!(F, T))
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");

    enum linked = true;
    return _spawn(linked, fn, args);
}
600 
/*
 * Shared implementation of spawn() and spawnLinked().
 *
 * Creates a Tid with a new MessageBox, starts `fn(args)` through the
 * scheduler when one is set (otherwise on a new Thread), and records the new
 * Tid in the caller's `links` table with the value `linked`.
 */
private Tid _spawn(F, T...)(bool linked, F fn, T args)
if (isSpawnable!(F, T))
{
    // TODO: MessageList and &exec should be shared.
    auto childTid = Tid(new MessageBox);
    auto parentTid = thisTid;

    // Entry point of the new logical thread: set up its identity and owner
    // before running the user's function.
    void exec()
    {
        thisInfo.ident = childTid;
        thisInfo.owner = parentTid;
        fn(args);
    }

    // TODO: MessageList and &exec should be shared.
    if (scheduler is null)
    {
        auto t = new Thread(&exec);
        t.start();
    }
    else
        scheduler.spawn(&exec);

    thisInfo.links[childTid] = linked;
    return childTid;
}
629 
// Compile-time table of what spawn() accepts and rejects.
@system unittest
{
    // Function pointers: accepted only when the argument list matches.
    void function() fn1;
    void function(int) fn2;
    static assert(__traits(compiles, spawn(fn1)));
    static assert(__traits(compiles, spawn(fn2, 2)));
    static assert(!__traits(compiles, spawn(fn1, 1)));
    static assert(!__traits(compiles, spawn(fn2)));

    // Delegates: shared or immutable ones are accepted, a plain one (dg6) is not.
    void delegate(int) shared dg1;
    shared(void delegate(int)) dg2;
    shared(void delegate(long) shared) dg3;
    shared(void delegate(real, int, long) shared) dg4;
    void delegate(int) immutable dg5;
    void delegate(int) dg6;
    static assert(__traits(compiles, spawn(dg1, 1)));
    static assert(__traits(compiles, spawn(dg2, 2)));
    static assert(__traits(compiles, spawn(dg3, 3)));
    static assert(__traits(compiles, spawn(dg4, 4, 4, 4)));
    static assert(__traits(compiles, spawn(dg5, 5)));
    static assert(!__traits(compiles, spawn(dg6, 6)));

    // Class instances with opCall: each combination of object qualifier and
    // opCall qualifier is listed with its expected outcome below.
    auto callable1  = new class{ void opCall(int) shared {} };
    auto callable2  = cast(shared) new class{ void opCall(int) shared {} };
    auto callable3  = new class{ void opCall(int) immutable {} };
    auto callable4  = cast(immutable) new class{ void opCall(int) immutable {} };
    auto callable5  = new class{ void opCall(int) {} };
    auto callable6  = cast(shared) new class{ void opCall(int) immutable {} };
    auto callable7  = cast(immutable) new class{ void opCall(int) shared {} };
    auto callable8  = cast(shared) new class{ void opCall(int) const shared {} };
    auto callable9  = cast(const shared) new class{ void opCall(int) shared {} };
    auto callable10 = cast(const shared) new class{ void opCall(int) const shared {} };
    auto callable11 = cast(immutable) new class{ void opCall(int) const shared {} };
    static assert(!__traits(compiles, spawn(callable1,  1)));
    static assert( __traits(compiles, spawn(callable2,  2)));
    static assert(!__traits(compiles, spawn(callable3,  3)));
    static assert( __traits(compiles, spawn(callable4,  4)));
    static assert(!__traits(compiles, spawn(callable5,  5)));
    static assert(!__traits(compiles, spawn(callable6,  6)));
    static assert(!__traits(compiles, spawn(callable7,  7)));
    static assert( __traits(compiles, spawn(callable8,  8)));
    static assert(!__traits(compiles, spawn(callable9,  9)));
    static assert( __traits(compiles, spawn(callable10, 10)));
    static assert( __traits(compiles, spawn(callable11, 11)));
}
675 
/**
 * Places the values as a message at the back of tid's message queue.
 *
 * Sends the supplied value to the thread represented by tid.  As with
 * $(REF spawn, std,concurrency), `T` must not have unshared aliasing.
 *
 * Params:
 *  tid  = The recipient; its mailbox must not be null.
 *  vals = The value or values that make up the message.
 */
void send(T...)(Tid tid, T vals)
in (tid.mbox !is null)
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");

    _send(MsgType.standard, tid, vals);
}
688 
/**
 * Places the values as a message on the front of tid's message queue.
 *
 * Send a message to `tid` but place it at the front of `tid`'s message
 * queue instead of at the back.  This function is typically used for
 * out-of-band communication, to signal exceptional conditions, etc.
 *
 * Params:
 *  tid  = The recipient; its mailbox must not be null.
 *  vals = The value or values that make up the message.
 */
void prioritySend(T...)(Tid tid, T vals)
in (tid.mbox !is null)
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");

    enum kind = MsgType.priority;
    _send(kind, tid, vals);
}
702 
/*
 * Sends `vals` to `tid` as a standard message, without the aliasing check
 * performed by send().
 */
private void _send(T...)(Tid tid, T vals)
in (tid.mbox !is null)
{
    enum kind = MsgType.standard;
    _send(kind, tid, vals);
}

/*
 * Implementation of send.  This allows parameter checking to be different for
 * both Tid.send() and .send().
 *
 * Wraps `vals` in a Message tagged with `type` and puts it into the mailbox
 * of `tid`.
 */
private void _send(T...)(MsgType type, Tid tid, T vals)
in (tid.mbox !is null)
{
    // Kept in a named local so that it is handed to put() as an lvalue.
    auto envelope = Message(type, vals);
    tid.mbox.put(envelope);
}
722 
/**
 * Receives a message from another thread.
 *
 * Receive a message from another thread, or block if no messages of the
 * specified types are available.  This function works by pattern matching
 * a message against a set of delegates and executing the first match found.
 *
 * If a delegate that accepts a $(REF Variant, std,variant) is included as
 * the last argument to `receive`, it will match any message that was not
 * matched by an earlier delegate.  If more than one argument is sent,
 * the `Variant` will contain a $(REF Tuple, std,typecons) of all values
 * sent.
 *
 * Params:
 *     ops = Variadic list of function pointers and delegates. Entries
 *           in this list must not occlude later entries.
 *
 * Throws: $(LREF OwnerTerminated) when the thread that spawned the caller
 *         has terminated and no more messages exist.
 */
void receive(T...)( T ops )
in (thisInfo.ident.mbox !is null,
    "Cannot receive a message until a thread was spawned "
    ~ "or thisTid was passed to a running thread.")
{
    // Reject handler lists in which one entry hides a later one.
    checkops(ops);

    thisInfo.ident.mbox.get(ops);
}
755 
///
@system unittest
{
    import std.variant : Variant;

    // Replies with 1 for an int, 2 for a double and 3 for anything else.
    auto process = ()
    {
        receive(
            (int i) { ownerTid.send(1); },
            (double f) { ownerTid.send(2); },
            (Variant v) { ownerTid.send(3); }
        );
    };

    {
        auto tid = spawn(process);
        tid.send(42);
        assert(receiveOnly!int == 1);
    }

    {
        auto tid = spawn(process);
        tid.send(3.14);
        assert(receiveOnly!int == 2);
    }

    {
        auto tid = spawn(process);
        tid.send("something else");
        assert(receiveOnly!int == 3);
    }
}
788 
@safe unittest
{
    // A Variant handler is fine on its own or in last position.
    static assert(__traits(compiles, {
        receive((Variant x) {});
        receive((int x) {}, (Variant x) {});
    }));

    // A Variant handler in front would hide the handler after it.
    static assert(!__traits(compiles, {
        receive((Variant x) {}, (int x) {});
    }));

    // Two handlers with the same parameter list: the first hides the second.
    static assert(!__traits(compiles, {
        receive((int x) {}, (int x) {});
    }));
}

// Make sure receive() works with free functions as well.
version (StdUnittest)
{
    private void receiveFunction(int x) {}
}
@safe unittest
{
    static assert(__traits(compiles, {
        receive(&receiveFunction);
        receive(&receiveFunction, (Variant x) {});
    }));
}
821 
822 
/*
 * Return type of receiveOnly!(T): `T[0]` for exactly one type, otherwise
 * `Tuple!(T)`.
 */
private template receiveOnlyRet(T...)
{
    static if (T.length != 1)
    {
        import std.typecons : Tuple;
        alias receiveOnlyRet = Tuple!(T);
    }
    else
    {
        alias receiveOnlyRet = T[0];
    }
}
835 
/**
 * Receives only messages with arguments of the specified types.
 *
 * Params:
 *     T = Variadic list of types to be received.
 *
 * Returns: The received message.  If `T` has more than one entry,
 *          the message will be packed into a $(REF Tuple, std,typecons).
 *
 * Throws: $(LREF MessageMismatch) if a message of types other than `T`
 *         is received,
 *         $(LREF OwnerTerminated) when the thread that spawned the caller
 *         has terminated,
 *         $(LREF LinkTerminated) when such a message is received.
 */
receiveOnlyRet!(T) receiveOnly(T...)()
in (thisInfo.ident.mbox !is null,
    "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.")
{
    import std.format : format;
    import std.meta : allSatisfy;
    import std.typecons : Tuple;

    Tuple!(T) result;

    thisInfo.ident.mbox.get(
        // The expected message: copy its values into `result`.
        (T val) {
            static if (T.length)
            {
                static if (!allSatisfy!(isAssignable, T))
                {
                    // Some type cannot be assigned, so construct in place.
                    import core.lifetime : emplace;
                    emplace(&result, val);
                }
                else
                {
                    result.field = val;
                }
            }
        },
        // Termination notices are passed on to the caller as exceptions.
        (LinkTerminated e) { throw e; },
        (OwnerTerminated e) { throw e; },
        // Anything else is a type mismatch.
        (Variant val) {
            static if (T.length > 1)
                enum expected = T.stringof;
            else
                enum expected = T[0].stringof;

            throw new MessageMismatch(
                format("Unexpected message type: expected '%s', got '%s'", expected, val.type.toString()));
        });

    static if (T.length == 1)
        return result[0];
    else
        return result;
}
893 
///
@system unittest
{
    auto tid = spawn({ assert(receiveOnly!int == 42); });

    tid.send(42);
}

///
@system unittest
{
    auto tid = spawn({ assert(receiveOnly!string == "text"); });

    tid.send("text");
}

///
@system unittest
{
    struct Record { string name; int age; }

    // Several types are received together as one tuple.
    auto tid = spawn(
    {
        auto received = receiveOnly!(double, Record);
        assert(received[0] == 0.5);
        assert(received[1].name == "Alice");
        assert(received[1].age == 31);
    });

    tid.send(0.5, Record("Alice", 31));
}
929 
// A message of the wrong type makes receiveOnly throw with a descriptive text.
@system unittest
{
    // Reports either "" on success or the text of whatever was thrown.
    static void expectString(Tid mainTid)
    {
        try
        {
            receiveOnly!string();
            mainTid.send("");
        }
        catch (Throwable failure)
        {
            mainTid.send(failure.msg);
        }
    }

    auto tid = spawn(&expectString, thisTid);
    tid.send(1);

    string report = receiveOnly!string();
    assert(report == "Unexpected message type: expected 'string', got 'int'");
}

// https://issues.dlang.org/show_bug.cgi?id=21663
// Instantiating receiveOnly with several types must compile.
@safe unittest
{
    alias test = receiveOnly!(string, bool, bool);
}
956 
/**
 * Receives a message from another thread and gives up if no match
 * arrives within a specified duration.
 *
 * Receive a message from another thread, or block until `duration` has
 * elapsed, if no messages of the specified types are available. This function
 * works by pattern matching a message against a set of delegates and
 * executing the first match found.
 *
 * If a delegate that accepts a $(REF Variant, std,variant) is included as
 * the last argument, it will match any message that was not
 * matched by an earlier delegate.  If more than one argument is sent,
 * the `Variant` will contain a $(REF Tuple, std,typecons) of all values
 * sent.
 *
 * Params:
 *     duration = Duration, how long to wait. If `duration` is negative,
 *                won't wait at all.
 *     ops = Variadic list of function pointers and delegates. Entries
 *           in this list must not occlude later entries.
 *
 * Returns: `true` if it received a message and `false` if it timed out waiting
 *          for one.
 *
 * Throws: $(LREF OwnerTerminated) when the thread that spawned the caller
 *         has terminated.
 */
bool receiveTimeout(T...)(Duration duration, T ops)
in (thisInfo.ident.mbox !is null,
    "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.")
{
    // Reject handler lists in which one entry hides a later one.
    checkops(ops);

    return thisInfo.ident.mbox.get(duration, ops);
}
995 
@safe unittest
{
    // A Variant handler is fine on its own or in last position.
    static assert(__traits(compiles,
    {
        receiveTimeout(msecs(0), (Variant x) {});
        receiveTimeout(msecs(0), (int x) {}, (Variant x) {});
    }));

    // A Variant handler in front would hide the handler after it.
    static assert(!__traits(compiles,
    {
        receiveTimeout(msecs(0), (Variant x) {}, (int x) {});
    }));

    // Two handlers with the same parameter list: the first hides the second.
    static assert(!__traits(compiles,
    {
        receiveTimeout(msecs(0), (int x) {}, (int x) {});
    }));

    // A non-zero timeout is accepted as well.
    static assert(__traits(compiles,
    {
        receiveTimeout(msecs(10), (int x) {}, (Variant x) {});
    }));
}
1015 
1016 // MessageBox Limits
1017 
/**
 * These behaviors may be specified when a mailbox is full.
 *
 * Pass one of them to $(LREF setMaxMailboxSize) to choose what a sender does
 * when the mailbox limit has been reached.
 */
enum OnCrowding
{
    block, /// Wait until room is available.
    throwException, /// Throw a $(LREF MailboxFull) exception.
    ignore /// Abort the send and return.
}
1027 
// Crowding handlers that setMaxMailboxSize installs for each OnCrowding value.
private
{
    // Handler for OnCrowding.block ("wait until room is available"): returns true.
    bool onCrowdingBlock(Tid tid) @safe pure nothrow @nogc
    {
        return true;
    }

    // Handler for OnCrowding.throwException: always throws MailboxFull for `tid`.
    bool onCrowdingThrow(Tid tid) @safe pure
    {
        throw new MailboxFull(tid);
    }

    // Handler for OnCrowding.ignore ("abort the send and return"): returns false.
    bool onCrowdingIgnore(Tid tid) @safe pure nothrow @nogc
    {
        return false;
    }
}
1045 
1046 /**
1047  * Sets a maximum mailbox size.
1048  *
1049  * Sets a limit on the maximum number of user messages allowed in the mailbox.
1050  * If this limit is reached, the caller attempting to add a new message will
1051  * execute the behavior specified by doThis.  If messages is zero, the mailbox
1052  * is unbounded.
1053  *
1054  * Params:
1055  *  tid      = The Tid of the thread for which this limit should be set.
1056  *  messages = The maximum number of messages or zero if no limit.
1057  *  doThis   = The behavior executed when a message is sent to a full
1058  *             mailbox.
1059  */
1060 void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure
1061 in (tid.mbox !is null)
1062 {
1063     final switch (doThis)
1064     {
1065     case OnCrowding.block:
1066         return tid.mbox.setMaxMsgs(messages, &onCrowdingBlock);
1067     case OnCrowding.throwException:
1068         return tid.mbox.setMaxMsgs(messages, &onCrowdingThrow);
1069     case OnCrowding.ignore:
1070         return tid.mbox.setMaxMsgs(messages, &onCrowdingIgnore);
1071     }
1072 }
1073 
1074 /**
1075  * Sets a maximum mailbox size.
1076  *
1077  * Sets a limit on the maximum number of user messages allowed in the mailbox.
1078  * If this limit is reached, the caller attempting to add a new message will
1079  * execute onCrowdingDoThis.  If messages is zero, the mailbox is unbounded.
1080  *
1081  * Params:
1082  *  tid      = The Tid of the thread for which this limit should be set.
1083  *  messages = The maximum number of messages or zero if no limit.
1084  *  onCrowdingDoThis = The routine called when a message is sent to a full
1085  *                     mailbox.
1086  */
1087 void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis)
1088 in (tid.mbox !is null)
1089 {
1090     tid.mbox.setMaxMsgs(messages, onCrowdingDoThis);
1091 }
1092 
private
{
    // Name registry shared by all threads (`__gshared`).  Every access visible
    // in this file happens inside `synchronized (registryLock)`.

    // Registered name -> the Tid it was registered for.
    __gshared Tid[string] tidByName;
    // Tid -> all names registered for it; used to drop every name of a
    // thread at once when it is cleaned up.
    __gshared string[][Tid] namesByTid;
}
1098 
// Returns the mutex that guards `tidByName` and `namesByTid`.
// The mutex is stored in a function-local `__gshared` variable and is created
// through `initOnce` (presumably exactly once, on first use -- see `initOnce`).
private @property Mutex registryLock()
{
    __gshared Mutex impl;
    initOnce!impl(new Mutex);
    return impl;
}
1105 
// Drops every registry entry that belongs to the thread described by `me`.
private void unregisterMe(ref ThreadInfo me)
{
    // A ThreadInfo whose ident is still `Tid.init` cannot own any names.
    if (me.ident == Tid.init)
        return;

    synchronized (registryLock)
    {
        auto ownedNames = me.ident in namesByTid;
        if (ownedNames is null)
            return;

        // Remove the name -> Tid mappings first, then the reverse entry.
        foreach (registered; *ownedNames)
            tidByName.remove(registered);
        namesByTid.remove(me.ident);
    }
}
1121 
1122 /**
1123  * Associates name with tid.
1124  *
1125  * Associates name with tid in a process-local map.  When the thread
1126  * represented by tid terminates, any names associated with it will be
1127  * automatically unregistered.
1128  *
1129  * Params:
1130  *  name = The name to associate with tid.
1131  *  tid  = The tid register by name.
1132  *
1133  * Returns:
1134  *  true if the name is available and tid is not known to represent a
1135  *  defunct thread.
1136  */
1137 bool register(string name, Tid tid)
1138 in (tid.mbox !is null)
1139 {
1140     synchronized (registryLock)
1141     {
1142         if (name in tidByName)
1143             return false;
1144         if (tid.mbox.isClosed)
1145             return false;
1146         namesByTid[tid] ~= name;
1147         tidByName[name] = tid;
1148         return true;
1149     }
1150 }
1151 
1152 /**
1153  * Removes the registered name associated with a tid.
1154  *
1155  * Params:
1156  *  name = The name to unregister.
1157  *
1158  * Returns:
1159  *  true if the name is registered, false if not.
1160  */
1161 bool unregister(string name)
1162 {
1163     import std.algorithm.mutation : remove, SwapStrategy;
1164     import std.algorithm.searching : countUntil;
1165 
1166     synchronized (registryLock)
1167     {
1168         if (auto tid = name in tidByName)
1169         {
1170             auto allNames = *tid in namesByTid;
1171             auto pos = countUntil(*allNames, name);
1172             remove!(SwapStrategy.unstable)(*allNames, pos);
1173             tidByName.remove(name);
1174             return true;
1175         }
1176         return false;
1177     }
1178 }
1179 
1180 /**
1181  * Gets the `Tid` associated with name.
1182  *
1183  * Params:
1184  *  name = The name to locate within the registry.
1185  *
1186  * Returns:
1187  *  The associated `Tid` or `Tid.init` if name is not registered.
1188  */
1189 Tid locate(string name)
1190 {
1191     synchronized (registryLock)
1192     {
1193         if (auto tid = name in tidByName)
1194             return *tid;
1195         return Tid.init;
1196     }
1197 }
1198 
1199 /**
1200  * Encapsulates all implementation-level data needed for scheduling.
1201  *
1202  * When defining a $(LREF Scheduler), an instance of this struct must be associated
1203  * with each logical thread.  It contains all implementation-level information
1204  * needed by the internal API.
1205  */
1206 struct ThreadInfo
1207 {
1208     Tid ident;
1209     bool[Tid] links;
1210     Tid owner;
1211 
1212     /**
1213      * Gets a thread-local instance of `ThreadInfo`.
1214      *
1215      * Gets a thread-local instance of `ThreadInfo`, which should be used as the
1216      * default instance when info is requested for a thread not created by the
1217      * `Scheduler`.
1218      */
1219     static @property ref thisInfo() nothrow
1220     {
1221         static ThreadInfo val;
1222         return val;
1223     }
1224 
1225     /**
1226      * Cleans up this ThreadInfo.
1227      *
1228      * This must be called when a scheduled thread terminates.  It tears down
1229      * the messaging system for the thread and notifies interested parties of
1230      * the thread's termination.
1231      */
1232     void cleanup()
1233     {
1234         if (ident.mbox !is null)
1235             ident.mbox.close();
1236         foreach (tid; links.keys)
1237             _send(MsgType.linkDead, tid, ident);
1238         if (owner != Tid.init)
1239             _send(MsgType.linkDead, owner, ident);
1240         unregisterMe(this); // clean up registry entries
1241     }
1242 
1243     // https://issues.dlang.org/show_bug.cgi?id=20160
1244     @system unittest
1245     {
1246         register("main_thread", thisTid());
1247 
1248         ThreadInfo t;
1249         t.cleanup();
1250 
1251         assert(locate("main_thread") == thisTid());
1252     }
1253 }
1254 
1255 /**
1256  * A `Scheduler` controls how threading is performed by spawn.
1257  *
1258  * Implementing a `Scheduler` allows the concurrency mechanism used by this
1259  * module to be customized according to different needs.  By default, a call
1260  * to spawn will create a new kernel thread that executes the supplied routine
1261  * and terminates when finished.  But it is possible to create `Scheduler`s that
1262  * reuse threads, that multiplex `Fiber`s (coroutines) across a single thread,
1263  * or any number of other approaches.  By making the choice of `Scheduler` a
1264  * user-level option, `std.concurrency` may be used for far more types of
1265  * application than if this behavior were predefined.
1266  *
1267  * Example:
1268  * ---
1269  * import std.concurrency;
1270  * import std.stdio;
1271  *
1272  * void main()
1273  * {
1274  *     scheduler = new FiberScheduler;
1275  *     scheduler.start(
1276  *     {
1277  *         writeln("the rest of main goes here");
1278  *     });
1279  * }
1280  * ---
1281  *
1282  * Some schedulers have a dispatching loop that must run if they are to work
1283  * properly, so for the sake of consistency, when using a scheduler, `start()`
1284  * must be called within `main()`.  This yields control to the scheduler and
1285  * will ensure that any spawned threads are executed in an expected manner.
1286  */
1287 interface Scheduler
1288 {
1289     /**
1290      * Spawns the supplied op and starts the `Scheduler`.
1291      *
1292      * This is intended to be called at the start of the program to yield all
1293      * scheduling to the active `Scheduler` instance.  This is necessary for
1294      * schedulers that explicitly dispatch threads rather than simply relying
1295      * on the operating system to do so, and so start should always be called
1296      * within `main()` to begin normal program execution.
1297      *
1298      * Params:
1299      *  op = A wrapper for whatever the main thread would have done in the
1300      *       absence of a custom scheduler.  It will be automatically executed
1301      *       via a call to spawn by the `Scheduler`.
1302      */
1303     void start(void delegate() op);
1304 
1305     /**
1306      * Assigns a logical thread to execute the supplied op.
1307      *
1308      * This routine is called by spawn.  It is expected to instantiate a new
1309      * logical thread and run the supplied operation.  This thread must call
1310      * `thisInfo.cleanup()` when the thread terminates if the scheduled thread
1311      * is not a kernel thread--all kernel threads will have their `ThreadInfo`
1312      * cleaned up automatically by a thread-local destructor.
1313      *
1314      * Params:
1315      *  op = The function to execute.  This may be the actual function passed
1316      *       by the user to spawn itself, or may be a wrapper function.
1317      */
1318     void spawn(void delegate() op);
1319 
1320     /**
1321      * Yields execution to another logical thread.
1322      *
1323      * This routine is called at various points within concurrency-aware APIs
1324      * to provide a scheduler a chance to yield execution when using some sort
1325      * of cooperative multithreading model.  If this is not appropriate, such
1326      * as when each logical thread is backed by a dedicated kernel thread,
1327      * this routine may be a no-op.
1328      */
1329     void yield() nothrow;
1330 
1331     /**
1332      * Returns an appropriate `ThreadInfo` instance.
1333      *
1334      * Returns an instance of `ThreadInfo` specific to the logical thread that
1335      * is calling this routine or, if the calling thread was not create by
1336      * this scheduler, returns `ThreadInfo.thisInfo` instead.
1337      */
1338     @property ref ThreadInfo thisInfo() nothrow;
1339 
1340     /**
1341      * Creates a `Condition` variable analog for signaling.
1342      *
1343      * Creates a new `Condition` variable analog which is used to check for and
1344      * to signal the addition of messages to a thread's message queue.  Like
1345      * yield, some schedulers may need to define custom behavior so that calls
1346      * to `Condition.wait()` yield to another thread when no new messages are
1347      * available instead of blocking.
1348      *
1349      * Params:
1350      *  m = The `Mutex` that will be associated with this condition.  It will be
1351      *      locked prior to any operation on the condition, and so in some
1352      *      cases a `Scheduler` may need to hold this reference and unlock the
1353      *      mutex before yielding execution to another logical thread.
1354      */
1355     Condition newCondition(Mutex m) nothrow;
1356 }
1357 
1358 /**
1359  * An example `Scheduler` using kernel threads.
1360  *
1361  * This is an example `Scheduler` that mirrors the default scheduling behavior
1362  * of creating one kernel thread per call to spawn.  It is fully functional
1363  * and may be instantiated and used, but is not a necessary part of the
1364  * default functioning of this module.
1365  */
1366 class ThreadScheduler : Scheduler
1367 {
1368     /**
1369      * This simply runs op directly, since no real scheduling is needed by
1370      * this approach.
1371      */
1372     void start(void delegate() op)
1373     {
1374         op();
1375     }
1376 
1377     /**
1378      * Creates a new kernel thread and assigns it to run the supplied op.
1379      */
1380     void spawn(void delegate() op)
1381     {
1382         auto t = new Thread(op);
1383         t.start();
1384     }
1385 
1386     /**
1387      * This scheduler does no explicit multiplexing, so this is a no-op.
1388      */
1389     void yield() nothrow
1390     {
1391         // no explicit yield needed
1392     }
1393 
1394     /**
1395      * Returns `ThreadInfo.thisInfo`, since it is a thread-local instance of
1396      * `ThreadInfo`, which is the correct behavior for this scheduler.
1397      */
1398     @property ref ThreadInfo thisInfo() nothrow
1399     {
1400         return ThreadInfo.thisInfo;
1401     }
1402 
1403     /**
1404      * Creates a new `Condition` variable.  No custom behavior is needed here.
1405      */
1406     Condition newCondition(Mutex m) nothrow
1407     {
1408         return new Condition(m);
1409     }
1410 }
1411 
1412 /**
1413  * An example `Scheduler` using $(MREF_ALTTEXT `Fiber`s, core, thread, fiber).
1414  *
1415  * This is an example scheduler that creates a new `Fiber` per call to spawn
1416  * and multiplexes the execution of all fibers within the main thread.
1417  */
1418 class FiberScheduler : Scheduler
1419 {
1420     /**
1421      * This creates a new `Fiber` for the supplied op and then starts the
1422      * dispatcher.
1423      */
1424     void start(void delegate() op)
1425     {
1426         create(op);
1427         dispatch();
1428     }
1429 
1430     /**
1431      * This created a new `Fiber` for the supplied op and adds it to the
1432      * dispatch list.
1433      */
1434     void spawn(void delegate() op) nothrow
1435     {
1436         create(op);
1437         yield();
1438     }
1439 
1440     /**
1441      * If the caller is a scheduled `Fiber`, this yields execution to another
1442      * scheduled `Fiber`.
1443      */
1444     void yield() nothrow
1445     {
1446         // NOTE: It's possible that we should test whether the calling Fiber
1447         //       is an InfoFiber before yielding, but I think it's reasonable
1448         //       that any (non-Generator) fiber should yield here.
1449         if (Fiber.getThis())
1450             Fiber.yield();
1451     }
1452 
1453     /**
1454      * Returns an appropriate `ThreadInfo` instance.
1455      *
1456      * Returns a `ThreadInfo` instance specific to the calling `Fiber` if the
1457      * `Fiber` was created by this dispatcher, otherwise it returns
1458      * `ThreadInfo.thisInfo`.
1459      */
1460     @property ref ThreadInfo thisInfo() nothrow
1461     {
1462         auto f = cast(InfoFiber) Fiber.getThis();
1463 
1464         if (f !is null)
1465             return f.info;
1466         return ThreadInfo.thisInfo;
1467     }
1468 
1469     /**
1470      * Returns a `Condition` analog that yields when wait or notify is called.
1471      *
1472      * Bug:
1473      * For the default implementation, `notifyAll` will behave like `notify`.
1474      *
1475      * Params:
1476      *   m = A `Mutex` to use for locking if the condition needs to be waited on
1477      *       or notified from multiple `Thread`s.
1478      *       If `null`, no `Mutex` will be used and it is assumed that the
1479      *       `Condition` is only waited on/notified from one `Thread`.
1480      */
1481     Condition newCondition(Mutex m) nothrow
1482     {
1483         return new FiberCondition(m);
1484     }
1485 
1486 protected:
1487     /**
1488      * Creates a new `Fiber` which calls the given delegate.
1489      *
1490      * Params:
1491      *   op = The delegate the fiber should call
1492      */
1493     void create(void delegate() op) nothrow
1494     {
1495         void wrap()
1496         {
1497             scope (exit)
1498             {
1499                 thisInfo.cleanup();
1500             }
1501             op();
1502         }
1503 
1504         m_fibers ~= new InfoFiber(&wrap);
1505     }
1506 
1507     /**
1508      * `Fiber` which embeds a `ThreadInfo`
1509      */
1510     static class InfoFiber : Fiber
1511     {
1512         ThreadInfo info;
1513 
1514         this(void delegate() op) nothrow
1515         {
1516             super(op);
1517         }
1518 
1519         this(void delegate() op, size_t sz) nothrow
1520         {
1521             super(op, sz);
1522         }
1523     }
1524 
1525 private:
1526     class FiberCondition : Condition
1527     {
1528         this(Mutex m) nothrow
1529         {
1530             super(m);
1531             notified = false;
1532         }
1533 
1534         override void wait() nothrow
1535         {
1536             scope (exit) notified = false;
1537 
1538             while (!notified)
1539                 switchContext();
1540         }
1541 
1542         override bool wait(Duration period) nothrow
1543         {
1544             import core.time : MonoTime;
1545 
1546             scope (exit) notified = false;
1547 
1548             for (auto limit = MonoTime.currTime + period;
1549                  !notified && !period.isNegative;
1550                  period = limit - MonoTime.currTime)
1551             {
1552                 this.outer.yield();
1553             }
1554             return notified;
1555         }
1556 
1557         override void notify() nothrow
1558         {
1559             notified = true;
1560             switchContext();
1561         }
1562 
1563         override void notifyAll() nothrow
1564         {
1565             notified = true;
1566             switchContext();
1567         }
1568 
1569     private:
1570         void switchContext() nothrow
1571         {
1572             if (mutex_nothrow) mutex_nothrow.unlock_nothrow();
1573             scope (exit)
1574                 if (mutex_nothrow)
1575                     mutex_nothrow.lock_nothrow();
1576             this.outer.yield();
1577         }
1578 
1579         bool notified;
1580     }
1581 
1582     void dispatch()
1583     {
1584         import std.algorithm.mutation : remove;
1585 
1586         while (m_fibers.length > 0)
1587         {
1588             auto t = m_fibers[m_pos].call(Fiber.Rethrow.no);
1589             if (t !is null && !(cast(OwnerTerminated) t))
1590             {
1591                 throw t;
1592             }
1593             if (m_fibers[m_pos].state == Fiber.State.TERM)
1594             {
1595                 if (m_pos >= (m_fibers = remove(m_fibers, m_pos)).length)
1596                     m_pos = 0;
1597             }
1598             else if (m_pos++ >= m_fibers.length - 1)
1599             {
1600                 m_pos = 0;
1601             }
1602         }
1603     }
1604 
1605     Fiber[] m_fibers;
1606     size_t m_pos;
1607 }
1608 
// Exercises the condition returned by FiberScheduler.newCondition with two
// hand-driven fibers: each `call()` runs a fiber until its next yield.
@system unittest
{
    // Waits on the condition forever, counting every completed wait.
    static void receive(Condition cond, ref size_t received)
    {
        while (true)
        {
            synchronized (cond.mutex)
            {
                cond.wait();
                ++received;
            }
        }
    }

    // Notifies the condition forever, counting every notification issued.
    static void send(Condition cond, ref size_t sent)
    {
        while (true)
        {
            synchronized (cond.mutex)
            {
                ++sent;
                cond.notify();
            }
        }
    }

    auto fs = new FiberScheduler;
    auto mtx = new Mutex;
    auto cond = fs.newCondition(mtx);

    size_t received, sent;
    auto waiter = new Fiber({ receive(cond, received); }), notifier = new Fiber({ send(cond, sent); });
    // The waiter yields inside wait() because nothing was notified yet.
    waiter.call();
    assert(received == 0);
    // The notifier sets the flag and yields inside notify().
    notifier.call();
    assert(sent == 1);
    assert(received == 0);
    // The waiter sees the notification, counts it and waits again.
    waiter.call();
    assert(received == 1);
    // The wait consumed the notification, so the waiter just yields again.
    waiter.call();
    assert(received == 1);
}
1651 
1652 /**
1653  * Sets the `Scheduler` behavior within the program.
1654  *
1655  * This variable sets the `Scheduler` behavior within this program.  Typically,
1656  * when setting a `Scheduler`, `scheduler.start()` should be called in `main`.  This
1657  * routine will not return until program execution is complete.
1658  */
1659 __gshared Scheduler scheduler;
1660 
1661 // Generator
1662 
1663 /**
1664  * If the caller is a `Fiber` and is not a $(LREF Generator), this function will call
1665  * `scheduler.yield()` or `Fiber.yield()`, as appropriate.
1666  */
1667 void yield() nothrow
1668 {
1669     auto fiber = Fiber.getThis();
1670     if (!(cast(IsGenerator) fiber))
1671     {
1672         if (scheduler is null)
1673         {
1674             if (fiber)
1675                 return Fiber.yield();
1676         }
1677         else
1678             scheduler.yield();
1679     }
1680 }
1681 
/// Used to determine whether a Generator is running.
/// Marker interface without members: `yield()` casts the current fiber to it
/// to detect any `Generator!T`, whatever `T` is.
private interface IsGenerator {}
1684 
1685 
1686 /**
1687  * A Generator is a $(MREF_ALTTEXT Fiber, core, thread, fiber)
1688  * that periodically returns values of type `T` to the
1689  * caller via `yield`.  This is represented as an InputRange.
1690  */
1691 class Generator(T) :
1692     Fiber, IsGenerator, InputRange!T
1693 {
1694     /**
1695      * Initializes a generator object which is associated with a static
1696      * D function.  The function will be called once to prepare the range
1697      * for iteration.
1698      *
1699      * Params:
1700      *  fn = The fiber function.
1701      *
1702      * In:
1703      *  fn must not be null.
1704      */
1705     this(void function() fn)
1706     {
1707         super(fn);
1708         call();
1709     }
1710 
1711     /**
1712      * Initializes a generator object which is associated with a static
1713      * D function.  The function will be called once to prepare the range
1714      * for iteration.
1715      *
1716      * Params:
1717      *  fn = The fiber function.
1718      *  sz = The stack size for this fiber.
1719      *
1720      * In:
1721      *  fn must not be null.
1722      */
1723     this(void function() fn, size_t sz)
1724     {
1725         super(fn, sz);
1726         call();
1727     }
1728 
1729     /**
1730      * Initializes a generator object which is associated with a static
1731      * D function.  The function will be called once to prepare the range
1732      * for iteration.
1733      *
1734      * Params:
1735      *  fn = The fiber function.
1736      *  sz = The stack size for this fiber.
1737      *  guardPageSize = size of the guard page to trap fiber's stack
1738      *                  overflows. Refer to $(REF Fiber, core,thread)'s
1739      *                  documentation for more details.
1740      *
1741      * In:
1742      *  fn must not be null.
1743      */
1744     this(void function() fn, size_t sz, size_t guardPageSize)
1745     {
1746         super(fn, sz, guardPageSize);
1747         call();
1748     }
1749 
1750     /**
1751      * Initializes a generator object which is associated with a dynamic
1752      * D function.  The function will be called once to prepare the range
1753      * for iteration.
1754      *
1755      * Params:
1756      *  dg = The fiber function.
1757      *
1758      * In:
1759      *  dg must not be null.
1760      */
1761     this(void delegate() dg)
1762     {
1763         super(dg);
1764         call();
1765     }
1766 
1767     /**
1768      * Initializes a generator object which is associated with a dynamic
1769      * D function.  The function will be called once to prepare the range
1770      * for iteration.
1771      *
1772      * Params:
1773      *  dg = The fiber function.
1774      *  sz = The stack size for this fiber.
1775      *
1776      * In:
1777      *  dg must not be null.
1778      */
1779     this(void delegate() dg, size_t sz)
1780     {
1781         super(dg, sz);
1782         call();
1783     }
1784 
1785     /**
1786      * Initializes a generator object which is associated with a dynamic
1787      * D function.  The function will be called once to prepare the range
1788      * for iteration.
1789      *
1790      * Params:
1791      *  dg = The fiber function.
1792      *  sz = The stack size for this fiber.
1793      *  guardPageSize = size of the guard page to trap fiber's stack
1794      *                  overflows. Refer to $(REF Fiber, core,thread)'s
1795      *                  documentation for more details.
1796      *
1797      * In:
1798      *  dg must not be null.
1799      */
1800     this(void delegate() dg, size_t sz, size_t guardPageSize)
1801     {
1802         super(dg, sz, guardPageSize);
1803         call();
1804     }
1805 
1806     /**
1807      * Returns true if the generator is empty.
1808      */
1809     final bool empty() @property
1810     {
1811         return m_value is null || state == State.TERM;
1812     }
1813 
1814     /**
1815      * Obtains the next value from the underlying function.
1816      */
1817     final void popFront()
1818     {
1819         call();
1820     }
1821 
1822     /**
1823      * Returns the most recently generated value by shallow copy.
1824      */
1825     final T front() @property
1826     {
1827         return *m_value;
1828     }
1829 
1830     /**
1831      * Returns the most recently generated value without executing a
1832      * copy contructor. Will not compile for element types defining a
1833      * postblit, because `Generator` does not return by reference.
1834      */
1835     final T moveFront()
1836     {
1837         static if (!hasElaborateCopyConstructor!T)
1838         {
1839             return front;
1840         }
1841         else
1842         {
1843             static assert(0,
1844                     "Fiber front is always rvalue and thus cannot be moved since it defines a postblit.");
1845         }
1846     }
1847 
1848     final int opApply(scope int delegate(T) loopBody)
1849     {
1850         int broken;
1851         for (; !empty; popFront())
1852         {
1853             broken = loopBody(front);
1854             if (broken) break;
1855         }
1856         return broken;
1857     }
1858 
1859     final int opApply(scope int delegate(size_t, T) loopBody)
1860     {
1861         int broken;
1862         for (size_t i; !empty; ++i, popFront())
1863         {
1864             broken = loopBody(i, front);
1865             if (broken) break;
1866         }
1867         return broken;
1868     }
1869 private:
1870     T* m_value;
1871 }
1872 
1873 ///
1874 @system unittest
1875 {
1876     auto tid = spawn({
1877         int i;
1878         while (i < 9)
1879             i = receiveOnly!int;
1880 
1881         ownerTid.send(i * 2);
1882     });
1883 
1884     auto r = new Generator!int({
1885         foreach (i; 1 .. 10)
1886             yield(i);
1887     });
1888 
1889     foreach (e; r)
1890         tid.send(e);
1891 
1892     assert(receiveOnly!int == 18);
1893 }
1894 
1895 /**
1896  * Yields a value of type T to the caller of the currently executing
1897  * generator.
1898  *
1899  * Params:
1900  *  value = The value to yield.
1901  */
1902 void yield(T)(ref T value)
1903 {
1904     Generator!T cur = cast(Generator!T) Fiber.getThis();
1905     if (cur !is null && cur.state == Fiber.State.EXEC)
1906     {
1907         cur.m_value = &value;
1908         return Fiber.yield();
1909     }
1910     throw new Exception("yield(T) called with no active generator for the supplied type");
1911 }
1912 
/// ditto
void yield(T)(T value)
{
    // Inside this function `value` is a named parameter and therefore an
    // lvalue, so this call binds to the `ref T` overload rather than
    // recursing into this one.
    yield(value);
}
1918 
// Runs the same Generator scenario under a ThreadScheduler and under a
// FiberScheduler; the spawned worker reports success or failure to the main
// thread as a bool.
@system unittest
{
    import core.exception;
    import std.exception;

    auto mainTid = thisTid;
    alias testdg = () {
        // Worker: expects to receive 1, 2, 3, ... in order.
        auto tid = spawn(
        (Tid mainTid) {
            int i;
            // Any other failure is reported as `false`.
            scope (failure) mainTid.send(false);
            try
            {
                for (i = 1; i < 10; i++)
                {
                    if (receiveOnly!int() != i)
                    {
                        mainTid.send(false);
                        break;
                    }
                }
            }
            catch (OwnerTerminated e)
            {
                // i will advance 1 past the last value expected
                mainTid.send(i == 4);
            }
        }, mainTid);
        auto r = new Generator!int(
        {
            // A double does not match the active Generator!int, so yield throws.
            assertThrown!Exception(yield(2.0));
            yield(); // ensure this is a no-op
            yield(1);
            yield(); // also once something has been yielded
            yield(2);
            yield(3);
        });

        // Forwards 1, 2 and 3 to the worker.
        foreach (e; r)
        {
            tid.send(e);
        }
    };

    // Kernel-thread scheduler: run the scenario in a spawned thread.
    scheduler = new ThreadScheduler;
    scheduler.spawn(testdg);
    assert(receiveOnly!bool());

    // Fiber scheduler: start() dispatches until all fibers have finished.
    scheduler = new FiberScheduler;
    scheduler.start(testdg);
    assert(receiveOnly!bool());
    // Remove the custom scheduler again.
    scheduler = null;
}
1972 ///
1973 @system unittest
1974 {
1975     import std.range;
1976 
1977     InputRange!int myIota = iota(10).inputRangeObject;
1978 
1979     myIota.popFront();
1980     myIota.popFront();
1981     assert(myIota.moveFront == 2);
1982     assert(myIota.front == 2);
1983     myIota.popFront();
1984     assert(myIota.front == 3);
1985 
1986     //can be assigned to std.range.interfaces.InputRange directly
1987     myIota = new Generator!int(
1988     {
1989         foreach (i; 0 .. 10) yield(i);
1990     });
1991 
1992     myIota.popFront();
1993     myIota.popFront();
1994     assert(myIota.moveFront == 2);
1995     assert(myIota.front == 2);
1996     myIota.popFront();
1997     assert(myIota.front == 3);
1998 
1999     size_t[2] counter = [0, 0];
2000     foreach (i, unused; myIota) counter[] += [1, i];
2001 
2002     assert(myIota.empty);
2003     assert(counter == [7, 21]);
2004 }
2005 
2006 private
2007 {
2008     /*
2009      * A MessageBox is a message queue for one thread.  Other threads may send
2010      * messages to this owner by calling put(), and the owner receives them by
2011      * calling get().  The put() call is therefore effectively shared and the
2012      * get() call is effectively local.  setMaxMsgs may be used by any thread
2013      * to limit the size of the message queue.
2014      */
2015     class MessageBox
2016     {
2017         this() @trusted nothrow /* TODO: make @safe after relevant druntime PR gets merged */
2018         {
2019             m_lock = new Mutex;
2020             m_closed = false;
2021 
2022             if (scheduler is null)
2023             {
2024                 m_putMsg = new Condition(m_lock);
2025                 m_notFull = new Condition(m_lock);
2026             }
2027             else
2028             {
2029                 m_putMsg = scheduler.newCondition(m_lock);
2030                 m_notFull = scheduler.newCondition(m_lock);
2031             }
2032         }
2033 
2034         ///
2035         final @property bool isClosed() @safe @nogc pure
2036         {
2037             synchronized (m_lock)
2038             {
2039                 return m_closed;
2040             }
2041         }
2042 
2043         /*
2044          * Sets a limit on the maximum number of user messages allowed in the
2045          * mailbox.  If this limit is reached, the caller attempting to add
2046          * a new message will execute call.  If num is zero, there is no limit
2047          * on the message queue.
2048          *
2049          * Params:
2050          *  num  = The maximum size of the queue or zero if the queue is
2051          *         unbounded.
2052          *  call = The routine to call when the queue is full.
2053          */
2054         final void setMaxMsgs(size_t num, bool function(Tid) call) @safe @nogc pure
2055         {
2056             synchronized (m_lock)
2057             {
2058                 m_maxMsgs = num;
2059                 m_onMaxMsgs = call;
2060             }
2061         }
2062 
2063         /*
2064          * If maxMsgs is not set, the message is added to the queue and the
2065          * owner is notified.  If the queue is full, the message will still be
2066          * accepted if it is a control message, otherwise onCrowdingDoThis is
2067          * called.  If the routine returns true, this call will block until
2068          * the owner has made space available in the queue.  If it returns
2069          * false, this call will abort.
2070          *
2071          * Params:
2072          *  msg = The message to put in the queue.
2073          *
2074          * Throws:
2075          *  An exception if the queue is full and onCrowdingDoThis throws.
2076          */
2077         final void put(ref Message msg)
2078         {
2079             synchronized (m_lock)
2080             {
2081                 // TODO: Generate an error here if m_closed is true, or maybe
2082                 //       put a message in the caller's queue?
2083                 if (!m_closed)
2084                 {
2085                     while (true)
2086                     {
2087                         if (isPriorityMsg(msg))
2088                         {
2089                             m_sharedPty.put(msg);
2090                             m_putMsg.notify();
2091                             return;
2092                         }
2093                         if (!mboxFull() || isControlMsg(msg))
2094                         {
2095                             m_sharedBox.put(msg);
2096                             m_putMsg.notify();
2097                             return;
2098                         }
2099                         if (m_onMaxMsgs !is null && !m_onMaxMsgs(thisTid))
2100                         {
2101                             return;
2102                         }
2103                         m_putQueue++;
2104                         m_notFull.wait();
2105                         m_putQueue--;
2106                     }
2107                 }
2108             }
2109         }
2110 
2111         /*
2112          * Matches ops against each message in turn until a match is found.
2113          *
2114          * Params:
2115          *  ops = The operations to match.  Each may return a bool to indicate
2116          *        whether a message with a matching type is truly a match.
2117          *
2118          * Returns:
2119          *  true if a message was retrieved and false if not (such as if a
2120          *  timeout occurred).
2121          *
2122          * Throws:
2123          *  LinkTerminated if a linked thread terminated, or OwnerTerminated
2124          * if the owner thread terminates and no existing messages match the
2125          * supplied ops.
2126          */
2127         bool get(T...)(scope T vals)
2128         {
2129             import std.meta : AliasSeq;
2130 
2131             static assert(T.length, "T must not be empty");
2132 
2133             static if (is(T[0] : Duration))
2134             {
2135                 alias Ops = AliasSeq!(T[1 .. $]);
2136                 alias ops = vals[1 .. $];
2137                 enum timedWait = true;
2138                 Duration period = vals[0];
2139             }
2140             else
2141             {
2142                 alias Ops = AliasSeq!(T);
2143                 alias ops = vals[0 .. $];
2144                 enum timedWait = false;
2145             }
2146 
2147             bool onStandardMsg(ref Message msg)
2148             {
2149                 foreach (i, t; Ops)
2150                 {
2151                     alias Args = Parameters!(t);
2152                     auto op = ops[i];
2153 
2154                     if (msg.convertsTo!(Args))
2155                     {
2156                         alias RT = ReturnType!(t);
2157                         static if (is(RT == bool))
2158                         {
2159                             return msg.map(op);
2160                         }
2161                         else
2162                         {
2163                             msg.map(op);
2164                             static if (!is(immutable RT == immutable noreturn))
2165                                 return true;
2166                         }
2167                     }
2168                 }
2169                 return false;
2170             }
2171 
2172             bool onLinkDeadMsg(ref Message msg)
2173             {
2174                 assert(msg.convertsTo!(Tid),
2175                         "Message could be converted to Tid");
2176                 auto tid = msg.get!(Tid);
2177 
2178                 if (bool* pDepends = tid in thisInfo.links)
2179                 {
2180                     auto depends = *pDepends;
2181                     thisInfo.links.remove(tid);
2182                     // Give the owner relationship precedence.
2183                     if (depends && tid != thisInfo.owner)
2184                     {
2185                         auto e = new LinkTerminated(tid);
2186                         auto m = Message(MsgType.standard, e);
2187                         if (onStandardMsg(m))
2188                             return true;
2189                         throw e;
2190                     }
2191                 }
2192                 if (tid == thisInfo.owner)
2193                 {
2194                     thisInfo.owner = Tid.init;
2195                     auto e = new OwnerTerminated(tid);
2196                     auto m = Message(MsgType.standard, e);
2197                     if (onStandardMsg(m))
2198                         return true;
2199                     throw e;
2200                 }
2201                 return false;
2202             }
2203 
2204             bool onControlMsg(ref Message msg)
2205             {
2206                 switch (msg.type)
2207                 {
2208                 case MsgType.linkDead:
2209                     return onLinkDeadMsg(msg);
2210                 default:
2211                     return false;
2212                 }
2213             }
2214 
2215             bool scan(ref ListT list)
2216             {
2217                 for (auto range = list[]; !range.empty;)
2218                 {
2219                     // Only the message handler will throw, so if this occurs
2220                     // we can be certain that the message was handled.
2221                     scope (failure)
2222                         list.removeAt(range);
2223 
2224                     if (isControlMsg(range.front))
2225                     {
2226                         if (onControlMsg(range.front))
2227                         {
2228                             // Although the linkDead message is a control message,
2229                             // it can be handled by the user.  Since the linkDead
2230                             // message throws if not handled, if we get here then
2231                             // it has been handled and we can return from receive.
2232                             // This is a weird special case that will have to be
2233                             // handled in a more general way if more are added.
2234                             if (!isLinkDeadMsg(range.front))
2235                             {
2236                                 list.removeAt(range);
2237                                 continue;
2238                             }
2239                             list.removeAt(range);
2240                             return true;
2241                         }
2242                         range.popFront();
2243                         continue;
2244                     }
2245                     else
2246                     {
2247                         if (onStandardMsg(range.front))
2248                         {
2249                             list.removeAt(range);
2250                             return true;
2251                         }
2252                         range.popFront();
2253                         continue;
2254                     }
2255                 }
2256                 return false;
2257             }
2258 
2259             bool pty(ref ListT list)
2260             {
2261                 if (!list.empty)
2262                 {
2263                     auto range = list[];
2264 
2265                     if (onStandardMsg(range.front))
2266                     {
2267                         list.removeAt(range);
2268                         return true;
2269                     }
2270                     if (range.front.convertsTo!(Throwable))
2271                         throw range.front.get!(Throwable);
2272                     else if (range.front.convertsTo!(shared(Throwable)))
2273                         /* Note: a shared type can be caught without the shared qualifier
2274                          * so throwing shared will be an error */
2275                         throw cast() range.front.get!(shared(Throwable));
2276                     else
2277                         throw new PriorityMessageException(range.front.data);
2278                 }
2279                 return false;
2280             }
2281 
2282             static if (timedWait)
2283             {
2284                 import core.time : MonoTime;
2285                 auto limit = MonoTime.currTime + period;
2286             }
2287 
2288             while (true)
2289             {
2290                 ListT arrived;
2291 
2292                 if (pty(m_localPty) || scan(m_localBox))
2293                 {
2294                     return true;
2295                 }
2296                 yield();
2297                 synchronized (m_lock)
2298                 {
2299                     updateMsgCount();
2300                     while (m_sharedPty.empty && m_sharedBox.empty)
2301                     {
2302                         // NOTE: We're notifying all waiters here instead of just
2303                         //       a few because the onCrowding behavior may have
2304                         //       changed and we don't want to block sender threads
2305                         //       unnecessarily if the new behavior is not to block.
2306                         //       This will admittedly result in spurious wakeups
2307                         //       in other situations, but what can you do?
2308                         if (m_putQueue && !mboxFull())
2309                             m_notFull.notifyAll();
2310                         static if (timedWait)
2311                         {
2312                             if (period <= Duration.zero || !m_putMsg.wait(period))
2313                                 return false;
2314                         }
2315                         else
2316                         {
2317                             m_putMsg.wait();
2318                         }
2319                     }
2320                     m_localPty.put(m_sharedPty);
2321                     arrived.put(m_sharedBox);
2322                 }
2323                 if (m_localPty.empty)
2324                 {
2325                     scope (exit) m_localBox.put(arrived);
2326                     if (scan(arrived))
2327                     {
2328                         return true;
2329                     }
2330                     else
2331                     {
2332                         static if (timedWait)
2333                         {
2334                             period = limit - MonoTime.currTime;
2335                         }
2336                         continue;
2337                     }
2338                 }
2339                 m_localBox.put(arrived);
2340                 pty(m_localPty);
2341                 return true;
2342             }
2343         }
2344 
2345         /*
2346          * Called on thread termination.  This routine processes any remaining
2347          * control messages, clears out message queues, and sets a flag to
2348          * reject any future messages.
2349          */
2350         final void close()
2351         {
2352             static void onLinkDeadMsg(ref Message msg)
2353             {
2354                 assert(msg.convertsTo!(Tid),
2355                         "Message could be converted to Tid");
2356                 auto tid = msg.get!(Tid);
2357 
2358                 thisInfo.links.remove(tid);
2359                 if (tid == thisInfo.owner)
2360                     thisInfo.owner = Tid.init;
2361             }
2362 
2363             static void sweep(ref ListT list)
2364             {
2365                 for (auto range = list[]; !range.empty; range.popFront())
2366                 {
2367                     if (range.front.type == MsgType.linkDead)
2368                         onLinkDeadMsg(range.front);
2369                 }
2370             }
2371 
2372             ListT arrived;
2373 
2374             sweep(m_localBox);
2375             synchronized (m_lock)
2376             {
2377                 arrived.put(m_sharedBox);
2378                 m_closed = true;
2379             }
2380             m_localBox.clear();
2381             sweep(arrived);
2382         }
2383 
2384     private:
2385         // Routines involving local data only, no lock needed.
2386 
2387         bool mboxFull() @safe @nogc pure nothrow
2388         {
2389             return m_maxMsgs && m_maxMsgs <= m_localMsgs + m_sharedBox.length;
2390         }
2391 
2392         void updateMsgCount() @safe @nogc pure nothrow
2393         {
2394             m_localMsgs = m_localBox.length;
2395         }
2396 
2397         bool isControlMsg(ref Message msg) @safe @nogc pure nothrow
2398         {
2399             return msg.type != MsgType.standard && msg.type != MsgType.priority;
2400         }
2401 
2402         bool isPriorityMsg(ref Message msg) @safe @nogc pure nothrow
2403         {
2404             return msg.type == MsgType.priority;
2405         }
2406 
2407         bool isLinkDeadMsg(ref Message msg) @safe @nogc pure nothrow
2408         {
2409             return msg.type == MsgType.linkDead;
2410         }
2411 
2412         alias OnMaxFn = bool function(Tid);
2413         alias ListT = List!(Message);
2414 
2415         ListT m_localBox;
2416         ListT m_localPty;
2417 
2418         Mutex m_lock;
2419         Condition m_putMsg;
2420         Condition m_notFull;
2421         size_t m_putQueue;
2422         ListT m_sharedBox;
2423         ListT m_sharedPty;
2424         OnMaxFn m_onMaxMsgs;
2425         size_t m_localMsgs;
2426         size_t m_maxMsgs;
2427         bool m_closed;
2428     }
2429 
2430     /*
2431      *
2432      */
2433     struct List(T)
2434     {
2435         struct Range
2436         {
2437             import std.exception : enforce;
2438 
2439             @property bool empty() const
2440             {
2441                 return !m_prev.next;
2442             }
2443 
2444             @property ref T front()
2445             {
2446                 enforce(m_prev.next, "invalid list node");
2447                 return m_prev.next.val;
2448             }
2449 
2450             @property void front(T val)
2451             {
2452                 enforce(m_prev.next, "invalid list node");
2453                 m_prev.next.val = val;
2454             }
2455 
2456             void popFront()
2457             {
2458                 enforce(m_prev.next, "invalid list node");
2459                 m_prev = m_prev.next;
2460             }
2461 
2462             private this(Node* p)
2463             {
2464                 m_prev = p;
2465             }
2466 
2467             private Node* m_prev;
2468         }
2469 
2470         void put(T val)
2471         {
2472             put(newNode(val));
2473         }
2474 
2475         void put(ref List!(T) rhs)
2476         {
2477             if (!rhs.empty)
2478             {
2479                 put(rhs.m_first);
2480                 while (m_last.next !is null)
2481                 {
2482                     m_last = m_last.next;
2483                     m_count++;
2484                 }
2485                 rhs.m_first = null;
2486                 rhs.m_last = null;
2487                 rhs.m_count = 0;
2488             }
2489         }
2490 
2491         Range opSlice()
2492         {
2493             return Range(cast(Node*)&m_first);
2494         }
2495 
2496         void removeAt(Range r)
2497         {
2498             import std.exception : enforce;
2499 
2500             assert(m_count, "Can not remove from empty Range");
2501             Node* n = r.m_prev;
2502             enforce(n && n.next, "attempting to remove invalid list node");
2503 
2504             if (m_last is m_first)
2505                 m_last = null;
2506             else if (m_last is n.next)
2507                 m_last = n; // nocoverage
2508             Node* to_free = n.next;
2509             n.next = n.next.next;
2510             freeNode(to_free);
2511             m_count--;
2512         }
2513 
2514         @property size_t length()
2515         {
2516             return m_count;
2517         }
2518 
2519         void clear()
2520         {
2521             m_first = m_last = null;
2522             m_count = 0;
2523         }
2524 
2525         @property bool empty()
2526         {
2527             return m_first is null;
2528         }
2529 
2530     private:
2531         struct Node
2532         {
2533             Node* next;
2534             T val;
2535 
2536             this(T v)
2537             {
2538                 val = v;
2539             }
2540         }
2541 
2542         static shared struct SpinLock
2543         {
2544             void lock() { while (!cas(&locked, false, true)) { Thread.yield(); } }
2545             void unlock() { atomicStore!(MemoryOrder.rel)(locked, false); }
2546             bool locked;
2547         }
2548 
2549         static shared SpinLock sm_lock;
2550         static shared Node* sm_head;
2551 
2552         Node* newNode(T v)
2553         {
2554             Node* n;
2555             {
2556                 sm_lock.lock();
2557                 scope (exit) sm_lock.unlock();
2558 
2559                 if (sm_head)
2560                 {
2561                     n = cast(Node*) sm_head;
2562                     sm_head = sm_head.next;
2563                 }
2564             }
2565             if (n)
2566             {
2567                 import core.lifetime : emplace;
2568                 emplace!Node(n, v);
2569             }
2570             else
2571             {
2572                 n = new Node(v);
2573             }
2574             return n;
2575         }
2576 
2577         void freeNode(Node* n)
2578         {
2579             // destroy val to free any owned GC memory
2580             destroy(n.val);
2581 
2582             sm_lock.lock();
2583             scope (exit) sm_lock.unlock();
2584 
2585             auto sn = cast(shared(Node)*) n;
2586             sn.next = sm_head;
2587             sm_head = sn;
2588         }
2589 
2590         void put(Node* n)
2591         {
2592             m_count++;
2593             if (!empty)
2594             {
2595                 m_last.next = n;
2596                 m_last = n;
2597                 return;
2598             }
2599             m_first = n;
2600             m_last = n;
2601         }
2602 
2603         Node* m_first;
2604         Node* m_last;
2605         size_t m_count;
2606     }
2607 }
2608 
@system unittest
{
    import std.typecons : tuple, Tuple;

    // Receiving side: expects four messages in a fixed order and then
    // reports back with a priority message.
    static void consumer(Tid replyTo)
    {
        receive(
            (float val) { assert(0); },
            (int val, int val2) { assert(val == 42 && val2 == 86); }
        );
        receive((Tuple!(int, int) val) {
            assert(val[0] == 42 && val[1] == 86);
        });
        receive((Variant val) {  });
        // The first handler decides by its return value whether it matched.
        receive(
            (string val) { return val == "the quick brown fox"; },
            (string val) { assert(false); }
        );
        prioritySend(replyTo, "done");
    }

    // Sending side: feeds the consumer and waits for its confirmation.
    static void producer(Tid target)
    {
        send(target, 42, 86);
        send(target, tuple(42, 86));
        send(target, "hello", "there");
        send(target, "the quick brown fox");
        receive((string val) { assert(val == "done"); });
    }

    static void roundTrip()
    {
        auto unbounded = spawn(&consumer, thisTid);
        producer(unbounded);

        // Run the test again with a limited mailbox size.
        auto bounded = spawn(&consumer, thisTid);
        setMaxMailboxSize(bounded, 2, OnCrowding.block);
        producer(bounded);
    }

    // Once without a scheduler ...
    roundTrip();

    // ... and once with the thread scheduler installed.
    scheduler = new ThreadScheduler;
    roundTrip();
    scheduler = null;
}
2654 
// Returns the mutex used by the initOnce overload that takes no mutex.
// It is created on first use; when several threads get here at the same
// time, the one whose cas succeeds publishes its mutex and the others
// discard theirs and use the published one.
private @property shared(Mutex) initOnceLock()
{
    static shared Mutex lock;

    auto current = atomicLoad!(MemoryOrder.acq)(lock);
    if (current is null)
    {
        auto candidate = new shared Mutex;
        current = cas(&lock, cast(shared) null, candidate)
            ? candidate
            : atomicLoad!(MemoryOrder.acq)(lock);
    }
    return current;
}
2665 
/**
 * Thread-safely initializes $(D_PARAM var) with the lazily evaluated
 * $(D_PARAM init) value.
 *
 * Threads that call initOnce at the same time with the same $(D_PARAM var)
 * argument block until $(D_PARAM var) is fully initialized. Afterwards all
 * side-effects of $(D_PARAM init) are globally visible.
 *
 * Params:
 *   var = The variable to initialize
 *   init = The lazy initializer value
 *
 * Returns:
 *   A reference to the initialized variable
 */
auto ref initOnce(alias var)(lazy typeof(var) init)
{
    // Every instance of this overload synchronizes on the same mutex.
    shared Mutex commonLock = initOnceLock;
    return initOnce!var(init, commonLock);
}
2686 
2687 /// A typical use-case is to perform lazy but thread-safe initialization.
@system unittest
{
    static class MySingleton
    {
        static MySingleton instance()
        {
            // created on the first call, returned as-is afterwards
            __gshared MySingleton theInstance;
            return initOnce!theInstance(new MySingleton);
        }
    }

    assert(MySingleton.instance() !is null);
}
2701 
@system unittest
{
    static class MySingleton
    {
        static MySingleton instance()
        {
            __gshared MySingleton shared_;
            return initOnce!shared_(new MySingleton);
        }

    private:
        // Every construction gets the next serial number.
        this()
        {
            serial = ++constructed;
        }

        size_t serial;
        __gshared size_t constructed;
    }

    enum racers = 10;

    // All threads ask for the instance and report the serial they saw.
    foreach (_; 0 .. racers)
        spawn({ send(ownerTid, MySingleton.instance.serial); });
    foreach (_; 0 .. racers)
        assert(receiveOnly!size_t == MySingleton.instance.serial);
    // The constructor must have run exactly once.
    assert(MySingleton.constructed == 1);
}
2724 
/**
 * Same as above, but synchronizes on the given mutex instead of the one
 * shared among all initOnce instances.
 *
 * Use this overload to avoid dead-locks when the $(D_PARAM init)
 * expression waits for the result of another thread that might also
 * call initOnce. Use with care.
 *
 * Params:
 *   var = The variable to initialize
 *   init = The lazy initializer value
 *   mutex = A mutex to prevent race conditions
 *
 * Returns:
 *   A reference to the initialized variable
 */
auto ref initOnce(alias var)(lazy typeof(var) init, shared Mutex mutex)
{
    // check that var is global, can't take address of a TLS variable
    static assert(is(typeof({ __gshared p = &var; })),
        "var must be 'static shared' or '__gshared'.");
    import core.atomic : atomicLoad, MemoryOrder, atomicStore;

    // One flag per instantiation, i.e. per var.
    static shared bool flag;

    // Fast path: already initialized, no locking required.
    if (atomicLoad!(MemoryOrder.acq)(flag))
        return var;

    synchronized (mutex)
    {
        // Another thread may have finished while we waited for the mutex.
        immutable alreadyDone = atomicLoad!(MemoryOrder.raw)(flag);
        if (!alreadyDone)
        {
            var = init;
            static if (!is(immutable typeof(var) == immutable noreturn))
                atomicStore!(MemoryOrder.rel)(flag, true);
        }
    }
    return var;
}
2763 
/// ditto
auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex)
{
    // Forward to the overload taking a shared mutex.
    shared Mutex sharedMutex = cast(shared) mutex;
    return initOnce!var(init, sharedMutex);
}
2769 
2770 /// Use a separate mutex when init blocks on another thread that might also call initOnce.
@system unittest
{
    import core.sync.mutex : Mutex;

    static shared bool varA, varB;
    // dedicated mutex, handed explicitly to the initOnce call for varB
    static shared Mutex m;
    m = new shared Mutex;

    spawn({
        // use a different mutex for varB to avoid a dead-lock
        initOnce!varB(true, m);
        ownerTid.send(true);
    });
    // init depends on the result of the spawned thread
    // (this call uses the overload without an explicit mutex)
    initOnce!varA(receiveOnly!bool);
    assert(varA == true);
    assert(varB == true);
}
2789 
// initOnce accepts 'static shared' and '__gshared' variables and must be
// rejected at compile time for anything else.
@system unittest
{
    static shared bool a;
    __gshared bool b;
    static bool c;
    bool d;
    // these two only have to compile and run
    initOnce!a(true);
    initOnce!b(true);
    static assert(!__traits(compiles, initOnce!c(true))); // TLS
    static assert(!__traits(compiles, initOnce!d(true))); // local variable
}
2801 
// A shared array can be sent to another thread, and writes made there
// through the received slice are seen by the sender.
@system unittest
{
    static shared int[] buffer = new shared(int)[1];

    auto writer = spawn({
        auto view = receiveOnly!(shared(int)[]);
        view[0] = 5;
        send(ownerTid, true);
    });

    send(writer, buffer);
    // wait until the writer is done before looking at the array
    receiveOnly!bool();
    assert(buffer[0] == 5);
}
2815 
// https://issues.dlang.org/show_bug.cgi?id=13930
// Sends an immutable associative array to the current thread and receives
// it again with its exact type.
@system unittest
{
    immutable aa = ["0":0];
    thisTid.send(aa);
    // NOTE(review): the original remark here was just "compile error" -
    // presumably this is the line that failed to compile in the linked
    // issue; confirm against the report.
    receiveOnly!(immutable int[string]);
}
2823 
// https://issues.dlang.org/show_bug.cgi?id=19345
// A struct with const members, one of them a static array, is sent from a
// spawned thread and compared with the expected value on arrival.
@system unittest
{
    static struct Aggregate { const int a; const int[5] b; }

    static void sender(Tid mainTid)
    {
        const payload = Aggregate(42, [1, 2, 3, 4, 5]);
        send(mainTid, payload);
    }

    immutable expected = Aggregate(42, [1, 2, 3, 4, 5]);

    spawn(&sender, thisTid);
    auto received = receiveOnly!(const Aggregate)();
    assert(received == expected);
}
2839 
// Noreturn support
// Checks that the public entry points accept noreturn functions and values.
// Nothing here is meant to execute: the calls are either guarded by
// `if (false)` or only checked with __traits(compiles).
@system unittest
{
    static noreturn foo(int) { throw new Exception(""); }

    // thread functions returning noreturn
    if (false) spawn(&foo, 1);
    if (false) spawnLinked(&foo, 1);

    // message handlers returning noreturn
    if (false) receive(&foo);
    if (false) receiveTimeout(Duration.init, &foo);

    // Wrapped in __traits(compiles) to skip codegen which crashes dmd's backend
    static assert(__traits(compiles, receiveOnly!noreturn()                 ));
    static assert(__traits(compiles, send(Tid.init, noreturn.init)          ));
    static assert(__traits(compiles, prioritySend(Tid.init, noreturn.init)  ));
    static assert(__traits(compiles, yield(noreturn.init)                   ));

    // initOnce with a noreturn variable
    static assert(__traits(compiles, {
        __gshared noreturn n;
        initOnce!n(noreturn.init);
    }));
}