Cookbook » Interact with the Runtime

Taskflow allows you to interact with the scheduling runtime by taking a runtime object as an argument of a task. This is mostly useful for designing specialized parallel algorithms extended from the existing facility of Taskflow.

Create a Runtime Object

Taskflow allows a static task and a condition task to take a referenced tf::Runtime object that provides a set of methods to interact with the scheduling runtime. The following example creates a static task that leverages tf::Runtime to explicitly schedule a conditioned task which would never run under the normal scheduling circumstance:

tf::Task A, B, C, D;
std::tie(A, B, C, D) = taskflow.emplace(
  [] () { return 0; },
  [&C] (tf::Runtime& rt) {  // C must be captured by reference
    std::cout << "B\n"; 
    rt.schedule(C);
  },
  [] () { std::cout << "C\n"; },
  [] () { std::cout << "D\n"; }
);
A.precede(B, C, D);
executor.run(taskflow).wait();
Taskflow p0x7bc400014030 D p0x7bc400014118 C p0x7bc400014200 B p0x7bc4000142e8 A p0x7bc4000142e8->p0x7bc400014030 2 p0x7bc4000142e8->p0x7bc400014118 1 p0x7bc4000142e8->p0x7bc400014200 0

When the condition task A completes and returns 0, the scheduler moves on to task B. Under the normal circumstance, tasks C and D will not run because their conditional dependencies never happen. This can be broken by forcefully scheduling C or/and D via a runtime object of a task that resides in the same graph. Here, task B call tf::Runtime::schedule to forcefully run task C even though the weak dependency between A and C will never happen based on the graph structure itself. As a result, we will see both B and C in the output:

B    # B leverages a runtime object to schedule C out of its dependency constraint
C

Acquire the Running Executor

You can acquire the reference to the running executor using tf::Runtime::executor(). The executor associated with a runtime object is the executor that runs the parent task of that runtime object.

tf::Executor executor;
tf::Taskflow taskflow;
taskflow.emplace([&](tf::Runtime& rt){
  assert(&(rt.executor()) == &executor);
});
executor.run(taskflow).wait();

Run a Task Graph Synchronously

A runtime object can spawn and run a task graph synchronously using tf::Runtime::corun. This model allows you to leverage dynamic tasking to execute a parallel workload within a runtime object. The following code creates a subflow of two independent tasks and executes it synchronously via the given runtime object:

taskflow.emplace([](tf::Runtime& rt){
  rt.corun([](tf::Subflow& sf){
    sf.emplace([](){ std::cout << "independent task 1\n"; });
    sf.emplace([](){ std::cout << "independent task 2\n"; });
    // subflow joins upon corun returns
  });
});

You can also create a task graph yourself and execute it through a runtime object. This organization avoids repetitive creation of a subflow with the same topology, such as running a runtime object repetitively. The following code performs the same execution logic as the above example but using the given task graph to avoid repetitive creations of a subflow:

// create a custom graph
tf::Taskflow graph;
graph.emplace([](){ std::cout << "independent task 1\n"; });
graph.emplace([](){ std::cout << "independent task 2\n"; });

taskflow.emplace([&](tf::Runtime& rt){ 
  // this worker coruns the graph through its work-stealing loop
  rt.corun(graph);
});
executor.run_n(taskflow, 10000);

Although tf::Runtime::corun blocks until the operation completes, the caller thread (worker) is not preempted (e.g., sleep or holding any lock). Instead, the caller thread joins the work-stealing loop of the executor and leaves whenever the spawned task graph completes. This is different from waiting for a submitted taskflow using tf::Future<T>::wait which blocks the caller thread until the submitted taskflow completes. When multiple submitted taskflows are being waited, their executions can potentially lead to deadlock. For example, the code below creates a taskflow of 1000 tasks with each task running a taskflow of 500 tasks in a blocking fashion:

tf::Executor executor(2);
tf::Taskflow taskflow;
std::array<tf::Taskflow, 1000> others;

std::atomic<size_t> counter{0};

for(size_t n=0; n<1000; n++) {
  for(size_t i=0; i<500; i++) {
    others[n].emplace([&](){ counter++; });
  }
  taskflow.emplace([&executor, &tf=others[n]](){
    // blocking the worker can introduce deadlock where
    // all workers are waiting for their taskflows to finish
    executor.run(tf).wait();
  });
}
executor.run(taskflow).wait();

Using tf::Runtime::corun allows each worker to corun these taskflows through its work-stealing loop, thus avoiding deadlock problem caused by blocking wait.

tf::Executor executor(2);
tf::Taskflow taskflow;
std::array<tf::Taskflow, 1000> others;

std::atomic<size_t> counter{0};

for(size_t n=0; n<1000; n++) {
  for(size_t i=0; i<500; i++) {
    others[n].emplace([&](){ counter++; });
  }
  taskflow.emplace([&tf=others[n]](tf::Runtime& rt){
    // the caller worker will not block but corun these
    // taskflows through its work-stealing loop
    rt.corun(tf);
  });
}
executor.run(taskflow).wait();

Learn More About Runtime

t the following pages to learn more about tf::Runtime: