Loading...
Searching...
No Matches
wsq.hpp
#pragma once

#include <algorithm>
#include <atomic>
#include <bit>
#include <cstdint>
#include <optional>
#include <type_traits>
#include <vector>

#include "../utility/macros.hpp"
#include "../utility/traits.hpp"
7
12
// Default buffer size of the bounded task queue, in base-2 logarithm
// (capacity = 2^8 = 256); override by defining the macro at compile time.
#ifndef TF_DEFAULT_BOUNDED_TASK_QUEUE_LOG_SIZE
  #define TF_DEFAULT_BOUNDED_TASK_QUEUE_LOG_SIZE 8
#endif

// Default initial buffer size of the unbounded task queue, in base-2
// logarithm (capacity = 2^10 = 1024); override at compile time.
#ifndef TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE
  #define TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE 10
#endif
34
35namespace tf {
36
// ----------------------------------------------------------------------------
// Unbounded Work-stealing Queue (WSQ)
// ----------------------------------------------------------------------------

66template <typename T>
68
69 struct Array {
70
71 size_t C;
72 size_t M;
73 std::atomic<T>* S;
74
75 explicit Array(size_t c) :
76 C {c},
77 M {c-1},
78 S {new std::atomic<T>[C]} {
79 }
80
81 ~Array() {
82 delete [] S;
83 }
84
85 size_t capacity() const noexcept {
86 return C;
87 }
88
89 void push(int64_t i, T o) noexcept {
90 S[i & M].store(o, std::memory_order_relaxed);
91 }
92
93 T pop(int64_t i) noexcept {
94 return S[i & M].load(std::memory_order_relaxed);
95 }
96
97 Array* resize(int64_t b, int64_t t) {
98 Array* ptr = new Array {2*C};
99 for(int64_t i=t; i!=b; ++i) {
100 ptr->push(i, pop(i));
101 }
102 return ptr;
103 }
104
105 Array* resize(int64_t b, int64_t t, size_t N) {
106 // assert(N>0);
107 Array* ptr = new Array {std::bit_ceil(C + N)};
108 for(int64_t i=t; i!=b; ++i) {
109 ptr->push(i, pop(i));
110 }
111 return ptr;
112 }
113
114 };
115
116 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _top;
117 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _bottom;
118 std::atomic<Array*> _array;
119 std::vector<Array*> _garbage;
120
121 public:
122
139 using value_type = std::conditional_t<std::is_pointer_v<T>, T, std::optional<T>>;
140
151 explicit UnboundedWSQ(int64_t LogSize = TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE);
152
157
168 bool empty() const noexcept;
169
180 size_t size() const noexcept;
181
195 size_t capacity() const noexcept;
196
217 void push(T item);
218
246 template <typename I>
247 void bulk_push(I& first, size_t N);
248
270
292
321 value_type steal_with_feedback(size_t& num_empty_steals);
322
335 static constexpr auto empty_value() {
336 if constexpr (std::is_pointer_v<T>) {
337 return T{nullptr};
338 } else {
339 return std::optional<T>{std::nullopt};
340 }
341 }
342
343 private:
344
345 Array* _resize_array(Array* a, int64_t b, int64_t t);
346 Array* _resize_array(Array* a, int64_t b, int64_t t, size_t N);
347};
348
349// Constructor
350template <typename T>
352 _top.store(0, std::memory_order_relaxed);
353 _bottom.store(0, std::memory_order_relaxed);
354 _array.store(new Array{(size_t{1} << LogSize)}, std::memory_order_relaxed);
355 _garbage.reserve(32);
356}
357
358// Destructor
359template <typename T>
361 for(auto a : _garbage) {
362 delete a;
363 }
364 delete _array.load();
365}
366
367// Function: empty
368template <typename T>
369bool UnboundedWSQ<T>::empty() const noexcept {
370 int64_t t = _top.load(std::memory_order_relaxed);
371 int64_t b = _bottom.load(std::memory_order_relaxed);
372 return (b <= t);
373}
374
375// Function: size
376template <typename T>
377size_t UnboundedWSQ<T>::size() const noexcept {
378 int64_t t = _top.load(std::memory_order_relaxed);
379 int64_t b = _bottom.load(std::memory_order_relaxed);
380 return static_cast<size_t>(b >= t ? b - t : 0);
381}
382
383// Function: push
384template <typename T>
386
387 int64_t b = _bottom.load(std::memory_order_relaxed);
388 int64_t t = _top.load(std::memory_order_acquire);
389 Array* a = _array.load(std::memory_order_relaxed);
390
391 // queue is full with one additional item (b-t+1)
392 if (a->capacity() < static_cast<size_t>(b - t + 1)) [[unlikely]] {
393 a = _resize_array(a, b, t);
394 }
395
396 a->push(b, o);
397 std::atomic_thread_fence(std::memory_order_release);
398
399 // original paper uses relaxed here but tsa complains
400 _bottom.store(b + 1, std::memory_order_release);
401}
402
403// Function: bulk_push
404template <typename T>
405template <typename I>
406void UnboundedWSQ<T>::bulk_push(I& first, size_t N) {
407
408 if(N == 0) return;
409
410 int64_t b = _bottom.load(std::memory_order_relaxed);
411 int64_t t = _top.load(std::memory_order_acquire);
412 Array* a = _array.load(std::memory_order_relaxed);
413
414 // queue is full with N additional items
415 if ( (b - t + N) > a->capacity() ) [[unlikely]] {
416 a = _resize_array(a, b, t, N);
417 }
418
419 for(size_t i=0; i<N; ++i) {
420 a->push(b++, *first++);
421 }
422 std::atomic_thread_fence(std::memory_order_release);
423
424 // original paper uses relaxed here but tsa complains
425 _bottom.store(b, std::memory_order_release);
426}
427
428// Function: pop
429template <typename T>
432
433 int64_t b = _bottom.load(std::memory_order_relaxed) - 1;
434 Array* a = _array.load(std::memory_order_relaxed);
435 _bottom.store(b, std::memory_order_relaxed);
436 std::atomic_thread_fence(std::memory_order_seq_cst);
437 int64_t t = _top.load(std::memory_order_relaxed);
438
439 //T item {nullptr};
440 auto item = empty_value();
441
442 if(t <= b) {
443 item = a->pop(b);
444 if(t == b) {
445 // the last item just got stolen
446 if(!_top.compare_exchange_strong(t, t+1, std::memory_order_seq_cst,
447 std::memory_order_relaxed)) {
448 //item = nullptr;
449 item = empty_value();
450 }
451 _bottom.store(b + 1, std::memory_order_relaxed);
452 }
453 }
454 else {
455 _bottom.store(b + 1, std::memory_order_relaxed);
456 }
457
458 return item;
459}
460
461// Function: steal
462template <typename T>
465
466 int64_t t = _top.load(std::memory_order_acquire);
467 std::atomic_thread_fence(std::memory_order_seq_cst);
468 int64_t b = _bottom.load(std::memory_order_acquire);
469
470 //T item {nullptr};
471 auto item = empty_value();
472
473 if(t < b) {
474 Array* a = _array.load(std::memory_order_consume);
475 item = a->pop(t);
476 if(!_top.compare_exchange_strong(t, t+1,
477 std::memory_order_seq_cst,
478 std::memory_order_relaxed)) {
479 //return nullptr;
480 return empty_value();
481 }
482 }
483
484 return item;
485}
486
487// Function: steal
488template <typename T>
490UnboundedWSQ<T>::steal_with_feedback(size_t& num_empty_steals) {
491
492 int64_t t = _top.load(std::memory_order_acquire);
493 std::atomic_thread_fence(std::memory_order_seq_cst);
494 int64_t b = _bottom.load(std::memory_order_acquire);
495
496 //T item {nullptr};
497 auto item = empty_value();
498
499 if(t < b) {
500 num_empty_steals = 0;
501 Array* a = _array.load(std::memory_order_consume);
502 item = a->pop(t);
503 if(!_top.compare_exchange_strong(t, t+1,
504 std::memory_order_seq_cst,
505 std::memory_order_relaxed)) {
506 //return nullptr;
507 return empty_value();
508 }
509 }
510 else {
511 ++num_empty_steals;
512 }
513 return item;
514}
515
516// Function: capacity
517template <typename T>
518size_t UnboundedWSQ<T>::capacity() const noexcept {
519 return _array.load(std::memory_order_relaxed)->capacity();
520}
521
522template <typename T>
523typename UnboundedWSQ<T>::Array*
524UnboundedWSQ<T>::_resize_array(Array* a, int64_t b, int64_t t) {
525 Array* tmp = a->resize(b, t);
526 _garbage.push_back(a);
527 // Note: the original paper using relaxed causes t-san to complain
528 _array.store(tmp, std::memory_order_release);
529 return tmp;
530}
531
532template <typename T>
533typename UnboundedWSQ<T>::Array*
534UnboundedWSQ<T>::_resize_array(Array* a, int64_t b, int64_t t, size_t N) {
535 Array* tmp = a->resize(b, t, N);
536 _garbage.push_back(a);
537 // Note: the original paper using relaxed causes t-san to complain
538 _array.store(tmp, std::memory_order_release);
539 return tmp;
540}
541
// ----------------------------------------------------------------------------
// Bounded Work-stealing Queue (WSQ)
// ----------------------------------------------------------------------------

