Loading...
Searching...
No Matches
taskflow.hpp
1#pragma once
2
3#include "flow_builder.hpp"
4
9
10namespace tf {
11
12// ----------------------------------------------------------------------------
13
64class Taskflow : public FlowBuilder {
65
66 friend class Topology;
67 friend class Executor;
68 friend class FlowBuilder;
69 friend class Subflow;
70
71 struct Dumper {
72 size_t id;
73 std::stack<std::tuple<const Node*, const Graph*, size_t>> stack; // added depth
74 std::unordered_map<const Graph*, size_t> visited;
75 };
76
77 public:
78
87 Taskflow(const std::string& name);
88
92 Taskflow();
93
109 Taskflow(Taskflow&& rhs);
110
127
156 ~Taskflow() = default;
157
184 void dump(std::ostream& ostream) const;
185
192 std::string dump() const;
193
219 size_t num_tasks() const;
220
233 bool empty() const;
234
243 void name(const std::string&);
244
253 const std::string& name() const;
254
262 void clear();
263
277 template <typename V>
278 void for_each_task(V&& visitor) const;
279
313 void remove_dependency(Task from, Task to);
314
322 Graph& graph();
323
324 private:
325
326 mutable std::mutex _mutex;
327
328 std::string _name;
329
330 Graph _graph;
331
332 std::queue<std::shared_ptr<Topology>> _topologies;
333
334 void _dump(std::ostream&, const Graph*) const;
335 void _dump(std::ostream&, const Node*, Dumper&, size_t level) const;
336 void _dump(std::ostream&, const Graph*, Dumper&, size_t level) const;
337
338 size_t _fetch_enqueue(std::shared_ptr<Topology>);
339};
340
341// Constructor
342inline Taskflow::Taskflow(const std::string& name) :
343 FlowBuilder {_graph},
344 _name {name} {
345}
346
347// Constructor
348inline Taskflow::Taskflow() : FlowBuilder{_graph} {
349}
350
351// Move constructor
352inline Taskflow::Taskflow(Taskflow&& rhs) : FlowBuilder{_graph} {
353 std::scoped_lock<std::mutex> lock(rhs._mutex);
354 _name = std::move(rhs._name);
355 _graph = std::move(rhs._graph);
356 _topologies = std::move(rhs._topologies);
357}
358
359// Move assignment
361 if(this != &rhs) {
362 std::scoped_lock<std::mutex, std::mutex> lock(_mutex, rhs._mutex);
363 _name = std::move(rhs._name);
364 _graph = std::move(rhs._graph);
365 _topologies = std::move(rhs._topologies);
366 }
367 return *this;
368}
369
370// Function:
371inline void Taskflow::clear() {
372 _graph.clear();
373}
374
375// Function: num_tasks
376inline size_t Taskflow::num_tasks() const {
377 return _graph.size();
378}
379
380// Function: empty
381inline bool Taskflow::empty() const {
382 return _graph.empty();
383}
384
385// Function: name
386inline void Taskflow::name(const std::string &name) {
387 _name = name;
388}
389
390// Function: name
391inline const std::string& Taskflow::name() const {
392 return _name;
393}
394
395// Function: graph
397 return _graph;
398}
399
400// Function: for_each_task
401template <typename V>
402void Taskflow::for_each_task(V&& visitor) const {
403 for(auto itr = _graph.begin(); itr != _graph.end(); ++itr) {
404 visitor(Task(*itr));
405 }
406}
407
408// Function: remove_dependency
409inline void Taskflow::remove_dependency(Task from, Task to) {
410 // remove "to" from the succcessor list of "from"
411 from._node->_remove_successors(to._node);
412
413 // remove "from" from the predecessor list of "to"
414 to._node->_remove_predecessors(from._node);
415}
416
417// Function: _fetch_enqueue
418inline size_t Taskflow::_fetch_enqueue(std::shared_ptr<Topology> tpg) {
419 std::lock_guard<std::mutex> lock(_mutex);
420 auto pre_size = _topologies.size();
421 _topologies.emplace(std::move(tpg));
422 return pre_size;
423}
424
425// Function: dump
426inline std::string Taskflow::dump() const {
427 std::ostringstream oss;
428 dump(oss);
429 return oss.str();
430}
431
432// Function: dump
433inline void Taskflow::dump(std::ostream& os) const {
434 os << "digraph Taskflow {\n";
435 os << " compound=true;\n"; // required for lhead/ltail cluster edges
436 _dump(os, &_graph);
437 os << "}\n";
438}
439
440// Function: _dump (top-level — iterates module stack)
441inline void Taskflow::_dump(std::ostream& os, const Graph* top) const {
442
443 Dumper dumper;
444
445 dumper.id = 0;
446 dumper.stack.push({nullptr, top, 1});
447 dumper.visited[top] = dumper.id++;
448
449 while(!dumper.stack.empty()) {
450
451 auto [p, f, depth] = dumper.stack.top();
452 dumper.stack.pop();
453
454 std::string ind(depth * 2, ' ');
455 std::string ind2((depth + 1) * 2, ' ');
456
457 os << ind << "subgraph cluster_p" << f << " {\n";
458 os << ind2 << "label=\"";
459
460 // n-level module
461 if(p) {
462 if (p->_name.empty()) os << 'm' << dumper.visited[f];
463 else os << p->name();
464 }
465 // top-level taskflow graph
466 else {
467 os << "Taskflow: ";
468 if(_name.empty()) os << 'p' << this;
469 else os << _name;
470 }
471
472 os << "\";\n";
473
474 _dump(os, f, dumper, depth + 1);
475 os << ind << "}\n";
476 }
477}
478
// Function: _dump (single node)
// Emits the DOT statements for one node: its declaration (label and, for
// condition nodes, a diamond shape), one edge per successor, and, for
// SUBFLOW / ADOPTED_MODULE nodes with a non-empty graph, a nested cluster
// followed by a dashed join edge back to the node.
//
// os     - destination stream
// node   - node to emit; the graph-level overload calls this only for nodes
//          whose handle is not Node::MODULE
// dumper - shared dump state, forwarded to the nested graph dump
// level  - indentation depth; each level contributes two spaces
inline void Taskflow::_dump(
  std::ostream& os, const Node* node, Dumper& dumper, size_t level
) const {

  std::string ind(level * 2, ' ');

  // label of the node (its name, or 'p' + address when unnamed)
  os << ind << 'p' << node << "[label=\"";
  if(node->_name.empty()) os << 'p' << node;
  else os << node->_name;
  os << "\" ";

  // shape of the node: only condition-type nodes get extra attributes
  switch(node->_handle.index()) {

    case Node::CONDITION:
    case Node::MULTI_CONDITION:
      os << "shape=diamond color=black fillcolor=aquamarine style=filled";
    break;

    default:
    break;
  }

  os << "];\n";

  // outgoing edges; edges of a conditioner are dashed and labelled with the
  // successor index
  for(size_t s=0; s<node->_num_successors; ++s) {
    if(node->_is_conditioner()) {
      os << ind << 'p' << node << " -> p" << node->_edges[s]
         << " [style=dashed label=\"" << s << "\"];\n";
    } else {
      os << ind << 'p' << node << " -> p" << node->_edges[s] << ";\n";
    }
  }

  // node info
  switch(auto hid = node->_handle.index(); hid) {

    case Node::SUBFLOW:
    case Node::ADOPTED_MODULE: {

      // pick the graph stored in whichever alternative the handle holds
      auto& g = (hid == Node::SUBFLOW) ?
        std::get_if<Node::Subflow>(&node->_handle)->subgraph :
        std::get_if<Node::AdoptedModule>(&node->_handle)->graph;

      // an empty nested graph produces no cluster and no join edge
      if(!g.empty()) {
        std::string ind2((level + 1) * 2, ' ');

        os << ind << "subgraph cluster_p" << node << " {\n";
        os << ind2 << ((hid == Node::SUBFLOW) ? "label=\"Subflow: "
                                               : "label=\"AdoptedModule: ");
        if(node->_name.empty()) os << 'p' << node;
        else os << node->_name;
        os << "\";\n";
        os << ind2 << "color=blue;\n";

        // nested nodes are emitted one level deeper
        _dump(os, &g, dumper, level + 1);
        os << ind << "}\n";

        // Single cluster-level join edge: subflow cluster → parent node.
        // ltail clips the arrow tail at the cluster boundary.
        // The first node of the nested graph serves as the edge's tail.
        auto first = *g.begin();
        os << ind << 'p' << first << " -> p" << node
           << " [ltail=cluster_p" << node
           << " style=dashed color=blue];\n";
      }
    }
    break;

    default:
    break;
  }
}
553
554// Function: _dump (graph — iterates nodes)
555inline void Taskflow::_dump(
556 std::ostream& os, const Graph* graph, Dumper& dumper, size_t level
557) const {
558
559 std::string ind(level * 2, ' ');
560
561 for(auto itr = graph->begin(); itr != graph->end(); ++itr) {
562
563 Node* n = *itr;
564
565 // regular task
566 if(n->_handle.index() != Node::MODULE) {
567 _dump(os, n, dumper, level);
568 }
569 // module task
570 else {
571 auto mgraph = &(std::get_if<Node::Module>(&n->_handle)->graph);
572
573 os << ind << 'p' << n << "[shape=box3d, color=blue, label=\"";
574 if(n->_name.empty()) os << 'p' << n;
575 else os << n->_name;
576
577 if(dumper.visited.find(mgraph) == dumper.visited.end()) {
578 dumper.visited[mgraph] = dumper.id++;
579 dumper.stack.push({n, mgraph, level});
580 }
581
582 if(n->_name.empty()) os << " [m" << dumper.visited[mgraph] << "]";
583 os << "\"];\n";
584
585 for(size_t i=0; i<n->_num_successors; ++i) {
586 os << ind << 'p' << n << " -> " << 'p' << n->_edges[i] << ";\n";
587 }
588 }
589 }
590}
591
592// ----------------------------------------------------------------------------
593// class definition: Future
594// ----------------------------------------------------------------------------
595
/**
@brief class derived from std::future<T> that additionally holds a weak
       reference to a Topology, which cancel() uses to request cancellation

@tparam T value type of the underlying std::future
*/
template <typename T>
class Future : public std::future<T>  {

