Task-parallel Scalable Pipeline

Unlike tf::Pipeline (see Task-parallel Pipeline), which instantiates all pipes at construction time, tf::ScalablePipeline is a scalable alternative that accepts a variable number of pipes through a range of iterators. A scalable pipeline is therefore better suited to applications whose pipeline structure depends on runtime variables.

Include the Header

To create a scalable pipeline scheduling framework, include the header file taskflow/algorithm/pipeline.hpp.

#include <taskflow/algorithm/pipeline.hpp>

Create a Scalable Pipeline Module Task

Similar to tf::Pipeline, tf::ScalablePipeline is a composable graph object that implements a pipeline scheduling framework in a taskflow. The key difference is that a scalable pipeline accepts a variable range of pipes rather than instantiating all pipes at construction or programming time. Users define a linear range of pipes, all of the same callable type, and pass that range to construct a scalable pipeline. Between successive runs, users can reset the pipeline to a different range of pipes. The following code creates a scalable pipeline that uses four parallel lines to schedule tokens through three serial pipes stored in a vector, and then resets that pipeline to a new range of five serial pipes:

tf::Taskflow taskflow("pipeline");
tf::Executor executor;

const size_t num_lines = 4;

// create data storage
std::array<int, num_lines> buffer;

// define the pipe callable
auto pipe_callable = [&buffer] (tf::Pipeflow& pf) mutable {
  switch(pf.pipe()) {
    // first stage generates only 5 scheduling tokens and saves the 
    // token number into the buffer.
    case 0: {
      if(pf.token() == 5) {
        pf.stop();
      }
      else {
        printf("stage 1: input token = %zu\n", pf.token());
        buffer[pf.line()] = static_cast<int>(pf.token());
      }
    }
    break;
    
    // other stages propagate the previous result to this pipe and
    // increment it by one
    default: {
      printf(
        "stage %zu: input buffer[%zu] = %d\n", pf.pipe(), pf.line(), buffer[pf.line()]
      );
      buffer[pf.line()] = buffer[pf.line()] + 1;
    } 
    break;
  }
};

// create a vector of three pipes
std::vector< tf::Pipe<std::function<void(tf::Pipeflow&)>> > pipes;

for(size_t i=0; i<3; i++) {
  pipes.emplace_back(tf::PipeType::SERIAL, pipe_callable);
}

// create a pipeline of four parallel lines based on the given vector of pipes
tf::ScalablePipeline pl(num_lines, pipes.begin(), pipes.end());

// build the pipeline graph using composition
tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
                        .name("starting pipeline");
tf::Task task = taskflow.composed_of(pl)
                        .name("pipeline");
tf::Task stop = taskflow.emplace([](){ std::cout << "stopped\n"; })
                        .name("pipeline stopped");

// create task dependency
init.precede(task);
task.precede(stop);

// dump the pipeline graph structure (with composition)
taskflow.dump(std::cout);

// run the pipeline
executor.run(taskflow).wait();

// reset the pipeline to a new range of five pipes; it starts from
// the initial state (i.e., the token count restarts from zero)
for(size_t i=0; i<2; i++) {
  pipes.emplace_back(tf::PipeType::SERIAL, pipe_callable);
}
pl.reset(pipes.begin(), pipes.end());

executor.run(taskflow).wait();

The program defines a uniform pipe type, tf::Pipe<std::function<void(tf::Pipeflow&)>>, and keeps all pipes in a vector that can change between runs. It then constructs a scalable pipeline from two range iterators, [first, last), that point to the beginning and the end of the pipe vector, resulting in a pipeline of three serial stages:

[Figure: task graph of the scalable pipeline with four parallel lines and three serial pipes, pipe-0 through pipe-2]

Then, the program appends two more pipes to the vector and resets the pipeline to the new range, which now covers all five pipes, resulting in a pipeline of five serial stages:

[Figure: task graph of the scalable pipeline with four parallel lines and five serial pipes, pipe-0 through pipe-4]

When a scalable pipeline is reset to a new range, it starts from the initial state as if it had just been constructed; that is, the token number counts from zero.
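The uniform pipe type described above works because std::function erases the concrete type of each callable, so differently typed lambdas can sit in one growable container. The following is a minimal standard-library sketch of that idea only; ToyFlow, ToyStage, run_stages, and demo_growing_range are hypothetical names invented for illustration and are not part of Taskflow, and the sketch runs stages sequentially rather than scheduling them across parallel lines.

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical stand-in for tf::Pipeflow; it is NOT a Taskflow type.
struct ToyFlow {
  std::size_t pipe;  // index of the stage currently being executed
  int value;         // payload carried through the stages
};

// One uniform, type-erased callable type, so every stage fits in one vector.
using ToyStage = std::function<void(ToyFlow&)>;

// Run a payload through every stage in order and return the final value.
inline int run_stages(const std::vector<ToyStage>& stages, int input) {
  ToyFlow flow{0, input};
  for(std::size_t i = 0; i < stages.size(); ++i) {
    flow.pipe = i;
    stages[i](flow);
  }
  return flow.value;
}

// Build a range of three stages, run it, then grow the same range to five
// stages and run again from a fresh input.
inline std::vector<int> demo_growing_range() {
  // Two lambdas with distinct closure types both convert to ToyStage.
  auto add_one = [](ToyFlow& f) { f.value += 1; };
  auto add_pipe_index = [](ToyFlow& f) { f.value += static_cast<int>(f.pipe); };

  std::vector<ToyStage> stages;
  stages.push_back(add_one);         // stage 0
  stages.push_back(add_pipe_index);  // stage 1
  stages.push_back(add_one);         // stage 2
  int three = run_stages(stages, 0);

  stages.push_back(add_pipe_index);  // stage 3
  stages.push_back(add_one);         // stage 4
  int five = run_stages(stages, 0);

  return {three, five};
}
```

Calling demo_growing_range() runs the three-stage range first and the five-stage range second, each time starting from the same input, which loosely mirrors running a pipeline, extending its pipe vector, and running it again after a reset.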

Reset a Placeholder Scalable Pipeline

It is possible to create a scalable pipeline as a placeholder using the constructor tf::ScalablePipeline(size_t num_lines) and reset it to another range later in the application. The following code creates a task to emplace a range of pipes and reset the pipeline to that range, before running the pipeline task:

tf::Executor executor;
tf::Taskflow taskflow;

size_t num_pipes = 10;
size_t num_lines = 10;

std::vector<tf::Pipe<std::function<void(tf::Pipeflow&)>>> pipes;
tf::ScalablePipeline<typename decltype(pipes)::iterator> spl(num_lines); 

tf::Task init = taskflow.emplace([&](){
  for(size_t i=0; i<num_pipes; i++) {
    pipes.emplace_back(tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) { 
      if(pf.pipe() == 0 && pf.token() == 1024) {
        pf.stop();
        return;
      }
    });
  }
  spl.reset(pipes.begin(), pipes.end());
}).name("init");

tf::Task pipeline = taskflow.composed_of(spl).name("pipeline");
pipeline.succeed(init);
executor.run(taskflow).wait();  

The task graph of this program is shown below:

[Figure: task graph in which the init task precedes the pipeline module task; inside the module, a condition task (cond) branches to ten runtime tasks, rt-0 through rt-9]

Similarly, you can create an empty scalable pipeline using the default constructor tf::ScalablePipeline() and reset it later in your program.

std::vector<tf::Pipe<std::function<void(tf::Pipeflow&)>>> pipes;
tf::ScalablePipeline<typename decltype(pipes)::iterator> spl; 
// create pipes ...
spl.reset(num_lines, pipes.begin(), pipes.end());

Use Other Iterator Types

When a range is assigned to a scalable pipeline, the pipeline copies all pipe iterators in that range into an internal vector. This organization makes invoking a pipe callable a random-access operation, regardless of the pipe container type. Taskflow places few restrictions on the iterator type, as long as the pipes can be visited in sequential order using the postfix increment operator, ++.

// use vector to store pipes
std::vector<tf::Pipe<std::function<void(tf::Pipeflow&)>>> vector;
tf::ScalablePipeline spl1(num_lines, vector.begin(), vector.end());

// use list to store pipes
std::list<tf::Pipe<std::function<void(tf::Pipeflow&)>>> list;
tf::ScalablePipeline spl2(num_lines, list.begin(), list.end());
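To make the iterator-table idea concrete, here is a minimal standard-library sketch of how copying iterators into a vector gives indexed access to elements of a container that is not itself random access. IteratorTable and demo_list_access are hypothetical names made up for this illustration; the sketch is an assumption about the general technique, not Taskflow's actual implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <list>
#include <vector>

// Hypothetical helper, NOT a Taskflow class: it copies every iterator of
// [first, last) into a vector so the i-th element is reachable by index.
template <typename Iter>
class IteratorTable {
 public:
  IteratorTable(Iter first, Iter last) { reset(first, last); }

  // Rebuild the table from a new range; Iter only needs != and postfix ++.
  void reset(Iter first, Iter last) {
    _table.clear();
    for(Iter itr = first; itr != last; itr++) {
      _table.push_back(itr);
    }
  }

  std::size_t size() const { return _table.size(); }

  // Indexed access to the i-th element, even when Iter is a list iterator.
  decltype(auto) operator[](std::size_t i) const {
    assert(i < _table.size());
    return *_table[i];
  }

 private:
  std::vector<Iter> _table;
};

// Store three callables in a std::list and invoke them by index.
inline int demo_list_access() {
  using StageList = std::list<std::function<int(int)>>;
  StageList stages;
  stages.push_back([](int x) { return x + 1; });
  stages.push_back([](int x) { return x * 10; });
  stages.push_back([](int x) { return x - 3; });

  IteratorTable<StageList::iterator> table(stages.begin(), stages.end());
  return table[2](table[1](table[0](4)));  // ((4 + 1) * 10) - 3
}
```

A table like this holds iterators rather than copies of the elements, so in this sketch the underlying container must stay alive, and any change that invalidates its iterators calls for another reset before the table is used again.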

Learn More about Taskflow Pipeline

Visit the following pages to learn more about pipeline: