We study a text processing pipeline that finds the most frequent character in each string from an input source, demonstrating how Taskflow's pipeline model overlaps serial and parallel stages to process a stream of tokens efficiently.
Problem Formulation
Given a vector of strings, we want to find the most frequent character in each string and output the result in the same order as the input. For example:
# input
abade ddddf eefge xyzzd ijjjj jiiii kkijk
# output (most frequent character : count)
a:2 d:4 e:3 z:2 j:4 i:4 k:3
We decompose the computation into three stages:
- Read (serial) — read one string from the input vector in order
- Count (parallel) — build a character-frequency map from the string
- Reduce (serial) — find the most frequent character from the map and output the result
The first and third stages must run serially to preserve input/output order. The second stage is independent across strings and can run in parallel across multiple pipeline lines.
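Before introducing the pipeline, it can help to see the three stages as plain sequential code. The following is a minimal sketch using only the standard library; the function names `count_chars`, `most_frequent`, and `solve_sequentially` are hypothetical and do not come from Taskflow.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Stage 2 (Count): build a character-frequency map for one string.
std::unordered_map<char, std::size_t> count_chars(const std::string& s) {
  std::unordered_map<char, std::size_t> freq;
  for(char c : s) ++freq[c];
  return freq;
}

// Stage 3 (Reduce): pick the entry with the largest count.
// Precondition: the map is non-empty (an empty string has no answer).
std::pair<char, std::size_t> most_frequent(
  const std::unordered_map<char, std::size_t>& freq
) {
  assert(!freq.empty());
  auto it = std::max_element(freq.begin(), freq.end(),
    [](const auto& a, const auto& b) { return a.second < b.second; }
  );
  return {it->first, it->second};
}

// Stage 1 (Read) is the loop itself: strings are visited in input order,
// so the results come out in input order as well.
std::vector<std::pair<char, std::size_t>> solve_sequentially(
  const std::vector<std::string>& input
) {
  std::vector<std::pair<char, std::size_t>> out;
  out.reserve(input.size());
  for(const auto& s : input) out.push_back(most_frequent(count_chars(s)));
  return out;
}
```

Calling `solve_sequentially` on the seven example strings gives a baseline to compare the pipeline's stage-3 results against. The pipeline version performs the same work per string but lets the Count step of different strings overlap.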
Creating the Pipeline
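We create a pipeline of three pipes with two parallel lines. A larger line count lets more tokens be in flight at once and so can increase throughput, at the cost of one extra buffer slot per line; in practice, the value returned by std::thread::hardware_concurrency() is a reasonable starting point. Note that num_lines also sizes a std::array in the listing below, so there it must be a compile-time constant.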
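If the line count should follow the hardware rather than a fixed constant, one option is to pick it at run time and size the buffer with a std::vector instead of a std::array. The sketch below uses only the standard library; `choose_num_lines` and `make_line_buffer` are hypothetical helper names, and the fallback value of 2 is an arbitrary assumption.

```cpp
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical helper: pick a line count at run time.
// std::thread::hardware_concurrency() is only a hint and may return 0
// when the value cannot be determined, so fall back in that case.
std::size_t choose_num_lines(std::size_t fallback = 2) {
  const unsigned hint = std::thread::hardware_concurrency();
  return hint == 0 ? fallback : static_cast<std::size_t>(hint);
}

// Hypothetical helper: a run-time-sized buffer with one slot per line.
// Each slot is value-initialized.
template <typename T>
std::vector<T> make_line_buffer(std::size_t num_lines) {
  return std::vector<T>(num_lines);
}
```

The trade-off is that a std::vector buffer is heap-allocated, whereas the std::array used in the listing needs its size at compile time.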
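#include <algorithm>
#include <array>
#include <cstdio>
#include <iostream>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <variant>
#include <vector>
#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/pipeline.hpp>

// format a frequency map as "c:n c:n ..." for logging
std::string format_map(const std::unordered_map<char, size_t>& map) {
  std::ostringstream oss;
  for(const auto& [c, n] : map) oss << c << ':' << n << ' ';
  return oss.str();
}

int main() {

  tf::Taskflow taskflow;
  tf::Executor executor;

  const size_t num_lines = 2;

  std::vector<std::string> input = {
    "abade", "ddddf", "eefge", "xyzzd", "ijjjj", "jiiii", "kkijk"
  };

  // each buffer slot holds whatever the most recent stage produced
  using data_type = std::variant<
    std::string,
    std::unordered_map<char, size_t>,
    std::pair<char, size_t>
  >;
  std::array<data_type, num_lines> buffer;

  tf::Pipeline pl(num_lines,

    // stage 1 (serial): read the next string, or stop after the last one
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      if(pf.token() == input.size()) {
        pf.stop();
      }
      else {
        printf("stage 1: %s\n", input[pf.token()].c_str());
        buffer[pf.line()] = input[pf.token()];
      }
    }},

    // stage 2 (parallel): count the frequency of each character
    tf::Pipe{tf::PipeType::PARALLEL, [&](tf::Pipeflow& pf) {
      std::unordered_map<char, size_t> map;
      for(char c : std::get<std::string>(buffer[pf.line()])) map[c]++;
      printf("stage 2: %s\n", format_map(map).c_str());
      buffer[pf.line()] = map;
    }},

    // stage 3 (serial): reduce the map to its most frequent character
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      auto& map = std::get<std::unordered_map<char, size_t>>(buffer[pf.line()]);
      auto sol = std::max_element(map.begin(), map.end(),
        [](const auto& a, const auto& b) { return a.second < b.second; }
      );
      printf("stage 3: %c:%zu\n", sol->first, sol->second);
      // copy the result out first: sol points into the map stored in this slot
      std::pair<char, size_t> best{sol->first, sol->second};
      buffer[pf.line()] = best;
    }}
  );

  // wrap the pipeline in a module task between a start and a stop task
  tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
                          .name("start");
  tf::Task task = taskflow.composed_of(pl)
                          .name("pipeline");
  tf::Task done = taskflow.emplace([](){ std::cout << "done\n"; })
                          .name("stop");

  init.precede(task);
  task.precede(done);

  executor.run(taskflow).wait();

  return 0;
}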
Data Buffer
Taskflow gives users full control over data management in a pipeline. We allocate a one-dimensional buffer indexed by pipeline line:
std::array<data_type, num_lines> buffer;
A one-dimensional buffer is sufficient because Taskflow guarantees that at most one scheduling token is active per line at any time, so no two tokens will read or write the same buffer slot simultaneously.
Note: The pipeline itself does not carry data from one stage to the next; each stage function reads and overwrites the buffer slot of its line. The scheduling token index (tf::Pipeflow::token) identifies which input element is being processed, and the line index (tf::Pipeflow::line) identifies which buffer slot to use.
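To make the life cycle of a single buffer slot concrete, the sketch below runs the three stage bodies back to back on one std::variant slot, without any Taskflow scheduling. The helper name `run_slot` and the alias `freq_map` are hypothetical; the point is only to show how the slot's active alternative changes from string to map to pair.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_map>
#include <utility>
#include <variant>

using freq_map  = std::unordered_map<char, std::size_t>;
using data_type = std::variant<
  std::string, freq_map, std::pair<char, std::size_t>
>;

// Hypothetical helper: apply Read, Count, and Reduce to one slot in turn.
// Precondition: s is non-empty.
std::pair<char, std::size_t> run_slot(data_type& slot, const std::string& s) {
  // Read: the slot now holds the raw string (alternative 0).
  slot = s;

  // Count: read the string, then overwrite the slot with its map (alternative 1).
  freq_map freq;
  for(char c : std::get<std::string>(slot)) ++freq[c];
  slot = std::move(freq);

  // Reduce: read the map and copy the winning entry out of it.
  const auto& m = std::get<freq_map>(slot);
  assert(!m.empty());
  auto it = std::max_element(m.begin(), m.end(),
    [](const auto& a, const auto& b) { return a.second < b.second; }
  );
  std::pair<char, std::size_t> best{it->first, it->second};

  // Overwriting the slot destroys the map, so m and it must not be
  // used after this line (alternative 2 is now active).
  slot = best;
  return best;
}
```

Because each stage leaves a different alternative in the slot, a later stage that calls std::get with the wrong type throws std::bad_variant_access, which makes an out-of-order access easy to spot.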
Sample Output
Because stage 2 is a parallel pipe, its output may interleave across lines. One possible execution trace:
ready
stage 1: abade
stage 1: ddddf
stage 2: f:1 d:4
stage 2: e:1 d:1 a:2 b:1
stage 3: a:2
stage 1: eefge
stage 2: g:1 e:3 f:1
stage 3: d:4
stage 1: xyzzd
stage 3: e:3
stage 1: ijjjj
stage 2: z:2 x:1 d:1 y:1
stage 3: z:2
stage 1: jiiii
stage 2: j:4 i:1
stage 3: j:4
stage 2: i:4 j:1
stage 1: kkijk
stage 3: i:4
stage 2: j:1 k:3 i:1
stage 3: k:3
done
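The order of the entries within each stage 2 line above (for example "f:1 d:4") is not significant: std::unordered_map iterates in an unspecified order, so it can differ between standard library implementations. If reproducible logs matter, one option is to sort the entries before printing. The following is a sketch of such a variant; `format_map_sorted` is a hypothetical name and not part of the original listing.

```cpp
#include <algorithm>
#include <cstddef>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical variant of format_map: copy the entries into a vector and
// sort them by character so the printed order is the same on every run.
std::string format_map_sorted(
  const std::unordered_map<char, std::size_t>& map
) {
  std::vector<std::pair<char, std::size_t>> entries(map.begin(), map.end());
  std::sort(entries.begin(), entries.end());
  std::ostringstream oss;
  for(const auto& [c, n] : entries) oss << c << ':' << n << ' ';
  return oss.str();
}
```

Sorting only fixes the order inside one line of output; the interleaving of lines from different tokens still depends on scheduling.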
The seven stage-3 outputs appear in the same order as the input (a:2, d:4, e:3, z:2, j:4, i:4, k:3), as guaranteed by the serial pipe declaration. The pipeline task graph is shown below: