Loading...
Searching...
No Matches
executor.hpp
1#pragma once
2
3#include "../observer/tfprof.hpp"
4#include "taskflow.hpp"
5#include "async_task.hpp"
6
11
12namespace tf {
13
14// ----------------------------------------------------------------------------
15// Executor Definition
16// ----------------------------------------------------------------------------
17
62class Executor {
63
64 friend class FlowBuilder;
65 friend class Subflow;
66 friend class Runtime;
67 friend class NonpreemptiveRuntime;
68 friend class Algorithm;
69 friend class TaskGroup;
70
71 public:
72
91 explicit Executor(
92 size_t N = std::thread::hardware_concurrency(),
93 std::shared_ptr<WorkerInterface> wif = nullptr
94 );
95
104
124
145
168 template<typename C>
169 tf::Future<void> run(Taskflow& taskflow, C&& callable);
170
195 template<typename C>
196 tf::Future<void> run(Taskflow&& taskflow, C&& callable);
197
217 tf::Future<void> run_n(Taskflow& taskflow, size_t N);
218
241 tf::Future<void> run_n(Taskflow&& taskflow, size_t N);
242
268 template<typename C>
269 tf::Future<void> run_n(Taskflow& taskflow, size_t N, C&& callable);
270
296 template<typename C>
297 tf::Future<void> run_n(Taskflow&& taskflow, size_t N, C&& callable);
298
322 template<typename P>
323 tf::Future<void> run_until(Taskflow& taskflow, P&& pred);
324
350 template<typename P>
351 tf::Future<void> run_until(Taskflow&& taskflow, P&& pred);
352
379 template<typename P, typename C>
380 tf::Future<void> run_until(Taskflow& taskflow, P&& pred, C&& callable);
381
410 template<typename P, typename C>
411 tf::Future<void> run_until(Taskflow&& taskflow, P&& pred, C&& callable);
412
453 template <typename T>
454 void corun(T& target);
455
484 template <typename P>
485 void corun_until(P&& predicate);
486
501
512 size_t num_workers() const noexcept;
513
520 size_t num_waiters() const noexcept;
521
525 size_t num_queues() const noexcept;
526
540 size_t num_topologies() const;
541
559
577 int this_worker_id() const;
578
579 // --------------------------------------------------------------------------
580 // Observer methods
581 // --------------------------------------------------------------------------
582
600 template <typename Observer, typename... ArgsT>
601 std::shared_ptr<Observer> make_observer(ArgsT&&... args);
602
608 template <typename Observer>
609 void remove_observer(std::shared_ptr<Observer> observer);
610
614 size_t num_observers() const noexcept;
615
616 // --------------------------------------------------------------------------
617 // Async Task Methods
618 // --------------------------------------------------------------------------
619
645 template <typename P, typename F>
646 auto async(P&& params, F&& func);
647
671 template <typename F>
672 auto async(F&& func);
673
697 template <typename P, typename F>
698 void silent_async(P&& params, F&& func);
699
722 template <typename F>
723 void silent_async(F&& func);
724
725 // --------------------------------------------------------------------------
726 // Silent Dependent Async Methods
727 // --------------------------------------------------------------------------
728
756 template <typename F, typename... Tasks>
757requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
758 tf::AsyncTask silent_dependent_async(F&& func, Tasks&&... tasks);
759
791 template <TaskParamsLike P, typename F, typename... Tasks>
792 requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
793 tf::AsyncTask silent_dependent_async(P&& params, F&& func, Tasks&&... tasks);
794
827 template <typename F, typename I>
828requires (!std::same_as<std::decay_t<I>, AsyncTask>)
829 tf::AsyncTask silent_dependent_async(F&& func, I first, I last);
830
865 template <TaskParamsLike P, typename F, typename I>
866 requires (!std::same_as<std::decay_t<I>, AsyncTask>)
867 tf::AsyncTask silent_dependent_async(P&& params, F&& func, I first, I last);
868
869 // --------------------------------------------------------------------------
870 // Dependent Async Methods
871 // --------------------------------------------------------------------------
872
910 template <typename F, typename... Tasks>
911requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
912 auto dependent_async(F&& func, Tasks&&... tasks);
913
955 template <TaskParamsLike P, typename F, typename... Tasks>
956 requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
957 auto dependent_async(P&& params, F&& func, Tasks&&... tasks);
958
999 template <typename F, typename I>
1000requires (!std::same_as<std::decay_t<I>, AsyncTask>)
1001 auto dependent_async(F&& func, I first, I last);
1002
1047 template <TaskParamsLike P, typename F, typename I>
1048 requires (!std::same_as<std::decay_t<I>, AsyncTask>)
1049 auto dependent_async(P&& params, F&& func, I first, I last);
1050
1051 // ----------------------------------------------------------------------------------------------
1052 // Task Group
1053 // ----------------------------------------------------------------------------------------------
1054
1100 TaskGroup task_group();
1101
1102 private:
1103
1104 struct Buffer {
1105 std::mutex mutex;
1106 UnboundedWSQ<Node*> queue;
1107 };
1108
1109 std::vector<Worker> _workers;
1110 std::vector<Buffer> _buffers;
1111
1112 // notifier's state variable and num_topologies should sit on different cachelines
1113 // or the false sharing can cause serious performance drop
1114 alignas(TF_CACHELINE_SIZE) DefaultNotifier _notifier;
1115 alignas(TF_CACHELINE_SIZE) std::atomic<size_t> _num_topologies {0};
1116
1117 std::unordered_map<std::thread::id, Worker*> _t2w;
1118 std::unordered_set<std::shared_ptr<ObserverInterface>> _observers;
1119
1120 void _shutdown();
1121 void _observer_prologue(Worker&, Node*);
1122 void _observer_epilogue(Worker&, Node*);
1123 void _spawn(size_t, std::shared_ptr<WorkerInterface>);
1124 void _exploit_task(Worker&, Node*&);
1125 bool _explore_task(Worker&, Node*&);
1126 void _schedule(Worker&, Node*);
1127 void _schedule(Node*);
1128 void _schedule_graph(Worker&, Graph&, Topology*, NodeBase*);
1129 void _spill(Node*);
1130 void _set_up_topology(Worker*, Topology*);
1131 void _tear_down_topology(Worker&, Topology*, Node*&);
1132 void _tear_down_async(Worker&, Node*, Node*&);
1133 void _tear_down_dependent_async(Worker&, Node*, Node*&);
1134 void _tear_down_nonasync(Worker&, Node*, Node*&);
1135 void _tear_down_invoke(Worker&, Node*, Node*&);
1136 void _increment_topology();
1137 void _decrement_topology();
1138 void _invoke(Worker&, Node*);
1139 void _invoke_static_task(Worker&, Node*);
1140 void _invoke_nonpreemptive_runtime_task(Worker&, Node*);
1141 void _invoke_condition_task(Worker&, Node*, SmallVector<int>&);
1142 void _invoke_multi_condition_task(Worker&, Node*, SmallVector<int>&);
1143 void _process_dependent_async(Node*, tf::AsyncTask&, size_t&);
1144 void _process_exception(Worker&, Node*);
1145 void _update_cache(Worker&, Node*&, Node*);
1146 void _corun_graph(Worker&, Graph&, Topology*, NodeBase*);
1147
1148 bool _wait_for_task(Worker&, Node*&);
1149 bool _invoke_subflow_task(Worker&, Node*);
1150 bool _invoke_module_task(Worker&, Node*);
1151 bool _invoke_adopted_module_task(Worker&, Node*);
1152 bool _invoke_module_task_impl(Worker&, Node*, Graph&);
1153 bool _invoke_async_task(Worker&, Node*);
1154 bool _invoke_dependent_async_task(Worker&, Node*);
1155 bool _invoke_runtime_task(Worker&, Node*);
1156 bool _invoke_runtime_task_impl(Worker&, Node*, std::function<void(Runtime&)>&);
1157 bool _invoke_runtime_task_impl(Worker&, Node*, std::function<void(Runtime&, bool)>&);
1158
1159 size_t _set_up_graph(Graph&, Topology*, NodeBase*);
1160
1161 template <typename P>
1162 void _corun_until(Worker&, P&&);
1163
1164 template <typename I>
1165 void _bulk_schedule(Worker&, I, size_t);
1166
1167 template <typename I>
1168 void _bulk_schedule(I, size_t);
1169
1170 template <typename I>
1171 void _bulk_spill(I, size_t);
1172
1173 template <typename I>
1174 void _bulk_spill_round_robin(I, size_t);
1175
1176 template <size_t N>
1177 void _bulk_update_cache(Worker&, Node*&, Node*, std::array<Node*, N>&, size_t&);
1178
1179 template <typename P, typename F>
1180 auto _async(P&&, F&&, Topology*, NodeBase*);
1181
1182 template <typename P, typename F>
1183 void _silent_async(P&&, F&&, Topology*, NodeBase*);
1184
1185 template <TaskParamsLike P, typename F, typename I>
1186 requires (!std::same_as<std::decay_t<I>, AsyncTask>)
1187 auto _dependent_async(P&&, F&&, I, I, Topology*, NodeBase*);
1188
1189 template <TaskParamsLike P, typename F, typename I>
1190 requires (!std::same_as<std::decay_t<I>, AsyncTask>)
1191 auto _silent_dependent_async(P&&, F&&, I, I, Topology*, NodeBase*);
1192
1193 template <typename... ArgsT>
1194 void _schedule_async_task(ArgsT&&...);
1195
1196 template <typename I, typename... ArgsT>
1197 AsyncTask _schedule_dependent_async_task(I, I, size_t, ArgsT&&...);
1198};
1199
1200#ifndef DOXYGEN_GENERATING_OUTPUT
1201
1202// Constructor
1203inline Executor::Executor(size_t N, std::shared_ptr<WorkerInterface> wif) :
1204 _workers (N),
1205 _buffers (std::bit_width(N)), // Empirically, we find that log2(N) performs best.
1206 _notifier (N) {
1207
1208 if(N == 0) {
1209 TF_THROW("executor must define at least one worker");
1210 }
1211
1212 // If spawning N threads fails, shut down any created threads before
1213 // rethrowing the exception.
1214#ifndef TF_DISABLE_EXCEPTION_HANDLING
1215 try {
1216#endif
1217 _spawn(N, std::move(wif));
1218#ifndef TF_DISABLE_EXCEPTION_HANDLING
1219 }
1220 catch(...) {
1221 _shutdown();
1222 std::rethrow_exception(std::current_exception());
1223 }
1224#endif
1225
1226 // initialize the default observer if requested
1227 if(has_env(TF_ENABLE_PROFILER)) {
1228 TFProfManager::get()._manage(make_observer<TFProfObserver>());
1229 }
1230}
1231
1232// Destructor
1233inline Executor::~Executor() {
1234 _shutdown();
1235}
1236
1237// Function: _shutdown
1238inline void Executor::_shutdown() {
1239
1240 // wait for all topologies to complete
1241 wait_for_all();
1242
1243 // shut down the scheduler
1244 for(size_t i=0; i<_workers.size(); ++i) {
1245 _workers[i]._done.test_and_set(std::memory_order_relaxed);
1246 }
1247
1248 _notifier.notify_all();
1249
1250 // Only join the thread if it is joinable, as std::thread construction
1251 // may fail and throw an exception.
1252 for(auto& w : _workers) {
1253 if(w._thread.joinable()) {
1254 w._thread.join();
1255 }
1256 }
1257}
1258
1259// Function: num_workers
1260inline size_t Executor::num_workers() const noexcept {
1261 return _workers.size();
1262}
1263
1264// Function: num_waiters
1265inline size_t Executor::num_waiters() const noexcept {
1266 return _notifier.num_waiters();
1267}
1268
1269// Function: num_queues
1270inline size_t Executor::num_queues() const noexcept {
1271 return _workers.size() + _buffers.size();
1272}
1273
1274// Function: num_topologies
1275inline size_t Executor::num_topologies() const {
1276 return _num_topologies.load(std::memory_order_relaxed);
1277}
1278
1279// Function: this_worker
1280inline Worker* Executor::this_worker() {
1281 auto itr = _t2w.find(std::this_thread::get_id());
1282 return itr == _t2w.end() ? nullptr : itr->second;
1283}
1284
1285// Function: this_worker_id
1286inline int Executor::this_worker_id() const {
1287 auto i = _t2w.find(std::this_thread::get_id());
1288 return i == _t2w.end() ? -1 : static_cast<int>(i->second->_id);
1289}
1290
1291// Procedure: _spawn
1292inline void Executor::_spawn(size_t N, std::shared_ptr<WorkerInterface> wif) {
1293
1294 for(size_t id=0; id<N; ++id) {
1295 _workers[id]._thread = std::thread([&, id, wif] () {
1296
1297 auto& worker = _workers[id];
1298
1299 worker._id = id;
1300 worker._sticky_victim = id;
1301 worker._rdgen.seed(static_cast<uint32_t>(std::hash<std::thread::id>()(std::this_thread::get_id())));
1302
1303 // before entering the work-stealing loop, call the scheduler prologue
1304 if(wif) {
1305 wif->scheduler_prologue(worker);
1306 }
1307
1308 Node* t = nullptr;
1309 std::exception_ptr ptr = nullptr;
1310
1311 // must use 1 as condition instead of !done because
1312 // the previous worker may stop while the following workers
1313 // are still preparing for entering the scheduling loop
1314#ifndef TF_DISABLE_EXCEPTION_HANDLING
1315 try {
1316#endif
1317 // work-stealing loop
1318 while(1) {
1319
1320 // drains out the local queue first
1321 _exploit_task(worker, t);
1322
1323 // steals and waits for tasks
1324 if(_wait_for_task(worker, t) == false) {
1325 break;
1326 }
1327 }
1328
1329#ifndef TF_DISABLE_EXCEPTION_HANDLING
1330 }
1331 catch(...) {
1332 ptr = std::current_exception();
1333 }
1334#endif
1335
1336 // call the user-specified epilogue function
1337 if(wif) {
1338 wif->scheduler_epilogue(worker, ptr);
1339 }
1340
1341 });
1342
1343 // We avoid using thread-local storage to track the mapping between a thread
1344 // and its corresponding worker in an executor. On Windows, thread-local
1345 // storage can be unreliable in certain situations (see issue #727).
1346 //
1347 // Instead, we maintain a per-executor mapping from threads to workers.
1348 // This approach has an additional advantage: according to the C++ Standard,
1349 // std::thread::id uniquely identifies a thread object. Therefore, once the map
1350 // returns a valid worker, we can be certain that the worker belongs to this
1351 // executor. This eliminates the need for additional executor validation
1352 // required by using thread-local storage.
1353 //
1354 // Example:
1355 //
1356 // Worker* w = this_worker();
1357 // // Using thread-local storage, we would need additional executor validation:
1358 // if (w == nullptr || w->_executor != this) { /* caller is not a worker of this executor */ }
1359 //
1360 // // Using per-executor mapping, it suffices to check:
1361 // if (w == nullptr) { /* caller is not a worker of this executor */ }
1362 //
1363 _t2w.emplace(_workers[id]._thread.get_id(), &_workers[id]);
1364 }
1365}
1366
1367// Function: _explore_task
1368inline bool Executor::_explore_task(Worker& w, Node*& t) {
1369
1370 // Fast path: if no topologies are live, all queues are guaranteed empty
1371 // by the executor's invariant (num_topologies reaches zero only after all
1372 // nodes have been scheduled and their queues flushed). Skip the entire
1373 // steal loop and return immediately so the caller enters _wait_for_task
1374 // to sleep. relaxed ordering is sufficient — this is a hint, and any
1375 // missed update is caught safely by the 2PC guard in _wait_for_task.
1376 if(_num_topologies.load(std::memory_order_relaxed) == 0) {
1377 return true;
1378 }
1379
1380 const size_t MAX_VICTIM = num_queues(); // guaranteed >= 2 by constructor
1381 const size_t MAX_STEALS = ((MAX_VICTIM + 1) << 1);
1382
1383 // local aliases for steal protocol sentinels — these are properties of the
1384 // steal protocol, not of any specific queue type
1385 size_t num_steals = 0;
1386 size_t vtm = w._sticky_victim;
1387
1388 while(true) {
1389
1390 t = (vtm < _workers.size())
1391 ? _workers[vtm]._wsq.steal()
1392 : _buffers[vtm - _workers.size()].queue.steal();
1393
1394 if(t) {
1395 w._sticky_victim = vtm;
1396 break;
1397 }
1398
1399 // EMPTY: pick a new victim excluding self since our own queue is likely empty.
1400 // map [0, MAX_VICTIM-1) over [0, MAX_VICTIM) \ {w._id} — always safe since MAX_VICTIM >= 2.
1401 vtm = w._rdgen() % (MAX_VICTIM - 1);
1402 if(vtm >= w._id) vtm++;
1403
1404 if(++num_steals > MAX_STEALS) {
1405 std::this_thread::yield();
1406 if(num_steals > 150 + MAX_STEALS) {
1407 break;
1408 }
1409 }
1410
1411 if(w._done.test(std::memory_order_relaxed)) {
1412 return false;
1413 }
1414 }
1415
1416 return true;
1417}
1418
1419/*
1420// Function: _explore_task
1421inline bool Executor::_explore_task(Worker& w, Node*& t) {
1422
1423 // Fast path: if no topologies are live, all queues are guaranteed empty
1424 // by the executor's invariant (num_topologies reaches zero only after all
1425 // nodes have been scheduled and their queues flushed). Skip the entire
1426 // steal loop and return immediately so the caller enters _wait_for_task
1427 // to sleep. relaxed ordering is sufficient — this is a hint, and any
1428 // missed update is caught safely by the 2PC guard in _wait_for_task.
1429 if(_num_topologies.load(std::memory_order_relaxed) == 0) {
1430 return true;
1431 }
1432
1433 //assert(!t);
1434 const size_t MAX_VICTIM = num_queues();
1435 const size_t MAX_STEALS = ((MAX_VICTIM + 1) << 1);
1436
1437 size_t num_steals = 0;
1438 size_t vtm = w._sticky_victim;
1439
1440 // Make the worker steal immediately from the assigned victim.
1441 while(true) {
1442
1443 // If the worker's victim thread is within the worker pool, steal from the worker's queue.
1444 // Otherwise, steal from the buffer, adjusting the victim index based on the worker pool size.
1445 t = (vtm < _workers.size())
1446 ? _workers[vtm]._wsq.steal()
1447 : _buffers[vtm - _workers.size()].queue.steal();
1448
1449 if(t) {
1450 w._sticky_victim = vtm;
1451 break;
1452 }
1453
1454 // Increment the steal count, and if it exceeds MAX_STEALS, yield the thread.
1455 // If the number of empty steals exceeds MAX_STEALS + 150, exit the loop.
1456 if (++num_steals > MAX_STEALS) {
1457 std::this_thread::yield();
1458 if(num_steals > 150 + MAX_STEALS) {
1459 break;
1460 }
1461 }
1462
1463 if(w._done.test(std::memory_order_relaxed)) {
1464 return false;
1465 }
1466
1467 // Randomly generate the next victim.
1468 vtm = w._rdgen() % MAX_VICTIM;
1469 }
1470 return true;
1471}
1472*/
1473
1474// Procedure: _exploit_task
1475inline void Executor::_exploit_task(Worker& w, Node*& t) {
1476 while(t) {
1477 _invoke(w, t);
1478 t = w._wsq.pop();
1479 }
1480}
1481
1482// Function: _wait_for_task
1483inline bool Executor::_wait_for_task(Worker& w, Node*& t) {
1484
1485 explore_task:
1486
1487 if(_explore_task(w, t) == false) {
1488 return false;
1489 }
1490
1491 // Go exploit the task if we successfully steal one.
1492 if(t) {
1493 return true;
1494 }
1495
1496 // Entering the 2PC guard as all queues are likely empty after many stealing attempts.
1497 _notifier.prepare_wait(w._id);
1498
1499 // Fast path: if no topologies are live, all queues are guaranteed empty.
1500 // Skip the O(N) buffer and worker queue scans and go directly to sleep.
1501 // This is safe because prepare_wait has already been called — any notify
1502 // that arrives after this check but before commit_wait will be caught by
1503 // the 2PC guarantee of the notifier.
1504 if(_num_topologies.load(std::memory_order_relaxed) == 0) {
1505 // still check done flag before committing to sleep
1506 if(w._done.test(std::memory_order_relaxed)) {
1507 _notifier.cancel_wait(w._id);
1508 return false;
1509 }
1510 _notifier.commit_wait(w._id);
1511 goto explore_task;
1512 }
1513
1514 // Condition #1: buffers should be empty
1515 for(size_t b=0; b<_buffers.size(); ++b) {
1516 if(!_buffers[b].queue.empty()) {
1517 _notifier.cancel_wait(w._id);
1518 w._sticky_victim = b + _workers.size();
1519 goto explore_task;
1520 }
1521 }
1522
1523 // Condition #2: worker queues should be empty
1524 // Note: We need to use index-based looping to avoid data race with _spawn
1525 // which initializes other worker data structure at the same time.
1526 // Also, due to the property of a work-stealing queue, we don't need to check
1527 // this worker's work-stealing queue.
1528 for(size_t k=0; k<_workers.size()-1; ++k) {
1529 if(size_t vtm = k + (k >= w._id); !_workers[vtm]._wsq.empty()) {
1530 _notifier.cancel_wait(w._id);
1531 w._sticky_victim = vtm;
1532 goto explore_task;
1533 }
1534 }
1535
1536 // Condition #3: worker should be alive
1537 if(w._done.test(std::memory_order_relaxed)) {
1538 _notifier.cancel_wait(w._id);
1539 return false;
1540 }
1541
1542 // Now I really need to relinquish myself to others.
1543 _notifier.commit_wait(w._id);
1544 goto explore_task;
1545}
1546
1547// Function: make_observer
1548template<typename Observer, typename... ArgsT>
1549std::shared_ptr<Observer> Executor::make_observer(ArgsT&&... args) {
1550
1551 static_assert(
1552 std::is_base_of_v<ObserverInterface, Observer>,
1553 "Observer must be derived from ObserverInterface"
1554 );
1555
1556 // use a local variable to mimic the constructor
1557 auto ptr = std::make_shared<Observer>(std::forward<ArgsT>(args)...);
1558
1559 ptr->set_up(_workers.size());
1560
1561 _observers.emplace(std::static_pointer_cast<ObserverInterface>(ptr));
1562
1563 return ptr;
1564}
1565
1566// Procedure: remove_observer
1567template <typename Observer>
1568void Executor::remove_observer(std::shared_ptr<Observer> ptr) {
1569
1570 static_assert(
1571 std::is_base_of_v<ObserverInterface, Observer>,
1572 "Observer must be derived from ObserverInterface"
1573 );
1574
1575 _observers.erase(std::static_pointer_cast<ObserverInterface>(ptr));
1576}
1577
1578// Function: num_observers
1579inline size_t Executor::num_observers() const noexcept {
1580 return _observers.size();
1581}
1582
1583// Procedure: _spill
1584inline void Executor::_spill(Node* item) {
1585 // Since pointers are aligned to 8 bytes, we perform a simple hash to avoid
1586 // contention caused by hashing to the same slot.
1587 auto b = (reinterpret_cast<uintptr_t>(item) >> 16) % _buffers.size();
1588 std::scoped_lock lock(_buffers[b].mutex);
1589 _buffers[b].queue.push(item);
1590}
1591
1592// Procedure: _bulk_spill (single batch to one buffer)
1593// Uses Knuth multiplicative hash on the first pointer to select a buffer,
1594// providing better bit diffusion than the shift-based approach, especially
1595// when the allocator returns pointers with regular low-bit patterns.
1596template <typename I>
1597void Executor::_bulk_spill(I first, size_t N) {
1598 //assert(N != 0);
1599 auto b = ((reinterpret_cast<uintptr_t>(*first) * 2654435761ULL) >> 32) % _buffers.size();
1600 std::scoped_lock lock(_buffers[b].mutex);
1601 _buffers[b].queue.bulk_push(first, N);
1602}
1603
1604// Procedure: _bulk_spill
1605// Distributes a batch of N spilled nodes across all buffers in round-robin
1606// order starting from a hash of the first node's pointer. Each buffer's lock
1607// is held only for its chunk, reducing contention compared to sending the
1608// entire batch to a single buffer.
1609template <typename I>
1610void Executor::_bulk_spill_round_robin(I first, size_t N) {
1611
1612 // assert(N != 0);
1613 const size_t B = _buffers.size();
1614 const size_t start = ((reinterpret_cast<uintptr_t>(*first) * 2654435761ULL) >> 32) % B;
1615 const size_t per_buf = (N + B - 1) / B;
1616 size_t remaining = N;
1617 for(size_t i = 0; i < B && remaining > 0; ++i) {
1618 size_t b = (start + i) % B;
1619 size_t chunk = std::min(per_buf, remaining);
1620 {
1621 std::scoped_lock lock(_buffers[b].mutex);
1622 _buffers[b].queue.bulk_push(first, chunk);
1623 }
1624 // terminates early via remaining > 0, so we don't acquire unnecessary locks on empty chunks.
1625 remaining -= chunk;
1626 }
1627}
1628
1629// Procedure: _schedule
1630inline void Executor::_schedule(Worker& worker, Node* node) {
1631 // starting at v3.5 we do not use any complicated notification mechanism
1632 // as the experimental result has shown no significant advantage.
1633 if(worker._wsq.try_push(node) == false) {
1634 _spill(node);
1635 }
1636 _notifier.notify_one();
1637}
1638
1639// Procedure: _schedule
1640inline void Executor::_schedule(Node* node) {
1641 _spill(node);
1642 _notifier.notify_one();
1643}
1644
1645// Procedure: _schedule
1646template <typename I>
1647void Executor::_bulk_schedule(Worker& worker, I first, size_t num_nodes) {
1648
1649 if(num_nodes == 0) {
1650 return;
1651 }
1652
1653 // NOTE: We cannot use first/last in the for-loop (e.g., for(; first != last; ++first)).
1654 // This is because when a node v is inserted into the queue, v can run and finish
1655 // immediately. If v is the last node in the graph, it will tear down the parent task vector
1656 // which cause the last ++first to fail. This problem is specific to MSVC which has a stricter
1657 // iterator implementation in std::vector than GCC/Clang.
1658 if(auto n = worker._wsq.try_bulk_push(first, num_nodes); n != num_nodes) {
1659 _bulk_spill(first, num_nodes - n);
1660 }
1661 _notifier.notify_n(num_nodes);
1662
1663 // notify first before spilling to hopefully wake up workers earlier
1664 // however, the experiment does not show any benefit for doing this.
1665 //auto n = worker._wsq.try_bulk_push(first, num_nodes);
1666 //_notifier.notify_n(n);
1667 //_bulk_schedule(first + n, num_nodes - n);
1668}
1669
1670// Procedure: _schedule
1671template <typename I>
1672inline void Executor::_bulk_schedule(I first, size_t num_nodes) {
1673
1674 if(num_nodes == 0) {
1675 return;
1676 }
1677
1678 // NOTE: We cannot use first/last in the for-loop (e.g., for(; first != last; ++first)).
1679 // This is because when a node v is inserted into the queue, v can run and finish
1680 // immediately. If v is the last node in the graph, it will tear down the parent task vector
1681 // which cause the last ++first to fail. This problem is specific to MSVC which has a stricter
1682 // iterator implementation in std::vector than GCC/Clang.
1683 _bulk_spill(first, num_nodes);
1684 _notifier.notify_n(num_nodes);
1685}
1686
1687// Function: _update_cache
1688TF_FORCE_INLINE void Executor::_update_cache(Worker& worker, Node*& cache, Node* node) {
1689 if(cache) {
1690 _schedule(worker, cache);
1691 }
1692 cache = node;
1693}
1694
1695// Function: _bulk_update_cache
1696template <size_t N>
1697TF_FORCE_INLINE void Executor::_bulk_update_cache(
1698 Worker& worker, Node*& cache, Node* node, std::array<Node*, N>& array, size_t& n
1699) {
1700 // experimental results show no benefit of using bulk_update_cache
1701 if(cache) {
1702 array[n++] = cache;
1703 if(n == N) {
1704 _bulk_schedule(worker, array, n);
1705 n = 0;
1706 }
1707 }
1708 cache = node;
1709}
1710
1711// Procedure: _invoke
// Runs `node` on `worker`, then keeps going with successors that become ready.
// At most one ready successor is kept in `cache` and executed in this same
// call by jumping back to begin_invoke (see TF_INVOKE_CONTINUATION). Any other
// ready successor is pushed to the worker's queue by _update_cache.
1712inline void Executor::_invoke(Worker& worker, Node* node) {
1713
// Continue with the cached successor (if any) instead of returning.
// NOTE(review): this macro is defined inside the function body and is not
// #undef'd in the visible code, so it stays defined for the rest of the
// translation unit.
1714 #define TF_INVOKE_CONTINUATION() \
1715 if (cache) { \
1716 node = cache; \
1717 goto begin_invoke; \
1718 }
1719
// Each pass through this label handles one node, starting with an empty cache.
1720 begin_invoke:
1721
1722 Node* cache {nullptr};
1723
1724 // if this is the second invoke due to preemption, directly jump to invoke task
// (this skips the cancellation check and semaphore acquisition below)
1725 if(node->_nstate & NSTATE::PREEMPTED) {
1726 goto invoke_task;
1727 }
1728
1729 // If the work has been cancelled, there is no need to continue.
1730 // Here, we do tear_down_invoke since async tasks may also get cancelled where
1731 // we need to recycle the node.
1732 if(node->_is_parent_cancelled()) {
1733 _tear_down_invoke(worker, node, cache);
1734 TF_INVOKE_CONTINUATION();
1735 return;
1736 }
1737
1738 // if acquiring semaphore(s) exists, acquire them first
// If acquisition fails, the node is not run here: the nodes collected in
// `waiters` are bulk-scheduled and this invocation returns.
1739 if(node->_semaphores && !node->_semaphores->to_acquire.empty()) {
1740 SmallVector<Node*> waiters;
1741 if(!node->_acquire_all(waiters)) {
1742 _bulk_schedule(worker, waiters.begin(), waiters.size());
1743 return;
1744 }
1745 }
1746
1747 invoke_task:
1748
// Filled by condition / multi-condition tasks with the successor indices to follow.
1749 SmallVector<int> conds;
1750
1751 // switch is faster than nested if-else due to jump table
// Task kinds whose helper returns true (runtime, subflow, module, adopted
// module, async, dependent async) return immediately. They skip the
// semaphore release, successor scheduling and teardown below.
// NOTE(review): presumably this is because the task was preempted and will
// be resumed later; confirm in the helpers. Async and dependent-async tasks
// never fall through: they run their own teardown and then continue or return.
1752 switch(node->_handle.index()) {
1753 // static task
1754 case Node::STATIC:{
1755 _invoke_static_task(worker, node);
1756 }
1757 break;
1758
1759 // runtime task
1760 case Node::RUNTIME:{
1761 if(_invoke_runtime_task(worker, node)) {
1762 return;
1763 }
1764 }
1765 break;
1766
1767 // non-preemptive runtime task
1768 case Node::NONPREEMPTIVE_RUNTIME:{
1769 _invoke_nonpreemptive_runtime_task(worker, node);
1770 }
1771 break;
1772
1773 // subflow task
1774 case Node::SUBFLOW: {
1775 if(_invoke_subflow_task(worker, node)) {
1776 return;
1777 }
1778 }
1779 break;
1780
1781 // condition task
1782 case Node::CONDITION: {
1783 _invoke_condition_task(worker, node, conds);
1784 }
1785 break;
1786
1787 // multi-condition task
1788 case Node::MULTI_CONDITION: {
1789 _invoke_multi_condition_task(worker, node, conds);
1790 }
1791 break;
1792
1793 // module task
1794 case Node::MODULE: {
1795 if(_invoke_module_task(worker, node)) {
1796 return;
1797 }
1798 }
1799 break;
1800
1801 // adopted module task
1802 case Node::ADOPTED_MODULE: {
1803 if(_invoke_adopted_module_task(worker, node)) {
1804 return;
1805 }
1806 }
1807 break;
1808
1809 // async task
1810 case Node::ASYNC: {
1811 if(_invoke_async_task(worker, node)) {
1812 return;
1813 }
1814 _tear_down_async(worker, node, cache);
1815 TF_INVOKE_CONTINUATION();
1816 return;
1817 }
1818 break;
1819
1820 // dependent async task
1821 case Node::DEPENDENT_ASYNC: {
1822 if(_invoke_dependent_async_task(worker, node)) {
1823 return;
1824 }
1825 _tear_down_dependent_async(worker, node, cache);
1826 TF_INVOKE_CONTINUATION();
1827 return;
1828 }
1829 break;
1830
1831 // monostate (placeholder)
1832 default:
1833 break;
1834 }
1835
1836 // if releasing semaphores exist, release them
// and schedule the nodes that _release_all hands back in `waiters`
1837 if(node->_semaphores && !node->_semaphores->to_release.empty()) {
1838 SmallVector<Node*> waiters;
1839 node->_release_all(waiters);
1840 _bulk_schedule(worker, waiters.begin(), waiters.size());
1841 }
1842
1843 // Reset the join counter with strong dependencies to support cycles.
1844 // + We must do this before scheduling the successors to avoid race
1845 // condition on _predecessors.
1846 // + We must use fetch_add instead of direct assigning
1847 // because the user-level call on "invoke" may explicitly schedule
1848 // this task again (e.g., pipeline) which can access the join_counter.
1849 node->_join_counter.fetch_add(
1850 node->_nstate & NSTATE::STRONG_DEPENDENCIES_MASK, std::memory_order_relaxed
1851 );
1852
1853 // Schedule the successors that became ready, based on the task type.
// Every successor made ready here adds one to the parent's join counter.
// _tear_down_nonasync later subtracts one from it when a task finishes.
1854 switch(node->_handle.index()) {
1855
1856 // condition and multi-condition tasks
// Follow each successor index returned by the condition; out-of-range
// (including negative) values are ignored.
1857 case Node::CONDITION:
1858 case Node::MULTI_CONDITION: {
1859 for(auto cond : conds) {
1860 if(cond >= 0 && static_cast<size_t>(cond) < node->_num_successors) {
1861 auto s = node->_edges[cond];
1862 // zeroing the join counter for invariant
1863 s->_join_counter.store(0, std::memory_order_relaxed);
1864 node->_parent->_join_counter.fetch_add(1, std::memory_order_relaxed);
1865 _update_cache(worker, cache, s);
1866 }
1867 }
1868 }
1869 break;
1870
1871 // non-condition task
// A successor is ready when this decrement brings its join counter to zero.
1872 default: {
1873 for(size_t i=0; i<node->_num_successors; ++i) {
1874 if(auto s = node->_edges[i]; s->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
1875 node->_parent->_join_counter.fetch_add(1, std::memory_order_relaxed);
1876 _update_cache(worker, cache, s);
1877 }
1878 }
1879 }
1880 break;
1881 }
1882
1883 // clean up the node after execution
// then run the cached successor (if any) in this same call
1884 _tear_down_nonasync(worker, node, cache);
1885 TF_INVOKE_CONTINUATION();
1886}
1887
1888// Procedure: _tear_down_nonasync
1889inline void Executor::_tear_down_nonasync(Worker& worker, Node* node, Node*& cache) {
1890
1891 // we must check parent first before subtracting the join counter,
1892 // or it can introduce data race
1893 if(auto parent = node->_parent; parent == node->_topology) {
1894 if(parent->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
1895 _tear_down_topology(worker, node->_topology, cache);
1896 }
1897 }
1898 else {
1899 // needs to fetch every data before join counter becomes zero at which
1900 // the node may be deleted
1901 auto state = parent->_nstate;
1902 if(parent->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
1903 // this task is spawned from a preempted parent, so we need to resume it
1904 if(state & NSTATE::PREEMPTED) {
1905 _update_cache(worker, cache, static_cast<Node*>(parent));
1906 }
1907 }
1908 }
1909}
1910
1911// Procedure: _tear_down_invoke
1912inline void Executor::_tear_down_invoke(Worker& worker, Node* node, Node*& cache) {
1913 switch(node->_handle.index()) {
1914 case Node::ASYNC:
1915 _tear_down_async(worker, node, cache);
1916 break;
1917
1918 case Node::DEPENDENT_ASYNC:
1919 _tear_down_dependent_async(worker, node, cache);
1920 break;
1921
1922 default:
1923 _tear_down_nonasync(worker, node, cache);
1924 break;
1925 }
1926}
1927
1928// Procedure: _observer_prologue
1929inline void Executor::_observer_prologue(Worker& worker, Node* node) {
1930 for(auto& observer : _observers) {
1931 observer->on_entry(WorkerView(worker), TaskView(*node));
1932 }
1933}
1934
1935// Procedure: _observer_epilogue
1936inline void Executor::_observer_epilogue(Worker& worker, Node* node) {
1937 for(auto& observer : _observers) {
1938 observer->on_exit(WorkerView(worker), TaskView(*node));
1939 }
1940}
1941
1942// Procedure: _process_exception
1943inline void Executor::_process_exception(Worker&, Node* node) {
1944
1945 // Finds the anchor and mark the entire path with exception,
1946 // so recursive tasks can be cancelled properly.
1947 // Since exception can come from asynchronous task (with runtime), the node itself can be anchored.
1948 NodeBase* ea = node; // explicit anchor
1949 NodeBase* ia = nullptr; // implicit anchor
1950
1951 while(ea && (ea->_estate.load(std::memory_order_relaxed) & ESTATE::EXPLICITLY_ANCHORED) == 0) {
1952 ea->_estate.fetch_or(ESTATE::EXCEPTION, std::memory_order_relaxed);
1953 // we only want the inner-most implicit anchor
1954 if(ia == nullptr && (ea->_nstate & NSTATE::IMPLICITLY_ANCHORED)) {
1955 ia = ea;
1956 }
1957 ea = ea->_parent;
1958 }
1959
1960 // flag used to ensure execution is caught in a thread-safe manner
1961 constexpr static auto flag = ESTATE::EXCEPTION | ESTATE::CAUGHT;
1962
1963 // The exception occurs under a blocking call (e.g., corun, join).
1964 if(ea) {
1965 // multiple tasks may throw, and we only take the first thrown exception
1966 if((ea->_estate.fetch_or(flag, std::memory_order_relaxed) & ESTATE::CAUGHT) == 0) {
1967 ea->_exception_ptr = std::current_exception();
1968 return;
1969 }
1970 }
1971 // Implicit anchor has the lowest priority
1972 else if(ia){
1973 if((ia->_estate.fetch_or(flag, std::memory_order_relaxed) & ESTATE::CAUGHT) == 0) {
1974 ia->_exception_ptr = std::current_exception();
1975 return;
1976 }
1977 }
1978
1979 // For now, we simply store the exception in this node; this can happen in an
1980 // execution that does not have any external control to capture the exception,
1981 // such as silent async task without any parent.
1982 node->_exception_ptr = std::current_exception();
1983}
1984
1985// Procedure: _invoke_static_task
1986inline void Executor::_invoke_static_task(Worker& worker, Node* node) {
1987 _observer_prologue(worker, node);
1988 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
1989 std::get_if<Node::Static>(&node->_handle)->work();
1990 });
1991 _observer_epilogue(worker, node);
1992}
1993
1994// Procedure: _invoke_subflow_task
1995inline bool Executor::_invoke_subflow_task(Worker& worker, Node* node) {
1996
1997 auto& h = *std::get_if<Node::Subflow>(&node->_handle);
1998 auto& g = h.subgraph;
1999
2000 if((node->_nstate & NSTATE::PREEMPTED) == 0) {
2001
2002 // set up the subflow
2003 Subflow sf(*this, worker, node, g);
2004
2005 // invoke the subflow callable
2006 _observer_prologue(worker, node);
2007 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
2008 h.work(sf);
2009 });
2010 _observer_epilogue(worker, node);
2011
2012 // spawn the subflow if it is joinable and its graph is non-empty
2013 // implicit join is faster than Subflow::join as it does not involve corun
2014 if(sf.joinable() && !g.empty()) {
2015
2016 // signal the executor to preempt this node
2017 node->_nstate |= NSTATE::PREEMPTED;
2018
2019 // set up and schedule the graph
2020 _schedule_graph(worker, g, node->_topology, node);
2021 return true;
2022 }
2023 }
2024 else {
2025 node->_nstate &= ~NSTATE::PREEMPTED;
2026 }
2027
2028 // The subflow has finished or joined.
2029 // By default, we clear the subflow storage as applications can perform recursive
2030 // subflow tasking which accumulates a huge amount of memory overhead, hampering
2031 // the performance.
2032 if((node->_nstate & NSTATE::RETAIN_SUBFLOW) == 0) {
2033 g.clear();
2034 }
2035
2036 return false;
2037}
2038
2039// Procedure: _invoke_condition_task
2040inline void Executor::_invoke_condition_task(
2041 Worker& worker, Node* node, SmallVector<int>& conds
2042) {
2043 _observer_prologue(worker, node);
2044 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
2045 auto& work = std::get_if<Node::Condition>(&node->_handle)->work;
2046 conds = { work() };
2047 });
2048 _observer_epilogue(worker, node);
2049}
2050
2051// Procedure: _invoke_multi_condition_task
2052inline void Executor::_invoke_multi_condition_task(
2053 Worker& worker, Node* node, SmallVector<int>& conds
2054) {
2055 _observer_prologue(worker, node);
2056 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
2057 conds = std::get_if<Node::MultiCondition>(&node->_handle)->work();
2058 });
2059 _observer_epilogue(worker, node);
2060}
2061
2062// Procedure: _invoke_module_task
2063inline bool Executor::_invoke_module_task(Worker& w, Node* node) {
2064 return _invoke_module_task_impl(w, node, std::get_if<Node::Module>(&node->_handle)->graph);
2065}
2066
2067// Procedure: _invoke_adopted_module_task
2068inline bool Executor::_invoke_adopted_module_task(Worker& w, Node* node) {
2069 return _invoke_module_task_impl(w, node, std::get_if<Node::AdoptedModule>(&node->_handle)->graph);
2070}
2071
2072// Procedure: _invoke_module_task_impl
2073inline bool Executor::_invoke_module_task_impl(Worker& w, Node* node, Graph& graph) {
2074
2075 // No need to do anything for empty graph
2076 if(graph.empty()) {
2077 return false;
2078 }
2079
2080 // first entry - not spawned yet
2081 if((node->_nstate & NSTATE::PREEMPTED) == 0) {
2082 // signal the executor to preempt this node
2083 node->_nstate |= NSTATE::PREEMPTED;
2084 _schedule_graph(w, graph, node->_topology, node);
2085 return true;
2086 }
2087
2088 // second entry - already spawned
2089 node->_nstate &= ~NSTATE::PREEMPTED;
2090
2091 return false;
2092}
2093
2094
2095// Procedure: _invoke_async_task
2096inline bool Executor::_invoke_async_task(Worker& worker, Node* node) {
2097 auto& work = std::get_if<Node::Async>(&node->_handle)->work;
2098 switch(work.index()) {
2099 // void()
2100 case 0:
2101 _observer_prologue(worker, node);
2102 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
2103 std::get_if<0>(&work)->operator()();
2104 });
2105 _observer_epilogue(worker, node);
2106 break;
2107
2108 // void(Runtime&)
2109 case 1:
2110 if(_invoke_runtime_task_impl(worker, node, *std::get_if<1>(&work))) {
2111 return true;
2112 }
2113 break;
2114
2115 // void(Runtime&, bool)
2116 case 2:
2117 if(_invoke_runtime_task_impl(worker, node, *std::get_if<2>(&work))) {
2118 return true;
2119 }
2120 break;
2121 }
2122
2123 return false;
2124}
2125
2126// Procedure: _invoke_dependent_async_task
2127inline bool Executor::_invoke_dependent_async_task(Worker& worker, Node* node) {
2128 auto& work = std::get_if<Node::DependentAsync>(&node->_handle)->work;
2129 switch(work.index()) {
2130 // void()
2131 case 0:
2132 _observer_prologue(worker, node);
2133 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
2134 std::get_if<0>(&work)->operator()();
2135 });
2136 _observer_epilogue(worker, node);
2137 break;
2138
2139 // void(Runtime&) - silent async
2140 case 1:
2141 if(_invoke_runtime_task_impl(worker, node, *std::get_if<1>(&work))) {
2142 return true;
2143 }
2144 break;
2145
2146 // void(Runtime&, bool) - async
2147 case 2:
2148 if(_invoke_runtime_task_impl(worker, node, *std::get_if<2>(&work))) {
2149 return true;
2150 }
2151 break;
2152 }
2153 return false;
2154}
2155
2156// Function: run
2157inline tf::Future<void> Executor::run(Taskflow& f) {
2158 return run_n(f, 1, [](){});
2159}
2160
2161// Function: run
2162inline tf::Future<void> Executor::run(Taskflow&& f) {
2163 return run_n(std::move(f), 1, [](){});
2164}
2165
2166// Function: run
2167template <typename C>
2168tf::Future<void> Executor::run(Taskflow& f, C&& c) {
2169 return run_n(f, 1, std::forward<C>(c));
2170}
2171
2172// Function: run
2173template <typename C>
2174tf::Future<void> Executor::run(Taskflow&& f, C&& c) {
2175 return run_n(std::move(f), 1, std::forward<C>(c));
2176}
2177
2178// Function: run_n
2179inline tf::Future<void> Executor::run_n(Taskflow& f, size_t repeat) {
2180 return run_n(f, repeat, [](){});
2181}
2182
2183// Function: run_n
2184inline tf::Future<void> Executor::run_n(Taskflow&& f, size_t repeat) {
2185 return run_n(std::move(f), repeat, [](){});
2186}
2187
2188// Function: run_n
2189template <typename C>
2190tf::Future<void> Executor::run_n(Taskflow& f, size_t repeat, C&& c) {
2191 return run_until(
2192 f, [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c)
2193 );
2194}
2195
2196// Function: run_n
2197template <typename C>
2198tf::Future<void> Executor::run_n(Taskflow&& f, size_t repeat, C&& c) {
2199 return run_until(
2200 std::move(f), [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c)
2201 );
2202}
2203
2204// Function: run_until
2205template<typename P>
2206tf::Future<void> Executor::run_until(Taskflow& f, P&& pred) {
2207 return run_until(f, std::forward<P>(pred), [](){});
2208}
2209
2210// Function: run_until
2211template<typename P>
2212tf::Future<void> Executor::run_until(Taskflow&& f, P&& pred) {
2213 return run_until(std::move(f), std::forward<P>(pred), [](){});
2214}
2215
2216// Function: run_until
2217template <typename P, typename C>
2218tf::Future<void> Executor::run_until(Taskflow& f, P&& p, C&& c) {
2219
2220 // No need to create a real topology but returns an dummy future for invariant.
2221 if(f.empty() || p()) {
2222 c();
2223 std::promise<void> promise;
2224 promise.set_value();
2225 return tf::Future<void>(promise.get_future());
2226 }
2227
2228 _increment_topology();
2229
2230 // create a topology for this run
2231 auto t = std::make_shared<Topology>(f, std::forward<P>(p), std::forward<C>(c));
2232 //auto t = std::make_shared<DerivedTopology<P, C>>(f, std::forward<P>(p), std::forward<C>(c));
2233
2234 // need to create future before the topology got torn down quickly
2235 tf::Future<void> future(t->_promise.get_future(), t);
2236
2237 // modifying topology needs to be protected under the lock
2238 if(f._fetch_enqueue(t) == 0) {
2239 _set_up_topology(this_worker(), t.get());
2240 }
2241
2242 return future;
2243}
2244
2245
2246
2247// Function: corun_until
2248template <typename P>
2249void Executor::corun_until(P&& predicate) {
2250
2251 Worker* w = this_worker();
2252 if(w == nullptr) {
2253 TF_THROW("corun_until must be called by a worker of the executor");
2254 }
2255
2256 _corun_until(*w, std::forward<P>(predicate));
2257}
2258
2259/*
2260// Function: _corun_until
2261template <typename P>
2262void Executor::_corun_until(Worker& w, P&& stop_predicate) {
2263
2264 const size_t MAX_VICTIM = num_queues();
2265 const size_t MAX_STEALS = ((MAX_VICTIM + 1) << 1);
2266
2267 bool stop = false;
2268
2269 while(!stop && !(stop = stop_predicate())) {
2270
2271 // try local queue first — only one task at a time to avoid deep
2272 // recursive corun calls causing stack overflow
2273 if(auto t = w._wsq.pop(); t) {
2274 _invoke(w, t);
2275 continue;
2276 }
2277
2278 // local queue empty: steal from others until stop_predicate or stolen.
2279 // stop is set by the inner loop condition so when predicate becomes true
2280 // the outer loop exits immediately without calling stop_predicate again.
2281 size_t num_steals = 0;
2282 size_t vtm = w._sticky_victim;
2283
2284 while(!(stop = stop_predicate())) {
2285
2286 auto t = (vtm < _workers.size())
2287 ? _workers[vtm]._wsq.steal()
2288 : _buffers[vtm - _workers.size()].queue.steal();
2289
2290 if(t) {
2291 // STOLEN: invoke task then return to outer loop to re-check
2292 // local queue and stop_predicate
2293 _invoke(w, t);
2294 w._sticky_victim = vtm;
2295 break;
2296 }
2297
2298 // pick a new victim excluding self
2299 vtm = w._rdgen() % (MAX_VICTIM - 1);
2300 if(vtm >= w._id) vtm++;
2301
2302 if(++num_steals > MAX_STEALS) {
2303 // unlike _explore_task we cannot sleep here — the calling worker
2304 // is blocked inside a task and must keep making progress to avoid
2305 // deadlock. yield to let other threads run and make progress.
2306 std::this_thread::yield();
2307 }
2308 }
2309 }
2310}*/
2311
2312// Function: _corun_until
2313template <typename P>
2314void Executor::_corun_until(Worker& w, P&& stop_predicate) {
2315
2316 const size_t MAX_VICTIM = num_queues();
2317 const size_t MAX_STEALS = ((MAX_VICTIM + 1) << 1);
2318
2319 exploit:
2320
2321 while(!stop_predicate()) {
2322
2323 // here we don't do while-loop to drain out the local queue as it can
2324 // potentially enter a very deep recursive corun, cuasing stack overflow
2325 if(auto t = w._wsq.pop(); t) {
2326 _invoke(w, t);
2327 }
2328 else {
2329 size_t num_steals = 0;
2330 size_t vtm = w._sticky_victim;
2331
2332 explore:
2333
2334 t = (vtm < _workers.size())
2335 ? _workers[vtm]._wsq.steal()
2336 : _buffers[vtm-_workers.size()].queue.steal();
2337
2338 if(t) {
2339 _invoke(w, t);
2340 w._sticky_victim = vtm;
2341 goto exploit;
2342 }
2343 else if(!stop_predicate()) {
2344 if(++num_steals > MAX_STEALS) {
2345 std::this_thread::yield();
2346 }
2347 vtm = w._rdgen() % MAX_VICTIM;
2348 goto explore;
2349 }
2350 else {
2351 break;
2352 }
2353 }
2354 }
2355}
2356
2357// Function: corun
2358template <typename T>
2359void Executor::corun(T& target) {
2360
2361 Worker* w = this_worker();
2362 if(w == nullptr) {
2363 TF_THROW("corun must be called by a worker of the executor");
2364 }
2365
2366 NodeBase anchor;
2367 _corun_graph(*w, retrieve_graph(target), nullptr, &anchor);
2368}
2369
2370// Procedure: _corun_graph
2371inline void Executor::_corun_graph(Worker& w, Graph& g, Topology* tpg, NodeBase* p) {
2372
2373 // empty graph
2374 if(g.empty()) {
2375 return;
2376 }
2377
2378 // anchor this parent as the blocking point
2379 {
2380 ExplicitAnchorGuard anchor(p);
2381 _schedule_graph(w, g, tpg, p);
2382 _corun_until(w, [p] () -> bool {
2383 return p->_join_counter.load(std::memory_order_acquire) == 0; }
2384 );
2385 }
2386
2387 // rethrow the exception to the caller
2388 p->_rethrow_exception();
2389}
2390
2391// Procedure: _increment_topology
2392inline void Executor::_increment_topology() {
2393 _num_topologies.fetch_add(1, std::memory_order_relaxed);
2394}
2395
2396// Procedure: _decrement_topology
2397inline void Executor::_decrement_topology() {
2398 if(_num_topologies.fetch_sub(1, std::memory_order_acq_rel) == 1) {
2399 _num_topologies.notify_all();
2400 }
2401}
2402
2403// Procedure: wait_for_all
2404inline void Executor::wait_for_all() {
2405 size_t n = _num_topologies.load(std::memory_order_acquire);
2406 while(n != 0) {
2407 _num_topologies.wait(n, std::memory_order_acquire);
2408 n = _num_topologies.load(std::memory_order_acquire);
2409 }
2410}
2411
2412// Function: _schedule_graph
2413inline void Executor::_schedule_graph(
2414 Worker& worker, Graph& graph, Topology* tpg, NodeBase* parent
2415) {
2416 size_t num_srcs = _set_up_graph(graph, tpg, parent);
2417 parent->_join_counter.fetch_add(num_srcs, std::memory_order_relaxed);
2418 _bulk_schedule(worker, graph.begin(), num_srcs);
2419}
2420
2421// Function: _set_up_topology
2422inline void Executor::_set_up_topology(Worker* w, Topology* tpg) {
2423 // ---- under taskflow lock ----
2424 auto& g = tpg->_taskflow._graph;
2425 size_t num_srcs = _set_up_graph(g, tpg, tpg);
2426 tpg->_join_counter.store(num_srcs, std::memory_order_relaxed);
2427 w ? _bulk_schedule(*w, g.begin(), num_srcs) : _bulk_schedule(g.begin(), num_srcs);
2428}
2429
2430// Function: _set_up_graph
2431inline size_t Executor::_set_up_graph(Graph& graph, Topology* tpg, NodeBase* parent) {
2432
2433 auto first = graph.begin();
2434 auto last = graph.end();
2435 auto send = first;
2436 for(; first != last; ++first) {
2437
2438 auto node = *first;
2439 node->_topology = tpg;
2440 node->_parent = parent;
2441 node->_nstate = NSTATE::NONE;
2442 node->_estate.store(ESTATE::NONE, std::memory_order_relaxed);
2443 node->_set_up_join_counter();
2444 node->_exception_ptr = nullptr;
2445
2446 // move source to the first partition
2447 // root, root, root, v1, v2, v3, v4, ...
2448 if(node->num_predecessors() == 0) {
2449 std::iter_swap(send++, first);
2450 }
2451 }
2452 return send - graph.begin();
2453}
2454
2455// Function: _tear_down_topology
2456inline void Executor::_tear_down_topology(Worker& worker, Topology* tpg, Node*& cache) {
2457
2458 auto &f = tpg->_taskflow;
2459
2460 //assert(&tpg == &(f._topologies.front()));
2461
2462 // case 1: we still need to run the topology again
2463 //if(!tpg->_exception_ptr && !tpg->cancelled() && !tpg->predicate()) {
2464 if(!tpg->cancelled() && !tpg->_predicate()) {
2465 //assert(tpg->_join_counter == 0);
2466 //std::lock_guard<std::mutex> lock(f._mutex);
2467 _schedule_graph(worker, tpg->_taskflow._graph, tpg, tpg);
2468 }
2469 // case 2: the final run of this topology
2470 else {
2471
2472 // invoke the callback after each run
2473 tpg->_on_finish();
2474
2475 // there is another topologies to run
2476 if(std::unique_lock<std::mutex> lock(f._mutex); f._topologies.size()>1) {
2477
2478 auto fetched_tpg {std::move(f._topologies.front())};
2479 //assert(fetched_tpg.get() == tpg);
2480
2481 f._topologies.pop();
2482 tpg = f._topologies.front().get();
2483
2484 lock.unlock();
2485
2486 // Soon after we carry out the promise, the associate taskflow may got destroyed
2487 // from the user side, and we should never tough it again.
2488 fetched_tpg->_carry_out_promise();
2489
2490 // decrement the topology
2491 _decrement_topology();
2492
2493 _schedule_graph(worker, tpg->_taskflow._graph, tpg, tpg);
2494 }
2495 else {
2496 //assert(f._topologies.size() == 1);
2497
2498 auto fetched_tpg {std::move(f._topologies.front())};
2499 //assert(fetched_tpg.get() == tpg);
2500
2501 f._topologies.pop();
2502
2503 lock.unlock();
2504
2505 // Soon after we carry out the promise, the associate taskflow may got destroyed
2506 // from the user side, and we should never tough it again.
2507 fetched_tpg->_carry_out_promise();
2508
2509 _decrement_topology();
2510
2511 // remove the parent that owns the moved taskflow so the storage can be freed
2512 if(auto parent = fetched_tpg->_parent; parent) {
2513 //auto state = parent->_nstate;
2514 if(parent->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
2515 // this async is spawned from a preempted parent, so we need to resume it
2516 //if(state & NSTATE::PREEMPTED) {
2517 _update_cache(worker, cache, static_cast<Node*>(parent));
2518 //}
2519 }
2520 }
2521 }
2522 }
2523}
2524
2525// ############################################################################
2526// Out-of-line Definition: Subflow
2527// ############################################################################
2528
2529inline void Subflow::join() {
2530
2531 if(!joinable()) {
2532 TF_THROW("subflow already joined");
2533 }
2534
2535 _executor._corun_graph(_worker, _graph, _node->_topology, _node);
2536
2537 // join here since corun graph may throw exception
2538 _node->_nstate |= NSTATE::JOINED_SUBFLOW;
2539}
2540
2541#endif
2542
2543
2544
2545
2546} // end of namespace tf -----------------------------------------------------
class to hold a dependent asynchronous task with shared ownership
Definition async_task.hpp:45
void silent_async(P &&params, F &&func)
similar to tf::Executor::async but does not return a future object
tf::AsyncTask silent_dependent_async(F &&func, Tasks &&... tasks)
runs the given function asynchronously when the given predecessors finish
tf::Future< void > run_until(Taskflow &taskflow, P &&pred)
runs a taskflow multiple times until the predicate becomes true
void corun_until(P &&predicate)
keeps running the work-stealing loop until the predicate returns true
void remove_observer(std::shared_ptr< Observer > observer)
removes an observer from the executor
auto dependent_async(F &&func, Tasks &&... tasks)
runs the given function asynchronously when the given predecessors finish
tf::Future< void > run(Taskflow &&taskflow)
runs a moved taskflow once
tf::Future< void > run(Taskflow &taskflow)
runs a taskflow once
size_t num_waiters() const noexcept
queries the number of workers that are in the waiting loop
tf::Future< void > run(Taskflow &&taskflow, C &&callable)
runs a moved taskflow once and invoke a callback upon completion
~Executor()
destructs the executor
int this_worker_id() const
queries the id of the caller thread within this executor
size_t num_queues() const noexcept
queries the number of work-stealing queues used by the executor
tf::Future< void > run_n(Taskflow &taskflow, size_t N)
runs a taskflow for N times
size_t num_topologies() const
queries the number of running topologies at the time of this call
Executor(size_t N=std::thread::hardware_concurrency(), std::shared_ptr< WorkerInterface > wif=nullptr)
constructs the executor with N worker threads
TaskGroup task_group()
creates a task group that executes a collection of asynchronous tasks
Definition task_group.hpp:875
void corun(T &target)
runs a target graph and waits until it completes using an internal worker of this executor
size_t num_workers() const noexcept
queries the number of worker threads
tf::Future< void > run_until(Taskflow &&taskflow, P &&pred)
runs a moved taskflow and keeps running it until the predicate becomes true
void wait_for_all()
waits for all tasks to complete
tf::Future< void > run_n(Taskflow &taskflow, size_t N, C &&callable)
runs a taskflow for N times and then invokes a callback
tf::Future< void > run(Taskflow &taskflow, C &&callable)
runs a taskflow once and invoke a callback upon completion
tf::Future< void > run_n(Taskflow &&taskflow, size_t N)
runs a moved taskflow for N times
tf::Future< void > run_n(Taskflow &&taskflow, size_t N, C &&callable)
runs a moved taskflow for N times and then invokes a callback
tf::Future< void > run_until(Taskflow &taskflow, P &&pred, C &&callable)
runs a taskflow multiple times until the predicate becomes true and then invokes the callback
Worker * this_worker()
queries the pointer to the calling worker if it belongs to this executor; otherwise, returns nullptr
auto async(P &&params, F &&func)
creates a parameterized asynchronous task to run the given function
std::shared_ptr< Observer > make_observer(ArgsT &&... args)
constructs an observer to inspect the activities of worker threads
size_t num_observers() const noexcept
queries the number of observers
class to access the result of an execution
Definition taskflow.hpp:630
class to create a graph object
Definition graph.hpp:47
class to define a vector optimized for small array
Definition small_vector.hpp:931
void join()
enables the subflow to join its parent task
bool joinable() const noexcept
queries if the subflow is joinable
Definition flow_builder.hpp:1825
class to create a taskflow object
Definition taskflow.hpp:64
class to create a lock-free unbounded work-stealing queue
Definition wsq.hpp:105
class to create a worker in an executor
Definition worker.hpp:55
determines if a type is a task parameter type
Definition graph.hpp:202
taskflow namespace
Definition small_vector.hpp:20
NonblockingNotifier DefaultNotifier
the default notifier type used by Taskflow
Definition worker.hpp:38
Graph & retrieve_graph(T &target)
retrieves a reference to the underlying tf::Graph from an object
Definition graph.hpp:1067
bool has_env(const std::string &str)
checks whether an environment variable is defined
Definition os.hpp:284