We study a taskflow processing pipeline that propagates a sequence of tokens through linearly dependent taskflows. The pipeline embeds a taskflow in each pipe to run a parallel algorithm using task graph parallelism.
Formulate the Taskflow Processing Pipeline Problem
Many complex and irregular pipeline applications require each pipe to run a parallel algorithm using task graph parallelism. We can formulate such applications as scheduling a sequence of tokens through linearly dependent taskflows. The following example illustrates the pipeline propagation of three scheduling tokens through three linearly dependent taskflows:
Each pipe (stage) in the pipeline embeds a taskflow that performs a stage-specific parallel algorithm on an input scheduling token. Parallelism exists both inside and across the three taskflows, combining task graph parallelism with pipeline parallelism.
Create a Taskflow Processing Pipeline
Using the example from the previous section, we create a pipeline of three serial pipes, each running a taskflow, over a sequence of five scheduling tokens, with at most two tokens in flight at a time. The overall implementation is shown below:
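#include <array>
#include <cstdio>
#include <iostream>
#include <vector>
#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/pipeline.hpp>

// taskflow for the first pipe: A1 runs before B1 and C1, which both run before D1
void make_taskflow1(tf::Taskflow& tf) {
  auto [A1, B1, C1, D1] = tf.emplace(
    [](){ printf("A1\n"); },
    [](){ printf("B1\n"); },
    [](){ printf("C1\n"); },
    [](){ printf("D1\n"); }
  );
  A1.precede(B1, C1);
  D1.succeed(B1, C1);
}

// taskflow for the second pipe: A2 -> B2 -> C2 -> D2
void make_taskflow2(tf::Taskflow& tf) {
  auto [A2, B2, C2, D2] = tf.emplace(
    [](){ printf("A2\n"); },
    [](){ printf("B2\n"); },
    [](){ printf("C2\n"); },
    [](){ printf("D2\n"); }
  );
  std::vector<tf::Task> chain{A2, B2, C2, D2};
  tf.linearize(chain);
}

// taskflow for the third pipe: A3 runs before B3, C3, and D3
void make_taskflow3(tf::Taskflow& tf) {
  auto [A3, B3, C3, D3] = tf.emplace(
    [](){ printf("A3\n"); },
    [](){ printf("B3\n"); },
    [](){ printf("C3\n"); },
    [](){ printf("D3\n"); }
  );
  A3.precede(B3, C3, D3);
}

int main() {
  tf::Taskflow taskflow("taskflow processing pipeline");
  tf::Executor executor;

  const size_t num_lines = 2;  // at most two scheduling tokens in flight
  const size_t num_pipes = 3;  // three serial pipes

  // one taskflow per pipe (sufficient because every pipe is serial)
  std::array<tf::Taskflow, num_pipes> taskflows;
  make_taskflow1(taskflows[0]);
  make_taskflow2(taskflows[1]);
  make_taskflow3(taskflows[2]);

  tf::Pipeline pl(num_lines,
    // first pipe: stop after five tokens, otherwise run taskflow1
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      if(pf.token() == 5) {
        pf.stop();
        return;
      }
      printf("begin token %zu\n", pf.token());
      executor.corun(taskflows[pf.pipe()]);
    }},
    // second pipe: run taskflow2
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      executor.corun(taskflows[pf.pipe()]);
    }},
    // third pipe: run taskflow3
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      executor.corun(taskflows[pf.pipe()]);
    }}
  );

  // compose the pipeline into the top-level taskflow as a module task
  tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
                          .name("starting pipeline");
  tf::Task task = taskflow.composed_of(pl)
                          .name("pipeline");
  tf::Task stop = taskflow.emplace([](){ std::cout << "stopped\n"; })
                          .name("pipeline stopped");

  // init -> pipeline -> stop
  init.precede(task);
  task.precede(stop);

  // dump the graph structure (including the composed pipeline)
  taskflow.dump(std::cout);

  // run the taskflow once and wait for it to finish
  executor.run(taskflow).wait();

  return 0;
}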
Define Taskflows
First, we define three taskflows for the three pipes in the pipeline:
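// taskflow for the first pipe: A1 runs before B1 and C1, which both run before D1
void make_taskflow1(tf::Taskflow& tf) {
  auto [A1, B1, C1, D1] = tf.emplace(
    [](){ printf("A1\n"); },
    [](){ printf("B1\n"); },
    [](){ printf("C1\n"); },
    [](){ printf("D1\n"); }
  );
  A1.precede(B1, C1);
  D1.succeed(B1, C1);
}

// taskflow for the second pipe: A2 -> B2 -> C2 -> D2
void make_taskflow2(tf::Taskflow& tf) {
  auto [A2, B2, C2, D2] = tf.emplace(
    [](){ printf("A2\n"); },
    [](){ printf("B2\n"); },
    [](){ printf("C2\n"); },
    [](){ printf("D2\n"); }
  );
  std::vector<tf::Task> chain{A2, B2, C2, D2};
  tf.linearize(chain);
}

// taskflow for the third pipe: A3 runs before B3, C3, and D3
void make_taskflow3(tf::Taskflow& tf) {
  auto [A3, B3, C3, D3] = tf.emplace(
    [](){ printf("A3\n"); },
    [](){ printf("B3\n"); },
    [](){ printf("C3\n"); },
    [](){ printf("D3\n"); }
  );
  A3.precede(B3, C3, D3);
}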
As each taskflow corresponds to a pipe in the pipeline, we create a linear array to store the three taskflows:
std::array<tf::Taskflow, num_pipes> taskflows;
make_taskflow1(taskflows[0]);
make_taskflow2(taskflows[1]);
make_taskflow3(taskflows[2]);
Since the three taskflows are linearly dependent and every pipe is serial, at most one scheduling token is processed at a pipe at any time, so no taskflow is ever run by two tokens at once. We can therefore store the three taskflows in a linear array whose dimension equals the number of pipes. If a pipe is parallel, we need a two-dimensional array instead, because multiple taskflows at that stage can run simultaneously across parallel lines.
Define the Pipes
The pipe definition is straightforward. Each pipe runs its corresponding taskflow, which is indexed in taskflows by the pipe's identifier, tf::Pipeflow::pipe(). The first pipe stops the pipeline scheduling once it has processed five scheduling tokens (tokens 0 through 4):
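// first pipe: stop after five tokens, otherwise run taskflow1
tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
  if(pf.token() == 5) {
    pf.stop();
    return;
  }
  printf("begin token %zu\n", pf.token());
  executor.corun(taskflows[pf.pipe()]);
}},

// second pipe: run taskflow2
tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
  executor.corun(taskflows[pf.pipe()]);
}},

// third pipe: run taskflow3
tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
  executor.corun(taskflows[pf.pipe()]);
}}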
At each pipe, we use tf::Executor::corun to execute the corresponding taskflow and wait until it completes. This matters because the caller thread, which is the worker invoking the pipe callable, should not block, as it would with executor.run(taskflows[pf.pipe()]).wait(). Instead, corun lets it participate in the work-stealing loop of the scheduler, which avoids deadlock.
Define the Task Graph
To build the top-level taskflow, we create a module task from the pipeline with composed_of and connect it to two tasks that print helper messages before and after the pipeline:
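tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
                        .name("starting pipeline");
tf::Task task = taskflow.composed_of(pl)
                        .name("pipeline");
tf::Task stop = taskflow.emplace([](){ std::cout << "stopped\n"; })
                        .name("pipeline stopped");

// init -> pipeline -> stop
init.precede(task);
task.precede(stop);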
Submit the Task Graph
Finally, we submit the taskflow to the executor and run it once:
executor.run(taskflow).wait();
One possible output is shown below:
ready
begin token 0
A1
C1
B1
D1
begin token 1
A2
B2
A1
C1
B1
D1
C2
D2
A3
D3
C3
B3
begin token 2
A2
B2
C2
D2
A1
C1
B1
D1
A3
D3
C3
B3
A2
B2
C2
D2
begin token 3
A3
D3
C3
B3
A1
C1
B1
D1
begin token 4
A2
A1
C1
B1
D1
B2
C2
D2
A3
D3
C3
B3
A2
B2
C2
D2
A3
D3
C3
B3
stopped