51template <
typename Input,
typename Output,
typename C>
54 template <
typename... Ps>
55 friend class DataPipeline;
116 template <
typename U>
118 _callable = std::forward<U>(
callable);
170template <
typename Input,
typename Output,
typename C>
253template <
typename... Ps>
256 static_assert(
sizeof...(Ps)>0,
"must have at least one pipe");
262 std::atomic<size_t> join_counter;
278 using data_t = unique_variant_t<std::variant<std::conditional_t<
279 std::is_void_v<typename Ps::output_t>,
281 std::decay_t<typename Ps::output_t>>...
326 constexpr
size_t num_pipes() const noexcept;
359 std::tuple<Ps...> _pipes;
360 std::array<PipeMeta, sizeof...(Ps)> _meta;
361 std::vector<std::array<Line, sizeof...(Ps)>> _lines;
362 std::vector<
Task> _tasks;
366 template <
size_t... I>
367 auto _gen_meta(std::tuple<Ps...>&&, std::index_sequence<I...>);
369 void _on_pipe(
Pipeflow&, NonpreemptiveRuntime&);
374template <typename... Ps>
376 _pipes {std::make_tuple(std::forward<Ps>(ps)...)},
377 _meta {PipeMeta{ps.type()}...},
384 TF_THROW(
"must have at least one line");
388 TF_THROW(
"first pipe must be serial");
396template <
typename... Ps>
398 _pipes {std::forward<std::tuple<Ps...>>(ps)},
400 std::forward<std::tuple<Ps...>>(ps), std::make_index_sequence<sizeof...(Ps)>{}
403 _tasks (num_lines + 1),
404 _pipeflows (num_lines),
405 _buffer (num_lines) {
408 TF_THROW(
"must have at least one line");
411 if(std::get<0>(_pipes).type() != PipeType::SERIAL) {
412 TF_THROW(
"first pipe must be serial");
420template <
typename... Ps>
421template <
size_t... I>
422auto DataPipeline<Ps...>::_gen_meta(std::tuple<Ps...>&& ps, std::index_sequence<I...>) {
423 return std::array{PipeMeta{std::get<I>(ps).type()}...};
427template <
typename... Ps>
429 return _pipeflows.size();
433template <
typename... Ps>
435 return sizeof...(Ps);
439template <
typename... Ps>
445template <
typename... Ps>
451template <
typename... Ps>
457 _pipeflows[l]._pipe = 0;
458 _pipeflows[l]._line = l;
461 _lines[0][0].join_counter.store(0, std::memory_order_relaxed);
465 _lines[l][f].join_counter.store(
466 static_cast<size_t>(_meta[f].type), std::memory_order_relaxed
472 _lines[0][f].join_counter.store(1, std::memory_order_relaxed);
476 _lines[l][0].join_counter.store(
477 static_cast<size_t>(_meta[0].type) - 1, std::memory_order_relaxed
483template <
typename... Ps>
484void DataPipeline<Ps...>::_on_pipe(
Pipeflow& pf, NonpreemptiveRuntime&) {
486 visit_tuple([&](
auto&& pipe){
488 using data_pipe_t = std::decay_t<
decltype(pipe)>;
489 using callable_t =
typename data_pipe_t::callable_t;
490 using input_t = std::decay_t<typename data_pipe_t::input_t>;
491 using output_t = std::decay_t<typename data_pipe_t::output_t>;
494 if constexpr (std::is_invocable_v<callable_t, Pipeflow&>) {
496 if constexpr (std::is_void_v<output_t>) {
500 _buffer[pf._line].data = pipe._callable(pf);
504 else if constexpr (std::is_invocable_v<callable_t, std::add_lvalue_reference_t<input_t> >) {
506 if constexpr (std::is_void_v<output_t>) {
507 pipe._callable(std::get<input_t>(_buffer[pf._line].data));
510 _buffer[pf._line].data = pipe._callable(
511 std::get<input_t>(_buffer[pf._line].data)
516 else if constexpr (std::is_invocable_v<callable_t, input_t&, Pipeflow&>) {
518 if constexpr (std::is_void_v<output_t>) {
519 pipe._callable(std::get<input_t>(_buffer[pf._line].data), pf);
522 _buffer[pf._line].data = pipe._callable(
523 std::get<input_t>(_buffer[pf._line].data), pf
531 static_assert(dependent_false_v<callable_t>,
"un-supported pipe callable type");
533 }, _pipes, pf._pipe);
537template <
typename... Ps>
538void DataPipeline<Ps...>::_build() {
540 using namespace std::literals::string_literals;
542 FlowBuilder fb(_graph);
545 _tasks[0] = fb.emplace([
this]() {
546 return static_cast<int>(_num_tokens % num_lines());
550 for(
size_t l = 0; l < num_lines(); l++) {
552 _tasks[l + 1] = fb.emplace([
this, l] (tf::NonpreemptiveRuntime& rt)
mutable {
554 auto pf = &_pipeflows[l];
558 _lines[pf->_line][pf->_pipe].join_counter.store(
559 static_cast<size_t>(_meta[pf->_pipe].type), std::memory_order_relaxed
562 if (pf->_pipe == 0) {
563 pf->_token = _num_tokens;
564 if (pf->_stop =
false, _on_pipe(*pf, rt); pf->_stop ==
true) {
575 size_t c_f = pf->_pipe;
576 size_t n_f = (pf->_pipe + 1) % num_pipes();
577 size_t n_l = (pf->_line + 1) % num_lines();
594 std::array<int, 2> retval;
598 if(_meta[c_f].type == PipeType::SERIAL &&
599 _lines[n_l][c_f].join_counter.fetch_sub(
600 1, std::memory_order_acq_rel) == 1
606 if(_lines[pf->_line][n_f].join_counter.fetch_sub(
607 1, std::memory_order_acq_rel) == 1
615 rt.schedule(_tasks[n_l+1]);
620 if (retval[0] == 1) {
621 pf = &_pipeflows[n_l];
627 }).name(
"rt-"s + std::to_string(l));
629 _tasks[0].precede(_tasks[l+1]);
class to ensure cacheline-aligned storage for an object.
Definition os.hpp:148
class to create a stage in a data-parallel pipeline
Definition data_pipeline.hpp:52
DataPipe(PipeType d, callable_t &&callable)
constructs a data pipe
Definition data_pipeline.hpp:86
void callable(U &&callable)
assigns a new callable to the data pipe
Definition data_pipeline.hpp:117
C callable_t
callable type of the data pipe
Definition data_pipeline.hpp:62
PipeType type() const
queries the type of the data pipe
Definition data_pipeline.hpp:96
Output output_t
output type of the data pipe
Definition data_pipeline.hpp:72
DataPipe()=default
default constructor
Input input_t
input type of the data pipe
Definition data_pipeline.hpp:67
void type(PipeType type)
assigns a new type to the data pipe
Definition data_pipeline.hpp:103
DataPipeline(size_t num_lines, Ps &&... ps)
constructs a data-parallel pipeline object
Definition data_pipeline.hpp:375
size_t num_tokens() const noexcept
queries the number of generated tokens in the pipeline
Definition data_pipeline.hpp:440
constexpr size_t num_pipes() const noexcept
queries the number of pipes
Definition data_pipeline.hpp:434
size_t num_lines() const noexcept
queries the number of parallel lines
Definition data_pipeline.hpp:428
void reset()
resets the pipeline
Definition data_pipeline.hpp:452
Graph & graph()
obtains the graph object associated with the pipeline construct
Definition data_pipeline.hpp:446
unique_variant_t< std::variant< std::conditional_t< std::is_void_v< typename Ps::output_t >, std::monostate, std::decay_t< typename Ps::output_t > >... > > data_t
internal storage type for each data token (default std::variant)
Definition data_pipeline.hpp:278
class to create a graph object
Definition graph.hpp:47
class to create a pipeflow object used by the pipe callable
Definition pipeline.hpp:43
class to create a task handle over a taskflow node
Definition task.hpp:263
taskflow namespace
Definition small_vector.hpp:20
auto make_data_pipe(PipeType d, C &&callable)
function to construct a data pipe (tf::DataPipe)
Definition data_pipeline.hpp:171
PipeType
enumeration of all pipe types
Definition pipeline.hpp:113
@ SERIAL
serial type
Definition pipeline.hpp:117