Parallel Reduction
Taskflow provides template functions that construct tasks to perform parallel reduction over a range of items.
Include the Header
You need to include the header file, taskflow/algorithm/reduce.hpp, for creating a parallel-reduction task.
#include <taskflow/algorithm/reduce.hpp>
Create a Parallel-Reduction Task
The reduction task created by tf::Taskflow::reduce(first, last, result, bop) performs parallel reduction over a range of elements specified by [first, last) using the binary operator bop and stores the reduced result in result. It represents the parallel execution of the following reduction loop:
for(auto itr=first; itr<last; itr++) {
  result = bop(result, *itr);
}
At runtime, the reduction task spawns a subflow to perform parallel reduction. The reduced result is stored in result, which will be captured by reference in the reduction task. It is your responsibility to ensure that result remains alive during the parallel execution.
int sum = 100;
std::vector<int> vec = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
tf::Task task = taskflow.reduce(vec.begin(), vec.end(), sum,
  [] (int l, int r) { return l + r; }  // binary reducer operator
);
executor.run(taskflow).wait();
assert(sum == 100 + 55);
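As a caution, the sketch below (hypothetical, not from the Taskflow examples) shows how the lifetime requirement can be violated: the local variable sum is destroyed when the helper function returns, so the task holds a dangling reference by the time the executor runs.

// hypothetical helper illustrating a lifetime bug -- do NOT do this
tf::Task make_bad_reduce(tf::Taskflow& taskflow, std::vector<int>& vec) {
  int sum = 0;  // local variable: destroyed when this function returns
  return taskflow.reduce(vec.begin(), vec.end(), sum,
    [] (int l, int r) { return l + r; }
  );
  // the task still references sum -> undefined behavior when it runs
}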
The order in which the binary operator is applied to pairs of elements is unspecified. In other words, the elements of the range may be grouped and rearranged in arbitrary order. The result and the argument types of the binary operator must be consistent with the input data type.
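To illustrate (a hypothetical sketch, not from the Taskflow examples), the reduction below uses subtraction, which is neither associative nor commutative, so its outcome depends on how the runtime groups the elements and may differ from run to run:

int diff = 0;
// subtraction is not associative or commutative: the reduced value is
// unspecified and may change across runs -- avoid such operators
tf::Task bad = taskflow.reduce(vec.begin(), vec.end(), diff,
  [] (int l, int r) { return l - r; }
);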
Capture Iterators by Reference
You can pass iterators by reference using std::ref to marshal parameter updates between dependent tasks. This is useful when the range iterators are unknown at the time you create the reduction task and need to be initialized by another task:
int sum = 100;
std::vector<int> vec;
std::vector<int>::iterator first, last;

tf::Task init = taskflow.emplace([&](){
  vec = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
  first = vec.begin();
  last  = vec.end();
});
tf::Task task = taskflow.reduce(std::ref(first), std::ref(last), sum,
  [] (int l, int r) { return l + r; }  // binary reducer operator
);
// wrong! must use std::ref, or first and last are captured by copy
// tf::Task task = taskflow.reduce(first, last, sum, [] (int l, int r) {
//   return l + r;  // binary reducer operator
// });
init.precede(task);

executor.run(taskflow).wait();
assert(sum == 100 + 55);
In the above example, when init finishes, vec has been initialized to 10 elements, with first and last pointing to the data range of vec. The reduction task then works on this initialized range as a result of passing iterators by reference.
Create a Parallel-Transform-Reduction Task
It is common to transform each element into a new data type and then perform reduction on the transformed elements. Taskflow provides a method, tf::Taskflow::transform_reduce(first, last, result, bop, uop), that applies a unary operator uop to transform each element in the specified range and then performs parallel reduction over result and the transformed elements. It represents the parallel execution of the following reduction loop:
for(auto itr=first; itr<last; itr++) {
  result = bop(result, uop(*itr));
}
The example below transforms each digit in a string to an integer number and then sums up all integers in parallel.
std::string str = "12345678";
int sum {0};
tf::Task task = taskflow.transform_reduce(str.begin(), str.end(), sum,
  [] (int a, int b) {  // binary reduction operator
    return a + b;
  },
  [] (char c) -> int {  // unary transformation operator
    return c - '0';
  }
);
executor.run(taskflow).wait();
assert(sum == 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8);  // sum will be 36
The order in which we apply the binary operator to the transformed elements is unspecified. It is possible that the binary operator takes r-values in both arguments, for example, bop(uop(*itr1), uop(*itr2)), due to the transformed temporaries. When data passing is expensive, you may define the result type T to be move-constructible.
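For instance, here is a sketch (assuming the same tf::Taskflow::transform_reduce interface as above) that reduces into a move-constructible std::string, letting the transformed temporaries move into the reducer instead of being copied:

std::vector<int> nums = {1, 2, 3, 4};
std::string concat;  // std::string is move-constructible
tf::Task t = taskflow.transform_reduce(nums.begin(), nums.end(), concat,
  [] (std::string l, std::string r) {  // temporaries move into l and r
    l += r;
    return l;
  },
  [] (int v) { return std::to_string(v); }  // unary transform: int -> string
);
executor.run(taskflow).wait();
// the grouping order is unspecified, so only the size is guaranteed
assert(concat.size() == 4);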
Create a Reduce-by-Index Task
Unlike tf::Taskflow::reduce, the tf::Taskflow::reduce_by_index function lets you perform a parallel reduction over an index range, but with more control over how each part of the range is processed. This is useful when you need to customize the reduction process for each subrange or you want to incorporate optimizations like SIMD. The example below performs a sum-reduction over all elements in data, storing the result in res:
const size_t N = 100000;
std::vector<double> data(N);
double res = 1.0;
taskflow.reduce_by_index(
  // index range
  tf::IndexRange<size_t>(0, N, 1),
  // final result
  res,
  // local reducer
  [&](tf::IndexRange<size_t> subrange, std::optional<double> running_total) {
    double residual = running_total ? *running_total : 0.0;
    for(size_t i=subrange.begin(); i<subrange.end(); i+=subrange.step_size()) {
      data[i] = 1.0;  // we initialize the data here
      residual += data[i];
    }
    printf("partial sum = %lf\n", residual);
    return residual;
  },
  // global reducer
  std::plus<double>()
);
executor.run(taskflow).wait();
assert(res == 100001);
The local reducer lop computes a partial sum for each subrange, and the global reducer gop combines the partial results into the final result and stores it in res, whose initial value (i.e., 1.0 here) also participates in the reduction process. The second argument of the local reducer is a std::optional<double> that carries the running total of the partial reduction; it is std::nullopt when no running total exists yet, which is why the example falls back to 0.0 in that case.
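As a sketch of the extra control this interface offers (hypothetical, under the same assumptions and declarations as the example above), an index range with a step size of 2 reduces only the even indices of data:

double even_sum = 0.0;
taskflow.reduce_by_index(
  tf::IndexRange<size_t>(0, N, 2),  // visit indices 0, 2, 4, ...
  even_sum,
  [&](tf::IndexRange<size_t> subrange, std::optional<double> running_total) {
    double partial = running_total ? *running_total : 0.0;
    for(size_t i=subrange.begin(); i<subrange.end(); i+=subrange.step_size()) {
      partial += data[i];
    }
    return partial;
  },
  std::plus<double>()
);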
Configure a Partitioner
You can configure a partitioner for parallel-reduction tasks to run with different scheduling methods, such as guided partitioning, dynamic partitioning, and static partitioning. The following example creates two parallel-reduction tasks using two different partitioners, one with the static partitioning algorithm and another one with the guided partitioning algorithm:
tf::StaticPartitioner static_partitioner;
tf::GuidedPartitioner guided_partitioner;

int sum1 = 100, sum2 = 100;
std::vector<int> vec = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

// create a parallel-reduction task with static partitioner
taskflow.reduce(vec.begin(), vec.end(), sum1,
  [] (int l, int r) { return l + r; },
  static_partitioner
);
// create a parallel-reduction task with guided partitioner
taskflow.reduce(vec.begin(), vec.end(), sum2,
  [] (int l, int r) { return l + r; },
  guided_partitioner
);
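Assuming taskflow and executor are declared as in the earlier examples, running the graph yields the same sum under both partitioners; the partitioner changes only how iterations are scheduled among workers, not the reduced value:

executor.run(taskflow).wait();
assert(sum1 == 100 + 55);
assert(sum2 == 100 + 55);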