Conditional Tasking
Parallel workloads often require making control-flow decisions across dependent tasks. Taskflow supports conditional tasking, an efficient interface for implementing general control flow such as dynamic flow, cycles, and conditionals that are otherwise difficult to express with existing frameworks.
Create a Condition Task
A condition task evaluates a set of instructions and returns an integer index of the next successor task to execute. The index is defined with respect to the order of its successor construction. The following example creates an if-else block using a single condition task.
```cpp
 1: tf::Taskflow taskflow;
 2:
 3: auto [init, cond, yes, no] = taskflow.emplace(
 4:   [] () { },
 5:   [] () { return 0; },
 6:   [] () { std::cout << "yes\n"; },
 7:   [] () { std::cout << "no\n"; }
 8: );
 9:
10: cond.succeed(init)
11:     .precede(yes, no);  // executes yes if cond returns 0
12:                         // executes no  if cond returns 1
```
Line 5 creates a condition task `cond` and line 11 creates two dependencies from `cond` to two other tasks, `yes` and `no`. With this order, when `cond` returns 0, the execution moves on to task `yes`. When `cond` returns 1, the execution moves on to task `no`.
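The index-to-successor rule can be modeled in a few lines of standard C++. The sketch below is a toy model, not Taskflow code: `select_successor` is a hypothetical helper, and treating an out-of-range index as "run nothing" is an assumption of the model.

```cpp
#include <optional>
#include <string>
#include <vector>

// Toy model of a condition task: successors are kept in the order in which
// they were passed to precede(), and the returned integer picks one of them.
// Hypothetical helper for illustration; not a Taskflow API.
std::optional<std::string> select_successor(
    const std::vector<std::string>& successors, int returned_index) {
  // Assumption of this model: an index outside the successor range selects nothing.
  if (returned_index < 0 ||
      returned_index >= static_cast<int>(successors.size())) {
    return std::nullopt;
  }
  return successors[returned_index];
}
```

With successors `{"yes", "no"}`, index 0 selects `yes` and index 1 selects `no`, which mirrors the order given to `precede` in the example.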
A condition task can form a cycle to describe iterative control flow. The example below implements a simple yet commonly used feedback loop through a condition task (lines 7-10) that returns a random binary value. If the return value from `cond` is 0, it loops back to itself; otherwise, it proceeds to `stop`.
```cpp
 1: tf::Taskflow taskflow;
 2:
 3: tf::Task init = taskflow.emplace([](){}).name("init");
 4: tf::Task stop = taskflow.emplace([](){}).name("stop");
 5:
 6: // creates a condition task that returns 0 or 1
 7: tf::Task cond = taskflow.emplace([](){
 8:   std::cout << "flipping a coin\n";
 9:   return std::rand() % 2;
10: }).name("cond");
11:
12: // creates a feedback loop {0: cond, 1: stop}
13: init.precede(cond);
14: cond.precede(cond, stop);  // returns 0 to 'cond' or 1 to 'stop'
15:
16: tf::Executor executor;     // declared here so the snippet is complete
17: executor.run(taskflow).wait();
```
A taskflow of complex control flow often just takes a few lines of code to implement, and different control flow blocks may run in parallel. The code below creates another taskflow with three condition tasks.
```cpp
tf::Taskflow taskflow;

tf::Task A = taskflow.emplace([](){}).name("A");
tf::Task B = taskflow.emplace([](){}).name("B");
tf::Task C = taskflow.emplace([](){}).name("C");
tf::Task D = taskflow.emplace([](){}).name("D");
tf::Task E = taskflow.emplace([](){}).name("E");
tf::Task F = taskflow.emplace([](){}).name("F");
tf::Task G = taskflow.emplace([](){}).name("G");
tf::Task H = taskflow.emplace([](){}).name("H");
tf::Task I = taskflow.emplace([](){}).name("I");
tf::Task K = taskflow.emplace([](){}).name("K");
tf::Task L = taskflow.emplace([](){}).name("L");
tf::Task M = taskflow.emplace([](){}).name("M");

tf::Task cond_1 = taskflow.emplace([](){ return std::rand()%2; }).name("cond_1");
tf::Task cond_2 = taskflow.emplace([](){ return std::rand()%2; }).name("cond_2");
tf::Task cond_3 = taskflow.emplace([](){ return std::rand()%2; }).name("cond_3");

A.precede(B, F);
B.precede(C);
C.precede(D);
D.precede(cond_1);
E.precede(K);
F.precede(cond_2);
H.precede(I);
I.precede(cond_3);
L.precede(M);

cond_1.precede(B, E);       // return 0 to 'B' or 1 to 'E'
cond_2.precede(G, H);       // return 0 to 'G' or 1 to 'H'
cond_3.precede(cond_3, L);  // return 0 to 'cond_3' or 1 to 'L'

taskflow.dump(std::cout);
```
The above code creates three condition tasks: (1) a condition task `cond_1` that loops back to `B` on returning 0, or proceeds to `E` on returning 1, (2) a condition task `cond_2` that goes to `G` on returning 0, or `H` on returning 1, and (3) a condition task `cond_3` that loops back to itself on returning 0, or proceeds to `L` on returning 1.
You can use condition tasks to create cycles as long as the graph does not introduce a task race during execution. However, a cycle that consists only of non-condition tasks is not allowed.
Understand our Task-level Scheduling
To understand how an executor schedules condition tasks, we define two dependency types, strong dependency and weak dependency. A strong dependency is a preceding link from a non-condition task to another task. A weak dependency is a preceding link from a condition task to another task. The number of dependents of a task is the sum of its strong and weak dependencies. The table below lists the number of strong and weak dependencies of each task in the previous example.
task | strong dependency | weak dependency | dependents |
---|---|---|---|
A | 0 | 0 | 0 |
B | 1 | 1 | 2 |
C | 1 | 0 | 1 |
D | 1 | 0 | 1 |
E | 0 | 1 | 1 |
F | 1 | 0 | 1 |
G | 0 | 1 | 1 |
H | 0 | 1 | 1 |
I | 1 | 0 | 1 |
K | 1 | 0 | 1 |
L | 0 | 1 | 1 |
M | 1 | 0 | 1 |
cond_1 | 1 | 0 | 1 |
cond_2 | 1 | 0 | 1 |
cond_3 | 1 | 1 | 2 |
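The counts in the table can be reproduced from the list of links alone. The following is a minimal sketch in standard C++, independent of Taskflow; `DependencyCount`, `count_dependencies`, and `example_links` are hypothetical names introduced only for this illustration.

```cpp
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

struct DependencyCount {
  int strong = 0;  // incoming links from non-condition tasks
  int weak = 0;    // incoming links from condition tasks
  int total() const { return strong + weak; }
};

// Counts strong and weak dependencies per task from a list of (from, to) links.
// 'condition_tasks' names the tasks whose outgoing links are weak.
std::map<std::string, DependencyCount> count_dependencies(
    const std::vector<std::pair<std::string, std::string>>& links,
    const std::set<std::string>& condition_tasks) {
  std::map<std::string, DependencyCount> counts;
  for (const auto& [from, to] : links) {
    counts[from];  // make sure tasks without incoming links appear too
    if (condition_tasks.count(from) > 0) {
      ++counts[to].weak;
    } else {
      ++counts[to].strong;
    }
  }
  return counts;
}

// The links of the example with three condition tasks.
std::vector<std::pair<std::string, std::string>> example_links() {
  return {
    {"A", "B"}, {"A", "F"}, {"B", "C"}, {"C", "D"}, {"D", "cond_1"},
    {"E", "K"}, {"F", "cond_2"}, {"H", "I"}, {"I", "cond_3"}, {"L", "M"},
    {"cond_1", "B"}, {"cond_1", "E"},
    {"cond_2", "G"}, {"cond_2", "H"},
    {"cond_3", "cond_3"}, {"cond_3", "L"},
  };
}
```

Calling `count_dependencies(example_links(), {"cond_1", "cond_2", "cond_3"})` yields, for instance, one strong and one weak dependency for both `B` and `cond_3`, matching the rows above.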
You can query the number of strong dependents, the number of weak dependents, and the number of dependents of a task.
```cpp
tf::Taskflow taskflow;

tf::Task task = taskflow.emplace([](){});

// ... add more tasks and preceding links

std::cout << task.num_dependents() << '\n';
std::cout << task.num_strong_dependents() << '\n';
std::cout << task.num_weak_dependents() << '\n';
```
When you submit a taskflow to an executor, the scheduler starts with the tasks that have zero dependents (neither strong nor weak dependencies) and continues to execute successor tasks whenever their strong dependencies are met. However, the scheduler skips this rule when executing a condition task and jumps directly to the successor indexed by the return value.
Each task has an atomic join counter to keep track of strong dependents that are met at runtime. When a task completes, the join counter is restored to the task's strong dependency number in the graph, such that the subsequent execution can reuse the counter again.
Example
Let's take a look at an example to understand how task-level scheduling works. Suppose we have a taskflow of one condition task `cond` that forms a loop to itself on returning 0 and moves on to `stop` on returning 1.
The scheduler starts with the `init` task because it has no dependencies (neither strong nor weak). Then, the scheduler moves on to the condition task `cond`. If `cond` returns 0, the scheduler enqueues `cond` and runs it again. If `cond` returns 1, the scheduler enqueues `stop` and then moves on.
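The walk-through above can be replayed with a small single-threaded model of the scheduling rules. This is only a sketch under simplifying assumptions: it is not Taskflow's implementation, `Node` and `Graph` are hypothetical types, a plain `int` stands in for the atomic join counter, and an out-of-range return value is assumed to schedule nothing.

```cpp
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

struct Node {
  std::string name;
  bool is_condition = false;
  std::function<int()> work;    // return value matters only for condition nodes
  std::vector<int> successors;  // node ids, in the order precede() was called
  int strong = 0;               // strong dependencies in the static graph
  int weak = 0;                 // weak dependencies in the static graph
  int join_counter = 0;         // strong dependencies still unmet at runtime
};

struct Graph {
  std::vector<Node> nodes;

  int add(std::string name, bool is_condition, std::function<int()> work) {
    Node node;
    node.name = std::move(name);
    node.is_condition = is_condition;
    node.work = std::move(work);
    nodes.push_back(std::move(node));
    return static_cast<int>(nodes.size()) - 1;
  }

  void precede(int from, int to) {
    nodes[from].successors.push_back(to);
    if (nodes[from].is_condition) ++nodes[to].weak; else ++nodes[to].strong;
  }

  // Runs the graph and returns the names of the executed nodes in order.
  std::vector<std::string> run() {
    std::vector<std::string> order;
    std::deque<int> ready;
    for (int id = 0; id < static_cast<int>(nodes.size()); ++id) {
      nodes[id].join_counter = nodes[id].strong;
      // Start from nodes with neither strong nor weak dependencies.
      if (nodes[id].strong == 0 && nodes[id].weak == 0) ready.push_back(id);
    }
    while (!ready.empty()) {
      const int id = ready.front();
      ready.pop_front();
      Node& node = nodes[id];
      const int result = node.work();
      order.push_back(node.name);
      node.join_counter = node.strong;  // restore the counter for a later rerun
      if (node.is_condition) {
        // Jump directly to the successor selected by the return value.
        if (result >= 0 && result < static_cast<int>(node.successors.size())) {
          ready.push_back(node.successors[result]);
        }
      } else {
        // A strong successor becomes ready once all its strong dependencies are met.
        for (int next : node.successors) {
          if (--nodes[next].join_counter == 0) ready.push_back(next);
        }
      }
    }
    return order;
  }
};
```

Building `init -> cond -> {cond, stop}` in this model, with a `cond` that returns 0 twice and then 1, produces the order `init, cond, cond, cond, stop`. The model terminates only if the condition work eventually returns a value that leaves the loop.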
Avoid Common Pitfalls
Condition tasks are handy for creating dynamic and cyclic control flows, but it is also easy to make mistakes with them. It is your responsibility to ensure a taskflow is properly conditioned. The top things to avoid are a graph with no source task to start from and task races. The figure below shows common pitfalls and their remedies.
In the error1 scenario, there is no source task for the scheduler to start with, and the simplest fix is to add a task `S` that has no dependents. In the error2 scenario, `D` might be scheduled twice, by `E` through the strong dependency and by `C` through the weak dependency (on returning 1). To fix this problem, you can add an auxiliary task `D-aux` to break the mixed use of strong dependency and weak dependency. In the risky scenario, task `X` may be raced by `M` and `P` if `M` returns 0 and `P` returns 1.
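Two of these pitfalls can be spotted from the link list before a graph is ever run. The sketch below is a hypothetical pre-flight check in standard C++, not a Taskflow feature: `check_graph` reports whether any source task exists (error1) and lists tasks that mix strong and weak dependencies (the pattern behind error2). A mixed task is not necessarily wrong, so the list is a set of candidates for review, not a verdict.

```cpp
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

using Link = std::pair<std::string, std::string>;  // (from, to)

struct GraphReport {
  bool has_source = false;               // false corresponds to the error1 scenario
  std::vector<std::string> mixed_tasks;  // tasks with both strong and weak dependencies
};

// Hypothetical helper for illustration; not part of Taskflow.
GraphReport check_graph(const std::vector<Link>& links,
                        const std::set<std::string>& condition_tasks) {
  // Per task: {strong count, weak count}. std::map keeps the report sorted by name.
  std::map<std::string, std::pair<int, int>> incoming;
  for (const auto& [from, to] : links) {
    incoming[from];  // register tasks that only appear as the origin of a link
    if (condition_tasks.count(from) > 0) {
      ++incoming[to].second;
    } else {
      ++incoming[to].first;
    }
  }
  GraphReport report;
  for (const auto& [task, counts] : incoming) {
    if (counts.first == 0 && counts.second == 0) report.has_source = true;
    if (counts.first > 0 && counts.second > 0) report.mixed_tasks.push_back(task);
  }
  return report;
}
```

For a made-up graph in which a condition task `C` and a regular task `E` both precede `D`, the report lists `D` as mixed, which is the situation that an auxiliary task such as `D-aux` is meant to break up.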
Implement Control-flow Graphs
Implement If-Else Control Flow
You can use conditional tasking to implement if-else control flow. The following example creates a nested if-else control flow diagram that executes three condition tasks to check the range of `i`.
```cpp
tf::Taskflow taskflow;

int i;

// create three condition tasks for nested control flow
auto initi = taskflow.emplace([&](){ i=3; });
auto cond1 = taskflow.emplace([&](){ return i>1 ? 1 : 0; });
auto cond2 = taskflow.emplace([&](){ return i>2 ? 1 : 0; });
auto cond3 = taskflow.emplace([&](){ return i>3 ? 1 : 0; });
auto equl1 = taskflow.emplace([&](){ std::cout << "i=1\n"; });
auto equl2 = taskflow.emplace([&](){ std::cout << "i=2\n"; });
auto equl3 = taskflow.emplace([&](){ std::cout << "i=3\n"; });
auto grtr3 = taskflow.emplace([&](){ std::cout << "i>3\n"; });

initi.precede(cond1);
cond1.precede(equl1, cond2);  // goes to cond2 if i>1
cond2.precede(equl2, cond3);  // goes to cond3 if i>2
cond3.precede(equl3, grtr3);  // goes to grtr3 if i>3
```
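For comparison, the same decision structure can be written as ordinary nested `if` statements. This is a plain C++ sketch for reading the graph, not Taskflow code; `classify` is a hypothetical function that returns the message the selected task would print.

```cpp
#include <string>

// Each comparison plays the role of one condition task, and each returned
// string is the message printed by the task that ends up running.
std::string classify(int i) {
  if (i > 1) {        // cond1 returns 1: go to cond2
    if (i > 2) {      // cond2 returns 1: go to cond3
      if (i > 3) {    // cond3 returns 1: go to grtr3
        return "i>3";
      }
      return "i=3";   // cond3 returns 0: go to equl3
    }
    return "i=2";     // cond2 returns 0: go to equl2
  }
  return "i=1";       // cond1 returns 0: go to equl1
}
```

With `i=3`, as set by `initi`, the path is `cond1`, `cond2`, `cond3`, `equl3`. Note that any value of `i` not greater than 1 ends at `equl1`, so the graph implicitly assumes `i` is at least 1.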
Implement Switch Control Flow
You can use conditional tasking to implement switch control flow. The following example creates a switch control flow diagram that executes one of the three cases at random using four condition tasks.
```cpp
tf::Taskflow taskflow;

auto [source, swcond, case1, case2, case3, target] = taskflow.emplace(
  [](){ std::cout << "source\n"; },
  [](){ std::cout << "switch\n"; return rand()%3; },
  [](){ std::cout << "case 1\n"; return 0; },
  [](){ std::cout << "case 2\n"; return 0; },
  [](){ std::cout << "case 3\n"; return 0; },
  [](){ std::cout << "target\n"; }
);

source.precede(swcond);
swcond.precede(case1, case2, case3);
target.succeed(case1, case2, case3);
```
Assuming `swcond` returns 1, the program outputs:
```
source
switch
case 2
target
```
Keep in mind, both switch and case tasks must be described as condition tasks. The following implementation is a common mistake in which case tasks are not described as condition tasks.
```cpp
// wrong implementation of switch control flow using only one condition task
tf::Taskflow taskflow;

auto [source, swcond, case1, case2, case3, target] = taskflow.emplace(
  [](){ std::cout << "source\n"; },
  [](){ std::cout << "switch\n"; return rand()%3; },
  [](){ std::cout << "case 1\n"; },
  [](){ std::cout << "case 2\n"; },
  [](){ std::cout << "case 3\n"; },
  [](){ std::cout << "target\n"; }  // target has three strong dependencies
);

source.precede(swcond);
swcond.precede(case1, case2, case3);
target.succeed(case1, case2, case3);
```
In this faulty implementation, task `target` has three strong dependencies but only one of them will be met. This is because `swcond` is a condition task, and only one case task will be executed depending on the return value of `swcond`.
Implement Do-While-Loop Control Flow
You can use conditional tasking to implement do-while-loop control flow. The following example creates a do-while-loop control flow diagram that repeatedly increments variable `i` five times using one condition task.
```cpp
tf::Taskflow taskflow;

int i;

auto [init, body, cond, done] = taskflow.emplace(
  [&](){ std::cout << "i=0\n"; i=0; },
  [&](){ std::cout << "i++ => i="; i++; },
  [&](){ std::cout << i << '\n'; return i<5 ? 0 : 1; },
  [&](){ std::cout << "done\n"; }
);

init.precede(body);
body.precede(cond);
cond.precede(body, done);
```
The program outputs:
```
i=0
i++ => i=1
i++ => i=2
i++ => i=3
i++ => i=4
i++ => i=5
done
```
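The graph corresponds to an ordinary `do`-`while` statement. The sketch below writes the same four steps in plain C++ and collects the output in a string instead of printing it. `do_while_trace` is a hypothetical name used only for this comparison.

```cpp
#include <sstream>
#include <string>

// Plain C++ counterpart of the init/body/cond/done tasks above.
std::string do_while_trace() {
  std::ostringstream out;
  int i;
  out << "i=0\n"; i = 0;      // init
  do {
    out << "i++ => i="; i++;  // body
    out << i << '\n';         // cond prints i ...
  } while (i < 5);            // ... and loops back to body while i < 5
  out << "done\n";            // done
  return out.str();
}
```

The body runs before the first test of `i < 5`, which is why `body` precedes `cond` in the graph and why the loop body always executes at least once.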
Implement While-Loop Control Flow
You can use conditional tasking to implement while-loop control flow. The following example creates a while-loop control flow diagram that repeatedly increments variable `i` five times using two condition tasks.
```cpp
tf::Taskflow taskflow;

int i;

auto [init, cond, body, back, done] = taskflow.emplace(
  [&](){ std::cout << "i=0\n"; i=0; },
  [&](){ std::cout << "while i<5\n"; return i < 5 ? 0 : 1; },
  [&](){ std::cout << "i++=" << i++ << '\n'; },
  [&](){ std::cout << "back\n"; return 0; },
  [&](){ std::cout << "done\n"; }
);

init.precede(cond);
cond.precede(body, done);
body.precede(back);
back.precede(cond);
```
The program outputs:
```
i=0
while i<5
i++=0
back
while i<5
i++=1
back
while i<5
i++=2
back
while i<5
i++=3
back
while i<5
i++=4
back
while i<5
done
```
Notice that, when you implement a while-loop block, you cannot direct a dependency from the body task to the loop condition task. Doing so will introduce a strong dependency between the body task and the loop condition task, and the loop condition task will never be executed. The following code shows a common faulty implementation of while-loop control flow.
```cpp
// wrong implementation of while-loop using only one condition task
tf::Taskflow taskflow;

int i;

auto [init, cond, body, done] = taskflow.emplace(
  [&](){ std::cout << "i=0\n"; i=0; },
  [&](){ std::cout << "while i<5\n"; return i < 5 ? 0 : 1; },
  [&](){ std::cout << "i++=" << i++ << '\n'; },
  [&](){ std::cout << "done\n"; }
);

init.precede(cond);
cond.precede(body, done);
body.precede(cond);
```
In the taskflow above, the scheduler starts with `init` and then decrements the strong dependency count of the loop condition task, `while i<5`. After this, one strong dependency remains, namely the one introduced by the loop body task, `i++`. However, task `i++` will not be executed until the loop condition task returns 0, causing a deadlock.
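This kind of deadlock can be detected with a conservative fixed-point check over the links. The sketch below is not a Taskflow facility, and `tasks_that_can_start` is a hypothetical helper. It assumes a task can start if it has no dependencies at all, if some startable condition task points to it, or if it has at least one strong dependency and every task it strongly depends on can start.

```cpp
#include <algorithm>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

using Link = std::pair<std::string, std::string>;  // (from, to)

// Returns the tasks that can ever start under the rules stated in the lead-in.
std::set<std::string> tasks_that_can_start(
    const std::vector<Link>& links,
    const std::set<std::string>& condition_tasks) {
  std::map<std::string, std::vector<std::string>> strong_preds, weak_preds;
  std::set<std::string> tasks;
  for (const auto& [from, to] : links) {
    tasks.insert(from);
    tasks.insert(to);
    if (condition_tasks.count(from) > 0) {
      weak_preds[to].push_back(from);
    } else {
      strong_preds[to].push_back(from);
    }
  }
  std::set<std::string> startable;
  auto can_start = [&startable](const std::string& t) {
    return startable.count(t) > 0;
  };
  bool changed = true;
  while (changed) {  // iterate until no new task becomes startable
    changed = false;
    for (const std::string& task : tasks) {
      if (can_start(task)) continue;
      const auto& strong = strong_preds[task];
      const auto& weak = weak_preds[task];
      const bool is_source = strong.empty() && weak.empty();
      const bool jumped_to = std::any_of(weak.begin(), weak.end(), can_start);
      const bool joined = !strong.empty() &&
                          std::all_of(strong.begin(), strong.end(), can_start);
      if (is_source || jumped_to || joined) {
        startable.insert(task);
        changed = true;
      }
    }
  }
  return startable;
}
```

Applied to the faulty while-loop links with `cond` as the only condition task, the result contains only `init`. With the extra condition task `back`, all five tasks are reported as startable. The check over-approximates what a real run does, because it does not know which branch a condition task will actually take.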
Create a Multi-condition Task
A multi-condition task is a generalized version of a condition task. In some cases, applications need to jump to multiple branches from a parent task. This can be done by creating a multi-condition task, which allows a task to select one or more successor tasks to execute. Whereas a condition task returns a single index, a multi-condition task returns a vector of integer indices that indicate the successors to execute when the multi-condition task completes. Each index is defined with respect to the order of the successors preceded by the multi-condition task. For example, the following code creates a multi-condition task, `A`, that informs the scheduler to run two of its successors, `B` and `D`.
```cpp
tf::Executor executor;
tf::Taskflow taskflow;

auto A = taskflow.emplace([&]() -> tf::SmallVector<int> {
  std::cout << "A\n";
  return {0, 2};
}).name("A");
auto B = taskflow.emplace([&](){ std::cout << "B\n"; }).name("B");
auto C = taskflow.emplace([&](){ std::cout << "C\n"; }).name("C");
auto D = taskflow.emplace([&](){ std::cout << "D\n"; }).name("D");

A.precede(B, C, D);

executor.run(taskflow).wait();
```
One important application of conditional tasking is implementing iterative control flow. You can use multi-condition tasks to create multiple loops that run concurrently. The following code creates a sequential chain of four loops in which each loop increments a counter variable ten times. When the program completes, the value of the counter variable is 40.
```cpp
tf::Executor executor;
tf::Taskflow taskflow;

std::atomic<int> counter{0};

auto loop = [&, c = int(0)]() mutable -> tf::SmallVector<int> {
  counter.fetch_add(1, std::memory_order_relaxed);
  return {++c < 10 ? 0 : 1};
};

auto init = taskflow.emplace([](){}).name("init");
auto A = taskflow.emplace(loop).name("A");
auto B = taskflow.emplace(loop).name("B");
auto C = taskflow.emplace(loop).name("C");
auto D = taskflow.emplace(loop).name("D");

init.precede(A);
A.precede(A, B);
B.precede(B, C);
C.precede(C, D);
D.precede(D);

executor.run(taskflow).wait();  // counter == 40

taskflow.dump(std::cout);
std::cout << "counter == " << counter << '\n';
```
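The total of 40 relies on every task owning its own copy of the captured `c`, since the shared `counter` is captured by reference while `c` is captured by value. The following stdlib-only sketch illustrates that point without Taskflow: `run_loop_chain` is a hypothetical driver, `std::function` and `std::vector<int>` stand in for the task and for `tf::SmallVector<int>`, and the tasks are run one after another on a single thread.

```cpp
#include <atomic>
#include <cstddef>
#include <functional>
#include <vector>

using MultiCondition = std::function<std::vector<int>()>;

// Runs a chain of self-looping tasks. Returning {0} loops back to the same
// task, returning {1} moves on to the next task in the chain.
int run_loop_chain(int num_tasks, int iterations_per_task) {
  std::atomic<int> counter{0};

  auto loop = [&counter, iterations_per_task, c = 0]() mutable -> std::vector<int> {
    counter.fetch_add(1, std::memory_order_relaxed);
    return {++c < iterations_per_task ? 0 : 1};
  };

  // Copying the closure gives every task a private copy of 'c',
  // while all copies share the same 'counter' through the reference capture.
  std::vector<MultiCondition> tasks(static_cast<std::size_t>(num_tasks), loop);

  for (MultiCondition& task : tasks) {
    while (true) {
      const std::vector<int> next = task();
      const bool loops_back = next.size() == 1 && next[0] == 0;
      if (!loops_back) break;  // leave this loop and continue with the next task
    }
  }
  return counter.load();
}
```

Here `run_loop_chain(4, 10)` returns 40. If `c` were captured by reference to one shared variable instead, the first task would consume all ten iterations and the later tasks would each run only once.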