Limit the Maximum Concurrency
This chapter discusses how to limit the maximum concurrency or parallelism of workers running inside tasks.
Define a Semaphore
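Taskflow provides a mechanism, tf::Semaphore, for limiting the maximum concurrency in a section of tasks. A semaphore starts with an initial count. A task that runs with a tf::Runtime object calls rt.acquire(semaphore) before its critical work and rt.release(semaphore) afterward. While the count is 0, a task trying to acquire the semaphore cannot proceed until another task releases it. The following example uses a semaphore with an initial count of 1 to serialize updates to a shared counter.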
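To make the role of the count concrete, here is a minimal counting semaphore built only from the C++ standard library (std::mutex and std::condition_variable). It is a sketch for intuition, not Taskflow's implementation. The class name CountingSemaphore and its available() helper are hypothetical. This version also blocks the calling thread while it waits, which tells you nothing about how tf::Semaphore handles waiting tasks.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Illustrative counting semaphore (hypothetical; not Taskflow's implementation).
// The count is the number of callers that may still enter without waiting.
class CountingSemaphore {
 public:
  explicit CountingSemaphore(std::size_t count) : count_(count) {}

  // Block until the count is positive, then take one unit.
  void acquire() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return count_ > 0; });
    --count_;
  }

  // Give one unit back and wake one waiter, if there is any.
  void release() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      ++count_;
    }
    cv_.notify_one();
  }

  // Current count, exposed only so examples and tests can inspect it.
  std::size_t available() {
    std::lock_guard<std::mutex> lock(mutex_);
    return count_;
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::size_t count_;
};
```

With an initial count of 1, at most one caller sits between acquire() and release() at any time. With a count of 3, up to three callers can be there at once.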
#include <taskflow/taskflow.hpp>
tf::Executor executor(8);    // create an executor of 8 workers
tf::Taskflow taskflow;
tf::Semaphore semaphore(1);  // create a semaphore with initial count 1
int counter = 0;
// create 1000 independent tasks in the taskflow
for(size_t i=0; i<1000; i++) {
  taskflow.emplace([&](tf::Runtime& rt){
    rt.acquire(semaphore);
    counter++;  // only one worker will increment the counter at any time
    rt.release(semaphore);
  });
}
executor.run(taskflow).wait();
The above example creates 1000 tasks with no dependencies between them. Each task increments counter by one. Because this increment is protected by a semaphore initialized with a count of 1, no two workers run it at the same time. In other words, the semaphore limits the parallelism of the 1000 tasks to 1. As a result, counter ends at exactly 1000 even though it is a plain int.
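For this particular pattern, a count-1 semaphore gives the same guarantee as mutual exclusion. The stdlib-only analogy below uses plain std::thread workers in place of executor workers and a std::mutex in place of the semaphore. The function name guarded_increments is hypothetical. Its purpose is to show why a non-atomic counter still comes out exact when only one worker at a time can touch it.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Runs num_jobs increments of a non-atomic counter spread over num_workers
// threads. The mutex admits one thread at a time into the critical section,
// which is the guarantee a semaphore with count 1 provides in the example above.
int guarded_increments(std::size_t num_workers, std::size_t num_jobs) {
  int counter = 0;   // deliberately non-atomic
  std::mutex guard;  // stands in for a semaphore with count 1
  std::vector<std::thread> workers;
  for (std::size_t w = 0; w < num_workers; ++w) {
    workers.emplace_back([&, w] {
      // worker w handles jobs w, w + num_workers, w + 2 * num_workers, ...
      for (std::size_t job = w; job < num_jobs; job += num_workers) {
        std::lock_guard<std::mutex> lock(guard);
        ++counter;
      }
    });
  }
  for (std::thread& t : workers) t.join();
  return counter;
}
```

For example, guarded_increments(8, 1000) returns 1000. The analogy has one limit. A std::mutex must be unlocked by the thread that locked it. A semaphore has no owner.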
We can create a semaphore with a different count value, such as 3, to limit the parallelism of independent tasks to 3.
#include <taskflow/taskflow.hpp>
#include <iostream>
tf::Executor executor(8);    // create an executor of 8 workers
tf::Taskflow taskflow;
tf::Semaphore semaphore(3);  // create a semaphore with initial count 3
// create a task that acquires and releases the semaphore
taskflow.emplace([&](tf::Runtime& rt){
  rt.acquire(semaphore);
  std::cout << "A" << std::endl;
  rt.release(semaphore);
});
// create a task that acquires and releases the semaphore
taskflow.emplace([&](tf::Runtime& rt){
  rt.acquire(semaphore);
  std::cout << "B" << std::endl;
  rt.release(semaphore);
});
// create a task that acquires and releases the semaphore
taskflow.emplace([&](tf::Runtime& rt){
  rt.acquire(semaphore);
  std::cout << "C" << std::endl;
  rt.release(semaphore);
});
// create a task that acquires and releases the semaphore
taskflow.emplace([&](tf::Runtime& rt){
  rt.acquire(semaphore);
  std::cout << "D" << std::endl;
  rt.release(semaphore);
});
// create a task that acquires and releases the semaphore
taskflow.emplace([&](tf::Runtime& rt){
  rt.acquire(semaphore);
  std::cout << "E" << std::endl;
  rt.release(semaphore);
});
executor.run(taskflow).wait();
# One possible output: A, B, and C run concurrently, D and E run concurrently
ABC
ED
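The count is an upper bound on simultaneous entries. It is not a promise that the full count will always be in use. You can observe this with a stdlib-only experiment that counts how many jobs are inside a gate of k permits at once. The gate below is a hand-written counting semaphore made from a mutex and a condition variable. It is not tf::Semaphore. The function name peak_concurrency is hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Runs num_jobs short jobs on num_workers threads and admits at most `limit`
// jobs at once through a mutex/condition-variable gate (a counting semaphore
// whose initial count is `limit`). Returns the largest number of jobs that
// were inside the gate at the same time.
std::size_t peak_concurrency(std::size_t num_workers, std::size_t num_jobs,
                             std::size_t limit) {
  std::mutex m;
  std::condition_variable cv;
  std::size_t permits = limit;  // the semaphore count
  std::size_t active = 0;       // jobs currently inside the gate
  std::size_t peak = 0;

  auto job = [&] {
    {
      std::unique_lock<std::mutex> lock(m);
      cv.wait(lock, [&] { return permits > 0; });  // acquire
      --permits;
      ++active;
      peak = std::max(peak, active);
    }
    // simulated work; it only makes overlap likely, the bound holds regardless
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    {
      std::lock_guard<std::mutex> lock(m);  // release
      --active;
      ++permits;
    }
    cv.notify_one();
  };

  std::vector<std::thread> workers;
  for (std::size_t w = 0; w < num_workers; ++w) {
    workers.emplace_back([&, w] {
      for (std::size_t j = w; j < num_jobs; j += num_workers) job();
    });
  }
  for (std::thread& t : workers) t.join();
  return peak;
}
```

With 8 workers and a limit of 3, the observed peak never exceeds 3, although the exact value varies from run to run. This mirrors the A through E example, where the grouping of tasks in the output is only one possibility.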
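tf::Semaphore does not have to be acquired and released by the same task. In the following example, each from task acquires the semaphore and its successor to task releases it, so the critical section spans both tasks.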
#include <taskflow/taskflow.hpp>
#include <cassert>
#include <string>
tf::Executor executor(4);  // creates an executor of 4 workers
tf::Taskflow taskflow;
tf::Semaphore semaphore(1);
int N = 5;
int counter = 0;  // non-atomic integer counter
for(int i=0; i<N; i++) {
  tf::Task f = taskflow.emplace([&](tf::Runtime& rt){
    rt.acquire(semaphore);  // from-i acquires the semaphore
    counter++;
  }).name("from-" + std::to_string(i));
  tf::Task t = taskflow.emplace([&](tf::Runtime& rt){
    counter++;
    rt.release(semaphore);  // to-i releases the semaphore acquired by from-i
  }).name("to-" + std::to_string(i));
  f.precede(t);
}
executor.run(taskflow).wait();
assert(counter == 2*N);
Without the semaphore, the pairs (e.g., from-0 -> to-0 and from-1 -> to-1) would run independently and concurrently. Only the two tasks within a pair are ordered by their dependency. With the semaphore, each from task must acquire it before running its work, and the semaphore is not released until the paired to task finishes. This constraint serializes the pairs: at most one pair is in progress at any time, and the scheduler decides the order in which the pairs run.
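The from/to pattern depends on a semaphore having no owner: one task acquires it and a different task releases it. A std::mutex cannot be used this way, because unlocking a mutex from a thread that does not hold it is undefined behavior. The stdlib-only sketch below reproduces the pairing with threads. A std::promise stands in for the precede edge, and a small ownerless binary semaphore (hypothetical class Token) stands in for tf::Semaphore. The function name run_serialized_pairs is also hypothetical. The function records the pair id of every event so you can check that no other pair interleaves.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <future>
#include <mutex>
#include <thread>
#include <vector>

// Ownerless binary semaphore (hypothetical): any thread may release it,
// including one that did not acquire it.
class Token {
 public:
  void acquire() {
    std::unique_lock<std::mutex> lock(m_);
    cv_.wait(lock, [this] { return free_; });
    free_ = false;
  }
  void release() {
    {
      std::lock_guard<std::mutex> lock(m_);
      free_ = true;
    }
    cv_.notify_one();
  }

 private:
  std::mutex m_;
  std::condition_variable cv_;
  bool free_ = true;
};

// Launches n (from, to) pairs of threads and returns the pair id of every
// event in the order in which the events happened.
std::vector<int> run_serialized_pairs(int n) {
  Token token;
  std::vector<int> log;  // only touched while the token is held
  std::vector<std::promise<void>> from_done(static_cast<std::size_t>(n));
  std::vector<std::thread> threads;
  for (int i = 0; i < n; ++i) {
    std::shared_future<void> ready = from_done[i].get_future().share();
    // from-i: acquire the token, record the event, then unblock to-i
    threads.emplace_back([&token, &log, &from_done, i] {
      token.acquire();
      log.push_back(i);
      from_done[i].set_value();
    });
    // to-i: wait for from-i (the dependency edge), record, release the token
    threads.emplace_back([&token, &log, ready, i] {
      ready.wait();
      log.push_back(i);
      token.release();
    });
  }
  for (std::thread& t : threads) t.join();
  return log;
}
```

The order of the pairs changes from run to run. The two events of each pair, however, are always adjacent in the log, which matches the serialization described above.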
Define a Conflict Graph
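One important application of tf::Semaphore is conflict-aware scheduling with a conflict graph. A conflict graph is an undirected graph in which each vertex is a task and each edge joins two tasks that must not run at the same time. Consider a conflict graph in which task A conflicts with task B and task C (and vice versa). This means that A cannot run together with B or C, whereas B and C can run together.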
We can create one semaphore with a count of 1 for each edge in the conflict graph and let both tasks of that edge acquire it. This prevents the two tasks from running concurrently.
#include <taskflow/taskflow.hpp>
#include <iostream>
tf::Executor executor;
tf::Taskflow taskflow;
tf::Semaphore conflict_AB(1);
tf::Semaphore conflict_AC(1);
// task A cannot run in parallel with task B and task C
tf::Task A = taskflow.emplace([&](tf::Runtime& rt){
  rt.acquire(conflict_AB, conflict_AC);
  std::cout << "A" << std::endl;
  rt.release(conflict_AB, conflict_AC);
});
// task B cannot run in parallel with task A
tf::Task B = taskflow.emplace([&](tf::Runtime& rt){
  rt.acquire(conflict_AB);
  std::cout << "B" << std::endl;
  rt.release(conflict_AB);
});
// task C cannot run in parallel with task A
tf::Task C = taskflow.emplace([&](tf::Runtime& rt){
  rt.acquire(conflict_AC);
  std::cout << "C" << std::endl;
  rt.release(conflict_AC);
});
executor.run(taskflow).wait();
# One possible output: B and C run concurrently after A
A
BC
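The one-semaphore-per-edge construction extends to any conflict graph. The stdlib-only sketch below uses hypothetical helpers, semaphores_per_task and may_run_together, that are not part of Taskflow. It numbers the edges, gives both endpoints of edge e semaphore e, and checks whether two tasks share a semaphore. With A = 0, B = 1, C = 2 and edges A-B and A-C, semaphore 0 plays the role of conflict_AB and semaphore 1 the role of conflict_AC.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// For a conflict graph over tasks 0..num_tasks-1, assign one count-1 semaphore
// per edge and return, for each task, the indices of the semaphores it must
// acquire (hypothetical helper for illustration).
std::vector<std::vector<std::size_t>> semaphores_per_task(
    std::size_t num_tasks,
    const std::vector<std::pair<std::size_t, std::size_t>>& edges) {
  std::vector<std::vector<std::size_t>> result(num_tasks);
  for (std::size_t e = 0; e < edges.size(); ++e) {
    // both endpoints of edge e acquire semaphore e
    result[edges[e].first].push_back(e);
    result[edges[e].second].push_back(e);
  }
  return result;
}

// Two tasks are allowed to overlap only if they share no semaphore,
// i.e., no conflict edge joins them.
bool may_run_together(const std::vector<std::vector<std::size_t>>& sems,
                      std::size_t a, std::size_t b) {
  for (std::size_t x : sems[a])
    for (std::size_t y : sems[b])
      if (x == y) return false;
  return true;
}
```

A task with several conflicts, such as A, ends up holding several semaphores. This is why A acquires both conflict_AB and conflict_AC in the Taskflow example. Note that may_run_together only says overlap is allowed. Whether B and C actually overlap is still up to the scheduler.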
Use a Semaphore across Different Tasks
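You can use tf::Semaphore across different tasks, including tasks in different taskflows and asynchronous tasks launched from the executor. In the following example, tasks in taskflow1, tasks in taskflow2, and async tasks all share the same semaphore s.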
#include <taskflow/taskflow.hpp>
#include <cassert>
tf::Executor executor;
tf::Taskflow taskflow1, taskflow2;
tf::Semaphore s(1);  // one semaphore shared by all tasks below
int counter(0);
size_t N = 2000;
for(size_t i=0; i<N; i++) {
  // acquire and release the semaphore from a task in taskflow1
  taskflow1.emplace([&](tf::Runtime& rt){ rt.acquire(s); counter++; rt.release(s); });
  // acquire and release the semaphore from a task in another taskflow2
  taskflow2.emplace([&](tf::Runtime& rt){ rt.acquire(s); counter++; rt.release(s); });
  // acquire and release the semaphore from an async task
  executor.async([&](tf::Runtime& rt){ rt.acquire(s); counter++; rt.release(s); });
}
executor.run(taskflow1);  // submit both taskflows alongside the async tasks
executor.run(taskflow2);
executor.wait_for_all();
assert(counter == 3*N);