Asynchronous Tasking with Dependencies

This chapter discusses how to create a task graph dynamically using asynchronous tasks. This is especially useful for workloads that need to (1) derive task graph parallelism from dynamic control flow or (2) overlap task graph creation time with individual task execution time. We recommend that you first read Asynchronous Tasking before working through this chapter.

Create a Dynamic Task Graph

When the construct-and-run model of a task graph is not possible in your application, you can use tf::Executor::dependent_async and tf::Executor::silent_dependent_async to create a task graph dynamically. This style is also known as on-the-fly task graph parallelism and offers great flexibility for expressing dynamic task graphs. The example below dynamically creates a task graph of four dependent async tasks, A, B, C, and D, where A runs before B and C, and D runs after B and C:

[Figure: task graph with edges A->B, A->C, B->D, C->D]
tf::Executor executor;
tf::AsyncTask A = executor.silent_dependent_async([](){ printf("A\n"); });
tf::AsyncTask B = executor.silent_dependent_async([](){ printf("B\n"); }, A);
tf::AsyncTask C = executor.silent_dependent_async([](){ printf("C\n"); }, A);
auto [D, fuD] = executor.dependent_async([](){ printf("D\n"); }, B, C);
fuD.get();  // wait for D to finish, which in turn means A, B, C have finished

Both tf::Executor::dependent_async and tf::Executor::silent_dependent_async create a task of type tf::AsyncTask to run the given function asynchronously. Additionally, tf::Executor::dependent_async returns a std::future that eventually holds the result of the execution. When either call returns, the executor has already scheduled the task, and a worker will run it as soon as its dependencies are met. That is, task execution overlaps with the creation of the task graph, which differs from constructing a Taskflow and then running it on an executor, as illustrated in the figure below:

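[Figure: constructing and then running a Taskflow, compared with dynamic task graph creation in which task execution overlaps graph construction]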

Since this model only allows relating a dependency from the current task to a previously created task, you need a correct topological order of graph expression. In our example, there are only two possible topological orderings, either ABCD or ACBD. The code below shows another feasible order of expressing this dynamic task graph parallelism:

tf::Executor executor;
tf::AsyncTask A = executor.silent_dependent_async([](){ printf("A\n"); });
tf::AsyncTask C = executor.silent_dependent_async([](){ printf("C\n"); }, A);
tf::AsyncTask B = executor.silent_dependent_async([](){ printf("B\n"); }, A);
auto [D, fuD] = executor.dependent_async([](){ printf("D\n"); }, B, C);
fuD.get();  // wait for D to finish, which in turn means A, B, C have finished
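
The ordering rule above can be checked mechanically. The following is a minimal sketch, independent of Taskflow and using only the C++ standard library, that tests whether a proposed creation order places every task after its predecessors. The names is_valid_creation_order, diamond_edges, and check_diamond_orders are hypothetical and introduced only for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Returns true if, for every (predecessor, successor) edge, the predecessor
// appears strictly before the successor in `order`.
// Task names are single characters, as in the A/B/C/D example.
bool is_valid_creation_order(const std::string& order,
                             const std::vector<std::pair<char, char>>& edges) {
  for (const auto& [from, to] : edges) {
    const std::size_t from_pos = order.find(from);
    const std::size_t to_pos = order.find(to);
    // Both endpoints must be present, and the predecessor must come first.
    if (from_pos == std::string::npos || to_pos == std::string::npos) return false;
    if (from_pos >= to_pos) return false;
  }
  return true;
}

// The diamond graph from the example: A->B, A->C, B->D, C->D.
const std::vector<std::pair<char, char>> diamond_edges = {
  {'A', 'B'}, {'A', 'C'}, {'B', 'D'}, {'C', 'D'}
};

// Usage sketch: the two orders named in the text pass; an order that
// creates D before C does not.
void check_diamond_orders() {
  assert(is_valid_creation_order("ABCD", diamond_edges));
  assert(is_valid_creation_order("ACBD", diamond_edges));
  assert(!is_valid_creation_order("ABDC", diamond_edges));
}
```

A check like this only validates the order in which tasks are expressed; it says nothing about the order in which workers actually run B and C, which remains unspecified.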

In addition to using std::future to synchronize the execution, you can use tf::Executor::wait_for_all to wait for all scheduled tasks to finish:

tf::Executor executor;
tf::AsyncTask A = executor.silent_dependent_async([](){ printf("A\n"); });
tf::AsyncTask B = executor.silent_dependent_async([](){ printf("B\n"); }, A);
tf::AsyncTask C = executor.silent_dependent_async([](){ printf("C\n"); }, A);
tf::AsyncTask D = executor.silent_dependent_async([](){ printf("D\n"); }, B, C);
executor.wait_for_all();

Specify a Range of Dependent Async Tasks

Both tf::Executor::dependent_async(F&& func, Tasks&&... tasks) and tf::Executor::silent_dependent_async(F&& func, Tasks&&... tasks) accept an arbitrary number of tasks in the dependency list. If the number of dependent tasks is not known at compile time, for example because it relies on runtime variables, you can use the following two overloads to specify dependent tasks in an iterable range [first, last):

tf::Executor::dependent_async(F&& func, I first, I last)
tf::Executor::silent_dependent_async(F&& func, I first, I last)

The code below creates an asynchronous task that depends on N previously created asynchronous tasks stored in a vector, where N is a runtime variable:

tf::Executor executor;
std::vector<tf::AsyncTask> dependents;
for(size_t i=0; i<N; i++) {  // N is a runtime variable
  dependents.push_back(executor.silent_dependent_async([](){}));
}
executor.silent_dependent_async([](){}, dependents.begin(), dependents.end());
executor.wait_for_all();

Understand the Lifetime of a Dependent Async Task

A tf::AsyncTask is a lightweight handle that retains shared ownership of a dependent async task created by an executor. This shared ownership ensures that the async task remains alive when adding it to the dependency list of another async task, thus avoiding the classical ABA problem.

// main thread retains shared ownership of async task A
tf::AsyncTask A = executor.silent_dependent_async([](){});

// task A remains alive (i.e., at least one ref count by the main thread) 
// when being added to the dependency list of async task B
tf::AsyncTask B = executor.silent_dependent_async([](){}, A);

Currently, tf::AsyncTask follows the reference-counting logic of the C++ smart pointer std::shared_ptr and is considered cheap to copy or move as long as only a handful of objects own it. When a worker completes an async task, it removes the task from the executor, decrementing the number of shared owners by one. If that counter reaches zero, the task is destroyed.
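
The reference-counting behavior can be illustrated with std::shared_ptr itself. The sketch below is an analogy only and does not use or reflect Taskflow internals; TaskState, observe_owner_counts, and check_owner_counts are hypothetical names. It records the owner count when one handle exists, while a second owner (standing in for an entry in another task's dependency list) holds the object, and after that second owner is released.

```cpp
#include <array>
#include <cassert>
#include <memory>

// Hypothetical stand-in for the state behind an async task handle.
struct TaskState { int id; };

// Records the number of shared owners at three points in time.
std::array<long, 3> observe_owner_counts() {
  std::array<long, 3> counts{};

  // The creating thread holds the only handle.
  auto handle = std::make_shared<TaskState>(TaskState{0});
  counts[0] = handle.use_count();

  {
    // A second owner keeps the state alive independently of `handle`.
    std::shared_ptr<TaskState> dependency_ref = handle;
    counts[1] = handle.use_count();
  }  // the second owner is released here

  counts[2] = handle.use_count();
  return counts;
}

// Usage sketch: one owner, then two, then one again.
void check_owner_counts() {
  const auto counts = observe_owner_counts();
  assert(counts[0] == 1);
  assert(counts[1] == 2);
  assert(counts[2] == 1);
}
```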

Create a Dynamic Task Graph by Multiple Threads

You can use multiple threads to create a dynamic task graph as long as the order of simultaneously creating tasks is topologically correct. The example below creates a dynamic task graph using three threads (including the main thread), where task A runs before task B and task C:

tf::Executor executor;

// main thread creates a dependent async task A
tf::AsyncTask A = executor.silent_dependent_async([](){});

// spawn a new thread to create an async task B that runs after A
std::thread t1([&](){
  tf::AsyncTask B = executor.silent_dependent_async([](){}, A);
});

// spawn a new thread to create an async task C that runs after A
std::thread t2([&](){
  tf::AsyncTask C = executor.silent_dependent_async([](){}, A);
});

// join first so that B and C have both been submitted, then wait for all tasks
t1.join();
t2.join();
executor.wait_for_all();

Regardless of whether t1 runs before or after t2, the resulting topological order, either ABC or ACB, is always consistent with the graph definition.

Query the Completion Status of Dependent Async Tasks

When you create a dependent async task, you can query its completion status by tf::AsyncTask::is_done, which returns true upon completion or false otherwise. A completed dependent async task indicates that a worker has executed its associated callable.

// create a dependent async task that returns 100
auto [task, fu] = executor.dependent_async([](){ return 100; });

// loops until the dependent async task completes
while(!task.is_done());
assert(fu.get() == 100);

tf::AsyncTask::is_done is useful when you need to wait on the result of a dependent async task before moving on to the next program instruction. It is often used together with tf::Executor::corun_until to keep a worker awake in its work-stealing loop and avoid deadlock (see Execute a Taskflow from an Internal Worker for more details). For instance, the code below computes Fibonacci numbers using recursive asynchronous tasking:

tf::Executor executor;
std::function<int(int)> fibonacci;

// calculate the Fibonacci sequence: 0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89
fibonacci = [&](int N){
  if (N < 2) {
    return N; 
  }
  auto [t1, fu1] = executor.dependent_async(std::bind(fibonacci, N-1));
  auto [t2, fu2] = executor.dependent_async(std::bind(fibonacci, N-2));
  executor.corun_until([&](){ return t1.is_done() && t2.is_done(); });
  return fu1.get() + fu2.get();
};

auto [task, fib11] = executor.dependent_async(std::bind(fibonacci, 11));
assert(fib11.get() == 89);  // fib11 is a std::future; the 11-th Fibonacci number is 89