executor.hpp
1#pragma once
2
3#include "../observer/tfprof.hpp"
4#include "taskflow.hpp"
5#include "async_task.hpp"
6
11
12namespace tf {
13
14// ----------------------------------------------------------------------------
15// Executor Definition
16// ----------------------------------------------------------------------------
17
62class Executor {
63
64 friend class FlowBuilder;
65 friend class Subflow;
66 friend class Runtime;
67 friend class NonpreemptiveRuntime;
68 friend class Algorithm;
69 friend class TaskGroup;
70
71 public:
72
91 explicit Executor(
92 size_t N = std::thread::hardware_concurrency(),
93 std::shared_ptr<WorkerInterface> wif = nullptr
94 );
95
104
124
145
168 template<typename C>
169 tf::Future<void> run(Taskflow& taskflow, C&& callable);
170
195 template<typename C>
196 tf::Future<void> run(Taskflow&& taskflow, C&& callable);
197
217 tf::Future<void> run_n(Taskflow& taskflow, size_t N);
218
241 tf::Future<void> run_n(Taskflow&& taskflow, size_t N);
242
268 template<typename C>
269 tf::Future<void> run_n(Taskflow& taskflow, size_t N, C&& callable);
270
296 template<typename C>
297 tf::Future<void> run_n(Taskflow&& taskflow, size_t N, C&& callable);
298
322 template<typename P>
323 tf::Future<void> run_until(Taskflow& taskflow, P&& pred);
324
350 template<typename P>
351 tf::Future<void> run_until(Taskflow&& taskflow, P&& pred);
352
379 template<typename P, typename C>
380 tf::Future<void> run_until(Taskflow& taskflow, P&& pred, C&& callable);
381
410 template<typename P, typename C>
411 tf::Future<void> run_until(Taskflow&& taskflow, P&& pred, C&& callable);
412
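  // A minimal usage sketch of the run family (assuming tf::Taskflow::emplace from
  // the FlowBuilder interface); each call returns a tf::Future<void> that can be
  // waited on, and wait_for_all blocks until every pending run finishes:
  //
  //   tf::Executor executor;
  //   tf::Taskflow taskflow;
  //   taskflow.emplace([](){ /* some work */ });
  //   executor.run(taskflow).wait();                                     // run once
  //   executor.run_n(taskflow, 4);                                       // run four times
  //   executor.run_until(taskflow, [i=2]() mutable { return i-- == 0; });
  //   executor.wait_for_all();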
453 template <typename T>
454 void corun(T& target);
455
484 template <typename P>
485 void corun_until(P&& predicate);
486
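  // A minimal corun sketch (assuming tf::Taskflow::emplace from the FlowBuilder
  // interface): corun must be called from a worker of this executor, e.g. inside a
  // running task, and throws otherwise, as shown in the definition below:
  //
  //   tf::Executor executor;
  //   tf::Taskflow A, B;
  //   B.emplace([](){ /* work */ });
  //   A.emplace([&](){
  //     executor.corun(B);  // runs B to completion without blocking the worker
  //   });
  //   executor.run(A).wait();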
501
512 size_t num_workers() const noexcept;
513
520 size_t num_waiters() const noexcept;
521
525 size_t num_queues() const noexcept;
526
540 size_t num_topologies() const;
541
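  // A small introspection sketch: num_queues counts both the per-worker queues and
  // the shared overflow buffers, so it is always at least num_workers (see the
  // definitions below):
  //
  //   tf::Executor executor(8);
  //   assert(executor.num_workers() == 8);
  //   assert(executor.num_queues() >= executor.num_workers());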
559
577 int this_worker_id() const;
578
579 // --------------------------------------------------------------------------
580 // Observer methods
581 // --------------------------------------------------------------------------
582
600 template <typename Observer, typename... ArgsT>
601 std::shared_ptr<Observer> make_observer(ArgsT&&... args);
602
608 template <typename Observer>
609 void remove_observer(std::shared_ptr<Observer> observer);
610
614 size_t num_observers() const noexcept;
615
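  // A minimal observer sketch using tf::TFProfObserver (included via tfprof.hpp),
  // the same observer type the constructor installs when TF_ENABLE_PROFILER is set:
  //
  //   auto observer = executor.make_observer<tf::TFProfObserver>();  // counted by num_observers()
  //   executor.run(taskflow).wait();
  //   executor.remove_observer(std::move(observer));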
616 // --------------------------------------------------------------------------
617 // Async Task Methods
618 // --------------------------------------------------------------------------
619
645 template <typename P, typename F>
646 auto async(P&& params, F&& func);
647
671 template <typename F>
672 auto async(F&& func);
673
697 template <typename P, typename F>
698 void silent_async(P&& params, F&& func);
699
722 template <typename F>
723 void silent_async(F&& func);
724
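  // A minimal async sketch: async returns a future for the result, while
  // silent_async does not return a future and is intended for fire-and-forget work:
  //
  //   auto fu = executor.async([](){ return 42; });
  //   executor.silent_async([](){ /* fire-and-forget */ });
  //   assert(fu.get() == 42);
  //   executor.wait_for_all();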
725 // --------------------------------------------------------------------------
726 // Silent Dependent Async Methods
727 // --------------------------------------------------------------------------
728
756 template <typename F, typename... Tasks>
 757  requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
758 tf::AsyncTask silent_dependent_async(F&& func, Tasks&&... tasks);
759
791 template <TaskParamsLike P, typename F, typename... Tasks>
792 requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
793 tf::AsyncTask silent_dependent_async(P&& params, F&& func, Tasks&&... tasks);
794
827 template <typename F, typename I>
 828  requires (!std::same_as<std::decay_t<I>, AsyncTask>)
829 tf::AsyncTask silent_dependent_async(F&& func, I first, I last);
830
865 template <TaskParamsLike P, typename F, typename I>
866 requires (!std::same_as<std::decay_t<I>, AsyncTask>)
867 tf::AsyncTask silent_dependent_async(P&& params, F&& func, I first, I last);
868
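  // A minimal dependency-chaining sketch: C runs only after A and B have finished;
  // wait_for_all can then be used to wait for all of them to complete:
  //
  //   tf::AsyncTask A = executor.silent_dependent_async([](){ std::printf("A\n"); });
  //   tf::AsyncTask B = executor.silent_dependent_async([](){ std::printf("B\n"); });
  //   tf::AsyncTask C = executor.silent_dependent_async([](){ std::printf("C\n"); }, A, B);
  //   executor.wait_for_all();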
869 // --------------------------------------------------------------------------
870 // Dependent Async Methods
871 // --------------------------------------------------------------------------
872
910 template <typename F, typename... Tasks>
 911  requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
912 auto dependent_async(F&& func, Tasks&&... tasks);
913
955 template <TaskParamsLike P, typename F, typename... Tasks>
956 requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
957 auto dependent_async(P&& params, F&& func, Tasks&&... tasks);
958
999 template <typename F, typename I>
1000  requires (!std::same_as<std::decay_t<I>, AsyncTask>)
1001 auto dependent_async(F&& func, I first, I last);
1002
1047 template <TaskParamsLike P, typename F, typename I>
1048 requires (!std::same_as<std::decay_t<I>, AsyncTask>)
1049 auto dependent_async(P&& params, F&& func, I first, I last);
1050
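  // A minimal dependent_async sketch, assuming (as in Taskflow's documented API)
  // that dependent_async returns a pair of the task handle and a tf::Future:
  //
  //   auto [A, fuA] = executor.dependent_async([](){ return 1; });
  //   auto [B, fuB] = executor.dependent_async([](){ return 2; }, A);  // B runs after A
  //   int sum = fuA.get() + fuB.get();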
1051 // ----------------------------------------------------------------------------------------------
1052 // Task Group
1053 // ----------------------------------------------------------------------------------------------
1054
1100 TaskGroup task_group();
1101
1102 private:
1103
1104 struct Buffer {
1105 std::mutex mutex;
1106 UnboundedWSQ<Node*> queue;
1107 };
1108
1109 std::vector<Worker> _workers;
1110 std::vector<Buffer> _buffers;
1111
1112  // the notifier's state variable and num_topologies should sit on different cachelines,
1113  // or false sharing can cause a serious performance drop
1114 alignas(TF_CACHELINE_SIZE) DefaultNotifier _notifier;
1115 alignas(TF_CACHELINE_SIZE) std::atomic<size_t> _num_topologies {0};
1116
1117 std::unordered_map<std::thread::id, Worker*> _t2w;
1118 std::unordered_set<std::shared_ptr<ObserverInterface>> _observers;
1119
1120 void _shutdown();
1121 void _observer_prologue(Worker&, Node*);
1122 void _observer_epilogue(Worker&, Node*);
1123 void _spawn(size_t, std::shared_ptr<WorkerInterface>);
1124 void _exploit_task(Worker&, Node*&);
1125 bool _explore_task(Worker&, Node*&);
1126 void _schedule(Worker&, Node*);
1127 void _schedule(Node*);
1128 void _schedule_graph(Worker&, Graph&, Topology*, NodeBase*);
1129 void _spill(Node*);
1130 void _set_up_topology(Worker*, Topology*);
1131 void _tear_down_topology(Worker&, Topology*, Node*&);
1132 void _tear_down_async(Worker&, Node*, Node*&);
1133 void _tear_down_dependent_async(Worker&, Node*, Node*&);
1134 void _tear_down_nonasync(Worker&, Node*, Node*&);
1135 void _tear_down_invoke(Worker&, Node*, Node*&);
1136 void _increment_topology();
1137 void _decrement_topology();
1138 void _invoke(Worker&, Node*);
1139 void _invoke_static_task(Worker&, Node*);
1140 void _invoke_nonpreemptive_runtime_task(Worker&, Node*);
1141 void _invoke_condition_task(Worker&, Node*, SmallVector<int>&);
1142 void _invoke_multi_condition_task(Worker&, Node*, SmallVector<int>&);
1143 void _process_dependent_async(Node*, tf::AsyncTask&, size_t&);
1144 void _process_exception(Worker&, Node*);
1145 void _update_cache(Worker&, Node*&, Node*);
1146 void _corun_graph(Worker&, Graph&, Topology*, NodeBase*);
1147
1148 bool _wait_for_task(Worker&, Node*&);
1149 bool _invoke_subflow_task(Worker&, Node*);
1150 bool _invoke_module_task(Worker&, Node*);
1151 bool _invoke_adopted_module_task(Worker&, Node*);
1152 bool _invoke_module_task_impl(Worker&, Node*, Graph&);
1153 bool _invoke_async_task(Worker&, Node*);
1154 bool _invoke_dependent_async_task(Worker&, Node*);
1155 bool _invoke_runtime_task(Worker&, Node*);
1156 bool _invoke_runtime_task_impl(Worker&, Node*, std::function<void(Runtime&)>&);
1157 bool _invoke_runtime_task_impl(Worker&, Node*, std::function<void(Runtime&, bool)>&);
1158
1159 size_t _set_up_graph(Graph&, Topology*, NodeBase*);
1160
1161 template <typename P>
1162 void _corun_until(Worker&, P&&);
1163
1164 template <typename I>
1165 void _bulk_schedule(Worker&, I, size_t);
1166
1167 template <typename I>
1168 void _bulk_schedule(I, size_t);
1169
1170 template <typename I>
1171 void _bulk_spill(I, size_t);
1172
1173 template <typename I>
1174 void _bulk_spill_round_robin(I, size_t);
1175
1176 template <size_t N>
1177 void _bulk_update_cache(Worker&, Node*&, Node*, std::array<Node*, N>&, size_t&);
1178
1179 template <typename P, typename F>
1180 auto _async(P&&, F&&, Topology*, NodeBase*);
1181
1182 template <typename P, typename F>
1183 void _silent_async(P&&, F&&, Topology*, NodeBase*);
1184
1185 template <TaskParamsLike P, typename F, typename I>
1186 requires (!std::same_as<std::decay_t<I>, AsyncTask>)
1187 auto _dependent_async(P&&, F&&, I, I, Topology*, NodeBase*);
1188
1189 template <TaskParamsLike P, typename F, typename I>
1190 requires (!std::same_as<std::decay_t<I>, AsyncTask>)
1191 auto _silent_dependent_async(P&&, F&&, I, I, Topology*, NodeBase*);
1192
1193 template <typename... ArgsT>
1194 void _schedule_async_task(ArgsT&&...);
1195
1196 template <typename I, typename... ArgsT>
1197 AsyncTask _schedule_dependent_async_task(I, I, size_t, ArgsT&&...);
1198};
1199
1200#ifndef DOXYGEN_GENERATING_OUTPUT
1201
1202// Constructor
1203inline Executor::Executor(size_t N, std::shared_ptr<WorkerInterface> wif) :
1204 _workers (N),
1205 _buffers (std::bit_width(N)), // Empirically, we find that log2(N) performs best.
1206 _notifier (N) {
1207
1208 if(N == 0) {
1209 TF_THROW("executor must define at least one worker");
1210 }
1211
1212 // If spawning N threads fails, shut down any created threads before
1213 // rethrowing the exception.
1214#ifndef TF_DISABLE_EXCEPTION_HANDLING
1215 try {
1216#endif
1217 _spawn(N, std::move(wif));
1218#ifndef TF_DISABLE_EXCEPTION_HANDLING
1219 }
1220 catch(...) {
1221 _shutdown();
1222 std::rethrow_exception(std::current_exception());
1223 }
1224#endif
1225
1226 // initialize the default observer if requested
1227 if(has_env(TF_ENABLE_PROFILER)) {
1228 TFProfManager::get()._manage(make_observer<TFProfObserver>());
1229 }
1230}
1231
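// A hedged sketch of customizing worker threads through WorkerInterface, assuming
// only the two hooks invoked in _spawn below (scheduler_prologue / scheduler_epilogue):
//
//   struct PinnedWorkers : tf::WorkerInterface {
//     void scheduler_prologue(tf::Worker& w) override { /* e.g., set thread affinity */ }
//     void scheduler_epilogue(tf::Worker& w, std::exception_ptr) override { /* inspect errors */ }
//   };
//   tf::Executor executor(4, std::make_shared<PinnedWorkers>());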
1232// Destructor
1233inline Executor::~Executor() {
1234 _shutdown();
1235}
1236
1237// Function: _shutdown
1238inline void Executor::_shutdown() {
1239
1240 // wait for all topologies to complete
1241 wait_for_all();
1242
1243 // shut down the scheduler
1244 for(size_t i=0; i<_workers.size(); ++i) {
1245 _workers[i]._done.test_and_set(std::memory_order_relaxed);
1246 }
1247
1248 _notifier.notify_all();
1249
1250 // Only join the thread if it is joinable, as std::thread construction
1251 // may fail and throw an exception.
1252 for(auto& w : _workers) {
1253 if(w._thread.joinable()) {
1254 w._thread.join();
1255 }
1256 }
1257}
1258
1259// Function: num_workers
1260inline size_t Executor::num_workers() const noexcept {
1261 return _workers.size();
1262}
1263
1264// Function: num_waiters
1265inline size_t Executor::num_waiters() const noexcept {
1266 return _notifier.num_waiters();
1267}
1268
1269// Function: num_queues
1270inline size_t Executor::num_queues() const noexcept {
1271 return _workers.size() + _buffers.size();
1272}
1273
1274// Function: num_topologies
1275inline size_t Executor::num_topologies() const {
1276 return _num_topologies.load(std::memory_order_relaxed);
1277}
1278
1279// Function: this_worker
1280inline Worker* Executor::this_worker() {
1281 auto itr = _t2w.find(std::this_thread::get_id());
1282 return itr == _t2w.end() ? nullptr : itr->second;
1283}
1284
1285// Function: this_worker_id
1286inline int Executor::this_worker_id() const {
1287 auto i = _t2w.find(std::this_thread::get_id());
1288 return i == _t2w.end() ? -1 : static_cast<int>(i->second->_id);
1289}
1290
1291// Procedure: _spawn
1292inline void Executor::_spawn(size_t N, std::shared_ptr<WorkerInterface> wif) {
1293
1294 for(size_t id=0; id<N; ++id) {
1295 _workers[id]._thread = std::thread([&, id, wif] () {
1296
1297 auto& worker = _workers[id];
1298
1299 worker._id = id;
1300 worker._sticky_victim = id;
1301 worker._rdgen.seed(static_cast<uint32_t>(std::hash<std::thread::id>()(std::this_thread::get_id())));
1302
1303 // before entering the work-stealing loop, call the scheduler prologue
1304 if(wif) {
1305 wif->scheduler_prologue(worker);
1306 }
1307
1308 Node* t = nullptr;
1309 std::exception_ptr ptr = nullptr;
1310
1311      // we must use while(1) instead of while(!done) because a previous worker
1312      // may stop while the following workers are still preparing to enter
1313      // the scheduling loop
1314#ifndef TF_DISABLE_EXCEPTION_HANDLING
1315 try {
1316#endif
1317 // work-stealing loop
1318 while(1) {
1319
1320 // drains out the local queue first
1321 _exploit_task(worker, t);
1322
1323 // steals and waits for tasks
1324 if(_wait_for_task(worker, t) == false) {
1325 break;
1326 }
1327 }
1328
1329#ifndef TF_DISABLE_EXCEPTION_HANDLING
1330 }
1331 catch(...) {
1332 ptr = std::current_exception();
1333 }
1334#endif
1335
1336 // call the user-specified epilogue function
1337 if(wif) {
1338 wif->scheduler_epilogue(worker, ptr);
1339 }
1340
1341 });
1342
1343 // We avoid using thread-local storage to track the mapping between a thread
1344 // and its corresponding worker in an executor. On Windows, thread-local
1345 // storage can be unreliable in certain situations (see issue #727).
1346 //
1347 // Instead, we maintain a per-executor mapping from threads to workers.
1348 // This approach has an additional advantage: according to the C++ Standard,
1349 // std::thread::id uniquely identifies a thread object. Therefore, once the map
1350 // returns a valid worker, we can be certain that the worker belongs to this
1351 // executor. This eliminates the need for additional executor validation
1352    // required when using thread-local storage.
1353 //
1354 // Example:
1355 //
1356 // Worker* w = this_worker();
1357 // // Using thread-local storage, we would need additional executor validation:
1358 // if (w == nullptr || w->_executor != this) { /* caller is not a worker of this executor */ }
1359 //
1360 // // Using per-executor mapping, it suffices to check:
1361 // if (w == nullptr) { /* caller is not a worker of this executor */ }
1362 //
1363 _t2w.emplace(_workers[id]._thread.get_id(), &_workers[id]);
1364 }
1365}
1366
1367// Function: _explore_task
1368inline bool Executor::_explore_task(Worker& w, Node*& t) {
1369
1370 // Fast path: if no topologies are live, all queues are guaranteed empty
1371 // by the executor's invariant (num_topologies reaches zero only after all
1372 // nodes have been scheduled and their queues flushed). Skip the entire
1373 // steal loop and return immediately so the caller enters _wait_for_task
1374 // to sleep. relaxed ordering is sufficient — this is a hint, and any
1375 // missed update is caught safely by the 2PC guard in _wait_for_task.
1376 if(_num_topologies.load(std::memory_order_relaxed) == 0) {
1377 return true;
1378 }
1379
1380 const size_t MAX_VICTIM = num_queues(); // guaranteed >= 2 by constructor
1381 const size_t MAX_STEALS = ((MAX_VICTIM + 1) << 1);
1382 const size_t STICKY_THRESH = 4; // max retries on a contended victim
1383
1384 // local aliases for steal protocol sentinels — these are properties of the
1385 // steal protocol, not of any specific queue type
1386 constexpr Node* empty_steal = wsq_empty_value<Node*>();
1387 const Node* contended_steal = wsq_contended_value<Node*>();
1388
1389 size_t num_steals = 0;
1390 size_t num_contended = 0;
1391 size_t vtm = w._sticky_victim;
1392
1393 while(true) {
1394
1395 Node* result = (vtm < _workers.size())
1396 ? _workers[vtm]._wsq.steal_with_feedback()
1397 : _buffers[vtm - _workers.size()].queue.steal_with_feedback();
1398
1399 if(result != empty_steal && result != contended_steal) {
1400 // STOLEN: successfully acquired a task — reinforce sticky victim
1401 t = result;
1402 w._sticky_victim = vtm;
1403 break;
1404 }
1405
1406 if(result == contended_steal) {
1407 // CONTENDED: victim has work but we lost the CAS race — retry the
1408 // same victim up to STICKY_THRESH times before moving on
1409 if(++num_contended < STICKY_THRESH) {
1410 continue; // stay on vtm, skip victim switch and num_steals increment
1411 }
1412 }
1413    // EMPTY or CONTENDED-exhausted: pick a new victim excluding self,
1414    // since our own queue is empty by invariant. Map [0, MAX_VICTIM-1)
1415    // onto [0, MAX_VICTIM) \ {w._id}; always safe since MAX_VICTIM >= 2.
1416 num_contended = 0;
1417 vtm = w._rdgen() % (MAX_VICTIM - 1);
1418 if(vtm >= w._id) vtm++;
1419
1420 if(++num_steals > MAX_STEALS) {
1421 std::this_thread::yield();
1422 if(num_steals > 150 + MAX_STEALS) {
1423 break;
1424 }
1425 }
1426
1427 if(w._done.test(std::memory_order_relaxed)) {
1428 return false;
1429 }
1430 }
1431
1432 return true;
1433}
1434
1435/*
1436// Function: _explore_task
1437inline bool Executor::_explore_task(Worker& w, Node*& t) {
1438
1439 // Fast path: if no topologies are live, all queues are guaranteed empty
1440 // by the executor's invariant (num_topologies reaches zero only after all
1441 // nodes have been scheduled and their queues flushed). Skip the entire
1442 // steal loop and return immediately so the caller enters _wait_for_task
1443 // to sleep. relaxed ordering is sufficient — this is a hint, and any
1444 // missed update is caught safely by the 2PC guard in _wait_for_task.
1445 if(_num_topologies.load(std::memory_order_relaxed) == 0) {
1446 return true;
1447 }
1448
1449 //assert(!t);
1450 const size_t MAX_VICTIM = num_queues();
1451 const size_t MAX_STEALS = ((MAX_VICTIM + 1) << 1);
1452
1453 size_t num_steals = 0;
1454 size_t vtm = w._sticky_victim;
1455
1456 // Make the worker steal immediately from the assigned victim.
1457 while(true) {
1458
1459 // If the worker's victim thread is within the worker pool, steal from the worker's queue.
1460 // Otherwise, steal from the buffer, adjusting the victim index based on the worker pool size.
1461 t = (vtm < _workers.size())
1462 ? _workers[vtm]._wsq.steal()
1463 : _buffers[vtm - _workers.size()].queue.steal();
1464
1465 if(t) {
1466 w._sticky_victim = vtm;
1467 break;
1468 }
1469
1470 // Increment the steal count, and if it exceeds MAX_STEALS, yield the thread.
1471 // If the number of empty steals reaches MAX_STEALS, exit the loop.
1472 if (++num_steals > MAX_STEALS) {
1473 std::this_thread::yield();
1474 if(num_steals > 150 + MAX_STEALS) {
1475 break;
1476 }
1477 }
1478
1479 if(w._done.test(std::memory_order_relaxed)) {
1480 return false;
1481 }
1482
1483    // Randomly generate the next victim.
1484 vtm = w._rdgen() % MAX_VICTIM;
1485 }
1486 return true;
1487}
1488*/
1489
1490// Procedure: _exploit_task
1491inline void Executor::_exploit_task(Worker& w, Node*& t) {
1492 while(t) {
1493 _invoke(w, t);
1494 t = w._wsq.pop();
1495 }
1496}
1497
1498// Function: _wait_for_task
1499inline bool Executor::_wait_for_task(Worker& w, Node*& t) {
1500
1501 explore_task:
1502
1503 if(_explore_task(w, t) == false) {
1504 return false;
1505 }
1506
1507 // Go exploit the task if we successfully steal one.
1508 if(t) {
1509 return true;
1510 }
1511
1512 // Entering the 2PC guard as all queues are likely empty after many stealing attempts.
1513 _notifier.prepare_wait(w._id);
1514
1515 // Fast path: if no topologies are live, all queues are guaranteed empty.
1516 // Skip the O(N) buffer and worker queue scans and go directly to sleep.
1517 // This is safe because prepare_wait has already been called — any notify
1518 // that arrives after this check but before commit_wait will be caught by
1519 // the 2PC guarantee of the notifier.
1520 if(_num_topologies.load(std::memory_order_relaxed) == 0) {
1521 // still check done flag before committing to sleep
1522 if(w._done.test(std::memory_order_relaxed)) {
1523 _notifier.cancel_wait(w._id);
1524 return false;
1525 }
1526 _notifier.commit_wait(w._id);
1527 goto explore_task;
1528 }
1529
1530 // Condition #1: buffers should be empty
1531 for(size_t b=0; b<_buffers.size(); ++b) {
1532 if(!_buffers[b].queue.empty()) {
1533 _notifier.cancel_wait(w._id);
1534 w._sticky_victim = b + _workers.size();
1535 goto explore_task;
1536 }
1537 }
1538
1539 // Condition #2: worker queues should be empty
1540  // Note: We need to use index-based looping to avoid a data race with _spawn,
1541  // which initializes the other workers' data structures at the same time.
1542 // Also, due to the property of a work-stealing queue, we don't need to check
1543 // this worker's work-stealing queue.
1544 for(size_t k=0; k<_workers.size()-1; ++k) {
1545 if(size_t vtm = k + (k >= w._id); !_workers[vtm]._wsq.empty()) {
1546 _notifier.cancel_wait(w._id);
1547 w._sticky_victim = vtm;
1548 goto explore_task;
1549 }
1550 }
1551
1552 // Condition #3: worker should be alive
1553 if(w._done.test(std::memory_order_relaxed)) {
1554 _notifier.cancel_wait(w._id);
1555 return false;
1556 }
1557
1558 // Now I really need to relinquish myself to others.
1559 _notifier.commit_wait(w._id);
1560 goto explore_task;
1561}
1562
1563// Function: make_observer
1564template<typename Observer, typename... ArgsT>
1565std::shared_ptr<Observer> Executor::make_observer(ArgsT&&... args) {
1566
1567 static_assert(
1568 std::is_base_of_v<ObserverInterface, Observer>,
1569 "Observer must be derived from ObserverInterface"
1570 );
1571
1572 // use a local variable to mimic the constructor
1573 auto ptr = std::make_shared<Observer>(std::forward<ArgsT>(args)...);
1574
1575 ptr->set_up(_workers.size());
1576
1577 _observers.emplace(std::static_pointer_cast<ObserverInterface>(ptr));
1578
1579 return ptr;
1580}
1581
1582// Procedure: remove_observer
1583template <typename Observer>
1584void Executor::remove_observer(std::shared_ptr<Observer> ptr) {
1585
1586 static_assert(
1587 std::is_base_of_v<ObserverInterface, Observer>,
1588 "Observer must be derived from ObserverInterface"
1589 );
1590
1591 _observers.erase(std::static_pointer_cast<ObserverInterface>(ptr));
1592}
1593
1594// Function: num_observers
1595inline size_t Executor::num_observers() const noexcept {
1596 return _observers.size();
1597}
1598
1599// Procedure: _spill
1600inline void Executor::_spill(Node* item) {
1601  // Since pointers are aligned to 8 bytes, we shift before taking the modulus
1602  // (a simple hash) to avoid contention caused by many nodes hashing to the same slot.
1603 auto b = (reinterpret_cast<uintptr_t>(item) >> 16) % _buffers.size();
1604 std::scoped_lock lock(_buffers[b].mutex);
1605 _buffers[b].queue.push(item);
1606}
1607
1608// Procedure: _bulk_spill (single batch to one buffer)
1609// Uses Knuth multiplicative hash on the first pointer to select a buffer,
1610// providing better bit diffusion than the shift-based approach, especially
1611// when the allocator returns pointers with regular low-bit patterns.
1612template <typename I>
1613void Executor::_bulk_spill(I first, size_t N) {
1614 //assert(N != 0);
1615 auto b = ((reinterpret_cast<uintptr_t>(*first) * 2654435761ULL) >> 32) % _buffers.size();
1616 std::scoped_lock lock(_buffers[b].mutex);
1617 _buffers[b].queue.bulk_push(first, N);
1618}
1619
1620// Procedure: _bulk_spill
1621// Distributes a batch of N spilled nodes across all buffers in round-robin
1622// order starting from a hash of the first node's pointer. Each buffer's lock
1623// is held only for its chunk, reducing contention compared to sending the
1624// entire batch to a single buffer.
1625template <typename I>
1626void Executor::_bulk_spill_round_robin(I first, size_t N) {
1627
1628 // assert(N != 0);
1629 const size_t B = _buffers.size();
1630 const size_t start = ((reinterpret_cast<uintptr_t>(*first) * 2654435761ULL) >> 32) % B;
1631 const size_t per_buf = (N + B - 1) / B;
1632 size_t remaining = N;
1633 for(size_t i = 0; i < B && remaining > 0; ++i) {
1634 size_t b = (start + i) % B;
1635 size_t chunk = std::min(per_buf, remaining);
1636 {
1637 std::scoped_lock lock(_buffers[b].mutex);
1638 _buffers[b].queue.bulk_push(first, chunk);
1639 }
1640    // the loop condition (remaining > 0) terminates early, so we don't acquire unnecessary locks on empty chunks.
1641 remaining -= chunk;
1642 }
1643}
1644
1645// Procedure: _schedule
1646inline void Executor::_schedule(Worker& worker, Node* node) {
1647  // Starting from v3.5, we do not use any complicated notification mechanism,
1648  // as experimental results have shown no significant advantage.
1649 if(worker._wsq.try_push(node) == false) {
1650 _spill(node);
1651 }
1652 _notifier.notify_one();
1653}
1654
1655// Procedure: _schedule
1656inline void Executor::_schedule(Node* node) {
1657 _spill(node);
1658 _notifier.notify_one();
1659}
1660
1661// Procedure: _schedule
1662template <typename I>
1663void Executor::_bulk_schedule(Worker& worker, I first, size_t num_nodes) {
1664
1665 if(num_nodes == 0) {
1666 return;
1667 }
1668
1669  // NOTE: We cannot use first/last in the for-loop (e.g., for(; first != last; ++first)).
1670  // This is because when a node v is inserted into the queue, v can run and finish
1671  // immediately. If v is the last node in the graph, it will tear down the parent task vector,
1672  // which causes the last ++first to fail. This problem is specific to MSVC, which has a stricter
1673  // iterator implementation in std::vector than GCC/Clang.
1674 if(auto n = worker._wsq.try_bulk_push(first, num_nodes); n != num_nodes) {
1675 _bulk_spill(first, num_nodes - n);
1676 }
1677 _notifier.notify_n(num_nodes);
1678
1679  // notify first before spilling to hopefully wake up workers earlier;
1680  // however, experiments did not show any benefit from doing this.
1681 //auto n = worker._wsq.try_bulk_push(first, num_nodes);
1682 //_notifier.notify_n(n);
1683 //_bulk_schedule(first + n, num_nodes - n);
1684}
1685
1686// Procedure: _schedule
1687template <typename I>
1688inline void Executor::_bulk_schedule(I first, size_t num_nodes) {
1689
1690 if(num_nodes == 0) {
1691 return;
1692 }
1693
1694  // NOTE: We cannot use first/last in the for-loop (e.g., for(; first != last; ++first)).
1695  // This is because when a node v is inserted into the queue, v can run and finish
1696  // immediately. If v is the last node in the graph, it will tear down the parent task vector,
1697  // which causes the last ++first to fail. This problem is specific to MSVC, which has a stricter
1698  // iterator implementation in std::vector than GCC/Clang.
1699 _bulk_spill(first, num_nodes);
1700 _notifier.notify_n(num_nodes);
1701}
1702
1703// Function: _update_cache
1704TF_FORCE_INLINE void Executor::_update_cache(Worker& worker, Node*& cache, Node* node) {
1705 if(cache) {
1706 _schedule(worker, cache);
1707 }
1708 cache = node;
1709}
1710
1711// Function: _bulk_update_cache
1712template <size_t N>
1713TF_FORCE_INLINE void Executor::_bulk_update_cache(
1714 Worker& worker, Node*& cache, Node* node, std::array<Node*, N>& array, size_t& n
1715) {
1716 // experimental results show no benefit of using bulk_update_cache
1717 if(cache) {
1718 array[n++] = cache;
1719 if(n == N) {
1720 _bulk_schedule(worker, array, n);
1721 n = 0;
1722 }
1723 }
1724 cache = node;
1725}
1726
1727// Procedure: _invoke
1728inline void Executor::_invoke(Worker& worker, Node* node) {
1729
1730 #define TF_INVOKE_CONTINUATION() \
1731 if (cache) { \
1732 node = cache; \
1733 goto begin_invoke; \
1734 }
1735
1736 begin_invoke:
1737
1738 Node* cache {nullptr};
1739
1740 // if this is the second invoke due to preemption, directly jump to invoke task
1741 if(node->_nstate & NSTATE::PREEMPTED) {
1742 goto invoke_task;
1743 }
1744
1745 // If the work has been cancelled, there is no need to continue.
1746 // Here, we do tear_down_invoke since async tasks may also get cancelled where
1747 // we need to recycle the node.
1748 if(node->_is_parent_cancelled()) {
1749 _tear_down_invoke(worker, node, cache);
1750 TF_INVOKE_CONTINUATION();
1751 return;
1752 }
1753
1754 // if acquiring semaphore(s) exists, acquire them first
1755 if(node->_semaphores && !node->_semaphores->to_acquire.empty()) {
1756 SmallVector<Node*> waiters;
1757 if(!node->_acquire_all(waiters)) {
1758 _bulk_schedule(worker, waiters.begin(), waiters.size());
1759 return;
1760 }
1761 }
1762
1763 invoke_task:
1764
1765 SmallVector<int> conds;
1766
1767 // switch is faster than nested if-else due to jump table
1768 switch(node->_handle.index()) {
1769 // static task
1770 case Node::STATIC:{
1771 _invoke_static_task(worker, node);
1772 }
1773 break;
1774
1775 // runtime task
1776 case Node::RUNTIME:{
1777 if(_invoke_runtime_task(worker, node)) {
1778 return;
1779 }
1780 }
1781 break;
1782
1783 // non-preemptive runtime task
1784 case Node::NONPREEMPTIVE_RUNTIME:{
1785 _invoke_nonpreemptive_runtime_task(worker, node);
1786 }
1787 break;
1788
1789 // subflow task
1790 case Node::SUBFLOW: {
1791 if(_invoke_subflow_task(worker, node)) {
1792 return;
1793 }
1794 }
1795 break;
1796
1797 // condition task
1798 case Node::CONDITION: {
1799 _invoke_condition_task(worker, node, conds);
1800 }
1801 break;
1802
1803 // multi-condition task
1804 case Node::MULTI_CONDITION: {
1805 _invoke_multi_condition_task(worker, node, conds);
1806 }
1807 break;
1808
1809 // module task
1810 case Node::MODULE: {
1811 if(_invoke_module_task(worker, node)) {
1812 return;
1813 }
1814 }
1815 break;
1816
1817 // adopted module task
1818 case Node::ADOPTED_MODULE: {
1819 if(_invoke_adopted_module_task(worker, node)) {
1820 return;
1821 }
1822 }
1823 break;
1824
1825 // async task
1826 case Node::ASYNC: {
1827 if(_invoke_async_task(worker, node)) {
1828 return;
1829 }
1830 _tear_down_async(worker, node, cache);
1831 TF_INVOKE_CONTINUATION();
1832 return;
1833 }
1834 break;
1835
1836 // dependent async task
1837 case Node::DEPENDENT_ASYNC: {
1838 if(_invoke_dependent_async_task(worker, node)) {
1839 return;
1840 }
1841 _tear_down_dependent_async(worker, node, cache);
1842 TF_INVOKE_CONTINUATION();
1843 return;
1844 }
1845 break;
1846
1847 // monostate (placeholder)
1848 default:
1849 break;
1850 }
1851
1852 // if releasing semaphores exist, release them
1853 if(node->_semaphores && !node->_semaphores->to_release.empty()) {
1854 SmallVector<Node*> waiters;
1855 node->_release_all(waiters);
1856 _bulk_schedule(worker, waiters.begin(), waiters.size());
1857 }
1858
1859  // Reset the join counter with the number of strong dependencies to support cycles.
1860  // + We must do this before scheduling the successors to avoid a race
1861  //   condition on _predecessors.
1862  // + We must use fetch_add instead of a direct assignment because the
1863  //   user-level call on "invoke" may explicitly schedule this task again
1864  //   (e.g., pipeline), which can access the join_counter.
1865 node->_join_counter.fetch_add(
1866 node->_nstate & NSTATE::STRONG_DEPENDENCIES_MASK, std::memory_order_relaxed
1867 );
1868
1869 // Invoke the task based on the corresponding type
1870 switch(node->_handle.index()) {
1871
1872 // condition and multi-condition tasks
1873 case Node::CONDITION:
1874 case Node::MULTI_CONDITION: {
1875 for(auto cond : conds) {
1876 if(cond >= 0 && static_cast<size_t>(cond) < node->_num_successors) {
1877 auto s = node->_edges[cond];
1878 // zeroing the join counter for invariant
1879 s->_join_counter.store(0, std::memory_order_relaxed);
1880 node->_parent->_join_counter.fetch_add(1, std::memory_order_relaxed);
1881 _update_cache(worker, cache, s);
1882 }
1883 }
1884 }
1885 break;
1886
1887 // non-condition task
1888 default: {
1889 for(size_t i=0; i<node->_num_successors; ++i) {
1890 if(auto s = node->_edges[i]; s->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
1891 node->_parent->_join_counter.fetch_add(1, std::memory_order_relaxed);
1892 _update_cache(worker, cache, s);
1893 }
1894 }
1895 }
1896 break;
1897 }
1898
1899 // clean up the node after execution
1900 _tear_down_nonasync(worker, node, cache);
1901 TF_INVOKE_CONTINUATION();
1902}
1903
1904// Procedure: _tear_down_nonasync
1905inline void Executor::_tear_down_nonasync(Worker& worker, Node* node, Node*& cache) {
1906
1907  // we must check the parent first before subtracting the join counter,
1908  // or it can introduce a data race
1909 if(auto parent = node->_parent; parent == node->_topology) {
1910 if(parent->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
1911 _tear_down_topology(worker, node->_topology, cache);
1912 }
1913 }
1914 else {
1915    // we need to fetch all data before the join counter becomes zero, at which
1916    // point the node may be deleted
1917 auto state = parent->_nstate;
1918 if(parent->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
1919 // this task is spawned from a preempted parent, so we need to resume it
1920 if(state & NSTATE::PREEMPTED) {
1921 _update_cache(worker, cache, static_cast<Node*>(parent));
1922 }
1923 }
1924 }
1925}
1926
1927// Procedure: _tear_down_invoke
1928inline void Executor::_tear_down_invoke(Worker& worker, Node* node, Node*& cache) {
1929 switch(node->_handle.index()) {
1930 case Node::ASYNC:
1931 _tear_down_async(worker, node, cache);
1932 break;
1933
1934 case Node::DEPENDENT_ASYNC:
1935 _tear_down_dependent_async(worker, node, cache);
1936 break;
1937
1938 default:
1939 _tear_down_nonasync(worker, node, cache);
1940 break;
1941 }
1942}
1943
1944// Procedure: _observer_prologue
1945inline void Executor::_observer_prologue(Worker& worker, Node* node) {
1946 for(auto& observer : _observers) {
1947 observer->on_entry(WorkerView(worker), TaskView(*node));
1948 }
1949}
1950
1951// Procedure: _observer_epilogue
1952inline void Executor::_observer_epilogue(Worker& worker, Node* node) {
1953 for(auto& observer : _observers) {
1954 observer->on_exit(WorkerView(worker), TaskView(*node));
1955 }
1956}
1957
1958// Procedure: _process_exception
1959inline void Executor::_process_exception(Worker&, Node* node) {
1960
1961  // Find the anchor and mark the entire path with the exception state,
1962  // so recursive tasks can be cancelled properly.
1963  // Since the exception can come from an asynchronous task (with a runtime), the node itself can be the anchor.
1964 NodeBase* ea = node; // explicit anchor
1965 NodeBase* ia = nullptr; // implicit anchor
1966
1967 while(ea && (ea->_estate.load(std::memory_order_relaxed) & ESTATE::EXPLICITLY_ANCHORED) == 0) {
1968 ea->_estate.fetch_or(ESTATE::EXCEPTION, std::memory_order_relaxed);
1969 // we only want the inner-most implicit anchor
1970 if(ia == nullptr && (ea->_nstate & NSTATE::IMPLICITLY_ANCHORED)) {
1971 ia = ea;
1972 }
1973 ea = ea->_parent;
1974 }
1975
1976  // flag used to ensure the exception is caught in a thread-safe manner
1977 constexpr static auto flag = ESTATE::EXCEPTION | ESTATE::CAUGHT;
1978
1979 // The exception occurs under a blocking call (e.g., corun, join).
1980 if(ea) {
1981 // multiple tasks may throw, and we only take the first thrown exception
1982 if((ea->_estate.fetch_or(flag, std::memory_order_relaxed) & ESTATE::CAUGHT) == 0) {
1983 ea->_exception_ptr = std::current_exception();
1984 return;
1985 }
1986 }
1987 // Implicit anchor has the lowest priority
1988 else if(ia){
1989 if((ia->_estate.fetch_or(flag, std::memory_order_relaxed) & ESTATE::CAUGHT) == 0) {
1990 ia->_exception_ptr = std::current_exception();
1991 return;
1992 }
1993 }
1994
1995 // For now, we simply store the exception in this node; this can happen in an
1996 // execution that does not have any external control to capture the exception,
1997  // such as a silent async task without any parent.
1998 node->_exception_ptr = std::current_exception();
1999}
2000
2001// Procedure: _invoke_static_task
2002inline void Executor::_invoke_static_task(Worker& worker, Node* node) {
2003 _observer_prologue(worker, node);
2004 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
2005 std::get_if<Node::Static>(&node->_handle)->work();
2006 });
2007 _observer_epilogue(worker, node);
2008}
2009
2010// Procedure: _invoke_subflow_task
2011inline bool Executor::_invoke_subflow_task(Worker& worker, Node* node) {
2012
2013 auto& h = *std::get_if<Node::Subflow>(&node->_handle);
2014 auto& g = h.subgraph;
2015
2016 if((node->_nstate & NSTATE::PREEMPTED) == 0) {
2017
2018 // set up the subflow
2019 Subflow sf(*this, worker, node, g);
2020
2021 // invoke the subflow callable
2022 _observer_prologue(worker, node);
2023 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
2024 h.work(sf);
2025 });
2026 _observer_epilogue(worker, node);
2027
2028 // spawn the subflow if it is joinable and its graph is non-empty
2029 // implicit join is faster than Subflow::join as it does not involve corun
2030 if(sf.joinable() && !g.empty()) {
2031
2032 // signal the executor to preempt this node
2033 node->_nstate |= NSTATE::PREEMPTED;
2034
2035 // set up and schedule the graph
2036 _schedule_graph(worker, g, node->_topology, node);
2037 return true;
2038 }
2039 }
2040 else {
2041 node->_nstate &= ~NSTATE::PREEMPTED;
2042 }
2043
2044 // The subflow has finished or joined.
2045  // By default, we clear the subflow storage, as applications can perform recursive
2046  // subflow tasking, which accumulates a huge amount of memory overhead and hampers
2047  // performance.
2048 if((node->_nstate & NSTATE::RETAIN_SUBFLOW) == 0) {
2049 g.clear();
2050 }
2051
2052 return false;
2053}
2054
2055// Procedure: _invoke_condition_task
2056inline void Executor::_invoke_condition_task(
2057 Worker& worker, Node* node, SmallVector<int>& conds
2058) {
2059 _observer_prologue(worker, node);
2060 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
2061 auto& work = std::get_if<Node::Condition>(&node->_handle)->work;
2062 conds = { work() };
2063 });
2064 _observer_epilogue(worker, node);
2065}
2066
2067// Procedure: _invoke_multi_condition_task
2068inline void Executor::_invoke_multi_condition_task(
2069 Worker& worker, Node* node, SmallVector<int>& conds
2070) {
2071 _observer_prologue(worker, node);
2072 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
2073 conds = std::get_if<Node::MultiCondition>(&node->_handle)->work();
2074 });
2075 _observer_epilogue(worker, node);
2076}
2077
2078// Procedure: _invoke_module_task
2079inline bool Executor::_invoke_module_task(Worker& w, Node* node) {
2080 return _invoke_module_task_impl(w, node, std::get_if<Node::Module>(&node->_handle)->graph);
2081}
2082
2083// Procedure: _invoke_adopted_module_task
2084inline bool Executor::_invoke_adopted_module_task(Worker& w, Node* node) {
2085 return _invoke_module_task_impl(w, node, std::get_if<Node::AdoptedModule>(&node->_handle)->graph);
2086}
2087
2088// Procedure: _invoke_module_task_impl
2089inline bool Executor::_invoke_module_task_impl(Worker& w, Node* node, Graph& graph) {
2090
2091 // No need to do anything for empty graph
2092 if(graph.empty()) {
2093 return false;
2094 }
2095
2096 // first entry - not spawned yet
2097 if((node->_nstate & NSTATE::PREEMPTED) == 0) {
2098 // signal the executor to preempt this node
2099 node->_nstate |= NSTATE::PREEMPTED;
2100 _schedule_graph(w, graph, node->_topology, node);
2101 return true;
2102 }
2103
2104 // second entry - already spawned
2105 node->_nstate &= ~NSTATE::PREEMPTED;
2106
2107 return false;
2108}
2109
2110
2111// Procedure: _invoke_async_task
2112inline bool Executor::_invoke_async_task(Worker& worker, Node* node) {
2113 auto& work = std::get_if<Node::Async>(&node->_handle)->work;
2114 switch(work.index()) {
2115 // void()
2116 case 0:
2117 _observer_prologue(worker, node);
2118 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
2119 std::get_if<0>(&work)->operator()();
2120 });
2121 _observer_epilogue(worker, node);
2122 break;
2123
2124 // void(Runtime&)
2125 case 1:
2126 if(_invoke_runtime_task_impl(worker, node, *std::get_if<1>(&work))) {
2127 return true;
2128 }
2129 break;
2130
2131 // void(Runtime&, bool)
2132 case 2:
2133 if(_invoke_runtime_task_impl(worker, node, *std::get_if<2>(&work))) {
2134 return true;
2135 }
2136 break;
2137 }
2138
2139 return false;
2140}
2141
2142// Procedure: _invoke_dependent_async_task
2143inline bool Executor::_invoke_dependent_async_task(Worker& worker, Node* node) {
2144 auto& work = std::get_if<Node::DependentAsync>(&node->_handle)->work;
2145 switch(work.index()) {
2146 // void()
2147 case 0:
2148 _observer_prologue(worker, node);
2149 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
2150 std::get_if<0>(&work)->operator()();
2151 });
2152 _observer_epilogue(worker, node);
2153 break;
2154
2155 // void(Runtime&) - silent async
2156 case 1:
2157 if(_invoke_runtime_task_impl(worker, node, *std::get_if<1>(&work))) {
2158 return true;
2159 }
2160 break;
2161
2162 // void(Runtime&, bool) - async
2163 case 2:
2164 if(_invoke_runtime_task_impl(worker, node, *std::get_if<2>(&work))) {
2165 return true;
2166 }
2167 break;
2168 }
2169 return false;
2170}
2171
2172// Function: run
2173inline tf::Future<void> Executor::run(Taskflow& f) {
2174 return run_n(f, 1, [](){});
2175}
2176
2177// Function: run
2178inline tf::Future<void> Executor::run(Taskflow&& f) {
2179 return run_n(std::move(f), 1, [](){});
2180}
2181
2182// Function: run
2183template <typename C>
2184tf::Future<void> Executor::run(Taskflow& f, C&& c) {
2185 return run_n(f, 1, std::forward<C>(c));
2186}
2187
2188// Function: run
2189template <typename C>
2190tf::Future<void> Executor::run(Taskflow&& f, C&& c) {
2191 return run_n(std::move(f), 1, std::forward<C>(c));
2192}
2193
2194// Function: run_n
2195inline tf::Future<void> Executor::run_n(Taskflow& f, size_t repeat) {
2196 return run_n(f, repeat, [](){});
2197}
2198
2199// Function: run_n
2200inline tf::Future<void> Executor::run_n(Taskflow&& f, size_t repeat) {
2201 return run_n(std::move(f), repeat, [](){});
2202}
2203
2204// Function: run_n
2205template <typename C>
2206tf::Future<void> Executor::run_n(Taskflow& f, size_t repeat, C&& c) {
2207 return run_until(
2208 f, [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c)
2209 );
2210}
2211
2212// Function: run_n
2213template <typename C>
2214tf::Future<void> Executor::run_n(Taskflow&& f, size_t repeat, C&& c) {
2215 return run_until(
2216 std::move(f), [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c)
2217 );
2218}
2219
2220// Function: run_until
2221template<typename P>
2222tf::Future<void> Executor::run_until(Taskflow& f, P&& pred) {
2223 return run_until(f, std::forward<P>(pred), [](){});
2224}
2225
2226// Function: run_until
2227template<typename P>
2228tf::Future<void> Executor::run_until(Taskflow&& f, P&& pred) {
2229 return run_until(std::move(f), std::forward<P>(pred), [](){});
2230}
2231
2232// Function: run_until
2233template <typename P, typename C>
2234tf::Future<void> Executor::run_until(Taskflow& f, P&& p, C&& c) {
2235
2236  // No need to create a real topology; return a dummy future to preserve the invariant.
2237 if(f.empty() || p()) {
2238 c();
2239 std::promise<void> promise;
2240 promise.set_value();
2241 return tf::Future<void>(promise.get_future());
2242 }
2243
2244 _increment_topology();
2245
2246 // create a topology for this run
2247 auto t = std::make_shared<Topology>(f, std::forward<P>(p), std::forward<C>(c));
2248 //auto t = std::make_shared<DerivedTopology<P, C>>(f, std::forward<P>(p), std::forward<C>(c));
2249
2250  // we need to create the future before the topology gets torn down, which can happen quickly
2251 tf::Future<void> future(t->_promise.get_future(), t);
2252
2253 // modifying topology needs to be protected under the lock
2254 if(f._fetch_enqueue(t) == 0) {
2255 _set_up_topology(this_worker(), t.get());
2256 }
2257
2258 return future;
2259}
2260
2261
2262
2263// Function: corun_until
2264template <typename P>
2265void Executor::corun_until(P&& predicate) {
2266
2267 Worker* w = this_worker();
2268 if(w == nullptr) {
2269 TF_THROW("corun_until must be called by a worker of the executor");
2270 }
2271
2272 _corun_until(*w, std::forward<P>(predicate));
2273}
2274
2275// Function: _corun_until
2276template <typename P>
2277void Executor::_corun_until(Worker& w, P&& stop_predicate) {
2278
2279 const size_t MAX_VICTIM = num_queues();
2280 const size_t MAX_STEALS = ((MAX_VICTIM + 1) << 1);
2281 const size_t STICKY_THRESH = 8;
2282
2283 constexpr Node* empty_steal = wsq_empty_value<Node*>();
2284 const Node* contended_steal = wsq_contended_value<Node*>();
2285
2286 bool stop = false;
2287
2288 while(!stop && !(stop = stop_predicate())) {
2289
2290 // try local queue first — only one task at a time to avoid deep
2291 // recursive corun calls causing stack overflow
2292 if(auto t = w._wsq.pop(); t) {
2293 _invoke(w, t);
2294 continue;
2295 }
2296
2297 // local queue empty: steal from others until stop_predicate or stolen.
2298 // stop is set by the inner loop condition so when predicate becomes true
2299 // the outer loop exits immediately without calling stop_predicate again.
2300 size_t num_steals = 0;
2301 size_t num_contended = 0;
2302 size_t vtm = w._sticky_victim;
2303
2304 while(!(stop = stop_predicate())) {
2305
2306 Node* result = (vtm < _workers.size())
2307 ? _workers[vtm]._wsq.steal_with_feedback()
2308 : _buffers[vtm - _workers.size()].queue.steal_with_feedback();
2309
2310 if(result != empty_steal && result != contended_steal) {
2311 // STOLEN: invoke task then return to outer loop to re-check
2312 // local queue and stop_predicate
2313 _invoke(w, result);
2314 w._sticky_victim = vtm;
2315 break;
2316 }
2317
2318 if(result == contended_steal) {
2319 // CONTENDED: victim has work, retry same victim up to STICKY_THRESH
2320 if(++num_contended < STICKY_THRESH) {
2321 continue;
2322 }
2323 }
2324
2325 // EMPTY or CONTENDED-exhausted: pick a new victim excluding self
2326 num_contended = 0;
2327 vtm = w._rdgen() % (MAX_VICTIM - 1);
2328 if(vtm >= w._id) vtm++;
2329
2330 if(++num_steals > MAX_STEALS) {
2331 // unlike _explore_task we cannot sleep here — the calling worker
2332 // is blocked inside a task and must keep making progress to avoid
2333 // deadlock. yield to let other threads run and make progress.
2334 std::this_thread::yield();
2335 }
2336 }
2337 }
2338}
2339
2340/*
2341// Function: _corun_until
2342template <typename P>
2343void Executor::_corun_until(Worker& w, P&& stop_predicate) {
2344
2345 const size_t MAX_VICTIM = num_queues();
2346 const size_t MAX_STEALS = ((MAX_VICTIM + 1) << 1);
2347
2348 exploit:
2349
2350 while(!stop_predicate()) {
2351
2352 // here we don't do while-loop to drain out the local queue as it can
2353    // potentially enter a very deep recursive corun, causing stack overflow
2354 if(auto t = w._wsq.pop(); t) {
2355 _invoke(w, t);
2356 }
2357 else {
2358 size_t num_steals = 0;
2359 size_t vtm = w._sticky_victim;
2360
2361 explore:
2362
2363 t = (vtm < _workers.size())
2364 ? _workers[vtm]._wsq.steal()
2365 : _buffers[vtm-_workers.size()].queue.steal();
2366
2367 if(t) {
2368 _invoke(w, t);
2369 w._sticky_victim = vtm;
2370 goto exploit;
2371 }
2372 else if(!stop_predicate()) {
2373 if(++num_steals > MAX_STEALS) {
2374 std::this_thread::yield();
2375 }
2376 vtm = w._rdgen() % MAX_VICTIM;
2377 goto explore;
2378 }
2379 else {
2380 break;
2381 }
2382 }
2383 }
2384}*/
2385
2386// Function: corun
2387template <typename T>
2388void Executor::corun(T& target) {
2389
2390 Worker* w = this_worker();
2391 if(w == nullptr) {
2392 TF_THROW("corun must be called by a worker of the executor");
2393 }
2394
2395 NodeBase anchor;
2396 _corun_graph(*w, retrieve_graph(target), nullptr, &anchor);
2397}
2398
2399// Procedure: _corun_graph
2400inline void Executor::_corun_graph(Worker& w, Graph& g, Topology* tpg, NodeBase* p) {
2401
2402 // empty graph
2403 if(g.empty()) {
2404 return;
2405 }
2406
2407 // anchor this parent as the blocking point
2408 {
2409 ExplicitAnchorGuard anchor(p);
2410 _schedule_graph(w, g, tpg, p);
2411 _corun_until(w, [p] () -> bool {
2412 return p->_join_counter.load(std::memory_order_acquire) == 0; }
2413 );
2414 }
2415
2416 // rethrow the exception to the caller
2417 p->_rethrow_exception();
2418}
2419
2420// Procedure: _increment_topology
2421inline void Executor::_increment_topology() {
2422 _num_topologies.fetch_add(1, std::memory_order_relaxed);
2423}
2424
2425// Procedure: _decrement_topology
2426inline void Executor::_decrement_topology() {
2427 if(_num_topologies.fetch_sub(1, std::memory_order_acq_rel) == 1) {
2428 _num_topologies.notify_all();
2429 }
2430}
2431
2432// Procedure: wait_for_all
2433inline void Executor::wait_for_all() {
2434 size_t n = _num_topologies.load(std::memory_order_acquire);
2435 while(n != 0) {
2436 _num_topologies.wait(n, std::memory_order_acquire);
2437 n = _num_topologies.load(std::memory_order_acquire);
2438 }
2439}
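
// A minimal sketch of wait_for_all with multiple in-flight topologies:
//
//   executor.run(taskflow_a);   // non-blocking
//   executor.run(taskflow_b);   // non-blocking
//   executor.wait_for_all();    // blocks until both topologies finish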
2440
2441// Function: _schedule_graph
2442inline void Executor::_schedule_graph(
2443 Worker& worker, Graph& graph, Topology* tpg, NodeBase* parent
2444) {
2445 size_t num_srcs = _set_up_graph(graph, tpg, parent);
2446 parent->_join_counter.fetch_add(num_srcs, std::memory_order_relaxed);
2447 _bulk_schedule(worker, graph.begin(), num_srcs);
2448}
2449
2450// Function: _set_up_topology
2451inline void Executor::_set_up_topology(Worker* w, Topology* tpg) {
2452 // ---- under taskflow lock ----
2453 auto& g = tpg->_taskflow._graph;
2454 size_t num_srcs = _set_up_graph(g, tpg, tpg);
2455 tpg->_join_counter.store(num_srcs, std::memory_order_relaxed);
2456 w ? _bulk_schedule(*w, g.begin(), num_srcs) : _bulk_schedule(g.begin(), num_srcs);
2457}
2458
2459// Function: _set_up_graph
2460inline size_t Executor::_set_up_graph(Graph& graph, Topology* tpg, NodeBase* parent) {
2461
2462 auto first = graph.begin();
2463 auto last = graph.end();
2464 auto send = first;
2465 for(; first != last; ++first) {
2466
2467 auto node = *first;
2468 node->_topology = tpg;
2469 node->_parent = parent;
2470 node->_nstate = NSTATE::NONE;
2471 node->_estate.store(ESTATE::NONE, std::memory_order_relaxed);
2472 node->_set_up_join_counter();
2473 node->_exception_ptr = nullptr;
2474
2475 // move source to the first partition
2476 // root, root, root, v1, v2, v3, v4, ...
2477 if(node->num_predecessors() == 0) {
2478 std::iter_swap(send++, first);
2479 }
2480 }
2481 return send - graph.begin();
2482}
2483
2484// Function: _tear_down_topology
2485inline void Executor::_tear_down_topology(Worker& worker, Topology* tpg, Node*& cache) {
2486
2487 auto &f = tpg->_taskflow;
2488
2489 //assert(&tpg == &(f._topologies.front()));
2490
2491 // case 1: we still need to run the topology again
2492 //if(!tpg->_exception_ptr && !tpg->cancelled() && !tpg->predicate()) {
2493 if(!tpg->cancelled() && !tpg->_predicate()) {
2494 //assert(tpg->_join_counter == 0);
2495 //std::lock_guard<std::mutex> lock(f._mutex);
2496 _schedule_graph(worker, tpg->_taskflow._graph, tpg, tpg);
2497 }
2498 // case 2: the final run of this topology
2499 else {
2500
2501 // invoke the callback after each run
2502 tpg->_on_finish();
2503
2504    // there are other topologies to run
2505 if(std::unique_lock<std::mutex> lock(f._mutex); f._topologies.size()>1) {
2506
2507 auto fetched_tpg {std::move(f._topologies.front())};
2508 //assert(fetched_tpg.get() == tpg);
2509
2510 f._topologies.pop();
2511 tpg = f._topologies.front().get();
2512
2513 lock.unlock();
2514
2515      // Soon after we carry out the promise, the associated taskflow may get destroyed
2516      // on the user side, and we should never touch it again.
2517 fetched_tpg->_carry_out_promise();
2518
2519 // decrement the topology
2520 _decrement_topology();
2521
2522 _schedule_graph(worker, tpg->_taskflow._graph, tpg, tpg);
2523 }
2524 else {
2525 //assert(f._topologies.size() == 1);
2526
2527 auto fetched_tpg {std::move(f._topologies.front())};
2528 //assert(fetched_tpg.get() == tpg);
2529
2530 f._topologies.pop();
2531
2532 lock.unlock();
2533
2534      // Soon after we carry out the promise, the associated taskflow may get destroyed
2535      // on the user side, and we should never touch it again.
2536 fetched_tpg->_carry_out_promise();
2537
2538 _decrement_topology();
2539
2540 // remove the parent that owns the moved taskflow so the storage can be freed
2541 if(auto parent = fetched_tpg->_parent; parent) {
2542 //auto state = parent->_nstate;
2543 if(parent->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
2544 // this async is spawned from a preempted parent, so we need to resume it
2545 //if(state & NSTATE::PREEMPTED) {
2546 _update_cache(worker, cache, static_cast<Node*>(parent));
2547 //}
2548 }
2549 }
2550 }
2551 }
2552}
2553
2554// ############################################################################
2555// Forward Declaration: Subflow
2556// ############################################################################
2557
2558inline void Subflow::join() {
2559
2560 if(!joinable()) {
2561 TF_THROW("subflow already joined");
2562 }
2563
2564 _executor._corun_graph(_worker, _graph, _node->_topology, _node);
2565
2566  // mark the subflow as joined here, since _corun_graph may throw an exception
2567 _node->_nstate |= NSTATE::JOINED_SUBFLOW;
2568}
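
// A minimal subflow sketch (assuming tf::Subflow::emplace from the FlowBuilder
// interface); as _invoke_subflow_task above shows, an unjoined subflow is joined
// implicitly when the callable returns, and Subflow::join can be called at most once:
//
//   taskflow.emplace([](tf::Subflow& sf){
//     tf::Task A = sf.emplace([](){ /* ... */ });
//     tf::Task B = sf.emplace([](){ /* ... */ });
//     A.precede(B);
//     sf.join();  // optional; throws if the subflow has already been joined
//   });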
2569
2570#endif
2571
2572
2573
2574
2575} // end of namespace tf -----------------------------------------------------