Loading...
Searching...
No Matches
wsq.hpp
1#pragma once
2
3#include <bit>
4
5#include "../utility/macros.hpp"
6#include "../utility/traits.hpp"
7
12
13#ifndef TF_DEFAULT_BOUNDED_TASK_QUEUE_LOG_SIZE
21 #define TF_DEFAULT_BOUNDED_TASK_QUEUE_LOG_SIZE 8
22#endif
23
24#ifndef TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE
32 #define TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE 10
33#endif
34
35namespace tf {
36
37// ----------------------------------------------------------------------------
38// Work-stealing queue steal protocol sentinels
39//
40// These free functions define the two sentinel values used by steal operations
41// across all work-stealing queue types (BoundedWSQ, UnboundedWSQ). They encode
42// the result of a steal attempt into the return value itself, avoiding any
43// out-parameter or separate status type.
44//
45// For pointer types T:
46// wsq_empty_value<T>() = nullptr — queue was genuinely empty
47// wsq_contended_value<T>() = 0x1 — queue had work but CAS was lost to
48// another thief; caller should retry
49//
50// The sentinel 0x1 is safe because any real object pointer is aligned to at
51// least alignof(T) >= 1, and the OS never maps address 0x1. For void* there
52// is no pointee alignment to check, but the same reasoning applies — no
53// allocator ever returns address 0x1.
54//
55// For non-pointer types T, both return std::nullopt since sentinel encoding
56// is not possible without a dedicated out-of-band value.
57//
58// Both queue classes expose these as static member functions (empty_value,
59// contended_value) that delegate here, so callers can use either form.
60// ----------------------------------------------------------------------------
61
// Returns the "queue was empty" sentinel for a steal/pop on a queue of T:
// nullptr for pointer element types, std::nullopt otherwise.
template <typename T>
constexpr auto wsq_empty_value() {
  if constexpr (std::is_pointer_v<T>) {
    return static_cast<T>(nullptr);
  }
  else {
    return std::optional<T>{};
  }
}
77
// Function: wsq_contended_value
// Returns the "queue had work but the steal CAS was lost" sentinel.
// For pointer element types this is address 0x1 — distinct from the
// nullptr empty sentinel and never returned by any allocator — so the
// caller knows to retry the victim. For non-pointer element types no
// out-of-band value exists, so it collapses to std::nullopt (same as
// the empty sentinel). Not constexpr: reinterpret_cast cannot appear
// in a constant expression.
template <typename T>
auto wsq_contended_value() {
  if constexpr (std::is_pointer_v<T>) {
    return reinterpret_cast<T>(uintptr_t{1});
  } else {
    return std::optional<T>{std::nullopt};
  }
}
97
98// ----------------------------------------------------------------------------
99// Unbounded Work-stealing Queue (WSQ)
100// ----------------------------------------------------------------------------
101
102
127template <typename T>
129
130 struct Array {
131
132 size_t C;
133 size_t M;
134 std::atomic<T>* S;
135
136 explicit Array(size_t c) :
137 C {c},
138 M {c-1},
139 S {new std::atomic<T>[C]} {
140 }
141
142 ~Array() {
143 delete [] S;
144 }
145
146 size_t capacity() const noexcept {
147 return C;
148 }
149
150 void push(int64_t i, T o) noexcept {
151 S[i & M].store(o, std::memory_order_relaxed);
152 }
153
154 T pop(int64_t i) noexcept {
155 return S[i & M].load(std::memory_order_relaxed);
156 }
157
158 Array* resize(int64_t b, int64_t t) {
159 Array* ptr = new Array {2*C};
160 for(int64_t i=t; i!=b; ++i) {
161 ptr->push(i, pop(i));
162 }
163 return ptr;
164 }
165
166 Array* resize(int64_t b, int64_t t, size_t N) {
167 // assert(N>0);
168 Array* ptr = new Array {std::bit_ceil(C + N)};
169 for(int64_t i=t; i!=b; ++i) {
170 ptr->push(i, pop(i));
171 }
172 return ptr;
173 }
174
175 };
176
177 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _top;
178 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _bottom;
179 std::atomic<Array*> _array;
180 std::vector<Array*> _garbage;
181
182 public:
183
200 using value_type = std::conditional_t<std::is_pointer_v<T>, T, std::optional<T>>;
201
212 explicit UnboundedWSQ(int64_t LogSize = TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE);
213
218
229 bool empty() const noexcept;
230
241 size_t size() const noexcept;
242
256 size_t capacity() const noexcept;
257
278 void push(T item);
279
307 template <typename I>
308 void bulk_push(I& first, size_t N);
309
331
353
401
414 static constexpr auto empty_value() { return wsq_empty_value<T>(); }
415
426 static auto contended_value() { return wsq_contended_value<T>(); }
427
428 private:
429
430 Array* _resize_array(Array* a, int64_t b, int64_t t);
431 Array* _resize_array(Array* a, int64_t b, int64_t t, size_t N);
432};
433
434// Constructor
435template <typename T>
437 _top.store(0, std::memory_order_relaxed);
438 _bottom.store(0, std::memory_order_relaxed);
439 _array.store(new Array{(size_t{1} << LogSize)}, std::memory_order_relaxed);
440 _garbage.reserve(32);
441}
442
443// Destructor
444template <typename T>
446 for(auto a : _garbage) {
447 delete a;
448 }
449 delete _array.load();
450}
451
452// Function: empty
453template <typename T>
454bool UnboundedWSQ<T>::empty() const noexcept {
455 int64_t t = _top.load(std::memory_order_relaxed);
456 int64_t b = _bottom.load(std::memory_order_relaxed);
457 return (b <= t);
458}
459
460// Function: size
461template <typename T>
462size_t UnboundedWSQ<T>::size() const noexcept {
463 int64_t t = _top.load(std::memory_order_relaxed);
464 int64_t b = _bottom.load(std::memory_order_relaxed);
465 return static_cast<size_t>(b >= t ? b - t : 0);
466}
467
468// Function: push
469template <typename T>
471
472 int64_t b = _bottom.load(std::memory_order_relaxed);
473 int64_t t = _top.load(std::memory_order_acquire);
474 Array* a = _array.load(std::memory_order_relaxed);
475
476 // queue is full with one additional item (b-t+1)
477 if (a->capacity() < static_cast<size_t>(b - t + 1)) [[unlikely]] {
478 a = _resize_array(a, b, t);
479 }
480
481 a->push(b, o);
482 std::atomic_thread_fence(std::memory_order_release);
483
484 // original paper uses relaxed here but tsa complains
485 _bottom.store(b + 1, std::memory_order_release);
486}
487
488// Function: bulk_push
489template <typename T>
490template <typename I>
491void UnboundedWSQ<T>::bulk_push(I& first, size_t N) {
492
493 if(N == 0) return;
494
495 int64_t b = _bottom.load(std::memory_order_relaxed);
496 int64_t t = _top.load(std::memory_order_acquire);
497 Array* a = _array.load(std::memory_order_relaxed);
498
499 // queue is full with N additional items
500 if ( (b - t + N) > a->capacity() ) [[unlikely]] {
501 a = _resize_array(a, b, t, N);
502 }
503
504 for(size_t i=0; i<N; ++i) {
505 a->push(b++, *first++);
506 }
507 std::atomic_thread_fence(std::memory_order_release);
508
509 // original paper uses relaxed here but tsa complains
510 _bottom.store(b, std::memory_order_release);
511}
512
513// Function: pop
514template <typename T>
517
518 int64_t b = _bottom.load(std::memory_order_relaxed) - 1;
519 Array* a = _array.load(std::memory_order_relaxed);
520 _bottom.store(b, std::memory_order_relaxed);
521 std::atomic_thread_fence(std::memory_order_seq_cst);
522 int64_t t = _top.load(std::memory_order_relaxed);
523
524 //T item {nullptr};
525 auto item = empty_value();
526
527 if(t <= b) {
528 item = a->pop(b);
529 if(t == b) {
530 // the last item just got stolen
531 if(!_top.compare_exchange_strong(t, t+1, std::memory_order_seq_cst,
532 std::memory_order_relaxed)) {
533 //item = nullptr;
534 item = empty_value();
535 }
536 _bottom.store(b + 1, std::memory_order_relaxed);
537 }
538 }
539 else {
540 _bottom.store(b + 1, std::memory_order_relaxed);
541 }
542
543 return item;
544}
545
546// Function: steal
547template <typename T>
550
551 int64_t t = _top.load(std::memory_order_acquire);
552 std::atomic_thread_fence(std::memory_order_seq_cst);
553 int64_t b = _bottom.load(std::memory_order_acquire);
554
555 //T item {nullptr};
556 auto item = empty_value();
557
558 if(t < b) {
559 Array* a = _array.load(std::memory_order_consume);
560 item = a->pop(t);
561 if(!_top.compare_exchange_strong(t, t+1,
562 std::memory_order_seq_cst,
563 std::memory_order_relaxed)) {
564 //return nullptr;
565 return empty_value();
566 }
567 }
568
569 return item;
570}
571
572// Function: steal_with_feedback
573// Returns a stolen item, contended_value(), or empty_value() — see declaration.
574template <typename T>
577
578 int64_t t = _top.load(std::memory_order_acquire);
579 std::atomic_thread_fence(std::memory_order_seq_cst);
580 int64_t b = _bottom.load(std::memory_order_acquire);
581
582 if(t < b) {
583 // queue is non-empty: load the candidate item and attempt the CAS
584 Array* a = _array.load(std::memory_order_consume);
585 auto item = a->pop(t);
586 if(!_top.compare_exchange_strong(t, t+1,
587 std::memory_order_seq_cst,
588 std::memory_order_relaxed)) {
589 // CAS lost to another thief — queue had work but we didn't get it.
590 // Return contended_value() so the caller knows to retry this victim.
591 return contended_value();
592 }
593 return item;
594 }
595
596 // bottom <= top: queue is genuinely empty
597 return empty_value();
598}
599
600// Function: capacity
601template <typename T>
602size_t UnboundedWSQ<T>::capacity() const noexcept {
603 return _array.load(std::memory_order_relaxed)->capacity();
604}
605
606template <typename T>
607typename UnboundedWSQ<T>::Array*
608UnboundedWSQ<T>::_resize_array(Array* a, int64_t b, int64_t t) {
609 Array* tmp = a->resize(b, t);
610 _garbage.push_back(a);
611 // Note: the original paper using relaxed causes t-san to complain
612 _array.store(tmp, std::memory_order_release);
613 return tmp;
614}
615
616template <typename T>
617typename UnboundedWSQ<T>::Array*
618UnboundedWSQ<T>::_resize_array(Array* a, int64_t b, int64_t t, size_t N) {
619 Array* tmp = a->resize(b, t, N);
620 _garbage.push_back(a);
621 // Note: the original paper using relaxed causes t-san to complain
622 _array.store(tmp, std::memory_order_release);
623 return tmp;
624}
625
626// ----------------------------------------------------------------------------
627// Bounded Work-stealing Queue (WSQ)
628// ----------------------------------------------------------------------------
629
655template <typename T, size_t LogSize = TF_DEFAULT_BOUNDED_TASK_QUEUE_LOG_SIZE>
657
658 constexpr static size_t BufferSize = size_t{1} << LogSize;
659 constexpr static size_t BufferMask = (BufferSize - 1);
660
661 static_assert((BufferSize >= 2) && ((BufferSize & (BufferSize - 1)) == 0));
662
663 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _top {0};
664 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _bottom {0};
665 alignas(TF_CACHELINE_SIZE) std::atomic<T> _buffer[BufferSize];
666
667 public:
668
685 using value_type = std::conditional_t<std::is_pointer_v<T>, T, std::optional<T>>;
686
695 BoundedWSQ() = default;
696
700 ~BoundedWSQ() = default;
701
712 bool empty() const noexcept;
713
724 size_t size() const noexcept;
725
736 constexpr size_t capacity() const;
737
760 template <typename O>
761 bool try_push(O&& item);
762
789 template <typename I>
790 size_t try_bulk_push(I& first, size_t N);
791
812
834
882
895 static constexpr auto empty_value() { return wsq_empty_value<T>(); }
896
907 static auto contended_value() { return wsq_contended_value<T>(); }
908};
909
910// Function: empty
911template <typename T, size_t LogSize>
912bool BoundedWSQ<T, LogSize>::empty() const noexcept {
913 int64_t t = _top.load(std::memory_order_relaxed);
914 int64_t b = _bottom.load(std::memory_order_relaxed);
915 return b <= t;
916}
917
918// Function: size
919template <typename T, size_t LogSize>
920size_t BoundedWSQ<T, LogSize>::size() const noexcept {
921 int64_t t = _top.load(std::memory_order_relaxed);
922 int64_t b = _bottom.load(std::memory_order_relaxed);
923 return static_cast<size_t>(b >= t ? b - t : 0);
924}
925
926// Function: try_push
927template <typename T, size_t LogSize>
928template <typename O>
930
931 int64_t b = _bottom.load(std::memory_order_relaxed);
932 int64_t t = _top.load(std::memory_order_acquire);
933
934 // queue is full with one additional item (b-t+1)
935 if(static_cast<size_t>(b - t + 1) > BufferSize) [[unlikely]] {
936 return false;
937 }
938
939 _buffer[b & BufferMask].store(std::forward<O>(o), std::memory_order_relaxed);
940
941 std::atomic_thread_fence(std::memory_order_release);
942
943 // original paper uses relaxed here but tsa complains
944 _bottom.store(b + 1, std::memory_order_release);
945
946 return true;
947}
948
949// Function: try_bulk_push
950template <typename T, size_t LogSize>
951template <typename I>
952size_t BoundedWSQ<T, LogSize>::try_bulk_push(I& first, size_t N) {
953
954 if(N == 0) return 0;
955
956 int64_t b = _bottom.load(std::memory_order_relaxed);
957 int64_t t = _top.load(std::memory_order_acquire);
958
959 size_t r = BufferSize - (b - t); // remaining capacity
960 size_t n = std::min(N, r); // number of pushable elements
961
962 if(n > 0) {
963 // push n elements into the queue
964 for(size_t i=0; i<n; ++i) {
965 _buffer[b++ & BufferMask].store(*first++, std::memory_order_relaxed);
966 }
967 std::atomic_thread_fence(std::memory_order_release);
968 // original paper uses relaxed here but tsa complains
969 _bottom.store(b, std::memory_order_release);
970 }
971
972 return n;
973}
974
975// Function: pop
976template <typename T, size_t LogSize>
979
980 int64_t b = _bottom.load(std::memory_order_relaxed) - 1;
981 _bottom.store(b, std::memory_order_relaxed);
982 std::atomic_thread_fence(std::memory_order_seq_cst);
983 int64_t t = _top.load(std::memory_order_relaxed);
984
985 //T item {nullptr};
986 auto item = empty_value();
987
988 if(t <= b) {
989 item = _buffer[b & BufferMask].load(std::memory_order_relaxed);
990 if(t == b) {
991 // the last item just got stolen
992 if(!_top.compare_exchange_strong(t, t+1,
993 std::memory_order_seq_cst,
994 std::memory_order_relaxed)) {
995 //item = nullptr;
996 item = empty_value();
997 }
998 _bottom.store(b + 1, std::memory_order_relaxed);
999 }
1000 }
1001 else {
1002 _bottom.store(b + 1, std::memory_order_relaxed);
1003 }
1004
1005 return item;
1006}
1007
1008// Function: steal
1009template <typename T, size_t LogSize>
1012 int64_t t = _top.load(std::memory_order_acquire);
1013 std::atomic_thread_fence(std::memory_order_seq_cst);
1014 int64_t b = _bottom.load(std::memory_order_acquire);
1015
1016 //T item{nullptr};
1017 auto item = empty_value();
1018
1019 if(t < b) {
1020 item = _buffer[t & BufferMask].load(std::memory_order_relaxed);
1021 if(!_top.compare_exchange_strong(t, t+1,
1022 std::memory_order_seq_cst,
1023 std::memory_order_relaxed)) {
1024 //return nullptr;
1025 return empty_value();
1026 }
1027 }
1028
1029 return item;
1030}
1031
1032// Function: steal_with_feedback
1033// Returns a stolen item, contended_value(), or empty_value() — see declaration.
1034template <typename T, size_t LogSize>
1037
1038 int64_t t = _top.load(std::memory_order_acquire);
1039 std::atomic_thread_fence(std::memory_order_seq_cst);
1040 int64_t b = _bottom.load(std::memory_order_acquire);
1041
1042 if(t < b) {
1043 // queue is non-empty: load the candidate item and attempt the CAS
1044 auto item = _buffer[t & BufferMask].load(std::memory_order_relaxed);
1045 if(!_top.compare_exchange_strong(t, t+1,
1046 std::memory_order_seq_cst,
1047 std::memory_order_relaxed)) {
1048 // CAS lost to another thief — queue had work but we didn't get it.
1049 // Return contended_value() so the caller knows to retry this victim.
1050 return contended_value();
1051 }
1052 return item;
1053 }
1054
1055 // bottom <= top: queue is genuinely empty
1056 return empty_value();
1057}
1058
1059// Function: capacity
1060template <typename T, size_t LogSize>
1061constexpr size_t BoundedWSQ<T, LogSize>::capacity() const {
1062 return BufferSize;
1063}
1064
1065
1066} // end of namespace tf -----------------------------------------------------
size_t try_bulk_push(I &first, size_t N)
~BoundedWSQ()=default
destructs the queue
value_type steal_with_feedback()
constexpr size_t capacity() const
BoundedWSQ()=default
constructs the queue; the capacity is fixed at compile time as 2^LogSize by the template parameter
bool try_push(O &&item)
std::conditional_t< std::is_pointer_v< T >, T, std::optional< T > > value_type
the return type of queue operations
Definition wsq.hpp:685
size_t size() const noexcept
static constexpr auto empty_value()
Definition wsq.hpp:895
static auto contended_value()
returns the contended sentinel value for pointer element types
Definition wsq.hpp:907
bool empty() const noexcept
queries if the queue is empty at the time of this call
Definition wsq.hpp:912
bool empty() const noexcept
queries if the queue is empty at the time of this call
Definition wsq.hpp:454
value_type pop()
pops out an item from the queue
Definition wsq.hpp:516
value_type steal()
Definition wsq.hpp:549
UnboundedWSQ(int64_t LogSize=TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE)
constructs the queue with capacity 2^LogSize (the size is given as a base-2 logarithm)
Definition wsq.hpp:436
void push(T item)
inserts an item to the queue
Definition wsq.hpp:470
size_t size() const noexcept
Definition wsq.hpp:462
void bulk_push(I &first, size_t N)
Definition wsq.hpp:491
std::conditional_t< std::is_pointer_v< T >, T, std::optional< T > > value_type
the return type of queue operations
Definition wsq.hpp:200
size_t capacity() const noexcept
queries the capacity of the queue
Definition wsq.hpp:602
static auto contended_value()
returns the contended sentinel value for pointer element types
Definition wsq.hpp:426
static constexpr auto empty_value()
returns the empty sentinel value for the queue element type
Definition wsq.hpp:414
~UnboundedWSQ()
destructs the queue
Definition wsq.hpp:445
value_type steal_with_feedback()
Definition wsq.hpp:576
taskflow namespace
Definition small_vector.hpp:20
constexpr auto wsq_empty_value()
returns the empty sentinel for work-stealing steal operations
Definition wsq.hpp:70
auto wsq_contended_value()
returns the contended sentinel for work-stealing steal operations
Definition wsq.hpp:90