571template <typename T, size_t LogSize = TF_DEFAULT_BOUNDED_TASK_QUEUE_LOG_SIZE>
573
574 constexpr static size_t BufferSize = size_t{1} << LogSize;
575 constexpr static size_t BufferMask = (BufferSize - 1);
576
577 static_assert((BufferSize >= 2) && ((BufferSize & (BufferSize - 1)) == 0));
578
579 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _top {0};
580 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _bottom {0};
581 alignas(TF_CACHELINE_SIZE) std::atomic<T> _buffer[BufferSize];
582
583 public:
584
601 using value_type = std::conditional_t<std::is_pointer_v<T>, T, std::optional<T>>;
602
611 BoundedWSQ() = default;
612
616 ~BoundedWSQ() = default;
617
628 bool empty() const noexcept;
629
640 size_t size() const noexcept;
641
652 constexpr size_t capacity() const;
653
676 template <typename O>
677 bool try_push(O&& item);
678
705 template <typename I>
706 size_t try_bulk_push(I& first, size_t N);
707
728
750
779 value_type steal_with_feedback(size_t& num_empty_steals);
780
793 static constexpr auto empty_value() {
794 if constexpr (std::is_pointer_v<T>) {
795 return T{nullptr};
796 } else {
797 return std::optional<T>{std::nullopt};
798 }
799 }
800};
801
802// Function: empty
803template <typename T, size_t LogSize>
804bool BoundedWSQ<T, LogSize>::empty() const noexcept {
805 int64_t t = _top.load(std::memory_order_relaxed);
806 int64_t b = _bottom.load(std::memory_order_relaxed);
807 return b <= t;
808}
809
810// Function: size
811template <typename T, size_t LogSize>
812size_t BoundedWSQ<T, LogSize>::size() const noexcept {
813 int64_t t = _top.load(std::memory_order_relaxed);
814 int64_t b = _bottom.load(std::memory_order_relaxed);
815 return static_cast<size_t>(b >= t ? b - t : 0);
816}
817
818// Function: try_push
819template <typename T, size_t LogSize>
820template <typename O>
822
823 int64_t b = _bottom.load(std::memory_order_relaxed);
824 int64_t t = _top.load(std::memory_order_acquire);
825
826 // queue is full with one additional item (b-t+1)
827 if(static_cast<size_t>(b - t + 1) > BufferSize) [[unlikely]] {
828 return false;
829 }
830
831 _buffer[b & BufferMask].store(std::forward<O>(o), std::memory_order_relaxed);
832
833 std::atomic_thread_fence(std::memory_order_release);
834
835 // original paper uses relaxed here but tsa complains
836 _bottom.store(b + 1, std::memory_order_release);
837
838 return true;
839}
840
841// Function: try_bulk_push
842template <typename T, size_t LogSize>
843template <typename I>
844size_t BoundedWSQ<T, LogSize>::try_bulk_push(I& first, size_t N) {
845
846 if(N == 0) return 0;
847
848 int64_t b = _bottom.load(std::memory_order_relaxed);
849 int64_t t = _top.load(std::memory_order_acquire);
850
851 size_t r = BufferSize - (b - t); // remaining capacity
852 size_t n = std::min(N, r); // number of pushable elements
853
854 if(n > 0) {
855 // push n elements into the queue
856 for(size_t i=0; i<n; ++i) {
857 _buffer[b++ & BufferMask].store(*first++, std::memory_order_relaxed);
858 }
859 std::atomic_thread_fence(std::memory_order_release);
860 // original paper uses relaxed here but tsa complains
861 _bottom.store(b, std::memory_order_release);
862 }
863
864 return n;
865}
866
867// Function: pop
868template <typename T, size_t LogSize>
871
872 int64_t b = _bottom.load(std::memory_order_relaxed) - 1;
873 _bottom.store(b, std::memory_order_relaxed);
874 std::atomic_thread_fence(std::memory_order_seq_cst);
875 int64_t t = _top.load(std::memory_order_relaxed);
876
877 //T item {nullptr};
878 auto item = empty_value();
879
880 if(t <= b) {
881 item = _buffer[b & BufferMask].load(std::memory_order_relaxed);
882 if(t == b) {
883 // the last item just got stolen
884 if(!_top.compare_exchange_strong(t, t+1,
885 std::memory_order_seq_cst,
886 std::memory_order_relaxed)) {
887 //item = nullptr;
888 item = empty_value();
889 }
890 _bottom.store(b + 1, std::memory_order_relaxed);
891 }
892 }
893 else {
894 _bottom.store(b + 1, std::memory_order_relaxed);
895 }
896
897 return item;
898}
899
900// Function: steal
901template <typename T, size_t LogSize>
904 int64_t t = _top.load(std::memory_order_acquire);
905 std::atomic_thread_fence(std::memory_order_seq_cst);
906 int64_t b = _bottom.load(std::memory_order_acquire);
907
908 //T item{nullptr};
909 auto item = empty_value();
910
911 if(t < b) {
912 item = _buffer[t & BufferMask].load(std::memory_order_relaxed);
913 if(!_top.compare_exchange_strong(t, t+1,
914 std::memory_order_seq_cst,
915 std::memory_order_relaxed)) {
916 //return nullptr;
917 return empty_value();
918 }
919 }
920
921 return item;
922}
923
924// Function: steal
925template <typename T, size_t LogSize>
928 int64_t t = _top.load(std::memory_order_acquire);
929 std::atomic_thread_fence(std::memory_order_seq_cst);
930 int64_t b = _bottom.load(std::memory_order_acquire);
931
932 //T item {nullptr};
933 auto item = empty_value();
934
935 if(t < b) {
936 num_empty_steals = 0;
937 item = _buffer[t & BufferMask].load(std::memory_order_relaxed);
938 if(!_top.compare_exchange_strong(t, t+1,
939 std::memory_order_seq_cst,
940 std::memory_order_relaxed)) {
941 //return nullptr;
942 return empty_value();
943 }
944 }
945 else {
946 ++num_empty_steals;
947 }
948 return item;
949}
950
951// Function: capacity
952template <typename T, size_t LogSize>
953constexpr size_t BoundedWSQ<T, LogSize>::capacity() const {
954 return BufferSize;
955}
956
957
958} // end of namespace tf -----------------------------------------------------
959
960
961
value_type pop()
Definition wsq.hpp:870
size_t try_bulk_push(I &first, size_t N)
Definition wsq.hpp:844
~BoundedWSQ()=default
destructs the queue
value_type steal()
Definition wsq.hpp:903
constexpr size_t capacity() const
Definition wsq.hpp:953
BoundedWSQ()=default
constructs the queue with a given capacity
bool try_push(O &&item)
Definition wsq.hpp:821
value_type steal_with_feedback(size_t &num_empty_steals)
Definition wsq.hpp:927
std::conditional_t< std::is_pointer_v< T >, T, std::optional< T > > value_type
the return type of queue operations
Definition wsq.hpp:601
size_t size() const noexcept
Definition wsq.hpp:812
static constexpr auto empty_value()
returns the empty sentinel value for the queue element type
Definition wsq.hpp:793
bool empty() const noexcept
queries if the queue is empty at the time of this call
Definition wsq.hpp:804
bool empty() const noexcept
queries if the queue is empty at the time of this call
Definition wsq.hpp:369
value_type pop()
pops out an item from the queue
Definition wsq.hpp:431
value_type steal()
Definition wsq.hpp:464
UnboundedWSQ(int64_t LogSize=TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE)
constructs the queue with the given size in the base-2 logarithm
Definition wsq.hpp:351
void push(T item)
inserts an item to the queue
Definition wsq.hpp:385
size_t size() const noexcept
Definition wsq.hpp:377
void bulk_push(I &first, size_t N)
Definition wsq.hpp:406
std::conditional_t< std::is_pointer_v< T >, T, std::optional< T > > value_type
the return type of queue operations
Definition wsq.hpp:139
size_t capacity() const noexcept
queries the capacity of the queue
Definition wsq.hpp:518
value_type steal_with_feedback(size_t &num_empty_steals)
Definition wsq.hpp:490
static constexpr auto empty_value()
returns the empty sentinel value for the queue element type
Definition wsq.hpp:335
~UnboundedWSQ()
destructs the queue
Definition wsq.hpp:360
taskflow namespace
Definition small_vector.hpp:20