1 /**
2  * $(SCRIPT inhibitQuickIndex = 1;)
3  * $(DIVC quickindex,
4  * $(BOOKTABLE,
5  * $(TR $(TH Category) $(TH Symbols))
6  * $(TR $(TD Tid) $(TD
7  *     $(MYREF locate)
8  *     $(MYREF ownerTid)
9  *     $(MYREF register)
10  *     $(MYREF spawn)
11  *     $(MYREF spawnLinked)
12  *     $(MYREF thisTid)
13  *     $(MYREF Tid)
14  *     $(MYREF TidMissingException)
15  *     $(MYREF unregister)
16  * ))
17  * $(TR $(TD Message passing) $(TD
18  *     $(MYREF prioritySend)
19  *     $(MYREF receive)
20  *     $(MYREF receiveOnly)
21  *     $(MYREF receiveTimeout)
22  *     $(MYREF send)
23  *     $(MYREF setMaxMailboxSize)
24  * ))
25  * $(TR $(TD Message-related types) $(TD
26  *     $(MYREF LinkTerminated)
27  *     $(MYREF MailboxFull)
28  *     $(MYREF MessageMismatch)
29  *     $(MYREF OnCrowding)
30  *     $(MYREF OwnerTerminated)
31  *     $(MYREF PriorityMessageException)
32  * ))
33  * $(TR $(TD Scheduler) $(TD
34  *     $(MYREF FiberScheduler)
35  *     $(MYREF Generator)
36  *     $(MYREF Scheduler)
37  *     $(MYREF scheduler)
38  *     $(MYREF ThreadInfo)
39  *     $(MYREF ThreadScheduler)
40  *     $(MYREF yield)
41  * ))
42  * $(TR $(TD Misc) $(TD
43  *     $(MYREF initOnce)
44  * ))
45  * ))
46  *
47  * This is a low-level messaging API upon which more structured or restrictive
48  * APIs may be built.  The general idea is that every messageable entity is
49  * represented by a common handle type called a `Tid`, which allows messages to
50  * be sent to logical threads that are executing in both the current process
51  * and in external processes using the same interface.  This is an important
52  * aspect of scalability because it allows the components of a program to be
53  * spread across available resources with few to no changes to the actual
54  * implementation.
55  *
56  * A logical thread is an execution context that has its own stack and which
57  * runs asynchronously to other logical threads.  These may be preemptively
58  * scheduled kernel threads, $(MREF_ALTTEXT fibers, core, thread, fiber)
59  * (cooperative user-space threads), or some other concept with similar behavior.
60  *
61  * The type of concurrency used when logical threads are created is determined
62  * by the $(LREF Scheduler) selected at initialization time.  The default behavior is
63  * currently to create a new kernel thread per call to spawn, but other
64  * schedulers are available that multiplex fibers across the main thread or
65  * use some combination of the two approaches.
66  *
67  * Copyright: Copyright Sean Kelly 2009 - 2014.
68  * License:   <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
69  * Authors:   Sean Kelly, Alex Rønne Petersen, Martin Nowak
70  * Source:    $(PHOBOSSRC std/concurrency.d)
71  */
72 /*          Copyright Sean Kelly 2009 - 2014.
73  * Distributed under the Boost Software License, Version 1.0.
74  *    (See accompanying file LICENSE_1_0.txt or copy at
75  *          http://www.boost.org/LICENSE_1_0.txt)
76  */
77 module std.concurrency;
78 
79 public import std.variant;
80 
81 import core.atomic;
82 import core.sync.condition;
83 import core.sync.mutex;
84 import core.thread;
85 import std.range.primitives;
86 import std.range.interfaces : InputRange;
87 import std.traits;
88 
89 ///
90 @system unittest
91 {
92     __gshared string received;
93     static void spawnedFunc(Tid ownerTid)
94     {
95         import std.conv : text;
96         // Receive a message from the owner thread.
97         receive((int i){
98             received = text("Received the number ", i);
99 
100             // Send a message back to the owner thread
101             // indicating success.
102             send(ownerTid, true);
103         });
104     }
105 
106     // Start spawnedFunc in a new thread.
107     auto childTid = spawn(&spawnedFunc, thisTid);
108 
109     // Send the number 42 to this new thread.
110     send(childTid, 42);
111 
112     // Receive the result code.
113     auto wasSuccessful = receiveOnly!(bool);
114     assert(wasSuccessful);
115     assert(received == "Received the number 42");
116 }
117 
118 private
119 {
120     bool hasLocalAliasing(Types...)()
121     {
122         import std.typecons : Rebindable;
123 
124         // Works around "statement is not reachable"
125         bool doesIt = false;
126         static foreach (T; Types)
127         {
128             static if (is(T == Tid))
129             { /* Allowed */ }
130             else static if (is(T : Rebindable!R, R))
131                 doesIt |= hasLocalAliasing!R;
132             else static if (is(T == struct))
133                 doesIt |= hasLocalAliasing!(typeof(T.tupleof));
134             else
135                 doesIt |= std.traits.hasUnsharedAliasing!(T);
136         }
137         return doesIt;
138     }
139 
140     @safe unittest
141     {
142         static struct Container { Tid t; }
143         static assert(!hasLocalAliasing!(Tid, Container, int));
144     }
145 
146     // https://issues.dlang.org/show_bug.cgi?id=20097
147     @safe unittest
148     {
149         import std.datetime.systime : SysTime;
150         static struct Container { SysTime time; }
151         static assert(!hasLocalAliasing!(SysTime, Container));
152     }
153 
154     enum MsgType
155     {
156         standard,
157         priority,
158         linkDead,
159     }
160 
161     struct Message
162     {
163         MsgType type;
164         Variant data;
165 
166         this(T...)(MsgType t, T vals)
167         if (T.length > 0)
168         {
169             static if (T.length == 1)
170             {
171                 type = t;
172                 data = vals[0];
173             }
174             else
175             {
176                 import std.typecons : Tuple;
177 
178                 type = t;
179                 data = Tuple!(T)(vals);
180             }
181         }
182 
183         @property auto convertsTo(T...)()
184         {
185             static if (T.length == 1)
186             {
187                 return is(T[0] == Variant) || data.convertsTo!(T);
188             }
189             else
190             {
191                 import std.typecons : Tuple;
192                 return data.convertsTo!(Tuple!(T));
193             }
194         }
195 
196         @property auto get(T...)()
197         {
198             static if (T.length == 1)
199             {
200                 static if (is(T[0] == Variant))
201                     return data;
202                 else
203                     return data.get!(T);
204             }
205             else
206             {
207                 import std.typecons : Tuple;
208                 return data.get!(Tuple!(T));
209             }
210         }
211 
212         auto map(Op)(Op op)
213         {
214             alias Args = Parameters!(Op);
215 
216             static if (Args.length == 1)
217             {
218                 static if (is(Args[0] == Variant))
219                     return op(data);
220                 else
221                     return op(data.get!(Args));
222             }
223             else
224             {
225                 import std.typecons : Tuple;
226                 return op(data.get!(Tuple!(Args)).expand);
227             }
228         }
229     }
230 
231     void checkops(T...)(T ops)
232     {
233         import std.format : format;
234 
235         foreach (i, t1; T)
236         {
237             static assert(isFunctionPointer!t1 || isDelegate!t1,
238                     format!"T %d is not a function pointer or delegates"(i));
239             alias a1 = Parameters!(t1);
240             alias r1 = ReturnType!(t1);
241 
242             static if (i < T.length - 1 && is(r1 == void))
243             {
244                 static assert(a1.length != 1 || !is(a1[0] == Variant),
245                               "function with arguments " ~ a1.stringof ~
246                               " occludes successive function");
247 
248                 foreach (t2; T[i + 1 .. $])
249                 {
250                     alias a2 = Parameters!(t2);
251 
252                     static assert(!is(a1 == a2),
253                         "function with arguments " ~ a1.stringof ~ " occludes successive function");
254                 }
255             }
256         }
257     }
258 
259     @property ref ThreadInfo thisInfo() nothrow
260     {
261         import core.atomic : atomicLoad;
262 
263         auto localScheduler = atomicLoad(scheduler);
264         if (localScheduler is null)
265             return ThreadInfo.thisInfo;
266         return localScheduler.thisInfo;
267     }
268 }
269 
270 static ~this()
271 {
272     thisInfo.cleanup();
273 }
274 
275 // Exceptions
276 
277 /**
278  * Thrown on calls to $(LREF receiveOnly) if a message other than the type
279  * the receiving thread expected is sent.
280  */
281 class MessageMismatch : Exception
282 {
283     ///
284     this(string msg = "Unexpected message type") @safe pure nothrow @nogc
285     {
286         super(msg);
287     }
288 }
289 
290 /**
291  * Thrown on calls to $(LREF receive) if the thread that spawned the receiving
292  * thread has terminated and no more messages exist.
293  */
294 class OwnerTerminated : Exception
295 {
296     ///
297     this(Tid t, string msg = "Owner terminated") @safe pure nothrow @nogc
298     {
299         super(msg);
300         tid = t;
301     }
302 
303     Tid tid;
304 }
305 
306 /**
307  * Thrown if a linked thread has terminated.
308  */
309 class LinkTerminated : Exception
310 {
311     ///
312     this(Tid t, string msg = "Link terminated") @safe pure nothrow @nogc
313     {
314         super(msg);
315         tid = t;
316     }
317 
318     Tid tid;
319 }
320 
321 /**
322  * Thrown if a message was sent to a thread via
323  * $(REF prioritySend, std,concurrency) and the receiver does not have a handler
324  * for a message of this type.
325  */
326 class PriorityMessageException : Exception
327 {
328     ///
329     this(Variant vals)
330     {
331         super("Priority message");
332         message = vals;
333     }
334 
335     /**
336      * The message that was sent.
337      */
338     Variant message;
339 }
340 
341 /**
342  * Thrown on mailbox crowding if the mailbox is configured with
343  * `OnCrowding.throwException`.
344  */
345 class MailboxFull : Exception
346 {
347     ///
348     this(Tid t, string msg = "Mailbox full") @safe pure nothrow @nogc
349     {
350         super(msg);
351         tid = t;
352     }
353 
354     Tid tid;
355 }
356 
357 /**
358  * Thrown when a `Tid` is missing, e.g. when $(LREF ownerTid) doesn't
359  * find an owner thread.
360  */
361 class TidMissingException : Exception
362 {
363     import std.exception : basicExceptionCtors;
364     ///
365     mixin basicExceptionCtors;
366 }
367 
368 
369 // Thread ID
370 
371 
372 /**
373  * An opaque type used to represent a logical thread.
374  */
375 struct Tid
376 {
377 private:
378     this(MessageBox m) @safe pure nothrow @nogc
379     {
380         mbox = m;
381     }
382 
383     MessageBox mbox;
384 
385 public:
386 
387     /**
388      * Generate a convenient string for identifying this `Tid`.  This is only
389      * useful to see if `Tid`'s that are currently executing are the same or
390      * different, e.g. for logging and debugging.  It is potentially possible
391      * that a `Tid` executed in the future will have the same `toString()` output
392      * as another `Tid` that has already terminated.
393      */
394     void toString(W)(ref W w) const
395     {
396         import std.format.write : formattedWrite;
397         auto p = () @trusted { return cast(void*) mbox; }();
398         formattedWrite(w, "Tid(%x)", p);
399     }
400 
401 }
402 
403 @safe unittest
404 {
405     import std.conv : text;
406     Tid tid;
407     assert(text(tid) == "Tid(0)");
408     auto tid2 = thisTid;
409     assert(text(tid2) != "Tid(0)");
410     auto tid3 = tid2;
411     assert(text(tid2) == text(tid3));
412 }
413 
414 // https://issues.dlang.org/show_bug.cgi?id=21512
415 @system unittest
416 {
417     import std.format : format;
418 
419     const(Tid) b = spawn(() {});
420     assert(format!"%s"(b)[0 .. 4] == "Tid(");
421 }
422 
423 /**
424  * Returns: The `Tid` of the caller's thread.
425  */
426 @property Tid thisTid() @safe
427 {
428     // TODO: remove when concurrency is safe
429     static auto trus() @trusted
430     {
431         if (thisInfo.ident != Tid.init)
432             return thisInfo.ident;
433         thisInfo.ident = Tid(new MessageBox);
434         return thisInfo.ident;
435     }
436 
437     return trus();
438 }
439 
440 /**
441  * Return the `Tid` of the thread which spawned the caller's thread.
442  *
443  * Throws: A `TidMissingException` exception if
444  * there is no owner thread.
445  */
446 @property Tid ownerTid()
447 {
448     import std.exception : enforce;
449 
450     enforce!TidMissingException(thisInfo.owner.mbox !is null, "Error: Thread has no owner thread.");
451     return thisInfo.owner;
452 }
453 
454 @system unittest
455 {
456     import std.exception : assertThrown;
457 
458     static void fun()
459     {
460         string res = receiveOnly!string();
461         assert(res == "Main calling");
462         ownerTid.send("Child responding");
463     }
464 
465     assertThrown!TidMissingException(ownerTid);
466     auto child = spawn(&fun);
467     child.send("Main calling");
468     string res = receiveOnly!string();
469     assert(res == "Child responding");
470 }
471 
472 // Thread Creation
473 
474 private template isSpawnable(F, T...)
475 {
476     template isParamsImplicitlyConvertible(F1, F2, int i = 0)
477     {
478         alias param1 = Parameters!F1;
479         alias param2 = Parameters!F2;
480         static if (param1.length != param2.length)
481             enum isParamsImplicitlyConvertible = false;
482         else static if (param1.length == i)
483             enum isParamsImplicitlyConvertible = true;
484         else static if (is(param2[i] : param1[i]))
485             enum isParamsImplicitlyConvertible = isParamsImplicitlyConvertible!(F1,
486                     F2, i + 1);
487         else
488             enum isParamsImplicitlyConvertible = false;
489     }
490 
491     enum isSpawnable = isCallable!F && is(ReturnType!F : void)
492             && isParamsImplicitlyConvertible!(F, void function(T))
493             && (isFunctionPointer!F || !hasUnsharedAliasing!F);
494 }
495 
496 /**
497  * Starts `fn(args)` in a new logical thread.
498  *
499  * Executes the supplied function in a new logical thread represented by
500  * `Tid`.  The calling thread is designated as the owner of the new thread.
501  * When the owner thread terminates an `OwnerTerminated` message will be
502  * sent to the new thread, causing an `OwnerTerminated` exception to be
503  * thrown on `receive()`.
504  *
505  * Params:
506  *  fn   = The function to execute.
507  *  args = Arguments to the function.
508  *
509  * Returns:
510  *  A `Tid` representing the new logical thread.
511  *
512  * Notes:
513  *  `args` must not have unshared aliasing.  In other words, all arguments
514  *  to `fn` must either be `shared` or `immutable` or have no
515  *  pointer indirection.  This is necessary for enforcing isolation among
516  *  threads.
517  *
518  * Similarly, if `fn` is a delegate, it must not have unshared aliases, meaning
519  * `fn` must be either `shared` or `immutable`. */
520 Tid spawn(F, T...)(F fn, T args)
521 if (isSpawnable!(F, T))
522 {
523     static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
524     return _spawn(false, fn, args);
525 }
526 
527 ///
528 @system unittest
529 {
530     static void f(string msg)
531     {
532         assert(msg == "Hello World");
533     }
534 
535     auto tid = spawn(&f, "Hello World");
536 }
537 
538 /// Fails: char[] has mutable aliasing.
539 @system unittest
540 {
541     string msg = "Hello, World!";
542 
543     static void f1(string msg) {}
544     static assert(!__traits(compiles, spawn(&f1, msg.dup)));
545     static assert( __traits(compiles, spawn(&f1, msg.idup)));
546 
547     static void f2(char[] msg) {}
548     static assert(!__traits(compiles, spawn(&f2, msg.dup)));
549     static assert(!__traits(compiles, spawn(&f2, msg.idup)));
550 }
551 
552 /// New thread with anonymous function
553 @system unittest
554 {
555     spawn({
556         ownerTid.send("This is so great!");
557     });
558     assert(receiveOnly!string == "This is so great!");
559 }
560 
561 @system unittest
562 {
563     import core.thread : thread_joinAll;
564 
565     __gshared string receivedMessage;
566     static void f1(string msg)
567     {
568         receivedMessage = msg;
569     }
570 
571     auto tid1 = spawn(&f1, "Hello World");
572     thread_joinAll;
573     assert(receivedMessage == "Hello World");
574 }
575 
576 /**
577  * Starts `fn(args)` in a logical thread and will receive a `LinkTerminated`
578  * message when the operation terminates.
579  *
580  * Executes the supplied function in a new logical thread represented by
581  * `Tid`.  This new thread is linked to the calling thread so that if either
582  * it or the calling thread terminates a `LinkTerminated` message will be sent
583  * to the other, causing a `LinkTerminated` exception to be thrown on `receive()`.
584  * The owner relationship from `spawn()` is preserved as well, so if the link
585  * between threads is broken, owner termination will still result in an
586  * `OwnerTerminated` exception to be thrown on `receive()`.
587  *
588  * Params:
589  *  fn   = The function to execute.
590  *  args = Arguments to the function.
591  *
592  * Returns:
593  *  A Tid representing the new thread.
594  */
595 Tid spawnLinked(F, T...)(F fn, T args)
596 if (isSpawnable!(F, T))
597 {
598     static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
599     return _spawn(true, fn, args);
600 }
601 
602 /*
603  *
604  */
605 private Tid _spawn(F, T...)(bool linked, F fn, T args)
606 if (isSpawnable!(F, T))
607 {
608     // TODO: MessageList and &exec should be shared.
609     auto spawnTid = Tid(new MessageBox);
610     auto ownerTid = thisTid;
611 
612     void exec()
613     {
614         thisInfo.ident = spawnTid;
615         thisInfo.owner = ownerTid;
616         fn(args);
617     }
618 
619     // TODO: MessageList and &exec should be shared.
620     if (scheduler !is null)
621         scheduler.spawn(&exec);
622     else
623     {
624         auto t = new Thread(&exec);
625         t.start();
626     }
627     thisInfo.links[spawnTid] = linked;
628     return spawnTid;
629 }
630 
631 @system unittest
632 {
633     void function() fn1;
634     void function(int) fn2;
635     static assert(__traits(compiles, spawn(fn1)));
636     static assert(__traits(compiles, spawn(fn2, 2)));
637     static assert(!__traits(compiles, spawn(fn1, 1)));
638     static assert(!__traits(compiles, spawn(fn2)));
639 
640     void delegate(int) shared dg1;
641     shared(void delegate(int)) dg2;
642     shared(void delegate(long) shared) dg3;
643     shared(void delegate(real, int, long) shared) dg4;
644     void delegate(int) immutable dg5;
645     void delegate(int) dg6;
646     static assert(__traits(compiles, spawn(dg1, 1)));
647     static assert(__traits(compiles, spawn(dg2, 2)));
648     static assert(__traits(compiles, spawn(dg3, 3)));
649     static assert(__traits(compiles, spawn(dg4, 4, 4, 4)));
650     static assert(__traits(compiles, spawn(dg5, 5)));
651     static assert(!__traits(compiles, spawn(dg6, 6)));
652 
653     auto callable1  = new class{ void opCall(int) shared {} };
654     auto callable2  = cast(shared) new class{ void opCall(int) shared {} };
655     auto callable3  = new class{ void opCall(int) immutable {} };
656     auto callable4  = cast(immutable) new class{ void opCall(int) immutable {} };
657     auto callable5  = new class{ void opCall(int) {} };
658     auto callable6  = cast(shared) new class{ void opCall(int) immutable {} };
659     auto callable7  = cast(immutable) new class{ void opCall(int) shared {} };
660     auto callable8  = cast(shared) new class{ void opCall(int) const shared {} };
661     auto callable9  = cast(const shared) new class{ void opCall(int) shared {} };
662     auto callable10 = cast(const shared) new class{ void opCall(int) const shared {} };
663     auto callable11 = cast(immutable) new class{ void opCall(int) const shared {} };
664     static assert(!__traits(compiles, spawn(callable1,  1)));
665     static assert( __traits(compiles, spawn(callable2,  2)));
666     static assert(!__traits(compiles, spawn(callable3,  3)));
667     static assert( __traits(compiles, spawn(callable4,  4)));
668     static assert(!__traits(compiles, spawn(callable5,  5)));
669     static assert(!__traits(compiles, spawn(callable6,  6)));
670     static assert(!__traits(compiles, spawn(callable7,  7)));
671     static assert( __traits(compiles, spawn(callable8,  8)));
672     static assert(!__traits(compiles, spawn(callable9,  9)));
673     static assert( __traits(compiles, spawn(callable10, 10)));
674     static assert( __traits(compiles, spawn(callable11, 11)));
675 }
676 
677 /**
678  * Places the values as a message at the back of tid's message queue.
679  *
680  * Sends the supplied value to the thread represented by tid.  As with
681  * $(REF spawn, std,concurrency), `T` must not have unshared aliasing.
682  */
683 void send(T...)(Tid tid, T vals)
684 in (tid.mbox !is null)
685 {
686     static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
687     _send(tid, vals);
688 }
689 
690 /**
691  * Places the values as a message on the front of tid's message queue.
692  *
693  * Send a message to `tid` but place it at the front of `tid`'s message
694  * queue instead of at the back.  This function is typically used for
695  * out-of-band communication, to signal exceptional conditions, etc.
696  */
697 void prioritySend(T...)(Tid tid, T vals)
698 in (tid.mbox !is null)
699 {
700     static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
701     _send(MsgType.priority, tid, vals);
702 }
703 
704 /*
705  * ditto
706  */
707 private void _send(T...)(Tid tid, T vals)
708 in (tid.mbox !is null)
709 {
710     _send(MsgType.standard, tid, vals);
711 }
712 
713 /*
714  * Implementation of send.  This allows parameter checking to be different for
715  * both Tid.send() and .send().
716  */
717 private void _send(T...)(MsgType type, Tid tid, T vals)
718 in (tid.mbox !is null)
719 {
720     auto msg = Message(type, vals);
721     tid.mbox.put(msg);
722 }
723 
724 /**
725  * Receives a message from another thread.
726  *
727  * Receive a message from another thread, or block if no messages of the
728  * specified types are available.  This function works by pattern matching
729  * a message against a set of delegates and executing the first match found.
730  *
731  * If a delegate that accepts a $(REF Variant, std,variant) is included as
732  * the last argument to `receive`, it will match any message that was not
733  * matched by an earlier delegate.  If more than one argument is sent,
734  * the `Variant` will contain a $(REF Tuple, std,typecons) of all values
735  * sent.
736  *
737  * Params:
738  *     ops = Variadic list of function pointers and delegates. Entries
739  *           in this list must not occlude later entries.
740  *
741  * Throws: $(LREF OwnerTerminated) when the sending thread was terminated.
742  */
743 void receive(T...)( T ops )
744 in
745 {
746     assert(thisInfo.ident.mbox !is null,
747            "Cannot receive a message until a thread was spawned "
748            ~ "or thisTid was passed to a running thread.");
749 }
750 do
751 {
752     checkops( ops );
753 
754     thisInfo.ident.mbox.get( ops );
755 }
756 
757 ///
758 @system unittest
759 {
760     import std.variant : Variant;
761 
762     auto process = ()
763     {
764         receive(
765             (int i) { ownerTid.send(1); },
766             (double f) { ownerTid.send(2); },
767             (Variant v) { ownerTid.send(3); }
768         );
769     };
770 
771     {
772         auto tid = spawn(process);
773         send(tid, 42);
774         assert(receiveOnly!int == 1);
775     }
776 
777     {
778         auto tid = spawn(process);
779         send(tid, 3.14);
780         assert(receiveOnly!int == 2);
781     }
782 
783     {
784         auto tid = spawn(process);
785         send(tid, "something else");
786         assert(receiveOnly!int == 3);
787     }
788 }
789 
790 @safe unittest
791 {
792     static assert( __traits( compiles,
793                       {
794                           receive( (Variant x) {} );
795                           receive( (int x) {}, (Variant x) {} );
796                       } ) );
797 
798     static assert( !__traits( compiles,
799                        {
800                            receive( (Variant x) {}, (int x) {} );
801                        } ) );
802 
803     static assert( !__traits( compiles,
804                        {
805                            receive( (int x) {}, (int x) {} );
806                        } ) );
807 }
808 
809 // Make sure receive() works with free functions as well.
810 version (StdUnittest)
811 {
812     private void receiveFunction(int x) {}
813 }
814 @safe unittest
815 {
816     static assert( __traits( compiles,
817                       {
818                           receive( &receiveFunction );
819                           receive( &receiveFunction, (Variant x) {} );
820                       } ) );
821 }
822 
823 
824 private template receiveOnlyRet(T...)
825 {
826     static if ( T.length == 1 )
827     {
828         alias receiveOnlyRet = T[0];
829     }
830     else
831     {
832         import std.typecons : Tuple;
833         alias receiveOnlyRet = Tuple!(T);
834     }
835 }
836 
837 /**
838  * Receives only messages with arguments of the specified types.
839  *
840  * Params:
841  *     T = Variadic list of types to be received.
842  *
843  * Returns: The received message.  If `T` has more than one entry,
844  *          the message will be packed into a $(REF Tuple, std,typecons).
845  *
846  * Throws: $(LREF MessageMismatch) if a message of types other than `T`
847  *         is received,
848  *         $(LREF OwnerTerminated) when the sending thread was terminated.
849  */
850 receiveOnlyRet!(T) receiveOnly(T...)()
851 in
852 {
853     assert(thisInfo.ident.mbox !is null,
854         "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
855 }
856 do
857 {
858     import std.format : format;
859     import std.meta : allSatisfy;
860     import std.typecons : Tuple;
861 
862     Tuple!(T) ret;
863 
864     thisInfo.ident.mbox.get((T val) {
865         static if (T.length)
866         {
867             static if (allSatisfy!(isAssignable, T))
868             {
869                 ret.field = val;
870             }
871             else
872             {
873                 import core.lifetime : emplace;
874                 emplace(&ret, val);
875             }
876         }
877     },
878     (LinkTerminated e) { throw e; },
879     (OwnerTerminated e) { throw e; },
880     (Variant val) {
881         static if (T.length > 1)
882             string exp = T.stringof;
883         else
884             string exp = T[0].stringof;
885 
886         throw new MessageMismatch(
887             format("Unexpected message type: expected '%s', got '%s'", exp, val.type.toString()));
888     });
889     static if (T.length == 1)
890         return ret[0];
891     else
892         return ret;
893 }
894 
895 ///
896 @system unittest
897 {
898     auto tid = spawn(
899     {
900         assert(receiveOnly!int == 42);
901     });
902     send(tid, 42);
903 }
904 
905 ///
906 @system unittest
907 {
908     auto tid = spawn(
909     {
910         assert(receiveOnly!string == "text");
911     });
912     send(tid, "text");
913 }
914 
915 ///
916 @system unittest
917 {
918     struct Record { string name; int age; }
919 
920     auto tid = spawn(
921     {
922         auto msg = receiveOnly!(double, Record);
923         assert(msg[0] == 0.5);
924         assert(msg[1].name == "Alice");
925         assert(msg[1].age == 31);
926     });
927 
928     send(tid, 0.5, Record("Alice", 31));
929 }
930 
931 @system unittest
932 {
933     static void t1(Tid mainTid)
934     {
935         try
936         {
937             receiveOnly!string();
938             mainTid.send("");
939         }
940         catch (Throwable th)
941         {
942             mainTid.send(th.msg);
943         }
944     }
945 
946     auto tid = spawn(&t1, thisTid);
947     tid.send(1);
948     string result = receiveOnly!string();
949     assert(result == "Unexpected message type: expected 'string', got 'int'");
950 }
951 
952 // https://issues.dlang.org/show_bug.cgi?id=21663
953 @safe unittest
954 {
955     alias test = receiveOnly!(string, bool, bool);
956 }
957 
958 /**
959  * Receives a message from another thread and gives up if no match
960  * arrives within a specified duration.
961  *
962  * Receive a message from another thread, or block until `duration` exceeds,
963  * if no messages of the specified types are available. This function works
964  * by pattern matching a message against a set of delegates and executing
965  * the first match found.
966  *
967  * If a delegate that accepts a $(REF Variant, std,variant) is included as
968  * the last argument, it will match any message that was not
969  * matched by an earlier delegate.  If more than one argument is sent,
970  * the `Variant` will contain a $(REF Tuple, std,typecons) of all values
971  * sent.
972  *
973  * Params:
974  *     duration = Duration, how long to wait. If `duration` is negative,
975  *                won't wait at all.
976  *     ops = Variadic list of function pointers and delegates. Entries
977  *           in this list must not occlude later entries.
978  *
979  * Returns: `true` if it received a message and `false` if it timed out waiting
980  *          for one.
981  *
982  * Throws: $(LREF OwnerTerminated) when the sending thread was terminated.
983  */
984 bool receiveTimeout(T...)(Duration duration, T ops)
985 in
986 {
987     assert(thisInfo.ident.mbox !is null,
988         "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
989 }
990 do
991 {
992     checkops(ops);
993 
994     return thisInfo.ident.mbox.get(duration, ops);
995 }
996 
997 @safe unittest
998 {
999     static assert(__traits(compiles, {
1000         receiveTimeout(msecs(0), (Variant x) {});
1001         receiveTimeout(msecs(0), (int x) {}, (Variant x) {});
1002     }));
1003 
1004     static assert(!__traits(compiles, {
1005         receiveTimeout(msecs(0), (Variant x) {}, (int x) {});
1006     }));
1007 
1008     static assert(!__traits(compiles, {
1009         receiveTimeout(msecs(0), (int x) {}, (int x) {});
1010     }));
1011 
1012     static assert(__traits(compiles, {
1013         receiveTimeout(msecs(10), (int x) {}, (Variant x) {});
1014     }));
1015 }
1016 
1017 // MessageBox Limits
1018 
1019 /**
1020  * These behaviors may be specified when a mailbox is full.
1021  */
1022 enum OnCrowding
1023 {
1024     block, /// Wait until room is available.
1025     throwException, /// Throw a $(LREF MailboxFull) exception.
1026     ignore /// Abort the send and return.
1027 }
1028 
1029 private
1030 {
1031     bool onCrowdingBlock(Tid tid) @safe pure nothrow @nogc
1032     {
1033         return true;
1034     }
1035 
1036     bool onCrowdingThrow(Tid tid) @safe pure
1037     {
1038         throw new MailboxFull(tid);
1039     }
1040 
1041     bool onCrowdingIgnore(Tid tid) @safe pure nothrow @nogc
1042     {
1043         return false;
1044     }
1045 }
1046 
1047 /**
1048  * Sets a maximum mailbox size.
1049  *
1050  * Sets a limit on the maximum number of user messages allowed in the mailbox.
1051  * If this limit is reached, the caller attempting to add a new message will
1052  * execute the behavior specified by doThis.  If messages is zero, the mailbox
1053  * is unbounded.
1054  *
1055  * Params:
1056  *  tid      = The Tid of the thread for which this limit should be set.
1057  *  messages = The maximum number of messages or zero if no limit.
1058  *  doThis   = The behavior executed when a message is sent to a full
1059  *             mailbox.
1060  */
1061 void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure
1062 in (tid.mbox !is null)
1063 {
1064     final switch (doThis)
1065     {
1066     case OnCrowding.block:
1067         return tid.mbox.setMaxMsgs(messages, &onCrowdingBlock);
1068     case OnCrowding.throwException:
1069         return tid.mbox.setMaxMsgs(messages, &onCrowdingThrow);
1070     case OnCrowding.ignore:
1071         return tid.mbox.setMaxMsgs(messages, &onCrowdingIgnore);
1072     }
1073 }
1074 
1075 /**
1076  * Sets a maximum mailbox size.
1077  *
1078  * Sets a limit on the maximum number of user messages allowed in the mailbox.
1079  * If this limit is reached, the caller attempting to add a new message will
1080  * execute onCrowdingDoThis.  If messages is zero, the mailbox is unbounded.
1081  *
1082  * Params:
1083  *  tid      = The Tid of the thread for which this limit should be set.
1084  *  messages = The maximum number of messages or zero if no limit.
1085  *  onCrowdingDoThis = The routine called when a message is sent to a full
1086  *                     mailbox.
1087  */
1088 void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis)
1089 in (tid.mbox !is null)
1090 {
1091     tid.mbox.setMaxMsgs(messages, onCrowdingDoThis);
1092 }
1093 
1094 private
1095 {
1096     __gshared Tid[string] tidByName;
1097     __gshared string[][Tid] namesByTid;
1098 }
1099 
1100 private @property Mutex registryLock()
1101 {
1102     __gshared Mutex impl;
1103     initOnce!impl(new Mutex);
1104     return impl;
1105 }
1106 
1107 private void unregisterMe(ref ThreadInfo me)
1108 {
1109     if (me.ident != Tid.init)
1110     {
1111         synchronized (registryLock)
1112         {
1113             if (auto allNames = me.ident in namesByTid)
1114             {
1115                 foreach (name; *allNames)
1116                     tidByName.remove(name);
1117                 namesByTid.remove(me.ident);
1118             }
1119         }
1120     }
1121 }
1122 
1123 /**
1124  * Associates name with tid.
1125  *
1126  * Associates name with tid in a process-local map.  When the thread
1127  * represented by tid terminates, any names associated with it will be
1128  * automatically unregistered.
1129  *
1130  * Params:
1131  *  name = The name to associate with tid.
1132  *  tid  = The tid register by name.
1133  *
1134  * Returns:
1135  *  true if the name is available and tid is not known to represent a
1136  *  defunct thread.
1137  */
1138 bool register(string name, Tid tid)
1139 in (tid.mbox !is null)
1140 {
1141     synchronized (registryLock)
1142     {
1143         if (name in tidByName)
1144             return false;
1145         if (tid.mbox.isClosed)
1146             return false;
1147         namesByTid[tid] ~= name;
1148         tidByName[name] = tid;
1149         return true;
1150     }
1151 }
1152 
1153 /**
1154  * Removes the registered name associated with a tid.
1155  *
1156  * Params:
1157  *  name = The name to unregister.
1158  *
1159  * Returns:
1160  *  true if the name is registered, false if not.
1161  */
1162 bool unregister(string name)
1163 {
1164     import std.algorithm.mutation : remove, SwapStrategy;
1165     import std.algorithm.searching : countUntil;
1166 
1167     synchronized (registryLock)
1168     {
1169         if (auto tid = name in tidByName)
1170         {
1171             auto allNames = *tid in namesByTid;
1172             auto pos = countUntil(*allNames, name);
1173             remove!(SwapStrategy.unstable)(*allNames, pos);
1174             tidByName.remove(name);
1175             return true;
1176         }
1177         return false;
1178     }
1179 }
1180 
1181 /**
1182  * Gets the `Tid` associated with name.
1183  *
1184  * Params:
1185  *  name = The name to locate within the registry.
1186  *
1187  * Returns:
1188  *  The associated `Tid` or `Tid.init` if name is not registered.
1189  */
1190 Tid locate(string name)
1191 {
1192     synchronized (registryLock)
1193     {
1194         if (auto tid = name in tidByName)
1195             return *tid;
1196         return Tid.init;
1197     }
1198 }
1199 
1200 /**
1201  * Encapsulates all implementation-level data needed for scheduling.
1202  *
1203  * When defining a $(LREF Scheduler), an instance of this struct must be associated
1204  * with each logical thread.  It contains all implementation-level information
1205  * needed by the internal API.
1206  */
1207 struct ThreadInfo
1208 {
1209     Tid ident;
1210     bool[Tid] links;
1211     Tid owner;
1212 
1213     /**
1214      * Gets a thread-local instance of `ThreadInfo`.
1215      *
1216      * Gets a thread-local instance of `ThreadInfo`, which should be used as the
1217      * default instance when info is requested for a thread not created by the
1218      * `Scheduler`.
1219      */
1220     static @property ref thisInfo() nothrow
1221     {
1222         static ThreadInfo val;
1223         return val;
1224     }
1225 
1226     /**
1227      * Cleans up this ThreadInfo.
1228      *
1229      * This must be called when a scheduled thread terminates.  It tears down
1230      * the messaging system for the thread and notifies interested parties of
1231      * the thread's termination.
1232      */
1233     void cleanup()
1234     {
1235         if (ident.mbox !is null)
1236             ident.mbox.close();
1237         foreach (tid; links.keys)
1238             _send(MsgType.linkDead, tid, ident);
1239         if (owner != Tid.init)
1240             _send(MsgType.linkDead, owner, ident);
1241         unregisterMe(this); // clean up registry entries
1242     }
1243 
1244     // https://issues.dlang.org/show_bug.cgi?id=20160
1245     @system unittest
1246     {
1247         register("main_thread", thisTid());
1248 
1249         ThreadInfo t;
1250         t.cleanup();
1251 
1252         assert(locate("main_thread") == thisTid());
1253     }
1254 }
1255 
1256 /**
1257  * A `Scheduler` controls how threading is performed by spawn.
1258  *
1259  * Implementing a `Scheduler` allows the concurrency mechanism used by this
1260  * module to be customized according to different needs.  By default, a call
1261  * to spawn will create a new kernel thread that executes the supplied routine
1262  * and terminates when finished.  But it is possible to create `Scheduler`s that
1263  * reuse threads, that multiplex `Fiber`s (coroutines) across a single thread,
1264  * or any number of other approaches.  By making the choice of `Scheduler` a
1265  * user-level option, `std.concurrency` may be used for far more types of
1266  * application than if this behavior were predefined.
1267  *
1268  * Example:
1269  * ---
1270  * import std.concurrency;
1271  * import std.stdio;
1272  *
1273  * void main()
1274  * {
1275  *     scheduler = new FiberScheduler;
1276  *     scheduler.start(
1277  *     {
1278  *         writeln("the rest of main goes here");
1279  *     });
1280  * }
1281  * ---
1282  *
1283  * Some schedulers have a dispatching loop that must run if they are to work
1284  * properly, so for the sake of consistency, when using a scheduler, `start()`
1285  * must be called within `main()`.  This yields control to the scheduler and
1286  * will ensure that any spawned threads are executed in an expected manner.
1287  */
1288 interface Scheduler
1289 {
1290     /**
1291      * Spawns the supplied op and starts the `Scheduler`.
1292      *
1293      * This is intended to be called at the start of the program to yield all
1294      * scheduling to the active `Scheduler` instance.  This is necessary for
1295      * schedulers that explicitly dispatch threads rather than simply relying
1296      * on the operating system to do so, and so start should always be called
1297      * within `main()` to begin normal program execution.
1298      *
1299      * Params:
1300      *  op = A wrapper for whatever the main thread would have done in the
1301      *       absence of a custom scheduler.  It will be automatically executed
1302      *       via a call to spawn by the `Scheduler`.
1303      */
1304     void start(void delegate() op);
1305 
1306     /**
1307      * Assigns a logical thread to execute the supplied op.
1308      *
1309      * This routine is called by spawn.  It is expected to instantiate a new
1310      * logical thread and run the supplied operation.  This thread must call
1311      * `thisInfo.cleanup()` when the thread terminates if the scheduled thread
1312      * is not a kernel thread--all kernel threads will have their `ThreadInfo`
1313      * cleaned up automatically by a thread-local destructor.
1314      *
1315      * Params:
1316      *  op = The function to execute.  This may be the actual function passed
1317      *       by the user to spawn itself, or may be a wrapper function.
1318      */
1319     void spawn(void delegate() op);
1320 
1321     /**
1322      * Yields execution to another logical thread.
1323      *
1324      * This routine is called at various points within concurrency-aware APIs
1325      * to provide a scheduler a chance to yield execution when using some sort
1326      * of cooperative multithreading model.  If this is not appropriate, such
1327      * as when each logical thread is backed by a dedicated kernel thread,
1328      * this routine may be a no-op.
1329      */
1330     void yield() nothrow;
1331 
1332     /**
1333      * Returns an appropriate `ThreadInfo` instance.
1334      *
1335      * Returns an instance of `ThreadInfo` specific to the logical thread that
1336      * is calling this routine or, if the calling thread was not create by
1337      * this scheduler, returns `ThreadInfo.thisInfo` instead.
1338      */
1339     @property ref ThreadInfo thisInfo() nothrow;
1340 
1341     /**
1342      * Creates a `Condition` variable analog for signaling.
1343      *
1344      * Creates a new `Condition` variable analog which is used to check for and
1345      * to signal the addition of messages to a thread's message queue.  Like
1346      * yield, some schedulers may need to define custom behavior so that calls
1347      * to `Condition.wait()` yield to another thread when no new messages are
1348      * available instead of blocking.
1349      *
1350      * Params:
1351      *  m = The `Mutex` that will be associated with this condition.  It will be
1352      *      locked prior to any operation on the condition, and so in some
1353      *      cases a `Scheduler` may need to hold this reference and unlock the
1354      *      mutex before yielding execution to another logical thread.
1355      */
1356     Condition newCondition(Mutex m) nothrow;
1357 }
1358 
1359 /**
1360  * An example `Scheduler` using kernel threads.
1361  *
1362  * This is an example `Scheduler` that mirrors the default scheduling behavior
1363  * of creating one kernel thread per call to spawn.  It is fully functional
1364  * and may be instantiated and used, but is not a necessary part of the
1365  * default functioning of this module.
1366  */
1367 class ThreadScheduler : Scheduler
1368 {
1369     /**
1370      * This simply runs op directly, since no real scheduling is needed by
1371      * this approach.
1372      */
1373     void start(void delegate() op)
1374     {
1375         op();
1376     }
1377 
1378     /**
1379      * Creates a new kernel thread and assigns it to run the supplied op.
1380      */
1381     void spawn(void delegate() op)
1382     {
1383         auto t = new Thread(op);
1384         t.start();
1385     }
1386 
1387     /**
1388      * This scheduler does no explicit multiplexing, so this is a no-op.
1389      */
1390     void yield() nothrow
1391     {
1392         // no explicit yield needed
1393     }
1394 
1395     /**
1396      * Returns `ThreadInfo.thisInfo`, since it is a thread-local instance of
1397      * `ThreadInfo`, which is the correct behavior for this scheduler.
1398      */
1399     @property ref ThreadInfo thisInfo() nothrow
1400     {
1401         return ThreadInfo.thisInfo;
1402     }
1403 
1404     /**
1405      * Creates a new `Condition` variable.  No custom behavior is needed here.
1406      */
1407     Condition newCondition(Mutex m) nothrow
1408     {
1409         return new Condition(m);
1410     }
1411 }
1412 
1413 /**
1414  * An example `Scheduler` using $(MREF_ALTTEXT `Fiber`s, core, thread, fiber).
1415  *
1416  * This is an example scheduler that creates a new `Fiber` per call to spawn
1417  * and multiplexes the execution of all fibers within the main thread.
1418  */
1419 class FiberScheduler : Scheduler
1420 {
1421     /**
1422      * This creates a new `Fiber` for the supplied op and then starts the
1423      * dispatcher.
1424      */
1425     void start(void delegate() op)
1426     {
1427         create(op);
1428         dispatch();
1429     }
1430 
1431     /**
1432      * This created a new `Fiber` for the supplied op and adds it to the
1433      * dispatch list.
1434      */
1435     void spawn(void delegate() op) nothrow
1436     {
1437         create(op);
1438         yield();
1439     }
1440 
1441     /**
1442      * If the caller is a scheduled `Fiber`, this yields execution to another
1443      * scheduled `Fiber`.
1444      */
1445     void yield() nothrow
1446     {
1447         // NOTE: It's possible that we should test whether the calling Fiber
1448         //       is an InfoFiber before yielding, but I think it's reasonable
1449         //       that any (non-Generator) fiber should yield here.
1450         if (Fiber.getThis())
1451             Fiber.yield();
1452     }
1453 
1454     /**
1455      * Returns an appropriate `ThreadInfo` instance.
1456      *
1457      * Returns a `ThreadInfo` instance specific to the calling `Fiber` if the
1458      * `Fiber` was created by this dispatcher, otherwise it returns
1459      * `ThreadInfo.thisInfo`.
1460      */
1461     @property ref ThreadInfo thisInfo() nothrow
1462     {
1463         auto f = cast(InfoFiber) Fiber.getThis();
1464 
1465         if (f !is null)
1466             return f.info;
1467         return ThreadInfo.thisInfo;
1468     }
1469 
1470     /**
1471      * Returns a `Condition` analog that yields when wait or notify is called.
1472      *
1473      * Bug:
1474      * For the default implementation, `notifyAll` will behave like `notify`.
1475      *
1476      * Params:
1477      *   m = A `Mutex` to use for locking if the condition needs to be waited on
1478      *       or notified from multiple `Thread`s.
1479      *       If `null`, no `Mutex` will be used and it is assumed that the
1480      *       `Condition` is only waited on/notified from one `Thread`.
1481      */
1482     Condition newCondition(Mutex m) nothrow
1483     {
1484         return new FiberCondition(m);
1485     }
1486 
1487 protected:
1488     /**
1489      * Creates a new `Fiber` which calls the given delegate.
1490      *
1491      * Params:
1492      *   op = The delegate the fiber should call
1493      */
1494     void create(void delegate() op) nothrow
1495     {
1496         void wrap()
1497         {
1498             scope (exit)
1499             {
1500                 thisInfo.cleanup();
1501             }
1502             op();
1503         }
1504 
1505         m_fibers ~= new InfoFiber(&wrap);
1506     }
1507 
1508     /**
1509      * `Fiber` which embeds a `ThreadInfo`
1510      */
1511     static class InfoFiber : Fiber
1512     {
1513         ThreadInfo info;
1514 
1515         this(void delegate() op) nothrow
1516         {
1517             super(op);
1518         }
1519 
1520         this(void delegate() op, size_t sz) nothrow
1521         {
1522             super(op, sz);
1523         }
1524     }
1525 
1526 private:
1527     class FiberCondition : Condition
1528     {
1529         this(Mutex m) nothrow
1530         {
1531             super(m);
1532             notified = false;
1533         }
1534 
1535         override void wait() nothrow
1536         {
1537             scope (exit) notified = false;
1538 
1539             while (!notified)
1540                 switchContext();
1541         }
1542 
1543         override bool wait(Duration period) nothrow
1544         {
1545             import core.time : MonoTime;
1546 
1547             scope (exit) notified = false;
1548 
1549             for (auto limit = MonoTime.currTime + period;
1550                  !notified && !period.isNegative;
1551                  period = limit - MonoTime.currTime)
1552             {
1553                 this.outer.yield();
1554             }
1555             return notified;
1556         }
1557 
1558         override void notify() nothrow
1559         {
1560             notified = true;
1561             switchContext();
1562         }
1563 
1564         override void notifyAll() nothrow
1565         {
1566             notified = true;
1567             switchContext();
1568         }
1569 
1570     private:
1571         void switchContext() nothrow
1572         {
1573             if (mutex_nothrow) mutex_nothrow.unlock_nothrow();
1574             scope (exit)
1575                 if (mutex_nothrow)
1576                     mutex_nothrow.lock_nothrow();
1577             this.outer.yield();
1578         }
1579 
1580         bool notified;
1581     }
1582 
1583     void dispatch()
1584     {
1585         import std.algorithm.mutation : remove;
1586 
1587         while (m_fibers.length > 0)
1588         {
1589             auto t = m_fibers[m_pos].call(Fiber.Rethrow.no);
1590             if (t !is null && !(cast(OwnerTerminated) t))
1591             {
1592                 throw t;
1593             }
1594             if (m_fibers[m_pos].state == Fiber.State.TERM)
1595             {
1596                 if (m_pos >= (m_fibers = remove(m_fibers, m_pos)).length)
1597                     m_pos = 0;
1598             }
1599             else if (m_pos++ >= m_fibers.length - 1)
1600             {
1601                 m_pos = 0;
1602             }
1603         }
1604     }
1605 
1606     Fiber[] m_fibers;
1607     size_t m_pos;
1608 }
1609 
1610 @system unittest
1611 {
1612     static void receive(Condition cond, ref size_t received)
1613     {
1614         while (true)
1615         {
1616             synchronized (cond.mutex)
1617             {
1618                 cond.wait();
1619                 ++received;
1620             }
1621         }
1622     }
1623 
1624     static void send(Condition cond, ref size_t sent)
1625     {
1626         while (true)
1627         {
1628             synchronized (cond.mutex)
1629             {
1630                 ++sent;
1631                 cond.notify();
1632             }
1633         }
1634     }
1635 
1636     auto fs = new FiberScheduler;
1637     auto mtx = new Mutex;
1638     auto cond = fs.newCondition(mtx);
1639 
1640     size_t received, sent;
1641     auto waiter = new Fiber({ receive(cond, received); }), notifier = new Fiber({ send(cond, sent); });
1642     waiter.call();
1643     assert(received == 0);
1644     notifier.call();
1645     assert(sent == 1);
1646     assert(received == 0);
1647     waiter.call();
1648     assert(received == 1);
1649     waiter.call();
1650     assert(received == 1);
1651 }
1652 
1653 /**
1654  * Sets the `Scheduler` behavior within the program.
1655  *
1656  * This variable sets the `Scheduler` behavior within this program.  Typically,
1657  * when setting a `Scheduler`, `scheduler.start()` should be called in `main`.  This
1658  * routine will not return until program execution is complete.
1659  */
1660 __gshared Scheduler scheduler;
1661 
1662 // Generator
1663 
1664 /**
1665  * If the caller is a `Fiber` and is not a $(LREF Generator), this function will call
1666  * `scheduler.yield()` or `Fiber.yield()`, as appropriate.
1667  */
1668 void yield() nothrow
1669 {
1670     auto fiber = Fiber.getThis();
1671     if (!(cast(IsGenerator) fiber))
1672     {
1673         if (scheduler is null)
1674         {
1675             if (fiber)
1676                 return Fiber.yield();
1677         }
1678         else
1679             scheduler.yield();
1680     }
1681 }
1682 
1683 /// Used to determine whether a Generator is running.
1684 private interface IsGenerator {}
1685 
1686 
1687 /**
1688  * A Generator is a $(MREF_ALTTEXT Fiber, core, thread, fiber)
1689  * that periodically returns values of type `T` to the
1690  * caller via `yield`.  This is represented as an InputRange.
1691  */
1692 class Generator(T) :
1693     Fiber, IsGenerator, InputRange!T
1694 {
1695     /**
1696      * Initializes a generator object which is associated with a static
1697      * D function.  The function will be called once to prepare the range
1698      * for iteration.
1699      *
1700      * Params:
1701      *  fn = The fiber function.
1702      *
1703      * In:
1704      *  fn must not be null.
1705      */
1706     this(void function() fn)
1707     {
1708         super(fn);
1709         call();
1710     }
1711 
1712     /**
1713      * Initializes a generator object which is associated with a static
1714      * D function.  The function will be called once to prepare the range
1715      * for iteration.
1716      *
1717      * Params:
1718      *  fn = The fiber function.
1719      *  sz = The stack size for this fiber.
1720      *
1721      * In:
1722      *  fn must not be null.
1723      */
1724     this(void function() fn, size_t sz)
1725     {
1726         super(fn, sz);
1727         call();
1728     }
1729 
1730     /**
1731      * Initializes a generator object which is associated with a static
1732      * D function.  The function will be called once to prepare the range
1733      * for iteration.
1734      *
1735      * Params:
1736      *  fn = The fiber function.
1737      *  sz = The stack size for this fiber.
1738      *  guardPageSize = size of the guard page to trap fiber's stack
1739      *                  overflows. Refer to $(REF Fiber, core,thread)'s
1740      *                  documentation for more details.
1741      *
1742      * In:
1743      *  fn must not be null.
1744      */
1745     this(void function() fn, size_t sz, size_t guardPageSize)
1746     {
1747         super(fn, sz, guardPageSize);
1748         call();
1749     }
1750 
1751     /**
1752      * Initializes a generator object which is associated with a dynamic
1753      * D function.  The function will be called once to prepare the range
1754      * for iteration.
1755      *
1756      * Params:
1757      *  dg = The fiber function.
1758      *
1759      * In:
1760      *  dg must not be null.
1761      */
1762     this(void delegate() dg)
1763     {
1764         super(dg);
1765         call();
1766     }
1767 
1768     /**
1769      * Initializes a generator object which is associated with a dynamic
1770      * D function.  The function will be called once to prepare the range
1771      * for iteration.
1772      *
1773      * Params:
1774      *  dg = The fiber function.
1775      *  sz = The stack size for this fiber.
1776      *
1777      * In:
1778      *  dg must not be null.
1779      */
1780     this(void delegate() dg, size_t sz)
1781     {
1782         super(dg, sz);
1783         call();
1784     }
1785 
1786     /**
1787      * Initializes a generator object which is associated with a dynamic
1788      * D function.  The function will be called once to prepare the range
1789      * for iteration.
1790      *
1791      * Params:
1792      *  dg = The fiber function.
1793      *  sz = The stack size for this fiber.
1794      *  guardPageSize = size of the guard page to trap fiber's stack
1795      *                  overflows. Refer to $(REF Fiber, core,thread)'s
1796      *                  documentation for more details.
1797      *
1798      * In:
1799      *  dg must not be null.
1800      */
1801     this(void delegate() dg, size_t sz, size_t guardPageSize)
1802     {
1803         super(dg, sz, guardPageSize);
1804         call();
1805     }
1806 
1807     /**
1808      * Returns true if the generator is empty.
1809      */
1810     final bool empty() @property
1811     {
1812         return m_value is null || state == State.TERM;
1813     }
1814 
1815     /**
1816      * Obtains the next value from the underlying function.
1817      */
1818     final void popFront()
1819     {
1820         call();
1821     }
1822 
1823     /**
1824      * Returns the most recently generated value by shallow copy.
1825      */
1826     final T front() @property
1827     {
1828         return *m_value;
1829     }
1830 
1831     /**
1832      * Returns the most recently generated value without executing a
1833      * copy contructor. Will not compile for element types defining a
1834      * postblit, because `Generator` does not return by reference.
1835      */
1836     final T moveFront()
1837     {
1838         static if (!hasElaborateCopyConstructor!T)
1839         {
1840             return front;
1841         }
1842         else
1843         {
1844             static assert(0,
1845                     "Fiber front is always rvalue and thus cannot be moved since it defines a postblit.");
1846         }
1847     }
1848 
1849     final int opApply(scope int delegate(T) loopBody)
1850     {
1851         int broken;
1852         for (; !empty; popFront())
1853         {
1854             broken = loopBody(front);
1855             if (broken) break;
1856         }
1857         return broken;
1858     }
1859 
1860     final int opApply(scope int delegate(size_t, T) loopBody)
1861     {
1862         int broken;
1863         for (size_t i; !empty; ++i, popFront())
1864         {
1865             broken = loopBody(i, front);
1866             if (broken) break;
1867         }
1868         return broken;
1869     }
1870 private:
1871     T* m_value;
1872 }
1873 
1874 ///
1875 @system unittest
1876 {
1877     auto tid = spawn({
1878         int i;
1879         while (i < 9)
1880             i = receiveOnly!int;
1881 
1882         ownerTid.send(i * 2);
1883     });
1884 
1885     auto r = new Generator!int({
1886         foreach (i; 1 .. 10)
1887             yield(i);
1888     });
1889 
1890     foreach (e; r)
1891         tid.send(e);
1892 
1893     assert(receiveOnly!int == 18);
1894 }
1895 
1896 /**
1897  * Yields a value of type T to the caller of the currently executing
1898  * generator.
1899  *
1900  * Params:
1901  *  value = The value to yield.
1902  */
1903 void yield(T)(ref T value)
1904 {
1905     Generator!T cur = cast(Generator!T) Fiber.getThis();
1906     if (cur !is null && cur.state == Fiber.State.EXEC)
1907     {
1908         cur.m_value = &value;
1909         return Fiber.yield();
1910     }
1911     throw new Exception("yield(T) called with no active generator for the supplied type");
1912 }
1913 
1914 /// ditto
1915 void yield(T)(T value)
1916 {
1917     yield(value);
1918 }
1919 
1920 @system unittest
1921 {
1922     import core.exception;
1923     import std.exception;
1924 
1925     auto mainTid = thisTid;
1926     alias testdg = () {
1927         auto tid = spawn(
1928         (Tid mainTid) {
1929             int i;
1930             scope (failure) mainTid.send(false);
1931             try
1932             {
1933                 for (i = 1; i < 10; i++)
1934                 {
1935                     if (receiveOnly!int() != i)
1936                     {
1937                         mainTid.send(false);
1938                         break;
1939                     }
1940                 }
1941             }
1942             catch (OwnerTerminated e)
1943             {
1944                 // i will advance 1 past the last value expected
1945                 mainTid.send(i == 4);
1946             }
1947         }, mainTid);
1948         auto r = new Generator!int(
1949         {
1950             assertThrown!Exception(yield(2.0));
1951             yield(); // ensure this is a no-op
1952             yield(1);
1953             yield(); // also once something has been yielded
1954             yield(2);
1955             yield(3);
1956         });
1957 
1958         foreach (e; r)
1959         {
1960             tid.send(e);
1961         }
1962     };
1963 
1964     scheduler = new ThreadScheduler;
1965     scheduler.spawn(testdg);
1966     assert(receiveOnly!bool());
1967 
1968     scheduler = new FiberScheduler;
1969     scheduler.start(testdg);
1970     assert(receiveOnly!bool());
1971     scheduler = null;
1972 }
1973 ///
1974 @system unittest
1975 {
1976     import std.range;
1977 
1978     InputRange!int myIota = iota(10).inputRangeObject;
1979 
1980     myIota.popFront();
1981     myIota.popFront();
1982     assert(myIota.moveFront == 2);
1983     assert(myIota.front == 2);
1984     myIota.popFront();
1985     assert(myIota.front == 3);
1986 
1987     //can be assigned to std.range.interfaces.InputRange directly
1988     myIota = new Generator!int(
1989     {
1990         foreach (i; 0 .. 10) yield(i);
1991     });
1992 
1993     myIota.popFront();
1994     myIota.popFront();
1995     assert(myIota.moveFront == 2);
1996     assert(myIota.front == 2);
1997     myIota.popFront();
1998     assert(myIota.front == 3);
1999 
2000     size_t[2] counter = [0, 0];
2001     foreach (i, unused; myIota) counter[] += [1, i];
2002 
2003     assert(myIota.empty);
2004     assert(counter == [7, 21]);
2005 }
2006 
2007 private
2008 {
2009     /*
2010      * A MessageBox is a message queue for one thread.  Other threads may send
2011      * messages to this owner by calling put(), and the owner receives them by
2012      * calling get().  The put() call is therefore effectively shared and the
2013      * get() call is effectively local.  setMaxMsgs may be used by any thread
2014      * to limit the size of the message queue.
2015      */
2016     class MessageBox
2017     {
2018         this() @trusted nothrow /* TODO: make @safe after relevant druntime PR gets merged */
2019         {
2020             m_lock = new Mutex;
2021             m_closed = false;
2022 
2023             if (scheduler is null)
2024             {
2025                 m_putMsg = new Condition(m_lock);
2026                 m_notFull = new Condition(m_lock);
2027             }
2028             else
2029             {
2030                 m_putMsg = scheduler.newCondition(m_lock);
2031                 m_notFull = scheduler.newCondition(m_lock);
2032             }
2033         }
2034 
2035         ///
2036         final @property bool isClosed() @safe @nogc pure
2037         {
2038             synchronized (m_lock)
2039             {
2040                 return m_closed;
2041             }
2042         }
2043 
2044         /*
2045          * Sets a limit on the maximum number of user messages allowed in the
2046          * mailbox.  If this limit is reached, the caller attempting to add
2047          * a new message will execute call.  If num is zero, there is no limit
2048          * on the message queue.
2049          *
2050          * Params:
2051          *  num  = The maximum size of the queue or zero if the queue is
2052          *         unbounded.
2053          *  call = The routine to call when the queue is full.
2054          */
2055         final void setMaxMsgs(size_t num, bool function(Tid) call) @safe @nogc pure
2056         {
2057             synchronized (m_lock)
2058             {
2059                 m_maxMsgs = num;
2060                 m_onMaxMsgs = call;
2061             }
2062         }
2063 
2064         /*
2065          * If maxMsgs is not set, the message is added to the queue and the
2066          * owner is notified.  If the queue is full, the message will still be
2067          * accepted if it is a control message, otherwise onCrowdingDoThis is
2068          * called.  If the routine returns true, this call will block until
2069          * the owner has made space available in the queue.  If it returns
2070          * false, this call will abort.
2071          *
2072          * Params:
2073          *  msg = The message to put in the queue.
2074          *
2075          * Throws:
2076          *  An exception if the queue is full and onCrowdingDoThis throws.
2077          */
2078         final void put(ref Message msg)
2079         {
2080             synchronized (m_lock)
2081             {
2082                 // TODO: Generate an error here if m_closed is true, or maybe
2083                 //       put a message in the caller's queue?
2084                 if (!m_closed)
2085                 {
2086                     while (true)
2087                     {
2088                         if (isPriorityMsg(msg))
2089                         {
2090                             m_sharedPty.put(msg);
2091                             m_putMsg.notify();
2092                             return;
2093                         }
2094                         if (!mboxFull() || isControlMsg(msg))
2095                         {
2096                             m_sharedBox.put(msg);
2097                             m_putMsg.notify();
2098                             return;
2099                         }
2100                         if (m_onMaxMsgs !is null && !m_onMaxMsgs(thisTid))
2101                         {
2102                             return;
2103                         }
2104                         m_putQueue++;
2105                         m_notFull.wait();
2106                         m_putQueue--;
2107                     }
2108                 }
2109             }
2110         }
2111 
2112         /*
2113          * Matches ops against each message in turn until a match is found.
2114          *
2115          * Params:
2116          *  ops = The operations to match.  Each may return a bool to indicate
2117          *        whether a message with a matching type is truly a match.
2118          *
2119          * Returns:
2120          *  true if a message was retrieved and false if not (such as if a
2121          *  timeout occurred).
2122          *
2123          * Throws:
2124          *  LinkTerminated if a linked thread terminated, or OwnerTerminated
2125          * if the owner thread terminates and no existing messages match the
2126          * supplied ops.
2127          */
2128         bool get(T...)(scope T vals)
2129         {
2130             import std.meta : AliasSeq;
2131 
2132             static assert(T.length, "T must not be empty");
2133 
2134             static if (is(T[0] : Duration))
2135             {
2136                 alias Ops = AliasSeq!(T[1 .. $]);
2137                 alias ops = vals[1 .. $];
2138                 enum timedWait = true;
2139                 Duration period = vals[0];
2140             }
2141             else
2142             {
2143                 alias Ops = AliasSeq!(T);
2144                 alias ops = vals[0 .. $];
2145                 enum timedWait = false;
2146             }
2147 
2148             bool onStandardMsg(ref Message msg)
2149             {
2150                 foreach (i, t; Ops)
2151                 {
2152                     alias Args = Parameters!(t);
2153                     auto op = ops[i];
2154 
2155                     if (msg.convertsTo!(Args))
2156                     {
2157                         alias RT = ReturnType!(t);
2158                         static if (is(RT == bool))
2159                         {
2160                             return msg.map(op);
2161                         }
2162                         else
2163                         {
2164                             msg.map(op);
2165                             static if (!is(immutable RT == immutable noreturn))
2166                                 return true;
2167                         }
2168                     }
2169                 }
2170                 return false;
2171             }
2172 
2173             bool onLinkDeadMsg(ref Message msg)
2174             {
2175                 assert(msg.convertsTo!(Tid),
2176                         "Message could be converted to Tid");
2177                 auto tid = msg.get!(Tid);
2178 
2179                 if (bool* pDepends = tid in thisInfo.links)
2180                 {
2181                     auto depends = *pDepends;
2182                     thisInfo.links.remove(tid);
2183                     // Give the owner relationship precedence.
2184                     if (depends && tid != thisInfo.owner)
2185                     {
2186                         auto e = new LinkTerminated(tid);
2187                         auto m = Message(MsgType.standard, e);
2188                         if (onStandardMsg(m))
2189                             return true;
2190                         throw e;
2191                     }
2192                 }
2193                 if (tid == thisInfo.owner)
2194                 {
2195                     thisInfo.owner = Tid.init;
2196                     auto e = new OwnerTerminated(tid);
2197                     auto m = Message(MsgType.standard, e);
2198                     if (onStandardMsg(m))
2199                         return true;
2200                     throw e;
2201                 }
2202                 return false;
2203             }
2204 
2205             bool onControlMsg(ref Message msg)
2206             {
2207                 switch (msg.type)
2208                 {
2209                 case MsgType.linkDead:
2210                     return onLinkDeadMsg(msg);
2211                 default:
2212                     return false;
2213                 }
2214             }
2215 
2216             bool scan(ref ListT list)
2217             {
2218                 for (auto range = list[]; !range.empty;)
2219                 {
2220                     // Only the message handler will throw, so if this occurs
2221                     // we can be certain that the message was handled.
2222                     scope (failure)
2223                         list.removeAt(range);
2224 
2225                     if (isControlMsg(range.front))
2226                     {
2227                         if (onControlMsg(range.front))
2228                         {
2229                             // Although the linkDead message is a control message,
2230                             // it can be handled by the user.  Since the linkDead
2231                             // message throws if not handled, if we get here then
2232                             // it has been handled and we can return from receive.
2233                             // This is a weird special case that will have to be
2234                             // handled in a more general way if more are added.
2235                             if (!isLinkDeadMsg(range.front))
2236                             {
2237                                 list.removeAt(range);
2238                                 continue;
2239                             }
2240                             list.removeAt(range);
2241                             return true;
2242                         }
2243                         range.popFront();
2244                         continue;
2245                     }
2246                     else
2247                     {
2248                         if (onStandardMsg(range.front))
2249                         {
2250                             list.removeAt(range);
2251                             return true;
2252                         }
2253                         range.popFront();
2254                         continue;
2255                     }
2256                 }
2257                 return false;
2258             }
2259 
2260             bool pty(ref ListT list)
2261             {
2262                 if (!list.empty)
2263                 {
2264                     auto range = list[];
2265 
2266                     if (onStandardMsg(range.front))
2267                     {
2268                         list.removeAt(range);
2269                         return true;
2270                     }
2271                     if (range.front.convertsTo!(Throwable))
2272                         throw range.front.get!(Throwable);
2273                     else if (range.front.convertsTo!(shared(Throwable)))
2274                         /* Note: a shared type can be caught without the shared qualifier
2275                          * so throwing shared will be an error */
2276                         throw cast() range.front.get!(shared(Throwable));
2277                     else
2278                         throw new PriorityMessageException(range.front.data);
2279                 }
2280                 return false;
2281             }
2282 
2283             static if (timedWait)
2284             {
2285                 import core.time : MonoTime;
2286                 auto limit = MonoTime.currTime + period;
2287             }
2288 
2289             while (true)
2290             {
2291                 ListT arrived;
2292 
2293                 if (pty(m_localPty) || scan(m_localBox))
2294                 {
2295                     return true;
2296                 }
2297                 yield();
2298                 synchronized (m_lock)
2299                 {
2300                     updateMsgCount();
2301                     while (m_sharedPty.empty && m_sharedBox.empty)
2302                     {
2303                         // NOTE: We're notifying all waiters here instead of just
2304                         //       a few because the onCrowding behavior may have
2305                         //       changed and we don't want to block sender threads
2306                         //       unnecessarily if the new behavior is not to block.
2307                         //       This will admittedly result in spurious wakeups
2308                         //       in other situations, but what can you do?
2309                         if (m_putQueue && !mboxFull())
2310                             m_notFull.notifyAll();
2311                         static if (timedWait)
2312                         {
2313                             if (period <= Duration.zero || !m_putMsg.wait(period))
2314                                 return false;
2315                         }
2316                         else
2317                         {
2318                             m_putMsg.wait();
2319                         }
2320                     }
2321                     m_localPty.put(m_sharedPty);
2322                     arrived.put(m_sharedBox);
2323                 }
2324                 if (m_localPty.empty)
2325                 {
2326                     scope (exit) m_localBox.put(arrived);
2327                     if (scan(arrived))
2328                     {
2329                         return true;
2330                     }
2331                     else
2332                     {
2333                         static if (timedWait)
2334                         {
2335                             period = limit - MonoTime.currTime;
2336                         }
2337                         continue;
2338                     }
2339                 }
2340                 m_localBox.put(arrived);
2341                 pty(m_localPty);
2342                 return true;
2343             }
2344         }
2345 
2346         /*
2347          * Called on thread termination.  This routine processes any remaining
2348          * control messages, clears out message queues, and sets a flag to
2349          * reject any future messages.
2350          */
2351         final void close()
2352         {
2353             static void onLinkDeadMsg(ref Message msg)
2354             {
2355                 assert(msg.convertsTo!(Tid),
2356                         "Message could be converted to Tid");
2357                 auto tid = msg.get!(Tid);
2358 
2359                 thisInfo.links.remove(tid);
2360                 if (tid == thisInfo.owner)
2361                     thisInfo.owner = Tid.init;
2362             }
2363 
2364             static void sweep(ref ListT list)
2365             {
2366                 for (auto range = list[]; !range.empty; range.popFront())
2367                 {
2368                     if (range.front.type == MsgType.linkDead)
2369                         onLinkDeadMsg(range.front);
2370                 }
2371             }
2372 
2373             ListT arrived;
2374 
2375             sweep(m_localBox);
2376             synchronized (m_lock)
2377             {
2378                 arrived.put(m_sharedBox);
2379                 m_closed = true;
2380             }
2381             m_localBox.clear();
2382             sweep(arrived);
2383         }
2384 
2385     private:
2386         // Routines involving local data only, no lock needed.
2387 
2388         bool mboxFull() @safe @nogc pure nothrow
2389         {
2390             return m_maxMsgs && m_maxMsgs <= m_localMsgs + m_sharedBox.length;
2391         }
2392 
2393         void updateMsgCount() @safe @nogc pure nothrow
2394         {
2395             m_localMsgs = m_localBox.length;
2396         }
2397 
2398         bool isControlMsg(ref Message msg) @safe @nogc pure nothrow
2399         {
2400             return msg.type != MsgType.standard && msg.type != MsgType.priority;
2401         }
2402 
2403         bool isPriorityMsg(ref Message msg) @safe @nogc pure nothrow
2404         {
2405             return msg.type == MsgType.priority;
2406         }
2407 
2408         bool isLinkDeadMsg(ref Message msg) @safe @nogc pure nothrow
2409         {
2410             return msg.type == MsgType.linkDead;
2411         }
2412 
2413         alias OnMaxFn = bool function(Tid);
2414         alias ListT = List!(Message);
2415 
2416         ListT m_localBox;
2417         ListT m_localPty;
2418 
2419         Mutex m_lock;
2420         Condition m_putMsg;
2421         Condition m_notFull;
2422         size_t m_putQueue;
2423         ListT m_sharedBox;
2424         ListT m_sharedPty;
2425         OnMaxFn m_onMaxMsgs;
2426         size_t m_localMsgs;
2427         size_t m_maxMsgs;
2428         bool m_closed;
2429     }
2430 
2431     /*
2432      *
2433      */
2434     struct List(T)
2435     {
2436         struct Range
2437         {
2438             import std.exception : enforce;
2439 
2440             @property bool empty() const
2441             {
2442                 return !m_prev.next;
2443             }
2444 
2445             @property ref T front()
2446             {
2447                 enforce(m_prev.next, "invalid list node");
2448                 return m_prev.next.val;
2449             }
2450 
2451             @property void front(T val)
2452             {
2453                 enforce(m_prev.next, "invalid list node");
2454                 m_prev.next.val = val;
2455             }
2456 
2457             void popFront()
2458             {
2459                 enforce(m_prev.next, "invalid list node");
2460                 m_prev = m_prev.next;
2461             }
2462 
2463             private this(Node* p)
2464             {
2465                 m_prev = p;
2466             }
2467 
2468             private Node* m_prev;
2469         }
2470 
2471         void put(T val)
2472         {
2473             put(newNode(val));
2474         }
2475 
2476         void put(ref List!(T) rhs)
2477         {
2478             if (!rhs.empty)
2479             {
2480                 put(rhs.m_first);
2481                 while (m_last.next !is null)
2482                 {
2483                     m_last = m_last.next;
2484                     m_count++;
2485                 }
2486                 rhs.m_first = null;
2487                 rhs.m_last = null;
2488                 rhs.m_count = 0;
2489             }
2490         }
2491 
2492         Range opSlice()
2493         {
2494             return Range(cast(Node*)&m_first);
2495         }
2496 
2497         void removeAt(Range r)
2498         {
2499             import std.exception : enforce;
2500 
2501             assert(m_count, "Can not remove from empty Range");
2502             Node* n = r.m_prev;
2503             enforce(n && n.next, "attempting to remove invalid list node");
2504 
2505             if (m_last is m_first)
2506                 m_last = null;
2507             else if (m_last is n.next)
2508                 m_last = n; // nocoverage
2509             Node* to_free = n.next;
2510             n.next = n.next.next;
2511             freeNode(to_free);
2512             m_count--;
2513         }
2514 
2515         @property size_t length()
2516         {
2517             return m_count;
2518         }
2519 
2520         void clear()
2521         {
2522             m_first = m_last = null;
2523             m_count = 0;
2524         }
2525 
2526         @property bool empty()
2527         {
2528             return m_first is null;
2529         }
2530 
2531     private:
2532         struct Node
2533         {
2534             Node* next;
2535             T val;
2536 
2537             this(T v)
2538             {
2539                 val = v;
2540             }
2541         }
2542 
2543         static shared struct SpinLock
2544         {
2545             void lock() { while (!cas(&locked, false, true)) { Thread.yield(); } }
2546             void unlock() { atomicStore!(MemoryOrder.rel)(locked, false); }
2547             bool locked;
2548         }
2549 
2550         static shared SpinLock sm_lock;
2551         static shared Node* sm_head;
2552 
2553         Node* newNode(T v)
2554         {
2555             Node* n;
2556             {
2557                 sm_lock.lock();
2558                 scope (exit) sm_lock.unlock();
2559 
2560                 if (sm_head)
2561                 {
2562                     n = cast(Node*) sm_head;
2563                     sm_head = sm_head.next;
2564                 }
2565             }
2566             if (n)
2567             {
2568                 import core.lifetime : emplace;
2569                 emplace!Node(n, v);
2570             }
2571             else
2572             {
2573                 n = new Node(v);
2574             }
2575             return n;
2576         }
2577 
2578         void freeNode(Node* n)
2579         {
2580             // destroy val to free any owned GC memory
2581             destroy(n.val);
2582 
2583             sm_lock.lock();
2584             scope (exit) sm_lock.unlock();
2585 
2586             auto sn = cast(shared(Node)*) n;
2587             sn.next = sm_head;
2588             sm_head = sn;
2589         }
2590 
2591         void put(Node* n)
2592         {
2593             m_count++;
2594             if (!empty)
2595             {
2596                 m_last.next = n;
2597                 m_last = n;
2598                 return;
2599             }
2600             m_first = n;
2601             m_last = n;
2602         }
2603 
2604         Node* m_first;
2605         Node* m_last;
2606         size_t m_count;
2607     }
2608 }
2609 
2610 @system unittest
2611 {
2612     import std.typecons : tuple, Tuple;
2613 
2614     static void testfn(Tid tid)
2615     {
2616         receive((float val) { assert(0); }, (int val, int val2) {
2617             assert(val == 42 && val2 == 86);
2618         });
2619         receive((Tuple!(int, int) val) { assert(val[0] == 42 && val[1] == 86); });
2620         receive((Variant val) {  });
2621         receive((string val) {
2622             if ("the quick brown fox" != val)
2623                 return false;
2624             return true;
2625         }, (string val) { assert(false); });
2626         prioritySend(tid, "done");
2627     }
2628 
2629     static void runTest(Tid tid)
2630     {
2631         send(tid, 42, 86);
2632         send(tid, tuple(42, 86));
2633         send(tid, "hello", "there");
2634         send(tid, "the quick brown fox");
2635         receive((string val) { assert(val == "done"); });
2636     }
2637 
2638     static void simpleTest()
2639     {
2640         auto tid = spawn(&testfn, thisTid);
2641         runTest(tid);
2642 
2643         // Run the test again with a limited mailbox size.
2644         tid = spawn(&testfn, thisTid);
2645         setMaxMailboxSize(tid, 2, OnCrowding.block);
2646         runTest(tid);
2647     }
2648 
2649     simpleTest();
2650 
2651     scheduler = new ThreadScheduler;
2652     simpleTest();
2653     scheduler = null;
2654 }
2655 
2656 private @property shared(Mutex) initOnceLock()
2657 {
2658     static shared Mutex lock;
2659     if (auto mtx = atomicLoad!(MemoryOrder.acq)(lock))
2660         return mtx;
2661     auto mtx = new shared Mutex;
2662     if (cas(&lock, cast(shared) null, mtx))
2663         return mtx;
2664     return atomicLoad!(MemoryOrder.acq)(lock);
2665 }
2666 
2667 /**
2668  * Initializes $(D_PARAM var) with the lazy $(D_PARAM init) value in a
2669  * thread-safe manner.
2670  *
2671  * The implementation guarantees that all threads simultaneously calling
2672  * initOnce with the same $(D_PARAM var) argument block until $(D_PARAM var) is
2673  * fully initialized. All side-effects of $(D_PARAM init) are globally visible
2674  * afterwards.
2675  *
2676  * Params:
2677  *   var = The variable to initialize
2678  *   init = The lazy initializer value
2679  *
2680  * Returns:
2681  *   A reference to the initialized variable
2682  */
2683 auto ref initOnce(alias var)(lazy typeof(var) init)
2684 {
2685     return initOnce!var(init, initOnceLock);
2686 }
2687 
2688 /// A typical use-case is to perform lazy but thread-safe initialization.
2689 @system unittest
2690 {
2691     static class MySingleton
2692     {
2693         static MySingleton instance()
2694         {
2695             __gshared MySingleton inst;
2696             return initOnce!inst(new MySingleton);
2697         }
2698     }
2699 
2700     assert(MySingleton.instance !is null);
2701 }
2702 
2703 @system unittest
2704 {
2705     static class MySingleton
2706     {
2707         static MySingleton instance()
2708         {
2709             __gshared MySingleton inst;
2710             return initOnce!inst(new MySingleton);
2711         }
2712 
2713     private:
2714         this() { val = ++cnt; }
2715         size_t val;
2716         __gshared size_t cnt;
2717     }
2718 
2719     foreach (_; 0 .. 10)
2720         spawn({ ownerTid.send(MySingleton.instance.val); });
2721     foreach (_; 0 .. 10)
2722         assert(receiveOnly!size_t == MySingleton.instance.val);
2723     assert(MySingleton.cnt == 1);
2724 }
2725 
2726 /**
2727  * Same as above, but takes a separate mutex instead of sharing one among
2728  * all initOnce instances.
2729  *
2730  * This should be used to avoid dead-locks when the $(D_PARAM init)
2731  * expression waits for the result of another thread that might also
2732  * call initOnce. Use with care.
2733  *
2734  * Params:
2735  *   var = The variable to initialize
2736  *   init = The lazy initializer value
2737  *   mutex = A mutex to prevent race conditions
2738  *
2739  * Returns:
2740  *   A reference to the initialized variable
2741  */
2742 auto ref initOnce(alias var)(lazy typeof(var) init, shared Mutex mutex)
2743 {
2744     // check that var is global, can't take address of a TLS variable
2745     static assert(is(typeof({ __gshared p = &var; })),
2746         "var must be 'static shared' or '__gshared'.");
2747     import core.atomic : atomicLoad, MemoryOrder, atomicStore;
2748 
2749     static shared bool flag;
2750     if (!atomicLoad!(MemoryOrder.acq)(flag))
2751     {
2752         synchronized (mutex)
2753         {
2754             if (!atomicLoad!(MemoryOrder.raw)(flag))
2755             {
2756                 var = init;
2757                 static if (!is(immutable typeof(var) == immutable noreturn))
2758                     atomicStore!(MemoryOrder.rel)(flag, true);
2759             }
2760         }
2761     }
2762     return var;
2763 }
2764 
2765 /// ditto
2766 auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex)
2767 {
2768     return initOnce!var(init, cast(shared) mutex);
2769 }
2770 
2771 /// Use a separate mutex when init blocks on another thread that might also call initOnce.
2772 @system unittest
2773 {
2774     import core.sync.mutex : Mutex;
2775 
2776     static shared bool varA, varB;
2777     static shared Mutex m;
2778     m = new shared Mutex;
2779 
2780     spawn({
2781         // use a different mutex for varB to avoid a dead-lock
2782         initOnce!varB(true, m);
2783         ownerTid.send(true);
2784     });
2785     // init depends on the result of the spawned thread
2786     initOnce!varA(receiveOnly!bool);
2787     assert(varA == true);
2788     assert(varB == true);
2789 }
2790 
2791 @system unittest
2792 {
2793     static shared bool a;
2794     __gshared bool b;
2795     static bool c;
2796     bool d;
2797     initOnce!a(true);
2798     initOnce!b(true);
2799     static assert(!__traits(compiles, initOnce!c(true))); // TLS
2800     static assert(!__traits(compiles, initOnce!d(true))); // local variable
2801 }
2802 
2803 // test ability to send shared arrays
2804 @system unittest
2805 {
2806     static shared int[] x = new shared(int)[1];
2807     auto tid = spawn({
2808         auto arr = receiveOnly!(shared(int)[]);
2809         arr[0] = 5;
2810         ownerTid.send(true);
2811     });
2812     tid.send(x);
2813     receiveOnly!(bool);
2814     assert(x[0] == 5);
2815 }
2816 
2817 // https://issues.dlang.org/show_bug.cgi?id=13930
2818 @system unittest
2819 {
2820     immutable aa = ["0":0];
2821     thisTid.send(aa);
2822     receiveOnly!(immutable int[string]); // compile error
2823 }
2824 
2825 // https://issues.dlang.org/show_bug.cgi?id=19345
2826 @system unittest
2827 {
2828     static struct Aggregate { const int a; const int[5] b; }
2829     static void t1(Tid mainTid)
2830     {
2831         const sendMe = Aggregate(42, [1, 2, 3, 4, 5]);
2832         mainTid.send(sendMe);
2833     }
2834 
2835     spawn(&t1, thisTid);
2836     auto result1 = receiveOnly!(const Aggregate)();
2837     immutable expected = Aggregate(42, [1, 2, 3, 4, 5]);
2838     assert(result1 == expected);
2839 }
2840 
2841 // Noreturn support
2842 @system unittest
2843 {
2844     static noreturn foo(int) { throw new Exception(""); }
2845 
2846     if (false) spawn(&foo, 1);
2847     if (false) spawnLinked(&foo, 1);
2848 
2849     if (false) receive(&foo);
2850     if (false) receiveTimeout(Duration.init, &foo);
2851 
2852     // Wrapped in __traits(compiles) to skip codegen which crashes dmd's backend
2853     static assert(__traits(compiles, receiveOnly!noreturn()                 ));
2854     static assert(__traits(compiles, send(Tid.init, noreturn.init)          ));
2855     static assert(__traits(compiles, prioritySend(Tid.init, noreturn.init)  ));
2856     static assert(__traits(compiles, yield(noreturn.init)                   ));
2857 
2858     static assert(__traits(compiles, {
2859         __gshared noreturn n;
2860         initOnce!n(noreturn.init);
2861     }));
2862 }