Limit the Maximum Concurrency
This chapter discusses how to limit the concurrency or the maximum number of workers in subgraphs of a taskflow.
Define a Semaphore
Taskflow provides a mechanism, tf::Semaphore, for you to limit the maximum concurrency in a section of tasks. You can let a task acquire one or multiple semaphores before it runs its work and release one or multiple semaphores after it finishes. A semaphore starts with an initial count; a task can acquire the semaphore only when the count is above zero, and acquiring decrements the count while releasing increments it.
tf::Executor executor(8);    // create an executor of 8 workers
tf::Taskflow taskflow;

tf::Semaphore semaphore(1);  // create a semaphore with initial count 1

std::vector<tf::Task> tasks {
  taskflow.emplace([](){ std::cout << "A" << std::endl; }),
  taskflow.emplace([](){ std::cout << "B" << std::endl; }),
  taskflow.emplace([](){ std::cout << "C" << std::endl; }),
  taskflow.emplace([](){ std::cout << "D" << std::endl; }),
  taskflow.emplace([](){ std::cout << "E" << std::endl; })
};

// each task acquires and releases the semaphore
for(auto & task : tasks) {
  task.acquire(semaphore);
  task.release(semaphore);
}

executor.run(taskflow).wait();
The above example creates five tasks with no dependencies between them. Under normal circumstances, the five tasks would be executed concurrently. However, this example has a semaphore with initial count 1, and all tasks need to acquire that semaphore before running and release that semaphore after they are done. This organization limits the number of concurrently running tasks to only one. One possible output is shown below:
# the output is a sequential chain of five tasks
A
B
E
D
C
For the same example above, we can set the initial count of the semaphore to another value, say 3, which allows at most three of the five tasks, A, B, C, D, and E, to run concurrently.
tf::Executor executor(8);    // create an executor of 8 workers
tf::Taskflow taskflow;

tf::Semaphore semaphore(3);  // create a semaphore with initial count 3

std::vector<tf::Task> tasks {
  taskflow.emplace([](){ std::cout << "A" << std::endl; }),
  taskflow.emplace([](){ std::cout << "B" << std::endl; }),
  taskflow.emplace([](){ std::cout << "C" << std::endl; }),
  taskflow.emplace([](){ std::cout << "D" << std::endl; }),
  taskflow.emplace([](){ std::cout << "E" << std::endl; })
};

// each task acquires and releases the semaphore
for(auto & task : tasks) {
  task.acquire(semaphore);
  task.release(semaphore);
}

executor.run(taskflow).wait();
# One possible output: A, B, and C run concurrently, D and E run concurrently
A
B
C
E
D
Semaphores are powerful for limiting the maximum concurrency of not only a section of tasks but also different sections of tasks. Specifically, you can have one task acquire a semaphore and have another task release that semaphore to impose concurrency on subgraphs of tasks. The following example serializes the execution of five pairs of tasks using a semaphore rather than explicit dependencies.
tf::Executor executor(4);  // create an executor of 4 workers
tf::Taskflow taskflow;

tf::Semaphore semaphore(1);

using namespace std::string_literals;  // enables the "..."s string literals below

int N = 5;
int counter = 0;  // non-atomic integer counter

for(int i=0; i<N; i++) {
  tf::Task f = taskflow.emplace([&](){ counter++; })
                       .name("from-"s + std::to_string(i));
  tf::Task t = taskflow.emplace([&](){ counter++; })
                       .name("to-"s + std::to_string(i));
  f.precede(t);
  f.acquire(semaphore);
  t.release(semaphore);
}

executor.run(taskflow).wait();
assert(counter == 2*N);
Without semaphores, each pair of tasks, e.g., from-0 -> to-0, would run independently and concurrently. However, the program forces each from task to acquire the semaphore before running its work and not to release it until its paired to task is done. This constraint forces each pair of tasks to run sequentially, while the order in which the pairs run is up to the scheduler. It also makes the non-atomic counter safe, since at most one pair increments it at any time.
Define a Critical Section
tf::CriticalSection is a wrapper over tf::Semaphore specialized for limiting the maximum concurrency over a set of tasks. A critical section starts with an initial count representing that maximum concurrency; adding a task to the critical section makes the task acquire and release its internal semaphore. The following example creates a critical section of two workers, so at most two of the five tasks can run at the same time.
tf::Executor executor(8);  // create an executor of 8 workers
tf::Taskflow taskflow;

// create a critical section of two workers
tf::CriticalSection critical_section(2);

tf::Task A = taskflow.emplace([](){ std::cout << "A" << std::endl; });
tf::Task B = taskflow.emplace([](){ std::cout << "B" << std::endl; });
tf::Task C = taskflow.emplace([](){ std::cout << "C" << std::endl; });
tf::Task D = taskflow.emplace([](){ std::cout << "D" << std::endl; });
tf::Task E = taskflow.emplace([](){ std::cout << "E" << std::endl; });

critical_section.add(A, B, C, D, E);

executor.run(taskflow).wait();
Define a Conflict Graph
One important application of tf::Semaphore is conflict-aware scheduling with a conflict graph. A conflict graph is an undirected graph where each vertex represents a task and each edge represents a conflict between its two tasks, i.e., the two tasks cannot run at the same time. Consider a conflict graph in which task A conflicts with task B and task C (and vice versa), meaning that A cannot run together with B or C, whereas B and C can run together.
We can create one semaphore with initial count 1 for each edge in the conflict graph and let the two tasks of that edge acquire and release the semaphore. This organization prevents the two conflicting tasks from running concurrently.
tf::Executor executor;
tf::Taskflow taskflow;

tf::Semaphore conflict_AB(1);
tf::Semaphore conflict_AC(1);

tf::Task A = taskflow.emplace([](){ std::cout << "A" << std::endl; });
tf::Task B = taskflow.emplace([](){ std::cout << "B" << std::endl; });
tf::Task C = taskflow.emplace([](){ std::cout << "C" << std::endl; });

// describe the conflict between A and B
A.acquire(conflict_AB).release(conflict_AB);
B.acquire(conflict_AB).release(conflict_AB);

// describe the conflict between A and C
A.acquire(conflict_AC).release(conflict_AC);
C.acquire(conflict_AC).release(conflict_AC);

executor.run(taskflow).wait();
# One possible output: B and C run concurrently after A
A
B
C
The above code can be rewritten with tf::CriticalSection by expressing each conflict edge as a critical section of one worker:
tf::Executor executor;
tf::Taskflow taskflow;

tf::CriticalSection critical_section_AB(1);
tf::CriticalSection critical_section_AC(1);

tf::Task A = taskflow.emplace([](){ std::cout << "A" << std::endl; });
tf::Task B = taskflow.emplace([](){ std::cout << "B" << std::endl; });
tf::Task C = taskflow.emplace([](){ std::cout << "C" << std::endl; });

// describe the conflict graph
critical_section_AB.add(A, B);
critical_section_AC.add(A, C);

executor.run(taskflow).wait();