Loading...
Searching...
No Matches
runtime.hpp
1#pragma once
2
3#include "executor.hpp"
4
5namespace tf {
6
7// ------------------------------------------------------------------------------------------------
8// class: Runtime
9// ------------------------------------------------------------------------------------------------
10
47class Runtime {
48
49 friend class Executor;
50 friend class FlowBuilder;
51 friend class PreemptionGuard;
52 friend class Algorithm;
53
54 public:
55
71 Executor& executor();
72
76 inline Worker& worker();
77
118 void schedule(Task task);
119
120 // ----------------------------------------------------------------------------------------------
121 // async methods
122 // ----------------------------------------------------------------------------------------------
123
156 template <typename F>
157 auto async(F&& f);
158
179 template <typename P, typename F>
180 auto async(P&& params, F&& f);
181
182 // ----------------------------------------------------------------------------------------------
183 // silent async methods
184 // ----------------------------------------------------------------------------------------------
185
206 template <typename F>
207 void silent_async(F&& f);
208
225 template <typename P, typename F>
226 void silent_async(P&& params, F&& f);
227
228 // ----------------------------------------------------------------------------------------------
229 // dependent async methods
230 // ----------------------------------------------------------------------------------------------
231
266 template <typename F, typename... Tasks>
267 requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
268 auto dependent_async(F&& func, Tasks&&... tasks);
269
308 template <TaskParameters P, typename F, typename... Tasks>
309 requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
310 auto dependent_async(P&& params, F&& func, Tasks&&... tasks);
311
349 template <typename F, typename I>
350 requires (!std::same_as<std::decay_t<I>, AsyncTask>)
351 auto dependent_async(F&& func, I first, I last);
352
394 template <TaskParameters P, typename F, typename I>
395 requires (!std::same_as<std::decay_t<I>, AsyncTask>)
396 auto dependent_async(P&& params, F&& func, I first, I last);
397
398 // ----------------------------------------------------------------------------------------------
399 // silent dependent async methods
400 // ----------------------------------------------------------------------------------------------
401
429 template <typename F, typename... Tasks>
430 requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
431 tf::AsyncTask silent_dependent_async(F&& func, Tasks&&... tasks);
432
464 template <TaskParameters P, typename F, typename... Tasks>
465 requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
466 tf::AsyncTask silent_dependent_async(P&& params, F&& func, Tasks&&... tasks);
467
500 template <typename F, typename I>
501 requires (!std::same_as<std::decay_t<I>, AsyncTask>)
502 tf::AsyncTask silent_dependent_async(F&& func, I first, I last);
503
538 template <TaskParameters P, typename F, typename I>
539 requires (!std::same_as<std::decay_t<I>, AsyncTask>)
540 tf::AsyncTask silent_dependent_async(P&& params, F&& func, I first, I last);
541
542
543
544 // ----------------------------------------------------------------------------------------------
545 // cooperative execution methods
546 // ----------------------------------------------------------------------------------------------
547
578 void corun();
579
583 void corun_all();
584
588 bool is_cancelled();
589
590 private:
591
595 explicit Runtime(Executor&, Worker&, Node*);
596
600 Executor& _executor;
601
605 Worker& _worker;
606
610 Node* _node;
611};
612
613// constructor
614inline Runtime::Runtime(Executor& executor, Worker& worker, Node* node) :
615 _executor {executor},
616 _worker {worker},
617 _node {node} {
618}
619
620// Function: executor
621inline Executor& Runtime::executor() {
622 return _executor;
623}
624
625// Function: worker
627 return _worker;
628}
629
630// Procedure: schedule
631inline void Runtime::schedule(Task task) {
632
633 auto node = task._node;
634 // need to keep the invariant: when scheduling a task, the task must have
635 // zero dependency (join counter is 0)
636 // or we can encounter bug when inserting a nested flow (e.g., module task)
637 node->_join_counter.store(0, std::memory_order_relaxed);
638
639 auto& j = node->_parent ? node->_parent->_join_counter :
640 node->_topology->_join_counter;
641 j.fetch_add(1, std::memory_order_relaxed);
642 _executor._schedule(_worker, node);
643}
644
645// Function: corun
646inline void Runtime::corun() {
647 {
648 ExplicitAnchorGuard anchor(_node);
649 _executor._corun_until(_worker, [this] () -> bool {
650 return _node->_join_counter.load(std::memory_order_acquire) == 1;
651 });
652 }
653 _node->_rethrow_exception();
654}
655
656// Function: corun_all
657inline void Runtime::corun_all() {
658 corun();
659}
660
661inline bool Runtime::is_cancelled() {
662 return _node->_is_parent_cancelled();
663}
664
665// ------------------------------------------------------------------------------------------------
666// Runtime::silent_async
667// ------------------------------------------------------------------------------------------------
668
669// Function: silent_async
670template <typename F>
672 silent_async(DefaultTaskParams{}, std::forward<F>(f));
673}
674
675// Function: silent_async
676template <typename P, typename F>
677void Runtime::silent_async(P&& params, F&& f) {
678 _node->_join_counter.fetch_add(1, std::memory_order_relaxed);
679 _executor._silent_async(
680 std::forward<P>(params), std::forward<F>(f), _node->_topology, _node
681 );
682}
683
684// ------------------------------------------------------------------------------------------------
685// Runtime::async
686// ------------------------------------------------------------------------------------------------
687
688// Function: async
689template <typename F>
690auto Runtime::async(F&& f) {
691 return async(DefaultTaskParams{}, std::forward<F>(f));
692}
693
694// Function: async
695template <typename P, typename F>
696auto Runtime::async(P&& params, F&& f) {
697 _node->_join_counter.fetch_add(1, std::memory_order_relaxed);
698 return _executor._async(
699 std::forward<P>(params), std::forward<F>(f), _node->_topology, _node
700 );
701}
702
703// ------------------------------------------------------------------------------------------------
704// silent dependent async
705// ------------------------------------------------------------------------------------------------
706
707// Function: silent_dependent_async
708template <typename F, typename... Tasks>
709requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
712 DefaultTaskParams{}, std::forward<F>(func), std::forward<Tasks>(tasks)...
713 );
714}
715
716// Function: silent_dependent_async
717template <TaskParameters P, typename F, typename... Tasks>
718requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
720 P&& params, F&& func, Tasks&&... tasks
721){
722 std::array<AsyncTask, sizeof...(Tasks)> array = { std::forward<Tasks>(tasks)... };
724 std::forward<P>(params), std::forward<F>(func), array.begin(), array.end()
725 );
726}
727
728// Function: silent_dependent_async
729template <typename F, typename I>
730requires (!std::same_as<std::decay_t<I>, AsyncTask>)
732 return silent_dependent_async(DefaultTaskParams{}, std::forward<F>(func), first, last);
733}
734
735// Function: silent_dependent_async
736template <TaskParameters P, typename F, typename I>
737requires (!std::same_as<std::decay_t<I>, AsyncTask>)
739 P&& params, F&& func, I first, I last
740) {
741 _node->_join_counter.fetch_add(1, std::memory_order_relaxed);
742 return _executor._silent_dependent_async(
743 std::forward<P>(params), std::forward<F>(func), first, last, _node->_topology, _node
744 );
745}
746
747// ------------------------------------------------------------------------------------------------
748// dependent async
749// ------------------------------------------------------------------------------------------------
750
751// Function: dependent_async
752template <typename F, typename... Tasks>
753requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
754auto Runtime::dependent_async(F&& func, Tasks&&... tasks) {
755 return dependent_async(DefaultTaskParams{}, std::forward<F>(func), std::forward<Tasks>(tasks)...);
756}
757
758// Function: dependent_async
759template <TaskParameters P, typename F, typename... Tasks>
760requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
761auto Runtime::dependent_async(P&& params, F&& func, Tasks&&... tasks) {
762 std::array<AsyncTask, sizeof...(Tasks)> array = { std::forward<Tasks>(tasks)... };
763 return dependent_async(
764 std::forward<P>(params), std::forward<F>(func), array.begin(), array.end()
765 );
766}
767
768// Function: dependent_async
769template <typename F, typename I>
770requires (!std::same_as<std::decay_t<I>, AsyncTask>)
771auto Runtime::dependent_async(F&& func, I first, I last) {
772 return dependent_async(DefaultTaskParams{}, std::forward<F>(func), first, last);
773}
774
775// Function: dependent_async
776template <TaskParameters P, typename F, typename I>
777requires (!std::same_as<std::decay_t<I>, AsyncTask>)
778auto Runtime::dependent_async(P&& params, F&& func, I first, I last) {
779 _node->_join_counter.fetch_add(1, std::memory_order_relaxed);
780 return _executor._dependent_async(
781 std::forward<P>(params), std::forward<F>(func), first, last, _node->_topology, _node
782 );
783}
784
785// ----------------------------------------------------------------------------
786// Executor Forward Declaration
787// ----------------------------------------------------------------------------
788
789// Procedure: _invoke_runtime_task
790inline bool Executor::_invoke_runtime_task(Worker& worker, Node* node) {
791 return _invoke_runtime_task_impl(
792 worker, node, std::get_if<Node::Runtime>(&node->_handle)->work
793 );
794}
795
796// Function: _invoke_runtime_task_impl
797inline bool Executor::_invoke_runtime_task_impl(
798 Worker& worker, Node* node, std::function<void(Runtime&)>& work
799) {
800 // first time
801 if((node->_nstate & NSTATE::PREEMPTED) == 0) {
802
803 Runtime rt(*this, worker, node);
804
805 node->_nstate |= (NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);
806
807 node->_join_counter.fetch_add(1, std::memory_order_release);
808
809 _observer_prologue(worker, node);
810 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
811 work(rt);
812 });
813 _observer_epilogue(worker, node);
814
815 // Last one to leave the runtime; no need to preempt this runtime.
816 if(node->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
817 node->_nstate &= ~(NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);
818 }
819 // There are still child tasks running; need to preempt this runtime.
820 // Here, we cannot let caller check the state from node->_nstate due to data race,
821 // but return a stateless boolean to indicate preemption.
822 // Ex: if preempted, another task may finish real quck and insert this parent task
823 // again into the scheduling queue. When running this parent task, it will jump to
824 // else branch below and modify tne nstate, thus incuring data race.
825 else {
826 return true;
827 }
828 }
829 // second time - previously preempted
830 else {
831 node->_nstate &= ~(NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);
832 }
833 return false;
834}
835
836// Function: _invoke_runtime_task_impl
837inline bool Executor::_invoke_runtime_task_impl(
838 Worker& worker, Node* node, std::function<void(Runtime&, bool)>& work
839) {
840
841 Runtime rt(*this, worker, node);
842
843 // first time
844 if((node->_nstate & NSTATE::PREEMPTED) == 0) {
845
846 node->_nstate |= (NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);
847 node->_join_counter.fetch_add(1, std::memory_order_release);
848
849 _observer_prologue(worker, node);
850 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
851 work(rt, false);
852 });
853 _observer_epilogue(worker, node);
854
855 // Last one to leave this runtime; no need to preempt this runtime
856 if(node->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
857 node->_nstate &= ~(NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);
858 }
859 // Here, we cannot let caller check the state from node->_nstate due to data race,
860 // but return a stateless boolean to indicate preemption.
861 // Ex: if preempted, another task may finish real quck and insert this parent task
862 // again into the scheduling queue. When running this parent task, it will jump to
863 // else branch below and modify tne nstate, thus incuring data race.
864 else {
865 return true;
866 }
867 }
868 // second time - previously preempted
869 else {
870 node->_nstate &= ~(NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);
871 }
872
873 // clean up outstanding work (e.g., exception)
874 work(rt, true);
875
876 return false;
877}
878
879// ------------------------------------------------------------------------------------------------
880// class: NonpreemptiveRuntime (internal use only)
881// ------------------------------------------------------------------------------------------------
882
886class NonpreemptiveRuntime {
887
888 friend class Executor;
889
890 public:
891
895 void schedule(Task task);
896
897 private:
898
902 explicit NonpreemptiveRuntime(Executor& executor, Worker& worker) :
903 _executor {executor}, _worker {worker}{
904 }
905
909 Executor& _executor;
910
914 Worker& _worker;
915};
916
917// Procedure: schedule
918inline void NonpreemptiveRuntime::schedule(Task task) {
919
920 auto node = task._node;
921 // need to keep the invariant: when scheduling a task, the task must have
922 // zero dependency (join counter is 0)
923 // or we can encounter bug when inserting a nested flow (e.g., module task)
924 node->_join_counter.store(0, std::memory_order_relaxed);
925
926 auto& j = node->_parent ? node->_parent->_join_counter :
927 node->_topology->_join_counter;
928 j.fetch_add(1, std::memory_order_relaxed);
929 _executor._schedule(_worker, node);
930}
931
932// ----------------------------------------------------------------------------
933// Executor Forward Declaration
934// ----------------------------------------------------------------------------
935
936// Procedure: _invoke_nonpreemptive_runtime_task
937inline void Executor::_invoke_nonpreemptive_runtime_task(Worker& worker, Node* node) {
938 _observer_prologue(worker, node);
939 tf::NonpreemptiveRuntime nprt(*this, worker);
940 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
941 std::get_if<Node::NonpreemptiveRuntime>(&node->_handle)->work(nprt);
942 });
943 _observer_epilogue(worker, node);
944}
945
946
947// Function: run_until
948template <typename P, typename C>
950
951 // No need to create a real topology but returns an dummy future for invariant.
952 if(f.empty() || p()) {
953 c();
954 std::promise<void> promise;
955 promise.set_value();
956 return tf::Future<void>(promise.get_future());
957 }
958
959 _increment_topology();
960
961 auto g = std::make_unique<Taskflow>(std::move(f));
962
963 // creates a topology for this run
964 auto t = std::make_shared<Topology>(*g, std::forward<P>(p), std::forward<C>(c));
965 //auto t = std::make_shared<DerivedTopology<P, C>>(*g, std::forward<P>(p), std::forward<C>(c));
966
967 // need to create future before the topology got torn down quickly
968 tf::Future<void> future(t->_promise.get_future(), t);
969
970 // creates a silent-async that holds the taskflow
971 silent_async([g=MoC{std::move(g)}, t](tf::Runtime& rt) mutable {
972 t->_parent = rt._node;
973 t->_parent->_join_counter.fetch_add(1, std::memory_order_release);
974 if(g.object->_fetch_enqueue(t) == 0) {
975 rt._executor._schedule_graph(
976 rt._worker, g.object->_graph, t.get(), t.get()
977 );
978 }
979 });
980
981 return future;
982}
983
984} // end of namespace tf -----------------------------------------------------
class to hold a dependent asynchronous task with shared ownership
Definition async_task.hpp:45
class to create an empty task parameter for compile-time optimization
Definition graph.hpp:166
class to create an executor
Definition executor.hpp:62
void silent_async(P &&params, F &&func)
similar to tf::Executor::async but does not return a future object
tf::Future< void > run_until(Taskflow &taskflow, P &&pred)
runs a taskflow multiple times until the predicate becomes true
class to access the result of an execution
Definition taskflow.hpp:630
class to create a runtime task
Definition runtime.hpp:47
tf::AsyncTask silent_dependent_async(F &&func, Tasks &&... tasks)
runs the given function asynchronously when the given predecessors finish
Definition runtime.hpp:710
void silent_async(F &&f)
runs the given function asynchronously without returning any future object
Definition runtime.hpp:671
bool is_cancelled()
queries if this runtime task has been cancelled
Definition runtime.hpp:661
Executor & executor()
obtains the running executor
Definition runtime.hpp:621
auto async(F &&f)
runs the given callable asynchronously
Definition runtime.hpp:690
void schedule(Task task)
schedules an active task immediately to the worker's queue
Definition runtime.hpp:631
void corun()
corun all tasks spawned by this runtime with other workers
Definition runtime.hpp:646
auto dependent_async(F &&func, Tasks &&... tasks)
runs the given function asynchronously when the given predecessors finish
Definition runtime.hpp:754
Worker & worker()
acquire a reference to the underlying worker
Definition runtime.hpp:626
void corun_all()
equivalent to tf::Runtime::corun - just an alias for legacy purpose
Definition runtime.hpp:657
class to create a task handle over a taskflow node
Definition task.hpp:263
class to create a taskflow object
Definition taskflow.hpp:64
class to create a worker in an executor
Definition worker.hpp:55
determines if a type is a task parameter type
Definition graph.hpp:177
taskflow namespace
Definition small_vector.hpp:20