#include <taskflow/algorithm/pipeline.hpp>
template<typename... Ps>
Pipeline class
class to create a pipeline scheduling framework
Template parameters | |
---|---|
Ps | pipe types |
A pipeline is a composable graph object that lets users build a pipeline scheduling framework using a module task in a taskflow. Unlike conventional pipeline programming frameworks (e.g., Intel TBB), Taskflow's pipeline algorithm does not provide any data abstraction, since such abstractions often restrict users from optimizing data layouts in their applications. Instead, it provides a flexible framework for users to customize their application data atop the pipeline scheduling. The following code creates a pipeline of four parallel lines to schedule tokens through three serial pipes:
    tf::Taskflow taskflow;
    tf::Executor executor;

    const size_t num_lines = 4;
    const size_t num_pipes = 3;

    // create a custom data buffer
    std::array<std::array<int, num_pipes>, num_lines> buffer;

    // create a pipeline graph of four concurrent lines and three serial pipes
    tf::Pipeline pipeline(num_lines,
      // first pipe must define a serial direction
      tf::Pipe{tf::PipeType::SERIAL, [&buffer](tf::Pipeflow& pf) {
        // generate only 5 scheduling tokens
        if(pf.token() == 5) {
          pf.stop();
        }
        // save the token id into the buffer
        else {
          buffer[pf.line()][pf.pipe()] = pf.token();
        }
      }},
      tf::Pipe{tf::PipeType::SERIAL, [&buffer](tf::Pipeflow& pf) {
        // propagate the previous result to this pipe by adding one
        buffer[pf.line()][pf.pipe()] = buffer[pf.line()][pf.pipe()-1] + 1;
      }},
      tf::Pipe{tf::PipeType::SERIAL, [&buffer](tf::Pipeflow& pf) {
        // propagate the previous result to this pipe by adding one
        buffer[pf.line()][pf.pipe()] = buffer[pf.line()][pf.pipe()-1] + 1;
      }}
    );

    // build the pipeline graph using composition
    tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
                            .name("starting pipeline");
    tf::Task task = taskflow.composed_of(pipeline)
                            .name("pipeline");
    tf::Task stop = taskflow.emplace([](){ std::cout << "stopped\n"; })
                            .name("pipeline stopped");

    // create task dependency
    init.precede(task);
    task.precede(stop);

    // run the pipeline
    executor.run(taskflow).wait();
The above example creates a pipeline graph that schedules five tokens over four parallel lines in a circular fashion, as depicted below:
    o -> o -> o
    |    |    |
    v    v    v
    o -> o -> o
    |    |    |
    v    v    v
    o -> o -> o
    |    |    |
    v    v    v
    o -> o -> o
At each pipe stage, the program propagates the result to the next pipe by adding one to the result stored in a custom data storage, buffer. The pipeline scheduler will generate five scheduling tokens and then stop.
Internally, tf::
Constructors, destructors, conversion operators
- Pipeline(size_t num_lines, Ps && ... ps)
- constructs a pipeline object
- Pipeline(size_t num_lines, std::tuple<Ps...>&& ps)
- constructs a pipeline object
Public functions
- auto num_lines() const -> size_t noexcept
- queries the number of parallel lines
- auto num_pipes() const -> size_t constexpr noexcept
- queries the number of pipes
- void reset()
- resets the pipeline
- auto num_tokens() const -> size_t noexcept
- queries the number of generated tokens in the pipeline
- auto graph() -> Graph&
- obtains the graph object associated with the pipeline construct
Function documentation
template<typename... Ps>
tf::Pipeline<Ps>::Pipeline(size_t num_lines, Ps&&... ps)
constructs a pipeline object
Parameters | |
---|---|
num_lines | the number of parallel lines |
ps | a list of pipes |
Constructs a pipeline of up to num_lines parallel lines to schedule tokens through the given linear chain of pipes. The first pipe must define a serial direction (tf::
template<typename... Ps>
tf::Pipeline<Ps>::Pipeline(size_t num_lines, std::tuple<Ps...>&& ps)
constructs a pipeline object
Parameters | |
---|---|
num_lines | the number of parallel lines |
ps | a tuple of pipes |
Constructs a pipeline of up to num_lines parallel lines to schedule tokens through the given linear chain of pipes. The first pipe must define a serial direction (tf::
template<typename... Ps>
size_t tf::Pipeline<Ps>::num_lines() const noexcept
queries the number of parallel lines
The function returns the number of parallel lines given by the user upon the construction of the pipeline. The number of lines represents the maximum parallelism this pipeline can achieve.
template<typename... Ps>
size_t tf::Pipeline<Ps>::num_pipes() const constexpr noexcept
queries the number of pipes
The function returns the number of pipes given by the user upon the construction of the pipeline.
template<typename... Ps>
void tf::Pipeline<Ps>::reset()
resets the pipeline
Resets the pipeline to its initial state. After a reset, token identifiers start from zero again, as if the pipeline had just been constructed.
template<typename... Ps>
size_t tf::Pipeline<Ps>::num_tokens() const noexcept
queries the number of generated tokens in the pipeline
The number represents the total number of scheduling tokens that have been generated by the pipeline so far.