// executor.hpp
// (documentation-viewer artifacts "Loading... / Searching... / No Matches" removed)
1#pragma once
2
3#include "observer.hpp"
4#include "taskflow.hpp"
5#include "async_task.hpp"
6
11
12namespace tf {
13
14// ----------------------------------------------------------------------------
15// Executor Definition
16// ----------------------------------------------------------------------------
17
/**
@class Executor

@brief executor that runs taskflow graphs using a pool of worker threads

The executor owns N workers (each with a work-stealing queue) plus a set of
shared overflow buffers, and schedules nodes across them. Observers can be
attached to monitor task execution. Several declarations (e.g., destructor,
wait_for_all, this_worker) are elided in this view of the header.
*/
class Executor {

  friend class FlowBuilder;
  friend class Subflow;
  friend class Runtime;
  friend class NonpreemptiveRuntime;
  friend class Algorithm;
  friend class TaskGroup;

  public:

  /**
  @brief constructs the executor with @c N worker threads

  @param N number of workers (defaults to the hardware concurrency)
  @param wif optional worker interface invoked around each worker's scheduling loop
  */
  explicit Executor(
    size_t N = std::thread::hardware_concurrency(),
    std::shared_ptr<WorkerInterface> wif = nullptr
  );




  // runs the taskflow once and invokes the callable upon completion
  template<typename C>
  tf::Future<void> run(Taskflow& taskflow, C&& callable);

  // moved-taskflow overload; the executor takes ownership of the graph
  template<typename C>
  tf::Future<void> run(Taskflow&& taskflow, C&& callable);

  // runs the taskflow @c N times
  tf::Future<void> run_n(Taskflow& taskflow, size_t N);

  tf::Future<void> run_n(Taskflow&& taskflow, size_t N);

  // runs the taskflow @c N times and then invokes the callable
  template<typename C>
  tf::Future<void> run_n(Taskflow& taskflow, size_t N, C&& callable);

  template<typename C>
  tf::Future<void> run_n(Taskflow&& taskflow, size_t N, C&& callable);

  // repeatedly runs the taskflow until the predicate returns true
  template<typename P>
  tf::Future<void> run_until(Taskflow& taskflow, P&& pred);

  template<typename P>
  tf::Future<void> run_until(Taskflow&& taskflow, P&& pred);

  template<typename P, typename C>
  tf::Future<void> run_until(Taskflow& taskflow, P&& pred, C&& callable);

  template<typename P, typename C>
  tf::Future<void> run_until(Taskflow&& taskflow, P&& pred, C&& callable);

  // co-runs the target from a worker of this executor (caller participates)
  template <typename T>
  void corun(T& target);

  // keeps the calling worker running tasks until the predicate becomes true
  template <typename P>
  void corun_until(P&& predicate);


  // number of worker threads
  size_t num_workers() const noexcept;

  // number of workers currently waiting on the notifier
  size_t num_waiters() const noexcept;

  // number of task queues (worker queues + overflow buffers)
  size_t num_queues() const noexcept;

  // number of topologies currently in flight
  size_t num_topologies() const;


  // id of the calling worker, or -1 if the caller is not a worker of this executor
  int this_worker_id() const;

  // --------------------------------------------------------------------------
  // Observer methods
  // --------------------------------------------------------------------------

  // constructs and registers an observer; Observer must derive from ObserverInterface
  template <typename Observer, typename... ArgsT>
  std::shared_ptr<Observer> make_observer(ArgsT&&... args);

  // unregisters a previously created observer
  template <typename Observer>
  void remove_observer(std::shared_ptr<Observer> observer);

  // number of registered observers
  size_t num_observers() const noexcept;

  // --------------------------------------------------------------------------
  // Async Task Methods
  // --------------------------------------------------------------------------

  // runs the callable asynchronously and returns a future-like handle
  template <typename P, typename F>
  auto async(P&& params, F&& func);

  template <typename F>
  auto async(F&& func);

  // fire-and-forget asynchronous execution (no handle returned)
  template <typename P, typename F>
  void silent_async(P&& params, F&& func);

  template <typename F>
  void silent_async(F&& func);

  // --------------------------------------------------------------------------
  // Silent Dependent Async Methods
  // --------------------------------------------------------------------------

  template <typename F, typename... Tasks>
  requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
  tf::AsyncTask silent_dependent_async(F&& func, Tasks&&... tasks);

  template <TaskParameters P, typename F, typename... Tasks>
  requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
  tf::AsyncTask silent_dependent_async(P&& params, F&& func, Tasks&&... tasks);

  // range-of-dependencies overloads (I iterates over AsyncTask handles)
  template <typename F, typename I>
  requires (!std::same_as<std::decay_t<I>, AsyncTask>)
  tf::AsyncTask silent_dependent_async(F&& func, I first, I last);

  template <TaskParameters P, typename F, typename I>
  requires (!std::same_as<std::decay_t<I>, AsyncTask>)
  tf::AsyncTask silent_dependent_async(P&& params, F&& func, I first, I last);

  // --------------------------------------------------------------------------
  // Dependent Async Methods
  // --------------------------------------------------------------------------

  template <typename F, typename... Tasks>
  requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
  auto dependent_async(F&& func, Tasks&&... tasks);

  template <TaskParameters P, typename F, typename... Tasks>
  requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
  auto dependent_async(P&& params, F&& func, Tasks&&... tasks);

  template <typename F, typename I>
  requires (!std::same_as<std::decay_t<I>, AsyncTask>)
  auto dependent_async(F&& func, I first, I last);

  template <TaskParameters P, typename F, typename I>
  requires (!std::same_as<std::decay_t<I>, AsyncTask>)
  auto dependent_async(P&& params, F&& func, I first, I last);

  // ----------------------------------------------------------------------------------------------
  // Task Group
  // ----------------------------------------------------------------------------------------------

  // creates a task group associated with this executor
  TaskGroup task_group();

  private:

  // overflow slot: a mutex-protected unbounded queue shared among workers
  struct Buffer {
    std::mutex mutex;
    UnboundedWSQ<Node*> queue;
  };

  std::vector<Worker> _workers;
  std::vector<Buffer> _buffers;

  // notifier's state variable and num_topologies should sit on different cachelines
  // or the false sharing can cause serious performance drop
  alignas(TF_CACHELINE_SIZE) DefaultNotifier _notifier;
  alignas(TF_CACHELINE_SIZE) std::atomic<size_t> _num_topologies {0};

  // per-executor thread-id -> worker map (avoids TLS; see note in _spawn)
  std::unordered_map<std::thread::id, Worker*> _t2w;
  std::unordered_set<std::shared_ptr<ObserverInterface>> _observers;

  void _shutdown();
  void _observer_prologue(Worker&, Node*);
  void _observer_epilogue(Worker&, Node*);
  void _spawn(size_t, std::shared_ptr<WorkerInterface>);
  void _exploit_task(Worker&, Node*&);
  bool _explore_task(Worker&, Node*&);
  void _schedule(Worker&, Node*);
  void _schedule(Node*);
  void _schedule_graph(Worker&, Graph&, Topology*, NodeBase*);
  void _spill(Node*);
  void _set_up_topology(Worker*, Topology*);
  void _tear_down_topology(Worker&, Topology*, Node*&);
  void _tear_down_async(Worker&, Node*, Node*&);
  void _tear_down_dependent_async(Worker&, Node*, Node*&);
  void _tear_down_nonasync(Worker&, Node*, Node*&);
  void _tear_down_invoke(Worker&, Node*, Node*&);
  void _increment_topology();
  void _decrement_topology();
  void _invoke(Worker&, Node*);
  void _invoke_static_task(Worker&, Node*);
  void _invoke_nonpreemptive_runtime_task(Worker&, Node*);
  void _invoke_condition_task(Worker&, Node*, SmallVector<int>&);
  void _invoke_multi_condition_task(Worker&, Node*, SmallVector<int>&);
  void _process_dependent_async(Node*, tf::AsyncTask&, size_t&);
  void _process_exception(Worker&, Node*);
  void _update_cache(Worker&, Node*&, Node*);
  void _corun_graph(Worker&, Graph&, Topology*, NodeBase*);

  bool _wait_for_task(Worker&, Node*&);
  bool _invoke_subflow_task(Worker&, Node*);
  bool _invoke_module_task(Worker&, Node*);
  bool _invoke_adopted_module_task(Worker&, Node*);
  bool _invoke_module_task_impl(Worker&, Node*, Graph&);
  bool _invoke_async_task(Worker&, Node*);
  bool _invoke_dependent_async_task(Worker&, Node*);
  bool _invoke_runtime_task(Worker&, Node*);
  bool _invoke_runtime_task_impl(Worker&, Node*, std::function<void(Runtime&)>&);
  bool _invoke_runtime_task_impl(Worker&, Node*, std::function<void(Runtime&, bool)>&);

  size_t _set_up_graph(Graph&, Topology*, NodeBase*);

  template <typename P>
  void _corun_until(Worker&, P&&);

  template <typename I>
  void _bulk_schedule(Worker&, I, size_t);

  template <typename I>
  void _bulk_schedule(I, size_t);

  template <typename I>
  void _bulk_spill(I, size_t);

  template <size_t N>
  void _bulk_update_cache(Worker&, Node*&, Node*, std::array<Node*, N>&, size_t&);

  template <typename P, typename F>
  auto _async(P&&, F&&, Topology*, NodeBase*);

  template <typename P, typename F>
  void _silent_async(P&&, F&&, Topology*, NodeBase*);

  template <TaskParameters P, typename F, typename I>
  requires (!std::same_as<std::decay_t<I>, AsyncTask>)
  auto _dependent_async(P&&, F&&, I, I, Topology*, NodeBase*);

  template <TaskParameters P, typename F, typename I>
  requires (!std::same_as<std::decay_t<I>, AsyncTask>)
  auto _silent_dependent_async(P&&, F&&, I, I, Topology*, NodeBase*);

  template <typename... ArgsT>
  void _schedule_async_task(ArgsT&&...);

  template <typename I, typename... ArgsT>
  AsyncTask _schedule_dependent_async_task(I, I, size_t, ArgsT&&...);
};
1196
1197#ifndef DOXYGEN_GENERATING_OUTPUT
1198
1199// Constructor
1200inline Executor::Executor(size_t N, std::shared_ptr<WorkerInterface> wif) :
1201 _workers (N),
1202 _buffers (std::bit_width(N)), // Empirically, we find that log2(N) performs best.
1203 _notifier (N) {
1204
1205 if(N == 0) {
1206 TF_THROW("executor must define at least one worker");
1207 }
1208
1209 // If spawning N threads fails, shut down any created threads before
1210 // rethrowing the exception.
1211#ifndef TF_DISABLE_EXCEPTION_HANDLING
1212 try {
1213#endif
1214 _spawn(N, std::move(wif));
1215#ifndef TF_DISABLE_EXCEPTION_HANDLING
1216 }
1217 catch(...) {
1218 _shutdown();
1219 std::rethrow_exception(std::current_exception());
1220 }
1221#endif
1222
1223 // initialize the default observer if requested
1224 if(has_env(TF_ENABLE_PROFILER)) {
1225 TFProfManager::get()._manage(make_observer<TFProfObserver>());
1226 }
1227}
1228
// Destructor
// Blocks until all submitted topologies finish, then stops and joins every
// worker thread (all via _shutdown).
inline Executor::~Executor() {
  _shutdown();
}
1233
1234// Function: _shutdown
1235inline void Executor::_shutdown() {
1236
1237 // wait for all topologies to complete
1238 wait_for_all();
1239
1240 // shut down the scheduler
1241 for(size_t i=0; i<_workers.size(); ++i) {
1242 _workers[i]._done.test_and_set(std::memory_order_relaxed);
1243 }
1244
1245 _notifier.notify_all();
1246
1247 // Only join the thread if it is joinable, as std::thread construction
1248 // may fail and throw an exception.
1249 for(auto& w : _workers) {
1250 if(w._thread.joinable()) {
1251 w._thread.join();
1252 }
1253 }
1254}
1255
// Function: num_workers
// Returns the number of worker threads owned by this executor.
inline size_t Executor::num_workers() const noexcept {
  return _workers.size();
}
1260
// Function: num_waiters
// Returns the number of workers currently parked on the notifier.
inline size_t Executor::num_waiters() const noexcept {
  return _notifier.num_waiters();
}
1265
// Function: num_queues
// Total number of stealable queues: one per worker plus the overflow buffers.
inline size_t Executor::num_queues() const noexcept {
  return _workers.size() + _buffers.size();
}
1270
// Function: num_topologies
// Returns the number of topologies currently running (relaxed read; the
// value may be stale by the time the caller uses it).
inline size_t Executor::num_topologies() const {
  return _num_topologies.load(std::memory_order_relaxed);
}
1275
1276// Function: this_worker
1277inline Worker* Executor::this_worker() {
1278 auto itr = _t2w.find(std::this_thread::get_id());
1279 return itr == _t2w.end() ? nullptr : itr->second;
1280}
1281
1282// Function: this_worker_id
1283inline int Executor::this_worker_id() const {
1284 auto i = _t2w.find(std::this_thread::get_id());
1285 return i == _t2w.end() ? -1 : static_cast<int>(i->second->_id);
1286}
1287
// Procedure: _spawn
// Creates N worker threads, each running the work-stealing scheduling loop,
// and records the thread-id -> worker mapping for this_worker()/this_worker_id().
inline void Executor::_spawn(size_t N, std::shared_ptr<WorkerInterface> wif) {

  for(size_t id=0; id<N; ++id) {
    _workers[id]._thread = std::thread([&, id, wif] () {

      auto& worker = _workers[id];

      // per-worker setup: id, initial steal victim, and RNG seed derived
      // from the thread id
      worker._id = id;
      worker._sticky_victim = id;
      worker._rdgen.seed(static_cast<uint32_t>(std::hash<std::thread::id>()(std::this_thread::get_id())));

      // before entering the work-stealing loop, call the scheduler prologue
      if(wif) {
        wif->scheduler_prologue(worker);
      }

      Node* t = nullptr;
      std::exception_ptr ptr = nullptr;

      // must use 1 as condition instead of !done because
      // the previous worker may stop while the following workers
      // are still preparing for entering the scheduling loop
#ifndef TF_DISABLE_EXCEPTION_HANDLING
      try {
#endif
        // work-stealing loop
        while(1) {

          // drains out the local queue first
          _exploit_task(worker, t);

          // steals and waits for tasks
          if(_wait_for_task(worker, t) == false) {
            break;
          }
        }

#ifndef TF_DISABLE_EXCEPTION_HANDLING
      }
      catch(...) {
        // capture the escaped exception and hand it to the epilogue below
        ptr = std::current_exception();
      }
#endif

      // call the user-specified epilogue function
      if(wif) {
        wif->scheduler_epilogue(worker, ptr);
      }

    });

    // We avoid using thread-local storage to track the mapping between a thread
    // and its corresponding worker in an executor. On Windows, thread-local
    // storage can be unreliable in certain situations (see issue #727).
    //
    // Instead, we maintain a per-executor mapping from threads to workers.
    // This approach has an additional advantage: according to the C++ Standard,
    // std::thread::id uniquely identifies a thread object. Therefore, once the map
    // returns a valid worker, we can be certain that the worker belongs to this
    // executor. This eliminates the need for additional executor validation
    // required by using thread-local storage.
    //
    // Example:
    //
    //   Worker* w = this_worker();
    //   // Using thread-local storage, we would need additional executor validation:
    //   if (w == nullptr || w->_executor != this) { /* caller is not a worker of this executor */ }
    //
    //   // Using per-executor mapping, it suffices to check:
    //   if (w == nullptr) { /* caller is not a worker of this executor */ }
    //
    _t2w.emplace(_workers[id]._thread.get_id(), &_workers[id]);
  }
}
1363
// Function: _explore_task
// Work-stealing phase: repeatedly tries to steal a task from a victim queue
// (a worker's queue or an overflow buffer). Returns false only when the
// worker's done flag is set; otherwise returns true, with t set to a stolen
// task or nullptr when the steal budget is exhausted.
inline bool Executor::_explore_task(Worker& w, Node*& t) {

  // Early pruning does not always give consistent performance gain.
  //if(_num_topologies.load(std::memory_order_acquire) == 0) {
  //  return !(w._done.test(std::memory_order_relaxed));
  //}

  //assert(!t);
  const size_t MAX_VICTIM = num_queues();
  const size_t MAX_STEALS = ((MAX_VICTIM + 1) << 1);

  size_t num_steals = 0;
  size_t vtm = w._sticky_victim;

  // Make the worker steal immediately from the assigned victim.
  while(true) {

    // If the worker's victim thread is within the worker pool, steal from the worker's queue.
    // Otherwise, steal from the buffer, adjusting the victim index based on the worker pool size.
    t = (vtm < _workers.size())
      ? _workers[vtm]._wsq.steal()
      : _buffers[vtm - _workers.size()].queue.steal();

    if(t) {
      // remember the successful victim for the next exploration round
      w._sticky_victim = vtm;
      break;
    }

    // Increment the steal count, and if it exceeds MAX_STEALS, yield the thread.
    // If the number of empty steals reaches MAX_STEALS + 150, exit the loop.
    if (++num_steals > MAX_STEALS) {
      std::this_thread::yield();
      if(num_steals > 150 + MAX_STEALS) {
        break;
      }
    }

    if(w._done.test(std::memory_order_relaxed)) {
      return false;
    }

    // Randomly generate a next victim.
    vtm = w._rdgen() % MAX_VICTIM;
  }
  return true;
}
1411
// Procedure: _exploit_task
// Exploitation phase: runs the current task, then keeps draining the
// worker's own queue until it is empty (t becomes nullptr).
inline void Executor::_exploit_task(Worker& w, Node*& t) {
  while(t) {
    _invoke(w, t);
    t = w._wsq.pop();
  }
}
1419
// Function: _wait_for_task
// After stealing fails, enters a two-phase-commit (2PC) wait on the notifier:
// prepare_wait, then re-check all buffers and worker queues and the done flag;
// any non-empty queue cancels the wait and restarts exploration. Returns false
// when the worker should exit its scheduling loop.
inline bool Executor::_wait_for_task(Worker& w, Node*& t) {

  explore_task:

  if(_explore_task(w, t) == false) {
    return false;
  }

  // Go exploit the task if we successfully steal one.
  if(t) {
    return true;
  }

  // Entering the 2PC guard as all queues are likely empty after many stealing attempts.
  _notifier.prepare_wait(w._id);

  // Condition #1: buffers should be empty
  for(size_t b=0; b<_buffers.size(); ++b) {
    if(!_buffers[b].queue.empty()) {
      _notifier.cancel_wait(w._id);
      // buffer indices are offset past the worker-queue indices
      w._sticky_victim = b + _workers.size();
      goto explore_task;
    }
  }

  // Condition #2: worker queues should be empty
  // Note: We need to use index-based looping to avoid data race with _spawn
  // which initializes other worker data structure at the same time.
  // Also, due to the property of a work-stealing queue, we don't need to check
  // this worker's work-stealing queue.
  for(size_t k=0; k<_workers.size()-1; ++k) {
    // maps k over all worker ids except w._id
    if(size_t vtm = k + (k >= w._id); !_workers[vtm]._wsq.empty()) {
      _notifier.cancel_wait(w._id);
      w._sticky_victim = vtm;
      goto explore_task;
    }
  }

  // Condition #3: worker should be alive
  if(w._done.test(std::memory_order_relaxed)) {
    _notifier.cancel_wait(w._id);
    return false;
  }

  // Now I really need to relinquish myself to others.
  _notifier.commit_wait(w._id);
  goto explore_task;
}
1469
1470// Function: make_observer
1471template<typename Observer, typename... ArgsT>
1472std::shared_ptr<Observer> Executor::make_observer(ArgsT&&... args) {
1473
1474 static_assert(
1475 std::is_base_of_v<ObserverInterface, Observer>,
1476 "Observer must be derived from ObserverInterface"
1477 );
1478
1479 // use a local variable to mimic the constructor
1480 auto ptr = std::make_shared<Observer>(std::forward<ArgsT>(args)...);
1481
1482 ptr->set_up(_workers.size());
1483
1484 _observers.emplace(std::static_pointer_cast<ObserverInterface>(ptr));
1485
1486 return ptr;
1487}
1488
1489// Procedure: remove_observer
1490template <typename Observer>
1491void Executor::remove_observer(std::shared_ptr<Observer> ptr) {
1492
1493 static_assert(
1494 std::is_base_of_v<ObserverInterface, Observer>,
1495 "Observer must be derived from ObserverInterface"
1496 );
1497
1498 _observers.erase(std::static_pointer_cast<ObserverInterface>(ptr));
1499}
1500
// Function: num_observers
// Returns the number of observers currently registered with this executor.
inline size_t Executor::num_observers() const noexcept {
  return _observers.size();
}
1505
1506// Procedure: _spill
1507inline void Executor::_spill(Node* item) {
1508 // Since pointers are aligned to 8 bytes, we perform a simple hash to avoid
1509 // contention caused by hashing to the same slot.
1510 auto b = (reinterpret_cast<uintptr_t>(item) >> 16) % _buffers.size();
1511 std::scoped_lock lock(_buffers[b].mutex);
1512 _buffers[b].queue.push(item);
1513}
1514
1515// Procedure: _bulk_spill
1516template <typename I>
1517void Executor::_bulk_spill(I first, size_t N) {
1518 // assert(N != 0);
1519 // Since pointers are aligned to 8 bytes, we perform a simple hash to avoid
1520 // contention caused by hashing to the same slot.
1521 auto p = reinterpret_cast<uintptr_t>(*first) >> 16;
1522 auto b = (p ^ (N << 6)) % _buffers.size();
1523 std::scoped_lock lock(_buffers[b].mutex);
1524 _buffers[b].queue.bulk_push(first, N);
1525}
1526
1527// Procedure: _schedule
1528inline void Executor::_schedule(Worker& worker, Node* node) {
1529 // starting at v3.5 we do not use any complicated notification mechanism
1530 // as the experimental result has shown no significant advantage.
1531 if(worker._wsq.try_push(node) == false) {
1532 _spill(node);
1533 }
1534 _notifier.notify_one();
1535}
1536
// Procedure: _schedule
// Schedules one node from a non-worker (external) thread: always goes
// through the shared buffers, then wakes one waiter.
inline void Executor::_schedule(Node* node) {
  _spill(node);
  _notifier.notify_one();
}
1542
// Procedure: _bulk_schedule
// Schedules num_nodes nodes from a worker thread: pushes as many as fit into
// the worker's local queue, spills the remainder, then notifies waiters.
template <typename I>
void Executor::_bulk_schedule(Worker& worker, I first, size_t num_nodes) {

  if(num_nodes == 0) {
    return;
  }

  // NOTE: We cannot use first/last in the for-loop (e.g., for(; first != last; ++first)).
  // This is because when a node v is inserted into the queue, v can run and finish
  // immediately. If v is the last node in the graph, it will tear down the parent task vector
  // which cause the last ++first to fail. This problem is specific to MSVC which has a stricter
  // iterator implementation in std::vector than GCC/Clang.
  if(auto n = worker._wsq.try_bulk_push(first, num_nodes); n != num_nodes) {
    _bulk_spill(first, num_nodes - n);
  }
  _notifier.notify_n(num_nodes);

  // notify first before spilling to hopefully wake up workers earlier
  // however, the experiment does not show any benefit for doing this.
  //auto n = worker._wsq.try_bulk_push(first, num_nodes);
  //_notifier.notify_n(n);
  //_bulk_schedule(first + n, num_nodes - n);
}
1567
// Procedure: _bulk_schedule
// Schedules num_nodes nodes from a non-worker (external) thread: everything
// goes through the shared buffers, then waiters are notified.
template <typename I>
inline void Executor::_bulk_schedule(I first, size_t num_nodes) {

  if(num_nodes == 0) {
    return;
  }

  // NOTE: We cannot use first/last in the for-loop (e.g., for(; first != last; ++first)).
  // This is because when a node v is inserted into the queue, v can run and finish
  // immediately. If v is the last node in the graph, it will tear down the parent task vector
  // which cause the last ++first to fail. This problem is specific to MSVC which has a stricter
  // iterator implementation in std::vector than GCC/Clang.
  _bulk_spill(first, num_nodes);
  _notifier.notify_n(num_nodes);
}
1584
// Function: _update_cache
// Keeps one ready successor as the worker's "cache" (next task to run
// without going through a queue); any previously cached node is scheduled.
TF_FORCE_INLINE void Executor::_update_cache(Worker& worker, Node*& cache, Node* node) {
  if(cache) {
    _schedule(worker, cache);
  }
  cache = node;
}
1592
// Function: _bulk_update_cache
// Variant of _update_cache that batches displaced cache nodes into a local
// array and bulk-schedules them once the array fills up.
template <size_t N>
TF_FORCE_INLINE void Executor::_bulk_update_cache(
  Worker& worker, Node*& cache, Node* node, std::array<Node*, N>& array, size_t& n
) {
  // experimental results show no benefit of using bulk_update_cache
  if(cache) {
    array[n++] = cache;
    if(n == N) {
      _bulk_schedule(worker, array, n);
      n = 0;
    }
  }
  cache = node;
}
1608
// Procedure: _invoke
// Core task-invocation routine. Runs one node: handles cancellation and
// semaphore acquisition, dispatches on the node type, releases semaphores,
// then schedules ready successors. One ready successor is kept in `cache`
// and executed directly via the goto-based continuation (TF_INVOKE_CONTINUATION)
// instead of being re-queued.
inline void Executor::_invoke(Worker& worker, Node* node) {

  #define TF_INVOKE_CONTINUATION() \
  if (cache) { \
    node = cache; \
    goto begin_invoke; \
  }

  begin_invoke:

  Node* cache {nullptr};

  // if this is the second invoke due to preemption, directly jump to invoke task
  if(node->_nstate & NSTATE::PREEMPTED) {
    goto invoke_task;
  }

  // If the work has been cancelled, there is no need to continue.
  // Here, we do tear_down_invoke since async tasks may also get cancelled where
  // we need to recycle the node.
  if(node->_is_parent_cancelled()) {
    _tear_down_invoke(worker, node, cache);
    TF_INVOKE_CONTINUATION();
    return;
  }

  // if acquiring semaphore(s) exists, acquire them first
  if(node->_semaphores && !node->_semaphores->to_acquire.empty()) {
    SmallVector<Node*> waiters;
    if(!node->_acquire_all(waiters)) {
      // could not acquire all semaphores: this node is parked as a waiter;
      // schedule any nodes released by partial acquisition and stop here
      _bulk_schedule(worker, waiters.begin(), waiters.size());
      return;
    }
  }

  invoke_task:

  SmallVector<int> conds;

  // switch is faster than nested if-else due to jump table
  switch(node->_handle.index()) {
    // static task
    case Node::STATIC:{
      _invoke_static_task(worker, node);
    }
    break;

    // runtime task (a true return means the node preempted itself)
    case Node::RUNTIME:{
      if(_invoke_runtime_task(worker, node)) {
        return;
      }
    }
    break;

    // non-preemptive runtime task
    case Node::NONPREEMPTIVE_RUNTIME:{
      _invoke_nonpreemptive_runtime_task(worker, node);
    }
    break;

    // subflow task
    case Node::SUBFLOW: {
      if(_invoke_subflow_task(worker, node)) {
        return;
      }
    }
    break;

    // condition task
    case Node::CONDITION: {
      _invoke_condition_task(worker, node, conds);
    }
    break;

    // multi-condition task
    case Node::MULTI_CONDITION: {
      _invoke_multi_condition_task(worker, node, conds);
    }
    break;

    // module task
    case Node::MODULE: {
      if(_invoke_module_task(worker, node)) {
        return;
      }
    }
    break;

    // adopted module task
    case Node::ADOPTED_MODULE: {
      if(_invoke_adopted_module_task(worker, node)) {
        return;
      }
    }
    break;

    // async task (tears itself down; no successor bookkeeping below)
    case Node::ASYNC: {
      if(_invoke_async_task(worker, node)) {
        return;
      }
      _tear_down_async(worker, node, cache);
      TF_INVOKE_CONTINUATION();
      return;
    }
    break;

    // dependent async task
    case Node::DEPENDENT_ASYNC: {
      if(_invoke_dependent_async_task(worker, node)) {
        return;
      }
      _tear_down_dependent_async(worker, node, cache);
      TF_INVOKE_CONTINUATION();
      return;
    }
    break;

    // monostate (placeholder)
    default:
    break;
  }

  // if releasing semaphores exist, release them
  if(node->_semaphores && !node->_semaphores->to_release.empty()) {
    SmallVector<Node*> waiters;
    node->_release_all(waiters);
    _bulk_schedule(worker, waiters.begin(), waiters.size());
  }

  // Reset the join counter with strong dependencies to support cycles.
  // + We must do this before scheduling the successors to avoid race
  //   condition on _predecessors.
  // + We must use fetch_add instead of direct assigning
  //   because the user-level call on "invoke" may explicitly schedule
  //   this task again (e.g., pipeline) which can access the join_counter.
  node->_join_counter.fetch_add(
    node->_nstate & NSTATE::STRONG_DEPENDENCIES_MASK, std::memory_order_relaxed
  );

  // Invoke the task based on the corresponding type
  switch(node->_handle.index()) {

    // condition and multi-condition tasks: only the successors selected by
    // the returned indices are scheduled
    case Node::CONDITION:
    case Node::MULTI_CONDITION: {
      for(auto cond : conds) {
        if(cond >= 0 && static_cast<size_t>(cond) < node->_num_successors) {
          auto s = node->_edges[cond];
          // zeroing the join counter for invariant
          s->_join_counter.store(0, std::memory_order_relaxed);
          node->_parent->_join_counter.fetch_add(1, std::memory_order_relaxed);
          _update_cache(worker, cache, s);
        }
      }
    }
    break;

    // non-condition task: a successor becomes ready when its join counter
    // drops to zero
    default: {
      for(size_t i=0; i<node->_num_successors; ++i) {
        if(auto s = node->_edges[i]; s->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
          node->_parent->_join_counter.fetch_add(1, std::memory_order_relaxed);
          _update_cache(worker, cache, s);
        }
      }
    }
    break;
  }

  // clean up the node after execution
  _tear_down_nonasync(worker, node, cache);
  TF_INVOKE_CONTINUATION();
}
1785
// Procedure: _tear_down_nonasync
// Decrements the parent's join counter after a non-async node finishes.
// If the parent is the topology itself and this was its last node, the
// topology is torn down; if the parent is a preempted node whose last child
// finished, the parent is resumed via the cache.
inline void Executor::_tear_down_nonasync(Worker& worker, Node* node, Node*& cache) {

  // we must check parent first before subtracting the join counter,
  // or it can introduce data race
  if(auto parent = node->_parent; parent == node->_topology) {
    if(parent->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
      _tear_down_topology(worker, node->_topology, cache);
    }
  }
  else {
    // needs to fetch every data before join counter becomes zero at which
    // the node may be deleted
    auto state = parent->_nstate;
    if(parent->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
      // this task is spawned from a preempted parent, so we need to resume it
      if(state & NSTATE::PREEMPTED) {
        _update_cache(worker, cache, static_cast<Node*>(parent));
      }
    }
  }
}
1808
1809// Procedure: _tear_down_invoke
1810inline void Executor::_tear_down_invoke(Worker& worker, Node* node, Node*& cache) {
1811 switch(node->_handle.index()) {
1812 case Node::ASYNC:
1813 _tear_down_async(worker, node, cache);
1814 break;
1815
1816 case Node::DEPENDENT_ASYNC:
1817 _tear_down_dependent_async(worker, node, cache);
1818 break;
1819
1820 default:
1821 _tear_down_nonasync(worker, node, cache);
1822 break;
1823 }
1824}
1825
// Procedure: _observer_prologue
// Notifies every registered observer that the given worker is about to
// execute the given node.
inline void Executor::_observer_prologue(Worker& worker, Node* node) {
  for(auto& observer : _observers) {
    observer->on_entry(WorkerView(worker), TaskView(*node));
  }
}
1832
// Procedure: _observer_epilogue
// Notifies every registered observer that the given worker has finished
// executing the given node.
inline void Executor::_observer_epilogue(Worker& worker, Node* node) {
  for(auto& observer : _observers) {
    observer->on_exit(WorkerView(worker), TaskView(*node));
  }
}
1839
// Procedure: _process_exception
// Records the current in-flight exception. Walks up the parent chain marking
// ESTATE::EXCEPTION (so recursive tasks cancel), looking for an explicit
// anchor (e.g., a blocking corun/join) or, failing that, the inner-most
// implicit anchor; the first exception caught at an anchor wins. If no anchor
// exists, the exception is stored on the node itself.
inline void Executor::_process_exception(Worker&, Node* node) {

  // Finds the anchor and mark the entire path with exception,
  // so recursive tasks can be cancelled properly.
  // Since exception can come from asynchronous task (with runtime), the node itself can be anchored.
  NodeBase* ea = node;     // explicit anchor
  NodeBase* ia = nullptr;  // implicit anchor

  while(ea && (ea->_estate.load(std::memory_order_relaxed) & ESTATE::EXPLICITLY_ANCHORED) == 0) {
    ea->_estate.fetch_or(ESTATE::EXCEPTION, std::memory_order_relaxed);
    // we only want the inner-most implicit anchor
    if(ia == nullptr && (ea->_nstate & NSTATE::IMPLICITLY_ANCHORED)) {
      ia = ea;
    }
    ea = ea->_parent;
  }

  // flag used to ensure execution is caught in a thread-safe manner
  constexpr static auto flag = ESTATE::EXCEPTION | ESTATE::CAUGHT;

  // The exception occurs under a blocking call (e.g., corun, join).
  if(ea) {
    // multiple tasks may throw, and we only take the first thrown exception
    if((ea->_estate.fetch_or(flag, std::memory_order_relaxed) & ESTATE::CAUGHT) == 0) {
      ea->_exception_ptr = std::current_exception();
      return;
    }
  }
  // Implicit anchor has the lowest priority
  else if(ia){
    if((ia->_estate.fetch_or(flag, std::memory_order_relaxed) & ESTATE::CAUGHT) == 0) {
      ia->_exception_ptr = std::current_exception();
      return;
    }
  }

  // For now, we simply store the exception in this node; this can happen in an
  // execution that does not have any external control to capture the exception,
  // such as silent async task without any parent.
  node->_exception_ptr = std::current_exception();
}
1882
// Procedure: _invoke_static_task
// Runs a static task's callable between observer entry/exit notifications,
// routing any thrown exception through the executor's exception handler.
inline void Executor::_invoke_static_task(Worker& worker, Node* node) {
  _observer_prologue(worker, node);
  TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
    std::get_if<Node::Static>(&node->_handle)->work();
  });
  _observer_epilogue(worker, node);
}
1891
// Procedure: _invoke_subflow_task
// Runs a subflow task. On the first entry it invokes the user callable to
// build the subgraph; if the subflow is joinable and non-empty, the node is
// preempted and the subgraph is scheduled (return true). On the second entry
// (after the subgraph finished) it clears the preempted flag. Returns false
// when the node has fully completed.
inline bool Executor::_invoke_subflow_task(Worker& worker, Node* node) {

  auto& h = *std::get_if<Node::Subflow>(&node->_handle);
  auto& g = h.subgraph;

  if((node->_nstate & NSTATE::PREEMPTED) == 0) {

    // set up the subflow
    Subflow sf(*this, worker, node, g);

    // invoke the subflow callable
    _observer_prologue(worker, node);
    TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
      h.work(sf);
    });
    _observer_epilogue(worker, node);

    // spawn the subflow if it is joinable and its graph is non-empty
    // implicit join is faster than Subflow::join as it does not involve corun
    if(sf.joinable() && !g.empty()) {

      // signal the executor to preempt this node
      node->_nstate |= NSTATE::PREEMPTED;

      // set up and schedule the graph
      _schedule_graph(worker, g, node->_topology, node);
      return true;
    }
  }
  else {
    node->_nstate &= ~NSTATE::PREEMPTED;
  }

  // The subflow has finished or joined.
  // By default, we clear the subflow storage as applications can perform recursive
  // subflow tasking which accumulates a huge amount of memory overhead, hampering
  // the performance.
  if((node->_nstate & NSTATE::RETAIN_SUBFLOW) == 0) {
    g.clear();
  }

  return false;
}
1936
// Procedure: _invoke_condition_task
// Runs a condition task and stores its single returned successor index in
// conds, between observer notifications.
inline void Executor::_invoke_condition_task(
  Worker& worker, Node* node, SmallVector<int>& conds
) {
  _observer_prologue(worker, node);
  TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
    auto& work = std::get_if<Node::Condition>(&node->_handle)->work;
    conds = { work() };
  });
  _observer_epilogue(worker, node);
}
1948
1949// Procedure: _invoke_multi_condition_task
1950inline void Executor::_invoke_multi_condition_task(
1951 Worker& worker, Node* node, SmallVector<int>& conds
1952) {
1953 _observer_prologue(worker, node);
1954 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
1955 conds = std::get_if<Node::MultiCondition>(&node->_handle)->work();
1956 });
1957 _observer_epilogue(worker, node);
1958}
1959
1960// Procedure: _invoke_module_task
1961inline bool Executor::_invoke_module_task(Worker& w, Node* node) {
1962 return _invoke_module_task_impl(w, node, std::get_if<Node::Module>(&node->_handle)->graph);
1963}
1964
1965// Procedure: _invoke_adopted_module_task
1966inline bool Executor::_invoke_adopted_module_task(Worker& w, Node* node) {
1967 return _invoke_module_task_impl(w, node, std::get_if<Node::AdoptedModule>(&node->_handle)->graph);
1968}
1969
1970// Procedure: _invoke_module_task_impl
1971inline bool Executor::_invoke_module_task_impl(Worker& w, Node* node, Graph& graph) {
1972
1973 // No need to do anything for empty graph
1974 if(graph.empty()) {
1975 return false;
1976 }
1977
1978 // first entry - not spawned yet
1979 if((node->_nstate & NSTATE::PREEMPTED) == 0) {
1980 // signal the executor to preempt this node
1981 node->_nstate |= NSTATE::PREEMPTED;
1982 _schedule_graph(w, graph, node->_topology, node);
1983 return true;
1984 }
1985
1986 // second entry - already spawned
1987 node->_nstate &= ~NSTATE::PREEMPTED;
1988
1989 return false;
1990}
1991
1992
1993// Procedure: _invoke_async_task
1994inline bool Executor::_invoke_async_task(Worker& worker, Node* node) {
1995 auto& work = std::get_if<Node::Async>(&node->_handle)->work;
1996 switch(work.index()) {
1997 // void()
1998 case 0:
1999 _observer_prologue(worker, node);
2000 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
2001 std::get_if<0>(&work)->operator()();
2002 });
2003 _observer_epilogue(worker, node);
2004 break;
2005
2006 // void(Runtime&)
2007 case 1:
2008 if(_invoke_runtime_task_impl(worker, node, *std::get_if<1>(&work))) {
2009 return true;
2010 }
2011 break;
2012
2013 // void(Runtime&, bool)
2014 case 2:
2015 if(_invoke_runtime_task_impl(worker, node, *std::get_if<2>(&work))) {
2016 return true;
2017 }
2018 break;
2019 }
2020
2021 return false;
2022}
2023
2024// Procedure: _invoke_dependent_async_task
2025inline bool Executor::_invoke_dependent_async_task(Worker& worker, Node* node) {
2026 auto& work = std::get_if<Node::DependentAsync>(&node->_handle)->work;
2027 switch(work.index()) {
2028 // void()
2029 case 0:
2030 _observer_prologue(worker, node);
2031 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
2032 std::get_if<0>(&work)->operator()();
2033 });
2034 _observer_epilogue(worker, node);
2035 break;
2036
2037 // void(Runtime&) - silent async
2038 case 1:
2039 if(_invoke_runtime_task_impl(worker, node, *std::get_if<1>(&work))) {
2040 return true;
2041 }
2042 break;
2043
2044 // void(Runtime&, bool) - async
2045 case 2:
2046 if(_invoke_runtime_task_impl(worker, node, *std::get_if<2>(&work))) {
2047 return true;
2048 }
2049 break;
2050 }
2051 return false;
2052}
2053
2054// Function: run
2055inline tf::Future<void> Executor::run(Taskflow& f) {
2056 return run_n(f, 1, [](){});
2057}
2058
2059// Function: run
2060inline tf::Future<void> Executor::run(Taskflow&& f) {
2061 return run_n(std::move(f), 1, [](){});
2062}
2063
2064// Function: run
2065template <typename C>
2066tf::Future<void> Executor::run(Taskflow& f, C&& c) {
2067 return run_n(f, 1, std::forward<C>(c));
2068}
2069
2070// Function: run
2071template <typename C>
2072tf::Future<void> Executor::run(Taskflow&& f, C&& c) {
2073 return run_n(std::move(f), 1, std::forward<C>(c));
2074}
2075
2076// Function: run_n
2077inline tf::Future<void> Executor::run_n(Taskflow& f, size_t repeat) {
2078 return run_n(f, repeat, [](){});
2079}
2080
2081// Function: run_n
2082inline tf::Future<void> Executor::run_n(Taskflow&& f, size_t repeat) {
2083 return run_n(std::move(f), repeat, [](){});
2084}
2085
2086// Function: run_n
2087template <typename C>
2088tf::Future<void> Executor::run_n(Taskflow& f, size_t repeat, C&& c) {
2089 return run_until(
2090 f, [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c)
2091 );
2092}
2093
2094// Function: run_n
2095template <typename C>
2096tf::Future<void> Executor::run_n(Taskflow&& f, size_t repeat, C&& c) {
2097 return run_until(
2098 std::move(f), [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c)
2099 );
2100}
2101
2102// Function: run_until
2103template<typename P>
2104tf::Future<void> Executor::run_until(Taskflow& f, P&& pred) {
2105 return run_until(f, std::forward<P>(pred), [](){});
2106}
2107
2108// Function: run_until
2109template<typename P>
2110tf::Future<void> Executor::run_until(Taskflow&& f, P&& pred) {
2111 return run_until(std::move(f), std::forward<P>(pred), [](){});
2112}
2113
// Function: run_until
// Repeatedly runs the taskflow until the predicate p returns true, then
// invokes the callback c. Returns a tf::Future tied to the created topology
// so callers can wait on or cancel this run.
template <typename P, typename C>
tf::Future<void> Executor::run_until(Taskflow& f, P&& p, C&& c) {

  // No need to create a real topology; return a ready dummy future to keep
  // the invariant that every run yields a valid future.
  if(f.empty() || p()) {
    c();
    std::promise<void> promise;
    promise.set_value();
    return tf::Future<void>(promise.get_future());
  }

  _increment_topology();

  // create a topology for this run
  auto t = std::make_shared<Topology>(f, std::forward<P>(p), std::forward<C>(c));
  //auto t = std::make_shared<DerivedTopology<P, C>>(f, std::forward<P>(p), std::forward<C>(c));

  // need to create the future before the topology gets torn down quickly
  tf::Future<void> future(t->_promise.get_future(), t);

  // modifying topology needs to be protected under the lock; set up and
  // schedule only when this run is the first one enqueued on the taskflow
  // (presumably _fetch_enqueue returns the prior queue size — verify)
  if(f._fetch_enqueue(t) == 0) {
    _set_up_topology(this_worker(), t.get());
  }

  return future;
}
2142
2143
2144
2145// Function: corun_until
2146template <typename P>
2147void Executor::corun_until(P&& predicate) {
2148
2149 Worker* w = this_worker();
2150 if(w == nullptr) {
2151 TF_THROW("corun_until must be called by a worker of the executor");
2152 }
2153
2154 _corun_until(*w, std::forward<P>(predicate));
2155}
2156
// Function: _corun_until
// Work-stealing loop run by worker w until stop_predicate() returns true.
// Pops one task at a time from the worker's own queue; when empty, steals
// from a victim queue (another worker or an external buffer queue), yielding
// after too many failed steal attempts.
template <typename P>
void Executor::_corun_until(Worker& w, P&& stop_predicate) {

  const size_t MAX_VICTIM = num_queues();
  const size_t MAX_STEALS = ((MAX_VICTIM + 1) << 1);

  exploit:

  while(!stop_predicate()) {

    // here we don't do while-loop to drain out the local queue as it can
    // potentially enter a very deep recursive corun, causing stack overflow
    if(auto t = w._wsq.pop(); t) {
      _invoke(w, t);
    }
    else {
      size_t num_steals = 0;
      size_t vtm = w._sticky_victim;

      explore:

      // victim ids below _workers.size() address worker queues; the rest
      // address the executor's external buffer queues
      t = (vtm < _workers.size())
        ? _workers[vtm]._wsq.steal()
        : _buffers[vtm-_workers.size()].queue.steal();

      if(t) {
        _invoke(w, t);
        // remember the productive victim so we try it first next time
        w._sticky_victim = vtm;
        goto exploit;
      }
      else if(!stop_predicate()) {
        // yield once the steal budget is exceeded, then retry a random victim
        if(++num_steals > MAX_STEALS) {
          std::this_thread::yield();
        }
        vtm = w._rdgen() % MAX_VICTIM;
        goto explore;
      }
      else {
        break;
      }
    }
  }
}
2201
2202// Function: corun
2203template <typename T>
2204void Executor::corun(T& target) {
2205
2206 Worker* w = this_worker();
2207 if(w == nullptr) {
2208 TF_THROW("corun must be called by a worker of the executor");
2209 }
2210
2211 NodeBase anchor;
2212 _corun_graph(*w, retrieve_graph(target), nullptr, &anchor);
2213}
2214
// Procedure: _corun_graph
// Schedules the given graph under parent p and keeps the calling worker
// executing tasks until every task of the graph has joined back to p
// (p's join counter reaches zero). Any exception captured in p is rethrown
// to the caller after the anchor is released.
inline void Executor::_corun_graph(Worker& w, Graph& g, Topology* tpg, NodeBase* p) {

  // empty graph - nothing to run
  if(g.empty()) {
    return;
  }

  // anchor this parent as the blocking point
  {
    ExplicitAnchorGuard anchor(p);
    _schedule_graph(w, g, tpg, p);
    _corun_until(w, [p] () -> bool {
      return p->_join_counter.load(std::memory_order_acquire) == 0; }
    );
  }

  // rethrow the exception to the caller
  p->_rethrow_exception();
}
2235
// Procedure: _increment_topology
// Bumps the running-topology counter; paired with _decrement_topology.
inline void Executor::_increment_topology() {
  _num_topologies.fetch_add(1, std::memory_order_relaxed);
}
2240
// Procedure: _decrement_topology
// Decrements the running-topology counter and, when it drops to zero,
// wakes up every thread blocked in wait_for_all.
inline void Executor::_decrement_topology() {
  if(_num_topologies.fetch_sub(1, std::memory_order_acq_rel) == 1) {
    _num_topologies.notify_all();
  }
}
2247
2248// Procedure: wait_for_all
2249inline void Executor::wait_for_all() {
2250 size_t n = _num_topologies.load(std::memory_order_acquire);
2251 while(n != 0) {
2252 _num_topologies.wait(n, std::memory_order_acquire);
2253 n = _num_topologies.load(std::memory_order_acquire);
2254 }
2255}
2256
2257// Function: _schedule_graph
2258inline void Executor::_schedule_graph(
2259 Worker& worker, Graph& graph, Topology* tpg, NodeBase* parent
2260) {
2261 size_t num_srcs = _set_up_graph(graph, tpg, parent);
2262 parent->_join_counter.fetch_add(num_srcs, std::memory_order_relaxed);
2263 _bulk_schedule(worker, graph.begin(), num_srcs);
2264}
2265
2266// Function: _set_up_topology
2267inline void Executor::_set_up_topology(Worker* w, Topology* tpg) {
2268 // ---- under taskflow lock ----
2269 auto& g = tpg->_taskflow._graph;
2270 size_t num_srcs = _set_up_graph(g, tpg, tpg);
2271 tpg->_join_counter.store(num_srcs, std::memory_order_relaxed);
2272 w ? _bulk_schedule(*w, g.begin(), num_srcs) : _bulk_schedule(g.begin(), num_srcs);
2273}
2274
// Function: _set_up_graph
// Resets the per-run state of every node in the graph (topology, parent,
// state flags, join counter, exception slot) and partitions the node list so
// that all source nodes — those with no predecessors — come first.
// Returns the number of source nodes.
inline size_t Executor::_set_up_graph(Graph& graph, Topology* tpg, NodeBase* parent) {

  auto first = graph.begin();
  auto last = graph.end();
  auto send = first;  // one-past-the-end of the source partition
  for(; first != last; ++first) {

    auto node = *first;
    node->_topology = tpg;
    node->_parent = parent;
    node->_nstate = NSTATE::NONE;
    node->_estate.store(ESTATE::NONE, std::memory_order_relaxed);
    node->_set_up_join_counter();
    node->_exception_ptr = nullptr;

    // move source to the first partition
    // root, root, root, v1, v2, v3, v4, ...
    if(node->num_predecessors() == 0) {
      std::iter_swap(send++, first);
    }
  }
  return send - graph.begin();
}
2299
// Function: _tear_down_topology
// Called when the topology finishes one pass over its graph. Either
// reschedules the same graph (the stop predicate is not yet satisfied), or
// finalizes this run: invoke the finish callback, fulfill the promise, pop
// the topology, and start the next queued topology of the same taskflow if
// one exists.
inline void Executor::_tear_down_topology(Worker& worker, Topology* tpg, Node*& cache) {

  auto &f = tpg->_taskflow;

  //assert(&tpg == &(f._topologies.front()));

  // case 1: we still need to run the topology again
  //if(!tpg->_exception_ptr && !tpg->cancelled() && !tpg->predicate()) {
  if(!tpg->cancelled() && !tpg->_predicate()) {
    //assert(tpg->_join_counter == 0);
    //std::lock_guard<std::mutex> lock(f._mutex);
    _schedule_graph(worker, tpg->_taskflow._graph, tpg, tpg);
  }
  // case 2: the final run of this topology
  else {

    // invoke the callback after each run
    tpg->_on_finish();

    // there are more topologies queued behind this one to run
    if(std::unique_lock<std::mutex> lock(f._mutex); f._topologies.size()>1) {

      auto fetched_tpg {std::move(f._topologies.front())};
      //assert(fetched_tpg.get() == tpg);

      f._topologies.pop();
      tpg = f._topologies.front().get();

      lock.unlock();

      // Soon after we carry out the promise, the associated taskflow may get
      // destroyed from the user side, and we should never touch it again.
      fetched_tpg->_carry_out_promise();

      // decrement the topology
      _decrement_topology();

      _schedule_graph(worker, tpg->_taskflow._graph, tpg, tpg);
    }
    else {
      //assert(f._topologies.size() == 1);

      auto fetched_tpg {std::move(f._topologies.front())};
      //assert(fetched_tpg.get() == tpg);

      f._topologies.pop();

      lock.unlock();

      // Soon after we carry out the promise, the associated taskflow may get
      // destroyed from the user side, and we should never touch it again.
      fetched_tpg->_carry_out_promise();

      _decrement_topology();

      // remove the parent that owns the moved taskflow so the storage can be freed
      if(auto parent = fetched_tpg->_parent; parent) {
        //auto state = parent->_nstate;
        if(parent->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
          // this async is spawned from a preempted parent, so we need to resume it
          //if(state & NSTATE::PREEMPTED) {
          _update_cache(worker, cache, static_cast<Node*>(parent));
          //}
        }
      }
    }
  }
}
2369
2370// ############################################################################
2371// Forward Declaration: Subflow
2372// ############################################################################
2373
2374inline void Subflow::join() {
2375
2376 if(!joinable()) {
2377 TF_THROW("subflow already joined");
2378 }
2379
2380 _executor._corun_graph(_worker, _graph, _node->_topology, _node);
2381
2382 // join here since corun graph may throw exception
2383 _node->_nstate |= NSTATE::JOINED_SUBFLOW;
2384}
2385
2386#endif
2387
2388
2389
2390
2391} // end of namespace tf -----------------------------------------------------
class to hold a dependent asynchronous task with shared ownership
Definition async_task.hpp:45
void silent_async(P &&params, F &&func)
similar to tf::Executor::async but does not return a future object
tf::AsyncTask silent_dependent_async(F &&func, Tasks &&... tasks)
runs the given function asynchronously when the given predecessors finish
tf::Future< void > run_until(Taskflow &taskflow, P &&pred)
runs a taskflow multiple times until the predicate becomes true
void corun_until(P &&predicate)
keeps running the work-stealing loop until the predicate returns true
void remove_observer(std::shared_ptr< Observer > observer)
removes an observer from the executor
auto dependent_async(F &&func, Tasks &&... tasks)
runs the given function asynchronously when the given predecessors finish
tf::Future< void > run(Taskflow &&taskflow)
runs a moved taskflow once
tf::Future< void > run(Taskflow &taskflow)
runs a taskflow once
size_t num_waiters() const noexcept
queries the number of workers that are in the waiting loop
tf::Future< void > run(Taskflow &&taskflow, C &&callable)
runs a moved taskflow once and invoke a callback upon completion
~Executor()
destructs the executor
int this_worker_id() const
queries the id of the caller thread within this executor
size_t num_queues() const noexcept
queries the number of work-stealing queues used by the executor
tf::Future< void > run_n(Taskflow &taskflow, size_t N)
runs a taskflow for N times
size_t num_topologies() const
queries the number of running topologies at the time of this call
Executor(size_t N=std::thread::hardware_concurrency(), std::shared_ptr< WorkerInterface > wif=nullptr)
constructs the executor with N worker threads
TaskGroup task_group()
creates a task group that executes a collection of asynchronous tasks
Definition task_group.hpp:875
void corun(T &target)
runs a target graph and waits until it completes using an internal worker of this executor
size_t num_workers() const noexcept
queries the number of worker threads
tf::Future< void > run_until(Taskflow &&taskflow, P &&pred)
runs a moved taskflow and keeps running it until the predicate becomes true
void wait_for_all()
waits for all tasks to complete
tf::Future< void > run_n(Taskflow &taskflow, size_t N, C &&callable)
runs a taskflow for N times and then invokes a callback
tf::Future< void > run(Taskflow &taskflow, C &&callable)
runs a taskflow once and invoke a callback upon completion
tf::Future< void > run_n(Taskflow &&taskflow, size_t N)
runs a moved taskflow for N times
tf::Future< void > run_n(Taskflow &&taskflow, size_t N, C &&callable)
runs a moved taskflow for N times and then invokes a callback
tf::Future< void > run_until(Taskflow &taskflow, P &&pred, C &&callable)
runs a taskflow multiple times until the predicate becomes true and then invokes the callback
Worker * this_worker()
queries pointer to the calling worker if it belongs to this executor, otherwise returns nullptr
auto async(P &&params, F &&func)
creates a parameterized asynchronous task to run the given function
std::shared_ptr< Observer > make_observer(ArgsT &&... args)
constructs an observer to inspect the activities of worker threads
size_t num_observers() const noexcept
queries the number of observers
Graph & _graph
associated graph object
Definition flow_builder.hpp:1337
class to access the result of an execution
Definition taskflow.hpp:630
class to create a graph object
Definition graph.hpp:47
class to define a vector optimized for small array
Definition small_vector.hpp:931
void join()
enables the subflow to join its parent task
bool joinable() const noexcept
queries if the subflow is joinable
Definition flow_builder.hpp:1615
class to create a taskflow object
Definition taskflow.hpp:64
class to create a lock-free unbounded work-stealing queue
Definition wsq.hpp:67
class to create a worker in an executor
Definition worker.hpp:55
determines if a type is a task parameter type
Definition graph.hpp:177
taskflow namespace
Definition small_vector.hpp:20
NonblockingNotifier DefaultNotifier
the default notifier type used by Taskflow
Definition worker.hpp:38
Graph & retrieve_graph(T &t)
retrieves a reference to the underlying tf::Graph from an object
Definition graph.hpp:975
bool has_env(const std::string &str)
checks whether an environment variable is defined
Definition os.hpp:213