1 /**
2  * $(SCRIPT inhibitQuickIndex = 1;)
3  * $(DIVC quickindex,
4  * $(BOOKTABLE,
5  * $(TR $(TH Category) $(TH Symbols))
6  * $(TR $(TD Tid) $(TD
7  *     $(MYREF join)
8  *     $(MYREF locate)
9  *     $(MYREF ownerTid)
10  *     $(MYREF register)
11  *     $(MYREF spawn)
12  *     $(MYREF spawnLinked)
13  *     $(MYREF thisTid)
14  *     $(MYREF Tid)
15  *     $(MYREF TidMissingException)
16  *     $(MYREF unregister)
17  * ))
18  * $(TR $(TD Message passing) $(TD
19  *     $(MYREF prioritySend)
20  *     $(MYREF receive)
21  *     $(MYREF receiveOnly)
22  *     $(MYREF receiveTimeout)
23  *     $(MYREF send)
24  *     $(MYREF setMaxMailboxSize)
25  * ))
26  * $(TR $(TD Message-related types) $(TD
27  *     $(MYREF LinkTerminated)
28  *     $(MYREF MailboxFull)
29  *     $(MYREF MessageMismatch)
30  *     $(MYREF OnCrowding)
31  *     $(MYREF OwnerTerminated)
32  *     $(MYREF PriorityMessageException)
33  * ))
34  * $(TR $(TD Scheduler) $(TD
35  *     $(MYREF FiberScheduler)
36  *     $(MYREF Generator)
37  *     $(MYREF Scheduler)
38  *     $(MYREF scheduler)
39  *     $(MYREF ThreadInfo)
40  *     $(MYREF ThreadScheduler)
41  *     $(MYREF yield)
42  * ))
43  * $(TR $(TD Misc) $(TD
44  *     $(MYREF initOnce)
45  * ))
46  * ))
47  *
48  * This is a low-level messaging API upon which more structured or restrictive
49  * APIs may be built.  The general idea is that every messageable entity is
50  * represented by a common handle type called a `Tid`, which allows messages to
51  * be sent to logical threads that are executing in both the current process
52  * and in external processes using the same interface.  This is an important
53  * aspect of scalability because it allows the components of a program to be
54  * spread across available resources with few to no changes to the actual
55  * implementation.
56  *
57  * A logical thread is an execution context that has its own stack and which
58  * runs asynchronously to other logical threads.  These may be preemptively
59  * scheduled kernel threads, $(MREF_ALTTEXT fibers, core, thread, fiber)
60  * (cooperative user-space threads), or some other concept with similar behavior.
61  *
62  * The type of concurrency used when logical threads are created is determined
63  * by the $(LREF Scheduler) selected at initialization time.  The default behavior is
64  * currently to create a new kernel thread per call to spawn, but other
65  * schedulers are available that multiplex fibers across the main thread or
66  * use some combination of the two approaches.
67  *
68  * Copyright: Copyright Sean Kelly 2009 - 2014.
69  * License:   <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
70  * Authors:   Sean Kelly, Alex Rønne Petersen, Martin Nowak
71  * Source:    $(PHOBOSSRC std/concurrency.d)
72  */
73 /*          Copyright Sean Kelly 2009 - 2014.
74  * Distributed under the Boost Software License, Version 1.0.
75  *    (See accompanying file LICENSE_1_0.txt or copy at
76  *          http://www.boost.org/LICENSE_1_0.txt)
77  */
78 module std.concurrency;
79 
80 public import std.variant;
81 
82 import core.atomic;
83 import core.sync.condition;
84 import core.sync.mutex;
85 import core.thread;
86 import std.range.primitives;
87 import std.range.interfaces : InputRange;
88 import std.traits;
89 
///
@system unittest
{
    // Written by the worker thread, read by the owner after the handshake.
    __gshared string received;

    static void spawnedFunc(Tid ownerTid)
    {
        import std.conv : text;

        // Block until the owner sends an int, record it, then acknowledge.
        receive((int number) {
            received = text("Received the number ", number);

            // Tell the owner thread that the message was handled.
            ownerTid.send(true);
        });
    }

    // Run spawnedFunc in a new thread and hand it our own Tid.
    auto childTid = spawn(&spawnedFunc, thisTid);

    // Pass the number 42 to the new thread.
    childTid.send(42);

    // Wait for the acknowledgement.
    immutable wasSuccessful = receiveOnly!bool;
    assert(wasSuccessful);
    assert(received == "Received the number 42");
}
118 
119 private
120 {
121     bool hasLocalAliasing(Types...)()
122     {
123         import std.typecons : Rebindable;
124 
125         // Works around "statement is not reachable"
126         bool doesIt = false;
127         static foreach (T; Types)
128         {
129             static if (is(T == Tid))
130             { /* Allowed */ }
131             else static if (is(T : Rebindable!R, R))
132                 doesIt |= hasLocalAliasing!R;
133             else static if (is(T == struct))
134                 doesIt |= hasLocalAliasing!(typeof(T.tupleof));
135             else
136                 doesIt |= std.traits.hasUnsharedAliasing!(T);
137         }
138         return doesIt;
139     }
140 
141     @safe unittest
142     {
143         static struct Container { Tid t; }
144         static assert(!hasLocalAliasing!(Tid, Container, int));
145     }
146 
147     // https://issues.dlang.org/show_bug.cgi?id=20097
148     @safe unittest
149     {
150         import std.datetime.systime : SysTime;
151         static struct Container { SysTime time; }
152         static assert(!hasLocalAliasing!(SysTime, Container));
153     }
154 
    // Delivery class attached to every Message.
    enum MsgType
    {
        standard, // used by send() / _send() for ordinary messages
        priority, // used by prioritySend()
        linkDead, // NOTE(review): presumably a thread-termination notice; no producer is visible in this part of the file - confirm
    }
161 
162     struct Message
163     {
164         MsgType type;
165         Variant data;
166 
167         this(T...)(MsgType t, T vals)
168         if (T.length > 0)
169         {
170             static if (T.length == 1)
171             {
172                 type = t;
173                 data = vals[0];
174             }
175             else
176             {
177                 import std.typecons : Tuple;
178 
179                 type = t;
180                 data = Tuple!(T)(vals);
181             }
182         }
183 
184         @property auto convertsTo(T...)()
185         {
186             static if (T.length == 1)
187             {
188                 return is(T[0] == Variant) || data.convertsTo!(T);
189             }
190             else
191             {
192                 import std.typecons : Tuple;
193                 return data.convertsTo!(Tuple!(T));
194             }
195         }
196 
197         @property auto get(T...)()
198         {
199             static if (T.length == 1)
200             {
201                 static if (is(T[0] == Variant))
202                     return data;
203                 else
204                     return data.get!(T);
205             }
206             else
207             {
208                 import std.typecons : Tuple;
209                 return data.get!(Tuple!(T));
210             }
211         }
212 
213         auto map(Op)(Op op)
214         {
215             alias Args = Parameters!(Op);
216 
217             static if (Args.length == 1)
218             {
219                 static if (is(Args[0] == Variant))
220                     return op(data);
221                 else
222                     return op(data.get!(Args));
223             }
224             else
225             {
226                 import std.typecons : Tuple;
227                 return op(data.get!(Tuple!(Args)).expand);
228             }
229         }
230     }
231 
232     void checkops(T...)(T ops)
233     {
234         import std.format : format;
235 
236         foreach (i, t1; T)
237         {
238             static assert(isFunctionPointer!t1 || isDelegate!t1,
239                     format!"T %d is not a function pointer or delegates"(i));
240             alias a1 = Parameters!(t1);
241             alias r1 = ReturnType!(t1);
242 
243             static if (i < T.length - 1 && is(r1 == void))
244             {
245                 static assert(a1.length != 1 || !is(a1[0] == Variant),
246                               "function with arguments " ~ a1.stringof ~
247                               " occludes successive function");
248 
249                 foreach (t2; T[i + 1 .. $])
250                 {
251                     alias a2 = Parameters!(t2);
252 
253                     static assert(!is(a1 == a2),
254                         "function with arguments " ~ a1.stringof ~ " occludes successive function");
255                 }
256             }
257         }
258     }
259 
260     @property ref ThreadInfo thisInfo() nothrow
261     {
262         import core.atomic : atomicLoad;
263 
264         auto localScheduler = atomicLoad(scheduler);
265         if (localScheduler is null)
266             return ThreadInfo.thisInfo;
267         return localScheduler.thisInfo;
268     }
269 }
270 
// Thread-local module destructor (a `static ~this` without `shared` runs for
// each terminating thread): calls cleanup() on the exiting thread's ThreadInfo.
// NOTE(review): what cleanup() does is not visible in this part of the file -
// confirm against ThreadInfo.
static ~this()
{
    thisInfo.cleanup();
}
275 
276 // Exceptions
277 
/**
 * Thrown on calls to $(LREF receiveOnly) if the message that arrives is not
 * of the type the receiving thread asked for.
 */
class MessageMismatch : Exception
{
    /**
     * Params:
     *  msg = Text describing the mismatch.
     */
    this(string msg = "Unexpected message type") @safe pure nothrow @nogc
    {
        super(msg);
    }
}
290 
/**
 * Raised by $(LREF receive) in a spawned thread once the thread that spawned
 * it has terminated and there are no more messages left.
 */
class OwnerTerminated : Exception
{
    /// The `Tid` handed to the constructor.
    Tid tid;

    /**
     * Params:
     *  t   = The `Tid` stored in `tid`.
     *  msg = The exception message.
     */
    this(Tid t, string msg = "Owner terminated") @safe pure nothrow @nogc
    {
        super(msg);
        this.tid = t;
    }
}
306 
/**
 * Raised when a thread this thread is linked to has terminated.
 */
class LinkTerminated : Exception
{
    /// The `Tid` handed to the constructor.
    Tid tid;

    /**
     * Params:
     *  t   = The `Tid` stored in `tid`.
     *  msg = The exception message.
     */
    this(Tid t, string msg = "Link terminated") @safe pure nothrow @nogc
    {
        super(msg);
        this.tid = t;
    }
}
321 
/**
 * Raised in the receiving thread when a message arrived through
 * $(REF prioritySend, std,concurrency) and none of the receiver's handlers
 * accepts a message of that type.
 */
class PriorityMessageException : Exception
{
    /**
     * The message that was sent.
     */
    Variant message;

    /**
     * Params:
     *  vals = The unhandled message; stored in `message`.
     */
    this(Variant vals)
    {
        super("Priority message");
        this.message = vals;
    }
}
341 
/**
 * Raised when a mailbox is crowded and it has been configured with
 * `OnCrowding.throwException`.
 */
class MailboxFull : Exception
{
    /// The `Tid` handed to the constructor.
    Tid tid;

    /**
     * Params:
     *  t   = The `Tid` stored in `tid`.
     *  msg = The exception message.
     */
    this(Tid t, string msg = "Mailbox full") @safe pure nothrow @nogc
    {
        super(msg);
        this.tid = t;
    }
}
357 
/**
 * Thrown when a `Tid` is missing, e.g. when $(LREF ownerTid) doesn't
 * find an owner thread.
 */
class TidMissingException : Exception
{
    import std.exception : basicExceptionCtors;
    /// Constructors supplied by the `std.exception.basicExceptionCtors` mixin.
    mixin basicExceptionCtors;
}
368 
369 
370 // Thread ID
371 
372 
/**
 * Opaque handle that identifies a logical thread.
 */
struct Tid
{
private:
    // Mailbox of the thread this handle stands for; null in Tid.init.
    MessageBox mbox;

