Parallel Breadth-First Search

We implement a parallel breadth-first search (BFS) using tf::Taskflow::for_each_by_index to process each frontier level in parallel. This example demonstrates how an iterative graph algorithm with a data-dependent loop structure maps onto Taskflow's stateful parallel iteration model.

What is BFS and Why Parallelize It?

Breadth-first search (BFS) is one of the most fundamental graph algorithms. Starting from a source node, it visits every reachable node in the graph and records the shortest distance (in number of edges) from the source to each node. It is used in network routing, social network analysis, game AI pathfinding, dependency resolution, and many other applications. The key idea is that BFS discovers nodes layer by layer. All nodes at distance 1 from the source are found first, then all nodes at distance 2, and so on. Each layer is called a frontier: the set of nodes discovered at the same distance from the source.

As an illustration, consider a small graph with source node S. Nodes at the same distance from S form one level, and edges point from each node to the neighbours it discovers in the next level:

The sequential algorithm processes one node at a time within each frontier. But notice that all nodes within a frontier are completely independent of each other: they were discovered in the same round and none of them depends on another node at the same level to compute its distance. This means all nodes in a frontier can be processed in parallel, relaxing their outgoing edges simultaneously across multiple CPU cores. For large graphs with wide frontiers (e.g., social networks, web graphs, road networks) this parallelism can be substantial. A frontier of one million nodes with an average degree of ten means ten million edge relaxations that can all run concurrently.

Implementation

We represent the graph as an adjacency list and maintain two frontier buffers: curr_frontier holds the nodes being processed in the current level, and next_frontier accumulates the nodes discovered for the next level. An atomic counter next_size tracks how many nodes have been written into next_frontier so far.

Distance values are stored as std::atomic<int> so that multiple workers can race to claim an unvisited node safely. The first worker to set distance[v] from INF to a finite value wins; all others see the compare_exchange_strong fail and skip that node. This prevents any node from being added to next_frontier more than once.

Note that curr_frontier is read-only during the sweep and requires no synchronization; only distance[] and next_size are written concurrently.

#include <taskflow/taskflow.hpp>
#include <atomic>
#include <cstdio>
#include <limits>
#include <vector>

const int INF = std::numeric_limits<int>::max();

void bfs(
  const std::vector<std::vector<int>>& graph,
  int source,
  std::vector<std::atomic<int>>& distance
) {
  const int N = static_cast<int>(graph.size());

  for(int i = 0; i < N; i++) {
    distance[i].store(INF, std::memory_order_relaxed);
  }
  distance[source].store(0, std::memory_order_relaxed);

  std::vector<int> curr_frontier, next_frontier(N);
  std::atomic<int> next_size{0};
  curr_frontier.reserve(N);
  curr_frontier.push_back(source);

  tf::Executor executor;
  tf::Taskflow taskflow;

  // stateful range over the current frontier; reset each level
  tf::IndexRange<int> range(0, 0, 1);

  auto sweep = taskflow.for_each_by_index(
    std::ref(range),
    [&](const tf::IndexRange<int>& sub) {
      for(int idx = sub.begin(); idx < sub.end(); idx += sub.step_size()) {
        int u = curr_frontier[idx];
        for(int v : graph[u]) {
          int expected = INF;
          if(distance[v].compare_exchange_strong(
               expected,
               distance[u].load(std::memory_order_relaxed) + 1)
          ) {
            // first worker to reach v claims it for the next frontier
            int pos = next_size.fetch_add(1, std::memory_order_relaxed);
            next_frontier[pos] = v;
          }
        }
      }
    }
  );

  // host-driven loop: one executor.run call per frontier level
  while(!curr_frontier.empty()) {
    range.reset(0, static_cast<int>(curr_frontier.size()), 1);
    next_size.store(0, std::memory_order_relaxed);
    executor.run(taskflow).wait();
    // promote next_frontier to curr_frontier for the next level
    int sz = next_size.load(std::memory_order_relaxed);
    curr_frontier.assign(next_frontier.begin(), next_frontier.begin() + sz);
  }
}

int main() {
  // example graph (undirected, stored as directed adjacency list)
  //
  //   0 --- 1 --- 3
  //   |     |
  //   2 --- 4 --- 5
  //
  std::vector<std::vector<int>> graph = {
    {1, 2},     // 0
    {0, 3, 4},  // 1
    {0, 4},     // 2
    {1},        // 3
    {1, 2, 5},  // 4
    {4}         // 5
  };

  int N = static_cast<int>(graph.size());
  std::vector<std::atomic<int>> distance(N);

  bfs(graph, 0, distance);  // source = 0

  for(int i = 0; i < N; i++) {
    printf("distance[%d] = %d\n", i, distance[i].load());
  }
  return 0;
}

The stateful std::ref(range) is the key to making this work without rebuilding the taskflow each level. The sweep task reads the range at execution time, so updating range with reset before each executor.run call is all that is needed to redirect the parallel loop to the new frontier.

Encoding the Loop as a Condition Task

The host loop calls tf::Executor::run once per frontier level, re-entering the executor each time. An alternative is to encode the loop termination check as a condition task inside the graph itself, so the entire BFS runs in a single tf::Executor::run call with minimal synchronization overhead:

tf::Executor executor;
tf::Taskflow taskflow;

// graph, distance, curr_frontier, next_frontier, next_size, and range
// are declared as in the previous version

auto init = taskflow.emplace([&](){
  // initialize data here ...
});

auto sweep = taskflow.for_each_by_index(
  std::ref(range),
  [&](const tf::IndexRange<int>& sub) {
    for(int idx = sub.begin(); idx < sub.end(); idx += sub.step_size()) {
      int u = curr_frontier[idx];
      for(int v : graph[u]) {
        int expected = INF;
        if(distance[v].compare_exchange_strong(
             expected,
             distance[u].load(std::memory_order_relaxed) + 1)) {
          int pos = next_size.fetch_add(1, std::memory_order_relaxed);
          next_frontier[pos] = v;
        }
      }
    }
  }
);

auto check = taskflow.emplace([&]() -> int {
  // promote next_frontier to curr_frontier
  int sz = next_size.load(std::memory_order_relaxed);
  curr_frontier.assign(next_frontier.begin(), next_frontier.begin() + sz);
  next_size.store(0, std::memory_order_relaxed);
  if(curr_frontier.empty()) {
    return 1;  // no more nodes to visit: exit the graph
  }
  // prepare range for the next frontier level
  range.reset(0, static_cast<int>(curr_frontier.size()), 1);
  return 0;    // loop back to sweep
});

sweep.precede(check)
     .succeed(init);
check.precede(sweep);  // back-edge: return 0 loops here

// initialize for the first frontier level before running
range.reset(0, 1, 1);  // curr_frontier = {source}, size 1
executor.run(taskflow).wait();

The condition task runs on a single worker after sweep completes, so the promotion of next_frontier into curr_frontier and the range reset are sequenced correctly without any additional synchronization. The back-edge from check to sweep forms the BFS level loop; the executor drives the full traversal end-to-end in a single run call.

Note
The compare_exchange_strong on distance[v] is the correctness guarantee that prevents a node from being visited twice. If two workers simultaneously attempt to claim the same unvisited node v, exactly one compare_exchange_strong will succeed (the one that sees distance[v]==INF); the other will fail because distance[v] is no longer INF and will skip v. This ensures next_frontier contains each node at most once and that distance[v] holds the correct shortest-path distance from the source.