TaskGroup class
class to create a task group from a task
A task group executes a group of asynchronous tasks. It enables asynchronous task spawning, cooperative execution among worker threads, and naturally supports recursive parallelism. Due to cooperative execution, a task group can only be created by an executor worker; otherwise an exception will be thrown. The code below demonstrates how to use task groups to implement recursive Fibonacci parallelism.

    #include <iostream>
    #include <taskflow/taskflow.hpp>

    tf::Executor executor;

    size_t fibonacci(size_t N) {

      if (N < 2) return N;

      size_t res1, res2;

      // Create a task group from the current executor
      tf::TaskGroup tg = executor.task_group();

      // Submit asynchronous tasks to the group
      tg.silent_async([N, &res1](){ res1 = fibonacci(N-1); });

      // Compute the other branch synchronously
      res2 = fibonacci(N-2);

      // Wait for all tasks in the group to complete
      tg.corun();

      return res1 + res2;
    }

    int main() {
      size_t N = 30, res;
      res = executor.async([N](){ return fibonacci(N); }).get();
      std::cout << N << "-th Fibonacci number is " << res << '\n';
      return 0;
    }

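Users must explicitly call tf::TaskGroup::corun() to synchronize with all tasks spawned by a task group. Failing to do so results in undefined behavior.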
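The fork-join shape of the Fibonacci example can also be sketched with only the standard library. This is an analogy, not Taskflow code: the helper `fib_fork_join` and its `depth` cutoff are hypothetical, `std::async` stands in for spawning a task into the group, and `future::get()` stands in for the final wait. Unlike `corun()`, `get()` blocks the calling thread instead of letting it run other tasks.

```cpp
#include <cstddef>
#include <future>

// Hypothetical helper, not part of Taskflow: fork-join Fibonacci in which
// `depth` limits how many recursion levels are allowed to spawn a thread.
std::size_t fib_fork_join(std::size_t n, unsigned depth) {
  if (n < 2) return n;
  if (depth == 0) {
    // Below the cutoff, recurse sequentially to avoid spawning more threads.
    return fib_fork_join(n - 1, 0) + fib_fork_join(n - 2, 0);
  }
  // Fork: compute one branch on another thread.
  std::future<std::size_t> left =
      std::async(std::launch::async, fib_fork_join, n - 1, depth - 1);
  // Compute the other branch on the calling thread.
  std::size_t right = fib_fork_join(n - 2, depth - 1);
  // Join: get() blocks this thread until the forked branch is done.
  return left.get() + right;
}
```

Calling `fib_fork_join(20, 3)` forks on the top three recursion levels only and computes the rest sequentially.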
Constructors, destructors, conversion operators
Public functions
- auto operator=(TaskGroup&&) -> TaskGroup& deleted
- disabled move assignment
- auto operator=(const TaskGroup&) -> TaskGroup& deleted
- disabled copy assignment
- auto executor() -> Executor&
- obtains the executor that creates this task group
- template<typename F> auto async(F&& f) -> auto
- runs the given callable asynchronously
- template<typename P, typename F> auto async(P&& params, F&& f) -> auto
- runs the given callable asynchronously
- template<typename F> void silent_async(F&& f)
- runs the given function asynchronously without returning any future object
- template<typename P, typename F> void silent_async(P&& params, F&& f)
- runs the given function asynchronously without returning any future object
- template<typename F, typename... Tasks, std::enable_if_t<all_same_v<AsyncTask, std::decay_t<Tasks>...>, void>* = nullptr> auto dependent_async(F&& func, Tasks&&... tasks) -> auto
- runs the given function asynchronously when the given predecessors finish
- template<typename P, typename F, typename... Tasks, std::enable_if_t<is_task_params_v<P> && all_same_v<AsyncTask, std::decay_t<Tasks>...>, void>* = nullptr> auto dependent_async(P&& params, F&& func, Tasks&&... tasks) -> auto
- runs the given function asynchronously when the given predecessors finish
- template<typename F, typename I, std::enable_if_t<!std::is_same_v<std::decay_t<I>, AsyncTask>, void>* = nullptr> auto dependent_async(F&& func, I first, I last) -> auto
- runs the given function asynchronously when the given range of predecessors finish
- template<typename P, typename F, typename I, std::enable_if_t<is_task_params_v<P> && !std::is_same_v<std::decay_t<I>, AsyncTask>, void>* = nullptr> auto dependent_async(P&& params, F&& func, I first, I last) -> auto
- runs the given function asynchronously when the given range of predecessors finish
- template<typename F, typename... Tasks, std::enable_if_t<all_same_v<AsyncTask, std::decay_t<Tasks>...>, void>* = nullptr> auto silent_dependent_async(F&& func, Tasks&&... tasks) -> tf::AsyncTask
- runs the given function asynchronously when the given predecessors finish
- template<typename P, typename F, typename... Tasks, std::enable_if_t<is_task_params_v<P> && all_same_v<AsyncTask, std::decay_t<Tasks>...>, void>* = nullptr> auto silent_dependent_async(P&& params, F&& func, Tasks&&... tasks) -> tf::AsyncTask
- runs the given function asynchronously when the given predecessors finish
- template<typename F, typename I, std::enable_if_t<!std::is_same_v<std::decay_t<I>, AsyncTask>, void>* = nullptr> auto silent_dependent_async(F&& func, I first, I last) -> tf::AsyncTask
- runs the given function asynchronously when the given range of predecessors finish
- template<typename P, typename F, typename I, std::enable_if_t<is_task_params_v<P> && !std::is_same_v<std::decay_t<I>, AsyncTask>, void>* = nullptr> auto silent_dependent_async(P&& params, F&& func, I first, I last) -> tf::AsyncTask
- runs the given function asynchronously when the given range of predecessors finish
- void corun()
- corun all tasks spawned by this task group with other workers
- void cancel()
- cancel all tasks in this task group
- auto is_cancelled() -> bool
- queries if the task group has been cancelled
- auto size() const -> size_t
- queries the number of tasks currently in this task group
Function documentation
template<typename F>
auto tf::TaskGroup::async(F&& f)
runs the given callable asynchronously
| Template parameters | |
|---|---|
| F | callable type |
| Parameters | |
| f | callable object |
This method creates an asynchronous task that executes the given callable and returns a future object that holds its result. Unlike tf::Executor::async, the created task belongs to this task group, so tf::TaskGroup::corun() also waits for it.

    executor.silent_async([&](){
      std::atomic<int> counter(0);
      auto tg = executor.task_group();

      auto fu1 = tg.async([&](){ counter++; });
      auto fu2 = tg.async([&](){ counter++; });
      fu1.get();
      fu2.get();
      assert(counter == 2);

      // spawn 100 asynchronous tasks from the task group
      for(int i=0; i<100; i++) {
        tg.silent_async([&](){ counter++; });
      }

      // corun until the 100 asynchronous tasks have completed
      tg.corun();
      assert(counter == 102);

      // do something else afterwards ...
    });

template<typename P, typename F>
auto tf::TaskGroup::async(P&& params, F&& f)
runs the given callable asynchronously
| Template parameters | |
|---|---|
| P | task parameters type |
| F | callable type |
| Parameters | |
| params | task parameters |
| f | callable |
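Similar to tf::TaskGroup::async(F&& f), but additionally takes task parameters, such as the task name used in the example below.

    executor.silent_async([&](){
      auto tg = executor.task_group();
      auto future = tg.async("my task", [](){ return 10; });
      assert(future.get() == 10);
    });
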
template<typename F>
void tf::TaskGroup::silent_async(F&& f)
runs the given function asynchronously without returning any future object
| Template parameters | |
|---|---|
| F | callable type |
| Parameters | |
| f | callable |
This function is more efficient than tf::TaskGroup::async and is the better choice when no future object is needed to retrieve a result.

    executor.silent_async([&](){
      std::atomic<int> counter(0);
      auto tg = executor.task_group();
      for(int i=0; i<100; i++) {
        tg.silent_async([&](){ counter++; });
      }
      tg.corun();
      assert(counter == 100);
    });

template<typename P, typename F>
void tf::TaskGroup::silent_async(P&& params, F&& f)
runs the given function asynchronously without returning any future object
| Template parameters | |
|---|---|
| P | task parameters type |
| F | callable type |
| Parameters | |
| params | task parameters |
| f | callable |
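Similar to tf::TaskGroup::silent_async(F&& f), but additionally takes task parameters, such as the task name used in the example below.

    executor.silent_async([&](){
      auto tg = executor.task_group();
      tg.silent_async("my task", [](){});
      tg.corun();
    });
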
template<typename F, typename... Tasks, std::enable_if_t<all_same_v<AsyncTask, std::decay_t<Tasks>...>, void>* = nullptr>
auto tf::TaskGroup::dependent_async(F&& func, Tasks&&... tasks)
runs the given function asynchronously when the given predecessors finish
| Template parameters | |
|---|---|
| F | callable type |
| Tasks | tasks of type tf::AsyncTask |
| Parameters | |
| func | callable object |
| tasks | asynchronous tasks on which this execution depends |
| Returns | a pair of a tf::AsyncTask handle and a std::future that holds the result of the execution |
The example below creates three asynchronous tasks, A, B, and C, in which task C runs after task A and task B. The call that creates task C returns a pair of its tf::AsyncTask handle and a std::future<int> that will hold C's result.

    executor.silent_async([&](){
      auto tg = executor.task_group();
      tf::AsyncTask A = tg.silent_dependent_async([](){ printf("A\n"); });
      tf::AsyncTask B = tg.silent_dependent_async([](){ printf("B\n"); });
      auto [C, fuC] = tg.dependent_async(
        [](){
          printf("C runs after A and B\n");
          return 1;
        },
        A, B
      );
      fuC.get();  // C finishes, which in turn means both A and B finish,
                  // so we don't need tg.corun()
    });

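One way to picture "C runs after A and B" is a per-task counter of unfinished predecessors: a task becomes ready when its counter reaches zero. The sketch below is a single-threaded model of that idea using only the standard library. `MiniGraph`, `add_task`, `run_all`, and `run_abc` are hypothetical names, and the model is an assumption for illustration, not a description of Taskflow's internals.

```cpp
#include <cstddef>
#include <functional>
#include <initializer_list>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-threaded model of dependent tasks; not Taskflow code.
struct MiniGraph {
  struct Node {
    std::function<void()> work;
    std::size_t pending;                  // predecessors not yet finished
    std::vector<std::size_t> successors;  // tasks waiting on this one
  };
  std::vector<Node> nodes;

  // Adds a task that may only run after every task in `preds` has finished.
  std::size_t add_task(std::function<void()> work,
                       std::initializer_list<std::size_t> preds = {}) {
    std::size_t id = nodes.size();
    nodes.push_back(Node{std::move(work), preds.size(), {}});
    for (std::size_t p : preds) nodes[p].successors.push_back(id);
    return id;
  }

  // Runs every task once, releasing a task when its counter reaches zero.
  void run_all() {
    std::queue<std::size_t> ready;
    for (std::size_t i = 0; i < nodes.size(); ++i)
      if (nodes[i].pending == 0) ready.push(i);
    while (!ready.empty()) {
      std::size_t id = ready.front();
      ready.pop();
      nodes[id].work();
      for (std::size_t s : nodes[id].successors)
        if (--nodes[s].pending == 0) ready.push(s);
    }
  }
};

// Builds A, B, and C (C after A and B) and returns the execution order.
std::string run_abc() {
  std::string order;
  MiniGraph g;
  std::size_t a = g.add_task([&order] { order += 'A'; });
  std::size_t b = g.add_task([&order] { order += 'B'; });
  g.add_task([&order] { order += 'C'; }, {a, b});
  g.run_all();
  return order;
}
```

In this model C is only enqueued after both A and B have decremented its counter, so it always runs last regardless of the order in which A and B run.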
template<typename P, typename F, typename... Tasks, std::enable_if_t<is_task_params_v<P> && all_same_v<AsyncTask, std::decay_t<Tasks>...>, void>* = nullptr>
auto tf::TaskGroup::dependent_async(P&& params, F&& func, Tasks&&... tasks)
runs the given function asynchronously when the given predecessors finish
| Template parameters | |
|---|---|
| P | task parameters type |
| F | callable type |
| Tasks | tasks of type tf::AsyncTask |
| Parameters | |
| params | task parameters |
| func | callable object |
| tasks | asynchronous tasks on which this execution depends |
| Returns | a pair of a tf::AsyncTask handle and a std::future that holds the result of the execution |
The example below creates three named asynchronous tasks, A, B, and C, in which task C runs after task A and task B. The call that creates task C returns a pair of its tf::AsyncTask handle and a std::future<int> that will hold C's result.

    executor.silent_async([&](){
      auto tg = executor.task_group();
      tf::AsyncTask A = tg.silent_dependent_async("A", [](){ printf("A\n"); });
      tf::AsyncTask B = tg.silent_dependent_async("B", [](){ printf("B\n"); });
      auto [C, fuC] = tg.dependent_async(
        "C",
        [](){
          printf("C runs after A and B\n");
          return 1;
        },
        A, B
      );
      fuC.get();  // C finishes, which in turn means both A and B finish,
                  // so we don't need tg.corun()
    });

template<typename F, typename I, std::enable_if_t<!std::is_same_v<std::decay_t<I>, AsyncTask>, void>* = nullptr>
auto tf::TaskGroup::dependent_async(F&& func, I first, I last)
runs the given function asynchronously when the given range of predecessors finish
| Template parameters | |
|---|---|
| F | callable type |
| I | iterator type |
| Parameters | |
| func | callable object |
| first | iterator to the beginning (inclusive) |
| last | iterator to the end (exclusive) |
| Returns | a pair of a tf::AsyncTask handle and a std::future that holds the result of the execution |
The example below creates three asynchronous tasks, A, B, and C, in which task C runs after task A and task B. The call that creates task C returns a pair of its tf::AsyncTask handle and a std::future<int> that will hold C's result.

    executor.silent_async([&](){
      auto tg = executor.task_group();
      std::array<tf::AsyncTask, 2> array {
        tg.silent_dependent_async([](){ printf("A\n"); }),
        tg.silent_dependent_async([](){ printf("B\n"); })
      };
      auto [C, fuC] = tg.dependent_async(
        [](){
          printf("C runs after A and B\n");
          return 1;
        },
        array.begin(), array.end()
      );
      fuC.get();  // C finishes, which in turn means both A and B finish,
                  // so we don't need tg.corun()
    });

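The iterator-range form of "run after these predecessors" has a rough standard-library analogue built on `std::shared_future`. The sketch below is an illustration under stated assumptions, not Taskflow's API: `run_after` and `run_range_example` are hypothetical helpers, and `std::launch::deferred` is used so that everything runs lazily on the thread that calls `get()`, which keeps the example deterministic.

```cpp
#include <future>
#include <iterator>
#include <string>
#include <vector>

// Hypothetical helper, not Taskflow's API: returns a future for `func` that
// first waits on every shared_future in the range [first, last).
template <typename F, typename I>
auto run_after(F func, I first, I last) -> std::future<decltype(func())> {
  using R = decltype(func());
  using Pred = typename std::iterator_traits<I>::value_type;
  std::vector<Pred> preds(first, last);  // copy the predecessor handles
  // Deferred launch: the body runs on the thread that calls get() or wait().
  return std::async(std::launch::deferred, [preds, func]() mutable -> R {
    for (const Pred& p : preds) p.wait();  // finish every predecessor first
    return func();
  });
}

// Builds A and B as predecessors of C and returns the recorded order,
// followed by the value returned by C.
std::string run_range_example() {
  std::string order;
  std::vector<std::shared_future<void>> preds;
  preds.push_back(
      std::async(std::launch::deferred, [&order] { order += 'A'; }).share());
  preds.push_back(
      std::async(std::launch::deferred, [&order] { order += 'B'; }).share());
  std::future<int> c = run_after(
      [&order]() -> int { order += 'C'; return 1; },
      preds.begin(), preds.end());
  int result = c.get();  // runs A, then B, then C on this thread
  return order + std::to_string(result);
}
```

As in the example above, obtaining C's result implies that A and B have already finished, so no separate wait on the predecessors is needed.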
template<typename P, typename F, typename I, std::enable_if_t<is_task_params_v<P> && !std::is_same_v<std::decay_t<I>, AsyncTask>, void>* = nullptr>
auto tf::TaskGroup::dependent_async(P&& params, F&& func, I first, I last)
runs the given function asynchronously when the given range of predecessors finish
| Template parameters | |
|---|---|
| P | task parameters type |
| F | callable type |
| I | iterator type |
| Parameters | |
| params | task parameters |
| func | callable object |
| first | iterator to the beginning (inclusive) |
| last | iterator to the end (exclusive) |
| Returns | a pair of a tf::AsyncTask handle and a std::future that holds the result of the execution |
The example below creates three named asynchronous tasks, A, B, and C, in which task C runs after task A and task B. The call that creates task C returns a pair of its tf::AsyncTask handle and a std::future<int> that will hold C's result.

    executor.silent_async([&](){
      auto tg = executor.task_group();
      std::array<tf::AsyncTask, 2> array {
        tg.silent_dependent_async("A", [](){ printf("A\n"); }),
        tg.silent_dependent_async("B", [](){ printf("B\n"); })
      };
      auto [C, fuC] = tg.dependent_async(
        "C",
        [](){
          printf("C runs after A and B\n");
          return 1;
        },
        array.begin(), array.end()
      );
      fuC.get();  // C finishes, which in turn means both A and B finish,
                  // so we don't need tg.corun()
    });

template<typename F, typename... Tasks, std::enable_if_t<all_same_v<AsyncTask, std::decay_t<Tasks>...>, void>* = nullptr>
tf::AsyncTask tf::TaskGroup::silent_dependent_async(F&& func, Tasks&&... tasks)
runs the given function asynchronously when the given predecessors finish
| Template parameters | |
|---|---|
| F | callable type |
| Tasks | tasks of type tf::AsyncTask |
| Parameters | |
| func | callable object |
| tasks | asynchronous tasks on which this execution depends |
| Returns | a tf::AsyncTask handle |
This member function is more efficient than tf::TaskGroup::dependent_async and is preferable when no future object is needed to retrieve a result. The example below creates three asynchronous tasks, A, B, and C, in which task C runs after task A and task B.

    executor.silent_async([&](){
      auto tg = executor.task_group();
      tf::AsyncTask A = tg.silent_dependent_async([](){ printf("A\n"); });
      tf::AsyncTask B = tg.silent_dependent_async([](){ printf("B\n"); });
      tg.silent_dependent_async([](){ printf("C runs after A and B\n"); }, A, B);
      tg.corun();  // corun until all dependent-async tasks finish
    });

template<typename P, typename F, typename... Tasks, std::enable_if_t<is_task_params_v<P> && all_same_v<AsyncTask, std::decay_t<Tasks>...>, void>* = nullptr>
tf::AsyncTask tf::TaskGroup::silent_dependent_async(P&& params, F&& func, Tasks&&... tasks)
runs the given function asynchronously when the given predecessors finish
| Template parameters | |
|---|---|
| P | task parameters type |
| F | callable type |
| Tasks | tasks of type tf::AsyncTask |
| Parameters | |
| params | task parameters |
| func | callable object |
| tasks | asynchronous tasks on which this execution depends |
| Returns | a tf::AsyncTask handle |
This member function is more efficient than tf::TaskGroup::dependent_async and is preferable when no future object is needed to retrieve a result. The example below creates three named asynchronous tasks, A, B, and C, in which task C runs after task A and task B. Assigned task names will appear in the observers of the executor.

    executor.silent_async([&](){
      auto tg = executor.task_group();
      tf::AsyncTask A = tg.silent_dependent_async("A", [](){ printf("A\n"); });
      tf::AsyncTask B = tg.silent_dependent_async("B", [](){ printf("B\n"); });
      tg.silent_dependent_async(
        "C", [](){ printf("C runs after A and B\n"); }, A, B
      );
      tg.corun();  // corun until all dependent-async tasks finish
    });

template<typename F, typename I, std::enable_if_t<!std::is_same_v<std::decay_t<I>, AsyncTask>, void>* = nullptr>
tf::AsyncTask tf::TaskGroup::silent_dependent_async(F&& func, I first, I last)
runs the given function asynchronously when the given range of predecessors finish
| Template parameters | |
|---|---|
| F | callable type |
| I | iterator type |
| Parameters | |
| func | callable object |
| first | iterator to the beginning (inclusive) |
| last | iterator to the end (exclusive) |
| Returns | a tf::AsyncTask handle |
This member function is more efficient than tf::TaskGroup::dependent_async and is preferable when no future object is needed to retrieve a result. The example below creates three asynchronous tasks, A, B, and C, in which task C runs after task A and task B.

    executor.silent_async([&](){
      auto tg = executor.task_group();
      std::array<tf::AsyncTask, 2> array {
        tg.silent_dependent_async([](){ printf("A\n"); }),
        tg.silent_dependent_async([](){ printf("B\n"); })
      };
      tg.silent_dependent_async(
        [](){ printf("C runs after A and B\n"); },
        array.begin(), array.end()
      );
      tg.corun();  // corun until all dependent-async tasks finish
    });

template<typename P, typename F, typename I, std::enable_if_t<is_task_params_v<P> && !std::is_same_v<std::decay_t<I>, AsyncTask>, void>* = nullptr>
tf::AsyncTask tf::TaskGroup::silent_dependent_async(P&& params, F&& func, I first, I last)
runs the given function asynchronously when the given range of predecessors finish
| Template parameters | |
|---|---|
| P | task parameters type |
| F | callable type |
| I | iterator type |
| Parameters | |
| params | task parameters |
| func | callable object |
| first | iterator to the beginning (inclusive) |
| last | iterator to the end (exclusive) |
| Returns | a tf::AsyncTask handle |
This member function is more efficient than tf::TaskGroup::dependent_async and is preferable when no future object is needed to retrieve a result. The example below creates three named asynchronous tasks, A, B, and C, in which task C runs after task A and task B. Assigned task names will appear in the observers of the executor.

    executor.silent_async([&](){
      auto tg = executor.task_group();
      std::array<tf::AsyncTask, 2> array {
        tg.silent_dependent_async("A", [](){ printf("A\n"); }),
        tg.silent_dependent_async("B", [](){ printf("B\n"); })
      };
      tg.silent_dependent_async(
        "C",
        [](){ printf("C runs after A and B\n"); },
        array.begin(), array.end()
      );
      tg.corun();  // corun until all dependent-async tasks finish
    });

void tf::TaskGroup::corun()
corun all tasks spawned by this task group with other workers
Coruns all tasks spawned by this task group cooperatively with other workers in the same executor until all these tasks finish. Under cooperative execution, a worker is not preempted. Instead, it continues participating in the work-stealing loop, executing available tasks alongside other workers.

    executor.silent_async([&](){
      auto tg = executor.task_group();
      std::atomic<size_t> counter{0};

      // spawn 100 async tasks and wait
      for(int i=0; i<100; i++) {
        tg.silent_async([&](){ counter++; });
      }
      tg.corun();
      assert(counter == 100);

      // spawn another 100 async tasks and wait
      for(int i=0; i<100; i++) {
        tg.silent_async([&](){ counter++; });
      }
      tg.corun();
      assert(counter == 200);
    });

Note that only the parent worker of this task group (the worker that created it) can call corun().
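The difference between blocking and cooperative waiting can be modeled in a few lines. In the single-threaded sketch below, the hypothetical `MiniGroup::corun` does not sleep while tasks are outstanding; it keeps pulling tasks from the queue and running them, including tasks spawned by other tasks. A real work-stealing executor has per-worker queues and stealing across threads, which this model deliberately omits; `MiniGroup`, `spawn`, and `run_nested` are illustrative names and not part of Taskflow.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical model of a task group; not Taskflow's implementation.
class MiniGroup {
 public:
  // Queues a task and counts it as outstanding.
  void spawn(std::function<void()> task) {
    queue_.push_back(std::move(task));
    ++outstanding_;
  }

  // Cooperative wait: the caller keeps executing queued tasks until none
  // remain outstanding, instead of sleeping.
  void corun() {
    while (outstanding_ > 0) {
      assert(!queue_.empty());  // single-threaded invariant of this model
      std::function<void()> task = std::move(queue_.front());
      queue_.pop_front();
      task();  // may call spawn() and grow the queue
      --outstanding_;
    }
  }

  std::size_t size() const { return outstanding_; }

 private:
  std::deque<std::function<void()>> queue_;
  std::size_t outstanding_ = 0;
};

// Spawns `n` tasks, each of which spawns one child, and returns how many ran.
int run_nested(int n) {
  MiniGroup group;
  int ran = 0;
  for (int i = 0; i < n; ++i) {
    group.spawn([&group, &ran] {
      ++ran;
      group.spawn([&ran] { ++ran; });
    });
  }
  group.corun();
  return ran;
}
```

Because the waiting caller executes the nested tasks itself, recursion such as the Fibonacci example cannot stall for lack of a free thread in this model.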
void tf::TaskGroup::cancel()
cancel all tasks in this task group
Marks the task group as cancelled to stop any not-yet-started tasks in the group from running. Tasks that are already running will continue to completion, but no new tasks belonging to the task group will be scheduled after cancellation.
The example below demonstrates how tf::TaskGroup::cancel() prevents not-yet-started tasks from running.

    const size_t W = 12;  // must be >1 for this example to work
    tf::Executor executor(W);

    executor.async([&executor, W](){
      auto tg = executor.task_group();

      // deliberately block the other W-1 workers
      std::atomic<size_t> latch(0);
      for(size_t i=0; i<W-1; ++i) {
        tg.async([&](){
          ++latch;
          while(latch != 0);
        });
      }

      // wait until the other W-1 workers are blocked
      while(latch != W-1);

      // spawn other tasks which should never run after cancellation
      for(size_t i=0; i<100; ++i) {
        tg.async([&](){ throw std::runtime_error("this should never run"); });
      }

      // cancel the task group and unblock the other W-1 workers
      assert(tg.is_cancelled() == false);
      tg.cancel();
      assert(tg.is_cancelled() == true);
      latch = 0;
      tg.corun();
    });

Note that cancellation is cooperative: tasks should not assume immediate termination. Users must still call tf::TaskGroup::corun() to synchronize with all tasks spawned by the task group.
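The rule "running tasks finish, not-yet-started tasks are skipped" can be modeled with a flag that is consulted only at the moment a task would start. The sketch below is a single-threaded illustration with hypothetical names (`CancellableGroup`, `spawn`, `run_cancel_example`); it is an assumption about how such a flag can behave, not Taskflow's implementation.

```cpp
#include <deque>
#include <functional>
#include <utility>

// Hypothetical model of group cancellation; not Taskflow's implementation.
class CancellableGroup {
 public:
  void spawn(std::function<void()> task) { queue_.push_back(std::move(task)); }
  void cancel() { cancelled_ = true; }
  bool is_cancelled() const { return cancelled_; }

  // Drains the queue. The flag is checked only before a task starts, so a
  // task that is already running always finishes.
  void corun() {
    while (!queue_.empty()) {
      std::function<void()> task = std::move(queue_.front());
      queue_.pop_front();
      if (cancelled_) continue;  // discard tasks that have not started
      task();
    }
  }

 private:
  std::deque<std::function<void()>> queue_;
  bool cancelled_ = false;
};

// Queues five tasks; the second one cancels the group while it is running.
// Returns how many task bodies ran to completion.
int run_cancel_example() {
  CancellableGroup group;
  int completed = 0;
  for (int i = 0; i < 5; ++i) {
    group.spawn([&group, &completed, i] {
      if (i == 1) group.cancel();
      ++completed;  // still executes: the running task is not interrupted
    });
  }
  group.corun();  // still required to drain and discard the remaining tasks
  return completed;
}
```

In this model the first two tasks complete, including the one that issued the cancellation, and the remaining three are discarded without running.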
bool tf::TaskGroup::is_cancelled()
queries if the task group has been cancelled
| Returns | true if the task group has been marked as cancelled or false otherwise |
|---|---|
This method returns true if the task group has been marked as cancelled via a call to cancel(), and false otherwise.

    executor.async([&](){
      auto tg = executor.task_group();
      assert(tg.is_cancelled() == false);
      tg.cancel();
      assert(tg.is_cancelled() == true);
    });

The cancellation state reflects whether the task group is currently in a cancelled state and does not imply that all tasks have completed or been synchronized. If a task group spawns any task, users must still call corun() to synchronize with all spawned tasks and ensure safe completion or cancellation. Failing to do so results in undefined behavior.
size_t tf::TaskGroup::size() const
queries the number of tasks currently in this task group
| Returns | the number of tasks currently in this task group |
|---|---|
This method returns the number of tasks that belong to the task group at the time of the call. The returned value represents a snapshot and may become outdated immediately, as tasks can be concurrently spawned, started, completed, or canceled while this method is executing. As a result, the value returned by size() should be used for informational or diagnostic purposes only and must not be relied upon for synchronization or correctness.

    executor.silent_async([&](){
      auto tg = executor.task_group();
      assert(tg.size() == 0);
      for(size_t i=0; i<1000; ++i) {
        tg.silent_async([](){});
      }
      assert(tg.size() >= 0);
      tg.corun();
    });

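Why a snapshot cannot replace synchronization can be seen with a plain atomic counter. In the sketch below, which uses only the standard library and hypothetical names (`SnapshotResult`, `observe_counter`), the value read while workers are running may be anything between 0 and `n`; only the value read after joining every worker is guaranteed. Joining plays the role that corun() plays for a task group.

```cpp
#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

struct SnapshotResult {
  std::size_t during;  // value observed while workers may still be running
  std::size_t after;   // value observed after joining every worker
};

// Hypothetical illustration, not Taskflow code: `n` workers each decrement a
// shared counter of in-flight work exactly once.
SnapshotResult observe_counter(std::size_t n) {
  std::atomic<std::size_t> in_flight(n);
  std::vector<std::thread> workers;
  for (std::size_t i = 0; i < n; ++i) {
    workers.emplace_back([&in_flight] { --in_flight; });
  }
  // Snapshot: the result depends on timing and may be stale immediately.
  std::size_t during = in_flight.load();
  // Joining is the actual synchronization point.
  for (std::thread& w : workers) w.join();
  return SnapshotResult{during, in_flight.load()};
}
```

Only the `after` field is safe to base decisions on; `during` is useful for diagnostics at most.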