    this(MessageBox m) @safe pure nothrow @nogc
    {
        mbox = m;
    }

public:

    /**
     * Writes a short string identifying this `Tid` to `w`.  The output is
     * only meant for telling apart `Tid`s that are executing at the same
     * time, e.g. in logs or while debugging.  A `Tid` that runs later may
     * produce the same `toString()` output as one that has already
     * terminated.
     *
     * Params:
     *  w = Output sink that receives the text.
     */
    void toString(W)(ref W w) const
    {
        import std.format.write : formattedWrite;

        // The mailbox address serves as the printed identity.
        void* address = () @trusted { return cast(void*) mbox; }();
        formattedWrite(w, "Tid(%x)", address);
    }

}
403 
@safe unittest
{
    import std.conv : text;

    // A default-initialized Tid has no mailbox and prints as "Tid(0)".
    Tid blank;
    assert(text(blank) == "Tid(0)");

    // The calling thread's Tid does not print as "Tid(0)".
    auto mine = thisTid;
    assert(text(mine) != "Tid(0)");

    // A copy prints exactly like the Tid it was copied from.
    auto duplicate = mine;
    assert(text(mine) == text(duplicate));
}
414 
// https://issues.dlang.org/show_bug.cgi?id=21512
// Formatting has to work through a const Tid as well.
@system unittest
{
    import std.format : format;

    const(Tid) constTid = spawn(() {});
    const rendered = format!"%s"(constTid);
    assert(rendered[0 .. 4] == "Tid(");
}
423 
/**
 * Returns: The `Tid` of the caller's thread.  If the calling thread does not
 * have one yet, a new mailbox is created for it first.
 */
@property Tid thisTid() @safe
{
    // TODO: remove when concurrency is safe
    static Tid fetchOrCreate() @trusted
    {
        // Create the mailbox on first use.
        if (thisInfo.ident == Tid.init)
            thisInfo.ident = Tid(new MessageBox);
        return thisInfo.ident;
    }

    return fetchOrCreate();
}
440 
/**
 * Returns the `Tid` of the thread that spawned the caller's thread.
 *
 * Throws: $(LREF TidMissingException) if the calling thread has no owner
 * thread.
 */
@property Tid ownerTid()
{
    import std.exception : enforce;

    auto owner = thisInfo.owner;
    enforce!TidMissingException(owner.mbox !is null, "Error: Thread has no owner thread.");
    return owner;
}
454 
@system unittest
{
    import std.exception : assertThrown;

    static void child()
    {
        immutable request = receiveOnly!string();
        assert(request == "Main calling");
        ownerTid.send("Child responding");
    }

    // The thread running this test has no owner, so ownerTid must throw.
    assertThrown!TidMissingException(ownerTid);

    // A spawned thread can reach its owner through ownerTid.
    auto worker = spawn(&child);
    worker.send("Main calling");
    immutable reply = receiveOnly!string();
    assert(reply == "Child responding");
}
472 
473 // Thread Creation
474 
/*
 * True if `F` may be started as a logical thread with arguments of types `T`:
 * `F` must be callable, return void, accept `T` (implicit conversions
 * allowed, same number of parameters), and either be a function pointer or
 * have no unshared aliasing.
 */
private template isSpawnable(F, T...)
{
    // Compares the parameter lists of F1 and F2 position by position,
    // starting at index i; F2's parameters must convert to F1's.
    template isParamsImplicitlyConvertible(F1, F2, int i = 0)
    {
        alias wanted = Parameters!F1;
        alias given = Parameters!F2;

        static if (wanted.length != given.length)
            enum isParamsImplicitlyConvertible = false;
        else static if (i == wanted.length)
            enum isParamsImplicitlyConvertible = true;
        else static if (!is(given[i] : wanted[i]))
            enum isParamsImplicitlyConvertible = false;
        else
            enum isParamsImplicitlyConvertible =
                isParamsImplicitlyConvertible!(F1, F2, i + 1);
    }

    enum isSpawnable =
        isCallable!F
        && is(ReturnType!F : void)
        && isParamsImplicitlyConvertible!(F, void function(T))
        && (isFunctionPointer!F || !hasUnsharedAliasing!F);
}
496 
/**
 * Starts `fn(args)` in a new logical thread.
 *
 * The supplied function runs in a new logical thread that is represented by
 * the returned `Tid`, and the calling thread becomes the owner of that
 * thread.  Once the owner terminates, an `OwnerTerminated` message is sent
 * to the new thread, which makes `receive()` throw an `OwnerTerminated`
 * exception there.
 *
 * Params:
 *  fn   = The function to execute.
 *  args = Arguments to the function.
 *
 * Returns:
 *  A `Tid` representing the new logical thread.
 *
 * Notes:
 *  `args` must not have unshared aliasing: every argument to `fn` has to be
 *  `shared`, `immutable`, or free of pointer indirection.  This is what
 *  enforces isolation among threads.
 *
 *  The same holds for `fn` itself: a delegate must not have unshared
 *  aliases, i.e. it must be either `shared` or `immutable`.
 */
Tid spawn(F, T...)(F fn, T args)
if (isSpawnable!(F, T))
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");

    enum bool linkToCaller = false;
    return _spawn(linkToCaller, fn, args);
}
527 
///
@system unittest
{
    // The argument given to spawn arrives as the function's parameter.
    static void greet(string text)
    {
        assert(text == "Hello World");
    }

    auto tid = spawn(&greet, "Hello World");
}
538 
/// Fails: char[] has mutable aliasing.
@system unittest
{
    string msg = "Hello, World!";

    // A string parameter: a mutable copy (.dup) is rejected, an immutable
    // copy (.idup) is accepted.
    static void f1(string msg) {}
    static assert(!__traits(compiles, spawn(&f1, msg.dup)));
    static assert( __traits(compiles, spawn(&f1, msg.idup)));

    // A char[] parameter: neither kind of copy can be passed.
    static void f2(char[] msg) {}
    static assert(!__traits(compiles, spawn(&f2, msg.dup)));
    static assert(!__traits(compiles, spawn(&f2, msg.idup)));
}
552 
/// New thread with anonymous function
@system unittest
{
    // The function literal runs in the new thread and reports to its owner.
    spawn({
        send(ownerTid, "This is so great!");
    });

    immutable greeting = receiveOnly!string;
    assert(greeting == "This is so great!");
}
561 
@system unittest
{
    import core.thread : thread_joinAll;

    // Filled in by the spawned thread.
    __gshared string receivedMessage;

    static void store(string text)
    {
        receivedMessage = text;
    }

    auto worker = spawn(&store, "Hello World");

    // Join all threads before looking at the result.
    thread_joinAll();
    assert(receivedMessage == "Hello World");
}
576 
/**
 * Starts `fn(args)` in a logical thread and will receive a `LinkTerminated`
 * message when the operation terminates.
 *
 * The supplied function runs in a new logical thread that is represented by
 * the returned `Tid`.  The new thread and the calling thread are linked: when
 * either of them terminates, a `LinkTerminated` message is sent to the other
 * one, which makes `receive()` throw a `LinkTerminated` exception there.
 * The owner relationship established by `spawn()` is kept as well, so if the
 * link between the threads is broken, termination of the owner still makes
 * `receive()` throw an `OwnerTerminated` exception.
 *
 * Params:
 *  fn   = The function to execute.
 *  args = Arguments to the function.
 *
 * Returns:
 *  A `Tid` representing the new thread.
 */
Tid spawnLinked(F, T...)(F fn, T args)
if (isSpawnable!(F, T))
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");

    enum bool linkToCaller = true;
    return _spawn(linkToCaller, fn, args);
}
602 
/*
 * Shared implementation of spawn() and spawnLinked().
 *
 * Creates a mailbox for the new logical thread, starts `fn(args)` either
 * through the installed scheduler or on a new `Thread`, and records the new
 * Tid in the caller's `links` table.
 *
 * Params:
 *  linked = Value stored in the caller's `links` table for the new thread;
 *           spawn() passes `false`, spawnLinked() passes `true`.
 *  fn     = The function to execute.
 *  args   = Arguments to the function.
 *
 * Returns:
 *  The Tid of the newly started logical thread.
 */
private Tid _spawn(F, T...)(bool linked, F fn, T args)
if (isSpawnable!(F, T))
{
    // TODO: MessageList and &exec should be shared.
    auto spawnTid = Tid(new MessageBox);
    auto ownerTid = thisTid;

    // Entry point of the new logical thread: it first installs its own
    // identity and its owner in its ThreadInfo, then runs the user function.
    void exec()
    {
        thisInfo.ident = spawnTid;
        thisInfo.owner = ownerTid;
        fn(args);
    }

    // TODO: MessageList and &exec should be shared.
    if (scheduler !is null)
        scheduler.spawn(&exec);
    else
    {
        // No scheduler installed: run exec on a new Thread and remember that
        // Thread in the new mailbox before starting it.
        auto t = new Thread(&exec);
        spawnTid.mbox.m_thread = t;
        t.start();
    }
    // Executed in the calling thread, so this updates the caller's table.
    thisInfo.links[spawnTid] = linked;
    return spawnTid;
}
632 
// Compile-time checks of which callables spawn() accepts.
@system unittest
{
    // Function pointers: the argument list has to match the parameter list.
    void function() fn1;
    void function(int) fn2;
    static assert(__traits(compiles, spawn(fn1)));
    static assert(__traits(compiles, spawn(fn2, 2)));
    static assert(!__traits(compiles, spawn(fn1, 1)));
    static assert(!__traits(compiles, spawn(fn2)));

    // Delegates: shared or immutable ones are accepted, the plain
    // delegate dg6 is rejected.
    void delegate(int) shared dg1;
    shared(void delegate(int)) dg2;
    shared(void delegate(long) shared) dg3;
    shared(void delegate(real, int, long) shared) dg4;
    void delegate(int) immutable dg5;
    void delegate(int) dg6;
    static assert(__traits(compiles, spawn(dg1, 1)));
    static assert(__traits(compiles, spawn(dg2, 2)));
    static assert(__traits(compiles, spawn(dg3, 3)));
    static assert(__traits(compiles, spawn(dg4, 4, 4, 4)));
    static assert(__traits(compiles, spawn(dg5, 5)));
    static assert(!__traits(compiles, spawn(dg6, 6)));

    // Class instances with opCall: combinations of the qualifier on the
    // object and the qualifier on opCall, with the expected result of each
    // combination asserted below.
    auto callable1  = new class{ void opCall(int) shared {} };
    auto callable2  = cast(shared) new class{ void opCall(int) shared {} };
    auto callable3  = new class{ void opCall(int) immutable {} };
    auto callable4  = cast(immutable) new class{ void opCall(int) immutable {} };
    auto callable5  = new class{ void opCall(int) {} };
    auto callable6  = cast(shared) new class{ void opCall(int) immutable {} };
    auto callable7  = cast(immutable) new class{ void opCall(int) shared {} };
    auto callable8  = cast(shared) new class{ void opCall(int) const shared {} };
    auto callable9  = cast(const shared) new class{ void opCall(int) shared {} };
    auto callable10 = cast(const shared) new class{ void opCall(int) const shared {} };
    auto callable11 = cast(immutable) new class{ void opCall(int) const shared {} };
    static assert(!__traits(compiles, spawn(callable1,  1)));
    static assert( __traits(compiles, spawn(callable2,  2)));
    static assert(!__traits(compiles, spawn(callable3,  3)));
    static assert( __traits(compiles, spawn(callable4,  4)));
    static assert(!__traits(compiles, spawn(callable5,  5)));
    static assert(!__traits(compiles, spawn(callable6,  6)));
    static assert(!__traits(compiles, spawn(callable7,  7)));
    static assert( __traits(compiles, spawn(callable8,  8)));
    static assert(!__traits(compiles, spawn(callable9,  9)));
    static assert( __traits(compiles, spawn(callable10, 10)));
    static assert( __traits(compiles, spawn(callable11, 11)));
}
678 
/**
 * Places the values as a message at the back of tid's message queue.
 *
 * Delivers the supplied values to the thread represented by `tid`.  As with
 * $(REF spawn, std,concurrency), `T` must not have unshared aliasing.
 *
 * Params:
 *  tid  = The receiving thread; its mailbox must not be null.
 *  vals = The values that make up the message.
 */
