Parallel Reduction

Taskflow provides template functions that construct tasks to perform parallel reduction over a range of items.

Include the Header

To create a parallel-reduction task, include the header file taskflow/algorithm/reduce.hpp.

#include <taskflow/algorithm/reduce.hpp>

Create a Parallel-Reduction Task

The reduction task created by tf::Taskflow::reduce(B first, E last, T& result, O bop, P part) performs parallel reduction over a range of elements specified by [first, last) using the binary operator bop and stores the reduced result in result. It represents the parallel execution of the following reduction loop:

for(auto itr=first; itr<last; itr++) {
  result = bop(result, *itr);
}

At runtime, the reduction task spawns a subflow to perform parallel reduction. The reduced result is stored in result, which the reduction task captures by reference. It is your responsibility to ensure that result remains alive during the parallel execution.

int sum = 100;
std::vector<int> vec = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

tf::Task task = taskflow.reduce(vec.begin(), vec.end(), sum, 
  [] (int l, int r) { return l + r; }  // binary reducer operator
);
executor.run(taskflow).wait();

assert(sum == 100 + 55);

The order in which the binary operator is applied to pairs of elements is unspecified. In other words, the elements of the range may be grouped and rearranged in arbitrary order, so the result is deterministic only when the binary operator is associative and commutative. The result and the argument types of the binary operator must be consistent with the input data type.
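To see why grouping matters, the sketch below reduces the same data with different chunk sizes using only the standard library. The helper chunked_reduce is hypothetical and is not how Taskflow partitions work; it only imitates the idea of reducing each group first and then folding the partial results into the initial value.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical helper (not part of Taskflow): reduces `items` in contiguous
// chunks of `chunk` elements, then folds each partial result into `init`.
template <typename T, typename BinaryOp>
T chunked_reduce(const std::vector<T>& items, T init, std::size_t chunk, BinaryOp bop) {
  assert(chunk > 0);  // a zero chunk size would never advance
  T result = init;
  for (std::size_t beg = 0; beg < items.size(); beg += chunk) {
    const std::size_t end = std::min(beg + chunk, items.size());
    T partial = items[beg];                 // first element seeds the group
    for (std::size_t i = beg + 1; i < end; ++i) {
      partial = bop(partial, items[i]);     // reduce within the group
    }
    result = bop(result, partial);          // fold the group into the result
  }
  return result;
}
```

With the values 1 through 10 and an initial value of 100, std::plus<int> yields 155 for any chunk size. std::minus<int> yields 45 with a chunk size of 1 but 153 with a chunk size of 10, which is why a non-associative operator is a poor fit for a parallel reduction.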

Capture Iterators by Reference

You can pass iterators by reference using std::ref to marshal parameter updates between dependent tasks. This is especially useful when the range is unknown at the time the parallel-reduction task is created and is instead initialized by another task.

int sum = 100;
std::vector<int> vec;
std::vector<int>::iterator first, last;

tf::Task init = taskflow.emplace([&](){
  vec   = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
  first = vec.begin();
  last  = vec.end();
});

tf::Task task = taskflow.reduce(std::ref(first), std::ref(last), sum, 
  [] (int l, int r) { return l + r; }  // binary reducer operator
);

// wrong! must use std::ref, or first and last are captured by copy
// tf::Task task = taskflow.reduce(first, last, sum, [] (int l, int r) { 
//   return l + r;    // binary reducer operator
// });

init.precede(task);

executor.run(taskflow).wait();

assert(sum == 100 + 55);

In the above example, when init finishes, vec has been initialized with 10 elements, and first and last point to the data range of vec. Because the iterators were passed by reference, the reduction task then works on this initialized range.
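The difference between copying an iterator and wrapping it with std::ref can be reproduced without Taskflow. In the sketch below, make_deferred_sum and copy_versus_ref are hypothetical names introduced for illustration; the callable stands in for a task that is created first and run later.

```cpp
#include <cassert>
#include <functional>
#include <numeric>
#include <utility>
#include <vector>

using Iter = std::vector<int>::iterator;

// Hypothetical helper (not Taskflow code): builds a callable that sums the
// range when invoked. B and E are either Iter (stored by copy) or
// std::reference_wrapper<Iter> from std::ref (refers to the caller's variable).
template <typename B, typename E>
auto make_deferred_sum(B first, E last) {
  return [first, last]() {
    Iter b = first;  // a reference_wrapper converts to the current iterator value
    Iter e = last;
    return std::accumulate(b, e, 0);
  };
}

// Returns {sum seen by the copying callable, sum seen by the std::ref callable}.
std::pair<int, int> copy_versus_ref() {
  std::vector<int> vec = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
  Iter first = vec.begin();
  Iter last  = vec.begin();
  assert(first == last);  // both callables are built over an empty range

  auto by_copy = make_deferred_sum(first, last);
  auto by_ref  = make_deferred_sum(std::ref(first), std::ref(last));

  last = vec.end();       // plays the role of the init task
  return {by_copy(), by_ref()};
}
```

The copying callable still sees the empty range and returns 0, while the std::ref callable observes the updated last and returns 55.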

Create a Parallel-Transform-Reduction Task

It is common to transform each element into a new data type and then perform reduction on the transformed elements. Taskflow provides a method, tf::Taskflow::transform_reduce(B first, E last, T& result, BOP bop, UOP uop, P part), that applies uop to transform each element in the specified range and then performs parallel reduction over result and the transformed elements. It represents the parallel execution of the following reduction loop:

for(auto itr=first; itr<last; itr++) {
  result = bop(result, uop(*itr));
}

The example below transforms each digit in a string to an integer number and then sums up all integers in parallel.

std::string str = "12345678";
int sum {0};
tf::Task task = taskflow.transform_reduce(str.begin(), str.end(), sum,
  [] (int a, int b) {      // binary reduction operator
    return a + b;
  },  
  [] (char c) -> int {     // unary transformation operator
    return c - '0';
  }   
); 
executor.run(taskflow).wait(); 
assert(sum == 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8);  // sum will be 36 

The order in which the binary operator is applied to the transformed elements is unspecified. Because the transformed values are temporaries, the binary operator may receive rvalues for both arguments, for example, bop(uop(*itr1), uop(*itr2)). When data passing is expensive, you may define the result type T to be move-constructible.
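When checking the output of a transform-reduction task, a sequential reference can be handy. The sketch below uses std::transform_reduce from the C++17 standard library, not Taskflow; digit_sum is a hypothetical helper name. Note that the standard algorithm takes the initial value by value and returns the result, whereas the Taskflow method updates result in place.

```cpp
#include <cassert>
#include <functional>
#include <numeric>
#include <string>

// Sequential reference for the digit-sum example (standard library only).
int digit_sum(const std::string& str) {
  return std::transform_reduce(
    str.begin(), str.end(),
    0,                          // initial value, like `sum {0}` above
    std::plus<int>{},           // binary reduction operator
    [](char c) -> int {         // unary transformation operator
      assert(c >= '0' && c <= '9');  // debug-only check: digits expected
      return c - '0';
    }
  );
}
```

For the string "12345678" this returns 36, matching the value asserted in the Taskflow example.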

Create a Reduce-by-Index Task

Unlike tf::Taskflow::reduce, the tf::Taskflow::reduce_by_index function lets you perform a parallel reduction over an index range, with more control over how each part of the range is processed. This is useful when you need to customize the reduction process for each subrange or want to incorporate optimizations like SIMD. The example below sums all elements of data into res:

std::vector<double> data(100000);
double res = 1.0;
taskflow.reduce_by_index(
  // index range
  tf::IndexRange<size_t>(0, data.size(), 1),
  // final result
  res,
  // local reducer
  [&](tf::IndexRange<size_t> subrange, std::optional<double> running_total) { 
    double residual = running_total ? *running_total : 0.0;
    for(size_t i=subrange.begin(); i<subrange.end(); i+=subrange.step_size()) {
      data[i] = 1.0;        // we initialize the data here
      residual += data[i];
    }
    printf("partial sum = %lf\n", residual);
    return residual;
  },
  // global reducer
  std::plus<double>()
);

executor.run(taskflow).wait();
assert(res == 100001);

The local reducer lop computes a partial sum for each subrange, and the global reducer gop combines the partial results into the final result and stores it in res, whose initial value (1.0 here) also participates in the reduction. The second argument of the local reducer is a std::optional that carries the running total accumulated before this subrange. For the first subrange there is no running total from previous subranges, so running_total is std::nullopt.
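The interplay between the two reducers can be imitated sequentially. The sketch below is a hypothetical emulation, not Taskflow's scheduler: it assumes each "worker" processes a fixed number of consecutive chunks (chunks_per_worker) and carries its running total from one chunk to the next. The names emulate_reduce_by_index and sum_of_ones, and the begin/end index pair that replaces tf::IndexRange, are introduced only for illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <optional>
#include <vector>

// Hypothetical sequential emulation (assumed chunking, not Taskflow's).
template <typename LocalOp, typename GlobalOp>
double emulate_reduce_by_index(std::size_t n, std::size_t chunk,
                               std::size_t chunks_per_worker, double init,
                               LocalOp lop, GlobalOp gop) {
  assert(chunk > 0 && chunks_per_worker > 0);
  double result = init;                     // the initial value participates once
  std::size_t beg = 0;
  while (beg < n) {                         // one iteration per emulated worker
    std::optional<double> running_total;    // nullopt for the worker's first chunk
    for (std::size_t c = 0; c < chunks_per_worker && beg < n; ++c) {
      const std::size_t end = std::min(beg + chunk, n);
      running_total = lop(beg, end, running_total);  // local reducer
      beg = end;
    }
    result = gop(result, *running_total);   // global reducer merges the worker total
  }
  return result;
}

// Mirrors the example above: initializes every element to 1.0 and sums them.
double sum_of_ones(std::size_t n) {
  std::vector<double> data(n);
  return emulate_reduce_by_index(n, 1000, 4, 1.0,
    [&](std::size_t beg, std::size_t end, std::optional<double> running_total) {
      double partial = running_total ? *running_total : 0.0;
      for (std::size_t i = beg; i < end; ++i) {
        data[i] = 1.0;
        partial += data[i];
      }
      return partial;
    },
    std::plus<double>());
}
```

Calling sum_of_ones(100000) returns 100001.0: 25 emulated workers contribute 4000 each, and the initial value 1.0 is added once by the global reducer.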

Configure a Partitioner

You can configure a partitioner for parallel-reduction tasks to run with different scheduling methods, such as guided partitioning, dynamic partitioning, and static partitioning. The following example creates two parallel-reduction tasks using two different partitioners, one with the static partitioning algorithm and another one with the guided partitioning algorithm:

tf::StaticPartitioner static_partitioner;
tf::GuidedPartitioner guided_partitioner;

int sum1 = 100, sum2 = 100;
std::vector<int> vec = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

// create a parallel-reduction task with static partitioner
taskflow.reduce(vec.begin(), vec.end(), sum1, 
  [] (int l, int r) { return l + r; },
  static_partitioner
);

// create a parallel-reduction task with guided partitioner
taskflow.reduce(vec.begin(), vec.end(), sum2, 
  [] (int l, int r) { return l + r; },
  guided_partitioner
);

executor.run(taskflow).wait();

// both partitioners produce the same reduced result
assert(sum1 == 100 + 55);
assert(sum2 == 100 + 55);