11#include <condition_variable>
16#include "../utility/os.hpp"
86 friend class Executor;
89 alignas (TF_CACHELINE_SIZE) std::atomic<Waiter*> next;
97 std::atomic<unsigned> state {0};
153 TF_THROW(
"nonblocking waiter supports only up to ", (1<<PREWAITER_BITS)-1,
" waiters");
178 for(
auto& w : _waiters) {
179 n += (w.state.load(std::memory_order_relaxed) == Waiter::kWaiting);
213 _waiters[wid].epoch = _state.fetch_add(
PREWAITER_INC, std::memory_order_relaxed);
214 std::atomic_thread_fence(std::memory_order_seq_cst);
237 auto w = &_waiters[wid];
239 w->state.store(Waiter::kNotSignaled, std::memory_order_relaxed);
305 uint64_t state = _state.load(std::memory_order_seq_cst);
307 if (int64_t((state &
EPOCH_MASK) - epoch) < 0) {
310 std::this_thread::yield();
311 state = _state.load(std::memory_order_seq_cst);
315 if (int64_t((state &
EPOCH_MASK) - epoch) > 0) {
325 w->next.store(
nullptr, std::memory_order_relaxed);
329 w->next.store(&_waiters[state &
STACK_MASK], std::memory_order_relaxed);
331 if (_state.compare_exchange_weak(state, newstate, std::memory_order_release)) {
360 uint64_t state = _state.load(std::memory_order_relaxed);
362 if (int64_t((state &
EPOCH_MASK) - epoch) < 0) {
365 std::this_thread::yield();
366 state = _state.load(std::memory_order_relaxed);
370 if (int64_t((state &
EPOCH_MASK) - epoch) > 0) {
376 std::memory_order_relaxed)) {
390 std::atomic_thread_fence(std::memory_order_seq_cst);
391 uint64_t state = _state.load(std::memory_order_acquire);
399 if (num_prewaiters) {
406 Waiter* wnext = w->next.load(std::memory_order_relaxed);
409 if (wnext !=
nullptr) {
410 next =
static_cast<uint64_t
>(wnext - &_waiters[0]);
417 if (_state.compare_exchange_weak(state, newstate, std::memory_order_acquire)) {
423 w->next.store(
nullptr, std::memory_order_relaxed);
438 std::atomic_thread_fence(std::memory_order_seq_cst);
439 uint64_t state = _state.load(std::memory_order_acquire);
451 if (_state.compare_exchange_weak(state, newstate, std::memory_order_acquire)) {
481 if(N >= _waiters.size()) {
486 std::atomic_thread_fence(std::memory_order_seq_cst);
487 uint64_t state = _state.load(std::memory_order_acquire);
499 size_t to_unblock = (N < num_prewaiters) ? N : num_prewaiters;
501 newN = N - to_unblock;
506 Waiter* wnext = w->next.load(std::memory_order_relaxed);
509 if (wnext !=
nullptr) {
510 next =
static_cast<uint64_t
>(wnext - &_waiters[0]);
519 if (_state.compare_exchange_weak(state, newstate, std::memory_order_acquire)) {
521 if(num_prewaiters == 0) {
523 w->next.store(
nullptr, std::memory_order_relaxed);
547 return _waiters.size();
552 std::atomic<uint64_t> _state;
553 std::vector<Waiter> _waiters;
558 void _park(Waiter* w) {
559 unsigned target = Waiter::kNotSignaled;
560 if(w->state.compare_exchange_strong(target, Waiter::kWaiting, std::memory_order_relaxed
561 , std::memory_order_relaxed)) {
562 w->state.wait(Waiter::kWaiting, std::memory_order_relaxed);
572 void _unpark(Waiter* waiters) {
573 Waiter* next =
nullptr;
574 for (Waiter* w = waiters; w; w = next) {
575 next = w->next.load(std::memory_order_relaxed);
579 if(w->state.exchange(Waiter::kSignaled, std::memory_order_relaxed) == Waiter::kWaiting) {
580 w->state.notify_one();
size_t num_waiters() const
returns the number of committed waiters
Definition nonblocking_notifier.hpp:176
static const uint64_t PREWAITER_SHIFT
Bit shift of the pre-waiter ticket field.
Definition nonblocking_notifier.hpp:122
NonblockingNotifier(size_t N)
constructs a notifier with N waiters
Definition nonblocking_notifier.hpp:151
static const uint64_t EPOCH_SHIFT
Bit shift of the epoch field.
Definition nonblocking_notifier.hpp:134
void cancel_wait(size_t wid)
cancels a previously prepared wait operation
Definition nonblocking_notifier.hpp:356
void prepare_wait(size_t wid)
prepares the calling thread to enter the waiting set
Definition nonblocking_notifier.hpp:212
static const uint64_t PREWAITER_INC
Increment value for advancing the pre-waiter ticket.
Definition nonblocking_notifier.hpp:128
static const uint64_t EPOCH_BITS
Number of bits used to encode the epoch counter.
Definition nonblocking_notifier.hpp:131
size_t capacity() const
returns the maximum number of waiters supported by this notifier
Definition nonblocking_notifier.hpp:192
static const uint64_t EPOCH_MASK
Bit mask for extracting the epoch field.
Definition nonblocking_notifier.hpp:137
void commit_wait(size_t wid)
commits a previously prepared wait operation
Definition nonblocking_notifier.hpp:235
static const uint64_t EPOCH_INC
Increment value for advancing the epoch counter.
Definition nonblocking_notifier.hpp:140
void notify_one()
notifies one waiter from the waiting set
Definition nonblocking_notifier.hpp:389
static const uint64_t PREWAITER_MASK
Bit mask for extracting the pre-waiter ticket field.
Definition nonblocking_notifier.hpp:125
static const uint64_t PREWAITER_BITS
Number of bits used to encode the pre-waiter ticket.
Definition nonblocking_notifier.hpp:119
static const uint64_t STACK_MASK
Bit mask for extracting the waiter stack index.
Definition nonblocking_notifier.hpp:116
void notify_n(size_t N)
notifies up to N waiters from the waiting set
Definition nonblocking_notifier.hpp:473
void notify_all()
notifies all waiters from the waiting set
Definition nonblocking_notifier.hpp:437
~NonblockingNotifier()
destructs the notifier
Definition nonblocking_notifier.hpp:163
static const uint64_t STACK_BITS
Number of bits used to encode the waiter stack index.
Definition nonblocking_notifier.hpp:113
size_t size() const
returns the number of waiters supported by this notifier
Definition nonblocking_notifier.hpp:546
taskflow namespace
Definition small_vector.hpp:20