Parallel Merge
Taskflow provides standalone template methods for merging two sorted ranges of items into a sorted range of items.
Include the Header
You need to include the header file, taskflow/cuda/algorithm/merge.hpp, to use the parallel-merge algorithm.
#include <taskflow/cuda/algorithm/merge.hpp>
Merge Two Sorted Ranges of Items
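tf::cuda_merge merges two sorted ranges of items into a single sorted range. The following code merges two sorted arrays, input_1 and input_2, each of 1000 items, into a sorted array output of 2000 items.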
// Example: merge two sorted 1000-element arrays into one sorted 2000-element
// array on the GPU with tf::cuda_merge.
const size_t N = 1000;

// The arrays are written and read on the host and merged on the GPU.
int* input_1 = tf::cuda_malloc_shared<int>(N);    // input vector 1
int* input_2 = tf::cuda_malloc_shared<int>(N);    // input vector 2
int* output  = tf::cuda_malloc_shared<int>(2*N);  // output vector

// initializes the data
for(size_t i=0; i<N; i++) {
  input_1[i] = rand()%100;
  input_2[i] = rand()%100;
}

// merging requires both inputs to be sorted
std::sort(input_1, input_1 + N);
std::sort(input_2, input_2 + N);

// create an execution policy
tf::cudaStream stream;
tf::cudaDefaultExecutionPolicy policy(stream);

// queries the required buffer size to merge two N-element sorted vectors
auto bytes  = policy.merge_bufsz(N, N);
auto buffer = tf::cuda_malloc_device<std::byte>(bytes);

// merge input_1 and input_2 to output
// (the __device__ lambda requires nvcc's --extended-lambda flag)
tf::cuda_merge(policy,
  input_1, input_1 + N, input_2, input_2 + N, output,
  []__device__ (int a, int b) { return a < b; },  // comparator
  buffer
);

// the merge runs asynchronously; wait for the stream before reading output
stream.synchronize();

// verify the result
assert(std::is_sorted(output, output + 2*N));

// delete the memory
cudaFree(input_1);
cudaFree(input_2);
cudaFree(output);
cudaFree(buffer);
The merge algorithm runs asynchronously through the stream specified in the execution policy. You need to synchronize the stream to obtain correct results. Since the GPU merge algorithm may require an extra buffer to store temporary results, you need to provide a buffer whose size is at least the value returned from tf::cudaDefaultExecutionPolicy::merge_bufsz. The buffer size depends only on the two input vector sizes.
Merge Two Sorted Ranges of Key-Value Items
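tf::cuda_merge_by_key performs a key-value merge over two sorted ranges: keys are merged in the same way as tf::cuda_merge, and each value is moved to the output together with its key. The following code performs a key-value merge over a and b: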
// Example: key-value merge of two sorted 2-element ranges a and b into c
// with tf::cuda_merge_by_key.
const size_t N = 2;

int* a_keys = tf::cuda_malloc_shared<int>(N);
int* a_vals = tf::cuda_malloc_shared<int>(N);
int* b_keys = tf::cuda_malloc_shared<int>(N);
int* b_vals = tf::cuda_malloc_shared<int>(N);
int* c_keys = tf::cuda_malloc_shared<int>(2*N);
int* c_vals = tf::cuda_malloc_shared<int>(2*N);

// initializes the data; each key range must already be sorted
a_keys[0] = 1, a_keys[1] = 8;
a_vals[0] = 2, a_vals[1] = 1;
b_keys[0] = 3, b_keys[1] = 7;
b_vals[0] = 3, b_vals[1] = 4;

// create an execution policy
tf::cudaStream stream;
tf::cudaDefaultExecutionPolicy policy(stream);

// queries the required buffer size to merge two N-element sorted vectors by keys
auto bytes  = policy.merge_bufsz(N, N);
auto buffer = tf::cuda_malloc_device<std::byte>(bytes);

// merge keys and values of a and b to c
// (the __device__ lambda requires nvcc's --extended-lambda flag)
tf::cuda_merge_by_key(
  policy,
  a_keys, a_keys+N, a_vals,
  b_keys, b_keys+N, b_vals,
  c_keys, c_vals,
  []__device__ (int a, int b) { return a < b; },  // comparator
  buffer
);

// wait for the merge to complete
stream.synchronize();

// now, c_keys = {1, 3, 7, 8}
// now, c_vals = {2, 3, 4, 1}

// delete the device memory
cudaFree(buffer);
cudaFree(a_keys);
cudaFree(b_keys);
cudaFree(c_keys);
cudaFree(a_vals);
cudaFree(b_vals);
cudaFree(c_vals);
Since the GPU merge algorithm may require an extra buffer to store temporary results, you need to provide a buffer whose size is at least the value returned from tf::cudaDefaultExecutionPolicy::merge_bufsz. The buffer size depends only on the two input vector sizes.