std.parallelism

std.parallelism implements high-level primitives for SMP parallelism. These include parallel foreach, parallel reduce, parallel eager map, pipelining and future/promise parallelism. std.parallelism is recommended when the same operation is to be executed in parallel on different data, or when a function is to be executed in a background thread and its result returned to a well-defined main thread. For communication between arbitrary threads, see std.concurrency.

std.parallelism is based on the concept of a Task. A Task is an object that represents the fundamental unit of work in this library and may be executed in parallel with any other Task. Using Task directly allows programming with a future/promise paradigm. All other supported parallelism paradigms (parallel foreach, map, reduce, pipelining) represent an additional level of abstraction over Task. They automatically create one or more Task objects, or closely related types that are conceptually identical but not part of the public API.

After creation, a Task may be executed in a new thread, or submitted to a TaskPool for execution. A TaskPool encapsulates a task queue and its worker threads. Its purpose is to efficiently map a large number of Tasks onto a smaller number of threads. A task queue is a FIFO queue of Task objects that have been submitted to the TaskPool and are awaiting execution. A worker thread is a thread that is associated with exactly one task queue. It executes the Task at the front of its queue when the queue has work available, or sleeps when no work is available. Each task queue is associated with zero or more worker threads. If the result of a Task is needed before execution by a worker thread has begun, the Task can be removed from the task queue and executed immediately in the thread where the result is needed.
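For illustration, a minimal sketch of this workflow (the fib helper and its argument are assumptions for the example, not part of the API): create a Task with task, submit it to the global taskPool, and later synchronize with yieldForce.

import std.parallelism : task, taskPool;

// Hypothetical helper used only for illustration.
int fib(int n)
{
    return n < 2 ? n : fib(n - 1) + fib(n - 2);
}

void main()
{
    // Create a Task on the GC heap and place it on the pool's task queue.
    auto fibTask = task!fib(30);
    taskPool.put(fibTask);

    // Do other work here; yieldForce blocks until the Task has finished,
    // or executes it in this thread if a worker has not started it yet.
    immutable result = fibTask.yieldForce;
    assert(result == 832_040);
}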

Warning: Unless marked as @trusted or @safe, artifacts in this module allow implicit data sharing between threads and cannot guarantee that client code is free from low level data races.

Members

Aliases

totalCPUs
alias totalCPUs = __lazilyInitializedConstant!(immutable(uint), uint.max, totalCPUsImpl)

The total number of CPU cores available on the current machine, as reported by the operating system.

Classes

TaskPool
class TaskPool

This class encapsulates a task queue and a set of worker threads. Its purpose is to efficiently map a large number of Tasks onto a smaller number of threads. A task queue is a FIFO queue of Task objects that have been submitted to the TaskPool and are awaiting execution. A worker thread is a thread that executes the Task at the front of the queue when one is available and sleeps when the queue is empty.
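A minimal sketch of constructing a dedicated pool rather than using the global one (the worker count and the triple helper are illustrative assumptions), then shutting it down with finish:

import std.parallelism : TaskPool, task;

// Hypothetical helper used only for illustration.
int triple(int x) { return 3 * x; }

void main()
{
    auto pool = new TaskPool(4);   // a pool with 4 worker threads
    scope (exit) pool.finish();    // terminate the workers once the queue drains

    auto t = task!triple(14);
    pool.put(t);
    assert(t.yieldForce == 42);
}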

Functions

parallel
ParallelForeach!R parallel(R range)
ParallelForeach!R parallel(R range, size_t workUnitSize)

Convenience functions that forward to taskPool.parallel. The purpose of these is to make parallel foreach less verbose and more readable.
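A minimal sketch of the convenience form (the array size and the computation are illustrative assumptions):

import std.math : log;
import std.parallelism : parallel;

void main()
{
    auto logs = new double[1_000_000];

    // The loop body runs on the worker threads of the global taskPool;
    // `ref` lets each iteration write back into its array element.
    foreach (i, ref elem; parallel(logs))
    {
        elem = log(i + 1.0);
    }
}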

scopedTask
auto scopedTask(Args args)
auto scopedTask(F delegateOrFp, Args args)
auto scopedTask(F fun, Args args)

These functions allow the creation of Task objects on the stack rather than the GC heap. The lifetime of a Task created by scopedTask cannot exceed the lifetime of the scope it was created in.
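A minimal sketch, assuming a hypothetical square helper: the Task lives on the caller's stack frame, so it must be forced before that scope exits.

import std.parallelism : scopedTask, taskPool;

// Hypothetical helper used only for illustration.
int square(int x) { return x * x; }

void main()
{
    auto t = scopedTask!square(21);
    taskPool.put(t);

    // The Task is stack-allocated, so wait for it before leaving this scope.
    assert(t.yieldForce == 441);
}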

task
auto task(Args args)

Creates a Task on the GC heap that calls an alias. This may be executed via Task.executeInNewThread or by submitting to a std.parallelism.TaskPool. A globally accessible instance of TaskPool is provided by std.parallelism.taskPool.
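A minimal sketch of the alias form, here running std.file.read in a dedicated new thread ("foo.txt" is a placeholder path):

import std.file : read;
import std.parallelism : task;

void main()
{
    // The callable is supplied as a template alias argument.
    auto readTask = task!read("foo.txt");
    readTask.executeInNewThread();

    // Do other work, then block until the read has completed.
    auto fileData = readTask.yieldForce;
}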

task
auto task(F delegateOrFp, Args args)

Creates a Task on the GC heap that calls a function pointer, delegate, or class/struct with overloaded opCall.
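A minimal sketch of the runtime-callable form, using a delegate that captures locals from the enclosing scope:

import std.parallelism : task, taskPool;

void main()
{
    int a = 5, b = 7;

    // The callable is passed as a regular argument rather than an alias.
    auto sumTask = task(delegate int() { return a + b; });
    taskPool.put(sumTask);

    assert(sumTask.yieldForce == 12);
}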

task
auto task(F fun, Args args)

Version of task usable from @safe code. Usage mechanics are identical to the non-@safe case, but safety introduces some restrictions on the callable, its arguments, and its return type.
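A minimal sketch of the function-pointer form this overload accepts (twice is a hypothetical @safe helper, not part of the API):

import std.parallelism : task;

// Hypothetical @safe helper used only for illustration.
int twice(int x) @safe { return 2 * x; }

void main()
{
    // A pointer to an @safe function, with arguments that have no
    // unshared aliasing, satisfies the restrictions mentioned above.
    auto t = task(&twice, 21);
    t.executeInNewThread();
    assert(t.yieldForce == 42);
}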

Properties

defaultPoolThreads
uint defaultPoolThreads [@property getter]
void defaultPoolThreads [@property setter]

These properties get and set the number of worker threads in the TaskPool instance returned by taskPool. The default value is totalCPUs - 1. Calling the setter after the first call to taskPool does not change the number of worker threads in the instance returned by taskPool.
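A minimal sketch: the setter is only effective if it runs before the first use of taskPool anywhere in the program.

import std.parallelism : defaultPoolThreads, taskPool, totalCPUs;

void main()
{
    // Must run before taskPool is first accessed.
    defaultPoolThreads = totalCPUs;   // the default would be totalCPUs - 1

    // size reports the number of worker threads in the global pool.
    assert(taskPool.size == totalCPUs);
}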

taskPool
TaskPool taskPool [@property getter]

Returns a lazily initialized global instantiation of TaskPool. This function can safely be called concurrently from multiple non-worker threads. The worker threads in this pool are daemon threads, meaning that it is not necessary to call TaskPool.stop or TaskPool.finish before terminating the main thread.
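A minimal sketch using the global pool directly; here taskPool.amap, the pool's eager parallel map, evaluates sqrt over an array (the input values are illustrative):

import std.array : array;
import std.math : sqrt;
import std.parallelism : taskPool;
import std.range : iota;

void main()
{
    auto numbers = iota(100.0).array;   // 0.0, 1.0, ..., 99.0

    // amap eagerly evaluates sqrt on the pool's worker threads and
    // returns the results as a newly allocated array.
    auto roots = taskPool.amap!sqrt(numbers);
    assert(roots[81] == 9.0);
}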

Structs

Task
struct Task(alias fun, Args...)

Task represents the fundamental unit of work. A Task may be executed in parallel with any other Task. Using this struct directly allows future/promise parallelism. In this paradigm, a function (or delegate or other callable) is executed in a thread other than the one it was called from. The calling thread does not block while the function is being executed. A call to workForce, yieldForce, or spinForce is used to ensure that the Task has finished executing and to obtain the return value, if any. These functions and done also act as full memory barriers, meaning that any memory writes made in the thread that executed the Task are guaranteed to be visible in the calling thread after one of these functions returns.
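A minimal sketch of the pattern just described (sumTo and its argument are assumptions for the example): submit a Task, optionally poll done, then synchronize with yieldForce.

import core.thread : Thread;
import core.time : msecs;
import std.parallelism : task, taskPool;

// Hypothetical helper used only for illustration.
long sumTo(long n)
{
    long total = 0;
    foreach (i; 1 .. n + 1)
        total += i;
    return total;
}

void main()
{
    auto t = task!sumTo(1_000_000L);
    taskPool.put(t);

    // done is a non-blocking check; yieldForce blocks, fetches the result,
    // and acts as a full memory barrier as described above.
    while (!t.done)
        Thread.sleep(1.msecs);

    assert(t.yieldForce == 500_000_500_000L);
}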

Examples

import std.algorithm.iteration : map;
import std.math.operations : isClose;
import std.parallelism : taskPool;
import std.range : iota;

// Parallel reduce can be combined with
// std.algorithm.iteration.map to interesting effect.
// The following example (thanks to Russel Winder)
// calculates pi by quadrature using
// std.algorithm.map and TaskPool.reduce.
// getTerm is evaluated in parallel as needed by
// TaskPool.reduce.
//
// Timings on an Intel i5-3450 quad core machine
// for n = 1_000_000_000:
//
// TaskPool.reduce:       1.067 s
// std.algorithm.reduce:  4.011 s

enum n = 1_000_000;
enum delta = 1.0 / n;

alias getTerm = (int i)
{
    immutable x = ( i - 0.5 ) * delta;
    return delta / ( 1.0 + x * x );
};

immutable pi = 4.0 * taskPool.reduce!"a + b"(n.iota.map!getTerm);

assert(pi.isClose(3.14159, 1e-5));

Meta

Authors

David Simcha