3#include "../taskflow.hpp"
45 template <
typename... Ps>
46 friend class Pipeline;
49 friend class ScalablePipeline;
51 template <
typename... Ps>
52 friend class DataPipeline;
90 TF_THROW(
"only the first pipe can stop the token");
143template <
typename C = std::function<
void(tf::Pipeflow&)>>
146 template <
typename... Ps>
147 friend class Pipeline;
149 template <
typename P>
150 friend class ScalablePipeline;
182 _type{d}, _callable{std::forward<C>(
callable)} {
211 template <
typename U>
213 _callable = std::forward<U>(
callable);
306template <
typename... Ps>
309 static_assert(
sizeof...(Ps)>0,
"must have at least one pipe");
315 std::atomic<size_t> join_counter;
403 std::tuple<Ps...> _pipes;
404 std::array<PipeMeta,
sizeof...(Ps)> _meta;
405 std::vector<std::array<Line,
sizeof...(Ps)>> _lines;
406 std::vector<Task> _tasks;
407 std::vector<Pipeflow> _pipeflows;
409 template <
size_t... I>
410 auto _gen_meta(std::tuple<Ps...>&&, std::index_sequence<I...>);
412 void _on_pipe(
Pipeflow&, NonpreemptiveRuntime&);
417template <
typename... Ps>
419 _pipes {std::make_tuple(std::forward<Ps>(ps)...)},
420 _meta {PipeMeta{ps.type()}...},
426 TF_THROW(
"must have at least one line");
430 TF_THROW(
"first pipe must be serial");
438template <
typename... Ps>
440 _pipes {std::forward<std::tuple<Ps...>>(ps)},
442 std::forward<std::tuple<Ps...>>(ps), std::make_index_sequence<sizeof...(Ps)>{}
445 _tasks (num_lines + 1),
446 _pipeflows (num_lines) {
449 TF_THROW(
"must have at least one line");
452 if(std::get<0>(_pipes).type() != PipeType::SERIAL) {
453 TF_THROW(
"first pipe must be serial");
461template <
typename... Ps>
462template <
size_t... I>
463auto Pipeline<Ps...>::_gen_meta(std::tuple<Ps...>&& ps, std::index_sequence<I...>) {
464 return std::array{PipeMeta{std::get<I>(ps).type()}...};
468template <
typename... Ps>
470 return _pipeflows.size();
474template <
typename... Ps>
476 return sizeof...(Ps);
480template <
typename... Ps>
486template <
typename... Ps>
492template <
typename... Ps>
498 _pipeflows[l]._pipe = 0;
499 _pipeflows[l]._line = l;
502 _lines[0][0].join_counter.store(0, std::memory_order_relaxed);
506 _lines[l][f].join_counter.store(
507 static_cast<size_t>(_meta[f].type), std::memory_order_relaxed
513 _lines[0][f].join_counter.store(1, std::memory_order_relaxed);
517 _lines[l][0].join_counter.store(
518 static_cast<size_t>(_meta[0].type) - 1, std::memory_order_relaxed
524template <
typename... Ps>
525void Pipeline<Ps...>::_on_pipe(
Pipeflow& pf, NonpreemptiveRuntime& rt) {
526 visit_tuple([&](
auto&& pipe){
527 using callable_t =
typename std::decay_t<
decltype(pipe)>::callable_t;
528 if constexpr (std::is_invocable_v<callable_t, Pipeflow&>) {
531 else if constexpr(std::is_invocable_v<callable_t, Pipeflow&, NonpreemptiveRuntime&>) {
532 pipe._callable(pf, rt);
535 static_assert(dependent_false_v<callable_t>,
"un-supported pipe callable type");
537 }, _pipes, pf._pipe);
541template <
typename... Ps>
542void Pipeline<Ps...>::_build() {
544 using namespace std::literals::string_literals;
546 FlowBuilder fb(_graph);
549 _tasks[0] = fb.emplace([
this]() {
550 return static_cast<int>(_num_tokens % num_lines());
554 for(
size_t l = 0; l < num_lines(); l++) {
556 _tasks[l + 1] = fb.emplace([
this, l] (tf::NonpreemptiveRuntime& rt)
mutable {
558 auto pf = &_pipeflows[l];
562 _lines[pf->_line][pf->_pipe].join_counter.store(
563 static_cast<size_t>(_meta[pf->_pipe].type), std::memory_order_relaxed
567 if (pf->_pipe == 0) {
568 pf->_token = _num_tokens;
569 if (pf->_stop =
false, _on_pipe(*pf, rt); pf->_stop ==
true) {
580 size_t c_f = pf->_pipe;
581 size_t n_f = (pf->_pipe + 1) % num_pipes();
582 size_t n_l = (pf->_line + 1) % num_lines();
599 std::array<int, 2> retval;
603 if(_meta[c_f].type == PipeType::SERIAL &&
604 _lines[n_l][c_f].join_counter.fetch_sub(
605 1, std::memory_order_acq_rel) == 1
611 if(_lines[pf->_line][n_f].join_counter.fetch_sub(
612 1, std::memory_order_acq_rel) == 1
620 rt.schedule(_tasks[n_l+1]);
625 if (retval[0] == 1) {
626 pf = &_pipeflows[n_l];
632 }).name(
"nprt-"s + std::to_string(l));
634 _tasks[0].precede(_tasks[l+1]);
781 std::atomic<size_t> join_counter;
790 using pipe_t =
typename std::iterator_traits<P>::value_type;
897 void reset(P first, P last);
938 size_t _num_tokens{0};
940 std::vector<P> _pipes;
941 std::vector<Task> _tasks;
942 std::vector<Pipeflow> _pipeflows;
943 std::unique_ptr<Line[]> _lines;
945 void _on_pipe(
Pipeflow&, NonpreemptiveRuntime&);
948 Line& _line(
size_t,
size_t);
958 TF_THROW(
"must have at least one line");
971 TF_THROW(
"must have at least one line");
981 _num_tokens {rhs._num_tokens},
982 _pipes {std::move(rhs._pipes)},
983 _pipeflows {std::move(rhs._pipeflows)},
984 _lines {std::move(rhs._lines)} {
987 _tasks.resize(_pipeflows.size()+1);
996 _num_tokens = rhs._num_tokens;
997 _pipes = std::move(rhs._pipes);
998 _pipeflows = std::move(rhs._pipeflows);
999 _lines = std::move(rhs._lines);
1002 _tasks.resize(_pipeflows.size()+1);
1004 rhs._num_tokens = 0;
1011template <
typename P>
1013 return _pipeflows.size();
1017template <
typename P>
1019 return _pipes.size();
1023template <
typename P>
1029template <
typename P>
1035template <
typename P>
1036typename ScalablePipeline<P>::Line& ScalablePipeline<P>::_line(
size_t l,
size_t p) {
1037 return _lines[l*num_pipes() + p];
1040template <
typename P>
1044 TF_THROW(
"must have at least one line");
1057template <
typename P>
1060 size_t num_pipes =
static_cast<size_t>(std::distance(first, last));
1063 TF_THROW(
"pipeline cannot be empty");
1067 TF_THROW(
"first pipe must be serial");
1073 for(
auto itr = first; itr != last; itr++) {
1077 _lines = std::make_unique<Line[]>(
num_lines() * _pipes.size());
1083template <
typename P>
1089 _pipeflows[l]._pipe = 0;
1090 _pipeflows[l]._line = l;
1093 _line(0, 0).join_counter.store(0, std::memory_order_relaxed);
1097 _line(l, f).join_counter.store(
1098 static_cast<size_t>(_pipes[f]->type()), std::memory_order_relaxed
1104 _line(0, f).join_counter.store(1, std::memory_order_relaxed);
1108 _line(l, 0).join_counter.store(
1109 static_cast<size_t>(_pipes[0]->type()) - 1, std::memory_order_relaxed
1115template <
typename P>
1116void ScalablePipeline<P>::_on_pipe(
Pipeflow& pf, NonpreemptiveRuntime& rt) {
1118 using callable_t =
typename pipe_t::callable_t;
1120 if constexpr (std::is_invocable_v<callable_t, Pipeflow&>) {
1121 _pipes[pf._pipe]->_callable(pf);
1123 else if constexpr(std::is_invocable_v<callable_t, Pipeflow&, NonpreemptiveRuntime&>) {
1124 _pipes[pf._pipe]->_callable(pf, rt);
1127 static_assert(dependent_false_v<callable_t>,
"un-supported pipe callable type");
1132template <
typename P>
1133void ScalablePipeline<P>::_build() {
1135 using namespace std::literals::string_literals;
1140 _tasks[0] = fb.emplace([
this]() {
1141 return static_cast<int>(_num_tokens % num_lines());
1145 for(
size_t l = 0; l < num_lines(); l++) {
1147 _tasks[l + 1] = fb.emplace([
this, l] (tf::NonpreemptiveRuntime& rt)
mutable {
1149 auto pf = &_pipeflows[l];
1153 _line(pf->_line, pf->_pipe).join_counter.store(
1154 static_cast<size_t>(_pipes[pf->_pipe]->type()), std::memory_order_relaxed
1158 if (pf->_pipe == 0) {
1159 pf->_token = _num_tokens;
1160 if (pf->_stop =
false, _on_pipe(*pf, rt); pf->_stop ==
true) {
1171 size_t c_f = pf->_pipe;
1172 size_t n_f = (pf->_pipe + 1) % num_pipes();
1173 size_t n_l = (pf->_line + 1) % num_lines();
1190 std::array<int, 2> retval;
1195 _line(n_l, c_f).join_counter.fetch_sub(
1196 1, std::memory_order_acq_rel) == 1
1202 if(_line(pf->_line, n_f).join_counter.fetch_sub(
1203 1, std::memory_order_acq_rel) == 1
1211 rt.schedule(_tasks[n_l+1]);
1215 if (retval[0] == 1) {
1216 pf = &_pipeflows[n_l];
1221 }).name(
"nprt-"s + std::to_string(l));
1223 _tasks[0].precede(_tasks[l+1]);
class to build a task dependency graph
Definition flow_builder.hpp:22
class to create a graph object
Definition graph.hpp:47
void clear()
clears the graph
Definition graph.hpp:881
PipeType type() const
queries the type of the pipe
Definition pipeline.hpp:190
void callable(U &&callable)
assigns a new callable to the pipe
Definition pipeline.hpp:212
C callable_t
alias of the callable type
Definition pipeline.hpp:157
Pipe()=default
default constructor
void type(PipeType type)
assigns a new type to the pipe
Definition pipeline.hpp:199
Pipe(PipeType d, C &&callable)
constructs the pipe object
Definition pipeline.hpp:181
class to create a pipeflow object used by the pipe callable
Definition pipeline.hpp:43
size_t token() const
queries the token identifier
Definition pipeline.hpp:78
size_t pipe() const
queries the pipe identifier of the present token
Definition pipeline.hpp:71
Pipeflow()=default
default constructor
void stop()
stops the pipeline scheduling
Definition pipeline.hpp:88
size_t line() const
queries the line identifier of the present token
Definition pipeline.hpp:64
void reset()
resets the pipeline
Definition pipeline.hpp:493
Graph & graph()
obtains the graph object associated with the pipeline construct
Definition pipeline.hpp:487
size_t num_lines() const noexcept
queries the number of parallel lines
Definition pipeline.hpp:469
size_t num_tokens() const noexcept
queries the number of generated tokens in the pipeline
Definition pipeline.hpp:481
Pipeline(size_t num_lines, Ps &&... ps)
constructs a pipeline object
Definition pipeline.hpp:418
constexpr size_t num_pipes() const
queries the number of pipes
Definition pipeline.hpp:475
ScalablePipeline(const ScalablePipeline &)=delete
disabled copy constructor
ScalablePipeline()=default
default constructor
Graph & graph()
obtains the graph object associated with the pipeline construct
Definition pipeline.hpp:1030
ScalablePipeline & operator=(const ScalablePipeline &)=delete
disabled copy assignment operator
size_t num_lines() const noexcept
queries the number of parallel lines
Definition pipeline.hpp:1012
size_t num_tokens() const noexcept
queries the number of generated tokens in the pipeline
Definition pipeline.hpp:1024
size_t num_pipes() const noexcept
queries the number of pipes
Definition pipeline.hpp:1018
void reset()
resets the pipeline
Definition pipeline.hpp:1084
typename std::iterator_traits< P >::value_type pipe_t
pipe type
Definition pipeline.hpp:790
taskflow namespace
Definition small_vector.hpp:20
PipeType
enumeration of all pipe types
Definition pipeline.hpp:113
@ SERIAL
serial type
Definition pipeline.hpp:117
@ PARALLEL
parallel type
Definition pipeline.hpp:115