Data-parallel Pipeline
Taskflow provides another variant, tf::DataPipeline, to create a data-parallel pipeline scheduling framework on top of tf::Pipeline. Unlike tf::Pipeline, which requires applications to manage the dataflow between pipes on their own, tf::DataPipeline transparently manages the data storage and movement for you.
Include the Header
You need to include the header file, taskflow/algorithm/data_pipeline.hpp, for implementing data-parallel pipeline algorithms.
#include <taskflow/algorithm/data_pipeline.hpp>
Create a Data Pipeline Module Task
Similar to creating a task-parallel pipeline (tf::Pipeline), creating a data-parallel pipeline (tf::DataPipeline) involves the following steps:
- Define the pipeline structure (e.g., pipe type, pipe callable, stopping rule, line count)
- Define the data storage and layout, if needed for the application
- Define the pipeline taskflow graph using composition
The following example creates a data-parallel pipeline that generates a total of five dataflow tokens, passing data from void to int at the first stage, from int to std::string at the second stage, and from std::string to void at the final stage. Data storage between stages is automatically managed by tf::DataPipeline.
#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/data_pipeline.hpp>

int main() {

  // data flow => void -> int -> std::string -> void
  tf::Taskflow taskflow("pipeline");
  tf::Executor executor;

  const size_t num_lines = 4;

  // create a pipeline graph
  tf::DataPipeline pl(num_lines,
    tf::make_data_pipe<void, int>(tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) -> int {
      if(pf.token() == 5) {
        pf.stop();
        return 0;
      }
      else {
        printf("first pipe returns %lu\n", pf.token());
        return pf.token();
      }
    }),

    tf::make_data_pipe<int, std::string>(tf::PipeType::SERIAL, [](int& input) {
      printf("second pipe returns a string of %d\n", input + 100);
      return std::to_string(input + 100);
    }),

    tf::make_data_pipe<std::string, void>(tf::PipeType::SERIAL, [](std::string& input) {
      printf("third pipe receives the input string %s\n", input.c_str());
    })
  );

  // build the pipeline graph using composition
  taskflow.composed_of(pl).name("pipeline");

  // dump the pipeline graph structure (with composition)
  taskflow.dump(std::cout);

  // run the pipeline
  executor.run(taskflow).wait();

  return 0;
}
The interface of tf::DataPipeline is very similar to that of tf::Pipeline, except that the library transparently manages the dataflow between pipes. To create a stage in a data-parallel pipeline, you should always use the helper function tf::make_data_pipe:
tf::make_data_pipe<int, std::string>(
  tf::PipeType::SERIAL,
  [](int& input) {
    return std::to_string(input + 100);
  }
);
The helper function starts with a pair of input and output types in its template arguments. Both types will always be decayed to their original form using std::decay (e.g., const int& becomes int) for storage purposes. In terms of function arguments, the first argument specifies the direction of this data pipe, which can be either tf::PipeType::SERIAL or tf::PipeType::PARALLEL, and the second argument is a callable to invoke by the pipeline scheduler. The callable must take the input data in its first argument, and it can additionally take a tf::Pipeflow reference in its second argument to probe the runtime information of a stage task, such as its line number and token number:
tf::make_data_pipe<int, std::string>(
  tf::PipeType::SERIAL,
  [](int& input, tf::Pipeflow& pf) {
    printf("token=%lu, line=%lu\n", pf.token(), pf.line());
    return std::to_string(input + 100);
  }
)
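The decay behavior described above can be verified in plain C++, independently of Taskflow. The following standalone sketch shows how std::decay normalizes the template argument types into the forms used for storage:

#include <string>
#include <type_traits>

int main() {
  // std::decay strips references and cv-qualifiers, so a pipe declared
  // with <const int&, std::string> stores its data as <int, std::string>
  static_assert(std::is_same_v<std::decay_t<const int&>, int>);
  static_assert(std::is_same_v<std::decay_t<std::string&>, std::string>);
  return 0;
}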
For the first pipe, the input type should always be void, and the callable must take a tf::Pipeflow reference in its argument. In this example, we stop the pipeline after processing five tokens:
tf::make_data_pipe<void, int>(tf::PipeType::SERIAL, [](tf::Pipeflow& pf) -> int {
  if(pf.token() == 5) {
    pf.stop();
    return 0;  // returns a dummy value
  }
  else {
    return pf.token();
  }
}),
Similarly, the output type of the last pipe should be void, as no more data will go out of the final pipe:
tf::make_data_pipe<std::string, void>(
  tf::PipeType::SERIAL,
  [](std::string& input) {
    std::cout << input << std::endl;
  }
)
Finally, you need to compose the pipeline graph by creating a module task (i.e., tf::Taskflow::composed_of).
// build the pipeline graph using composition
taskflow.composed_of(pl).name("pipeline");

// dump the pipeline graph structure (with composition)
taskflow.dump(std::cout);

// run the pipeline
executor.run(taskflow).wait();
Understand Internal Data Storage
By default, tf::DataPipeline uses std::variant to store a type-safe union of all the input and output data types extracted from the given data pipes, keeping one such storage slot per pipeline line.
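As a concrete illustration, the storage for the example pipeline above is conceptually one std::variant<int, std::string> per line. The following standalone sketch models this idea; it is a simplified assumption for illustration, not Taskflow's actual implementation (the Line struct, its alignment, and the names are hypothetical):

#include <array>
#include <cassert>
#include <string>
#include <variant>

// hypothetical per-line storage modeled after the example pipeline:
// the variant is the type-safe union of all pipe data types (int, std::string)
struct alignas(64) Line {
  std::variant<int, std::string> data;
};

int main() {
  constexpr size_t num_lines = 4;
  std::array<Line, num_lines> storage;  // one slot per pipeline line

  // the first pipe writes an int into the line's slot ...
  storage[0].data = 3;
  assert(std::get<int>(storage[0].data) == 3);

  // ... and the second pipe overwrites it with the std::string it produces
  storage[0].data = std::to_string(std::get<int>(storage[0].data) + 100);
  assert(std::get<std::string>(storage[0].data) == "103");

  return 0;
}

Reusing one slot per line is what lets the pipeline pass data between stages without allocating per-token buffers.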
Learn More about Taskflow Pipeline
Visit the following pages to learn more about pipeline: