Loading...
Searching...
No Matches
data_pipeline.hpp
#pragma once

#include <tuple>

#include "pipeline.hpp"
4
5
6namespace tf {
7
// ----------------------------------------------------------------------------
// Class Definition: DataPipe
// ----------------------------------------------------------------------------
11
51template <typename Input, typename Output, typename C>
52class DataPipe {
53
54 template <typename... Ps>
55 friend class DataPipeline;
56
57 public:
58
62 using callable_t = C;
63
67 using input_t = Input;
68
72 using output_t = Output;
73
77 DataPipe() = default;
78
87 _type{d}, _callable{std::forward<callable_t>(callable)} {
88 }
89
96 PipeType type() const {
97 return _type;
98 }
99
104 _type = type;
105 }
106
116 template <typename U>
117 void callable(U&& callable) {
118 _callable = std::forward<U>(callable);
119 }
120
121 private:
122
123 PipeType _type;
124
125 callable_t _callable;
126};
127
170template <typename Input, typename Output, typename C>
171auto make_data_pipe(PipeType d, C&& callable) {
172 return DataPipe<Input, Output, C>(d, std::forward<C>(callable));
173}
174
// ----------------------------------------------------------------------------
// Class Definition: DataPipeline
// ----------------------------------------------------------------------------
178
253template <typename... Ps>
255
256 static_assert(sizeof...(Ps)>0, "must have at least one pipe");
257
261 struct Line {
262 std::atomic<size_t> join_counter;
263 };
264
268 struct PipeMeta {
269 PipeType type;
270 };
271
272
273 public:
274
278 using data_t = unique_variant_t<std::variant<std::conditional_t<
279 std::is_void_v<typename Ps::output_t>,
280 std::monostate,
281 std::decay_t<typename Ps::output_t>>...
282 >>;
283
295 DataPipeline(size_t num_lines, Ps&&... ps);
296
308 DataPipeline(size_t num_lines, std::tuple<Ps...>&& ps);
309
318 size_t num_lines() const noexcept;
319
326 constexpr size_t num_pipes() const noexcept;
327
335 void reset();
336
343 size_t num_tokens() const noexcept;
344
351 Graph& graph();
352
353 private:
354
355 Graph _graph;
356
357 size_t _num_tokens;
358
359 std::tuple<Ps...> _pipes;
360 std::array<PipeMeta, sizeof...(Ps)> _meta;
361 std::vector<std::array<Line, sizeof...(Ps)>> _lines;
362 std::vector<Task> _tasks;
363 std::vector<Pipeflow> _pipeflows;
364 std::vector<CachelineAligned<data_t>> _buffer;
365
366 template <size_t... I>
367 auto _gen_meta(std::tuple<Ps...>&&, std::index_sequence<I...>);
368
369 void _on_pipe(Pipeflow&, NonpreemptiveRuntime&);
370 void _build();
371};
372
373// constructor
374template <typename... Ps>
375DataPipeline<Ps...>::DataPipeline(size_t num_lines, Ps&&... ps) :
376 _pipes {std::make_tuple(std::forward<Ps>(ps)...)},
377 _meta {PipeMeta{ps.type()}...},
378 _lines (num_lines),
379 _tasks (num_lines + 1),
380 _pipeflows (num_lines),
381 _buffer (num_lines) {
382
383 if(num_lines == 0) {
384 TF_THROW("must have at least one line");
385 }
386
387 if(std::get<0>(_pipes).type() != PipeType::SERIAL) {
388 TF_THROW("first pipe must be serial");
389 }
390
391 reset();
392 _build();
393}
394
395// constructor
396template <typename... Ps>
397DataPipeline<Ps...>::DataPipeline(size_t num_lines, std::tuple<Ps...>&& ps) :
398 _pipes {std::forward<std::tuple<Ps...>>(ps)},
399 _meta {_gen_meta(
400 std::forward<std::tuple<Ps...>>(ps), std::make_index_sequence<sizeof...(Ps)>{}
401 )},
402 _lines (num_lines),
403 _tasks (num_lines + 1),
404 _pipeflows (num_lines),
405 _buffer (num_lines) {
406
407 if(num_lines == 0) {
408 TF_THROW("must have at least one line");
409 }
410
411 if(std::get<0>(_pipes).type() != PipeType::SERIAL) {
412 TF_THROW("first pipe must be serial");
413 }
414
415 reset();
416 _build();
417}
418
419// Function: _get_meta
420template <typename... Ps>
421template <size_t... I>
422auto DataPipeline<Ps...>::_gen_meta(std::tuple<Ps...>&& ps, std::index_sequence<I...>) {
423 return std::array{PipeMeta{std::get<I>(ps).type()}...};
424}
425
426// Function: num_lines
427template <typename... Ps>
428size_t DataPipeline<Ps...>::num_lines() const noexcept {
429 return _pipeflows.size();
430}
431
432// Function: num_pipes
433template <typename... Ps>
434constexpr size_t DataPipeline<Ps...>::num_pipes() const noexcept {
435 return sizeof...(Ps);
436}
437
438// Function: num_tokens
439template <typename... Ps>
440size_t DataPipeline<Ps...>::num_tokens() const noexcept {
441 return _num_tokens;
442}
443
444// Function: graph
445template <typename... Ps>
447 return _graph;
448}
449
450// Function: reset
451template <typename... Ps>
453
454 _num_tokens = 0;
455
456 for(size_t l = 0; l<num_lines(); l++) {
457 _pipeflows[l]._pipe = 0;
458 _pipeflows[l]._line = l;
459 }
460
461 _lines[0][0].join_counter.store(0, std::memory_order_relaxed);
462
463 for(size_t l=1; l<num_lines(); l++) {
464 for(size_t f=1; f<num_pipes(); f++) {
465 _lines[l][f].join_counter.store(
466 static_cast<size_t>(_meta[f].type), std::memory_order_relaxed
467 );
468 }
469 }
470
471 for(size_t f=1; f<num_pipes(); f++) {
472 _lines[0][f].join_counter.store(1, std::memory_order_relaxed);
473 }
474
475 for(size_t l=1; l<num_lines(); l++) {
476 _lines[l][0].join_counter.store(
477 static_cast<size_t>(_meta[0].type) - 1, std::memory_order_relaxed
478 );
479 }
480}
481
482// Procedure: _on_pipe
483template <typename... Ps>
484void DataPipeline<Ps...>::_on_pipe(Pipeflow& pf, NonpreemptiveRuntime&) {
485
486 visit_tuple([&](auto&& pipe){
487
488 using data_pipe_t = std::decay_t<decltype(pipe)>;
489 using callable_t = typename data_pipe_t::callable_t;
490 using input_t = std::decay_t<typename data_pipe_t::input_t>;
491 using output_t = std::decay_t<typename data_pipe_t::output_t>;
492
493 // first pipe
494 if constexpr (std::is_invocable_v<callable_t, Pipeflow&>) {
495 // [](tf::Pipeflow&) -> void {}, i.e., we only have one pipe
496 if constexpr (std::is_void_v<output_t>) {
497 pipe._callable(pf);
498 // [](tf::Pipeflow&) -> output_t {}
499 } else {
500 _buffer[pf._line].data = pipe._callable(pf);
501 }
502 }
503 // other pipes without pipeflow in the second argument
504 else if constexpr (std::is_invocable_v<callable_t, std::add_lvalue_reference_t<input_t> >) {
505 // [](input_t&) -> void {}, i.e., the last pipe
506 if constexpr (std::is_void_v<output_t>) {
507 pipe._callable(std::get<input_t>(_buffer[pf._line].data));
508 // [](input_t&) -> output_t {}
509 } else {
510 _buffer[pf._line].data = pipe._callable(
511 std::get<input_t>(_buffer[pf._line].data)
512 );
513 }
514 }
515 // other pipes with pipeflow in the second argument
516 else if constexpr (std::is_invocable_v<callable_t, input_t&, Pipeflow&>) {
517 // [](input_t&, tf::Pipeflow&) -> void {}
518 if constexpr (std::is_void_v<output_t>) {
519 pipe._callable(std::get<input_t>(_buffer[pf._line].data), pf);
520 // [](input_t&, tf::Pipeflow&) -> output_t {}
521 } else {
522 _buffer[pf._line].data = pipe._callable(
523 std::get<input_t>(_buffer[pf._line].data), pf
524 );
525 }
526 }
527 //else if constexpr(std::is_invocable_v<callable_t, Pipeflow&, NonpreemptiveRuntime&>) {
528 // pipe._callable(pf, rt);
529 //}
530 else {
531 static_assert(dependent_false_v<callable_t>, "un-supported pipe callable type");
532 }
533 }, _pipes, pf._pipe);
534}
535
536// Procedure: _build
537template <typename... Ps>
538void DataPipeline<Ps...>::_build() {
539
540 using namespace std::literals::string_literals;
541
542 FlowBuilder fb(_graph);
543
544 // init task
545 _tasks[0] = fb.emplace([this]() {
546 return static_cast<int>(_num_tokens % num_lines());
547 }).name("cond");
548
549 // line task
550 for(size_t l = 0; l < num_lines(); l++) {
551
552 _tasks[l + 1] = fb.emplace([this, l] (tf::NonpreemptiveRuntime& rt) mutable {
553
554 auto pf = &_pipeflows[l];
555
556 pipeline:
557
558 _lines[pf->_line][pf->_pipe].join_counter.store(
559 static_cast<size_t>(_meta[pf->_pipe].type), std::memory_order_relaxed
560 );
561
562 if (pf->_pipe == 0) {
563 pf->_token = _num_tokens;
564 if (pf->_stop = false, _on_pipe(*pf, rt); pf->_stop == true) {
565 // here, the pipeline is not stopped yet because other
566 // lines of tasks may still be running their last stages
567 return;
568 }
569 ++_num_tokens;
570 }
571 else {
572 _on_pipe(*pf, rt);
573 }
574
575 size_t c_f = pf->_pipe;
576 size_t n_f = (pf->_pipe + 1) % num_pipes();
577 size_t n_l = (pf->_line + 1) % num_lines();
578
579 pf->_pipe = n_f;
580
581 // ---- scheduling starts here ----
582 // Notice that the shared variable f must not be changed after this
583 // point because it can result in data race due to the following
584 // condition:
585 //
586 // a -> b
587 // | |
588 // v v
589 // c -> d
590 //
591 // d will be spawned by either c or b, so if c changes f but b spawns d
592 // then data race on f will happen
593
594 std::array<int, 2> retval;
595 size_t n = 0;
596
597 // downward dependency
598 if(_meta[c_f].type == PipeType::SERIAL &&
599 _lines[n_l][c_f].join_counter.fetch_sub(
600 1, std::memory_order_acq_rel) == 1
601 ) {
602 retval[n++] = 1;
603 }
604
605 // forward dependency
606 if(_lines[pf->_line][n_f].join_counter.fetch_sub(
607 1, std::memory_order_acq_rel) == 1
608 ) {
609 retval[n++] = 0;
610 }
611
612 // notice that the task index starts from 1
613 switch(n) {
614 case 2: {
615 rt.schedule(_tasks[n_l+1]);
616 goto pipeline;
617 }
618 case 1: {
619 // downward dependency
620 if (retval[0] == 1) {
621 pf = &_pipeflows[n_l];
622 }
623 // forward dependency
624 goto pipeline;
625 }
626 }
627 }).name("rt-"s + std::to_string(l));
628
629 _tasks[0].precede(_tasks[l+1]);
630 }
631}
632
633
634} // end of namespace tf -----------------------------------------------------
635
636
637
638
639
class to ensure cacheline-aligned storage for an object.
Definition os.hpp:148
class to create a stage in a data-parallel pipeline
Definition data_pipeline.hpp:52
DataPipe(PipeType d, callable_t &&callable)
constructs a data pipe
Definition data_pipeline.hpp:86
void callable(U &&callable)
assigns a new callable to the data pipe
Definition data_pipeline.hpp:117
C callable_t
callable type of the data pipe
Definition data_pipeline.hpp:62
PipeType type() const
queries the type of the data pipe
Definition data_pipeline.hpp:96
Output output_t
output type of the data pipe
Definition data_pipeline.hpp:72
DataPipe()=default
default constructor
Input input_t
input type of the data pipe
Definition data_pipeline.hpp:67
void type(PipeType type)
assigns a new type to the data pipe
Definition data_pipeline.hpp:103
DataPipeline(size_t num_lines, Ps &&... ps)
constructs a data-parallel pipeline object
Definition data_pipeline.hpp:375
size_t num_tokens() const noexcept
queries the number of generated tokens in the pipeline
Definition data_pipeline.hpp:440
constexpr size_t num_pipes() const noexcept
queries the number of pipes
Definition data_pipeline.hpp:434
size_t num_lines() const noexcept
queries the number of parallel lines
Definition data_pipeline.hpp:428
void reset()
resets the pipeline
Definition data_pipeline.hpp:452
Graph & graph()
obtains the graph object associated with the pipeline construct
Definition data_pipeline.hpp:446
unique_variant_t< std::variant< std::conditional_t< std::is_void_v< typename Ps::output_t >, std::monostate, std::decay_t< typename Ps::output_t > >... > > data_t
internal storage type for each data token (default std::variant)
Definition data_pipeline.hpp:278
class to create a graph object
Definition graph.hpp:47
class to create a pipeflow object used by the pipe callable
Definition pipeline.hpp:43
class to create a task handle over a taskflow node
Definition task.hpp:263
taskflow namespace
Definition small_vector.hpp:20
auto make_data_pipe(PipeType d, C &&callable)
function to construct a data pipe (tf::DataPipe)
Definition data_pipeline.hpp:171
PipeType
enumeration of all pipe types
Definition pipeline.hpp:113
@ SERIAL
serial type
Definition pipeline.hpp:117