Asynchronous Tasking

This chapter discusses how to launch tasks asynchronously so that you can incorporate independent, dynamic parallelism in your taskflows.

Launch Asynchronous Tasks from an Executor

The Taskflow executor provides an STL-style method, tf::Executor::async, that runs a callable object asynchronously. The method returns a std::future that will eventually hold the result of that function call.

std::future<int> future = executor.async([](){ return 1; });
assert(future.get() == 1);

If you do not need the return value or a future to synchronize the execution, you are encouraged to use tf::Executor::silent_async instead. It returns nothing and thus has less overhead than tf::Executor::async (i.e., no shared state management).

executor.silent_async([](){
  // do some work without returning any result
});

Launching asynchronous tasks from an executor is thread-safe: tf::Executor::async and tf::Executor::silent_async can be called by multiple threads, both inside the executor (i.e., worker threads) and outside it. Our scheduler autonomously detects whether an asynchronous task is submitted from an external thread or a worker thread and schedules its execution using work stealing.

tf::Task my_task = taskflow.emplace([&](){
  // launch an asynchronous task from my_task
  executor.async([&](){
    // launch another asynchronous task that may be run by another worker
    executor.async([&](){});
  });
});
executor.run(taskflow);
executor.wait_for_all();   // wait for all tasks to finish

You can name an asynchronous task using the overloads tf::Executor::async(const std::string& name, F&& f) and tf::Executor::silent_async(const std::string& name, F&& f), which take the name as the first argument. Assigned names will appear in the observers of the executor.

std::future<void> fu = executor.async("async task", [](){});
executor.silent_async("silent async task", [](){});

Launch Asynchronous Tasks from a Subflow

You can launch asynchronous tasks from tf::Subflow using tf::Subflow::async. Asynchronous tasks are independent tasks spawned during the execution of a subflow. When the subflow joins, all asynchronous tasks are guaranteed to finish. The following code creates 100 asynchronous tasks from a subflow and joins their executions explicitly using tf::Subflow::join.

tf::Taskflow taskflow;
tf::Executor executor;

std::atomic<int> counter{0};

taskflow.emplace([&] (tf::Subflow& sf){
  std::vector<std::future<void>> futures;
  for(int i=0; i<100; i++) {
    futures.emplace_back(sf.async([&](){ ++counter; }));
  }
  sf.join();  // all of the 100 asynchronous tasks will finish by this join
  assert(counter == 100);
});

executor.run(taskflow).wait();

If you do not need the return value or the future to synchronize the execution, you can use tf::Subflow::silent_async which has less overhead when creating an asynchronous task compared to tf::Subflow::async.

tf::Taskflow taskflow;
tf::Executor executor;

std::atomic<int> counter{0};

taskflow.emplace([&] (tf::Subflow& sf){
  for(int i=0; i<100; i++) {
    sf.silent_async([&](){ ++counter; });
  }
  sf.join();  // all of the 100 asynchronous tasks will finish by this join
  assert(counter == 100);
});

executor.run(taskflow).wait();

You can assign an asynchronous task a name using the two overloads, tf::Subflow::async(const std::string& name, F&& f) and tf::Subflow::silent_async(const std::string& name, F&& f). Both take the name as an additional string argument ahead of the callable.

taskflow.emplace([](tf::Subflow& sf){
  std::future<void> future = sf.async("name of the task", [](){});
  sf.silent_async("another name of the task", [](){});
  sf.join();
});

Launch Asynchronous Tasks from a Runtime

The asynchronous tasking feature of tf::Subflow is in fact derived from tf::Runtime. You can launch asynchronous tasks from tf::Runtime using tf::Runtime::async or tf::Runtime::silent_async. The following code creates 100 asynchronous tasks from a runtime and joins their executions explicitly using tf::Runtime::corun_all.

tf::Taskflow taskflow;
tf::Executor executor;

std::atomic<int> counter{0};

taskflow.emplace([&] (tf::Runtime& rt){
  for(int i=0; i<100; i++) {
    rt.silent_async([&](){ ++counter; });
  }
  rt.corun_all();  // all of the 100 asynchronous tasks will finish by this join
  assert(counter == 100);
});
executor.run(taskflow).wait();

Unlike tf::Subflow::join, you can call tf::Runtime::corun_all multiple times to synchronize the execution of asynchronous tasks between different runs. For example, the following code spawns 100 asynchronous tasks twice and joins after each batch to ensure that the 100 tasks just spawned have completed.

tf::Taskflow taskflow;
tf::Executor executor;

std::atomic<int> counter{0};

taskflow.emplace([&] (tf::Runtime& rt){
  // spawn 100 asynchronous tasks and join
  for(int i=0; i<100; i++) {
    rt.silent_async([&](){ ++counter; });
  }
  rt.corun_all();  // all of the 100 asynchronous tasks will finish by this join
  assert(counter == 100);
  
  // spawn another 100 asynchronous tasks and join
  for(int i=0; i<100; i++) {
    rt.silent_async([&](){ ++counter; });
  }
  rt.corun_all();  // all of the 100 asynchronous tasks will finish by this join
  assert(counter == 200);
});
executor.run(taskflow).wait();

Unlike tf::Subflow, tf::Runtime does not join by default. All pending asynchronous tasks spawned by tf::Runtime are no longer controllable once their parent runtime disappears. It is your responsibility to properly synchronize spawned asynchronous tasks using tf::Runtime::corun_all.