void send(T...)(Tid tid, T vals)
in (tid.mbox !is null)
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");

    // Ordinary messages are queued with the standard delivery class.
    _send(MsgType.standard, tid, vals);
}
691 
/**
 * Places the values as a message on the front of tid's message queue.
 *
 * Works like $(LREF send), except that the message goes to the front of
 * `tid`'s message queue instead of the back.  Typical uses are out-of-band
 * communication, signalling exceptional conditions, etc.
 *
 * Params:
 *  tid  = The receiving thread; its mailbox must not be null.
 *  vals = The values that make up the message.
 */
void prioritySend(T...)(Tid tid, T vals)
in (tid.mbox !is null)
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");

    enum MsgType deliveryClass = MsgType.priority;
    _send(deliveryClass, tid, vals);
}
705 
/*
 * Convenience overload of _send: queues `vals` for `tid` as a
 * MsgType.standard message.
 */
private void _send(T...)(Tid tid, T vals)
in (tid.mbox !is null)
{
    _send(MsgType.standard, tid, vals);
}
714 
/*
 * Implementation of send.  This allows parameter checking to be different for
 * both Tid.send() and .send().
 *
 * Wraps `vals` in a Message of kind `type` and puts it into the mailbox of
 * `tid`.  No aliasing check is made here; send() and prioritySend() perform
 * it before calling this function.
 */
private void _send(T...)(MsgType type, Tid tid, T vals)
in (tid.mbox !is null)
{
    // Kept as a named local rather than a temporary.
    // NOTE(review): put() may take its argument by ref - confirm before inlining.
    auto msg = Message(type, vals);
    tid.mbox.put(msg);
}
725 
/**
 * Receives a message from another thread.
 *
 * Receives a message from another thread, blocking while no message of one
 * of the specified types is available.  The message is pattern matched
 * against the given set of delegates and the first match found is executed.
 *
 * A delegate that accepts a $(REF Variant, std,variant), given as the last
 * argument to `receive`, matches any message that no earlier delegate
 * matched.  If more than one argument was sent, the `Variant` contains a
 * $(REF Tuple, std,typecons) of all values sent.
 *
 * Params:
 *     ops = Variadic list of function pointers and delegates. Entries
 *           in this list must not occlude later entries.
 *
 * Throws: $(LREF OwnerTerminated) when the owner thread has terminated.
 */
void receive(T...)(T ops)
in (thisInfo.ident.mbox !is null,
    "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.")
{
    // Reject handler lists in which one entry hides a later one.
    checkops(ops);

    thisInfo.ident.mbox.get(ops);
}
758 
///
@system unittest
{
    import std.variant : Variant;

    // Answers 1, 2 or 3 depending on which handler matched the message.
    auto classify = ()
    {
        receive(
            (int i) { ownerTid.send(1); },
            (double f) { ownerTid.send(2); },
            (Variant v) { ownerTid.send(3); }
        );
    };

    // An int is taken by the first handler.
    {
        auto worker = spawn(classify);
        worker.send(42);
        assert(receiveOnly!int == 1);
    }

    // A double is taken by the second handler.
    {
        auto worker = spawn(classify);
        worker.send(3.14);
        assert(receiveOnly!int == 2);
    }

    // Anything else ends up in the Variant handler.
    {
        auto worker = spawn(classify);
        worker.send("something else");
        assert(receiveOnly!int == 3);
    }
}
791 
// Compile-time checks of the handler lists receive() accepts.
@safe unittest
{
    // A Variant handler is fine alone or in last position.
    static assert( __traits( compiles,
                      {
                          receive( (Variant x) {} );
                          receive( (int x) {}, (Variant x) {} );
                      } ) );

    // A Variant handler in front of another handler is rejected.
    static assert( !__traits( compiles,
                       {
                           receive( (Variant x) {}, (int x) {} );
                       } ) );

    // Two handlers with the same parameter list are rejected.
    static assert( !__traits( compiles,
                       {
                           receive( (int x) {}, (int x) {} );
                       } ) );
}
810 
// Make sure receive() works with free functions as well.
version (StdUnittest)
{
    // Free function used as a handler by the unittest below; only compiled
    // in when StdUnittest is set.
    private void receiveFunction(int x) {}
}
@safe unittest
{
    // The address of a free function is accepted as a handler, alone or
    // followed by a Variant handler.
    static assert( __traits( compiles,
                      {
                          receive( &receiveFunction );
                          receive( &receiveFunction, (Variant x) {} );
                      } ) );
}
824 
825 
/*
 * Return type of receiveOnly!(T): the type itself when `T` is exactly one
 * type, otherwise a std.typecons.Tuple of `T`.
 */
private template receiveOnlyRet(T...)
{
    static if (T.length != 1)
    {
        import std.typecons : Tuple;
        alias receiveOnlyRet = Tuple!(T);
    }
    else
    {
        alias receiveOnlyRet = T[0];
    }
}
838 
/**
 * Receives only messages with arguments of the specified types.
 *
 * Params:
 *     T = Variadic list of types to be received.
 *
 * Returns: The received message.  If `T` has more than one entry,
 *          the message will be packed into a $(REF Tuple, std,typecons).
 *
 * Throws: $(LREF MessageMismatch) if a message of types other than `T`
 *         is received;
 *         $(LREF LinkTerminated) or $(LREF OwnerTerminated) if such an
 *         object is received instead of the expected message.
 */
receiveOnlyRet!(T) receiveOnly(T...)()
in (thisInfo.ident.mbox !is null,
    "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.")
{
    import std.format : format;
    import std.meta : allSatisfy;
    import std.typecons : Tuple;

    Tuple!(T) result;

    thisInfo.ident.mbox.get(
        // The expected message: copy its values into result.
        (T payload) {
            static if (T.length)
            {
                static if (allSatisfy!(isAssignable, T))
                {
                    result.field = payload;
                }
                else
                {
                    // Types that cannot be assigned are constructed in place.
                    import core.lifetime : emplace;
                    emplace(&result, payload);
                }
            }
        },
        // Termination notices are rethrown to the caller.
        (LinkTerminated notice) { throw notice; },
        (OwnerTerminated notice) { throw notice; },
        // Anything else is a mismatch.
        (Variant other) {
            static if (T.length > 1)
                enum string expected = T.stringof;
            else
                enum string expected = T[0].stringof;

            throw new MessageMismatch(
                format("Unexpected message type: expected '%s', got '%s'", expected, other.type.toString()));
        });

    static if (T.length == 1)
        return result[0];
    else
        return result;
}
896 
///
@system unittest
{
    // The worker waits for exactly one int and checks its value.
    auto worker = spawn({
        immutable answer = receiveOnly!int;
        assert(answer == 42);
    });
    worker.send(42);
}
906 
///
@system unittest
{
    // The worker waits for exactly one string and checks its value.
    auto worker = spawn({
        immutable content = receiveOnly!string;
        assert(content == "text");
    });
    worker.send("text");
}
916 
///
@system unittest
{
    struct Record { string name; int age; }

    auto worker = spawn({
        // Several types requested at once are returned together,
        // addressable by index.
        auto parts = receiveOnly!(double, Record);
        assert(parts[0] == 0.5);
        assert(parts[1].name == "Alice");
        assert(parts[1].age == 31);
    });

    worker.send(0.5, Record("Alice", 31));
}
932 
// The text of the mismatch error names the expected and the received type.
@system unittest
{
    static void expectString(Tid mainTid)
    {
        try
        {
            receiveOnly!string();
            mainTid.send("");
        }
        catch (Throwable failure)
        {
            // Report the error text instead of letting the thread die with it.
            mainTid.send(failure.msg);
        }
    }

    auto worker = spawn(&expectString, thisTid);

    // Provoke the mismatch by sending an int.
    worker.send(1);

    immutable report = receiveOnly!string();
    assert(report == "Unexpected message type: expected 'string', got 'int'");
}
953 
// https://issues.dlang.org/show_bug.cgi?id=21663
// Instantiating receiveOnly with several types must compile in @safe code;
// the instance is only aliased, never called.
@safe unittest
{
    alias test = receiveOnly!(string, bool, bool);
}
959 
/**
 * Receives a message from another thread and gives up if no match
 * arrives within a specified duration.
 *
 * Receives a message from another thread, blocking until `duration` has
 * elapsed while no message of one of the specified types is available.  The
 * message is pattern matched against the given set of delegates and the
 * first match found is executed.
 *
 * A delegate that accepts a $(REF Variant, std,variant), given as the last
 * argument, matches any message that no earlier delegate matched.  If more
 * than one argument was sent, the `Variant` contains a
 * $(REF Tuple, std,typecons) of all values sent.
 *
 * Params:
 *     duration = Duration, how long to wait. If `duration` is negative,
 *                won't wait at all.
 *     ops = Variadic list of function pointers and delegates. Entries
 *           in this list must not occlude later entries.
 *
 * Returns: `true` if it received a message and `false` if it timed out waiting
 *          for one.
 *
 * Throws: $(LREF OwnerTerminated) when the owner thread has terminated.
 */
bool receiveTimeout(T...)(Duration duration, T ops)
in (thisInfo.ident.mbox !is null,
    "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.")
{
    // Reject handler lists in which one entry hides a later one.
    checkops(ops);

    return thisInfo.ident.mbox.get(duration, ops);
}
998 
// Compile-time checks of the handler lists receiveTimeout() accepts.
@safe unittest
{
    // A Variant handler is fine alone or in last position.
    static assert(__traits(compiles, {
        receiveTimeout(msecs(0), (Variant x) {});
        receiveTimeout(msecs(0), (int x) {}, (Variant x) {});
    }));

    // A Variant handler in front of another handler is rejected.
    static assert(!__traits(compiles, {
        receiveTimeout(msecs(0), (Variant x) {}, (int x) {});
    }));

    // Two handlers with the same parameter list are rejected.
    static assert(!__traits(compiles, {
        receiveTimeout(msecs(0), (int x) {}, (int x) {});
    }));

    // A non-zero timeout is accepted just the same.
    static assert(__traits(compiles, {
        receiveTimeout(msecs(10), (int x) {}, (Variant x) {});
    }));
}
1018 
1019 // MessageBox Limits
1020 
/**
 * These behaviors may be specified when a mailbox is full.
 *
 * A value of this type is passed to $(LREF setMaxMailboxSize) to choose what
 * happens once the configured message limit has been reached.
 */
enum OnCrowding
{
    block, /// Wait until room is available.
    throwException, /// Throw a $(LREF MailboxFull) exception.
    ignore /// Abort the send and return.
}
1030 
// Crowding handlers installed by setMaxMailboxSize() for the three
// OnCrowding values.

// Handler for OnCrowding.block: always returns true.
private bool onCrowdingBlock(Tid tid) @safe pure nothrow @nogc
{
    return true;
}

// Handler for OnCrowding.throwException: never returns, throws MailboxFull
// carrying `tid`.
private bool onCrowdingThrow(Tid tid) @safe pure
{
    throw new MailboxFull(tid);
}

