Text Processing Pipeline

We study a text processing pipeline that finds the most frequent character in each string from an input source, demonstrating how Taskflow's pipeline model overlaps serial and parallel stages to process a stream of tokens efficiently.

Problem Formulation

Given a vector of strings, we want to find the most frequent character in each string and output the result in the same order as the input. For example:

# input
abade ddddf eefge xyzzd ijjjj jiiii kkijk
# output (most frequent character : count)
a:2 d:4 e:3 z:2 j:4 i:4 k:3

We decompose the computation into three stages:

  1. Read (serial) — read one string from the input vector in order
  2. Count (parallel) — build a character-frequency map from the string
  3. Reduce (serial) — find the most frequent character from the map and output the result

The first and third stages must run serially to preserve input/output order. The second stage is independent across strings and can run in parallel across multiple pipeline lines.
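Before adding any parallelism, it can help to have a plain sequential reference for the three stages, for example to check the pipeline's results against. The following is a minimal sketch that does not use Taskflow; the function names count_chars, most_frequent, and solve_sequential are hypothetical and chosen only for illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Stage 2 logic: build a character-frequency map for one string.
std::unordered_map<char, std::size_t> count_chars(const std::string& s) {
  std::unordered_map<char, std::size_t> freq;
  for (char c : s) ++freq[c];
  return freq;
}

// Stage 3 logic: pick the entry with the largest count.
// Ties are broken arbitrarily because unordered_map has no defined order.
std::pair<char, std::size_t> most_frequent(
    const std::unordered_map<char, std::size_t>& freq) {
  assert(!freq.empty());  // an empty string has no most frequent character
  auto it = std::max_element(freq.begin(), freq.end(),
      [](const auto& a, const auto& b) { return a.second < b.second; });
  return {it->first, it->second};
}

// Sequential reference: run the three stages one string at a time.
std::vector<std::pair<char, std::size_t>> solve_sequential(
    const std::vector<std::string>& input) {
  std::vector<std::pair<char, std::size_t>> out;
  out.reserve(input.size());
  for (const auto& s : input) {                    // stage 1: read in order
    out.push_back(most_frequent(count_chars(s)));  // stages 2 and 3
  }
  return out;
}
```

Calling solve_sequential on the seven example strings yields the pairs listed in the expected output above, in input order. The pipeline version produces the same results but lets the counting step of several strings run at the same time.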

Creating the Pipeline

We create a pipeline of three pipes with two parallel lines. More lines allow more tokens to be in flight at once, which can raise throughput at the cost of one extra buffer slot per line; in practice, std::thread::hardware_concurrency() is a good default.
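One way to derive the line count from the hardware is sketched below. The helper choose_num_lines and its fallback parameter are hypothetical, not part of Taskflow. The sketch guards against the case where std::thread::hardware_concurrency() returns 0, which the C++ standard permits when the value cannot be determined.

```cpp
#include <cstddef>
#include <thread>

// Hypothetical helper: choose a pipeline line count from the hardware.
// hardware_concurrency() may return 0 when the value is not computable,
// so fall back to a caller-supplied default in that case.
std::size_t choose_num_lines(std::size_t fallback = 2) {
  const unsigned hw = std::thread::hardware_concurrency();
  return hw == 0 ? fallback : static_cast<std::size_t>(hw);
}
```

Because this value is only known at run time, a buffer sized from it would need a run-time container such as std::vector rather than std::array, whose size must be a compile-time constant.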

#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/pipeline.hpp>

#include <algorithm>
#include <array>
#include <cstdio>
#include <iostream>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <variant>
#include <vector>

// format a character-frequency map as "c:n c:n ..."
std::string format_map(const std::unordered_map<char, size_t>& map) {
  std::ostringstream oss;
  for(const auto& [c, n] : map) oss << c << ':' << n << ' ';
  return oss.str();
}

int main() {

  tf::Taskflow taskflow("text-pipeline");
  tf::Executor executor;

  const size_t num_lines = 2;

  std::vector<std::string> input = {
    "abade", "ddddf", "eefge", "xyzzd", "ijjjj", "jiiii", "kkijk"
  };

  // one buffer slot per pipeline line; each slot holds the data for one token
  using data_type = std::variant<
    std::string,
    std::unordered_map<char, size_t>,
    std::pair<char, size_t>
  >;
  std::array<data_type, num_lines> buffer;

  tf::Pipeline pl(num_lines,

    // stage 1 (serial): read the next input string
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      if(pf.token() == input.size()) {
        pf.stop();  // no more tokens: shut the pipeline down
      }
      else {
        printf("stage 1: %s\n", input[pf.token()].c_str());
        buffer[pf.line()] = input[pf.token()];
      }
    }},

    // stage 2 (parallel): build a character-frequency map
    tf::Pipe{tf::PipeType::PARALLEL, [&](tf::Pipeflow& pf) {
      std::unordered_map<char, size_t> map;
      for(char c : std::get<std::string>(buffer[pf.line()])) map[c]++;
      printf("stage 2: %s\n", format_map(map).c_str());
      buffer[pf.line()] = map;
    }},

    // stage 3 (serial): find and report the most frequent character
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      auto& map = std::get<std::unordered_map<char, size_t>>(buffer[pf.line()]);
      auto sol = std::max_element(map.begin(), map.end(),
        [](const auto& a, const auto& b) { return a.second < b.second; }
      );
      printf("stage 3: %c:%zu\n", sol->first, sol->second);
      buffer[pf.line()] = *sol;
    }}
  );

  // wrap the pipeline in a module task between a start task and a stop task
  tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
                          .name("start");
  tf::Task pipe = taskflow.composed_of(pl)
                          .name("pipeline");
  tf::Task done = taskflow.emplace([](){ std::cout << "done\n"; })
                          .name("stop");

  init.precede(pipe);
  pipe.precede(done);

  executor.run(taskflow).wait();

  return 0;
}

Data Buffer

Taskflow gives users full control over data management in a pipeline. We allocate a one-dimensional buffer indexed by pipeline line:

std::array<data_type, num_lines> buffer;

A one-dimensional buffer is sufficient because Taskflow guarantees that at most one scheduling token is active per line at any time, so no two tokens will read or write the same buffer slot simultaneously.

Note
Scheduling tokens carry no data themselves; the stage functions transform only the input elements held in the buffer. The token index (tf::Pipeflow::token) identifies which input element is being processed, and the line index (tf::Pipeflow::line) identifies which buffer slot to use.
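To make the role of a single buffer slot concrete, the following sketch walks one slot through the three value types it holds over a token's lifetime: a string, then a frequency map, then a (character, count) pair. It uses only the standard library and runs without Taskflow. The function walk_slot is a hypothetical name used for illustration, and the stage bodies are simplified stand-ins for the pipe callables.

```cpp
#include <cstddef>
#include <string>
#include <unordered_map>
#include <utility>
#include <variant>

using data_type = std::variant<
  std::string,
  std::unordered_map<char, std::size_t>,
  std::pair<char, std::size_t>
>;

// Simulate what the three stages do to a single buffer slot, without Taskflow.
// Returns the final (character, count) pair left in the slot.
std::pair<char, std::size_t> walk_slot(data_type& slot, const std::string& s) {
  slot = s;  // stage 1 stores the input string

  std::unordered_map<char, std::size_t> freq;
  for (char c : std::get<std::string>(slot)) ++freq[c];
  slot = std::move(freq);  // stage 2 replaces the string with a map

  const auto& m = std::get<std::unordered_map<char, std::size_t>>(slot);
  std::pair<char, std::size_t> best{'\0', 0};
  for (const auto& [c, n] : m) {
    if (n > best.second) best = {c, n};
  }
  slot = best;  // stage 3 replaces the map with the winning pair

  return std::get<std::pair<char, std::size_t>>(slot);
}
```

The same slot can be passed to walk_slot again with the next string, which mirrors how a pipeline line reuses its buffer slot once the previous token has left the last stage. Reading the slot with std::get for a type it does not currently hold throws std::bad_variant_access, so each stage must read the type that the preceding stage wrote.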

Sample Output

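Because tokens on different lines run concurrently, messages from different stages may interleave, and the order of entries within each stage-2 line depends on the unspecified iteration order of std::unordered_map. One possible execution trace: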

ready
stage 1: abade
stage 1: ddddf
stage 2: f:1 d:4
stage 2: e:1 d:1 a:2 b:1
stage 3: a:2
stage 1: eefge
stage 2: g:1 e:3 f:1
stage 3: d:4
stage 1: xyzzd
stage 3: e:3
stage 1: ijjjj
stage 2: z:2 x:1 d:1 y:1
stage 3: z:2
stage 1: jiiii
stage 2: j:4 i:1
stage 3: j:4
stage 2: i:4 j:1
stage 1: kkijk
stage 3: i:4
stage 2: j:1 k:3 i:1
stage 3: k:3
done

The seven stage-3 outputs appear in the same order as the input (a:2, d:4, e:3, z:2, j:4, i:4, k:3), as guaranteed by the serial pipe declaration. The pipeline task graph is shown below: