Data-parallel Pipeline

Taskflow provides another variant, tf::DataPipeline, on top of tf::Pipeline (see Task-parallel Pipeline) to help you implement data-parallel pipeline algorithms while leaving data management to Taskflow. We recommend that you finish reading Task-parallel Pipeline before learning tf::DataPipeline.

Include the Header

You need to include the header file, taskflow/algorithm/data_pipeline.hpp, for implementing data-parallel pipeline algorithms.

#include <taskflow/algorithm/data_pipeline.hpp>

Create a Data Pipeline Module Task

Similar to creating a task-parallel pipeline (tf::Pipeline), there are three steps to create a data-parallel pipeline application:

  1. Define the pipeline structure (e.g., pipe type, pipe callable, stopping rule, line count)
  2. Define the data storage and layout, if needed for the application
  3. Define the pipeline taskflow graph using composition

The following example creates a data-parallel pipeline that generates a total of five dataflow tokens. Each token flows from void to int at the first stage, from int to std::string at the second stage, and from std::string to void at the final stage. Data storage between stages is automatically managed by tf::DataPipeline.

#include <cstdio>
#include <iostream>
#include <string>

#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/data_pipeline.hpp>

int main() {

  // data flow => void -> int -> std::string -> void 
  tf::Taskflow taskflow("pipeline");
  tf::Executor executor;

  const size_t num_lines = 4;
  
  // create a pipeline graph
  tf::DataPipeline pl(num_lines,
    tf::make_data_pipe<void, int>(tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) -> int{
      if(pf.token() == 5) {
        pf.stop();
        return 0;
      }
      else {
        // pf.token() returns size_t, so print it with %zu and convert explicitly
        printf("first pipe returns %zu\n", pf.token());
        return static_cast<int>(pf.token());
      }
    }),

    tf::make_data_pipe<int, std::string>(tf::PipeType::SERIAL, [](int& input) {
      printf("second pipe returns a string of %d\n", input + 100);
      return std::to_string(input + 100);
    }),

    tf::make_data_pipe<std::string, void>(tf::PipeType::SERIAL, [](std::string& input) {
      printf("third pipe receives the input string %s\n", input.c_str());
    })
  );

  // build the pipeline graph using composition
  taskflow.composed_of(pl).name("pipeline");

  // dump the pipeline graph structure (with composition)
  taskflow.dump(std::cout);

  // run the pipeline
  executor.run(taskflow).wait();

  return 0;
}

The interface of tf::DataPipeline is very similar to tf::Pipeline, except that the library transparently manages the dataflow between pipes. To create a stage in a data-parallel pipeline, you should always use the helper function tf::make_data_pipe:

tf::make_data_pipe<int, std::string>(
  tf::PipeType::SERIAL, 
  [](int& input) { 
    return std::to_string(input + 100);
  }
);

The helper function takes a pair of input and output types as its template arguments. Both types are always decayed using std::decay (e.g., const int& becomes int) for storage purposes. As for function arguments, the first argument specifies the type of this data pipe, which can be either tf::PipeType::SERIAL or tf::PipeType::PARALLEL, and the second argument is a callable that the pipeline scheduler invokes. The callable must take the input data type as its first argument and return a value of the output data type. Additionally, the callable can take a tf::Pipeflow reference as its second argument, which allows you to query the runtime information of a stage task, such as its line number and token number.

tf::make_data_pipe<int, std::string>(
  tf::PipeType::SERIAL, 
  [](int& input, tf::Pipeflow& pf) {
    printf("token=%zu, line=%zu\n", pf.token(), pf.line());
    return std::to_string(input + 100);
  }
)
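
The decay rule described above can be checked at compile time with standard type traits. The following is a minimal sketch that uses only the C++ standard library; the alias stored_t is a hypothetical name introduced for illustration and is not part of Taskflow.

```cpp
#include <string>
#include <type_traits>

// Hypothetical alias (not a Taskflow name): the type that results from
// applying std::decay to a template argument T.
template <typename T>
using stored_t = std::decay_t<T>;

// References and top-level cv-qualifiers are stripped.
static_assert(std::is_same_v<stored_t<const int&>, int>);
static_assert(std::is_same_v<stored_t<std::string&&>, std::string>);
static_assert(std::is_same_v<stored_t<const std::string>, std::string>);

// Arrays decay to pointers, so a raw array type does not survive as an array.
static_assert(std::is_same_v<stored_t<char[16]>, char*>);
static_assert(std::is_same_v<stored_t<const char(&)[6]>, const char*>);
```

Because decay removes references, writing const int& or int as a template argument should lead to the same stored type. If a stage needs to pass text along, a value type such as std::string is likely a safer choice than a character array.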

For the first pipe, the input type should always be void and the callable must take a tf::Pipeflow reference as its only argument. In this example, we stop the pipeline after generating five tokens (tokens 0 through 4).

tf::make_data_pipe<void, int>(tf::PipeType::SERIAL, [](tf::Pipeflow& pf) -> int{
  if(pf.token() == 5) {
    pf.stop();
    return 0;    // returns a dummy value
  }
  else {
    return static_cast<int>(pf.token());  // token() returns size_t
  }
}),

Similarly, the output type of the last pipe should be void as no more data will go out of the final pipe.

tf::make_data_pipe<std::string, void>(tf::PipeType::SERIAL, [](std::string& input) {
  std::cout << input << std::endl;
})

Finally, you need to compose the pipeline graph by creating a module task (i.e., tf::Taskflow::composed_of).

// build the pipeline graph using composition
taskflow.composed_of(pl).name("pipeline");

// dump the pipeline graph structure (with composition)
taskflow.dump(std::cout);

// run the pipeline
executor.run(taskflow).wait();

[Figure: dumped graph of the taskflow. The module task "pipeline [m1]" refers to the pipeline graph m1, in which a condition task "cond" branches through edges 0 to 3 to the tasks rt-0, rt-1, rt-2, and rt-3.]

Understand Internal Data Storage

By default, tf::DataPipeline uses std::variant to store a type-safe union of all input and output data types extracted from the given data pipes. To avoid false sharing, each line keeps a variant that is aligned to the cacheline size. When invoking a pipe callable, the input data is acquired by reference from the variant using std::get. When a pipe callable returns, the output data is stored back into the variant using the assignment operator.
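
The storage scheme described above can be approximated with a few lines of standard C++. The sketch below is not Taskflow's actual implementation: LineBuffer, kCachelineSize, first_stage, second_stage, and run_token are hypothetical names, and the 64-byte cacheline size is an assumption that does not hold on every platform. It mirrors the void -> int -> std::string -> void example used on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <variant>
#include <vector>

// Assumed cacheline size; 64 bytes is common but not universal.
inline constexpr std::size_t kCachelineSize = 64;

// Hypothetical per-line buffer. The variant holds every type that flows
// between two stages; alignas keeps neighboring lines on separate cachelines.
struct alignas(kCachelineSize) LineBuffer {
  std::variant<int, std::string> data;
};

// Stage callables with the same shapes as the data pipe callables above.
inline int first_stage(std::size_t token) {
  return static_cast<int>(token);
}

inline std::string second_stage(int& input) {
  return std::to_string(input + 100);
}

// Runs one token through the first two stages on a single line and returns
// the value the final stage would receive.
inline std::string run_token(LineBuffer& line, std::size_t token) {
  line.data = first_stage(token);        // store the output by assignment
  int& input = std::get<int>(line.data); // acquire the input by reference
  line.data = second_stage(input);       // the call finishes before the overwrite
  return std::get<std::string>(line.data);
}
```

With this sketch, a container such as std::vector<LineBuffer> lines(4) gives each of four lines its own aligned slot, and run_token(lines[0], 3) yields the string "103". Since a single variant per line is reused across stages, the value produced by one stage is replaced as soon as the next stage stores its output.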

Learn More about Taskflow Pipeline

Visit the following pages to learn more about pipelines:

  1. Task-parallel Pipeline
  2. Task-parallel Scalable Pipeline
  3. Text Processing Pipeline
  4. Graph Processing Pipeline
  5. Taskflow Processing Pipeline