Loading...
Searching...
No Matches
nonblocking_notifier.hpp
1#pragma once
2
3#include <iostream>
4#include <vector>
5#include <cstdlib>
6#include <cstdio>
7#include <atomic>
8#include <memory>
9#include <deque>
10#include <mutex>
11#include <condition_variable>
12#include <thread>
13#include <algorithm>
14#include <numeric>
15#include <cassert>
16#include "../utility/os.hpp"
17
22
23namespace tf {
24
85
86 friend class Executor;
87
// Per-waiter slot. Each worker owns one Waiter, identified by its index in
// _waiters. Committed waiters are linked into an intrusive lock-free stack
// through `next`; `state` implements the park/unpark handshake (see _park
// and _unpark).
struct Waiter {
  // next waiter on the intrusive stack (nullptr terminates the chain);
  // the slot is cache-line aligned to avoid false sharing between
  // adjacent waiter slots in the _waiters vector
  alignas (TF_CACHELINE_SIZE) std::atomic<Waiter*> next;
  // snapshot of _state taken in prepare_wait; encodes this waiter's
  // ticket (pre-waiter count field) and the global epoch at that time
  uint64_t epoch;

  // tri-state park/unpark protocol:
  //   kNotSignaled -> kWaiting  (waiter parks itself)
  //   kNotSignaled -> kSignaled (notifier wins the race; waiter never blocks)
  //   kWaiting     -> kSignaled (notifier wakes a blocked waiter)
  enum : unsigned {
    kNotSignaled = 0,
    kWaiting,
    kSignaled,
  };
  std::atomic<unsigned> state {0};

  //mutable std::mutex mu;
  //std::condition_variable cv;
  //unsigned state;
};
103
104 public:
105
// The state variable consists of the following three parts:
// - low STACK_BITS is a stack of waiters committed wait.
// - next PREWAITER_BITS is count of waiters in prewait state.
// - next EPOCH_BITS is modification counter.
// [ 32-bit epoch | 16-bit pre-waiter count | 16-bit waiter stack ]

/** @brief number of bits used to encode the waiter stack index */
static const uint64_t STACK_BITS = 16;

/** @brief bit mask for extracting the waiter stack index
    (the all-ones value also serves as the empty-stack sentinel) */
static const uint64_t STACK_MASK = (1ull << STACK_BITS) - 1;

/** @brief number of bits used to encode the pre-waiter count */
static const uint64_t PREWAITER_BITS = 16;

/** @brief bit shift of the pre-waiter count field (just above the stack field) */
static const uint64_t PREWAITER_SHIFT = 16;

/** @brief bit mask for extracting the pre-waiter count field */
static const uint64_t PREWAITER_MASK = ((1ull << PREWAITER_BITS) - 1) << PREWAITER_SHIFT;

/** @brief increment value for adding/removing one pre-waiter

    Shifted by PREWAITER_SHIFT (the field's position), not PREWAITER_BITS
    (the field's width) — the two happen to be equal today, but using the
    shift keeps this correct if the layout ever changes, mirroring how
    EPOCH_INC uses EPOCH_SHIFT. */
static const uint64_t PREWAITER_INC = 1ull << PREWAITER_SHIFT;

/** @brief number of bits used to encode the epoch counter */
static const uint64_t EPOCH_BITS = 32;

/** @brief bit shift of the epoch field */
static const uint64_t EPOCH_SHIFT = 32;

/** @brief bit mask for extracting the epoch field */
static const uint64_t EPOCH_MASK = ((1ull << EPOCH_BITS) - 1) << EPOCH_SHIFT;

/** @brief increment value for advancing the epoch counter */
static const uint64_t EPOCH_INC = 1ull << EPOCH_SHIFT;
141
/**
@brief constructs a notifier with @c N waiter slots (one per worker)

Initializes _state to STACK_MASK: empty waiter stack (STACK_MASK is the
empty sentinel), zero pre-waiters, epoch zero.

@param N number of waiter slots to pre-allocate
@throws via TF_THROW if N exceeds what the 16-bit state fields can encode
*/
explicit NonblockingNotifier(size_t N) : _state(STACK_MASK), _waiters(N) {
  // NOTE(review): the message says "up to (1<<PREWAITER_BITS)-1 waiters" but
  // the >= check also rejects N == (1<<PREWAITER_BITS)-1 — confirm which
  // bound is intended (the stack encoding itself reserves only STACK_MASK).
  if(_waiters.size() >= ((1 << PREWAITER_BITS) - 1)) {
    TF_THROW("nonblocking waiter supports only up to ", (1<<PREWAITER_BITS)-1, " waiters");
  }
  //assert(_waiters.size() < (1 << PREWAITER_BITS) - 1);
  // Initialize epoch to something close to overflow to test overflow.
  //_state = STACK_MASK | (EPOCH_MASK - EPOCH_INC * _waiters.size() * 2);
}
159
164 // Ensure there are no waiters.
165 assert((_state.load() & (STACK_MASK | PREWAITER_MASK)) == STACK_MASK);
166 }
167
176 size_t num_waiters() const {
177 size_t n = 0;
178 for(auto& w : _waiters) {
179 n += (w.state.load(std::memory_order_relaxed) == Waiter::kWaiting);
180 //std::scoped_lock lock(w.mu);
181 //n += (w.state == Waiter::kWaiting);
182 }
183 return n;
184 }
185
192 size_t capacity() const {
193 return 1 << STACK_BITS;
194 }
195
/**
@brief announces the calling waiter's intent to wait (pre-wait stage)

Must be paired with a subsequent commit_wait(wid) or cancel_wait(wid) by
the same thread. Between this call and the commit/cancel, the caller is
expected to re-check its wait predicate.

@param wid index of the calling waiter in @c _waiters
*/
void prepare_wait(size_t wid) {
  // Atomically bump the pre-waiter count and snapshot the prior state;
  // the snapshot's pre-waiter-count field is this waiter's ticket and its
  // epoch field is the reference epoch (see the comment in commit_wait).
  _waiters[wid].epoch = _state.fetch_add(PREWAITER_INC, std::memory_order_relaxed);
  // Full barrier so the pre-wait registration is ordered before the
  // caller's predicate re-check; pairs with the fence in notify_*.
  std::atomic_thread_fence(std::memory_order_seq_cst);
}
216
/**
@brief commits a previously prepared wait and blocks until notified

Must follow a prepare_wait(wid) by the same thread. If a notification has
already consumed this waiter's ticket, returns immediately without
blocking; otherwise the waiter removes itself from the pre-wait count,
pushes itself onto the lock-free waiter stack, and parks.

@param wid index of the calling waiter in @c _waiters
*/
void commit_wait(size_t wid) {

  auto w = &_waiters[wid];

  // reset the park/unpark handshake before this slot can become visible
  // on the waiter stack
  w->state.store(Waiter::kNotSignaled, std::memory_order_relaxed);

  /*
  Epoch and ticket semantics.

  `sepoch` = _state & EPOCH_MASK
  `wepoch` = w->epoch & EPOCH_MASK
  `ticket` = w->epoch & PREWAITER_MASK

  Each waiter entering the pre-waiting stage is assigned a monotonically
  increasing ticket that determines the processing order (e.g.,
  cancel_wait, commit_wait, notify). Ticket 0 is processed first, followed
  by ticket 1, and so on.

  The global epoch `sepoch` is incremented whenever a request is fulfilled.
  Therefore, the difference `sepoch - wepoch` indicates which ticket is
  currently ready to be handled:

  - `sepoch - wepoch == ticket` : this waiter's turn
  - `sepoch - wepoch > ticket`  : this waiter's ticket has expired
  - `sepoch - wepoch < ticket`  : this waiter's ticket has not yet reached

  Unsigned wraparound does not affect correctness. All epoch arithmetic is
  performed using unsigned integers, which obey modulo-2^N arithmetic.
  Converting the unsigned difference to a signed value yields the correct
  result as long as the true difference lies within the signed range.

  In general:
  - Unsigned range: [0, 2^N − 1]
  - Signed range  : [−2^(N−1), 2^(N−1) − 1]

  When overflow occurs, unsigned subtraction computes:

    (sepoch − wepoch) mod 2^N

  If the true value of `sepoch − wepoch` is within the signed range
  [−2^(N−1), 2^(N−1) − 1], reinterpreting this result as a signed integer
  produces the correct mathematical difference.

  Example (3-bit arithmetic):

    a  b | true a−b | unsigned (bin / dec) | signed (dec)
    ----------------------------------------------------
    1  0 |    1     |   001 / 1            |  +1
    1  1 |    0     |   000 / 0            |   0
    1  2 |   -1     |   111 / 7            |  -1
    1  3 |   -2     |   110 / 6            |  -2
    1  4 |   -3     |   101 / 5            |  -3
    1  5 |   -4     |   100 / 4            |  -4
    1  6 |   -5     |   011 / 3            |  +3 (wrap around)
    1  7 |   -6     |   010 / 2            |  +2 (wrap around)

  Signed interpretation is correct only when the true difference lies
  within [−4, +3].

  In this implementation, `sepoch − wepoch` is guaranteed not to exceed
  2^16 in magnitude, which is far smaller than 2^(EPOCH_BITS − 1).
  Consequently, the expression:

    int64_t((state & EPOCH_MASK) - epoch)

  remains correct even if `sepoch` and `wepoch` individually overflow.
  */
  // target epoch for this waiter: snapshot epoch + its ticket number,
  // expressed in the epoch field
  uint64_t epoch =
    (w->epoch & EPOCH_MASK) +
    (((w->epoch & PREWAITER_MASK) >> PREWAITER_SHIFT) << EPOCH_SHIFT);
  uint64_t state = _state.load(std::memory_order_seq_cst);
  for (;;) {
    if (int64_t((state & EPOCH_MASK) - epoch) < 0) {
      // The preceding waiter has not decided on its fate. Wait until it
      // calls either cancel_wait or commit_wait, or is notified.
      std::this_thread::yield();
      state = _state.load(std::memory_order_seq_cst);
      continue;
    }
    // We've already been notified.
    if (int64_t((state & EPOCH_MASK) - epoch) > 0) {
      return;
    }
    // Remove this thread from prewait counter and add it to the waiter stack.
    assert((state & PREWAITER_MASK) != 0);
    uint64_t newstate = state - PREWAITER_INC + EPOCH_INC;
    newstate = (newstate & ~STACK_MASK) | wid;

    // stack is empty -> this waiter is at the top of the stack, pointing to nothing
    if ((state & STACK_MASK) == STACK_MASK) {
      w->next.store(nullptr, std::memory_order_relaxed);
    }
    // stack is non-empty -> this waiter is at the top of the stack, pointing to the origin top
    else {
      w->next.store(&_waiters[state & STACK_MASK], std::memory_order_relaxed);
    }
    // release: publishes w->next (and kNotSignaled) before the push is visible
    if (_state.compare_exchange_weak(state, newstate, std::memory_order_release)) {
      break;
    }
  }
  // block until a notifier signals this slot (or return at once if a
  // notifier already raced ahead — see _park)
  _park(w);
}
337
/**
@brief cancels a previously prepared wait operation

Must be called instead of commit_wait when, after prepare_wait(wid), the
caller decides not to block. Removes this waiter from the pre-wait count.
If a notification has already consumed this waiter's ticket, the call
returns immediately (the notification is absorbed by this waiter).

@param wid index of the calling waiter in @c _waiters
*/
void cancel_wait(size_t wid) {
  // target epoch for this waiter's ticket — see the epoch/ticket comment
  // in commit_wait for the encoding and overflow reasoning
  uint64_t epoch =
    (_waiters[wid].epoch & EPOCH_MASK) +
    (((_waiters[wid].epoch & PREWAITER_MASK) >> PREWAITER_SHIFT) << EPOCH_SHIFT);
  uint64_t state = _state.load(std::memory_order_relaxed);
  for (;;) {
    if (int64_t((state & EPOCH_MASK) - epoch) < 0) {
      // The preceding waiter has not decided on its fate. Wait until it
      // calls either cancel_wait or commit_wait, or is notified.
      std::this_thread::yield();
      state = _state.load(std::memory_order_relaxed);
      continue;
    }
    // We've already been notified.
    if (int64_t((state & EPOCH_MASK) - epoch) > 0) {
      return;
    }
    // Remove this thread from prewait counter.
    assert((state & PREWAITER_MASK) != 0);
    // bump the epoch so the next ticket holder can proceed
    if (_state.compare_exchange_weak(state, state - PREWAITER_INC + EPOCH_INC,
                                     std::memory_order_relaxed)) {
      return;
    }
  }
}
381
/**
@brief notifies one waiter

Must be called after changing the associated wait predicate. Prefers
unblocking a thread still in the pre-wait stage (by advancing the epoch,
which invalidates its ticket) over popping and unparking a committed
waiter from the stack. Returns immediately if there are no waiters.
*/
void notify_one() {
  // Full barrier: pairs with the fence in prepare_wait so that either the
  // waiter sees the predicate change or we see its pre-wait registration.
  std::atomic_thread_fence(std::memory_order_seq_cst);
  uint64_t state = _state.load(std::memory_order_acquire);
  for (;;) {
    // Easy case: no waiters.
    if ((state & STACK_MASK) == STACK_MASK && (state & PREWAITER_MASK) == 0) {
      return;
    }
    uint64_t num_prewaiters = (state & PREWAITER_MASK) >> PREWAITER_SHIFT;
    uint64_t newstate;
    if (num_prewaiters) {
      // There is a thread in pre-wait state, unblock it.
      newstate = state + EPOCH_INC - PREWAITER_INC;
    }
    else {
      // Pop a waiter from list and unpark it.
      Waiter* w = &_waiters[state & STACK_MASK];
      Waiter* wnext = w->next.load(std::memory_order_relaxed);
      uint64_t next = STACK_MASK;
      //if (wnext != nullptr) next = wnext - &_waiters[0];
      if (wnext != nullptr) {
        next = static_cast<uint64_t>(wnext - &_waiters[0]);
      }
      // Note: we don't add EPOCH_INC here. ABA problem on the lock-free stack
      // can't happen because a waiter is re-pushed onto the stack only after
      // it was in the pre-wait state which inevitably leads to epoch increment.
      newstate = (state & EPOCH_MASK) + next;
    }
    if (_state.compare_exchange_weak(state, newstate, std::memory_order_acquire)) {
      if(num_prewaiters) {
        return; // unblocked pre-wait thread
      }
      // if there is no pre-waiters, the stack must have something
      Waiter* w = &_waiters[state & STACK_MASK];
      // detach the popped waiter so _unpark wakes only this one
      w->next.store(nullptr, std::memory_order_relaxed);
      _unpark(w);
      return;
    }
  }
}
429
/**
@brief notifies all waiters

Must be called after changing the associated wait predicate. Advances the
epoch past every outstanding pre-wait ticket and detaches the entire
committed-waiter stack, then unparks the whole chain.
*/
void notify_all() {
  // Full barrier: pairs with the fence in prepare_wait (see notify_one).
  std::atomic_thread_fence(std::memory_order_seq_cst);
  uint64_t state = _state.load(std::memory_order_acquire);
  for (;;) {

    // Easy case: no waiters.
    if ((state & STACK_MASK) == STACK_MASK && (state & PREWAITER_MASK) == 0) {
      return;
    }
    uint64_t num_prewaiters = (state & PREWAITER_MASK) >> PREWAITER_SHIFT;

    // Reset prewait counter and empty wait list.
    uint64_t newstate = (state & EPOCH_MASK) + (EPOCH_INC * num_prewaiters) + STACK_MASK;

    if (_state.compare_exchange_weak(state, newstate, std::memory_order_acquire)) {
      if ((state & STACK_MASK) == STACK_MASK) {
        return;
      }
      // unpark the whole detached chain, following the next pointers
      Waiter* w = &_waiters[state & STACK_MASK];
      _unpark(w);
      return;
    }
  }
}
461
/**
@brief notifies up to @c N waiters

Must be called after changing the associated wait predicate. Unblocks
pre-waiting threads first (batched epoch advance), then pops committed
waiters from the stack one at a time. Falls back to notify_all when N
covers every waiter slot.

@param N maximum number of waiters to notify (0 is a no-op)
*/
void notify_n(size_t N) {

  // trivial case
  if(N == 0) {
    return;
  }

  // if the target N is bigger than the waiter size, notify all waiters
  if(N >= _waiters.size()) {
    notify_all();
    return;
  }

  // Full barrier: pairs with the fence in prepare_wait (see notify_one).
  std::atomic_thread_fence(std::memory_order_seq_cst);
  uint64_t state = _state.load(std::memory_order_acquire);
  do {
    // Easy case: no waiters.
    if ((state & STACK_MASK) == STACK_MASK && (state & PREWAITER_MASK) == 0) {
      return;
    }
    uint64_t num_prewaiters = (state & PREWAITER_MASK) >> PREWAITER_SHIFT;
    uint64_t newstate;
    size_t newN;

    // unblock waiters from pre-waiting list first.
    if(num_prewaiters) {
      size_t to_unblock = (N < num_prewaiters) ? N : num_prewaiters;
      newstate = state + (EPOCH_INC * to_unblock) - (PREWAITER_INC * to_unblock);
      newN = N - to_unblock;
    }
    // pop one waiter from the stack
    else {
      Waiter* w = &_waiters[state & STACK_MASK];
      Waiter* wnext = w->next.load(std::memory_order_relaxed);
      uint64_t next = STACK_MASK;
      //if (wnext != nullptr) next = wnext - &_waiters[0];
      if (wnext != nullptr) {
        next = static_cast<uint64_t>(wnext - &_waiters[0]);
      }
      // Note: we don't add EPOCH_INC here. ABA problem on the lock-free stack
      // can't happen because a waiter is re-pushed onto the stack only after
      // it was in the pre-wait state which inevitably leads to epoch increment.
      newstate = (state & EPOCH_MASK) + next;
      newN = N - 1;
    }

    // on CAS failure `state` is refreshed and N is left unchanged, so the
    // loop retries with the current snapshot
    if (_state.compare_exchange_weak(state, newstate, std::memory_order_acquire)) {
      N = newN;
      if(num_prewaiters == 0) {
        // detach and wake only the popped waiter
        Waiter* w = &_waiters[state & STACK_MASK];
        w->next.store(nullptr, std::memory_order_relaxed);
        _unpark(w);
      }
    }
  } while(N > 0);

  //if(n >= _waiters.size()) {
  //  notify_all();
  //}
  //else {
  //  for(size_t k=0; k<n; ++k) {
  //    notify_one();
  //  }
  //}
}
538
546 size_t size() const {
547 return _waiters.size();
548 }
549
private:

