Task Group
A task group is a lightweight mechanism in Taskflow for spawning and managing a collection of asynchronous tasks cooperatively within a single executor. Tasks in a group can be spawned asynchronously, with dependencies, and recursively, enabling efficient implementation of recursive parallel algorithms.
Create a %Task Group
A task group (tf::TaskGroup) is created by calling tf::Executor::task_group from within a task running on that executor:
tf::Executor executor;
executor.silent_async([&](){
  tf::TaskGroup tg = executor.task_group();
});
Internally, a task group is bound to the executor and the worker that creates it. This worker is referred to as the parent worker of the task group and is the only worker allowed to issue cooperative execution (tf::TaskGroup::corun) on it. Creating a task group from a thread that is not a worker of the executor throws an exception:
tf::Executor executor;
tf::TaskGroup tg = executor.task_group();  // throws, since the main thread is not a worker of the executor
Submit Asynchronous Tasks with Cooperative Execution
tf::TaskGroup provides four ways to submit asynchronous tasks:
- tf::TaskGroup::async
- tf::TaskGroup::silent_async
- tf::TaskGroup::dependent_async
- tf::TaskGroup::silent_dependent_async
Each variant serves a distinct purpose depending on whether you need a returned future, dependency ordering between tasks, and so on. For instance, the code below creates 100 tasks using tf::TaskGroup::silent_async and one task using tf::TaskGroup::async, and then cooperatively runs all of them to completion with tf::TaskGroup::corun:
executor.async([&](){
  tf::TaskGroup tg = executor.task_group();
  std::atomic<int> counter{0};
  // spawn 100 silent-async tasks (without future return)
  for(int i=0; i<100; i++) {
    tg.silent_async([&](){ counter++; });
  }
  // spawn one async task (with future return)
  auto fu = tg.async([](){ return 42; });
  // cooperatively run all tasks in the group
  tg.corun();
  assert(counter == 100);
  assert(fu.get() == 42);
});
If you need dependencies among async tasks, use tf::TaskGroup::dependent_async or tf::TaskGroup::silent_dependent_async and pass the tasks that must complete first as additional arguments:
executor.async([&](){
  auto tg = executor.task_group();
  tf::AsyncTask A = tg.silent_dependent_async([](){ printf("A\n"); });
  tf::AsyncTask B = tg.silent_dependent_async([](){ printf("B\n"); });
  tf::AsyncTask C = tg.silent_dependent_async([](){ printf("C\n"); }, A, B);
  tg.corun();
});
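When you also need the result of a dependent task, the non-silent variant can be used. The following sketch assumes tf::TaskGroup::dependent_async mirrors tf::Executor::dependent_async and returns a pair of a tf::AsyncTask handle and a std::future for the result:

executor.async([&](){
  auto tg = executor.task_group();
  tf::AsyncTask A = tg.silent_dependent_async([](){ printf("A\n"); });
  // assumption: dependent_async returns a {tf::AsyncTask, std::future} pair,
  // mirroring tf::Executor::dependent_async
  auto [B, fuB] = tg.dependent_async([](){ return 1; }, A);
  tg.corun();
  assert(fuB.get() == 1);
});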
Cancel a %Task Group
You can mark a task group as cancelled to stop any not-yet-started tasks in the group from running. Tasks that are already running will continue to completion, but no new tasks belonging to the task group will be scheduled after cancellation. The example below demonstrates how tf::TaskGroup::cancel prevents pending tasks from running:
const size_t W = 12;  // must be >1 for this example to work
tf::Executor executor(W);
executor.async([&executor, W](){
  auto tg = executor.task_group();
  // deliberately block the other W-1 workers
  std::atomic<size_t> latch(0);
  for(size_t i=0; i<W-1; ++i) {
    tg.async([&](){ ++latch; while(latch != 0); });
  }
  // wait until the other W-1 workers are blocked
  while(latch != W-1);
  // spawn other tasks which should never run after cancellation
  for(size_t i=0; i<100; ++i) {
    tg.async([&](){ throw std::runtime_error("this should never run"); });
  }
  // cancel the task group and unblock the other W-1 workers
  assert(tg.is_cancelled() == false);
  tg.cancel();
  assert(tg.is_cancelled() == true);
  latch = 0;
  tg.corun();
});
Note that cancellation is cooperative: tasks should not assume immediate termination. Users must still call tf::TaskGroup::corun after cancellation to synchronize with tasks that are already running.
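Because cancellation only prevents not-yet-started tasks from running, a long-running task that should stop early has to observe the cancellation itself. A minimal sketch, assuming it is safe to poll tf::TaskGroup::is_cancelled from inside a task of the group:

executor.async([&](){
  auto tg = executor.task_group();
  tg.silent_async([&](){
    for(int step=0; step<1000000; ++step) {
      // assumption: is_cancelled can be polled from a running task of the group
      if(tg.is_cancelled()) {
        return;  // stop early once the group is cancelled
      }
      // ... perform one small unit of work ...
    }
  });
  tg.cancel();   // issued by the parent worker, as in the example above
  tg.corun();
});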
Implement Recursive Task Parallelism
tf::TaskGroup is particularly useful for implementing recursive task parallelism, where each level of recursion spawns asynchronous tasks and cooperatively waits for them to complete. The example below parallelizes a recursive Fibonacci computation:
#include <taskflow/taskflow.hpp>
#include <iostream>

tf::Executor executor;

size_t fibonacci(size_t N) {
  if(N < 2) return N;
  size_t res1, res2;
  tf::TaskGroup tg = executor.task_group();
  tg.silent_async([N, &res1](){ res1 = fibonacci(N-1); });
  res2 = fibonacci(N-2);
  // cooperatively run tasks until all tasks spawned by `tg` complete
  tg.corun();
  return res1 + res2;
}

int main() {
  size_t N = 30, res;
  res = executor.async([N](){ return fibonacci(N); }).get();
  std::cout << N << "-th Fibonacci number is " << res << '\n';
  return 0;
}
The function fibonacci spawns one recursive call as an asynchronous task and computes the other directly. Calling tf::TaskGroup::corun lets the parent worker cooperatively execute pending tasks instead of blocking, guaranteeing that all tasks spawned by the task group complete before the function returns.
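The same pattern generalizes to other divide-and-conquer algorithms: spawn one branch of the recursion as an asynchronous task, compute the other branch on the current worker, and corun the task group before combining the results. Below is a minimal sketch of a recursive parallel sum; parallel_sum and the cutoff of 1024 are illustrative choices, and the global executor from the Fibonacci example is reused:

#include <numeric>
#include <vector>

// illustrative recursive parallel reduction over data[beg, end)
size_t parallel_sum(const std::vector<size_t>& data, size_t beg, size_t end) {
  if(end - beg <= 1024) {
    // small range: sum sequentially
    return std::accumulate(data.begin() + beg, data.begin() + end, size_t{0});
  }
  size_t mid = beg + (end - beg) / 2;
  size_t left = 0;
  tf::TaskGroup tg = executor.task_group();
  // spawn the left half as an asynchronous task
  tg.silent_async([&, beg, mid](){ left = parallel_sum(data, beg, mid); });
  // compute the right half on the current worker
  size_t right = parallel_sum(data, mid, end);
  // cooperatively run until the spawned task completes
  tg.corun();
  return left + right;
}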