Loading...
Searching...
No Matches
wsq.hpp
1#pragma once
2
3#include <bit>
4
5#include "../utility/macros.hpp"
6#include "../utility/traits.hpp"
7
12
13#ifndef TF_DEFAULT_BOUNDED_TASK_QUEUE_LOG_SIZE
21 #define TF_DEFAULT_BOUNDED_TASK_QUEUE_LOG_SIZE 8
22#endif
23
24#ifndef TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE
32 #define TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE 10
33#endif
34
35namespace tf {
36
37// ----------------------------------------------------------------------------
38// Work-stealing queue steal protocol sentinels
39//
40// This free function template defines the sentinel value returned by the pop
41// and steal operations of all work-stealing queue types (BoundedWSQ,
42// UnboundedWSQ). It encodes a failed attempt into the return value itself,
43// avoiding any out-parameter or separate status type.
44//
45// For pointer types T:
46// wsq_empty_value<T>() = nullptr — queue was empty (or CAS was lost)
47//
48// The sentinel nullptr is returned whenever steal() cannot deliver a task,
49// whether due to the queue being genuinely empty or losing a concurrent CAS
50// race to another thief.
51//
52// For non-pointer types T, std::nullopt is returned to indicate the absence
53// of a value.
54//
55// Queue classes expose this as a static member function (empty_value) that
56// delegates here, so callers can use either form.
57// ----------------------------------------------------------------------------
58
// Returns the "no item" sentinel for element type T:
//   - a null pointer of type T when T is a pointer type;
//   - an empty std::optional<T> otherwise.
template <typename T>
constexpr auto wsq_empty_value() {
  // Value-initializing either alternative yields the empty state:
  // T{} is a null pointer, std::optional<T>{} holds no value.
  using sentinel_t = std::conditional_t<std::is_pointer_v<T>, T, std::optional<T>>;
  return sentinel_t{};
}
74
75// ----------------------------------------------------------------------------
76// Unbounded Work-stealing Queue (WSQ)
77// ----------------------------------------------------------------------------
78
79
104template <typename T>
106
107 struct Array {
108
109 size_t C;
110 size_t M;
111 std::atomic<T>* S;
112
113 explicit Array(size_t c) :
114 C {c},
115 M {c-1},
116 S {new std::atomic<T>[C]} {
117 }
118
119 ~Array() {
120 delete [] S;
121 }
122
123 size_t capacity() const noexcept {
124 return C;
125 }
126
127 void push(int64_t i, T o) noexcept {
128 S[i & M].store(o, std::memory_order_relaxed);
129 }
130
131 T pop(int64_t i) noexcept {
132 return S[i & M].load(std::memory_order_relaxed);
133 }
134
135 Array* resize(int64_t b, int64_t t) {
136 Array* ptr = new Array(2*C);
137 for(int64_t i=t; i!=b; ++i) {
138 ptr->push(i, pop(i));
139 }
140 return ptr;
141 }
142
143 Array* resize(int64_t b, int64_t t, size_t N) {
144 // assert(N>0);
145 Array* ptr = new Array(std::bit_ceil(C + N));
146 for(int64_t i=t; i!=b; ++i) {
147 ptr->push(i, pop(i));
148 }
149 return ptr;
150 }
151
152 };
153
154 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _top;
155 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _bottom;
156
157 // Owner-private cached copy of _top (a lower bound on it). Never read by thieves.
158 // Because _top is never decremented, the real occupancy can only be
159 // smaller than what is computed using this cached value, so using it
160 // for the overflow check is always safe.
161 int64_t _cached_top {0};
162
163 // _array on its own cache line: avoids false-sharing with _bottom when
164 // thieves load _array (consume) after reading _bottom (acquire).
165 alignas(TF_CACHELINE_SIZE) std::atomic<Array*> _array;
166 std::vector<Array*> _garbage;
167
168 public:
169
186 using value_type = std::conditional_t<std::is_pointer_v<T>, T, std::optional<T>>;
187
198 explicit UnboundedWSQ(int64_t LogSize = TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE);
199
204
215 bool empty() const noexcept;
216
227 size_t size() const noexcept;
228
242 size_t capacity() const noexcept;
243
264 void push(T item);
265
293 template <typename I>
294 void bulk_push(I& first, size_t N);
295
317
339
352 static constexpr auto empty_value() { return wsq_empty_value<T>(); }
353
354 private:
355
356 Array* _resize_array(Array* a, int64_t b, int64_t t);
357 Array* _resize_array(Array* a, int64_t b, int64_t t, size_t N);
358};
359
360// Constructor
361template <typename T>
363 _top.store(0, std::memory_order_relaxed);
364 _bottom.store(0, std::memory_order_relaxed);
365 _array.store(new Array{(size_t{1} << LogSize)}, std::memory_order_relaxed);
366 _garbage.reserve(32);
367}
368
369// Destructor
370template <typename T>
372 for(auto a : _garbage) {
373 delete a;
374 }
375 delete _array.load();
376}
377
378// Function: empty
379template <typename T>
380bool UnboundedWSQ<T>::empty() const noexcept {
381 int64_t t = _top.load(std::memory_order_relaxed);
382 int64_t b = _bottom.load(std::memory_order_relaxed);
383 return (b <= t);
384}
385
386// Function: size
387template <typename T>
388size_t UnboundedWSQ<T>::size() const noexcept {
389 int64_t t = _top.load(std::memory_order_relaxed);
390 int64_t b = _bottom.load(std::memory_order_relaxed);
391 return static_cast<size_t>(b >= t ? b - t : 0);
392}
393
394// Function: push
395template <typename T>
397
398 int64_t b = _bottom.load(std::memory_order_relaxed);
399 Array* a = _array.load(std::memory_order_relaxed);
400
401 // queue is full with one additional item (b-t+1)
402 if(a->capacity() < static_cast<size_t>(b - _cached_top + 1)) [[unlikely]] {
403 _cached_top = _top.load(std::memory_order_acquire);
404 if(a->capacity() < static_cast<size_t>(b - _cached_top + 1)) [[unlikely]] {
405 a = _resize_array(a, b, _cached_top);
406 }
407 }
408
409 a->push(b, o);
410 std::atomic_thread_fence(std::memory_order_release);
411
412 // original paper uses relaxed here but tsa complains
413 _bottom.store(b + 1, std::memory_order_release);
414}
415
416// Function: bulk_push
417template <typename T>
418template <typename I>
419void UnboundedWSQ<T>::bulk_push(I& first, size_t N) {
420
421 if(N == 0) return;
422
423 int64_t b = _bottom.load(std::memory_order_relaxed);
424 Array* a = _array.load(std::memory_order_relaxed);
425
426 // queue is full with N additional items
427 if((b - _cached_top + N) > a->capacity()) [[unlikely]] {
428 _cached_top = _top.load(std::memory_order_acquire);
429 if((b - _cached_top + N) > a->capacity()) [[unlikely]] {
430 a = _resize_array(a, b, _cached_top, N);
431 }
432 }
433
434 for(size_t i=0; i<N; ++i) {
435 a->push(b++, *first++);
436 }
437 std::atomic_thread_fence(std::memory_order_release);
438
439 // original paper uses relaxed here but tsa complains
440 _bottom.store(b, std::memory_order_release);
441}
442
443// Function: pop
444template <typename T>
447
448 int64_t b = _bottom.load(std::memory_order_relaxed) - 1;
449 Array* a = _array.load(std::memory_order_relaxed);
450 _bottom.store(b, std::memory_order_relaxed);
451 std::atomic_thread_fence(std::memory_order_seq_cst);
452 int64_t t = _top.load(std::memory_order_relaxed);
453
454 //T item {nullptr};
455 auto item = empty_value();
456
457 if(t <= b) {
458 item = a->pop(b);
459 if(t == b) {
460 // the last item just got stolen
461 if(!_top.compare_exchange_strong(t, t+1, std::memory_order_seq_cst,
462 std::memory_order_relaxed)) {
463 //item = nullptr;
464 item = empty_value();
465 }
466 _bottom.store(b + 1, std::memory_order_relaxed);
467 }
468 }
469 else {
470 _bottom.store(b + 1, std::memory_order_relaxed);
471 }
472
473 return item;
474}
475
476// Function: steal
477template <typename T>
480
481 int64_t t = _top.load(std::memory_order_acquire);
482 std::atomic_thread_fence(std::memory_order_seq_cst);
483 int64_t b = _bottom.load(std::memory_order_acquire);
484
485 //T item {nullptr};
486 auto item = empty_value();
487
488 if(t < b) {
489 Array* a = _array.load(std::memory_order_consume);
490 item = a->pop(t);
491 if(!_top.compare_exchange_strong(t, t+1,
492 std::memory_order_seq_cst,
493 std::memory_order_relaxed)) {
494 //return nullptr;
495 return empty_value();
496 }
497 }
498
499 return item;
500}
501
502
503// Function: capacity
504template <typename T>
505size_t UnboundedWSQ<T>::capacity() const noexcept {
506 return _array.load(std::memory_order_relaxed)->capacity();
507}
508
509template <typename T>
510typename UnboundedWSQ<T>::Array*
511UnboundedWSQ<T>::_resize_array(Array* a, int64_t b, int64_t t) {
512 Array* tmp = a->resize(b, t);
513 _garbage.push_back(a);
514 // Note: the original paper using relaxed causes t-san to complain
515 _array.store(tmp, std::memory_order_release);
516 return tmp;
517}
518
519template <typename T>
520typename UnboundedWSQ<T>::Array*
521UnboundedWSQ<T>::_resize_array(Array* a, int64_t b, int64_t t, size_t N) {
522 Array* tmp = a->resize(b, t, N);
523 _garbage.push_back(a);
524 // Note: the original paper using relaxed causes t-san to complain
525 _array.store(tmp, std::memory_order_release);
526 return tmp;
527}
528
529// ----------------------------------------------------------------------------
530// Bounded Work-stealing Queue (WSQ)
531// ----------------------------------------------------------------------------
532
558template <typename T, size_t LogSize = TF_DEFAULT_BOUNDED_TASK_QUEUE_LOG_SIZE>
560
561 constexpr static size_t BufferSize = size_t{1} << LogSize;
562 constexpr static size_t BufferMask = (BufferSize - 1);
563
564 static_assert((BufferSize >= 2) && ((BufferSize & (BufferSize - 1)) == 0));
565
566 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _top {0};
567 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _bottom {0};
568 alignas(TF_CACHELINE_SIZE) std::atomic<T> _buffer[BufferSize];
569
570 public:
571
588 using value_type = std::conditional_t<std::is_pointer_v<T>, T, std::optional<T>>;
589
598 BoundedWSQ() = default;
599
603 ~BoundedWSQ() = default;
604
615 bool empty() const noexcept;
616
627 size_t size() const noexcept;
628
639 constexpr size_t capacity() const;
640
663 template <typename O>
664 bool try_push(O&& item);
665
692 template <typename I>
693 size_t try_bulk_push(I& first, size_t N);
694
715
737
738
751 static constexpr auto empty_value() { return wsq_empty_value<T>(); }
752};
753
754// Function: empty
755template <typename T, size_t LogSize>
756bool BoundedWSQ<T, LogSize>::empty() const noexcept {
757 int64_t t = _top.load(std::memory_order_relaxed);
758 int64_t b = _bottom.load(std::memory_order_relaxed);
759 return b <= t;
760}
761
762// Function: size
763template <typename T, size_t LogSize>
764size_t BoundedWSQ<T, LogSize>::size() const noexcept {
765 int64_t t = _top.load(std::memory_order_relaxed);
766 int64_t b = _bottom.load(std::memory_order_relaxed);
767 return static_cast<size_t>(b >= t ? b - t : 0);
768}
769
770// Function: try_push
771template <typename T, size_t LogSize>
772template <typename O>
774
775 int64_t b = _bottom.load(std::memory_order_relaxed);
776 int64_t t = _top.load(std::memory_order_acquire);
777
778 // queue is full with one additional item (b-t+1)
779 if(static_cast<size_t>(b - t + 1) > BufferSize) [[unlikely]] {
780 return false;
781 }
782
783 _buffer[b & BufferMask].store(std::forward<O>(o), std::memory_order_relaxed);
784
785 std::atomic_thread_fence(std::memory_order_release);
786
787 // original paper uses relaxed here but tsa complains
788 _bottom.store(b + 1, std::memory_order_release);
789
790 return true;
791}
792
793// Function: try_bulk_push
794template <typename T, size_t LogSize>
795template <typename I>
796size_t BoundedWSQ<T, LogSize>::try_bulk_push(I& first, size_t N) {
797
798 if(N == 0) return 0;
799
800 int64_t b = _bottom.load(std::memory_order_relaxed);
801 int64_t t = _top.load(std::memory_order_acquire);
802
803 size_t r = BufferSize - (b - t); // remaining capacity
804 size_t n = std::min(N, r); // number of pushable elements
805
806 if(n > 0) {
807 // push n elements into the queue
808 for(size_t i=0; i<n; ++i) {
809 _buffer[b++ & BufferMask].store(*first++, std::memory_order_relaxed);
810 }
811 std::atomic_thread_fence(std::memory_order_release);
812 // original paper uses relaxed here but tsa complains
813 _bottom.store(b, std::memory_order_release);
814 }
815
816 return n;
817}
818
819// Function: pop
820template <typename T, size_t LogSize>
823
824 int64_t b = _bottom.load(std::memory_order_relaxed) - 1;
825 _bottom.store(b, std::memory_order_relaxed);
826 std::atomic_thread_fence(std::memory_order_seq_cst);
827 int64_t t = _top.load(std::memory_order_relaxed);
828
829 //T item {nullptr};
830 auto item = empty_value();
831
832 if(t <= b) {
833 item = _buffer[b & BufferMask].load(std::memory_order_relaxed);
834 if(t == b) {
835 // the last item just got stolen
836 if(!_top.compare_exchange_strong(t, t+1,
837 std::memory_order_seq_cst,
838 std::memory_order_relaxed)) {
839 //item = nullptr;
840 item = empty_value();
841 }
842 _bottom.store(b + 1, std::memory_order_relaxed);
843 }
844 }
845 else {
846 _bottom.store(b + 1, std::memory_order_relaxed);
847 }
848
849 return item;
850}
851
852// Function: steal
853template <typename T, size_t LogSize>
856 int64_t t = _top.load(std::memory_order_acquire);
857 std::atomic_thread_fence(std::memory_order_seq_cst);
858 int64_t b = _bottom.load(std::memory_order_acquire);
859
860 //T item{nullptr};
861 auto item = empty_value();
862
863 if(t < b) {
864 item = _buffer[t & BufferMask].load(std::memory_order_relaxed);
865 if(!_top.compare_exchange_strong(t, t+1,
866 std::memory_order_seq_cst,
867 std::memory_order_relaxed)) {
868 //return nullptr;
869 return empty_value();
870 }
871 }
872
873 return item;
874}
875
876
877// Function: capacity
878template <typename T, size_t LogSize>
879constexpr size_t BoundedWSQ<T, LogSize>::capacity() const {
880 return BufferSize;
881}
882
883
884} // end of namespace tf -----------------------------------------------------
size_t try_bulk_push(I &first, size_t N)
~BoundedWSQ()=default
destructs the queue
constexpr size_t capacity() const
BoundedWSQ()=default
constructs the queue with a given capacity
bool try_push(O &&item)
std::conditional_t< std::is_pointer_v< T >, T, std::optional< T > > value_type
the return type of queue operations
Definition wsq.hpp:588
size_t size() const noexcept
static constexpr auto empty_value()
Definition wsq.hpp:751
bool empty() const noexcept
queries if the queue is empty at the time of this call
Definition wsq.hpp:756
bool empty() const noexcept
queries if the queue is empty at the time of this call
Definition wsq.hpp:380
value_type pop()
pops out an item from the queue
Definition wsq.hpp:446
value_type steal()
Definition wsq.hpp:479
UnboundedWSQ(int64_t LogSize=TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE)
constructs the queue with the given size in the base-2 logarithm
Definition wsq.hpp:362
void push(T item)
inserts an item to the queue
Definition wsq.hpp:396
size_t size() const noexcept
Definition wsq.hpp:388
void bulk_push(I &first, size_t N)
Definition wsq.hpp:419
std::conditional_t< std::is_pointer_v< T >, T, std::optional< T > > value_type
the return type of queue operations
Definition wsq.hpp:186
size_t capacity() const noexcept
queries the capacity of the queue
Definition wsq.hpp:505
static constexpr auto empty_value()
returns the empty sentinel value for the queue element type
Definition wsq.hpp:352
~UnboundedWSQ()
destructs the queue
Definition wsq.hpp:371
taskflow namespace
Definition small_vector.hpp:20
constexpr auto wsq_empty_value()
returns the empty sentinel for work-stealing steal operations
Definition wsq.hpp:67