Async Producer-Consumer Pipeline

We implement a producer-consumer pipeline using dependent-async tasks, demonstrating how tf::Executor::dependent_async naturally expresses stage-level dependencies between data items and how production and consumption overlap in time without any manual synchronization.

Problem Formulation

A producer generates N data items one by one. Each item must pass through a validator before it can be consumed. Consumption can begin as soon as an item is validated. There is no need to wait for the entire input to be produced first. The three-stage pipeline per item is:

  1. Produce: generate the data item
  2. Validate: verify the item is correct
  3. Consume: process the validated item

Items are independent of each other across all three stages, so stages of different items can overlap in time. This is exactly the kind of dynamic, data-driven structure that tf::Executor::dependent_async is designed for.

The following diagram illustrates the overlapping execution of four items through the three stages. Each item's stages are wired by dependency edges so that Produce must finish before Validate, and Validate before Consume. Because items are independent of one another, the executor schedules stages of different items in parallel whenever workers are available:

Implementation

We create three dependent-async tasks per item. Each task depends on the previous stage of the same item. Because tf::Executor::dependent_async begins executing a task as soon as all its predecessors complete, item i+1 can be produced while item i is being validated and item i-1 is being consumed:

#include <taskflow/taskflow.hpp>
#include <cstdio>

// simulate data production, validation, and consumption
int  produce (size_t i) { return static_cast<int>(i * i); }
bool validate(int value) { return value >= 0; }
void consume (size_t i, int val) { printf("consumed item %zu: %d\n", i, val); }

int main() {

  tf::Executor executor;

  const size_t N = 8;
  std::vector<int> data(N);
  std::vector<bool> valid(N, false);

  for(size_t i = 0; i < N; i++) {
    // stage 1: produce item i (no predecessor, runs immediately)
    tf::AsyncTask prod = executor.silent_dependent_async([i, &data]() {
      data[i] = produce(i);
      printf("produced item %zu: %d\n", i, data[i]);
    });

    // stage 2: validate item i, depends on production completing
    tf::AsyncTask val = executor.silent_dependent_async(
      [i, &data, &valid]() {
        valid[i] = validate(data[i]);
        printf("validated item %zu: %s\n", i, valid[i] ? "ok" : "fail");
      },
      prod  // predecessor: stage 1 of the same item
    );

    // stage 3: consume item i, depends on validation completing
    executor.silent_dependent_async(
      [i, &data, &valid]() {
        if(valid[i]) {
          consume(i, data[i]);
        }
      },
      val  // predecessor: stage 2 of the same item
    );
  }

  // wait for all submitted tasks to finish
  executor.wait_for_all();

  return 0;
}

Because each stage only depends on the previous stage of the same item rather than on the previous item's stages, the executor's work-stealing scheduler can run all three stages of different items concurrently across available workers. No mutex, condition variable, or queue is needed; the dependency edges express all synchronization requirements.

Conditional Consumption with Cooperative Execution

In some pipelines, the main thread needs to inspect intermediate results before deciding what to submit next. tf::AsyncTask::is_done provides a non-blocking way to check whether a specific task has completed. Combined with tf::Executor::corun_until, the calling thread remains active in the work-stealing loop while polling and never blocks.

The example below produces and validates one item, polls completion, and conditionally submits a downstream task based on the validation result:

tf::Executor executor;

int item = 0;  // caches the produced value (a std::future can only be retrieved once)

// stage 1: produce
auto [prod, fu_prod] = executor.dependent_async([&]() -> int {
  return produce(0);
});

// stage 2: validate, depends on stage 1
auto [val, fu_val] = executor.dependent_async([&]() -> bool {
  item = fu_prod.get();
  return validate(item);
}, prod);

// keep the calling thread's worker active while both tasks run
executor.corun_until([&]() {
  return prod.is_done() && val.is_done();
});

// safe to read results: both tasks have completed
if(fu_val.get()) {
  // only submit the consume task if validation passed
  executor.silent_dependent_async([&]() {
    consume(0, item);
  }, val);
}

executor.wait_for_all();
Note
tf::AsyncTask::is_done is designed to be used with tf::Executor::corun_until. Calling is_done in a tight spin-wait without corun_until risks starving the worker thread pool if the calling thread is itself one of the executor's workers. See Asynchronous Tasking with Dependencies for a full discussion of the dependent-async API.