Cookbook » Asynchronous Tasking with Dependencies

This chapter discusses how to create a task graph dynamically using dependent asynchronous (dependent-async) tasks, which is extremely beneficial for workloads that want to (1) exploit task graph parallelism arising from dynamic control flow or (2) overlap task graph creation time with individual task execution time. We recommend that you first read Asynchronous Tasking before digesting this chapter.

Create a Dynamic Task Graph

When the construct-and-run model of a task graph is not possible in your application, you can use tf::Executor::dependent_async and tf::Executor::silent_dependent_async to create a task graph on the fly. This style of execution is commonly referred to as dynamic task graph parallelism and provides greater flexibility in expressing parallelism that adapts to runtime conditions. The example below dynamically creates a task graph of four dependent-async tasks, A, B, C, and D, where A runs before B and C and D runs after B and C:

[Figure: a task graph of four dependent-async tasks with dependencies A→B, A→C, B→D, and C→D]
tf::Executor executor;
tf::AsyncTask A = executor.silent_dependent_async([](){ printf("A\n"); });
tf::AsyncTask B = executor.silent_dependent_async([](){ printf("B\n"); }, A);
tf::AsyncTask C = executor.silent_dependent_async([](){ printf("C\n"); }, A);
auto [D, fuD] = executor.dependent_async([](){ printf("D\n"); }, B, C);
fuD.get();  // wait for D to finish, which in turn means A, B, C have finished

Both tf::Executor::dependent_async and tf::Executor::silent_dependent_async create a dependent-async task of type tf::AsyncTask that runs the given function asynchronously. Additionally, tf::Executor::dependent_async returns a std::future that eventually holds the result of the execution. When both calls return, the executor has already scheduled a worker to run the task whenever its dependencies are met. That is, task execution happens simultaneously with the creation of the task graph, which is different from constructing a Taskflow and running it from an executor, as illustrated in the figure below:

[Figure: comparison of static task graph execution (construct-and-run) and dynamic task graph execution, where task creation overlaps with task execution]
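
For example, the minimal sketch below (using only the API shown above) retrieves the result of a value-returning dependent-async task through its std::future:

tf::Executor executor;

// dependent_async returns a pair of a tf::AsyncTask handle and a std::future
auto [A, fuA] = executor.dependent_async([](){ return 42; });

// the future eventually holds the return value of the callable
assert(fuA.get() == 42);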

Since this model only allows adding a dependency from the current task to a previously created task, you must express the graph in a correct topological order. In our example, there are only two possible topological orderings, either ABCD or ACBD. The code below shows another feasible order of expressing this dynamic task graph:

tf::Executor executor;
tf::AsyncTask A = executor.silent_dependent_async([](){ printf("A\n"); });
tf::AsyncTask C = executor.silent_dependent_async([](){ printf("C\n"); }, A);
tf::AsyncTask B = executor.silent_dependent_async([](){ printf("B\n"); }, A);
auto [D, fuD] = executor.dependent_async([](){ printf("D\n"); }, B, C);
fuD.get();  // wait for D to finish, which in turn means A, B, C have finished

In addition to using std::future to synchronize the execution at a particular task point, you can use tf::Executor::wait_for_all to wait for all scheduled tasks to finish:

tf::Executor executor;
tf::AsyncTask A = executor.silent_dependent_async([](){ printf("A\n"); });
tf::AsyncTask B = executor.silent_dependent_async([](){ printf("B\n"); }, A);
tf::AsyncTask C = executor.silent_dependent_async([](){ printf("C\n"); }, A);
tf::AsyncTask D = executor.silent_dependent_async([](){ printf("D\n"); }, B, C);
executor.wait_for_all();

Specify a Range of Dependent Async Tasks

Both tf::Executor::dependent_async and tf::Executor::silent_dependent_async accept an arbitrary number of tasks in the dependency list. If the number of task dependencies (i.e., predecessors) is unknown at programming time, for instance when it depends on a runtime variable, you can use the overloads of tf::Executor::dependent_async and tf::Executor::silent_dependent_async that accept predecessor tasks in an iterable range [first, last).

The range must be specified by input iterators whose dereferenced type is convertible to tf::AsyncTask. The following example creates a dependent-async task that depends on N previously created dependent-async tasks stored in a vector, where N is a runtime variable:

tf::Executor executor;
std::vector<tf::AsyncTask> predecessors;
for(size_t i=0; i<N; i++) {  // N is a runtime variable
  predecessors.push_back(executor.silent_dependent_async([](){}));
}
executor.silent_dependent_async([](){}, predecessors.begin(), predecessors.end());

// wait for the above N+1 dependent-async tasks to finish
executor.wait_for_all();

Understand the Lifetime of a Dependent-async Task

tf::AsyncTask is a lightweight handle that retains shared ownership of a dependent-async task created by an executor. This shared ownership ensures that the async task remains alive when adding it to the dependency list of another async task, thus avoiding the classical ABA problem.

// main thread retains shared ownership of async task A
tf::AsyncTask A = executor.silent_dependent_async([](){});
assert(A.use_count() >= 1);  // main thread holds shared ownership of A

// task A remains alive (i.e., at least one ref count by the main thread) 
// when being added to the dependency list of async task B
tf::AsyncTask B = executor.silent_dependent_async([](){}, A);
assert(B.use_count() >= 1);  // main thread holds shared ownership of B

Currently, tf::AsyncTask is implemented based on the C++ smart pointer std::shared_ptr and is considered cheap to copy or move as long as only a handful of objects own it. When a worker completes a dependent-async task, it removes the task from the executor, decrementing the number of shared owners by one. If that count reaches zero, the task is destroyed.
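
The minimal sketch below illustrates this lifetime behavior, assuming no other thread co-owns the task:

tf::Executor executor;

// this thread and the executor both hold a reference to task A
tf::AsyncTask A = executor.silent_dependent_async([](){});

// wait until A completes; the worker then releases the executor's reference
executor.wait_for_all();

assert(A.is_done());         // A has finished execution
assert(A.use_count() >= 1);  // the local handle still keeps A alive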

Create a Dynamic Task Graph by Multiple Threads

You can use multiple threads to create a dynamic task graph as long as the order in which tasks are simultaneously created is topologically correct. The example below creates a dynamic task graph using three threads (including the main thread), where task A runs before task B and task C:

tf::Executor executor;

// main thread creates a dependent-async task A
tf::AsyncTask A = executor.silent_dependent_async([](){});

// spawn a new thread to create an async task B that runs after A
std::thread t1([&](){
  tf::AsyncTask B = executor.silent_dependent_async([](){}, A);
});

// spawn a new thread to create an async task C that runs after A
std::thread t2([&](){
  tf::AsyncTask C = executor.silent_dependent_async([](){}, A);
});

executor.wait_for_all();
t1.join();
t2.join();

Regardless of whether t1 runs before or after t2, the resulting topological order remains valid with respect to the graph definition. In this example, either ABC or ACB is a correct ordering.

Query the Completion Status of Dependent Async Tasks

When you create a dependent-async task, you can query its completion status using tf::AsyncTask::is_done, which returns true if the task has completed its execution, or false otherwise. A task is considered completed once a worker has finished executing its associated callable.

// create a dependent-async task that returns 100
auto [task, fu] = executor.dependent_async([](){ return 100; });

// loops until the dependent-async task completes
while(!task.is_done());
assert(fu.get() == 100);

tf::AsyncTask::is_done is useful when you need to wait on the result of a dependent-async task before moving on to the next program instruction. Often, tf::AsyncTask is used together with tf::Executor::corun_until to keep a worker awake in its work-stealing loop and avoid deadlock (see Execute a Taskflow from an Internal Worker for more details). For instance, the code below implements the famous Fibonacci sequence using recursive dependent-async tasking:

tf::Executor executor;
std::function<int(int)> fibonacci;

// calculate the Fibonacci sequence: 0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89
fibonacci = [&](int N){
  if (N < 2) {
    return N; 
  }
  auto [t1, fu1] = executor.dependent_async(std::bind(fibonacci, N-1));
  auto [t2, fu2] = executor.dependent_async(std::bind(fibonacci, N-2));
  executor.corun_until([&](){ return t1.is_done() && t2.is_done(); });
  return fu1.get() + fu2.get();
};

auto [task, fib11] = executor.dependent_async(std::bind(fibonacci, 11));
assert(fib11.get() == 89);  // the 11-th Fibonacci number is 89