// packed notifier state: [ 32-bit epoch | 16-bit pre-waiter count |
// 16-bit committed-waiter stack top (STACK_MASK == empty) ]
std::atomic<uint64_t> _state;
// one slot per waiter, indexed by wid
std::vector<Waiter> _waiters;
554
// only this waiter can park itself, with the following two possible paths:
// 1. kNotSignaled (this) -> in-stack -> kWaiting (this) -> wait
// 2. kNotSignaled (this) -> in-stack -> kSignaled -> unwait
void _park(Waiter* w) {
  unsigned target = Waiter::kNotSignaled;
  // transition kNotSignaled -> kWaiting; the CAS fails iff a notifier has
  // already stored kSignaled, in which case we return without blocking
  if(w->state.compare_exchange_strong(target, Waiter::kWaiting, std::memory_order_relaxed
                                      , std::memory_order_relaxed)) {
    // block while state == kWaiting; _unpark stores kSignaled and notifies
    w->state.wait(Waiter::kWaiting, std::memory_order_relaxed);
  }
  //std::unique_lock<std::mutex> lock(w->mu);
  //while (w->state != Waiter::kSignaled) {
  //  w->state = Waiter::kWaiting;
  //  w->cv.wait(lock);
  //}
}
570
// others can unpark: walks the chain starting at `waiters` (following the
// next pointers) and signals every waiter on it; a single detached waiter
// (next == nullptr) is the chain of length one
void _unpark(Waiter* waiters) {
  Waiter* next = nullptr;
  for (Waiter* w = waiters; w; w = next) {
    // read next before signaling — once signaled, w may be reused
    next = w->next.load(std::memory_order_relaxed);
    // We only notify if the other is waiting - this is why we use tri-state
    // variable instead of binary-state variable (i.e., atomic_flag)
    // Performance is about 0.1% faster
    if(w->state.exchange(Waiter::kSignaled, std::memory_order_relaxed) == Waiter::kWaiting) {
      w->state.notify_one();
    }
    //unsigned state;
    //{
    //  std::unique_lock<std::mutex> lock(w->mu);
    //  state = w->state;
    //  w->state = Waiter::kSignaled;
    //}
    //if (state == Waiter::kWaiting) w->cv.notify_one();
  }
}
592
593 // notify wakes one or all waiting threads.
594 // Must be called after changing the associated wait predicate.
595 //void _notify(bool all) {
596 // std::atomic_thread_fence(std::memory_order_seq_cst);
597 // uint64_t state = _state.load(std::memory_order_acquire);
598 // for (;;) {
599 // // Easy case: no waiters.
600 // if ((state & STACK_MASK) == STACK_MASK && (state & PREWAITER_MASK) == 0) {
601 // return;
602 // }
603 // uint64_t num_prewaiters = (state & PREWAITER_MASK) >> PREWAITER_SHIFT;
604 // uint64_t newstate;
605 // if (all) {
606 // // Reset prewait counter and empty wait list.
607 // newstate = (state & EPOCH_MASK) + (EPOCH_INC * num_prewaiters) + STACK_MASK;
608 // } else if (num_prewaiters) {
609 // // There is a thread in pre-wait state, unblock it.
610 // newstate = state + EPOCH_INC - PREWAITER_INC;
611 // } else {
612 // // Pop a waiter from list and unpark it.
613 // Waiter* w = &_waiters[state & STACK_MASK];
614 // Waiter* wnext = w->next.load(std::memory_order_relaxed);
615 // uint64_t next = STACK_MASK;
616 // //if (wnext != nullptr) next = wnext - &_waiters[0];
617 // if (wnext != nullptr) {
618 // next = static_cast<uint64_t>(wnext - &_waiters[0]);
619 // }
620 // // Note: we don't add EPOCH_INC here. ABA problem on the lock-free stack
621 // // can't happen because a waiter is re-pushed onto the stack only after
622 // // it was in the pre-wait state which inevitably leads to epoch increment.
623 // newstate = (state & EPOCH_MASK) + next;
624 // }
625 // if (_state.compare_exchange_weak(state, newstate, std::memory_order_acquire)) {
626 // if(!all && num_prewaiters) return; // unblocked pre-wait thread
627 // if ((state & STACK_MASK) == STACK_MASK) return;
628 // Waiter* w = &_waiters[state & STACK_MASK];
629 // if(!all) {
630 // w->next.store(nullptr, std::memory_order_relaxed);
631 // }
632 // _unpark(w);
633 // return;
634 // }
635 // }
636 //}
637};
638
639
640} // namespace tf ------------------------------------------------------------
641
size_t num_waiters() const
returns the number of committed waiters
Definition nonblocking_notifier.hpp:176
static const uint64_t PREWAITER_SHIFT
Bit shift of the pre-waiter ticket field.
Definition nonblocking_notifier.hpp:122
NonblockingNotifier(size_t N)
constructs a notifier with N waiters
Definition nonblocking_notifier.hpp:151
static const uint64_t EPOCH_SHIFT
Bit shift of the epoch field.
Definition nonblocking_notifier.hpp:134
void cancel_wait(size_t wid)
cancels a previously prepared wait operation
Definition nonblocking_notifier.hpp:356
void prepare_wait(size_t wid)
prepares the calling thread to enter the waiting set
Definition nonblocking_notifier.hpp:212
static const uint64_t PREWAITER_INC
Increment value for advancing the pre-waiter ticket.
Definition nonblocking_notifier.hpp:128
static const uint64_t EPOCH_BITS
Number of bits used to encode the epoch counter.
Definition nonblocking_notifier.hpp:131
size_t capacity() const
returns the maximum number of waiters supported by this notifier
Definition nonblocking_notifier.hpp:192
static const uint64_t EPOCH_MASK
Bit mask for extracting the epoch field.
Definition nonblocking_notifier.hpp:137
void commit_wait(size_t wid)
commits a previously prepared wait operation
Definition nonblocking_notifier.hpp:235
static const uint64_t EPOCH_INC
Increment value for advancing the epoch counter.
Definition nonblocking_notifier.hpp:140
void notify_one()
notifies one waiter from the waiting set
Definition nonblocking_notifier.hpp:389
static const uint64_t PREWAITER_MASK
Bit mask for extracting the pre-waiter ticket field.
Definition nonblocking_notifier.hpp:125
static const uint64_t PREWAITER_BITS
Number of bits used to encode the pre-waiter ticket.
Definition nonblocking_notifier.hpp:119
static const uint64_t STACK_MASK
Bit mask for extracting the waiter stack index.
Definition nonblocking_notifier.hpp:116
void notify_n(size_t N)
notifies up to N waiters from the waiting set
Definition nonblocking_notifier.hpp:473
void notify_all()
notifies all waiters from the waiting set
Definition nonblocking_notifier.hpp:437
~NonblockingNotifier()
destructs the notifier
Definition nonblocking_notifier.hpp:163
static const uint64_t STACK_BITS
Number of bits used to encode the waiter stack index.
Definition nonblocking_notifier.hpp:113
size_t size() const
returns the number of waiters supported by this notifier
Definition nonblocking_notifier.hpp:546
taskflow namespace
Definition small_vector.hpp:20