Taskflow provides another variant, tf::DataPipeline, on top of tf::Pipeline (see Task-parallel Pipeline) to help you implement data-parallel pipeline algorithms while leaving data management to Taskflow. We recommend that you finish reading Task-parallel Pipeline before learning tf::DataPipeline.
You need to include the header file, taskflow/algorithm/data_pipeline.hpp, for implementing data-parallel pipeline algorithms.
Similar to creating a task-parallel pipeline (tf::Pipeline), there are three steps to create a data-parallel pipeline application:
The following example creates a data-parallel pipeline that generates a total of five dataflow tokens from void to int at the first stage, from int to std::string at the second stage, and std::string to void at the final stage. Data storage between stages is automatically managed by tf::DataPipeline.
The interface of tf::DataPipeline is very similar to that of tf::Pipeline, except that the library transparently manages the dataflow between pipes. To create a stage in a data-parallel pipeline, always use the helper function tf::make_data_pipe.
The helper function takes an input type and an output type as its template arguments. Both types are always decayed using std::decay (e.g., const int& becomes int) for storage purposes. As for the function arguments, the first argument specifies the type of the data pipe, which can be either tf::PipeType::SERIAL or tf::PipeType::PARALLEL, and the second argument is a callable that the pipeline scheduler invokes. The callable must take the input data type as its first argument and return a value of the output data type. Additionally, the callable can take a tf::Pipeflow reference as its second argument, which allows you to query the runtime information of a stage task, such as its line number and token number.
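To make the decay rule concrete, the following standard-library-only sketch checks what std::decay produces for a few declared types. The alias stored_t is a hypothetical name introduced here, not a Taskflow identifier; it only assumes that storage types are obtained with plain std::decay, as stated above.

```cpp
#include <string>
#include <type_traits>

// Hypothetical alias (not a Taskflow name): the type that would be stored
// for a pipe declared with input or output type T, assuming plain std::decay.
template <typename T>
using stored_t = std::decay_t<T>;

// references and cv-qualifiers are stripped
static_assert(std::is_same_v<stored_t<const int&>, int>);
static_assert(std::is_same_v<stored_t<std::string&&>, std::string>);
static_assert(std::is_same_v<stored_t<const std::string>, std::string>);

// std::decay also converts array types to pointers
static_assert(std::is_same_v<stored_t<int[4]>, int*>);
```

In practice this means that declaring a pipe with const int& or with int leads to the same stored type, so the qualifier in the template argument does not change what is kept between stages.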
For the first pipe, the input type should always be void and the callable must take a tf::Pipeflow reference as its argument. In this example, we stop the pipeline after processing five tokens.
Similarly, the output type of the last pipe should be void as no more data will go out of the final pipe.
Finally, you need to compose the pipeline graph by creating a module task (i.e., tf::Taskflow::composed_of).
By default, tf::DataPipeline uses std::variant to store a type-safe union of all input and output data types extracted from the given data pipes. To avoid false sharing, each line keeps a variant that is aligned to the cache line size. When invoking a pipe callable, the input data is acquired by reference from the variant using std::get. When returning from a pipe callable, the output data is stored back to the variant using the assignment operator.
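The storage scheme described above can be approximated with the standard library alone. The sketch below is not Taskflow's implementation: LineBuffer, process_on_line, and the 64-byte cache line size are assumptions used to illustrate one variant per line, read through std::get and written through assignment.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <variant>

// Assumed cache line size; 64 bytes is common but platform-dependent.
constexpr std::size_t kCachelineSize = 64;

// Hypothetical per-line buffer: one variant over all decayed pipe data types
// (int and std::string in the example), aligned to its own cache line.
struct alignas(kCachelineSize) LineBuffer {
  std::variant<int, std::string> data;
};

static_assert(alignof(LineBuffer) == kCachelineSize);
static_assert(sizeof(LineBuffer) % kCachelineSize == 0);

// Mimics one token travelling through the stages of a single line.
std::string process_on_line(LineBuffer& line, int token) {
  // first pipe: the returned int is stored by assignment
  line.data = token;

  // second pipe: the input is acquired by reference, and the returned
  // std::string replaces the int in the same variant
  int& in = std::get<int>(line.data);
  line.data = std::to_string(in + 100);
  assert(std::holds_alternative<std::string>(line.data));

  // last pipe: the input is acquired by reference again
  std::string& out = std::get<std::string>(line.data);
  return out;
}
```

Since each LineBuffer occupies whole cache lines, two lines written by different threads never share a cache line, which is the false-sharing concern mentioned above.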