Taskflow Processing Pipeline

We study a taskflow processing pipeline where each stage runs an entire task graph rather than a single function. This example demonstrates how Taskflow combines task graph parallelism inside each stage with pipeline parallelism across stages, achieving two levels of parallelism simultaneously.

Problem Formulation

Many real-world pipelines require each stage to run a parallel algorithm, not a single function. We model this as a sequence of tokens flowing through three serial pipes, where each pipe embeds a full taskflow graph.

Within each pipe, tasks inside the embedded taskflow run in parallel. Across pipes, the pipeline scheduler overlaps execution of different tokens in different stages. This produces two-level parallelism: intra-stage (task graph) and inter-stage (pipeline).
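The idea can be distilled into a minimal sketch with a single serial pipe that coruns one embedded taskflow per token (the names `inner` and `outer`, and the token count of 3, are illustrative, not part of the full example that follows):

```cpp
#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/pipeline.hpp>
#include <cstdio>

int main() {
  tf::Executor executor;

  // the task graph embedded in the single stage
  tf::Taskflow inner;
  inner.emplace([](){ printf("inner task\n"); });

  // one line, one serial pipe; stop after three tokens
  tf::Pipeline pl(1,
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      if(pf.token() == 3) {
        pf.stop();
        return;
      }
      // run the embedded graph without blocking the calling worker
      executor.corun(inner);
    }}
  );

  tf::Taskflow outer;
  outer.composed_of(pl);
  executor.run(outer).wait();
}
```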

Implementation

We define three taskflows — one per stage — each with a different internal structure to illustrate the flexibility of this model. We then create a pipeline of three serial pipes, each running its corresponding taskflow via tf::Executor::corun:

#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/pipeline.hpp>

#include <array>
#include <cstdio>
#include <iostream>

// stage 1: diamond taskflow (A1 → {B1, C1} → D1)
void make_taskflow1(tf::Taskflow& tf) {
  auto [A1, B1, C1, D1] = tf.emplace(
    [](){ printf("A1\n"); },
    [](){ printf("B1\n"); },
    [](){ printf("C1\n"); },
    [](){ printf("D1\n"); }
  );
  A1.precede(B1, C1);
  D1.succeed(B1, C1);
}

// stage 2: linear chain taskflow (A2 → B2 → C2 → D2)
void make_taskflow2(tf::Taskflow& tf) {
  auto [A2, B2, C2, D2] = tf.emplace(
    [](){ printf("A2\n"); },
    [](){ printf("B2\n"); },
    [](){ printf("C2\n"); },
    [](){ printf("D2\n"); }
  );
  tf.linearize({A2, B2, C2, D2});
}

// stage 3: broadcast taskflow (A3 → {B3, C3, D3})
void make_taskflow3(tf::Taskflow& tf) {
  auto [A3, B3, C3, D3] = tf.emplace(
    [](){ printf("A3\n"); },
    [](){ printf("B3\n"); },
    [](){ printf("C3\n"); },
    [](){ printf("D3\n"); }
  );
  A3.precede(B3, C3, D3);
}

int main() {

  tf::Taskflow taskflow("taskflow pipeline");
  tf::Executor executor;

  const size_t num_lines = 2;
  const size_t num_pipes = 3;

  // one taskflow per pipe (serial pipes → at most one token per pipe at a time)
  std::array<tf::Taskflow, num_pipes> taskflows;
  make_taskflow1(taskflows[0]);
  make_taskflow2(taskflows[1]);
  make_taskflow3(taskflows[2]);

  tf::Pipeline pl(num_lines,
    // stage 1: run taskflow1 for up to 5 tokens
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      if(pf.token() == 5) {
        pf.stop();
        return;
      }
      printf("token %zu enters stage 1\n", pf.token());
      executor.corun(taskflows[pf.pipe()]);
    }},
    // stage 2: run taskflow2
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      executor.corun(taskflows[pf.pipe()]);
    }},
    // stage 3: run taskflow3
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      executor.corun(taskflows[pf.pipe()]);
    }}
  );

  tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
                          .name("start");
  tf::Task pipe = taskflow.composed_of(pl)
                          .name("pipeline");
  tf::Task done = taskflow.emplace([](){ std::cout << "done\n"; })
                          .name("stop");

  init.precede(pipe);
  pipe.precede(done);

  executor.run(taskflow).wait();

  return 0;
}

Why corun Instead of run

Each pipe callable is itself executed by a worker thread. If we called executor.run(taskflows[...]).wait() inside the pipe, that worker would block waiting for the inner taskflow — preventing it from helping with other available tasks and potentially causing deadlock if all workers are blocked.

tf::Executor::corun avoids this: the calling worker stays active in the work-stealing loop while the inner taskflow executes, ensuring forward progress and preventing deadlock:

// correct: calling worker participates in executing the inner taskflow
executor.corun(taskflows[pf.pipe()]);
// wrong: calling worker blocks, may deadlock if all workers are waiting
executor.run(taskflows[pf.pipe()]).wait();

Taskflow Storage

Since all three pipes are serial, at most one token occupies each stage at any time. A one-dimensional array of taskflows — one per stage — is therefore sufficient:

std::array<tf::Taskflow, num_pipes> taskflows;

If any pipe were declared parallel, multiple tokens could be at the same stage simultaneously across different lines, requiring a two-dimensional storage of size (num_lines × num_pipes).
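A sketch of that parallel-pipe variant, reusing the `num_lines` and `num_pipes` constants from the example and using `tf::Pipeflow::line()` to select the token's line:

```cpp
// one taskflow per (line, pipe) pair, since parallel pipes allow tokens
// on different lines to occupy the same stage simultaneously
std::array<std::array<tf::Taskflow, num_pipes>, num_lines> taskflows;

// inside a pipe callable, index by both the line and the pipe:
// executor.corun(taskflows[pf.line()][pf.pipe()]);
```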

Task Graph

The outer task graph, including pipeline composition, is shown below: