Taskflow Processing Pipeline

We study a taskflow processing pipeline that propagates a sequence of tokens through linearly dependent taskflows. The pipeline embeds a taskflow in each pipe to run a parallel algorithm using task graph parallelism.

Formulate the Taskflow Processing Pipeline Problem

Many complex and irregular pipeline applications require each pipe to run a parallel algorithm using task graph parallelism. We can formulate such applications as scheduling a sequence of tokens through linearly dependent taskflows. The following example illustrates the pipeline propagation of three scheduling tokens through three linearly dependent taskflows:

[Figure: pipeline propagation of three scheduling tokens through three linearly dependent taskflows. Each node "taskflowN on token T" depends on the previous taskflow on the same token and on the same taskflow on the previous token.]

[Figure: the three embedded task graphs. taskflow1: A1 precedes B1 and C1, which both precede D1. taskflow2: A2 -> B2 -> C2 -> D2. taskflow3: A3 precedes B3, C3, and D3.]

Each pipe (stage) in the pipeline embeds a taskflow that performs a stage-specific parallel algorithm on an input scheduling token. Parallelism arises both inside each taskflow (task graph parallelism) and across the three taskflows (pipeline parallelism).
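The dependency structure described above can be sketched without Taskflow. The following is a minimal illustration, assuming every taskflow run takes one unit step and ignoring the limit on concurrent lines. The function name earliest_steps is hypothetical and not part of the Taskflow API.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Earliest step at which each (token, pipe) cell can start.
// Assumptions (not from Taskflow): every cell takes exactly one step,
// and there is no limit on the number of concurrent lines.
std::vector<std::vector<std::size_t>> earliest_steps(std::size_t num_tokens,
                                                     std::size_t num_pipes) {
  assert(num_tokens > 0 && num_pipes > 0);
  std::vector<std::vector<std::size_t>> step(
      num_tokens, std::vector<std::size_t>(num_pipes, 0));
  for (std::size_t t = 0; t < num_tokens; ++t) {
    for (std::size_t p = 0; p < num_pipes; ++p) {
      std::size_t ready = 0;
      // Same token must first finish the previous pipe.
      if (p > 0) ready = std::max(ready, step[t][p - 1] + 1);
      // The previous token must first finish this (serial) pipe.
      if (t > 0) ready = std::max(ready, step[t - 1][p] + 1);
      step[t][p] = ready;
    }
  }
  return step;
}
```

Under these assumptions, cells with the same value of token + pipe share a step and can overlap, which is the pipeline parallelism; the work inside each cell is the task graph parallelism of the embedded taskflow.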

Create a Taskflow Processing Pipeline

Using the example from the previous section, we create a pipeline of three serial pipes, each running its own taskflow, and process a sequence of five scheduling tokens. The overall implementation is shown below:

#include <array>     // std::array
#include <cstdio>    // printf
#include <iostream>  // std::cout

#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/pipeline.hpp>

// taskflow on the first pipe
void make_taskflow1(tf::Taskflow& tf) {
  auto [A1, B1, C1, D1] = tf.emplace(
    [](){ printf("A1\n"); },
    [](){ printf("B1\n"); },
    [](){ printf("C1\n"); },
    [](){ printf("D1\n"); }
  );
  A1.precede(B1, C1);
  D1.succeed(B1, C1);
}

// taskflow on the second pipe
void make_taskflow2(tf::Taskflow& tf) {
  auto [A2, B2, C2, D2] = tf.emplace(
    [](){ printf("A2\n"); },
    [](){ printf("B2\n"); },
    [](){ printf("C2\n"); },
    [](){ printf("D2\n"); }
  );
  tf.linearize({A2, B2, C2, D2});
}

// taskflow on the third pipe
void make_taskflow3(tf::Taskflow& tf) {
  auto [A3, B3, C3, D3] = tf.emplace(
    [](){ printf("A3\n"); },
    [](){ printf("B3\n"); },
    [](){ printf("C3\n"); },
    [](){ printf("D3\n"); }
  );
  A3.precede(B3, C3, D3);
}

int main() {

  tf::Taskflow taskflow("taskflow processing pipeline");
  tf::Executor executor;

  const size_t num_lines = 2;
  const size_t num_pipes = 3;
  
  // define the taskflow storage
  // we use the pipe dimension because we create three 'serial' pipes
  std::array<tf::Taskflow, num_pipes> taskflows;

  // create three different taskflows for the three pipes
  make_taskflow1(taskflows[0]);
  make_taskflow2(taskflows[1]);
  make_taskflow3(taskflows[2]);

  // the pipeline consists of three serial pipes
  // and up to two concurrent scheduling tokens
  tf::Pipeline pl(num_lines,

    // first pipe runs taskflow1
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      if(pf.token() == 5) {
        pf.stop();
        return;
      }
      printf("begin token %zu\n", pf.token());
      executor.corun(taskflows[pf.pipe()]);
    }},
    
    // second pipe runs taskflow2
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      executor.corun(taskflows[pf.pipe()]);
    }},

    // third pipe runs taskflow3
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      executor.corun(taskflows[pf.pipe()]);
    }}
  );

  // build the pipeline graph using composition
  tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
                          .name("starting pipeline");
  tf::Task task = taskflow.composed_of(pl)
                          .name("pipeline");
  tf::Task stop = taskflow.emplace([](){ std::cout << "stopped\n"; })
                          .name("pipeline stopped");

  // create task dependency
  init.precede(task);
  task.precede(stop);

  // dump the pipeline graph structure (with composition)
  taskflow.dump(std::cout);

  // run the pipeline
  executor.run(taskflow).wait();

  return 0;
}

Define Taskflows

First, we define three taskflows for the three pipes in the pipeline:

// taskflow on the first pipe
void make_taskflow1(tf::Taskflow& tf) {
  auto [A1, B1, C1, D1] = tf.emplace(
    [](){ printf("A1\n"); },
    [](){ printf("B1\n"); },
    [](){ printf("C1\n"); },
    [](){ printf("D1\n"); }
  );
  A1.precede(B1, C1);
  D1.succeed(B1, C1);
}

// taskflow on the second pipe
void make_taskflow2(tf::Taskflow& tf) {
  auto [A2, B2, C2, D2] = tf.emplace(
    [](){ printf("A2\n"); },
    [](){ printf("B2\n"); },
    [](){ printf("C2\n"); },
    [](){ printf("D2\n"); }
  );
  tf.linearize({A2, B2, C2, D2});
}

// taskflow on the third pipe
void make_taskflow3(tf::Taskflow& tf) {
  auto [A3, B3, C3, D3] = tf.emplace(
    [](){ printf("A3\n"); },
    [](){ printf("B3\n"); },
    [](){ printf("C3\n"); },
    [](){ printf("D3\n"); }
  );
  A3.precede(B3, C3, D3);
}

As each taskflow corresponds to a pipe in the pipeline, we create a linear array to store the three taskflows:

std::array<tf::Taskflow, num_pipes> taskflows;
make_taskflow1(taskflows[0]);
make_taskflow2(taskflows[1]);
make_taskflow3(taskflows[2]);

Because all three pipes are serial, each pipe processes at most one token at a time, so each taskflow has at most one run in flight. We can therefore store the three taskflows in a one-dimensional array whose size equals the number of pipes. If the pipeline contained a parallel pipe, we would need a two-dimensional array indexed by line and pipe, because the taskflow at that stage could run simultaneously on multiple parallel lines.
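The two-dimensional layout mentioned above could look roughly like the sketch below. To keep it compilable without Taskflow, StageGraph is a hypothetical stand-in for tf::Taskflow, and make_grid and graph_for are illustrative helpers, not library functions. In a real pipe callable, the two indices would presumably come from tf::Pipeflow::line() and tf::Pipeflow::pipe().

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical stand-in for tf::Taskflow so the sketch needs no Taskflow headers.
struct StageGraph {
  std::string label;
};

constexpr std::size_t num_lines = 2;
constexpr std::size_t num_pipes = 3;

// One graph per (line, pipe) cell, so two lines never share a graph object.
using GraphGrid = std::array<std::array<StageGraph, num_pipes>, num_lines>;

// Build a separate graph for every cell of the grid.
GraphGrid make_grid() {
  GraphGrid grid;
  for (std::size_t l = 0; l < num_lines; ++l) {
    for (std::size_t p = 0; p < num_pipes; ++p) {
      grid[l][p].label =
          "taskflow" + std::to_string(p + 1) + "@line" + std::to_string(l);
    }
  }
  return grid;
}

// Select the graph a pipe callable would run for a given line and pipe.
StageGraph& graph_for(GraphGrid& grid, std::size_t line, std::size_t pipe) {
  assert(line < num_lines && pipe < num_pipes);
  return grid[line][pipe];
}
```

The point of the extra dimension is that two tokens sitting in the same parallel pipe on different lines each get their own graph object, so neither run touches a graph that is already executing.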

Define the Pipes

The pipe definition is straightforward. Each pipe runs its corresponding taskflow, which is looked up in taskflows by the pipe's identifier, tf::Pipeflow::pipe(). The first pipe stops the pipeline once five scheduling tokens (tokens 0 through 4) have been generated:

// first pipe runs taskflow1
tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
  if(pf.token() == 5) {
    pf.stop();
    return;
  }
  printf("begin token %zu\n", pf.token());
  executor.corun(taskflows[pf.pipe()]);
}},

// second pipe runs taskflow2
tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
  executor.corun(taskflows[pf.pipe()]);
}},

// third pipe runs taskflow3
tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
  executor.corun(taskflows[pf.pipe()]);
}}

At each pipe, we use tf::Executor::corun to execute the corresponding taskflow and wait until the execution completes. This matters because the caller, which is the worker thread invoking the pipe callable, must not block, as it would with executor.run(taskflows[pf.pipe()]).wait(). With corun, the caller instead participates in the scheduler's work-stealing loop while it waits, which avoids deadlock.
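The reason cooperative waiting avoids deadlock can be illustrated with a toy single-worker scheduler. This is only a conceptual sketch: MiniScheduler, run_until, and run_stage are hypothetical names, and the code does not reflect how Taskflow implements corun or work stealing.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical single-worker scheduler; NOT Taskflow's implementation.
class MiniScheduler {
 public:
  void submit(std::function<void()> task) { queue_.push_back(std::move(task)); }

  // Cooperative wait: the caller keeps running queued tasks until `done`
  // returns true. Returns false if the queue drains before that happens.
  bool run_until(const std::function<bool()>& done) {
    while (!done()) {
      if (queue_.empty()) return false;
      auto task = std::move(queue_.front());
      queue_.pop_front();
      task();
    }
    return true;
  }

 private:
  std::deque<std::function<void()>> queue_;
};

// Plays the role of a pipe callable: spawn three subtasks, then wait for them.
std::size_t run_stage(MiniScheduler& sched) {
  std::size_t finished = 0;
  for (int i = 0; i < 3; ++i) {
    sched.submit([&finished] { ++finished; });
  }
  // A blocking wait here could never return, because the only worker is the
  // caller itself. Waiting cooperatively lets the caller run the subtasks.
  bool ok = sched.run_until([&finished] { return finished == 3; });
  assert(ok);
  (void)ok;
  return finished;
}
```

With a single worker, the subtasks only ever run because the waiting caller executes them itself. The same reasoning applies when every worker of a real executor is busy inside a pipe callable.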

Define the Task Graph

To build up the taskflow for the pipeline, we create a module task with the defined pipeline structure and connect it with two tasks that output helper messages before and after the pipeline:

tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
                        .name("starting pipeline");
tf::Task task = taskflow.composed_of(pl)
                        .name("pipeline");
tf::Task stop = taskflow.emplace([](){ std::cout << "stopped\n"; })
                        .name("pipeline stopped");
init.precede(task);
task.precede(stop);

[Figure: dumped task graph "Taskflow Processing Pipeline": starting pipeline -> pipeline [m1] -> pipeline stopped. The module m1 contains a condition task cond that branches to rt-0 (on 0) and rt-1 (on 1).]

Submit the Task Graph

Finally, we submit the taskflow to the executor and run it once:

executor.run(taskflow).wait();

One possible output is shown below:

ready
begin token 0
A1
C1
B1
D1
begin token 1
A2
B2
A1
C1
B1
D1
C2
D2
A3
D3
C3
B3
begin token 2
A2
B2
C2
D2
A1
C1
B1
D1
A3
D3
C3
B3
A2
B2
C2
D2
begin token 3
A3
D3
C3
B3
A1
C1
B1
D1
begin token 4
A2
A1
C1
B1
D1
B2
C2
D2
A3
D3
C3
B3
A2
B2
C2
D2
A3
D3
C3
B3
stopped