Graph Processing Pipeline

We study a graph processing pipeline that propagates a sequence of stage functions through the nodes of a DAG in topological order. This example shows how task graph parallelism and pipeline parallelism can be combined — and when one model outperforms the other.

Problem Formulation

Given a DAG where each node must execute three sequential stage functions f1, f2, f3, and where an edge u→v requires fi(u) to complete before fi(v) starts, we want to process all nodes as quickly as possible.

The following figure shows the per-node stage dependency for a three-node DAG with edges A→B and A→C:

One approach is pure task graph parallelism: one task per node that runs f1, f2, f3 sequentially. This is simple but leaves pipeline stages idle whenever one stage finishes before the next node is ready.

A better approach transforms the problem into pipeline parallelism by finding a topological order of the DAG (e.g., A, B, C) and treating each node as a token that flows through a three-stage pipeline. The following figure shows the resulting pipeline execution:

Stages on the same anti-diagonal can execute simultaneously. For example, f3(A), f2(B), and f1(C) can all run in parallel — this is wavefront parallelism over the (stage × node) grid.

Note
The relative performance of task graph parallelism vs pipeline parallelism depends on graph size and stage count. A small, wide graph with many short stage functions often favours task graph parallelism; a long chain with expensive stages favours pipelining. Benchmark both on your workload.

Implementation

We create a three-serial-pipe pipeline. Each pipe calls the stage function for the node identified by the current token index into the topological order:

#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/pipeline.hpp>

#include <cstdio>
#include <iostream>
#include <string>
#include <vector>

void f1(const std::string& node) { printf("f1(%s)\n", node.c_str()); }
void f2(const std::string& node) { printf("f2(%s)\n", node.c_str()); }
void f3(const std::string& node) { printf("f3(%s)\n", node.c_str()); }

int main() {

  tf::Taskflow taskflow("graph pipeline");
  tf::Executor executor;

  const size_t num_lines = 2;

  // topological order of the DAG: A → {B, C}
  const std::vector<std::string> nodes = {"A", "B", "C"};

  tf::Pipeline pl(num_lines,
    // stage 1: run f1 for the current node, or stop when all nodes are done
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      if(pf.token() == nodes.size()) {
        pf.stop();
      }
      else {
        f1(nodes[pf.token()]);
      }
    }},

    // stage 2: run f2 for the current node
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      f2(nodes[pf.token()]);
    }},

    // stage 3: run f3 for the current node
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      f3(nodes[pf.token()]);
    }}
  );

  tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; }).name("start");
  tf::Task pipe = taskflow.composed_of(pl).name("pipeline");
  tf::Task done = taskflow.emplace([](){ std::cout << "done\n"; }).name("stop");

  init.precede(pipe);
  pipe.precede(done);

  executor.run(taskflow).wait();

  return 0;
}

Topological Order

The pipeline only supports dependencies from the current token to a previously processed token (i.e., the pipeline flows in one direction). We satisfy this by feeding nodes in a valid topological order, so that for any edge u→v, u appears before v in the node sequence. In this example we hard-code it:

const std::vector<std::string> nodes = {"A", "B", "C"};

In a general application, topological order can be computed via DFS or Kahn's algorithm on the input graph.

Sample Output

Two of the possible execution orderings for this pipeline:

# output 1 — full pipelining
ready
f1(A)
f2(A) f1(B)
f3(A) f2(B) f1(C)
f3(B) f2(C)
f3(C)
done
# output 2 — no overlap (sequential-like)
ready
f1(A) f2(A) f3(A)
f1(B) f2(B) f3(B)
f1(C) f2(C) f3(C)
done

The task graph for this pipeline is shown below:

Reference

This graph processing pipeline technique has been applied to accelerate timing analysis in VLSI circuit design. For details, see:

  • Cheng-Hsiang Chiu and Tsung-Wei Huang, "Efficient Timing Propagation with Simultaneous Structural and Pipeline Parallelisms," DAC 2022.