Executor
After you create a task dependency graph, you need to submit it to threads for execution. In this chapter, we will show you how to execute a task dependency graph.
Create an Executor
To execute a taskflow, you need to create an executor of type tf::N
worker threads. The default value is std::
tf::Executor executor1; // create an executor with the number of workers // equal to std::thread::hardware_concurrency tf::Executor executor2(4); // create an executor of 4 worker threads
An executor can be reused to execute multiple taskflows. In most workloads, you may need only one executor to run multiple taskflows where each taskflow represents a part of a parallel decomposition.
Understand Work-stealing in Executor
Taskflow designs a highly efficient work-stealing algorithm to schedule and run tasks in an executor. Work-stealing is a dynamic scheduling algorithm widely used in parallel computing to distribute and balance workload among multiple threads or cores. Specifically, within an executor, each worker maintains its own local queue of tasks. When a worker finishes its own tasks, instead of becoming idle or going sleep, it (thief) tries to steal a task from the queue another worker (victim). The figure below illustrates the idea of work-stealing:

The key advantage of work-stealing lies in its decentralized nature and efficiency. Most of the time, worker threads work on their local queues without contention. Stealing only occurs when a worker becomes idle, minimizing overhead associated with synchronization and task distribution. This decentralized strategy effectively balances the workload, ensuring that idle workers are put to work and that the overall computation progresses efficiently.
Execute a Taskflow
tf::run_*
methods, tf::
1: // Declare an executor and a taskflow 2: tf::Executor executor; 3: tf::Taskflow taskflow; 4: 5: // Add three tasks into the taskflow 6: tf::Task A = taskflow.emplace([] () { std::cout << "This is TaskA\n"; }); 7: tf::Task B = taskflow.emplace([] () { std::cout << "This is TaskB\n"; }); 8: tf::Task C = taskflow.emplace([] () { std::cout << "This is TaskC\n"; }); 9: 10: // Build precedence between tasks 11: A.precede(B, C); 12: 13: tf::Future<void> fu = executor.run(taskflow); 14: fu.wait(); // block until the execution completes 15: 16: executor.run(taskflow, [](){ std::cout << "end of 1 run"; }).wait(); 17: executor.run_n(taskflow, 4); 18: executor.wait_for_all(); // block until all associated executions finish 19: executor.run_n(taskflow, 4, [](){ std::cout << "end of 4 runs"; }).wait(); 20: executor.run_until(taskflow, [cnt=0] () mutable { return ++cnt == 10; });
Debrief:
- Lines 6-8 create a taskflow of three tasks A, B, and C
- Lines 13-14 run the taskflow once and wait for completion
- Line 16 runs the taskflow once with a callback to invoke when the execution finishes
- Lines 17-18 run the taskflow four times and use tf::
Executor:: wait_for_all to wait for completion - Line 19 runs the taskflow four times and invokes a callback at the end of the fourth execution
- Line 20 keeps running the taskflow until the predicate returns true
Issuing multiple runs on the same taskflow will automatically synchronize to a sequential chain of executions in the order of run calls.
executor.run(taskflow); // execution 1 executor.run_n(taskflow, 10); // execution 2 executor.run(taskflow); // execution 3 executor.wait_for_all(); // execution 1 -> execution 2 -> execution 3
tf::Executor executor; // create an executor // create a taskflow whose lifetime is restricted by the scope { tf::Taskflow taskflow; // add tasks to the taskflow // ... // run the taskflow executor.run(taskflow); } // leaving the scope will destroy taskflow while it is running, // resulting in undefined behavior
Similarly, you should avoid touching a taskflow while it is running.
tf::Taskflow taskflow; // Add tasks into the taskflow // ... // Declare an executor tf::Executor executor; tf::Future<void> future = executor.run(taskflow); // non-blocking return // alter the taskflow while running leads to undefined behavior taskflow.emplace([](){ std::cout << "Add a new task\n"; });
You must always keep a taskflow alive and must not modify it while it is running on an executor.
Execute a Taskflow with Transferred Ownership
You can transfer the ownership of a taskflow to an executor and run it without wrangling with the lifetime issue of that taskflow. Each run_*
method discussed in the previous section comes with an overload that takes a moved taskflow object.
tf::Taskflow taskflow; tf::Executor executor; taskflow.emplace([](){}); // let the executor manage the lifetime of the submitted taskflow executor.run(std::move(taskflow)); // now taskflow has no tasks assert(taskflow.num_tasks() == 0);
However, you should avoid moving a running taskflow which can result in undefined behavior.
tf::Taskflow taskflow; tf::Executor executor; taskflow.emplace([](){}); // executor does not manage the lifetime of taskflow executor.run(taskflow); // error! you cannot move a taskflow while it is running executor.run(std::move(taskflow));
The correct way to submit a taskflow with moved ownership to an executor is to ensure all previous runs have completed. The executor will automatically release the resources of a moved taskflow right after its execution completes.
// submit the taskflow and wait until it completes executor.run(taskflow).wait(); // now it's safe to move the taskflow to the executor and run it executor.run(std::move(taskflow));
Likewise, you cannot move a taskflow that is running on an executor. You must wait until all the previous fires of runs on that taskflow complete before calling move.
// submit the taskflow and wait until it completes executor.run(taskflow).wait(); // now it's safe to move the taskflow to another tf::Taskflow moved_taskflow(std::move(taskflow));
Execute a Taskflow from an Internal Worker
Each run variant of tf::tf::Future::wait
, the caller blocks without doing anything until the associated state is written to be ready. This design, however, can introduce deadlock problem especially when you need to run multiple taskflows from the internal workers of an executor. For example, the code below creates a taskflow of 1000 tasks with each task running a taskflow of 500 tasks in a blocking fashion:
tf::Executor executor(2); tf::Taskflow taskflow; std::array<tf::Taskflow, 1000> others; for(size_t n=0; n<1000; n++) { for(size_t i=0; i<500; i++) { others[n].emplace([&](){}); } taskflow.emplace([&executor, &tf=others[n]](){ // blocking the worker can introduce deadlock where // all workers are waiting for their taskflows to finish executor.run(tf).wait(); }); } executor.run(taskflow).wait();
To avoid this problem, the executor has a method, tf::
tf::Executor executor(2); tf::Taskflow taskflow; std::array<tf::Taskflow, 1000> others; std::atomic<size_t> counter{0}; for(size_t n=0; n<1000; n++) { for(size_t i=0; i<500; i++) { others[n].emplace([&](){ counter++; }); } taskflow.emplace([&executor, &tf=others[n]](){ // the caller worker will not block but corun these // taskflows through its work-stealing loop executor.corun(tf); }); } executor.run(taskflow).wait();
Similar to tf::
taskflow.emplace([&](){ auto fu = std::async([](){ std::sleep(100s); }); executor.corun_until([](){ return fu.wait_for(std::chrono::seconds(0)) == future_status::ready; }); });
Thread Safety of Executor
All run_*
methods of tf::
tf::Executor executor; for(int i=0; i<10; ++i) { std::thread([i, &](){ // ... modify my taskflow at i executor.run(taskflows[i]); // run my taskflow at i }).detach(); } executor.wait_for_all();
Query the Worker ID
Each worker thread in a tf::[0, N)
, where N
is the number of worker threads in the executor. You can query the identifier of the calling thread using tf::
std::vector<int> worker_vectors[8]; // one vector per worker tf::Taskflow taskflow; tf::Executor executor(8); // an executor of eight workers assert(executor.this_worker_id() == -1); // master thread is not a worker taskflow.emplace([&](){ int id = executor.this_worker_id(); // in the range [0, 8) auto& vec = worker_vectors[worker_id]; // ... });
Observe Thread Activities
You can observe thread activities in an executor when a worker thread participates in executing a task and leaves the execution using tf::
class ObserverInterface { virtual ~ObserverInterface() = default; virtual void set_up(size_t num_workers) = 0; virtual void on_entry(tf::WorkerView worker_view, tf::TaskView task_view) = 0; virtual void on_exit(tf::WorkerView worker_view, tf::TaskView task_view) = 0; };
There are three methods you must define in your derived class, tf::
You can associate an executor with one or multiple observers (though one is common) using tf::
#include <taskflow/taskflow.hpp> struct MyObserver : public tf::ObserverInterface { MyObserver(const std::string& name) { std::cout << "constructing observer " << name << '\n'; } void set_up(size_t num_workers) override final { std::cout << "setting up observer with " << num_workers << " workers\n"; } void on_entry(tf::WorkerView w, tf::TaskView tv) override final { std::ostringstream oss; oss << "worker " << w.id() << " ready to run " << tv.name() << '\n'; std::cout << oss.str(); } void on_exit(tf::WorkerView w, tf::TaskView tv) override final { std::ostringstream oss; oss << "worker " << w.id() << " finished running " << tv.name() << '\n'; std::cout << oss.str(); } }; int main(){ tf::Executor executor(4); // Create a taskflow of eight tasks tf::Taskflow taskflow; auto A = taskflow.emplace([] () { std::cout << "1\n"; }).name("A"); auto B = taskflow.emplace([] () { std::cout << "2\n"; }).name("B"); auto C = taskflow.emplace([] () { std::cout << "3\n"; }).name("C"); auto D = taskflow.emplace([] () { std::cout << "4\n"; }).name("D"); auto E = taskflow.emplace([] () { std::cout << "5\n"; }).name("E"); auto F = taskflow.emplace([] () { std::cout << "6\n"; }).name("F"); auto G = taskflow.emplace([] () { std::cout << "7\n"; }).name("G"); auto H = taskflow.emplace([] () { std::cout << "8\n"; }).name("H"); // create an observer std::shared_ptr<MyObserver> observer = executor.make_observer<MyObserver>( "MyObserver" ); // run the taskflow executor.run(taskflow).get(); // remove the observer (optional) executor.remove_observer(std::move(observer)); return 0; }
The above code produces the following output:
constructing observer MyObserver setting up observer with 4 workers worker 2 ready to run A 1 worker 2 finished running A worker 2 ready to run B 2 worker 1 ready to run C worker 2 finished running B 3 worker 2 ready to run D worker 3 ready to run E worker 1 finished running C 4 5 worker 1 ready to run F worker 2 finished running D worker 3 finished running E 6 worker 2 ready to run G worker 3 ready to run H worker 1 finished running F 7 8 worker 2 finished running G worker 3 finished running H
It is expected each line of std::
Modify Worker Property
You can change the property of each worker thread from its executor, such as assigning thread-processor affinity before the worker enters the scheduler loop and post-processing additional information after the worker leaves the scheduler loop, by passing an instance derived from tf::
// affine the given thread to the given core index (linux-specific) bool affine(std::thread& thread, unsigned int core_id) { cpu_set_t cpuset; CPU_ZERO(&cpuset); CPU_SET(core_id, &cpuset); pthread_t native_handle = thread.native_handle(); return pthread_setaffinity_np(native_handle, sizeof(cpu_set_t), &cpuset) == 0; } class CustomWorkerBehavior : public tf::WorkerInterface { public: // to call before the worker enters the scheduling loop void scheduler_prologue(tf::Worker& w) override { printf("worker %lu prepares to enter the work-stealing loop\n", w.id()); // now affine the worker to a particular CPU core equal to its id if(affine(w.thread(), w.id())) { printf("successfully affines worker %lu to CPU core %lu\n", w.id(), w.id()); } else { printf("failed to affine worker %lu to CPU core %lu\n", w.id(), w.id()); } } // to call after the worker leaves the scheduling loop void scheduler_epilogue(tf::Worker& w, std::exception_ptr) override { printf("worker %lu left the work-stealing loop\n", w.id()); } }; int main() { tf::Executor executor(4, tf::make_worker_interface<CustomWorkerBehavior>()); return 0; }
When running the program, we see the following one possible output:
worker 3 prepares to enter the work-stealing loop successfully affines worker 3 to CPU core 3 worker 3 left the work-stealing loop worker 0 prepares to enter the work-stealing loop successfully affines worker 0 to CPU core 0 worker 0 left the work-stealing loop worker 1 prepares to enter the work-stealing loop worker 2 prepares to enter the work-stealing loop successfully affines worker 1 to CPU core 1 worker 1 left the work-stealing loop successfully affines worker 2 to CPU core 2 worker 2 left the work-stealing loop
When you create an executor, it spawns a set of worker threads to run tasks using a work-stealing scheduling algorithm. The execution logic of the scheduler and its interaction with each spawned worker via tf::
for(size_t n=0; n<num_workers; n++) { create_thread([](Worker& worker) // pre-processing executor-specific worker information // ... // enter the scheduling loop // Here, WorkerInterface::scheduler_prologue is invoked, if any worker_interface->scheduler_prologue(worker); try { while(1) { perform_work_stealing_algorithm(); if(stop) { break; } } } catch(...) { exception_ptr = std::current_exception(); } // leaves the scheduling loop and joins this worker thread // Here, WorkerInterface::scheduler_epilogue is invoked, if any worker_interface->scheduler_epilogue(worker, exception_ptr); ); }