  friend class Executor;
  friend class Subflow;
  friend class Runtime;

  public:

    /**
    @brief default constructor
    */
    Future() = default;

    /**
    @brief disabled copy constructor
    */
    Future(const Future&) = delete;

    /**
    @brief default move constructor
    */
    Future(Future&&) = default;

    /**
    @brief constructs a Future by moving from a std::future;
           the topology reference is left empty
    */
    Future(std::future<T>&&);

    /**
    @brief disabled copy assignment
    */
    Future& operator = (const Future&) = delete;

    /**
    @brief default move assignment
    */
    Future& operator = (Future&&) = default;

    /**
    @brief cancels the execution of the running taskflow associated with
           this future object

    @return @c true if the associated topology could still be locked and
            its cancelled flag was set; @c false otherwise
    */
    bool cancel();

  private:

    // non-owning reference to the topology this future belongs to
    std::weak_ptr<Topology> _topology;

    // constructs from a std::future together with its topology reference
    Future(std::future<T>&&, std::weak_ptr<Topology>);
};
708
709template <typename T>
710Future<T>::Future(std::future<T>&& f, std::weak_ptr<Topology> p) :
711 std::future<T> {std::move(f)},
712 _topology {std::move(p)} {
713}
714
715template <typename T>
716Future<T>::Future(std::future<T>&& f) : std::future<T> {std::move(f)} {
717}
718
719// Function: cancel
720template <typename T>
722 if(auto ptr = _topology.lock(); ptr) {
723 ptr->_estate.fetch_or(ESTATE::CANCELLED, std::memory_order_relaxed);
724 return true;
725 }
726 return false;
727}
728
729
730} // end of namespace tf. ---------------------------------------------------
bool cancel()
cancels the execution of the running taskflow associated with this future object
Definition taskflow.hpp:721
Future & operator=(const Future &)=delete
disabled copy assignment
Future()=default
default constructor
Future(const Future &)=delete
disabled copy constructor
Future(Future &&)=default
default move constructor
class to create a graph object
Definition graph.hpp:47
class to create a task handle over a taskflow node
Definition task.hpp:263
void clear()
clears the associated task dependency graph
Definition taskflow.hpp:371
bool empty() const
queries if this taskflow is empty (has no tasks)
Definition taskflow.hpp:381
void remove_dependency(Task from, Task to)
removes dependencies that go from task from to task to
Definition taskflow.hpp:409
void for_each_task(V &&visitor) const
applies a visitor to each task in this taskflow
Definition taskflow.hpp:402
Taskflow(const std::string &name)
constructs a taskflow with the given name
Definition taskflow.hpp:342
Graph & graph()
returns a reference to the underlying graph object
Definition taskflow.hpp:396
const std::string & name() const
queries the name of this taskflow
Definition taskflow.hpp:391
std::string dump() const
dumps the taskflow to a std::string of DOT format
Definition taskflow.hpp:426
Taskflow & operator=(Taskflow &&rhs)
move assignment operator
Definition taskflow.hpp:360
~Taskflow()=default
default destructor
void name(const std::string &)
assigns a new name to this taskflow
Definition taskflow.hpp:386
Taskflow()
constructs a taskflow
Definition taskflow.hpp:348
size_t num_tasks() const
queries the number of tasks in this taskflow
Definition taskflow.hpp:376
taskflow namespace
Definition small_vector.hpp:20