49 friend class Executor;
50 friend class FlowBuilder;
51 friend class PreemptionGuard;
52 friend class Algorithm;
156 template <
typename F>
179 template <
typename P,
typename F>
180 auto async(P&& params, F&& f);
206 template <
typename F>
225 template <
typename P,
typename F>
266 template <
typename F,
typename... Tasks>
267 requires (std::same_as<std::decay_t<Tasks>,
AsyncTask> && ...)
309 requires (std::same_as<std::decay_t<Tasks>,
AsyncTask> && ...)
349 template <
typename F,
typename I>
350 requires (!std::same_as<std::decay_t<I>,
AsyncTask>)
394 template <TaskParameters P,
typename F,
typename I>
395 requires (!std::same_as<std::decay_t<I>,
AsyncTask>)
429 template <
typename F,
typename... Tasks>
430 requires (std::same_as<std::decay_t<Tasks>,
AsyncTask> && ...)
465 requires (std::same_as<std::decay_t<Tasks>,
AsyncTask> && ...)
500 template <
typename F,
typename I>
501 requires (!std::same_as<std::decay_t<I>,
AsyncTask>)
538 template <TaskParameters P,
typename F,
typename I>
539 requires (!std::same_as<std::decay_t<I>,
AsyncTask>)
595 explicit Runtime(Executor&,
Worker&, Node*);
614inline Runtime::Runtime(
Executor& executor,
Worker& worker, Node* node) :
615 _executor {executor},
633 auto node = task._node;
637 node->_join_counter.store(0, std::memory_order_relaxed);
639 auto& j = node->_parent ? node->_parent->_join_counter :
640 node->_topology->_join_counter;
641 j.fetch_add(1, std::memory_order_relaxed);
642 _executor._schedule(_worker, node);
648 ExplicitAnchorGuard anchor(_node);
649 _executor._corun_until(_worker, [
this] () ->
bool {
650 return _node->_join_counter.load(std::memory_order_acquire) == 1;
653 _node->_rethrow_exception();
662 return _node->_is_parent_cancelled();
676template <
typename P,
typename F>
678 _node->_join_counter.fetch_add(1, std::memory_order_relaxed);
679 _executor._silent_async(
680 std::forward<P>(params), std::forward<F>(f), _node->_topology, _node
695template <
typename P,
typename F>
697 _node->_join_counter.fetch_add(1, std::memory_order_relaxed);
698 return _executor._async(
699 std::forward<P>(params), std::forward<F>(f), _node->_topology, _node
708template <
typename F,
typename... Tasks>
709requires (std::same_as<std::decay_t<Tasks>,
AsyncTask> && ...)
718requires (std::same_as<std::decay_t<Tasks>,
AsyncTask> && ...)
720 P&& params, F&& func, Tasks&&... tasks
722 std::array<
AsyncTask,
sizeof...(Tasks)> array = { std::forward<Tasks>(tasks)... };
724 std::forward<P>(params), std::forward<F>(func), array.begin(), array.end()
729template <
typename F,
typename I>
730requires (!std::same_as<std::decay_t<I>,
AsyncTask>)
736template <TaskParameters P,
typename F,
typename I>
737requires (!std::same_as<std::decay_t<I>,
AsyncTask>)
739 P&& params, F&& func, I first, I last
741 _node->_join_counter.fetch_add(1, std::memory_order_relaxed);
742 return _executor._silent_dependent_async(
743 std::forward<P>(params), std::forward<F>(func), first, last, _node->_topology, _node
752template <
typename F,
typename... Tasks>
753requires (std::same_as<std::decay_t<Tasks>,
AsyncTask> && ...)
760requires (std::same_as<std::decay_t<Tasks>,
AsyncTask> && ...)
762 std::array<
AsyncTask,
sizeof...(Tasks)> array = { std::forward<Tasks>(tasks)... };
764 std::forward<P>(params), std::forward<F>(func), array.begin(), array.end()
769template <
typename F,
typename I>
770requires (!std::same_as<std::decay_t<I>,
AsyncTask>)
776template <TaskParameters P,
typename F,
typename I>
777requires (!std::same_as<std::decay_t<I>,
AsyncTask>)
779 _node->_join_counter.fetch_add(1, std::memory_order_relaxed);
780 return _executor._dependent_async(
781 std::forward<P>(params), std::forward<F>(func), first, last, _node->_topology, _node
790inline bool Executor::_invoke_runtime_task(
Worker& worker, Node* node) {
791 return _invoke_runtime_task_impl(
792 worker, node, std::get_if<Node::Runtime>(&node->_handle)->work
797inline bool Executor::_invoke_runtime_task_impl(
798 Worker& worker, Node* node, std::function<
void(Runtime&)>& work
801 if((node->_nstate & NSTATE::PREEMPTED) == 0) {
803 Runtime rt(*
this, worker, node);
805 node->_nstate |= (NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);
807 node->_join_counter.fetch_add(1, std::memory_order_release);
809 _observer_prologue(worker, node);
810 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
813 _observer_epilogue(worker, node);
816 if(node->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
817 node->_nstate &= ~(NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);
831 node->_nstate &= ~(NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);
837inline bool Executor::_invoke_runtime_task_impl(
838 Worker& worker, Node* node, std::function<
void(
Runtime&,
bool)>& work
841 Runtime rt(*
this, worker, node);
844 if((node->_nstate & NSTATE::PREEMPTED) == 0) {
846 node->_nstate |= (NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);
847 node->_join_counter.fetch_add(1, std::memory_order_release);
849 _observer_prologue(worker, node);
850 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
853 _observer_epilogue(worker, node);
856 if(node->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
857 node->_nstate &= ~(NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);
870 node->_nstate &= ~(NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);
886class NonpreemptiveRuntime {
888 friend class Executor;
895 void schedule(Task task);
902 explicit NonpreemptiveRuntime(Executor& executor, Worker& worker) :
903 _executor {executor}, _worker {worker}{
918inline void NonpreemptiveRuntime::schedule(
Task task) {
920 auto node = task._node;
924 node->_join_counter.store(0, std::memory_order_relaxed);
926 auto& j = node->_parent ? node->_parent->_join_counter :
927 node->_topology->_join_counter;
928 j.fetch_add(1, std::memory_order_relaxed);
929 _executor._schedule(_worker, node);
937inline void Executor::_invoke_nonpreemptive_runtime_task(
Worker& worker, Node* node) {
938 _observer_prologue(worker, node);
939 tf::NonpreemptiveRuntime nprt(*
this, worker);
940 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
941 std::get_if<Node::NonpreemptiveRuntime>(&node->_handle)->work(nprt);
943 _observer_epilogue(worker, node);
948template <
typename P,
typename C>
952 if(f.empty() || p()) {
954 std::promise<void> promise;
959 _increment_topology();
961 auto g = std::make_unique<Taskflow>(std::move(f));
964 auto t = std::make_shared<Topology>(*g, std::forward<P>(p), std::forward<C>(c));
972 t->_parent = rt._node;
973 t->_parent->_join_counter.fetch_add(1, std::memory_order_release);
974 if(g.object->_fetch_enqueue(t) == 0) {
975 rt._executor._schedule_graph(
976 rt._worker, g.object->_graph, t.get(), t.get()
class to hold a dependent asynchronous task with shared ownership
Definition async_task.hpp:45
class to create an empty task parameter for compile-time optimization
Definition graph.hpp:166
class to create an executor
Definition executor.hpp:62
void silent_async(P &¶ms, F &&func)
similar to tf::Executor::async but does not return a future object
tf::Future< void > run_until(Taskflow &taskflow, P &&pred)
runs a taskflow multiple times until the predicate becomes true
class to access the result of an execution
Definition taskflow.hpp:630
class to create a runtime task
Definition runtime.hpp:47
tf::AsyncTask silent_dependent_async(F &&func, Tasks &&... tasks)
runs the given function asynchronously when the given predecessors finish
Definition runtime.hpp:710
void silent_async(F &&f)
runs the given function asynchronously without returning any future object
Definition runtime.hpp:671
bool is_cancelled()
queries if this runtime task has been cancelled
Definition runtime.hpp:661
Executor & executor()
obtains the running executor
Definition runtime.hpp:621
auto async(F &&f)
runs the given callable asynchronously
Definition runtime.hpp:690
void schedule(Task task)
schedules an active task immediately to the worker's queue
Definition runtime.hpp:631
void corun()
corun all tasks spawned by this runtime with other workers
Definition runtime.hpp:646
auto dependent_async(F &&func, Tasks &&... tasks)
runs the given function asynchronously when the given predecessors finish
Definition runtime.hpp:754
Worker & worker()
acquire a reference to the underlying worker
Definition runtime.hpp:626
void corun_all()
equivalent to tf::Runtime::corun - just an alias for legacy purpose
Definition runtime.hpp:657
class to create a task handle over a taskflow node
Definition task.hpp:263
class to create a taskflow object
Definition taskflow.hpp:64
class to create a worker in an executor
Definition worker.hpp:55
determines if a type is a task parameter type
Definition graph.hpp:177
taskflow namespace
Definition small_vector.hpp:20