#include <taskflow/algorithm/pipeline.hpp>
template<typename P>
ScalablePipeline class
class to create a scalable pipeline object
Template parameters | |
---|---|
P | type of the iterator to a range of pipes |
A scalable pipeline is a composable graph object for users to create a pipeline scheduling framework using a module task in a taskflow. Unlike tf::
```cpp
tf::Taskflow taskflow("pipeline");
tf::Executor executor;

const size_t num_lines = 4;

// create data storage
std::array<int, num_lines> buffer;

// define the pipe callable
auto pipe_callable = [&buffer] (tf::Pipeflow& pf) mutable {
  switch(pf.pipe()) {
    // first stage generates only 5 scheduling tokens and saves the
    // token number into the buffer.
    case 0: {
      if(pf.token() == 5) {
        pf.stop();
      }
      else {
        printf("stage 1: input token = %zu\n", pf.token());
        buffer[pf.line()] = pf.token();
      }
      return;
    }
    break;

    // other stages propagate the previous result to this pipe and
    // increment it by one
    default: {
      printf(
        "stage %zu: input buffer[%zu] = %d\n", pf.pipe(), pf.line(), buffer[pf.line()]
      );
      buffer[pf.line()] = buffer[pf.line()] + 1;
    }
    break;
  }
};

// create a vector of three pipes
std::vector< tf::Pipe<std::function<void(tf::Pipeflow&)>> > pipes;

for(size_t i=0; i<3; i++) {
  pipes.emplace_back(tf::PipeType::SERIAL, pipe_callable);
}

// create a pipeline of four parallel lines based on the given vector of pipes
tf::ScalablePipeline pl(num_lines, pipes.begin(), pipes.end());

// build the pipeline graph using composition
tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
                        .name("starting pipeline");
tf::Task task = taskflow.composed_of(pl)
                        .name("pipeline");
tf::Task stop = taskflow.emplace([](){ std::cout << "stopped\n"; })
                        .name("pipeline stopped");

// create task dependency
init.precede(task);
task.precede(stop);

// dump the pipeline graph structure (with composition)
taskflow.dump(std::cout);

// run the pipeline
executor.run(taskflow).wait();

// reset the pipeline to a new range of five pipes and starts from
// the initial state (i.e., token counts from zero)
for(size_t i=0; i<2; i++) {
  pipes.emplace_back(tf::PipeType::SERIAL, pipe_callable);
}
pl.reset(pipes.begin(), pipes.end());

executor.run(taskflow).wait();
```
The above example creates a pipeline graph that schedules five tokens over four parallel lines in a circular fashion, first going through three serial pipes and then five serial pipes:
```
# initial construction of three serial pipes
o -> o -> o
|    |    |
v    v    v
o -> o -> o
|    |    |
v    v    v
o -> o -> o
|    |    |
v    v    v
o -> o -> o

# resetting to a new range of five serial pipes
o -> o -> o -> o -> o
|    |    |    |    |
v    v    v    v    v
o -> o -> o -> o -> o
|    |    |    |    |
v    v    v    v    v
o -> o -> o -> o -> o
|    |    |    |    |
v    v    v    v    v
o -> o -> o -> o -> o
```
Each pipe has the same type of tf::Pipe<std::function<void(tf::Pipeflow&)>> and is kept in a vector that is amenable to change. We construct the scalable pipeline using two range iterators pointing to the beginning and the end of the vector. At each pipe stage, the program propagates the result to the next pipe by adding one to the result stored in a custom data storage, buffer. The pipeline scheduler will generate five scheduling tokens and then stop.
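The scheduling described above can be traced by hand. The following is a minimal sketch in plain C++ (it does not call Taskflow) that models the example under two assumptions drawn from the description: token `t` is carried by line `t % num_lines`, and the first pipe stores the token number while every later pipe adds one. The names `TokenTrace` and `trace_tokens` are hypothetical and exist only for this illustration.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical record of one token's journey through the modeled pipeline.
struct TokenTrace {
  std::size_t token;  // token identifier, starting from zero
  std::size_t line;   // parallel line assumed to carry the token
  int result;         // buffer value after the last pipe
};

// Models the example: tokens are assigned to lines in a circular fashion,
// the first pipe stores the token number, and every later pipe adds one.
std::vector<TokenTrace> trace_tokens(std::size_t num_lines,
                                     std::size_t num_pipes,
                                     std::size_t num_tokens) {
  std::vector<TokenTrace> traces;
  if (num_lines == 0 || num_pipes == 0) {
    return traces;  // nothing to schedule
  }
  for (std::size_t t = 0; t < num_tokens; ++t) {
    int value = static_cast<int>(t);               // first pipe stores the token
    for (std::size_t p = 1; p < num_pipes; ++p) {  // each later pipe adds one
      value += 1;
    }
    traces.push_back({t, t % num_lines, value});
  }
  return traces;
}
```

Under these assumptions, `trace_tokens(4, 3, 5)` places token 4 back on line 0 with a final value of 6, and `trace_tokens(4, 5, 5)` gives 8 for the same token after the reset to five pipes.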
A scalable pipeline is move-only.
Public types
- using pipe_t = typename std::iterator_traits<P>::value_type
- pipe type
Constructors, destructors, conversion operators
- ScalablePipeline() defaulted
- default constructor
- ScalablePipeline(size_t num_lines)
- constructs an empty scalable pipeline object
- ScalablePipeline(size_t num_lines, P first, P last)
- constructs a scalable pipeline object
- ScalablePipeline(const ScalablePipeline&) deleted
- disabled copy constructor
- ScalablePipeline(ScalablePipeline&& rhs)
- move constructor
Public functions
- auto operator=(const ScalablePipeline&) -> ScalablePipeline& deleted
- disabled copy assignment operator
- auto operator=(ScalablePipeline&& rhs) -> ScalablePipeline&
- move assignment operator
- auto num_lines() const -> size_t noexcept
- queries the number of parallel lines
- auto num_pipes() const -> size_t noexcept
- queries the number of pipes
- void reset()
- resets the pipeline
- void reset(P first, P last)
- resets the pipeline with a new range of pipes
- void reset(size_t num_lines, P first, P last)
- resets the pipeline to a new line number and a new range of pipes
- auto num_tokens() const -> size_t noexcept
- queries the number of generated tokens in the pipeline
- auto graph() -> Graph&
- obtains the graph object associated with the pipeline construct
Function documentation
template<typename P>
tf::ScalablePipeline<P>::ScalablePipeline(size_t num_lines)
constructs an empty scalable pipeline object
Parameters | |
---|---|
num_lines | the number of parallel lines |
An empty scalable pipeline does not have any pipes. The pipeline needs to be reset to a valid range of pipes before running.
template<typename P>
tf::ScalablePipeline<P>::ScalablePipeline(size_t num_lines, P first, P last)
constructs a scalable pipeline object
Parameters | |
---|---|
num_lines | the number of parallel lines |
first | iterator to the beginning of the range |
last | iterator to the end of the range |
Constructs a pipeline from the given range of pipes specified in [first, last) using num_lines parallel lines. The first pipe must define a serial direction (tf::
Internally, the scalable pipeline copies the iterators from the specified range. Those pipe callables pointed to by these iterators must remain valid during the execution of the pipeline.
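Because the pipeline keeps iterators rather than copies of the pipes, growing the underlying container can leave it holding stale iterators. The helper below is a plain C++ sketch, not part of Taskflow, that reports whether appending to a `std::vector` would trigger a reallocation and thereby invalidate iterators to its existing elements. The name `append_invalidates_iterators` is hypothetical.

```cpp
#include <cstddef>
#include <vector>

// Returns true when appending `extra` elements would force the vector to
// reallocate, which invalidates every iterator to its existing elements.
template <typename T>
bool append_invalidates_iterators(const std::vector<T>& v, std::size_t extra) {
  return extra > v.capacity() - v.size();
}
```

When the check returns true, one option is to call `reserve` before constructing the pipeline; another is to pass fresh iterators to `reset` after the append, which is what the example at the top of this page does.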
template<typename P>
tf::ScalablePipeline<P>::ScalablePipeline(ScalablePipeline&& rhs)
move constructor
Constructs a pipeline from the given rhs using move semantics (i.e., the data in rhs is moved into this pipeline). After the move, rhs is in a state as if it were just constructed. The behavior is undefined if rhs is running during the move.
template<typename P>
ScalablePipeline& tf::ScalablePipeline<P>::operator=(ScalablePipeline&& rhs)
move assignment operator
Replaces the contents with those of rhs using move semantics (i.e., the data in rhs is moved into this pipeline). After the move, rhs is in a state as if it were just constructed. The behavior is undefined if rhs is running during the move.
template<typename P>
size_t tf::ScalablePipeline<P>::num_lines() const noexcept
queries the number of parallel lines
The function returns the number of parallel lines given by the user upon the construction of the pipeline. The number of lines represents the maximum parallelism this pipeline can achieve.
template<typename P>
size_t tf::ScalablePipeline<P>::num_pipes() const noexcept
queries the number of pipes
The function returns the number of pipes given by the user upon the construction of the pipeline.
template<typename P>
void tf::ScalablePipeline<P>::reset()
resets the pipeline
Resets the pipeline to the initial state. After resetting a pipeline, its token identifier will start from zero.
template<typename P>
void tf::ScalablePipeline<P>::reset(P first, P last)
resets the pipeline with a new range of pipes
Parameters | |
---|---|
first | iterator to the beginning of the range |
last | iterator to the end of the range |
The member function assigns the pipeline to a new range of pipes specified in [first, last) and resets the pipeline to the initial state. After resetting a pipeline, its token identifier will start from zero.
Internally, the scalable pipeline copies the iterators from the specified range. Those pipe callables pointed to by these iterators must remain valid during the execution of the pipeline.
template<typename P>
void tf::ScalablePipeline<P>::reset(size_t num_lines, P first, P last)
resets the pipeline to a new line number and a new range of pipes
Parameters | |
---|---|
num_lines | number of parallel lines |
first | iterator to the beginning of the range |
last | iterator to the end of the range |
The member function resets the pipeline to a new number of parallel lines and a new range of pipes specified in [first, last), as if the pipeline were just constructed. After resetting a pipeline, its token identifier will start from zero.
Internally, the scalable pipeline copies the iterators from the specified range. Those pipe callables pointed to by these iterators must remain valid during the execution of the pipeline.
template<typename P>
size_t tf::ScalablePipeline<P>::num_tokens() const noexcept
queries the number of generated tokens in the pipeline
The number represents the total number of scheduling tokens that have been generated by the pipeline so far.