Task-parallel Scalable Pipeline
Unlike tf::
Include the Header
You need to include the header file, taskflow/algorithm/pipeline.hpp, to create a scalable pipeline scheduling framework.
#include <taskflow/algorithm/pipeline.hpp>
Create a Scalable Pipeline Module Task
Similar to tf::
tf::Taskflow taskflow("pipeline");
tf::Executor executor;

const size_t num_lines = 4;

// create data storage
std::array<int, num_lines> buffer;

// define the pipe callable
auto pipe_callable = [&buffer] (tf::Pipeflow& pf) mutable {
  switch(pf.pipe()) {
    // first stage generates only 5 scheduling tokens and saves the
    // token number into the buffer.
    case 0: {
      if(pf.token() == 5) {
        pf.stop();
      }
      else {
        printf("stage 1: input token = %zu\n", pf.token());
        buffer[pf.line()] = pf.token();
      }
      return;
    }
    break;

    // other stages propagate the previous result to this pipe and
    // increment it by one
    default: {
      printf(
        "stage %zu: input buffer[%zu] = %d\n", pf.pipe(), pf.line(), buffer[pf.line()]
      );
      buffer[pf.line()] = buffer[pf.line()] + 1;
    }
    break;
  }
};

// create a vector of three pipes
std::vector< tf::Pipe<std::function<void(tf::Pipeflow&)>> > pipes;

for(size_t i=0; i<3; i++) {
  pipes.emplace_back(tf::PipeType::SERIAL, pipe_callable);
}

// create a pipeline of four parallel lines based on the given vector of pipes
tf::ScalablePipeline pl(num_lines, pipes.begin(), pipes.end());

// build the pipeline graph using composition
tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
                        .name("starting pipeline");
tf::Task task = taskflow.composed_of(pl)
                        .name("pipeline");
tf::Task stop = taskflow.emplace([](){ std::cout << "stopped\n"; })
                        .name("pipeline stopped");

// create task dependency
init.precede(task);
task.precede(stop);

// dump the pipeline graph structure (with composition)
taskflow.dump(std::cout);

// run the pipeline
executor.run(taskflow).wait();

// reset the pipeline to a new range of five pipes and starts from
// the initial state (i.e., token counts from zero)
for(size_t i=0; i<2; i++) {
  pipes.emplace_back(tf::PipeType::SERIAL, pipe_callable);
}
pl.reset(pipes.begin(), pipes.end());

executor.run(taskflow).wait();
The program defines a uniform pipe type, tf::Pipe<std::function<void(tf::Pipeflow&)>>, and keeps all pipes in a vector that can be changed later. It then constructs a scalable pipeline from an iterator range, [first, last), that spans the pipe vector from beginning to end, resulting in a pipeline of three serial stages.
Then, the program appends another two pipes to the vector and resets the pipeline to the new range of five pipes, resulting in a pipeline of five serial stages.
When a scalable pipeline is reset to a new range, it starts from the initial state as if it had just been constructed; that is, the token number counts from zero again.
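To make the effect of the reset concrete, the following is a rough sequential model of the example's data flow, written with the standard library only. It is not Taskflow code and not how the library schedules work. It assumes that token t travels on line t % num_lines and that a line carries one token at a time; the names simulate_run, kNumLines, and kNumTokens are hypothetical. Because each run starts again at token zero, the only thing that changes between the two runs is the number of increments applied by the later stages.

```cpp
#include <array>
#include <cassert>
#include <cstddef>

constexpr std::size_t kNumLines = 4;   // same number of lines as the example
constexpr std::size_t kNumTokens = 5;  // the first pipe stops at token 5

// Sequential model of one pipeline run (an illustration, not Taskflow itself).
// Assumption: token t is processed on line t % kNumLines.
// The first pipe stores the token number; every later pipe adds one.
std::array<int, kNumLines> simulate_run(std::size_t num_pipes) {
  assert(num_pipes >= 1);
  std::array<int, kNumLines> buffer{};
  // Tokens always start at zero, which mirrors the state after a reset.
  for (std::size_t token = 0; token < kNumTokens; ++token) {
    const std::size_t line = token % kNumLines;
    buffer[line] = static_cast<int>(token);       // pipe 0
    for (std::size_t pipe = 1; pipe < num_pipes; ++pipe) {
      buffer[line] += 1;                          // pipes 1 .. num_pipes-1
    }
  }
  return buffer;
}
```

Under these assumptions, simulate_run(3) models the first run and simulate_run(5) models the run after the reset. Each buffer entry ends up two higher in the second run because two more stages increment it.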
Reset a Placeholder Scalable Pipeline
It is possible to create a scalable pipeline as a placeholder using the constructor tf::
tf::Executor executor;
tf::Taskflow taskflow;

size_t num_pipes = 10;
size_t num_lines = 10;

std::vector<tf::Pipe<std::function<void(tf::Pipeflow&)>>> pipes;
tf::ScalablePipeline<typename decltype(pipes)::iterator> spl(num_lines);

tf::Task init = taskflow.emplace([&](){
  for(size_t i=0; i<num_pipes; i++) {
    pipes.emplace_back(tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      if(pf.pipe() == 0 && pf.token() == 1024) {
        pf.stop();
        return;
      }
    });
  }
  spl.reset(pipes.begin(), pipes.end());
}).name("init");

tf::Task pipeline = taskflow.composed_of(spl).name("pipeline");
pipeline.succeed(init);
executor.run(taskflow).wait();
The task graph of this program is shown below:
Similarly, you can create an empty scalable pipeline using the default constructor tf::
std::vector<tf::Pipe<std::function<void(tf::Pipeflow&)>>> pipes;
tf::ScalablePipeline<typename decltype(pipes)::iterator> spl;

// create pipes ...

spl.reset(num_lines, pipes.begin(), pipes.end());
Use Other Iterator Types
When a range is assigned to a scalable pipeline, the pipeline copies every pipe iterator in that range into an internal vector. This organization makes invoking a pipe callable a random-access operation, regardless of the pipe container type. Taskflow places few restrictions on the iterator type, as long as the pipes can be traversed in sequential order using the postfix increment operator, ++.
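The idea of collecting iterators into a vector can be sketched with the standard library alone. The class below is a hypothetical illustration, not Taskflow's implementation: IteratorTable is an invented name, and it only assumes what the paragraph above states, namely that the range can be walked with the postfix ++ operator. After one pass over the range, the i-th element is reachable in constant time even when the underlying container is a std::list.

```cpp
#include <cassert>
#include <cstddef>
#include <iterator>
#include <vector>

// Hypothetical helper (not part of Taskflow): stores every iterator of
// [first, last) so that elements can later be indexed in constant time.
template <typename Iter>
class IteratorTable {
 public:
  IteratorTable(Iter first, Iter last) {
    // Only sequential traversal with postfix ++ is required of Iter.
    for (Iter it = first; it != last; it++) {
      slots_.push_back(it);
    }
  }

  std::size_t size() const { return slots_.size(); }

  // Constant-time access to the i-th element of the original range.
  typename std::iterator_traits<Iter>::reference operator[](std::size_t i) const {
    assert(i < slots_.size());
    return *slots_[i];
  }

 private:
  std::vector<Iter> slots_;  // one stored iterator per element
};
```

The table does not own the elements, so the source container must outlive it, and any operation that invalidates the container's iterators also invalidates the table. This is a property of the sketch; it suggests why a range is assigned again through a reset after the pipe container changes.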
// use vector to store pipes
std::vector<tf::Pipe<std::function<void(tf::Pipeflow&)>>> vector;
tf::ScalablePipeline spl1(num_lines, vector.begin(), vector.end());

// use list to store pipes
std::list<tf::Pipe<std::function<void(tf::Pipeflow&)>>> list;
tf::ScalablePipeline spl2(num_lines, list.begin(), list.end());
Learn More about Taskflow Pipeline
Visit the following pages to learn more about pipeline: