Limit the Maximum Concurrency

This chapter discusses how to limit the concurrency, that is, the maximum number of workers running a given set of tasks, in your Taskflow applications.

Define a Semaphore

Taskflow provides a mechanism, tf::Semaphore, for limiting the maximum concurrency in a section of tasks. A task can acquire one or more semaphores before it executes its work and release one or more semaphores after it finishes. A task may both acquire and release a semaphore, only acquire it, or only release it. A tf::Semaphore object starts with an initial value. As long as that value is above 0, tasks can acquire the semaphore and do their work. If the value is 0 or less, a task trying to acquire the semaphore does not run and is instead placed on that semaphore's waiting list. When another task releases the semaphore, all tasks on that waiting list are rescheduled.
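To make the acquire/release rules concrete, here is a toy, single-threaded model of the bookkeeping just described. It is a sketch for intuition only and not Taskflow's implementation: the class ToySemaphore, its member names, and the use of integer task ids are all assumptions made for this illustration.

```cpp
#include <cstddef>
#include <vector>

// Toy model of the rules described above. NOT Taskflow's implementation;
// all names here are hypothetical and exist only for illustration.
class ToySemaphore {
 public:
  explicit ToySemaphore(std::size_t initial) : value_(initial) {}

  // Returns true if the task may run now (the value is decremented);
  // otherwise parks the task id on the waiting list and returns false.
  bool try_acquire_or_wait(int task_id) {
    if (value_ > 0) {
      --value_;
      return true;
    }
    waiters_.push_back(task_id);
    return false;
  }

  // Gives one unit back and hands every parked task id to the caller,
  // which would then reschedule all of them.
  std::vector<int> release() {
    ++value_;
    std::vector<int> to_reschedule;
    to_reschedule.swap(waiters_);
    return to_reschedule;
  }

  std::size_t value() const { return value_; }

 private:
  std::size_t value_;
  std::vector<int> waiters_;
};
```

With an initial value of 1, the first call to try_acquire_or_wait succeeds, later callers are parked, and a single release() returns every parked id so that a scheduler could retry them.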

tf::Executor executor(8);   // create an executor of 8 workers
tf::Taskflow taskflow;

tf::Semaphore semaphore(1); // create a semaphore with initial value of 1

std::vector<tf::Task> tasks {
  taskflow.emplace([](){ std::cout << "A" << std::endl; }),
  taskflow.emplace([](){ std::cout << "B" << std::endl; }),
  taskflow.emplace([](){ std::cout << "C" << std::endl; }),
  taskflow.emplace([](){ std::cout << "D" << std::endl; }),
  taskflow.emplace([](){ std::cout << "E" << std::endl; })
};

for(auto & task : tasks) {  // each task acquires and releases the semaphore
  task.acquire(semaphore);
  task.release(semaphore);
}

executor.run(taskflow).wait();
[Figure: task graph with five independent tasks A, B, C, D, and E]

The above example creates five tasks with no dependencies between them. Under normal circumstances, the five tasks would be executed concurrently. However, this example has a semaphore with initial value of 1, and all tasks need to acquire that semaphore before running and release that semaphore after they are done. This organization limits the number of concurrently running tasks to only one. One possible output is shown below:

# the output is a sequential chain of five tasks
A
B
E
D
C

For the same example, we can set the semaphore's initial value to something other than 1, say 3, so that at most three of the five tasks A, B, C, D, and E run at the same time.

tf::Executor executor(8);   // create an executor of 8 workers
tf::Taskflow taskflow;

tf::Semaphore semaphore(3); // create a semaphore with initial value of 3

std::vector<tf::Task> tasks {
  taskflow.emplace([](){ std::cout << "A" << std::endl; }),
  taskflow.emplace([](){ std::cout << "B" << std::endl; }),
  taskflow.emplace([](){ std::cout << "C" << std::endl; }),
  taskflow.emplace([](){ std::cout << "D" << std::endl; }),
  taskflow.emplace([](){ std::cout << "E" << std::endl; })
};

for(auto & task : tasks) {  // each task acquires and releases the semaphore
  task.acquire(semaphore);
  task.release(semaphore);
}

executor.run(taskflow).wait();
# One possible output: A, B, and C run concurrently, D and E run concurrently
ABC
ED
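The output above is only one possibility; a semaphore caps how many tasks run at once but does not force them into fixed batches. The following sketch models this with a small deterministic simulation rather than with Taskflow itself. The function simulate_start_times, the made-up task durations, and the assumption that tasks are admitted in index order are all hypothetical (in Taskflow the actual order is up to the scheduler).

```cpp
#include <cstddef>
#include <functional>
#include <queue>
#include <vector>

// Hypothetical model: tasks are admitted in index order, and a task starts as
// soon as one of the `limit` slots is free. Returns the start time of each
// task, or -1 for every task if `limit` is 0 (nothing can ever start).
std::vector<int> simulate_start_times(const std::vector<int>& durations,
                                      std::size_t limit) {
  std::vector<int> start(durations.size(), -1);
  if (limit == 0) return start;

  // min-heap holding the finish times of the tasks that currently own a slot
  std::priority_queue<int, std::vector<int>, std::greater<int>> running;

  for (std::size_t i = 0; i < durations.size(); ++i) {
    int now = 0;
    if (running.size() == limit) {  // all slots taken: wait for the earliest finisher
      now = running.top();
      running.pop();
    }
    start[i] = now;
    running.push(now + durations[i]);
  }
  return start;
}
```

For durations {3, 1, 2, 1, 1} and a limit of 3, this model starts the first three tasks at time 0, the fourth at time 1, and the fifth at time 2. With a limit of 1 the start times become 0, 3, 4, 6, 7, which is a sequential chain.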

Semaphores can limit the maximum concurrency not only within one section of tasks but also across different sections of tasks. Specifically, you can have one task acquire a semaphore and have another task release that semaphore, which constrains the concurrency of the subgraph between them. The following example serializes the execution of five pairs of tasks using a semaphore rather than explicit dependencies.

tf::Executor executor(4);  // creates an executor of 4 workers
tf::Taskflow taskflow;
tf::Semaphore semaphore(1);

using namespace std::literals;  // enables the "..."s std::string literal used below

int N = 5;
int counter = 0;  // non-atomic integer counter

for(int i=0; i<N; i++) {
  tf::Task f = taskflow.emplace([&](){ counter++; })
                       .name("from-"s + std::to_string(i));
  tf::Task t = taskflow.emplace([&](){ counter++; })
                       .name("to-"s + std::to_string(i));
  f.precede(t);
  f.acquire(semaphore);
  t.release(semaphore);
}

executor.run(taskflow).wait();

assert(counter == 2*N);
[Figure: taskflow graph with five independent pairs of tasks, from-0 -> to-0 through from-4 -> to-4]

Without semaphores, each pair of tasks, e.g., from-0 -> to-0, will run independently and concurrently. However, the program forces each from task to acquire the semaphore before running its work and not to release it until its paired to task is done. This constraint forces each pair of tasks to run sequentially, while the order of which pair runs first is up to the scheduler.

Use Semaphores Across Different Tasks

You can use semaphores to limit the concurrency across different sections of taskflow graphs. When you submit multiple taskflows to an executor, the executor views them as a bag of dependent tasks. It does not matter which task in which taskflow graph acquires or releases a semaphore.

tf::Executor executor(8);   // create an executor of 8 workers
tf::Taskflow taskflow1;
tf::Taskflow taskflow2;

tf::Semaphore semaphore(1); // create a semaphore with initial value of 1

taskflow1.emplace([](){std::cout << "task in taskflow1"; })
         .acquire(semaphore)
         .release(semaphore);

taskflow2.emplace([](){std::cout << "task in taskflow2"; })
         .acquire(semaphore)
         .release(semaphore);

executor.run(taskflow1);
executor.run(taskflow2);
executor.wait_for_all();

The above example creates one task in each taskflow and submits the two taskflows to the executor. Again, under normal circumstances, the two tasks can run concurrently, but the semaphore allows only one of them to run at a time, so the two tasks run sequentially in arbitrary order.

Define a Conflict Graph

One important application of tf::Semaphore is conflict-aware scheduling using a conflict graph. A conflict graph is an undirected graph where each vertex represents a task and each edge represents a conflict between a pair of tasks. When a task conflicts with another task, the two cannot run together. In the conflict graph below, task A conflicts with task B and with task C (and vice versa), meaning that A cannot run together with B or C, whereas B and C can run together.

[Figure: conflict graph with edges A--B (A conflicts with B) and A--C (A conflicts with C)]

We can create one semaphore with an initial value of 1 for each edge in the conflict graph and let the two tasks of that edge acquire and release that semaphore. This organization prevents the two tasks from running concurrently.

tf::Executor executor;
tf::Taskflow taskflow;

tf::Semaphore conflict_AB(1);
tf::Semaphore conflict_AC(1);

tf::Task A = taskflow.emplace([](){ std::cout << "A" << std::endl; });
tf::Task B = taskflow.emplace([](){ std::cout << "B" << std::endl; });
tf::Task C = taskflow.emplace([](){ std::cout << "C" << std::endl; });

// describe the conflict between A and B
A.acquire(conflict_AB).release(conflict_AB);
B.acquire(conflict_AB).release(conflict_AB);

// describe the conflict between A and C
A.acquire(conflict_AC).release(conflict_AC);
C.acquire(conflict_AC).release(conflict_AC);

executor.run(taskflow).wait();
# One possible output: B and C run concurrently after A
A
BC
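For larger conflict graphs, writing the acquire/release calls by hand becomes tedious. As a rough sketch, the per-edge scheme can be derived from an edge list: the helper below (a hypothetical name, using only the standard library) computes, for each task index, which conflict edges it belongs to. Mapping tasks to integer indices and edge i to the i-th semaphore is an assumption of this illustration, not something Taskflow prescribes.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper: for each task index, list the indices of the conflict
// edges that touch it. Edge i would correspond to the i-th semaphore, each
// created with an initial value of 1.
// Throws std::out_of_range (via at()) if an edge names a task >= num_tasks.
std::vector<std::vector<std::size_t>> semaphores_per_task(
    std::size_t num_tasks,
    const std::vector<std::pair<std::size_t, std::size_t>>& conflicts) {
  std::vector<std::vector<std::size_t>> result(num_tasks);
  for (std::size_t e = 0; e < conflicts.size(); ++e) {
    result.at(conflicts[e].first).push_back(e);
    result.at(conflicts[e].second).push_back(e);
  }
  return result;
}
```

With A, B, and C numbered 0, 1, and 2 and the edges {0, 1} and {0, 2}, task A is assigned both edges while B and C get one each, which matches the acquire/release calls written out in the example above.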

Reset a Semaphore

You can reset a semaphore to its initial state using tf::Semaphore::reset(), or set a new maximum value with tf::Semaphore::reset(size_t new_max_value). The method tf::Semaphore::value() allows you to query the current value of the semaphore, which represents the number of available acquisitions.

tf::Semaphore semaphore(4);
assert(semaphore.value() == 4 && semaphore.max_value() == 4);

// reset the semaphore to a new value
semaphore.reset(11);
assert(semaphore.value() == 11 && semaphore.max_value() == 11);

Understand the Limitation of Semaphores

Currently, tf::Semaphore has limited support for exception handling and taskflow cancellation. If a task throws an exception or the taskflow is canceled, subsequent acquire and release operations on the semaphore may result in undefined behavior. To ensure correct behavior, you should call tf::Semaphore::reset before reusing the semaphore in the next run. For instance, in the code below, when task B throws an exception, the executor will cancel the execution of the taskflow. That is, tasks C and D will not run, and thus no task will release the acquired semaphore. To resolve this situation, we must reset the semaphore to a clean state for the next run.

tf::Executor executor;
tf::Taskflow taskflow;
tf::Semaphore semaphore(1);

tf::Task A = taskflow.emplace([](){});
tf::Task B = taskflow.emplace([](){ throw std::runtime_error("exception"); });
tf::Task C = taskflow.emplace([](){});
tf::Task D = taskflow.emplace([](){});
A.precede(B);
B.precede(C);
C.precede(D);

A.acquire(semaphore);
D.release(semaphore);

// current semaphore has a value of 1
assert(semaphore.value() == 1);

// when B throws the exception, D will not run and thus semaphore is not released
try {
  executor.run(taskflow).get();
}
catch(std::runtime_error& e) {
  std::cout << e.what() << std::endl;
}

// since A acquired the semaphore, its value is 0
assert(semaphore.value() == 0);

// reset the semaphore to a clean state before running the taskflow again
semaphore.reset();
assert(semaphore.value() == 1);

executor.run(taskflow).get();