We study a taskflow processing pipeline where each stage runs an entire task graph rather than a single function. This example demonstrates how Taskflow combines task graph parallelism inside each stage with pipeline parallelism across stages, achieving two levels of parallelism simultaneously.
Problem Formulation
Many real-world pipelines require each stage to run a parallel algorithm, not a single function. We model this as a sequence of tokens flowing through three serial pipes, where each pipe embeds a full taskflow graph.
Within each pipe, tasks inside the embedded taskflow run in parallel. Across pipes, the pipeline scheduler overlaps execution of different tokens in different stages. This produces two-level parallelism: intra-stage (task graph) and inter-stage (pipeline).
Implementation
We define three taskflows — one per stage — each with a different internal structure to illustrate the flexibility of this model. We then create a pipeline of three serial pipes, each running its corresponding taskflow via tf::Executor::corun:
#include <array>     // std::array
#include <cstdio>    // printf
#include <iostream>  // std::cout
#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/pipeline.hpp>
// the taskflow for the first pipe: a diamond dependency graph
void make_taskflow1(tf::Taskflow& tf) {
  auto [A1, B1, C1, D1] = tf.emplace(
    [](){ printf("A1\n"); },
    [](){ printf("B1\n"); },
    [](){ printf("C1\n"); },
    [](){ printf("D1\n"); }
  );
  A1.precede(B1, C1);  // A1 runs before B1 and C1
  D1.succeed(B1, C1);  // D1 runs after B1 and C1
}
// the taskflow for the second pipe: a linear chain
void make_taskflow2(tf::Taskflow& tf) {
  auto [A2, B2, C2, D2] = tf.emplace(
    [](){ printf("A2\n"); },
    [](){ printf("B2\n"); },
    [](){ printf("C2\n"); },
    [](){ printf("D2\n"); }
  );
  tf.linearize({A2, B2, C2, D2});  // A2 -> B2 -> C2 -> D2
}
// the taskflow for the third pipe: four independent, fully parallel tasks
void make_taskflow3(tf::Taskflow& tf) {
  auto [A3, B3, C3, D3] = tf.emplace(
    [](){ printf("A3\n"); },
    [](){ printf("B3\n"); },
    [](){ printf("C3\n"); },
    [](){ printf("D3\n"); }
  );
}
int main() {
  tf::Taskflow taskflow("taskflow pipeline");
  tf::Executor executor;

  const size_t num_lines = 2;
  const size_t num_pipes = 3;

  // one taskflow per pipe
  std::array<tf::Taskflow, num_pipes> taskflows;
  make_taskflow1(taskflows[0]);
  make_taskflow2(taskflows[1]);
  make_taskflow3(taskflows[2]);
  // create a pipeline of two lines and three serial pipes
  tf::Pipeline pl(num_lines,
    // first pipe: generates tokens and coruns the first taskflow
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      if(pf.token() == 5) {
        pf.stop();
        return;
      }
      printf("token %zu enters stage 1\n", pf.token());
      executor.corun(taskflows[pf.pipe()]);
    }},
    // second pipe: coruns the second taskflow
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      executor.corun(taskflows[pf.pipe()]);
    }},
    // third pipe: coruns the third taskflow
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      executor.corun(taskflows[pf.pipe()]);
    }}
  );
  tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; }).name("start");
  tf::Task pipe = taskflow.composed_of(pl).name("pipeline");
  tf::Task done = taskflow.emplace([](){ std::cout << "done\n"; }).name("stop");
  init.precede(pipe);
  pipe.precede(done);

  executor.run(taskflow).wait();
  return 0;
}
Why corun Instead of run
Each pipe callable is itself executed by a worker thread. If we called executor.run(taskflows[...]).wait() inside the pipe, that worker would block waiting for the inner taskflow — preventing it from helping with other available tasks and potentially causing deadlock if all workers are blocked.
tf::Executor::corun avoids this: the calling worker stays active in the work-stealing loop while the inner taskflow executes, ensuring forward progress and preventing deadlock:
// inside a pipe callable: corun keeps the worker active
executor.corun(taskflows[pf.pipe()]);

// blocking alternative: may deadlock if all workers are waiting
executor.run(taskflows[pf.pipe()]).wait();
Taskflow Storage
Since all three pipes are serial, at most one token occupies each stage at any time. A one-dimensional array of taskflows — one per stage — is therefore sufficient:
std::array<tf::Taskflow, num_pipes> taskflows;
If any pipe were declared parallel, multiple tokens could be at the same stage simultaneously across different lines, requiring a two-dimensional storage of size (num_lines × num_pipes).
Task Graph
The outer task graph, including pipeline composition, is shown below: