5#include "async_task.hpp"
64 friend class FlowBuilder;
67 friend class NonpreemptiveRuntime;
68 friend class Algorithm;
69 friend class TaskGroup;
92 size_t N = std::thread::hardware_concurrency(),
93 std::shared_ptr<WorkerInterface> wif =
nullptr
379 template<
typename P,
typename C>
410 template<
typename P,
typename C>
453 template <
typename T>
484 template <
typename P>
600 template <typename Observer, typename... ArgsT>
608 template <typename Observer>
645 template <typename P, typename F>
671 template <typename F>
697 template <typename P, typename F>
722 template <typename F>
756 template <typename F, typename... Tasks>
757requires (std::same_as<std::decay_t<Tasks>,
AsyncTask> && ...)
792 requires (std::same_as<std::decay_t<Tasks>,
AsyncTask> && ...)
827 template <typename F, typename I>
828requires (!std::same_as<std::decay_t<I>,
AsyncTask>)
866 requires (!std::same_as<std::decay_t<I>,
AsyncTask>)
910 template <typename F, typename... Tasks>
911requires (std::same_as<std::decay_t<Tasks>,
AsyncTask> && ...)
956 requires (std::same_as<std::decay_t<Tasks>,
AsyncTask> && ...)
999 template <typename F, typename I>
1000requires (!std::same_as<std::decay_t<I>,
AsyncTask>)
1048 requires (!std::same_as<std::decay_t<I>,
AsyncTask>)
1109 std::vector<Worker> _workers;
1110 std::vector<Buffer> _buffers;
1115 alignas(TF_CACHELINE_SIZE) std::atomic<size_t> _num_topologies {0};
1117 std::unordered_map<std::thread::id, Worker*> _t2w;
1118 std::unordered_set<std::shared_ptr<ObserverInterface>> _observers;
1121 void _observer_prologue(Worker&, Node*);
1122 void _observer_epilogue(Worker&, Node*);
1123 void _spawn(
size_t, std::shared_ptr<WorkerInterface>);
1124 void _exploit_task(Worker&, Node*&);
1125 bool _explore_task(Worker&, Node*&);
1126 void _schedule(Worker&, Node*);
1127 void _schedule(Node*);
1128 void _schedule_graph(Worker&, Graph&, Topology*, NodeBase*);
1130 void _set_up_topology(Worker*, Topology*);
1131 void _tear_down_topology(Worker&, Topology*, Node*&);
1132 void _tear_down_async(Worker&, Node*, Node*&);
1133 void _tear_down_dependent_async(Worker&, Node*, Node*&);
1134 void _tear_down_nonasync(Worker&, Node*, Node*&);
1135 void _tear_down_invoke(Worker&, Node*, Node*&);
1136 void _increment_topology();
1137 void _decrement_topology();
1138 void _invoke(Worker&, Node*);
1139 void _invoke_static_task(Worker&, Node*);
1140 void _invoke_nonpreemptive_runtime_task(Worker&, Node*);
1141 void _invoke_condition_task(Worker&, Node*, SmallVector<int>&);
1142 void _invoke_multi_condition_task(Worker&, Node*, SmallVector<int>&);
1143 void _process_dependent_async(Node*,
tf::AsyncTask&,
size_t&);
1144 void _process_exception(Worker&, Node*);
1145 void _update_cache(Worker&, Node*&, Node*);
1146 void _corun_graph(Worker&, Graph&, Topology*, NodeBase*);
1148 bool _wait_for_task(Worker&, Node*&);
1149 bool _invoke_subflow_task(Worker&, Node*);
1150 bool _invoke_module_task(Worker&, Node*);
1151 bool _invoke_adopted_module_task(Worker&, Node*);
1152 bool _invoke_module_task_impl(Worker&, Node*, Graph&);
1153 bool _invoke_async_task(Worker&, Node*);
1154 bool _invoke_dependent_async_task(Worker&, Node*);
1155 bool _invoke_runtime_task(Worker&, Node*);
1156 bool _invoke_runtime_task_impl(Worker&, Node*, std::function<
void(Runtime&)>&);
1157 bool _invoke_runtime_task_impl(Worker&, Node*, std::function<
void(Runtime&,
bool)>&);
1159 size_t _set_up_graph(Graph&, Topology*, NodeBase*);
1161 template <
typename P>
1162 void _corun_until(Worker&, P&&);
1164 template <
typename I>
1165 void _bulk_schedule(Worker&, I,
size_t);
1167 template <
typename I>
1168 void _bulk_schedule(I,
size_t);
1170 template <
typename I>
1171 void _bulk_spill(I,
size_t);
1174 void _bulk_update_cache(Worker&, Node*&, Node*, std::array<Node*, N>&,
size_t&);
1176 template <
typename P,
typename F>
1177 auto _async(P&&, F&&, Topology*, NodeBase*);
1179 template <
typename P,
typename F>
1180 void _silent_async(P&&, F&&, Topology*, NodeBase*);
1182 template <TaskParameters P,
typename F,
typename I>
1183 requires (!std::same_as<std::decay_t<I>, AsyncTask>)
1184 auto _dependent_async(P&&, F&&, I, I, Topology*, NodeBase*);
1186 template <TaskParameters P,
typename F,
typename I>
1187 requires (!std::same_as<std::decay_t<I>, AsyncTask>)
1188 auto _silent_dependent_async(P&&, F&&, I, I, Topology*, NodeBase*);
1190 template <
typename... ArgsT>
1191 void _schedule_async_task(ArgsT&&...);
1193 template <
typename I,
typename... ArgsT>
1194 AsyncTask _schedule_dependent_async_task(I, I,
size_t, ArgsT&&...);
1197#ifndef DOXYGEN_GENERATING_OUTPUT
1202 _buffers (std::bit_width(N)),
1206 TF_THROW(
"executor must define at least one worker");
1211#ifndef TF_DISABLE_EXCEPTION_HANDLING
1214 _spawn(N, std::move(wif));
1215#ifndef TF_DISABLE_EXCEPTION_HANDLING
1219 std::rethrow_exception(std::current_exception());
1224 if(
has_env(TF_ENABLE_PROFILER)) {
1235inline void Executor::_shutdown() {
1241 for(
size_t i=0; i<_workers.size(); ++i) {
1242 _workers[i]._done.test_and_set(std::memory_order_relaxed);
1245 _notifier.notify_all();
1249 for(
auto& w : _workers) {
1250 if(w._thread.joinable()) {
1258 return _workers.size();
1263 return _notifier.num_waiters();
1268 return _workers.size() + _buffers.size();
1273 return _num_topologies.load(std::memory_order_relaxed);
1278 auto itr = _t2w.find(std::this_thread::get_id());
1279 return itr == _t2w.end() ? nullptr : itr->second;
1284 auto i = _t2w.find(std::this_thread::get_id());
1285 return i == _t2w.end() ? -1 :
static_cast<int>(i->second->_id);
1289inline void Executor::_spawn(
size_t N, std::shared_ptr<WorkerInterface> wif) {
1291 for(
size_t id=0;
id<N; ++id) {
1292 _workers[id]._thread = std::thread([&,
id, wif] () {
1294 auto& worker = _workers[id];
1297 worker._sticky_victim = id;
1298 worker._rdgen.seed(
static_cast<uint32_t
>(std::hash<std::thread::id>()(std::this_thread::get_id())));
1302 wif->scheduler_prologue(worker);
1306 std::exception_ptr ptr =
nullptr;
1311#ifndef TF_DISABLE_EXCEPTION_HANDLING
1318 _exploit_task(worker, t);
1321 if(_wait_for_task(worker, t) ==
false) {
1326#ifndef TF_DISABLE_EXCEPTION_HANDLING
1329 ptr = std::current_exception();
1335 wif->scheduler_epilogue(worker, ptr);
1360 _t2w.emplace(_workers[
id]._thread.get_id(), &_workers[
id]);
1365inline bool Executor::_explore_task(
Worker& w, Node*& t) {
1374 const size_t MAX_STEALS = ((MAX_VICTIM + 1) << 1);
1376 size_t num_steals = 0;
1377 size_t vtm = w._sticky_victim;
1384 t = (vtm < _workers.size())
1385 ? _workers[vtm]._wsq.steal()
1386 : _buffers[vtm - _workers.size()].queue.steal();
1389 w._sticky_victim = vtm;
1395 if (++num_steals > MAX_STEALS) {
1396 std::this_thread::yield();
1397 if(num_steals > 150 + MAX_STEALS) {
1402 if(w._done.test(std::memory_order_relaxed)) {
1407 vtm = w._rdgen() % MAX_VICTIM;
1413inline void Executor::_exploit_task(
Worker& w, Node*& t) {
1421inline bool Executor::_wait_for_task(
Worker& w, Node*& t) {
1425 if(_explore_task(w, t) ==
false) {
1435 _notifier.prepare_wait(w._id);
1438 for(
size_t b=0; b<_buffers.size(); ++b) {
1439 if(!_buffers[b].queue.empty()) {
1440 _notifier.cancel_wait(w._id);
1441 w._sticky_victim = b + _workers.size();
1451 for(
size_t k=0; k<_workers.size()-1; ++k) {
1452 if(
size_t vtm = k + (k >= w._id); !_workers[vtm]._wsq.empty()) {
1453 _notifier.cancel_wait(w._id);
1454 w._sticky_victim = vtm;
1460 if(w._done.test(std::memory_order_relaxed)) {
1461 _notifier.cancel_wait(w._id);
1466 _notifier.commit_wait(w._id);
1471template<
typename Observer,
typename... ArgsT>
1475 std::is_base_of_v<ObserverInterface, Observer>,
1476 "Observer must be derived from ObserverInterface"
1480 auto ptr = std::make_shared<Observer>(std::forward<ArgsT>(args)...);
1482 ptr->set_up(_workers.size());
1484 _observers.emplace(std::static_pointer_cast<ObserverInterface>(ptr));
1490template <
typename Observer>
1494 std::is_base_of_v<ObserverInterface, Observer>,
1495 "Observer must be derived from ObserverInterface"
1498 _observers.erase(std::static_pointer_cast<ObserverInterface>(ptr));
1503 return _observers.size();
1507inline void Executor::_spill(Node* item) {
1510 auto b = (
reinterpret_cast<uintptr_t
>(item) >> 16) % _buffers.size();
1511 std::scoped_lock lock(_buffers[b].mutex);
1512 _buffers[b].queue.push(item);
1516template <
typename I>
1517void Executor::_bulk_spill(I first,
size_t N) {
1521 auto p =
reinterpret_cast<uintptr_t
>(*first) >> 16;
1522 auto b = (p ^ (N << 6)) % _buffers.size();
1523 std::scoped_lock lock(_buffers[b].mutex);
1524 _buffers[b].queue.bulk_push(first, N);
1528inline void Executor::_schedule(
Worker& worker, Node* node) {
1531 if(worker._wsq.try_push(node) ==
false) {
1534 _notifier.notify_one();
1538inline void Executor::_schedule(Node* node) {
1540 _notifier.notify_one();
1544template <
typename I>
1545void Executor::_bulk_schedule(
Worker& worker, I first,
size_t num_nodes) {
1547 if(num_nodes == 0) {
1556 if(
auto n = worker._wsq.try_bulk_push(first, num_nodes); n != num_nodes) {
1557 _bulk_spill(first, num_nodes - n);
1559 _notifier.notify_n(num_nodes);
1569template <
typename I>
1570inline void Executor::_bulk_schedule(I first,
size_t num_nodes) {
1572 if(num_nodes == 0) {
1581 _bulk_spill(first, num_nodes);
1582 _notifier.notify_n(num_nodes);
1586TF_FORCE_INLINE
void Executor::_update_cache(
Worker& worker, Node*& cache, Node* node) {
1588 _schedule(worker, cache);
1595TF_FORCE_INLINE
void Executor::_bulk_update_cache(
1596 Worker& worker, Node*& cache, Node* node, std::array<Node*, N>& array,
size_t& n
1602 _bulk_schedule(worker, array, n);
1610inline void Executor::_invoke(
Worker& worker, Node* node) {
1612 #define TF_INVOKE_CONTINUATION() \
1615 goto begin_invoke; \
1620 Node* cache {
nullptr};
1623 if(node->_nstate & NSTATE::PREEMPTED) {
1630 if(node->_is_parent_cancelled()) {
1631 _tear_down_invoke(worker, node, cache);
1632 TF_INVOKE_CONTINUATION();
1637 if(node->_semaphores && !node->_semaphores->to_acquire.empty()) {
1638 SmallVector<Node*> waiters;
1639 if(!node->_acquire_all(waiters)) {
1640 _bulk_schedule(worker, waiters.begin(), waiters.size());
1647 SmallVector<int> conds;
1650 switch(node->_handle.index()) {
1653 _invoke_static_task(worker, node);
1658 case Node::RUNTIME:{
1659 if(_invoke_runtime_task(worker, node)) {
1666 case Node::NONPREEMPTIVE_RUNTIME:{
1667 _invoke_nonpreemptive_runtime_task(worker, node);
1672 case Node::SUBFLOW: {
1673 if(_invoke_subflow_task(worker, node)) {
1680 case Node::CONDITION: {
1681 _invoke_condition_task(worker, node, conds);
1686 case Node::MULTI_CONDITION: {
1687 _invoke_multi_condition_task(worker, node, conds);
1692 case Node::MODULE: {
1693 if(_invoke_module_task(worker, node)) {
1700 case Node::ADOPTED_MODULE: {
1701 if(_invoke_adopted_module_task(worker, node)) {
1709 if(_invoke_async_task(worker, node)) {
1712 _tear_down_async(worker, node, cache);
1713 TF_INVOKE_CONTINUATION();
1719 case Node::DEPENDENT_ASYNC: {
1720 if(_invoke_dependent_async_task(worker, node)) {
1723 _tear_down_dependent_async(worker, node, cache);
1724 TF_INVOKE_CONTINUATION();
1735 if(node->_semaphores && !node->_semaphores->to_release.empty()) {
1736 SmallVector<Node*> waiters;
1737 node->_release_all(waiters);
1738 _bulk_schedule(worker, waiters.begin(), waiters.size());
1747 node->_join_counter.fetch_add(
1748 node->_nstate & NSTATE::STRONG_DEPENDENCIES_MASK, std::memory_order_relaxed
1752 switch(node->_handle.index()) {
1755 case Node::CONDITION:
1756 case Node::MULTI_CONDITION: {
1757 for(
auto cond : conds) {
1758 if(cond >= 0 &&
static_cast<size_t>(cond) < node->_num_successors) {
1759 auto s = node->_edges[cond];
1761 s->_join_counter.store(0, std::memory_order_relaxed);
1762 node->_parent->_join_counter.fetch_add(1, std::memory_order_relaxed);
1763 _update_cache(worker, cache, s);
1771 for(
size_t i=0; i<node->_num_successors; ++i) {
1772 if(
auto s = node->_edges[i]; s->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
1773 node->_parent->_join_counter.fetch_add(1, std::memory_order_relaxed);
1774 _update_cache(worker, cache, s);
1782 _tear_down_nonasync(worker, node, cache);
1783 TF_INVOKE_CONTINUATION();
1787inline void Executor::_tear_down_nonasync(
Worker& worker, Node* node, Node*& cache) {
1791 if(
auto parent = node->_parent; parent == node->_topology) {
1792 if(parent->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
1793 _tear_down_topology(worker, node->_topology, cache);
1799 auto state = parent->_nstate;
1800 if(parent->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
1802 if(state & NSTATE::PREEMPTED) {
1803 _update_cache(worker, cache,
static_cast<Node*
>(parent));
1810inline void Executor::_tear_down_invoke(
Worker& worker, Node* node, Node*& cache) {
1811 switch(node->_handle.index()) {
1813 _tear_down_async(worker, node, cache);
1816 case Node::DEPENDENT_ASYNC:
1817 _tear_down_dependent_async(worker, node, cache);
1821 _tear_down_nonasync(worker, node, cache);
1827inline void Executor::_observer_prologue(
Worker& worker, Node* node) {
1828 for(
auto& observer : _observers) {
1829 observer->on_entry(WorkerView(worker), TaskView(*node));
1834inline void Executor::_observer_epilogue(
Worker& worker, Node* node) {
1835 for(
auto& observer : _observers) {
1836 observer->on_exit(WorkerView(worker), TaskView(*node));
1841inline void Executor::_process_exception(
Worker&, Node* node) {
1846 NodeBase* ea = node;
1847 NodeBase* ia =
nullptr;
1849 while(ea && (ea->_estate.load(std::memory_order_relaxed) & ESTATE::EXPLICITLY_ANCHORED) == 0) {
1850 ea->_estate.fetch_or(ESTATE::EXCEPTION, std::memory_order_relaxed);
1852 if(ia ==
nullptr && (ea->_nstate & NSTATE::IMPLICITLY_ANCHORED)) {
1859 constexpr static auto flag = ESTATE::EXCEPTION | ESTATE::CAUGHT;
1864 if((ea->_estate.fetch_or(flag, std::memory_order_relaxed) & ESTATE::CAUGHT) == 0) {
1865 ea->_exception_ptr = std::current_exception();
1871 if((ia->_estate.fetch_or(flag, std::memory_order_relaxed) & ESTATE::CAUGHT) == 0) {
1872 ia->_exception_ptr = std::current_exception();
1880 node->_exception_ptr = std::current_exception();
1884inline void Executor::_invoke_static_task(
Worker& worker, Node* node) {
1885 _observer_prologue(worker, node);
1886 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
1887 std::get_if<Node::Static>(&node->_handle)->work();
1889 _observer_epilogue(worker, node);
1893inline bool Executor::_invoke_subflow_task(
Worker& worker, Node* node) {
1895 auto& h = *std::get_if<Node::Subflow>(&node->_handle);
1896 auto& g = h.subgraph;
1898 if((node->_nstate & NSTATE::PREEMPTED) == 0) {
1901 Subflow sf(*
this, worker, node, g);
1904 _observer_prologue(worker, node);
1905 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
1908 _observer_epilogue(worker, node);
1912 if(sf.joinable() && !g.empty()) {
1915 node->_nstate |= NSTATE::PREEMPTED;
1918 _schedule_graph(worker, g, node->_topology, node);
1923 node->_nstate &= ~NSTATE::PREEMPTED;
1930 if((node->_nstate & NSTATE::RETAIN_SUBFLOW) == 0) {
1938inline void Executor::_invoke_condition_task(
1941 _observer_prologue(worker, node);
1942 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
1943 auto& work = std::get_if<Node::Condition>(&node->_handle)->work;
1946 _observer_epilogue(worker, node);
1950inline void Executor::_invoke_multi_condition_task(
1953 _observer_prologue(worker, node);
1954 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
1955 conds = std::get_if<Node::MultiCondition>(&node->_handle)->work();
1957 _observer_epilogue(worker, node);
1961inline bool Executor::_invoke_module_task(
Worker& w, Node* node) {
1962 return _invoke_module_task_impl(w, node, std::get_if<Node::Module>(&node->_handle)->graph);
1966inline bool Executor::_invoke_adopted_module_task(
Worker& w, Node* node) {
1967 return _invoke_module_task_impl(w, node, std::get_if<Node::AdoptedModule>(&node->_handle)->graph);
1971inline bool Executor::_invoke_module_task_impl(
Worker& w, Node* node,
Graph& graph) {
1979 if((node->_nstate & NSTATE::PREEMPTED) == 0) {
1981 node->_nstate |= NSTATE::PREEMPTED;
1982 _schedule_graph(w, graph, node->_topology, node);
1987 node->_nstate &= ~NSTATE::PREEMPTED;
1994inline bool Executor::_invoke_async_task(
Worker& worker, Node* node) {
1995 auto& work = std::get_if<Node::Async>(&node->_handle)->work;
1996 switch(work.index()) {
1999 _observer_prologue(worker, node);
2000 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
2001 std::get_if<0>(&work)->operator()();
2003 _observer_epilogue(worker, node);
2008 if(_invoke_runtime_task_impl(worker, node, *std::get_if<1>(&work))) {
2015 if(_invoke_runtime_task_impl(worker, node, *std::get_if<2>(&work))) {
2025inline bool Executor::_invoke_dependent_async_task(
Worker& worker, Node* node) {
2026 auto& work = std::get_if<Node::DependentAsync>(&node->_handle)->work;
2027 switch(work.index()) {
2030 _observer_prologue(worker, node);
2031 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
2032 std::get_if<0>(&work)->operator()();
2034 _observer_epilogue(worker, node);
2039 if(_invoke_runtime_task_impl(worker, node, *std::get_if<1>(&work))) {
2046 if(_invoke_runtime_task_impl(worker, node, *std::get_if<2>(&work))) {
2056 return run_n(f, 1, [](){});
2061 return run_n(std::move(f), 1, [](){});
2065template <
typename C>
2067 return run_n(f, 1, std::forward<C>(c));
2071template <
typename C>
2073 return run_n(std::move(f), 1, std::forward<C>(c));
2078 return run_n(f, repeat, [](){});
2083 return run_n(std::move(f), repeat, [](){});
2087template <
typename C>
2090 f, [repeat]()
mutable {
return repeat-- == 0; }, std::forward<C>(c)
2095template <
typename C>
2098 std::move(f), [repeat]()
mutable {
return repeat-- == 0; }, std::forward<C>(c)
2105 return run_until(f, std::forward<P>(pred), [](){});
2111 return run_until(std::move(f), std::forward<P>(pred), [](){});
2115template <
typename P,
typename C>
2119 if(f.empty() || p()) {
2121 std::promise<void> promise;
2122 promise.set_value();
2123 return tf::Future<void>(promise.get_future());
2126 _increment_topology();
2129 auto t = std::make_shared<Topology>(f, std::forward<P>(p), std::forward<C>(c));
2133 tf::Future<void> future(t->_promise.get_future(), t);
2136 if(f._fetch_enqueue(t) == 0) {
2146template <
typename P>
2151 TF_THROW(
"corun_until must be called by a worker of the executor");
2154 _corun_until(*w, std::forward<P>(predicate));
2158template <
typename P>
2159void Executor::_corun_until(
Worker& w, P&& stop_predicate) {
2162 const size_t MAX_STEALS = ((MAX_VICTIM + 1) << 1);
2166 while(!stop_predicate()) {
2170 if(
auto t = w._wsq.pop(); t) {
2174 size_t num_steals = 0;
2175 size_t vtm = w._sticky_victim;
2179 t = (vtm < _workers.size())
2180 ? _workers[vtm]._wsq.steal()
2181 : _buffers[vtm-_workers.size()].queue.steal();
2185 w._sticky_victim = vtm;
2188 else if(!stop_predicate()) {
2189 if(++num_steals > MAX_STEALS) {
2190 std::this_thread::yield();
2192 vtm = w._rdgen() % MAX_VICTIM;
2203template <
typename T>
2208 TF_THROW(
"corun must be called by a worker of the executor");
2216inline void Executor::_corun_graph(
Worker& w,
Graph& g, Topology* tpg, NodeBase* p) {
2225 ExplicitAnchorGuard anchor(p);
2226 _schedule_graph(w, g, tpg, p);
2227 _corun_until(w, [p] () ->
bool {
2228 return p->_join_counter.load(std::memory_order_acquire) == 0; }
2233 p->_rethrow_exception();
2237inline void Executor::_increment_topology() {
2238 _num_topologies.fetch_add(1, std::memory_order_relaxed);
2242inline void Executor::_decrement_topology() {
2243 if(_num_topologies.fetch_sub(1, std::memory_order_acq_rel) == 1) {
2244 _num_topologies.notify_all();
2250 size_t n = _num_topologies.load(std::memory_order_acquire);
2252 _num_topologies.wait(n, std::memory_order_acquire);
2253 n = _num_topologies.load(std::memory_order_acquire);
2258inline void Executor::_schedule_graph(
2259 Worker& worker,
Graph& graph, Topology* tpg, NodeBase* parent
2261 size_t num_srcs = _set_up_graph(graph, tpg, parent);
2262 parent->_join_counter.fetch_add(num_srcs, std::memory_order_relaxed);
2263 _bulk_schedule(worker, graph.begin(), num_srcs);
2267inline void Executor::_set_up_topology(
Worker* w, Topology* tpg) {
2269 auto& g = tpg->_taskflow._graph;
2270 size_t num_srcs = _set_up_graph(g, tpg, tpg);
2271 tpg->_join_counter.store(num_srcs, std::memory_order_relaxed);
2272 w ? _bulk_schedule(*w, g.begin(), num_srcs) : _bulk_schedule(g.begin(), num_srcs);
2276inline size_t Executor::_set_up_graph(
Graph& graph, Topology* tpg, NodeBase* parent) {
2278 auto first = graph.begin();
2279 auto last = graph.end();
2281 for(; first != last; ++first) {
2284 node->_topology = tpg;
2285 node->_parent = parent;
2286 node->_nstate = NSTATE::NONE;
2287 node->_estate.store(ESTATE::NONE, std::memory_order_relaxed);
2288 node->_set_up_join_counter();
2289 node->_exception_ptr =
nullptr;
2293 if(node->num_predecessors() == 0) {
2294 std::iter_swap(send++, first);
2297 return send - graph.begin();
2301inline void Executor::_tear_down_topology(
Worker& worker, Topology* tpg, Node*& cache) {
2303 auto &f = tpg->_taskflow;
2309 if(!tpg->cancelled() && !tpg->_predicate()) {
2312 _schedule_graph(worker, tpg->_taskflow._graph, tpg, tpg);
2321 if(std::unique_lock<std::mutex> lock(f._mutex); f._topologies.size()>1) {
2323 auto fetched_tpg {std::move(f._topologies.front())};
2326 f._topologies.pop();
2327 tpg = f._topologies.front().get();
2333 fetched_tpg->_carry_out_promise();
2336 _decrement_topology();
2338 _schedule_graph(worker, tpg->_taskflow._graph, tpg, tpg);
2343 auto fetched_tpg {std::move(f._topologies.front())};
2346 f._topologies.pop();
2352 fetched_tpg->_carry_out_promise();
2354 _decrement_topology();
2357 if(
auto parent = fetched_tpg->_parent; parent) {
2359 if(parent->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
2362 _update_cache(worker, cache,
static_cast<Node*
>(parent));
2377 TF_THROW(
"subflow already joined");
2380 _executor._corun_graph(_worker,
_graph, _node->_topology, _node);
2383 _node->_nstate |= NSTATE::JOINED_SUBFLOW;
class to hold a dependent asynchronous task with shared ownership
Definition async_task.hpp:45
void silent_async(P &&params, F &&func)
similar to tf::Executor::async but does not return a future object
tf::AsyncTask silent_dependent_async(F &&func, Tasks &&... tasks)
runs the given function asynchronously when the given predecessors finish
tf::Future< void > run_until(Taskflow &taskflow, P &&pred)
runs a taskflow multiple times until the predicate becomes true
void corun_until(P &&predicate)
keeps running the work-stealing loop until the predicate returns true
void remove_observer(std::shared_ptr< Observer > observer)
removes an observer from the executor
auto dependent_async(F &&func, Tasks &&... tasks)
runs the given function asynchronously when the given predecessors finish
tf::Future< void > run(Taskflow &&taskflow)
runs a moved taskflow once
tf::Future< void > run(Taskflow &taskflow)
runs a taskflow once
size_t num_waiters() const noexcept
queries the number of workers that are in the waiting loop
tf::Future< void > run(Taskflow &&taskflow, C &&callable)
runs a moved taskflow once and invoke a callback upon completion
~Executor()
destructs the executor
int this_worker_id() const
queries the id of the caller thread within this executor
size_t num_queues() const noexcept
queries the number of work-stealing queues used by the executor
tf::Future< void > run_n(Taskflow &taskflow, size_t N)
runs a taskflow for N times
size_t num_topologies() const
queries the number of running topologies at the time of this call
Executor(size_t N=std::thread::hardware_concurrency(), std::shared_ptr< WorkerInterface > wif=nullptr)
constructs the executor with N worker threads
TaskGroup task_group()
creates a task group that executes a collection of asynchronous tasks
Definition task_group.hpp:875
void corun(T &target)
runs a target graph and waits until it completes using an internal worker of this executor
size_t num_workers() const noexcept
queries the number of worker threads
tf::Future< void > run_until(Taskflow &&taskflow, P &&pred)
runs a moved taskflow and keeps running it until the predicate becomes true
void wait_for_all()
waits for all tasks to complete
tf::Future< void > run_n(Taskflow &taskflow, size_t N, C &&callable)
runs a taskflow for N times and then invokes a callback
tf::Future< void > run(Taskflow &taskflow, C &&callable)
runs a taskflow once and invoke a callback upon completion
tf::Future< void > run_n(Taskflow &&taskflow, size_t N)
runs a moved taskflow for N times
tf::Future< void > run_n(Taskflow &&taskflow, size_t N, C &&callable)
runs a moved taskflow for N times and then invokes a callback
tf::Future< void > run_until(Taskflow &taskflow, P &&pred, C &&callable)
runs a taskflow multiple times until the predicate becomes true and then invokes the callback
Worker * this_worker()
queries pointer to the calling worker if it belongs to this executor, otherwise returns nullptr
auto async(P &&params, F &&func)
creates a parameterized asynchronous task to run the given function
std::shared_ptr< Observer > make_observer(ArgsT &&... args)
constructs an observer to inspect the activities of worker threads
size_t num_observers() const noexcept
queries the number of observers
Graph & _graph
associated graph object
Definition flow_builder.hpp:1337
class to access the result of an execution
Definition taskflow.hpp:630
class to create a graph object
Definition graph.hpp:47
class to define a vector optimized for small array
Definition small_vector.hpp:931
void join()
enables the subflow to join its parent task
bool joinable() const noexcept
queries if the subflow is joinable
Definition flow_builder.hpp:1615
class to create a taskflow object
Definition taskflow.hpp:64
class to create a lock-free unbounded work-stealing queue
Definition wsq.hpp:67
class to create a worker in an executor
Definition worker.hpp:55
determines if a type is a task parameter type
Definition graph.hpp:177
taskflow namespace
Definition small_vector.hpp:20
NonblockingNotifier DefaultNotifier
the default notifier type used by Taskflow
Definition worker.hpp:38
Graph & retrieve_graph(T &t)
retrieves a reference to the underlying tf::Graph from an object
Definition graph.hpp:975
bool has_env(const std::string &str)
checks whether an environment variable is defined
Definition os.hpp:213