Parallel Merge

Taskflow provides a function for constructing a task to merge two sorted ranges into a single sorted output range in parallel.

Include the Header

You need to include the header file, taskflow/algorithm/merge.hpp, for creating a parallel-merge task.

#include <taskflow/algorithm/merge.hpp>

Motivation

The standard library std::merge walks both input sequences simultaneously from left to right in a single thread, producing a sorted output in O(n+m) time. While optimal for a single core, this sequential walk cannot exploit multiple CPU cores — at any point only one comparison is in flight. For large inputs (e.g. merging two sorted arrays of tens of millions of elements), this leaves the majority of available hardware idle.

Taskflow's parallel merge divides the output into W independent chunks (one per worker thread) and uses the co-rank technique to identify each worker's exact input sub-ranges, allowing all workers to merge their chunks simultaneously with no synchronization.

Create a Parallel-Merge Task

The task created by tf::Taskflow::merge(B1 first1, E1 last1, B2 first2, E2 last2, O d_first) merges the two sorted ranges [first1, last1) and [first2, last2) into the output range beginning at d_first, using std::less as the comparator. It represents the parallel execution of std::merge:

std::merge(first1, last1, first2, last2, d_first);

The following example merges two sorted vectors of one million integers using four worker threads:

#include <taskflow/algorithm/merge.hpp>

tf::Taskflow taskflow;
tf::Executor executor(4);

std::vector<int> seq1(1000000), seq2(1000000), output(2000000);
// fill and sort seq1 and seq2 ...

taskflow.merge(
  seq1.begin(), seq1.end(),
  seq2.begin(), seq2.end(),
  output.begin()
);

executor.run(taskflow).wait();
// output is now the sorted merge of seq1 and seq2

To merge with a custom comparator, pass it as the last argument. Both input ranges must be sorted with respect to that comparator:

// descending order
std::sort(seq1.begin(), seq1.end(), std::greater<int>{});
std::sort(seq2.begin(), seq2.end(), std::greater<int>{});

taskflow.merge(
  seq1.begin(), seq1.end(),
  seq2.begin(), seq2.end(),
  output.begin(),
  std::greater<int>{}
);

executor.run(taskflow).wait();
Note
Both input ranges must be sorted with respect to the comparator before the merge task runs. Passing unsorted input is undefined behavior. The output range must not overlap either input range. Both input iterators must be random-access iterators.

Capture Iterators by Reference

You can pass iterators by reference using std::ref to marshal parameter updates between dependent tasks. This is useful when the ranges are not known at task-graph construction time but are initialized by an upstream task.

std::vector<int> seq1, seq2, output;
std::vector<int>::iterator beg1, end1, beg2, end2, d_beg;

tf::Task init = taskflow.emplace([&]() {
  // fill and sort seq1, seq2 at runtime, then size the output
  output.resize(seq1.size() + seq2.size());
  beg1 = seq1.begin(); end1 = seq1.end();
  beg2 = seq2.begin(); end2 = seq2.end();
  d_beg = output.begin();
});

tf::Task merge_task = taskflow.merge(
  std::ref(beg1), std::ref(end1),
  std::ref(beg2), std::ref(end2),
  std::ref(d_beg)
);

// wrong! iterators are captured by copy at construction time
// tf::Task merge_task = taskflow.merge(beg1, end1, beg2, end2, d_beg);

init.precede(merge_task);
executor.run(taskflow).wait();

When init finishes, the merge task reads the updated iterators and merges the two runtime-defined sequences.

Understanding the Parallel Merge Algorithm

Step 1: Flat Output Partitioning

The merged output has N = n + m elements (where n = |seq1| and m = |seq2|).
The algorithm divides the output into W equal contiguous chunks of size K = N/W, one per worker thread. Worker w is responsible for writing output positions [w*K, (w+1)*K). The key challenge is determining, for each output chunk, which elements of seq1 and seq2 belong to it. This is what the co-rank function solves.
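The chunking arithmetic can be sketched as follows. This is an illustrative helper (chunk_bounds is not a Taskflow API); it also spreads the remainder N % W over the first workers so the chunks tile [0, N) exactly, a detail the simplified K = N/W description glosses over:

```cpp
#include <cstddef>
#include <utility>

// Illustrative sketch of Step 1 (not Taskflow's actual code): worker w
// of W owns output positions [lo, hi). The remainder N % W is spread
// over the first N % W workers so the chunks tile [0, N) exactly.
std::pair<std::size_t, std::size_t>
chunk_bounds(std::size_t N, std::size_t W, std::size_t w) {
  std::size_t K = N / W;   // base chunk size
  std::size_t r = N % W;   // leftover elements
  std::size_t lo = w * K + (w < r ? w : r);
  std::size_t hi = lo + K + (w < r ? 1 : 0);
  return {lo, hi};
}
```

For example, chunk_bounds(10, 4, w) for w = 0..3 yields [0,3), [3,6), [6,8), [8,10).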

Step 2: Co-rank to Identify Input Subranges

For a given output position rank, co_rank(rank) finds i and j such that:

  • The first rank elements of the merged output consist of exactly i elements from seq1[0..i) and j elements from seq2[0..j).
  • These i + j elements are interleaved in sorted order in the output. Note that co_rank does not determine the interleaving itself; it only identifies how many elements come from each sequence. The actual placement of these elements is accomplished via std::merge.

Once a worker knows (i_start, j_start) at its chunk's beginning and (i_end, j_end) at its chunk's end, it can independently merge seq1[i_start..i_end) with seq2[j_start..j_end) and write the result directly to its output region, with no communication with other workers.

Step 3: How Does co_rank Work? Binary Search!

For a given rank, co_rank binary-searches for the unique i in the range [max(0, rank-m), min(n, rank)] such that the partition (seq1[0..i), seq2[0..j)), where j = rank - i, is valid. A partition is valid when (boundary cases with i or j equal to 0, n, or m are omitted for brevity):

  • The last element of seq1's slice does not exceed the first unused element of seq2: seq1[i-1] <= seq2[j]
  • The last element of seq2's slice does not exceed the first unused element of seq1: seq2[j-1] <= seq1[i]

Because both sequences are sorted, as i increases:

  • seq1[i-1] increases (moving right in seq1)
  • seq2[j-1] decreases (moving left in seq2, since j = rank - i decreases)

This means the condition seq2[j-1] <= seq1[i] transitions from false to true exactly once as i increases, making binary search applicable. The algorithm needs to check only one condition per iteration:

  • If seq2[j-1] > seq1[i]: i is too small -> low = i + 1
  • Otherwise: i is large enough -> high = i

The figure below shows two iterations of the binary search for rank=5 on sequences seq1=[1,3,5,7,9,11] and seq2=[2,4,6,8,10,12]:

The search converges to i=3, j=2: the first 5 merged elements consist of seq1[0..3)=[1,3,5] and seq2[0..2)=[2,4], which interleave to [1,2,3,4,5].
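The binary search above can be sketched in standard C++ as follows. This is an illustrative version, not Taskflow's exact implementation; it returns the smallest feasible i, with the boundary guards written out explicitly:

```cpp
#include <algorithm>
#include <vector>

// Illustrative co-rank (not Taskflow's exact code): returns i such that
// the first `rank` merged elements take i from a[0..i) and rank - i
// from b[0..rank-i). Both inputs must be sorted ascending.
int co_rank(int rank, const std::vector<int>& a, const std::vector<int>& b) {
  const int n = a.size(), m = b.size();
  int low  = std::max(0, rank - m);  // taking fewer than rank - m from a is impossible
  int high = std::min(n, rank);      // cannot take more than n or rank from a
  while (low < high) {
    const int i = low + (high - low) / 2;
    const int j = rank - i;
    if (j > 0 && i < n && b[j - 1] > a[i]) {
      low = i + 1;   // condition seq2[j-1] <= seq1[i] still false: i too small
    } else {
      high = i;      // condition holds: i is feasible, keep searching left
    }
  }
  return low;
}
```

On the figure's example, co_rank(5, {1,3,5,7,9,11}, {2,4,6,8,10,12}) returns i = 3 (hence j = 2).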

Why parallel merge uses no partitioner

Unlike tf::Taskflow::for_each, where users can configure a partitioner to adapt to unequal per-element costs, merging a chunk of size K always costs O(K) regardless of the data values, so there is little load imbalance to mitigate. As a result, tf::Taskflow::merge always adopts static partitioning, i.e., W chunks of size N/W with one chunk per worker, which is the optimal strategy for parallel merge.
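Putting the three steps together, the whole algorithm can be condensed into a minimal end-to-end sketch. It spawns raw std::threads for the W workers and uses an inline co-rank helper; both are illustrative simplifications (Taskflow instead schedules the chunks as tasks on its executor), and the remainder N % W is folded into the last chunk for brevity:

```cpp
#include <algorithm>
#include <thread>
#include <vector>

// Illustrative end-to-end sketch of Steps 1-3 (not Taskflow's code).
// Merges sorted vectors a and b into a sorted result using W threads.
std::vector<int> parallel_merge(const std::vector<int>& a,
                                const std::vector<int>& b,
                                unsigned W) {
  const int n = a.size(), m = b.size(), N = n + m;
  std::vector<int> out(N);

  // co-rank binary search (see Step 3)
  auto co_rank = [&](int rank) {
    int low = std::max(0, rank - m), high = std::min(n, rank);
    while (low < high) {
      const int i = low + (high - low) / 2, j = rank - i;
      if (j > 0 && i < n && b[j - 1] > a[i]) low = i + 1;  // i too small
      else high = i;                                        // i feasible
    }
    return low;
  };

  std::vector<std::thread> workers;
  for (unsigned w = 0; w < W; ++w) {
    // Step 1: flat output partitioning (remainder goes to the last chunk)
    const int lo = w * (N / W);
    const int hi = (w + 1 == W) ? N : (w + 1) * (N / W);
    // Step 2: co-rank gives each worker its exact input sub-ranges
    const int i0 = co_rank(lo), i1 = co_rank(hi);
    const int j0 = lo - i0,     j1 = hi - i1;
    // Step 3: each worker merges its chunk independently, no synchronization
    workers.emplace_back([&, lo, i0, i1, j0, j1] {
      std::merge(a.begin() + i0, a.begin() + i1,
                 b.begin() + j0, b.begin() + j1,
                 out.begin() + lo);
    });
  }
  for (auto& t : workers) t.join();
  return out;
}
```

Each thread writes only its own output region [lo, hi), which is why no locks or atomics are needed once the co-ranks are known.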