Loading...
Searching...
No Matches
pipeline.hpp
1#pragma once
2
3#include "../taskflow.hpp"
4
9
10namespace tf {
11
12
13// ----------------------------------------------------------------------------
14// Class Definition: Pipeflow
15// ----------------------------------------------------------------------------
16
43class Pipeflow {
44
45 template <typename... Ps>
46 friend class Pipeline;
47
48 template <typename P>
49 friend class ScalablePipeline;
50
51 template <typename... Ps>
52 friend class DataPipeline;
53
54 public:
55
59 Pipeflow() = default;
60
64 size_t line() const {
65 return _line;
66 }
67
71 size_t pipe() const {
72 return _pipe;
73 }
74
78 size_t token() const {
79 return _token;
80 }
81
88 void stop() {
89 if(_pipe != 0) {
90 TF_THROW("only the first pipe can stop the token");
91 }
92 _stop = true;
93 }
94
95 private:
96
97 // Regular data
98 size_t _line;
99 size_t _pipe;
100 size_t _token;
101 bool _stop;
102};
103
104// ----------------------------------------------------------------------------
105// Class Definition: PipeType
106// ----------------------------------------------------------------------------
107
113enum class PipeType : int {
118};
119
120// ----------------------------------------------------------------------------
121// Class Definition: Pipe
122// ----------------------------------------------------------------------------
123
143template <typename C = std::function<void(tf::Pipeflow&)>>
144class Pipe {
145
146 template <typename... Ps>
147 friend class Pipeline;
148
149 template <typename P>
150 friend class ScalablePipeline;
151
152 public:
153
157 using callable_t = C;
158
162 Pipe() = default;
163
182 _type{d}, _callable{std::forward<C>(callable)} {
183 }
184
190 PipeType type() const {
191 return _type;
192 }
193
200 _type = type;
201 }
202
211 template <typename U>
212 void callable(U&& callable) {
213 _callable = std::forward<U>(callable);
214 }
215
216 private:
217
218 PipeType _type;
219
220 C _callable;
221};
222
223// ----------------------------------------------------------------------------
224// Class Definition: Pipeline
225// ----------------------------------------------------------------------------
226
306template <typename... Ps>
307class Pipeline {
308
309 static_assert(sizeof...(Ps)>0, "must have at least one pipe");
310
314 struct Line {
315 std::atomic<size_t> join_counter;
316 };
317
321 struct PipeMeta {
322 PipeType type;
323 };
324
325 public:
326
338 Pipeline(size_t num_lines, Ps&&... ps);
339
351 Pipeline(size_t num_lines, std::tuple<Ps...>&& ps);
352
361 size_t num_lines() const noexcept;
362
369 constexpr size_t num_pipes() const;
370
378 void reset();
379
386 size_t num_tokens() const noexcept;
387
394 Graph& graph();
395
396
397 private:
398
399 Graph _graph;
400
401 size_t _num_tokens;
402
403 std::tuple<Ps...> _pipes;
404 std::array<PipeMeta, sizeof...(Ps)> _meta;
405 std::vector<std::array<Line, sizeof...(Ps)>> _lines;
406 std::vector<Task> _tasks;
407 std::vector<Pipeflow> _pipeflows;
408
409 template <size_t... I>
410 auto _gen_meta(std::tuple<Ps...>&&, std::index_sequence<I...>);
411
412 void _on_pipe(Pipeflow&, NonpreemptiveRuntime&);
413 void _build();
414};
415
416// constructor
417template <typename... Ps>
419 _pipes {std::make_tuple(std::forward<Ps>(ps)...)},
420 _meta {PipeMeta{ps.type()}...},
421 _lines (num_lines),
422 _tasks (num_lines + 1),
423 _pipeflows (num_lines) {
424
425 if(num_lines == 0) {
426 TF_THROW("must have at least one line");
427 }
428
429 if(std::get<0>(_pipes).type() != PipeType::SERIAL) {
430 TF_THROW("first pipe must be serial");
431 }
432
433 reset();
434 _build();
435}
436
437// constructor
438template <typename... Ps>
439Pipeline<Ps...>::Pipeline(size_t num_lines, std::tuple<Ps...>&& ps) :
440 _pipes {std::forward<std::tuple<Ps...>>(ps)},
441 _meta {_gen_meta(
442 std::forward<std::tuple<Ps...>>(ps), std::make_index_sequence<sizeof...(Ps)>{}
443 )},
444 _lines (num_lines),
445 _tasks (num_lines + 1),
446 _pipeflows (num_lines) {
447
448 if(num_lines == 0) {
449 TF_THROW("must have at least one line");
450 }
451
452 if(std::get<0>(_pipes).type() != PipeType::SERIAL) {
453 TF_THROW("first pipe must be serial");
454 }
455
456 reset();
457 _build();
458}
459
460// Function: _get_meta
461template <typename... Ps>
462template <size_t... I>
463auto Pipeline<Ps...>::_gen_meta(std::tuple<Ps...>&& ps, std::index_sequence<I...>) {
464 return std::array{PipeMeta{std::get<I>(ps).type()}...};
465}
466
467// Function: num_lines
468template <typename... Ps>
469size_t Pipeline<Ps...>::num_lines() const noexcept {
470 return _pipeflows.size();
471}
472
473// Function: num_pipes
474template <typename... Ps>
475constexpr size_t Pipeline<Ps...>::num_pipes() const {
476 return sizeof...(Ps);
477}
478
479// Function: num_tokens
480template <typename... Ps>
481size_t Pipeline<Ps...>::num_tokens() const noexcept {
482 return _num_tokens;
483}
484
485// Function: graph
486template <typename... Ps>
488 return _graph;
489}
490
491// Function: reset
492template <typename... Ps>
494
495 _num_tokens = 0;
496
497 for(size_t l = 0; l<num_lines(); l++) {
498 _pipeflows[l]._pipe = 0;
499 _pipeflows[l]._line = l;
500 }
501
502 _lines[0][0].join_counter.store(0, std::memory_order_relaxed);
503
504 for(size_t l=1; l<num_lines(); l++) {
505 for(size_t f=1; f<num_pipes(); f++) {
506 _lines[l][f].join_counter.store(
507 static_cast<size_t>(_meta[f].type), std::memory_order_relaxed
508 );
509 }
510 }
511
512 for(size_t f=1; f<num_pipes(); f++) {
513 _lines[0][f].join_counter.store(1, std::memory_order_relaxed);
514 }
515
516 for(size_t l=1; l<num_lines(); l++) {
517 _lines[l][0].join_counter.store(
518 static_cast<size_t>(_meta[0].type) - 1, std::memory_order_relaxed
519 );
520 }
521}
522
523// Procedure: _on_pipe
524template <typename... Ps>
525void Pipeline<Ps...>::_on_pipe(Pipeflow& pf, NonpreemptiveRuntime& rt) {
526 visit_tuple([&](auto&& pipe){
527 using callable_t = typename std::decay_t<decltype(pipe)>::callable_t;
528 if constexpr (std::is_invocable_v<callable_t, Pipeflow&>) {
529 pipe._callable(pf);
530 }
531 else if constexpr(std::is_invocable_v<callable_t, Pipeflow&, NonpreemptiveRuntime&>) {
532 pipe._callable(pf, rt);
533 }
534 else {
535 static_assert(dependent_false_v<callable_t>, "un-supported pipe callable type");
536 }
537 }, _pipes, pf._pipe);
538}
539
540// Procedure: _build
541template <typename... Ps>
542void Pipeline<Ps...>::_build() {
543
544 using namespace std::literals::string_literals;
545
546 FlowBuilder fb(_graph);
547
548 // init task
549 _tasks[0] = fb.emplace([this]() {
550 return static_cast<int>(_num_tokens % num_lines());
551 }).name("cond");
552
553 // line task
554 for(size_t l = 0; l < num_lines(); l++) {
555
556 _tasks[l + 1] = fb.emplace([this, l] (tf::NonpreemptiveRuntime& rt) mutable {
557
558 auto pf = &_pipeflows[l];
559
560 pipeline:
561
562 _lines[pf->_line][pf->_pipe].join_counter.store(
563 static_cast<size_t>(_meta[pf->_pipe].type), std::memory_order_relaxed
564 );
565
566 // First pipe does all jobs of initialization and token dependencies
567 if (pf->_pipe == 0) {
568 pf->_token = _num_tokens;
569 if (pf->_stop = false, _on_pipe(*pf, rt); pf->_stop == true) {
570 // here, the pipeline is not stopped yet because other
571 // lines of tasks may still be running their last stages
572 return;
573 }
574 ++_num_tokens;
575 }
576 else {
577 _on_pipe(*pf, rt);
578 }
579
580 size_t c_f = pf->_pipe;
581 size_t n_f = (pf->_pipe + 1) % num_pipes();
582 size_t n_l = (pf->_line + 1) % num_lines();
583
584 pf->_pipe = n_f;
585
586 // ---- scheduling starts here ----
587 // Notice that the shared variable f must not be changed after this
588 // point because it can result in data race due to the following
589 // condition:
590 //
591 // a -> b
592 // | |
593 // v v
594 // c -> d
595 //
596 // d will be spawned by either c or b, so if c changes f but b spawns d
597 // then data race on f will happen
598
599 std::array<int, 2> retval;
600 size_t n = 0;
601
602 // downward dependency
603 if(_meta[c_f].type == PipeType::SERIAL &&
604 _lines[n_l][c_f].join_counter.fetch_sub(
605 1, std::memory_order_acq_rel) == 1
606 ) {
607 retval[n++] = 1;
608 }
609
610 // forward dependency
611 if(_lines[pf->_line][n_f].join_counter.fetch_sub(
612 1, std::memory_order_acq_rel) == 1
613 ) {
614 retval[n++] = 0;
615 }
616
617 // notice that the task index starts from 1
618 switch(n) {
619 case 2: {
620 rt.schedule(_tasks[n_l+1]);
621 goto pipeline;
622 }
623 case 1: {
624 // downward dependency
625 if (retval[0] == 1) {
626 pf = &_pipeflows[n_l];
627 }
628 // forward dependency
629 goto pipeline;
630 }
631 }
632 }).name("nprt-"s + std::to_string(l));
633
634 _tasks[0].precede(_tasks[l+1]);
635 }
636}
637
638// ----------------------------------------------------------------------------
639// Class Definition: ScalablePipeline
640// ----------------------------------------------------------------------------
641
774template <typename P>
776
780 struct Line {
781 std::atomic<size_t> join_counter;
782 };
783
784
785 public:
786
790 using pipe_t = typename std::iterator_traits<P>::value_type;
791
795 ScalablePipeline() = default;
796
807
824 ScalablePipeline(size_t num_lines, P first, P last);
825
830
840
844 ScalablePipeline& operator = (const ScalablePipeline&) = delete;
845
854 ScalablePipeline& operator = (ScalablePipeline&& rhs);
855
864 size_t num_lines() const noexcept;
865
872 size_t num_pipes() const noexcept;
873
880 void reset();
881
897 void reset(P first, P last);
898
916 void reset(size_t num_lines, P first, P last);
917
924 size_t num_tokens() const noexcept;
925
932 Graph& graph();
933
934 private:
935
936 Graph _graph;
937
938 size_t _num_tokens{0};
939
940 std::vector<P> _pipes;
941 std::vector<Task> _tasks;
942 std::vector<Pipeflow> _pipeflows;
943 std::unique_ptr<Line[]> _lines;
944
945 void _on_pipe(Pipeflow&, NonpreemptiveRuntime&);
946 void _build();
947
948 Line& _line(size_t, size_t);
949};
950
951// constructor
952template <typename P>
954 _tasks (num_lines + 1),
955 _pipeflows (num_lines) {
956
957 if(num_lines == 0) {
958 TF_THROW("must have at least one line");
959 }
960
961 _build();
962}
963
964// constructor
965template <typename P>
967 _tasks (num_lines + 1),
968 _pipeflows (num_lines) {
969
970 if(num_lines == 0) {
971 TF_THROW("must have at least one line");
972 }
973
974 reset(first, last);
975 _build();
976}
977
978// move constructor
979template <typename P>
981 _num_tokens {rhs._num_tokens},
982 _pipes {std::move(rhs._pipes)},
983 _pipeflows {std::move(rhs._pipeflows)},
984 _lines {std::move(rhs._lines)} {
985
986 _graph.clear();
987 _tasks.resize(_pipeflows.size()+1);
988 rhs._num_tokens = 0;
989 rhs._tasks.clear();
990 _build();
991}
992
993// move assignment operator
994template <typename P>
996 _num_tokens = rhs._num_tokens;
997 _pipes = std::move(rhs._pipes);
998 _pipeflows = std::move(rhs._pipeflows);
999 _lines = std::move(rhs._lines);
1000
1001 _graph.clear();
1002 _tasks.resize(_pipeflows.size()+1);
1003
1004 rhs._num_tokens = 0;
1005 rhs._tasks.clear();
1006 _build();
1007 return *this;
1008}
1009
1010// Function: num_lines
1011template <typename P>
1012size_t ScalablePipeline<P>::num_lines() const noexcept {
1013 return _pipeflows.size();
1014}
1015
1016// Function: num_pipes
1017template <typename P>
1018size_t ScalablePipeline<P>::num_pipes() const noexcept {
1019 return _pipes.size();
1020}
1021
1022// Function: num_tokens
1023template <typename P>
1024size_t ScalablePipeline<P>::num_tokens() const noexcept {
1025 return _num_tokens;
1026}
1027
1028// Function: graph
1029template <typename P>
1031 return _graph;
1032}
1033
1034// Function: _line
1035template <typename P>
1036typename ScalablePipeline<P>::Line& ScalablePipeline<P>::_line(size_t l, size_t p) {
1037 return _lines[l*num_pipes() + p];
1038}
1039
1040template <typename P>
1041void ScalablePipeline<P>::reset(size_t num_lines, P first, P last) {
1042
1043 if(num_lines == 0) {
1044 TF_THROW("must have at least one line");
1045 }
1046
1047 _graph.clear();
1048 _tasks.resize(num_lines + 1);
1049 _pipeflows.resize(num_lines);
1050
1051 reset(first, last);
1052
1053 _build();
1054}
1055
1056// Function: reset
1057template <typename P>
1058void ScalablePipeline<P>::reset(P first, P last) {
1059
1060 size_t num_pipes = static_cast<size_t>(std::distance(first, last));
1061
1062 if(num_pipes == 0) {
1063 TF_THROW("pipeline cannot be empty");
1064 }
1065
1066 if(first->type() != PipeType::SERIAL) {
1067 TF_THROW("first pipe must be serial");
1068 }
1069
1070 _pipes.resize(num_pipes);
1071
1072 size_t i=0;
1073 for(auto itr = first; itr != last; itr++) {
1074 _pipes[i++] = itr;
1075 }
1076
1077 _lines = std::make_unique<Line[]>(num_lines() * _pipes.size());
1078
1079 reset();
1080}
1081
1082// Function: reset
1083template <typename P>
1085
1086 _num_tokens = 0;
1087
1088 for(size_t l = 0; l<num_lines(); l++) {
1089 _pipeflows[l]._pipe = 0;
1090 _pipeflows[l]._line = l;
1091 }
1092
1093 _line(0, 0).join_counter.store(0, std::memory_order_relaxed);
1094
1095 for(size_t l=1; l<num_lines(); l++) {
1096 for(size_t f=1; f<num_pipes(); f++) {
1097 _line(l, f).join_counter.store(
1098 static_cast<size_t>(_pipes[f]->type()), std::memory_order_relaxed
1099 );
1100 }
1101 }
1102
1103 for(size_t f=1; f<num_pipes(); f++) {
1104 _line(0, f).join_counter.store(1, std::memory_order_relaxed);
1105 }
1106
1107 for(size_t l=1; l<num_lines(); l++) {
1108 _line(l, 0).join_counter.store(
1109 static_cast<size_t>(_pipes[0]->type()) - 1, std::memory_order_relaxed
1110 );
1111 }
1112}
1113
1114// Procedure: _on_pipe
1115template <typename P>
1116void ScalablePipeline<P>::_on_pipe(Pipeflow& pf, NonpreemptiveRuntime& rt) {
1117
1118 using callable_t = typename pipe_t::callable_t;
1119
1120 if constexpr (std::is_invocable_v<callable_t, Pipeflow&>) {
1121 _pipes[pf._pipe]->_callable(pf);
1122 }
1123 else if constexpr(std::is_invocable_v<callable_t, Pipeflow&, NonpreemptiveRuntime&>) {
1124 _pipes[pf._pipe]->_callable(pf, rt);
1125 }
1126 else {
1127 static_assert(dependent_false_v<callable_t>, "un-supported pipe callable type");
1128 }
1129}
1130
1131// Procedure: _build
1132template <typename P>
1133void ScalablePipeline<P>::_build() {
1134
1135 using namespace std::literals::string_literals;
1136
1137 FlowBuilder fb(_graph);
1138
1139 // init task
1140 _tasks[0] = fb.emplace([this]() {
1141 return static_cast<int>(_num_tokens % num_lines());
1142 }).name("cond");
1143
1144 // line task
1145 for(size_t l = 0; l < num_lines(); l++) {
1146
1147 _tasks[l + 1] = fb.emplace([this, l] (tf::NonpreemptiveRuntime& rt) mutable {
1148
1149 auto pf = &_pipeflows[l];
1150
1151 pipeline:
1152
1153 _line(pf->_line, pf->_pipe).join_counter.store(
1154 static_cast<size_t>(_pipes[pf->_pipe]->type()), std::memory_order_relaxed
1155 );
1156
1157 // First pipe does all jobs of initialization and token dependencies
1158 if (pf->_pipe == 0) {
1159 pf->_token = _num_tokens;
1160 if (pf->_stop = false, _on_pipe(*pf, rt); pf->_stop == true) {
1161 // here, the pipeline is not stopped yet because other
1162 // lines of tasks may still be running their last stages
1163 return;
1164 }
1165 ++_num_tokens;
1166 }
1167 else {
1168 _on_pipe(*pf, rt);
1169 }
1170
1171 size_t c_f = pf->_pipe;
1172 size_t n_f = (pf->_pipe + 1) % num_pipes();
1173 size_t n_l = (pf->_line + 1) % num_lines();
1174
1175 pf->_pipe = n_f;
1176
1177 // ---- scheduling starts here ----
1178 // Notice that the shared variable f must not be changed after this
1179 // point because it can result in data race due to the following
1180 // condition:
1181 //
1182 // a -> b
1183 // | |
1184 // v v
1185 // c -> d
1186 //
1187 // d will be spawned by either c or b, so if c changes f but b spawns d
1188 // then data race on f will happen
1189
1190 std::array<int, 2> retval;
1191 size_t n = 0;
1192
1193 // downward dependency
1194 if(_pipes[c_f]->type() == PipeType::SERIAL &&
1195 _line(n_l, c_f).join_counter.fetch_sub(
1196 1, std::memory_order_acq_rel) == 1
1197 ) {
1198 retval[n++] = 1;
1199 }
1200
1201 // forward dependency
1202 if(_line(pf->_line, n_f).join_counter.fetch_sub(
1203 1, std::memory_order_acq_rel) == 1
1204 ) {
1205 retval[n++] = 0;
1206 }
1207
1208 // notice that the task index starts from 1
1209 switch(n) {
1210 case 2: {
1211 rt.schedule(_tasks[n_l+1]);
1212 goto pipeline;
1213 }
1214 case 1: {
1215 if (retval[0] == 1) {
1216 pf = &_pipeflows[n_l];
1217 }
1218 goto pipeline;
1219 }
1220 }
1221 }).name("nprt-"s + std::to_string(l));
1222
1223 _tasks[0].precede(_tasks[l+1]);
1224 }
1225}
1226
1227} // end of namespace tf -----------------------------------------------------
1228
1229
1230
1231
1232
class to build a task dependency graph
Definition flow_builder.hpp:22
class to create a graph object
Definition graph.hpp:47
void clear()
clears the graph
Definition graph.hpp:881
PipeType type() const
queries the type of the pipe
Definition pipeline.hpp:190
void callable(U &&callable)
assigns a new callable to the pipe
Definition pipeline.hpp:212
C callable_t
alias of the callable type
Definition pipeline.hpp:157
Pipe()=default
default constructor
void type(PipeType type)
assigns a new type to the pipe
Definition pipeline.hpp:199
Pipe(PipeType d, C &&callable)
constructs the pipe object
Definition pipeline.hpp:181
class to create a pipeflow object used by the pipe callable
Definition pipeline.hpp:43
size_t token() const
queries the token identifier
Definition pipeline.hpp:78
size_t pipe() const
queries the pipe identifier of the present token
Definition pipeline.hpp:71
Pipeflow()=default
default constructor
void stop()
stops the pipeline scheduling
Definition pipeline.hpp:88
size_t line() const
queries the line identifier of the present token
Definition pipeline.hpp:64
void reset()
resets the pipeline
Definition pipeline.hpp:493
Graph & graph()
obtains the graph object associated with the pipeline construct
Definition pipeline.hpp:487
size_t num_lines() const noexcept
queries the number of parallel lines
Definition pipeline.hpp:469
size_t num_tokens() const noexcept
queries the number of generated tokens in the pipeline
Definition pipeline.hpp:481
Pipeline(size_t num_lines, Ps &&... ps)
constructs a pipeline object
Definition pipeline.hpp:418
constexpr size_t num_pipes() const
queries the number of pipes
Definition pipeline.hpp:475
ScalablePipeline(const ScalablePipeline &)=delete
disabled copy constructor
ScalablePipeline()=default
default constructor
Graph & graph()
obtains the graph object associated with the pipeline construct
Definition pipeline.hpp:1030
ScalablePipeline & operator=(const ScalablePipeline &)=delete
disabled copy assignment operator
size_t num_lines() const noexcept
queries the number of parallel lines
Definition pipeline.hpp:1012
size_t num_tokens() const noexcept
queries the number of generated tokens in the pipeline
Definition pipeline.hpp:1024
size_t num_pipes() const noexcept
queries the number of pipes
Definition pipeline.hpp:1018
void reset()
resets the pipeline
Definition pipeline.hpp:1084
typename std::iterator_traits< P >::value_type pipe_t
pipe type
Definition pipeline.hpp:790
taskflow namespace
Definition small_vector.hpp:20
PipeType
enumeration of all pipe types
Definition pipeline.hpp:113
@ SERIAL
serial type
Definition pipeline.hpp:117
@ PARALLEL
parallel type
Definition pipeline.hpp:115