// Handler for OnCrowding.ignore: always returns false.
private bool onCrowdingIgnore(Tid tid) @safe pure nothrow @nogc
{
    return false;
}
1048 
1049 /**
1050  * Sets a maximum mailbox size.
1051  *
1052  * Sets a limit on the maximum number of user messages allowed in the mailbox.
1053  * If this limit is reached, the caller attempting to add a new message will
1054  * execute the behavior specified by doThis.  If messages is zero, the mailbox
1055  * is unbounded.
1056  *
1057  * Params:
1058  *  tid      = The Tid of the thread for which this limit should be set.
1059  *  messages = The maximum number of messages or zero if no limit.
1060  *  doThis   = The behavior executed when a message is sent to a full
1061  *             mailbox.
1062  */
1063 void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure
1064 in (tid.mbox !is null)
1065 {
1066     final switch (doThis)
1067     {
1068     case OnCrowding.block:
1069         return tid.mbox.setMaxMsgs(messages, &onCrowdingBlock);
1070     case OnCrowding.throwException:
1071         return tid.mbox.setMaxMsgs(messages, &onCrowdingThrow);
1072     case OnCrowding.ignore:
1073         return tid.mbox.setMaxMsgs(messages, &onCrowdingIgnore);
1074     }
1075 }
1076 
1077 /**
1078  * Sets a maximum mailbox size.
1079  *
1080  * Sets a limit on the maximum number of user messages allowed in the mailbox.
1081  * If this limit is reached, the caller attempting to add a new message will
1082  * execute onCrowdingDoThis.  If messages is zero, the mailbox is unbounded.
1083  *
1084  * Params:
1085  *  tid      = The Tid of the thread for which this limit should be set.
1086  *  messages = The maximum number of messages or zero if no limit.
1087  *  onCrowdingDoThis = The routine called when a message is sent to a full
1088  *                     mailbox.
1089  */
1090 void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis)
1091 in (tid.mbox !is null)
1092 {
1093     tid.mbox.setMaxMsgs(messages, onCrowdingDoThis);
1094 }
1095 
// Name registry shared by every thread of the process (`__gshared`).
// register, unregister, locate and unregisterMe only touch these tables
// inside `synchronized (registryLock)`.
private
{
    // Registered name -> the Tid it was registered for.
    __gshared Tid[string] tidByName;
    // Tid -> all names that were registered for it.
    __gshared string[][Tid] namesByTid;
}
1101 
// Returns the mutex guarding the name registry.  The mutex lives in a
// function-local __gshared variable and is created through initOnce on
// first use.
private @property Mutex registryLock()
{
    __gshared Mutex lock;
    return initOnce!lock(new Mutex);
}
1108 
// Drops every registry entry that belongs to the thread described by `me`.
// A ThreadInfo whose ident is still Tid.init owns nothing and is skipped
// without taking the registry lock.
private void unregisterMe(ref ThreadInfo me)
{
    if (me.ident == Tid.init)
        return;

    synchronized (registryLock)
    {
        auto owned = me.ident in namesByTid;
        if (owned is null)
            return;

        // Remove each name from the forward table, then the per-Tid list.
        foreach (registered; *owned)
            tidByName.remove(registered);
        namesByTid.remove(me.ident);
    }
}
1124 
1125 /**
1126  * Associates name with tid.
1127  *
1128  * Associates name with tid in a process-local map.  When the thread
1129  * represented by tid terminates, any names associated with it will be
1130  * automatically unregistered.
1131  *
1132  * Params:
1133  *  name = The name to associate with tid.
1134  *  tid  = The tid register by name.
1135  *
1136  * Returns:
1137  *  true if the name is available and tid is not known to represent a
1138  *  defunct thread.
1139  */
1140 bool register(string name, Tid tid)
1141 in (tid.mbox !is null)
1142 {
1143     synchronized (registryLock)
1144     {
1145         if (name in tidByName)
1146             return false;
1147         if (tid.mbox.isClosed)
1148             return false;
1149         namesByTid[tid] ~= name;
1150         tidByName[name] = tid;
1151         return true;
1152     }
1153 }
1154 
1155 /**
1156  * Removes the registered name associated with a tid.
1157  *
1158  * Params:
1159  *  name = The name to unregister.
1160  *
1161  * Returns:
1162  *  true if the name is registered, false if not.
1163  */
1164 bool unregister(string name)
1165 {
1166     import std.algorithm.mutation : remove, SwapStrategy;
1167     import std.algorithm.searching : countUntil;
1168 
1169     synchronized (registryLock)
1170     {
1171         if (auto tid = name in tidByName)
1172         {
1173             auto allNames = *tid in namesByTid;
1174             auto pos = countUntil(*allNames, name);
1175             remove!(SwapStrategy.unstable)(*allNames, pos);
1176             tidByName.remove(name);
1177             return true;
1178         }
1179         return false;
1180     }
1181 }
1182 
1183 /**
1184  * Gets the `Tid` associated with name.
1185  *
1186  * Params:
1187  *  name = The name to locate within the registry.
1188  *
1189  * Returns:
1190  *  The associated `Tid` or `Tid.init` if name is not registered.
1191  */
1192 Tid locate(string name)
1193 {
1194     synchronized (registryLock)
1195     {
1196         if (auto tid = name in tidByName)
1197             return *tid;
1198         return Tid.init;
1199     }
1200 }
1201 
1202 /**
1203  * Waits for the thread associated with `tid` to complete.
1204  *
1205  * This function blocks until the thread represented by `tid` finishes executing
1206  * and then releases all OS resources associated with it (thread stack, thread-local
1207  * storage, etc.). This is essential for preventing resource leaks in long-running
1208  * applications that create many short-lived threads.
1209  *
1210  * For threads created by $(LREF spawn), this function must be called to properly
1211  * free OS resources. Without calling `join`, thread stacks (~8 MB each on typical
1212  * systems) will accumulate in virtual memory for the lifetime of the process.
1213  *
1214  * Params:
1215  *  tid = The $(LREF Tid) of the thread to join.
1216  *
1217  * Throws:
1218  *  $(REF ThreadError, core,thread,osthread) if the thread has already been joined
1219  *  or if the thread was created by a custom $(LREF Scheduler).
1220  *
1221  * Example:
1222  * ---
1223  * auto tid = spawn(&someFunction);
1224  * // ... do other work ...
1225  * join(tid); // Wait for thread to complete and free resources
1226  * ---
1227  *
1228  * Note:
1229  *  It is an error to call `join` on the same `Tid` more than once.
1230  *  The function will throw if the thread was created by a $(LREF Scheduler)
1231  *  rather than directly as a system thread.
1232  */
1233 void join(Tid tid)
1234 {
1235     import core.thread.threadbase : ThreadError;
1236 
1237     if (tid.mbox is null)
1238         throw new ThreadError("Cannot join Tid with null MessageBox");
1239 
1240     if (tid.mbox.m_thread is null)
1241         throw new ThreadError("Cannot join Tid created by a Scheduler");
1242 
1243     tid.mbox.m_thread.join();
1244 }
1245 
1246 ///
1247 @system unittest
1248 {
1249     import core.time : msecs;
1250     import core.thread : Thread;
1251 
1252     auto tid = spawn((int x) {
1253         Thread.sleep(10.msecs);
1254         ownerTid.send(x * 2);
1255     }, 21);
1256 
1257     join(tid); // Wait for thread to finish
1258     auto result = receiveOnly!int();
1259     assert(result == 42);
1260 }
1261 
1262 /**
1263  * Encapsulates all implementation-level data needed for scheduling.
1264  *
1265  * When defining a $(LREF Scheduler), an instance of this struct must be associated
1266  * with each logical thread.  It contains all implementation-level information
1267  * needed by the internal API.
1268  */
1269 struct ThreadInfo
1270 {
1271     Tid ident;
1272     bool[Tid] links;
1273     Tid owner;
1274 
1275     /**
1276      * Gets a thread-local instance of `ThreadInfo`.
1277      *
1278      * Gets a thread-local instance of `ThreadInfo`, which should be used as the
1279      * default instance when info is requested for a thread not created by the
1280      * `Scheduler`.
1281      */
1282     static @property ref thisInfo() nothrow
1283     {
1284         static ThreadInfo val;
1285         return val;
1286     }
1287 
1288     /**
1289      * Cleans up this ThreadInfo.
1290      *
1291      * This must be called when a scheduled thread terminates.  It tears down
1292      * the messaging system for the thread and notifies interested parties of
1293      * the thread's termination.
1294      */
1295     void cleanup()
1296     {
1297         if (ident.mbox !is null)
1298             ident.mbox.close();
1299         foreach (tid; links.keys)
1300             _send(MsgType.linkDead, tid, ident);
1301         if (owner != Tid.init)
1302             _send(MsgType.linkDead, owner, ident);
1303         unregisterMe(this); // clean up registry entries
1304     }
1305 
1306     // https://issues.dlang.org/show_bug.cgi?id=20160
1307     @system unittest
1308     {
1309         register("main_thread", thisTid());
1310 
1311         ThreadInfo t;
1312         t.cleanup();
1313 
1314         assert(locate("main_thread") == thisTid());
1315     }
1316 }
1317 
1318 /**
1319  * A `Scheduler` controls how threading is performed by spawn.
1320  *
1321  * Implementing a `Scheduler` allows the concurrency mechanism used by this
1322  * module to be customized according to different needs.  By default, a call
1323  * to spawn will create a new kernel thread that executes the supplied routine
1324  * and terminates when finished.  But it is possible to create `Scheduler`s that
1325  * reuse threads, that multiplex `Fiber`s (coroutines) across a single thread,
1326  * or any number of other approaches.  By making the choice of `Scheduler` a
1327  * user-level option, `std.concurrency` may be used for far more types of
1328  * application than if this behavior were predefined.
1329  *
1330  * Example:
1331  * ---
1332  * import std.concurrency;
1333  * import std.stdio;
1334  *
1335  * void main()
1336  * {
1337  *     scheduler = new FiberScheduler;
1338  *     scheduler.start(
1339  *     {
1340  *         writeln("the rest of main goes here");
1341  *     });
1342  * }
1343  * ---
1344  *
1345  * Some schedulers have a dispatching loop that must run if they are to work
1346  * properly, so for the sake of consistency, when using a scheduler, `start()`
1347  * must be called within `main()`.  This yields control to the scheduler and
1348  * will ensure that any spawned threads are executed in an expected manner.
1349  */
1350 interface Scheduler
1351 {
1352     /**
1353      * Spawns the supplied op and starts the `Scheduler`.
1354      *
1355      * This is intended to be called at the start of the program to yield all
1356      * scheduling to the active `Scheduler` instance.  This is necessary for
1357      * schedulers that explicitly dispatch threads rather than simply relying
1358      * on the operating system to do so, and so start should always be called
1359      * within `main()` to begin normal program execution.
1360      *
1361      * Params:
1362      *  op = A wrapper for whatever the main thread would have done in the
1363      *       absence of a custom scheduler.  It will be automatically executed
1364      *       via a call to spawn by the `Scheduler`.
1365      */
1366     void start(void delegate() op);
1367 
1368     /**
1369      * Assigns a logical thread to execute the supplied op.
1370      *
1371      * This routine is called by spawn.  It is expected to instantiate a new
1372      * logical thread and run the supplied operation.  This thread must call
1373      * `thisInfo.cleanup()` when the thread terminates if the scheduled thread
1374      * is not a kernel thread--all kernel threads will have their `ThreadInfo`
1375      * cleaned up automatically by a thread-local destructor.
1376      *
1377      * Params:
1378      *  op = The function to execute.  This may be the actual function passed
1379      *       by the user to spawn itself, or may be a wrapper function.
1380      */
1381     void spawn(void delegate() op);
1382 
1383     /**
1384      * Yields execution to another logical thread.
1385      *
1386      * This routine is called at various points within concurrency-aware APIs
1387      * to provide a scheduler a chance to yield execution when using some sort
1388      * of cooperative multithreading model.  If this is not appropriate, such
1389      * as when each logical thread is backed by a dedicated kernel thread,
1390      * this routine may be a no-op.
1391      */
1392     void yield() nothrow;
1393 
1394     /**
1395      * Returns an appropriate `ThreadInfo` instance.
1396      *
1397      * Returns an instance of `ThreadInfo` specific to the logical thread that
1398      * is calling this routine or, if the calling thread was not create by
1399      * this scheduler, returns `ThreadInfo.thisInfo` instead.
1400      */
1401     @property ref ThreadInfo thisInfo() nothrow;
1402 
1403     /**
1404      * Creates a `Condition` variable analog for signaling.
1405      *
1406      * Creates a new `Condition` variable analog which is used to check for and
1407      * to signal the addition of messages to a thread's message queue.  Like
1408      * yield, some schedulers may need to define custom behavior so that calls
1409      * to `Condition.wait()` yield to another thread when no new messages are
1410      * available instead of blocking.
1411      *
1412      * Params:
1413      *  m = The `Mutex` that will be associated with this condition.  It will be
1414      *      locked prior to any operation on the condition, and so in some
1415      *      cases a `Scheduler` may need to hold this reference and unlock the
1416      *      mutex before yielding execution to another logical thread.
1417      */
1418     Condition newCondition(Mutex m) nothrow;
1419 }
1420 
1421 /**
1422  * An example `Scheduler` using kernel threads.
1423  *
1424  * This is an example `Scheduler` that mirrors the default scheduling behavior
1425  * of creating one kernel thread per call to spawn.  It is fully functional
1426  * and may be instantiated and used, but is not a necessary part of the
1427  * default functioning of this module.
1428  */
1429 class ThreadScheduler : Scheduler
1430 {
1431     /**
1432      * This simply runs op directly, since no real scheduling is needed by
1433      * this approach.
1434      */
1435     void start(void delegate() op)
1436     {
1437         op();
1438     }
1439 
1440     /**
1441      * Creates a new kernel thread and assigns it to run the supplied op.
1442      */
1443     void spawn(void delegate() op)
1444     {
1445         auto t = new Thread(op);
1446         t.start();
1447     }
1448 
1449     /**
1450      * This scheduler does no explicit multiplexing, so this is a no-op.
1451      */
1452     void yield() nothrow
1453     {
1454         // no explicit yield needed
1455     }
1456 
1457     /**
1458      * Returns `ThreadInfo.thisInfo`, since it is a thread-local instance of
1459      * `ThreadInfo`, which is the correct behavior for this scheduler.
1460      */
1461     @property ref ThreadInfo thisInfo() nothrow
1462     {
1463         return ThreadInfo.thisInfo;
1464     }
1465 
1466     /**
1467      * Creates a new `Condition` variable.  No custom behavior is needed here.
1468      */
1469     Condition newCondition(Mutex m) nothrow
1470     {
1471         return new Condition(m);
1472     }
1473 }
1474 
1475 /**
1476  * An example `Scheduler` using $(MREF_ALTTEXT `Fiber`s, core, thread, fiber).
1477  *
1478  * This is an example scheduler that creates a new `Fiber` per call to spawn
1479  * and multiplexes the execution of all fibers within the main thread.
1480  */
1481 class FiberScheduler : Scheduler
1482 {
1483     /**
1484      * This creates a new `Fiber` for the supplied op and then starts the
1485      * dispatcher.
1486      */
1487     void start(void delegate() op)
1488     {
1489         create(op);
1490         dispatch();
1491     }
1492 
1493     /**
1494      * This created a new `Fiber` for the supplied op and adds it to the
1495      * dispatch list.
1496      */
1497     void spawn(void delegate() op) nothrow
1498     {
1499         create(op);
1500         yield();
1501     }
1502 
1503     /**
1504      * If the caller is a scheduled `Fiber`, this yields execution to another
1505      * scheduled `Fiber`.
1506      */
1507     void yield() nothrow
1508     {
1509         // NOTE: It's possible that we should test whether the calling Fiber
1510         //       is an InfoFiber before yielding, but I think it's reasonable
1511         //       that any (non-Generator) fiber should yield here.
1512         if (Fiber.getThis())
1513             Fiber.yield();
1514     }
1515 
1516     /**
1517      * Returns an appropriate `ThreadInfo` instance.
1518      *
1519      * Returns a `ThreadInfo` instance specific to the calling `Fiber` if the
1520      * `Fiber` was created by this dispatcher, otherwise it returns
1521      * `ThreadInfo.thisInfo`.
1522      */
1523     @property ref ThreadInfo thisInfo() nothrow
1524     {
1525         auto f = cast(InfoFiber) Fiber.getThis();
1526 
1527         if (f !is null)
1528             return f.info;
1529         return ThreadInfo.thisInfo;
1530     }
1531 
1532     /**
1533      * Returns a `Condition` analog that yields when wait or notify is called.
1534      *
1535      * Bug:
1536      * For the default implementation, `notifyAll` will behave like `notify`.
1537      *
1538      * Params:
1539      *   m = A `Mutex` to use for locking if the condition needs to be waited on
1540      *       or notified from multiple `Thread`s.
1541      *       If `null`, no `Mutex` will be used and it is assumed that the
1542      *       `Condition` is only waited on/notified from one `Thread`.
1543      */
1544     Condition newCondition(Mutex m) nothrow
1545     {
1546         return new FiberCondition(m);
1547     }
1548 
1549 protected:
1550     /**
1551      * Creates a new `Fiber` which calls the given delegate.
1552      *
1553      * Params:
1554      *   op = The delegate the fiber should call
1555      */
1556     void create(void delegate() op) nothrow
1557     {
1558         void wrap()
1559         {
1560             scope (exit)
1561             {
1562                 thisInfo.cleanup();
1563             }
1564             op();
1565         }
1566 
1567         m_fibers ~= new InfoFiber(&wrap);
1568     }
1569 
1570     /**
1571      * `Fiber` which embeds a `ThreadInfo`
1572      */
1573     static class InfoFiber : Fiber
1574     {
1575         ThreadInfo info;
1576 
1577         this(void delegate() op) nothrow
1578         {
1579             super(op);
1580         }
1581 
1582         this(void delegate() op, size_t sz) nothrow
1583         {
1584             super(op, sz);
1585         }
1586     }
1587 
1588 private:
1589     class FiberCondition : Condition
1590     {
1591         this(Mutex m) nothrow
1592         {
1593             super(m);
1594             notified = false;
1595         }
1596 
1597         override void wait() nothrow
1598         {
1599             scope (exit) notified = false;
1600 
1601             while (!notified)
1602                 switchContext();
1603         }
1604 
1605         override bool wait(Duration period) nothrow
1606         {
1607             import core.time : MonoTime;
1608 
1609             scope (exit) notified = false;
1610 
1611             for (auto limit = MonoTime.currTime + period;
1612                  !notified && !period.isNegative;
1613                  period = limit - MonoTime.currTime)
1614             {
1615                 this.outer.yield();
1616             }
1617             return notified;
1618         }
1619 
1620         override void notify() nothrow
1621         {
1622             notified = true;
1623             switchContext();
1624         }
1625 
1626         override void notifyAll() nothrow
1627         {
1628             notified = true;
1629             switchContext();
1630         }
1631 
1632     private:
1633         void switchContext() nothrow
1634         {
1635             if (mutex_nothrow) mutex_nothrow.unlock_nothrow();
1636             scope (exit)
1637                 if (mutex_nothrow)
1638                     mutex_nothrow.lock_nothrow();
1639             this.outer.yield();
1640         }
1641 
1642         bool notified;
1643     }
1644 
1645     void dispatch()
1646     {
1647         import std.algorithm.mutation : remove;
1648 
1649         while (m_fibers.length > 0)
1650         {
1651             auto t = m_fibers[m_pos].call(Fiber.Rethrow.no);
1652             if (t !is null && !(cast(OwnerTerminated) t))
1653             {
1654                 throw t;
1655             }
1656             if (m_fibers[m_pos].state == Fiber.State.TERM)
1657             {
1658                 if (m_pos >= (m_fibers = remove(m_fibers, m_pos)).length)
1659                     m_pos = 0;
1660             }
1661             else if (m_pos++ >= m_fibers.length - 1)
1662             {
1663                 m_pos = 0;
1664             }
1665         }
1666     }
1667 
1668     Fiber[] m_fibers;
1669     size_t m_pos;
1670 }
1671 
// Drives a FiberScheduler condition by hand with two plain fibers and checks
// the hand-over between waiter and notifier step by step.
@system unittest
{
    // Waits on the condition forever, counting each wake-up.
    static void receive(Condition cond, ref size_t received)
    {
        while (true)
        {
            synchronized (cond.mutex)
            {
                cond.wait();
                ++received;
            }
        }
    }

    // Notifies the condition forever, counting each notification.
    static void send(Condition cond, ref size_t sent)
    {
        while (true)
        {
            synchronized (cond.mutex)
            {
                ++sent;
                cond.notify();
            }
        }
    }

    auto fs = new FiberScheduler;
    auto mtx = new Mutex;
    auto cond = fs.newCondition(mtx);

    size_t received, sent;
    auto waiter = new Fiber({ receive(cond, received); }), notifier = new Fiber({ send(cond, sent); });
    // The waiter yields inside wait() without having received anything.
    waiter.call();
    assert(received == 0);
    // The notifier signals once and yields; the waiter has not run yet.
    notifier.call();
    assert(sent == 1);
    assert(received == 0);
    // Resumed, the waiter sees the notification and waits again.
    waiter.call();
    assert(received == 1);
    // Without a new notification the count must not change.
    waiter.call();
    assert(received == 1);
}
1714 
1715 /**
1716  * Sets the `Scheduler` behavior within the program.
1717  *
1718  * This variable sets the `Scheduler` behavior within this program.  Typically,
1719  * when setting a `Scheduler`, `scheduler.start()` should be called in `main`.  This
1720  * routine will not return until program execution is complete.
1721  */
1722 __gshared Scheduler scheduler;
1723 
1724 // Generator
1725 
1726 /**
1727  * If the caller is a `Fiber` and is not a $(LREF Generator), this function will call
1728  * `scheduler.yield()` or `Fiber.yield()`, as appropriate.
1729  */
1730 void yield() nothrow
1731 {
1732     auto fiber = Fiber.getThis();
1733     if (!(cast(IsGenerator) fiber))
1734     {
1735         if (scheduler is null)
1736         {
1737             if (fiber)
1738                 return Fiber.yield();
1739         }
1740         else
1741             scheduler.yield();
1742     }
1743 }
1744 
/// Marker interface used to determine whether a Generator is running: the
/// parameterless `yield` casts the current `Fiber` to it and skips yielding
/// when the cast succeeds.
private interface IsGenerator {}
1747 
1748 
1749 /**
1750  * A Generator is a $(MREF_ALTTEXT Fiber, core, thread, fiber)
1751  * that periodically returns values of type `T` to the
1752  * caller via `yield`.  This is represented as an InputRange.
1753  */
1754 class Generator(T) :
1755     Fiber, IsGenerator, InputRange!T
1756 {
1757     /**
1758      * Initializes a generator object which is associated with a static
1759      * D function.  The function will be called once to prepare the range
1760      * for iteration.
1761      *
1762      * Params:
1763      *  fn = The fiber function.
1764      *
1765      * In:
1766      *  fn must not be null.
1767      */
1768     this(void function() fn)
1769     {
1770         super(fn);
1771         call();
1772     }
1773 
1774     /**
1775      * Initializes a generator object which is associated with a static
1776      * D function.  The function will be called once to prepare the range
1777      * for iteration.
1778      *
1779      * Params:
1780      *  fn = The fiber function.
1781      *  sz = The stack size for this fiber.
1782      *
1783      * In:
1784      *  fn must not be null.
1785      */
1786     this(void function() fn, size_t sz)
1787     {
1788         super(fn, sz);
1789         call();
1790     }
1791 
1792     /**
1793      * Initializes a generator object which is associated with a static
1794      * D function.  The function will be called once to prepare the range
1795      * for iteration.
1796      *
1797      * Params:
1798      *  fn = The fiber function.
1799      *  sz = The stack size for this fiber.
1800      *  guardPageSize = size of the guard page to trap fiber's stack
1801      *                  overflows. Refer to $(REF Fiber, core,thread)'s
1802      *                  documentation for more details.
1803      *
1804      * In:
1805      *  fn must not be null.
1806      */
1807     this(void function() fn, size_t sz, size_t guardPageSize)
1808     {
1809         super(fn, sz, guardPageSize);
1810         call();
1811     }
1812 
1813     /**
1814      * Initializes a generator object which is associated with a dynamic
1815      * D function.  The function will be called once to prepare the range
1816      * for iteration.
1817      *
1818      * Params:
1819      *  dg = The fiber function.
1820      *
1821      * In:
1822      *  dg must not be null.
1823      */
1824     this(void delegate() dg)
1825     {
1826         super(dg);
1827         call();
1828     }
1829 
1830     /**
1831      * Initializes a generator object which is associated with a dynamic
1832      * D function.  The function will be called once to prepare the range
1833      * for iteration.
1834      *
1835      * Params:
1836      *  dg = The fiber function.
1837      *  sz = The stack size for this fiber.
1838      *
1839      * In:
1840      *  dg must not be null.
1841      */
1842     this(void delegate() dg, size_t sz)
1843     {
1844         super(dg, sz);
1845         call();
1846     }
1847 
1848     /**
1849      * Initializes a generator object which is associated with a dynamic
1850      * D function.  The function will be called once to prepare the range
1851      * for iteration.
1852      *
1853      * Params:
1854      *  dg = The fiber function.
1855      *  sz = The stack size for this fiber.
1856      *  guardPageSize = size of the guard page to trap fiber's stack
1857      *                  overflows. Refer to $(REF Fiber, core,thread)'s
1858      *                  documentation for more details.
1859      *
1860      * In:
1861      *  dg must not be null.
1862      */
1863     this(void delegate() dg, size_t sz, size_t guardPageSize)
1864     {
1865         super(dg, sz, guardPageSize);
1866         call();
1867     }
1868 
1869     /**
1870      * Returns true if the generator is empty.
1871      */
1872     final bool empty() @property
1873     {
1874         return m_value is null || state == State.TERM;
1875     }
1876 
1877     /**
1878      * Obtains the next value from the underlying function.
1879      */
1880     final void popFront()
1881     {
1882         call();
1883     }
1884 
1885     /**
1886      * Returns the most recently generated value by shallow copy.
1887      */
1888     final T front() @property
1889     {
1890         return *m_value;
1891     }
1892 
1893     /**
1894      * Returns the most recently generated value without executing a
1895      * copy contructor. Will not compile for element types defining a
1896      * postblit, because `Generator` does not return by reference.
1897      */
1898     final T moveFront()
1899     {
1900         static if (!hasElaborateCopyConstructor!T)
1901         {
1902             return front;
1903         }
1904         else
1905         {
1906             static assert(0,
1907                     "Fiber front is always rvalue and thus cannot be moved since it defines a postblit.");
1908         }
1909     }
1910 
1911     final int opApply(scope int delegate(T) loopBody)
1912     {
1913         int broken;
1914         for (; !empty; popFront())
1915         {
1916             broken = loopBody(front);
1917             if (broken) break;
1918         }
1919         return broken;
1920     }
1921 
1922     final int opApply(scope int delegate(size_t, T) loopBody)
1923     {
1924         int broken;
1925         for (size_t i; !empty; ++i, popFront())
1926         {
1927             broken = loopBody(i, front);
1928             if (broken) break;
1929         }
1930         return broken;
1931     }
1932 private:
1933     T* m_value;
1934 }
1935 
1936 ///
1937 @system unittest
1938 {
1939     auto tid = spawn({
1940         int i;
1941         while (i < 9)
1942             i = receiveOnly!int;
1943 
1944         ownerTid.send(i * 2);
1945     });
1946 
1947     auto r = new Generator!int({
1948         foreach (i; 1 .. 10)
1949             yield(i);
1950     });
1951 
1952     foreach (e; r)
1953         tid.send(e);
1954 
1955     assert(receiveOnly!int == 18);
1956 }
1957 
1958 /**
1959  * Yields a value of type T to the caller of the currently executing
1960  * generator.
1961  *
1962  * Params:
1963  *  value = The value to yield.
1964  */
1965 void yield(T)(ref T value)
1966 {
1967     Generator!T cur = cast(Generator!T) Fiber.getThis();
1968     if (cur !is null && cur.state == Fiber.State.EXEC)
1969     {
1970         cur.m_value = &value;
1971         return Fiber.yield();
1972     }
1973     throw new Exception("yield(T) called with no active generator for the supplied type");
1974 }
1975 
1976 /// ditto
1977 void yield(T)(T value)
1978 {
1979     yield(value);
1980 }
1981 
// Runs the same generator scenario under a ThreadScheduler and under a
// FiberScheduler; the spawned receiver reports success or failure as a bool
// to the thread running this test.
@system unittest
{
    import core.exception;
    import std.exception;

    auto mainTid = thisTid;
    alias testdg = () {
        // Receiver: expects the ints 1, 2, 3 in order and then the
        // termination of its owner.
        auto tid = spawn(
        (Tid mainTid) {
            int i;
            scope (failure) mainTid.send(false);
            try
            {
                for (i = 1; i < 10; i++)
                {
                    if (receiveOnly!int() != i)
                    {
                        mainTid.send(false);
                        break;
                    }
                }
            }
            catch (OwnerTerminated e)
            {
                // i will advance 1 past the last value expected
                mainTid.send(i == 4);
            }
        }, mainTid);
        auto r = new Generator!int(
        {
            // Yielding a double inside a Generator!int must throw.
            assertThrown!Exception(yield(2.0));
            yield(); // ensure this is a no-op
            yield(1);
            yield(); // also once something has been yielded
            yield(2);
            yield(3);
        });

        // Forward the generated values to the receiver.
        foreach (e; r)
        {
            tid.send(e);
        }
    };

    scheduler = new ThreadScheduler;
    scheduler.spawn(testdg);
    assert(receiveOnly!bool());

    scheduler = new FiberScheduler;
    scheduler.start(testdg);
    assert(receiveOnly!bool());
    // Reset the global scheduler once this test is done.
    scheduler = null;
}
2035 ///
2036 @system unittest
2037 {
2038     import std.range;
2039 
2040     InputRange!int myIota = iota(10).inputRangeObject;
2041 
2042     myIota.popFront();
2043     myIota.popFront();
2044     assert(myIota.moveFront == 2);
2045     assert(myIota.front == 2);
2046     myIota.popFront();
2047     assert(myIota.front == 3);
2048 
2049     //can be assigned to std.range.interfaces.InputRange directly
2050     myIota = new Generator!int(
2051     {
2052         foreach (i; 0 .. 10) yield(i);
2053     });
2054 
2055     myIota.popFront();
2056     myIota.popFront();
2057     assert(myIota.moveFront == 2);
2058     assert(myIota.front == 2);
2059     myIota.popFront();
2060     assert(myIota.front == 3);
2061 
2062     size_t[2] counter = [0, 0];
2063     foreach (i, unused; myIota) counter[] += [1, i];
2064 
2065     assert(myIota.empty);
2066     assert(counter == [7, 21]);
2067 }
2068 
2069 private
2070 {
2071     /*
2072      * A MessageBox is a message queue for one thread.  Other threads may send
2073      * messages to this owner by calling put(), and the owner receives them by
2074      * calling get().  The put() call is therefore effectively shared and the
2075      * get() call is effectively local.  setMaxMsgs may be used by any thread
2076      * to limit the size of the message queue.
2077      */
2078     class MessageBox
2079     {
2080         this() @trusted nothrow /* TODO: make @safe after relevant druntime PR gets merged */
2081         {
2082             m_lock = new Mutex;
2083             m_closed = false;
2084 
2085             if (scheduler is null)
2086             {
2087                 m_putMsg = new Condition(m_lock);
2088                 m_notFull = new Condition(m_lock);
2089             }
2090             else
2091             {
2092                 m_putMsg = scheduler.newCondition(m_lock);
2093                 m_notFull = scheduler.newCondition(m_lock);
2094             }
2095         }
2096 
2097         ///
2098         final @property bool isClosed() @safe @nogc pure
2099         {
2100             synchronized (m_lock)
2101             {
2102                 return m_closed;
2103             }
2104         }
2105 
2106         /*
2107          * Sets a limit on the maximum number of user messages allowed in the
2108          * mailbox.  If this limit is reached, the caller attempting to add
2109          * a new message will execute call.  If num is zero, there is no limit
2110          * on the message queue.
2111          *
2112          * Params:
2113          *  num  = The maximum size of the queue or zero if the queue is
2114          *         unbounded.
2115          *  call = The routine to call when the queue is full.
2116          */
2117         final void setMaxMsgs(size_t num, bool function(Tid) call) @safe @nogc pure
2118         {
2119             synchronized (m_lock)
2120             {
2121                 m_maxMsgs = num;
2122                 m_onMaxMsgs = call;
2123             }
2124         }
2125 
2126         /*
2127          * If maxMsgs is not set, the message is added to the queue and the
2128          * owner is notified.  If the queue is full, the message will still be
2129          * accepted if it is a control message, otherwise onCrowdingDoThis is
2130          * called.  If the routine returns true, this call will block until
2131          * the owner has made space available in the queue.  If it returns
2132          * false, this call will abort.
2133          *
2134          * Params:
2135          *  msg = The message to put in the queue.
2136          *
2137          * Throws:
2138          *  An exception if the queue is full and onCrowdingDoThis throws.
2139          */
2140         final void put(ref Message msg)
2141         {
2142             synchronized (m_lock)
2143             {
2144                 // TODO: Generate an error here if m_closed is true, or maybe
2145                 //       put a message in the caller's queue?
2146                 if (!m_closed)
2147                 {
2148                     while (true)
2149                     {
2150                         if (isPriorityMsg(msg))
2151                         {
2152                             m_sharedPty.put(msg);
2153                             m_putMsg.notify();
2154                             return;
2155                         }
2156                         if (!mboxFull() || isControlMsg(msg))
2157                         {
2158                             m_sharedBox.put(msg);
2159                             m_putMsg.notify();
2160                             return;
2161                         }
2162                         if (m_onMaxMsgs !is null && !m_onMaxMsgs(thisTid))
2163                         {
2164                             return;
2165                         }
2166                         m_putQueue++;
2167                         m_notFull.wait();
2168                         m_putQueue--;
2169                     }
2170                 }
2171             }
2172         }
2173 
2174         /*
2175          * Matches ops against each message in turn until a match is found.
2176          *
2177          * Params:
2178          *  ops = The operations to match.  Each may return a bool to indicate
2179          *        whether a message with a matching type is truly a match.
2180          *
2181          * Returns:
2182          *  true if a message was retrieved and false if not (such as if a
2183          *  timeout occurred).
2184          *
2185          * Throws:
2186          *  LinkTerminated if a linked thread terminated, or OwnerTerminated
2187          * if the owner thread terminates and no existing messages match the
2188          * supplied ops.
2189          */
2190         bool get(T...)(scope T vals)
2191         {
2192             import std.meta : AliasSeq;
2193 
2194             static assert(T.length, "T must not be empty");
2195 
2196             static if (is(T[0] : Duration))
2197             {
2198                 alias Ops = AliasSeq!(T[1 .. $]);
2199                 alias ops = vals[1 .. $];
2200                 enum timedWait = true;
2201                 Duration period = vals[0];
2202             }
2203             else
2204             {
2205                 alias Ops = AliasSeq!(T);
2206                 alias ops = vals[0 .. $];
2207                 enum timedWait = false;
2208             }
2209 
2210             bool onStandardMsg(ref Message msg)
2211             {
2212                 foreach (i, t; Ops)
2213                 {
2214                     alias Args = Parameters!(t);
2215                     auto op = ops[i];
2216 
2217                     if (msg.convertsTo!(Args))
2218                     {
2219                         alias RT = ReturnType!(t);
2220                         static if (is(RT == bool))
2221                         {
2222                             return msg.map(op);
2223                         }
2224                         else
2225                         {
2226                             msg.map(op);
2227                             static if (!is(immutable RT == immutable noreturn))
2228                                 return true;
2229                         }
2230                     }
2231                 }
2232                 return false;
2233             }
2234 
2235             bool onLinkDeadMsg(ref Message msg)
2236             {
2237                 assert(msg.convertsTo!(Tid),
2238                         "Message could be converted to Tid");
2239                 auto tid = msg.get!(Tid);
2240 
2241                 if (bool* pDepends = tid in thisInfo.links)
2242                 {
2243                     auto depends = *pDepends;
2244                     thisInfo.links.remove(tid);
2245                     // Give the owner relationship precedence.
2246                     if (depends && tid != thisInfo.owner)
2247                     {
2248                         auto e = new LinkTerminated(tid);
2249                         auto m = Message(MsgType.standard, e);
2250                         if (onStandardMsg(m))
2251                             return true;
2252                         throw e;
2253                     }
2254                 }
2255                 if (tid == thisInfo.owner)
2256                 {
2257                     thisInfo.owner = Tid.init;
2258                     auto e = new OwnerTerminated(tid);
2259                     auto m = Message(MsgType.standard, e);
2260                     if (onStandardMsg(m))
2261                         return true;
2262                     throw e;
2263                 }
2264                 return false;
2265             }
2266 
2267             bool onControlMsg(ref Message msg)
2268             {
2269                 switch (msg.type)
2270                 {
2271                 case MsgType.linkDead:
2272                     return onLinkDeadMsg(msg);
2273                 default:
2274                     return false;
2275                 }
2276             }
2277 
2278             bool scan(ref ListT list)
2279             {
2280                 for (auto range = list[]; !range.empty;)
2281                 {
2282                     // Only the message handler will throw, so if this occurs
2283                     // we can be certain that the message was handled.
2284                     scope (failure)
2285                         list.removeAt(range);
2286 
2287                     if (isControlMsg(range.front))
2288                     {
2289                         if (onControlMsg(range.front))
2290                         {
2291                             // Although the linkDead message is a control message,
2292                             // it can be handled by the user.  Since the linkDead
2293                             // message throws if not handled, if we get here then
2294                             // it has been handled and we can return from receive.
2295                             // This is a weird special case that will have to be
2296                             // handled in a more general way if more are added.
2297                             if (!isLinkDeadMsg(range.front))
2298                             {
2299                                 list.removeAt(range);
2300                                 continue;
2301                             }
2302                             list.removeAt(range);
2303                             return true;
2304                         }
2305                         range.popFront();
2306                         continue;
2307                     }
2308                     else
2309                     {
2310                         if (onStandardMsg(range.front))
2311                         {
2312                             list.removeAt(range);
2313                             return true;
2314                         }
2315                         range.popFront();
2316                         continue;
2317                     }
2318                 }
2319                 return false;
2320             }
2321 
2322             bool pty(ref ListT list)
2323             {
2324                 if (!list.empty)
2325                 {
2326                     auto range = list[];
2327 
2328                     if (onStandardMsg(range.front))
2329                     {
2330                         list.removeAt(range);
2331                         return true;
2332                     }
2333                     if (range.front.convertsTo!(Throwable))
2334                         throw range.front.get!(Throwable);
2335                     else if (range.front.convertsTo!(shared(Throwable)))
2336                         /* Note: a shared type can be caught without the shared qualifier
2337                          * so throwing shared will be an error */
2338                         throw cast() range.front.get!(shared(Throwable));
2339                     else
2340                         throw new PriorityMessageException(range.front.data);
2341                 }
2342                 return false;
2343             }
2344 
2345             static if (timedWait)
2346             {
2347                 import core.time : MonoTime;
2348                 auto limit = MonoTime.currTime + period;
2349             }
2350 
2351             while (true)
2352             {
2353                 ListT arrived;
2354 
2355                 if (pty(m_localPty) || scan(m_localBox))
2356                 {
2357                     return true;
2358                 }
2359                 yield();
2360                 synchronized (m_lock)
2361                 {
2362                     updateMsgCount();
2363                     while (m_sharedPty.empty && m_sharedBox.empty)
2364                     {
2365                         // NOTE: We're notifying all waiters here instead of just
2366                         //       a few because the onCrowding behavior may have
2367                         //       changed and we don't want to block sender threads
2368                         //       unnecessarily if the new behavior is not to block.
2369                         //       This will admittedly result in spurious wakeups
2370                         //       in other situations, but what can you do?
2371                         if (m_putQueue && !mboxFull())
2372                             m_notFull.notifyAll();
2373                         static if (timedWait)
2374                         {
2375                             if (period <= Duration.zero || !m_putMsg.wait(period))
2376                                 return false;
2377                         }
2378                         else
2379                         {
2380                             m_putMsg.wait();
2381                         }
2382                     }
2383                     m_localPty.put(m_sharedPty);
2384                     arrived.put(m_sharedBox);
2385                 }
2386                 if (m_localPty.empty)
2387                 {
2388                     scope (exit) m_localBox.put(arrived);
2389                     if (scan(arrived))
2390                     {
2391                         return true;
2392                     }
2393                     else
2394                     {
2395                         static if (timedWait)
2396                         {
2397                             period = limit - MonoTime.currTime;
2398                         }
2399                         continue;
2400                     }
2401                 }
2402                 m_localBox.put(arrived);
2403                 pty(m_localPty);
2404                 return true;
2405             }
2406         }
2407 
2408         /*
2409          * Called on thread termination.  This routine processes any remaining
2410          * control messages, clears out message queues, and sets a flag to
2411          * reject any future messages.
2412          */
2413         final void close()
2414         {
2415             static void onLinkDeadMsg(ref Message msg)
2416             {
2417                 assert(msg.convertsTo!(Tid),
2418                         "Message could be converted to Tid");
2419                 auto tid = msg.get!(Tid);
2420 
2421                 thisInfo.links.remove(tid);
2422                 if (tid == thisInfo.owner)
2423                     thisInfo.owner = Tid.init;
2424             }
2425 
2426             static void sweep(ref ListT list)
2427             {
2428                 for (auto range = list[]; !range.empty; range.popFront())
2429                 {
2430                     if (range.front.type == MsgType.linkDead)
2431                         onLinkDeadMsg(range.front);
2432                 }
2433             }
2434 
2435             ListT arrived;
2436 
2437             sweep(m_localBox);
2438             synchronized (m_lock)
2439             {
2440                 arrived.put(m_sharedBox);
2441                 m_closed = true;
2442             }
2443             m_localBox.clear();
2444             sweep(arrived);
2445         }
2446 
2447     private:
2448         // Routines involving local data only, no lock needed.
2449 
2450         bool mboxFull() @safe @nogc pure nothrow
2451         {
2452             return m_maxMsgs && m_maxMsgs <= m_localMsgs + m_sharedBox.length;
2453         }
2454 
2455         void updateMsgCount() @safe @nogc pure nothrow
2456         {
2457             m_localMsgs = m_localBox.length;
2458         }
2459 
2460         bool isControlMsg(ref Message msg) @safe @nogc pure nothrow
2461         {
2462             return msg.type != MsgType.standard && msg.type != MsgType.priority;
2463         }
2464 
2465         bool isPriorityMsg(ref Message msg) @safe @nogc pure nothrow
2466         {
2467             return msg.type == MsgType.priority;
2468         }
2469 
2470         bool isLinkDeadMsg(ref Message msg) @safe @nogc pure nothrow
2471         {
2472             return msg.type == MsgType.linkDead;
2473         }
2474 
2475         alias OnMaxFn = bool function(Tid);
2476         alias ListT = List!(Message);
2477 
2478         ListT m_localBox;
2479         ListT m_localPty;
2480 
2481         Mutex m_lock;
2482         Condition m_putMsg;
2483         Condition m_notFull;
2484         size_t m_putQueue;
2485         ListT m_sharedBox;
2486         ListT m_sharedPty;
2487         OnMaxFn m_onMaxMsgs;
2488         size_t m_localMsgs;
2489         size_t m_maxMsgs;
2490         bool m_closed;
2491         package Thread m_thread; // For join(Tid)
2492     }
2493 
2494     /*
2495      *
2496      */
2497     struct List(T)
2498     {
2499         struct Range
2500         {
2501             import std.exception : enforce;
2502 
2503             @property bool empty() const
2504             {
2505                 return !m_prev.next;
2506             }
2507 
2508             @property ref T front()
2509             {
2510                 enforce(m_prev.next, "invalid list node");
2511                 return m_prev.next.val;
2512             }
2513 
2514             @property void front(T val)
2515             {
2516                 enforce(m_prev.next, "invalid list node");
2517                 m_prev.next.val = val;
2518             }
2519 
2520             void popFront()
2521             {
2522                 enforce(m_prev.next, "invalid list node");
2523                 m_prev = m_prev.next;
2524             }
2525 
2526             private this(Node* p)
2527             {
2528                 m_prev = p;
2529             }
2530 
2531             private Node* m_prev;
2532         }
2533 
2534         void put(T val)
2535         {
2536             put(newNode(val));
2537         }
2538 
2539         void put(ref List!(T) rhs)
2540         {
2541             if (!rhs.empty)
2542             {
2543                 put(rhs.m_first);
2544                 while (m_last.next !is null)
2545                 {
2546                     m_last = m_last.next;
2547                     m_count++;
2548                 }
2549                 rhs.m_first = null;
2550                 rhs.m_last = null;
2551                 rhs.m_count = 0;
2552             }
2553         }
2554 
2555         Range opSlice()
2556         {
2557             return Range(cast(Node*)&m_first);
2558         }
2559 
2560         void removeAt(Range r)
2561         {
2562             import std.exception : enforce;
2563 
2564             assert(m_count, "Can not remove from empty Range");
2565             Node* n = r.m_prev;
2566             enforce(n && n.next, "attempting to remove invalid list node");
2567 
2568             if (m_last is m_first)
2569                 m_last = null;
2570             else if (m_last is n.next)
2571                 m_last = n; // nocoverage
2572             Node* to_free = n.next;
2573             n.next = n.next.next;
2574             freeNode(to_free);
2575             m_count--;
2576         }
2577 
2578         @property size_t length()
2579         {
2580             return m_count;
2581         }
2582 
2583         void clear()
2584         {
2585             m_first = m_last = null;
2586             m_count = 0;
2587         }
2588 
2589         @property bool empty()
2590         {
2591             return m_first is null;
2592         }
2593 
2594     private:
2595         struct Node
2596         {
2597             Node* next;
2598             T val;
2599 
2600             this(T v)
2601             {
2602                 val = v;
2603             }
2604         }
2605 
2606         static shared struct SpinLock
2607         {
2608             void lock() { while (!cas(&locked, false, true)) { Thread.yield(); } }
2609             void unlock() { atomicStore!(MemoryOrder.rel)(locked, false); }
2610             bool locked;
2611         }
2612 
2613         static shared SpinLock sm_lock;
2614         static shared Node* sm_head;
2615 
2616         Node* newNode(T v)
2617         {
2618             Node* n;
2619             {
2620                 sm_lock.lock();
2621                 scope (exit) sm_lock.unlock();
2622 
2623                 if (sm_head)
2624                 {
2625                     n = cast(Node*) sm_head;
2626                     sm_head = sm_head.next;
2627                 }
2628             }
2629             if (n)
2630             {
2631                 import core.lifetime : emplace;
2632                 emplace!Node(n, v);
2633             }
2634             else
2635             {
2636                 n = new Node(v);
2637             }
2638             return n;
2639         }
2640 
2641         void freeNode(Node* n)
2642         {
2643             // destroy val to free any owned GC memory
2644             destroy(n.val);
2645 
2646             sm_lock.lock();
2647             scope (exit) sm_lock.unlock();
2648 
2649             auto sn = cast(shared(Node)*) n;
2650             sn.next = sm_head;
2651             sm_head = sn;
2652         }
2653 
2654         void put(Node* n)
2655         {
2656             m_count++;
2657             if (!empty)
2658             {
2659                 m_last.next = n;
2660                 m_last = n;
2661                 return;
2662             }
2663             m_first = n;
2664             m_last = n;
2665         }
2666 
2667         Node* m_first;
2668         Node* m_last;
2669         size_t m_count;
2670     }
2671 }
2672 
// Exercises handler matching in receive() with and without a bounded mailbox.
@system unittest
{
    import std.typecons : tuple, Tuple;

    // Receiver: consumes the four messages sent by runTest, then reports back.
    static void testfn(Tid tid)
    {
        // (int, int) must be routed to the second handler, never to float.
        receive(
            (float unexpected) { assert(0); },
            (int first, int second) {
                assert(first == 42 && second == 86);
            });
        receive((Tuple!(int, int) pair) {
            assert(pair[0] == 42 && pair[1] == 86);
        });
        // A Variant handler accepts whatever comes next.
        receive((Variant anything) {  });
        // A bool-returning handler decides itself whether the message matches.
        receive(
            (string text) { return text == "the quick brown fox"; },
            (string text) { assert(false); });
        prioritySend(tid, "done");
    }

    // Sender: feeds the receiver and waits for its confirmation.
    static void runTest(Tid tid)
    {
        tid.send(42, 86);
        tid.send(tuple(42, 86));
        tid.send("hello", "there");
        tid.send("the quick brown fox");
        receive((string reply) { assert(reply == "done"); });
    }

    static void simpleTest()
    {
        runTest(spawn(&testfn, thisTid));

        // Run the test again with a limited mailbox size.
        auto bounded = spawn(&testfn, thisTid);
        setMaxMailboxSize(bounded, 2, OnCrowding.block);
        runTest(bounded);
    }

    // Once with the default behaviour, once with an explicit ThreadScheduler.
    simpleTest();

    scheduler = new ThreadScheduler;
    simpleTest();
    scheduler = null;
}
2718 
// Returns the process-wide mutex used by the single-argument initOnce,
// creating it on first use.  Creation is raced with cas, so every caller
// ends up with the same instance.
private @property shared(Mutex) initOnceLock()
{
    static shared Mutex lock;

    auto current = atomicLoad!(MemoryOrder.acq)(lock);
    if (current is null)
    {
        auto candidate = new shared Mutex;
        // Publish our candidate unless another thread got there first, in
        // which case the winner's mutex is reloaded and ours is discarded.
        current = cas(&lock, cast(shared) null, candidate)
            ? candidate
            : atomicLoad!(MemoryOrder.acq)(lock);
    }
    return current;
}
2729 
2730 /**
2731  * Initializes $(D_PARAM var) with the lazy $(D_PARAM init) value in a
2732  * thread-safe manner.
2733  *
2734  * The implementation guarantees that all threads simultaneously calling
2735  * initOnce with the same $(D_PARAM var) argument block until $(D_PARAM var) is
2736  * fully initialized. All side-effects of $(D_PARAM init) are globally visible
2737  * afterwards.
2738  *
2739  * Params:
2740  *   var = The variable to initialize
2741  *   init = The lazy initializer value
2742  *
2743  * Returns:
2744  *   A reference to the initialized variable
2745  */
2746 auto ref initOnce(alias var)(lazy typeof(var) init)
2747 {
2748     return initOnce!var(init, initOnceLock);
2749 }
2750 
/// A typical use-case is to perform lazy but thread-safe initialization.
@system unittest
{
    static class MySingleton
    {
        static MySingleton instance()
        {
            // Global storage is required; initOnce creates the object on the
            // first call only.
            __gshared MySingleton theInstance;
            return initOnce!theInstance(new MySingleton);
        }
    }

    auto obtained = MySingleton.instance;
    assert(obtained !is null);
}
2765 
// The initializer passed to initOnce must run exactly once, even when many
// threads ask for the singleton.
@system unittest
{
    static class MySingleton
    {
        static MySingleton instance()
        {
            __gshared MySingleton inst;
            return initOnce!inst(new MySingleton);
        }

    private:
        // Every construction bumps the global counter and records its number.
        this()
        {
            val = ++cnt;
        }

        size_t val;
        __gshared size_t cnt;
    }

    // Each worker reports the construction number of the instance it saw.
    static void report()
    {
        ownerTid.send(MySingleton.instance.val);
    }

    enum workers = 10;

    for (int started = 0; started < workers; ++started)
        spawn(&report);
    for (int answered = 0; answered < workers; ++answered)
        assert(receiveOnly!size_t == MySingleton.instance.val);
    assert(MySingleton.cnt == 1);
}
2788 
2789 /**
2790  * Same as above, but takes a separate mutex instead of sharing one among
2791  * all initOnce instances.
2792  *
2793  * This should be used to avoid dead-locks when the $(D_PARAM init)
2794  * expression waits for the result of another thread that might also
2795  * call initOnce. Use with care.
2796  *
2797  * Params:
2798  *   var = The variable to initialize
2799  *   init = The lazy initializer value
2800  *   mutex = A mutex to prevent race conditions
2801  *
2802  * Returns:
2803  *   A reference to the initialized variable
2804  */
2805 auto ref initOnce(alias var)(lazy typeof(var) init, shared Mutex mutex)
2806 {
2807     // check that var is global, can't take address of a TLS variable
2808     static assert(is(typeof({ __gshared p = &var; })),
2809         "var must be 'static shared' or '__gshared'.");
2810     import core.atomic : atomicLoad, MemoryOrder, atomicStore;
2811 
2812     static shared bool flag;
2813     if (!atomicLoad!(MemoryOrder.acq)(flag))
2814     {
2815         synchronized (mutex)
2816         {
2817             if (!atomicLoad!(MemoryOrder.raw)(flag))
2818             {
2819                 var = init;
2820                 static if (!is(immutable typeof(var) == immutable noreturn))
2821                     atomicStore!(MemoryOrder.rel)(flag, true);
2822             }
2823         }
2824     }
2825     return var;
2826 }
2827 
/// ditto
auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex)
{
    // Convenience overload: add the `shared` qualifier and forward.
    shared Mutex sharedMutex = cast(shared) mutex;
    return initOnce!var(init, sharedMutex);
}
2833 
/// Use a separate mutex when init blocks on another thread that might also call initOnce.
@system unittest
{
    import core.sync.mutex : Mutex;

    static shared bool varA, varB;
    static shared Mutex varBLock;
    varBLock = new shared Mutex;

    static void worker()
    {
        // use a different mutex for varB to avoid a dead-lock
        initOnce!varB(true, varBLock);
        send(ownerTid, true);
    }

    spawn(&worker);
    // init depends on the result of the spawned thread
    initOnce!varA(receiveOnly!bool);
    assert(varA == true);
    assert(varB == true);
}
2853 
// Checks which kinds of variables initOnce accepts: `static shared` and
// `__gshared` variables compile, while a thread-local static and a stack
// local are rejected at compile time.
@system unittest
{
    static shared bool a;
    __gshared bool b;
    static bool c;
    bool d;
    initOnce!a(true);
    initOnce!b(true);
    static assert(!__traits(compiles, initOnce!c(true))); // TLS
    static assert(!__traits(compiles, initOnce!d(true))); // local variable
}
2865 
// Shared arrays can be sent: the receiver writes through the slice it got
// and the sender observes the change.
@system unittest
{
    static shared int[] x = new shared(int)[1];

    static void writer()
    {
        auto received = receiveOnly!(shared(int)[])();
        received[0] = 5;
        send(ownerTid, true);
    }

    auto worker = spawn(&writer);
    send(worker, x);
    // Wait for the worker's acknowledgement before inspecting x.
    receiveOnly!bool();
    assert(x[0] == 5);
}
2879 
// https://issues.dlang.org/show_bug.cgi?id=13930
// Regression check: an immutable associative array is sent to the current
// thread and received again with its exact type.
@system unittest
{
    immutable aa = ["0":0];
    thisTid.send(aa);
    // NOTE(review): presumably the line below is the one that failed to
    // compile in the linked issue; as part of this test it has to compile.
    receiveOnly!(immutable int[string]); // compile error
}
2887 
// https://issues.dlang.org/show_bug.cgi?id=19345
// A const aggregate with const members can be sent and received.
@system unittest
{
    static struct Aggregate
    {
        const int a;
        const int[5] b;
    }

    static void sender(Tid mainTid)
    {
        const payload = Aggregate(42, [1, 2, 3, 4, 5]);
        send(mainTid, payload);
    }

    spawn(&sender, thisTid);

    immutable expected = Aggregate(42, [1, 2, 3, 4, 5]);
    auto received = receiveOnly!(const Aggregate)();
    assert(received == expected);
}
2903 
// Noreturn support
// Compile-only checks: nothing here is meant to execute at run time.
@system unittest
{
    // A function that never returns normally.
    static noreturn foo(int) { throw new Exception(""); }

    // `if (false)` keeps the calls from running while still forcing the
    // templates to be instantiated with a noreturn function.
    if (false) spawn(&foo, 1);
    if (false) spawnLinked(&foo, 1);

    if (false) receive(&foo);
    if (false) receiveTimeout(Duration.init, &foo);

    // Wrapped in __traits(compiles) to skip codegen which crashes dmd's backend
    static assert(__traits(compiles, receiveOnly!noreturn()                 ));
    static assert(__traits(compiles, send(Tid.init, noreturn.init)          ));
    static assert(__traits(compiles, prioritySend(Tid.init, noreturn.init)  ));
    static assert(__traits(compiles, yield(noreturn.init)                   ));

    // initOnce must accept a noreturn variable as well.
    static assert(__traits(compiles, {
        __gshared noreturn n;
        initOnce!n(noreturn.init);
    }));
}