5#include "../utility/macros.hpp"
6#include "../utility/traits.hpp"
13#ifndef TF_DEFAULT_BOUNDED_TASK_QUEUE_LOG_SIZE
21 #define TF_DEFAULT_BOUNDED_TASK_QUEUE_LOG_SIZE 8
24#ifndef TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE
32 #define TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE 10
71 if constexpr (std::is_pointer_v<T>) {
74 return std::optional<T>{std::nullopt};
91 if constexpr (std::is_pointer_v<T>) {
92 return reinterpret_cast<T
>(uintptr_t{1});
94 return std::optional<T>{std::nullopt};
136 explicit Array(
size_t c) :
139 S {
new std::atomic<T>[C]} {
150 void push(int64_t i, T o)
noexcept {
151 S[i & M].store(o, std::memory_order_relaxed);
154 T
pop(int64_t i)
noexcept {
155 return S[i & M].load(std::memory_order_relaxed);
158 Array* resize(int64_t b, int64_t t) {
159 Array* ptr =
new Array {2*C};
160 for(int64_t i=t; i!=b; ++i) {
161 ptr->push(i,
pop(i));
166 Array* resize(int64_t b, int64_t t,
size_t N) {
168 Array* ptr =
new Array {std::bit_ceil(C + N)};
169 for(int64_t i=t; i!=b; ++i) {
170 ptr->push(i,
pop(i));
177 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _top;
178 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _bottom;
179 std::atomic<Array*> _array;
180 std::vector<Array*> _garbage;
200 using value_type = std::conditional_t<std::is_pointer_v<T>, T, std::optional<T>>;
212 explicit UnboundedWSQ(int64_t LogSize = TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE);
307 template <typename I>
430 Array* _resize_array(Array* a, int64_t b, int64_t t);
431 Array* _resize_array(Array* a, int64_t b, int64_t t,
size_t N);
437 _top.store(0, std::memory_order_relaxed);
438 _bottom.store(0, std::memory_order_relaxed);
439 _array.store(
new Array{(
size_t{1} << LogSize)}, std::memory_order_relaxed);
440 _garbage.reserve(32);
446 for(
auto a : _garbage) {
449 delete _array.load();
455 int64_t t = _top.load(std::memory_order_relaxed);
456 int64_t b = _bottom.load(std::memory_order_relaxed);
463 int64_t t = _top.load(std::memory_order_relaxed);
464 int64_t b = _bottom.load(std::memory_order_relaxed);
465 return static_cast<size_t>(b >= t ? b - t : 0);
472 int64_t b = _bottom.load(std::memory_order_relaxed);
473 int64_t t = _top.load(std::memory_order_acquire);
474 Array* a = _array.load(std::memory_order_relaxed);
477 if (a->capacity() <
static_cast<size_t>(b - t + 1)) [[unlikely]] {
478 a = _resize_array(a, b, t);
482 std::atomic_thread_fence(std::memory_order_release);
485 _bottom.store(b + 1, std::memory_order_release);
495 int64_t b = _bottom.load(std::memory_order_relaxed);
496 int64_t t = _top.load(std::memory_order_acquire);
497 Array* a = _array.load(std::memory_order_relaxed);
500 if ( (b - t + N) > a->capacity() ) [[unlikely]] {
501 a = _resize_array(a, b, t, N);
504 for(
size_t i=0; i<N; ++i) {
505 a->push(b++, *first++);
507 std::atomic_thread_fence(std::memory_order_release);
510 _bottom.store(b, std::memory_order_release);
518 int64_t b = _bottom.load(std::memory_order_relaxed) - 1;
519 Array* a = _array.load(std::memory_order_relaxed);
520 _bottom.store(b, std::memory_order_relaxed);
521 std::atomic_thread_fence(std::memory_order_seq_cst);
522 int64_t t = _top.load(std::memory_order_relaxed);
531 if(!_top.compare_exchange_strong(t, t+1, std::memory_order_seq_cst,
532 std::memory_order_relaxed)) {
536 _bottom.store(b + 1, std::memory_order_relaxed);
540 _bottom.store(b + 1, std::memory_order_relaxed);
551 int64_t t = _top.load(std::memory_order_acquire);
552 std::atomic_thread_fence(std::memory_order_seq_cst);
553 int64_t b = _bottom.load(std::memory_order_acquire);
559 Array* a = _array.load(std::memory_order_consume);
561 if(!_top.compare_exchange_strong(t, t+1,
562 std::memory_order_seq_cst,
563 std::memory_order_relaxed)) {
578 int64_t t = _top.load(std::memory_order_acquire);
579 std::atomic_thread_fence(std::memory_order_seq_cst);
580 int64_t b = _bottom.load(std::memory_order_acquire);
584 Array* a = _array.load(std::memory_order_consume);
585 auto item = a->pop(t);
586 if(!_top.compare_exchange_strong(t, t+1,
587 std::memory_order_seq_cst,
588 std::memory_order_relaxed)) {
603 return _array.load(std::memory_order_relaxed)->capacity();
607typename UnboundedWSQ<T>::Array*
608UnboundedWSQ<T>::_resize_array(Array* a, int64_t b, int64_t t) {
609 Array* tmp = a->resize(b, t);
610 _garbage.push_back(a);
612 _array.store(tmp, std::memory_order_release);
617typename UnboundedWSQ<T>::Array*
618UnboundedWSQ<T>::_resize_array(Array* a, int64_t b, int64_t t,
size_t N) {
619 Array* tmp = a->resize(b, t, N);
620 _garbage.push_back(a);
622 _array.store(tmp, std::memory_order_release);
655template <
typename T,
size_t LogSize = TF_DEFAULT_BOUNDED_TASK_QUEUE_LOG_SIZE>
658 constexpr static size_t BufferSize =
size_t{1} << LogSize;
659 constexpr static size_t BufferMask = (BufferSize - 1);
661 static_assert((BufferSize >= 2) && ((BufferSize & (BufferSize - 1)) == 0));
663 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _top {0};
664 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _bottom {0};
665 alignas(TF_CACHELINE_SIZE) std::atomic<T> _buffer[BufferSize];
685 using value_type = std::conditional_t<std::is_pointer_v<T>, T, std::optional<T>>;
760 template <typename O>
789 template <typename I>
911template <
typename T,
size_t LogSize>
913 int64_t t = _top.load(std::memory_order_relaxed);
914 int64_t b = _bottom.load(std::memory_order_relaxed);
919template <
typename T,
size_t LogSize>
921 int64_t t = _top.load(std::memory_order_relaxed);
922 int64_t b = _bottom.load(std::memory_order_relaxed);
923 return static_cast<size_t>(b >= t ? b - t : 0);
927template <
typename T,
size_t LogSize>
931 int64_t b = _bottom.load(std::memory_order_relaxed);
932 int64_t t = _top.load(std::memory_order_acquire);
935 if(
static_cast<size_t>(b - t + 1) > BufferSize) [[unlikely]] {
939 _buffer[b & BufferMask].store(std::forward<O>(o), std::memory_order_relaxed);
941 std::atomic_thread_fence(std::memory_order_release);
944 _bottom.store(b + 1, std::memory_order_release);
950template <
typename T,
size_t LogSize>
956 int64_t b = _bottom.load(std::memory_order_relaxed);
957 int64_t t = _top.load(std::memory_order_acquire);
959 size_t r = BufferSize - (b - t);
960 size_t n = std::min(N, r);
964 for(
size_t i=0; i<n; ++i) {
965 _buffer[b++ & BufferMask].store(*first++, std::memory_order_relaxed);
967 std::atomic_thread_fence(std::memory_order_release);
969 _bottom.store(b, std::memory_order_release);
976template <
typename T,
size_t LogSize>
980 int64_t b = _bottom.load(std::memory_order_relaxed) - 1;
981 _bottom.store(b, std::memory_order_relaxed);
982 std::atomic_thread_fence(std::memory_order_seq_cst);
983 int64_t t = _top.load(std::memory_order_relaxed);
989 item = _buffer[b & BufferMask].load(std::memory_order_relaxed);
992 if(!_top.compare_exchange_strong(t, t+1,
993 std::memory_order_seq_cst,
994 std::memory_order_relaxed)) {
998 _bottom.store(b + 1, std::memory_order_relaxed);
1002 _bottom.store(b + 1, std::memory_order_relaxed);
1009template <
typename T,
size_t LogSize>
1012 int64_t t = _top.load(std::memory_order_acquire);
1013 std::atomic_thread_fence(std::memory_order_seq_cst);
1014 int64_t b = _bottom.load(std::memory_order_acquire);
1020 item = _buffer[t & BufferMask].load(std::memory_order_relaxed);
1021 if(!_top.compare_exchange_strong(t, t+1,
1022 std::memory_order_seq_cst,
1023 std::memory_order_relaxed)) {
1034template <
typename T,
size_t LogSize>
1038 int64_t t = _top.load(std::memory_order_acquire);
1039 std::atomic_thread_fence(std::memory_order_seq_cst);
1040 int64_t b = _bottom.load(std::memory_order_acquire);
1044 auto item = _buffer[t & BufferMask].load(std::memory_order_relaxed);
1045 if(!_top.compare_exchange_strong(t, t+1,
1046 std::memory_order_seq_cst,
1047 std::memory_order_relaxed)) {
1060template <
typename T,
size_t LogSize>
size_t try_bulk_push(I &first, size_t N)
~BoundedWSQ()=default
destructs the queue
value_type steal_with_feedback()
steals an item from the queue, signalling detected contention via the contended sentinel
constexpr size_t capacity() const
BoundedWSQ()=default
constructs the queue with a fixed capacity of 2^LogSize determined at compile time
std::conditional_t< std::is_pointer_v< T >, T, std::optional< T > > value_type
the return type of queue operations
Definition wsq.hpp:685
size_t size() const noexcept
queries the number of items in the queue at the time of this call
static constexpr auto empty_value()
Definition wsq.hpp:895
static auto contended_value()
returns the contended sentinel value for pointer element types
Definition wsq.hpp:907
bool empty() const noexcept
queries if the queue is empty at the time of this call
Definition wsq.hpp:912
bool empty() const noexcept
queries if the queue is empty at the time of this call
Definition wsq.hpp:454
value_type pop()
pops out an item from the queue
Definition wsq.hpp:516
value_type steal()
steals an item from the queue
Definition wsq.hpp:549
UnboundedWSQ(int64_t LogSize=TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE)
constructs the queue with the given size in the base-2 logarithm
Definition wsq.hpp:436
void push(T item)
inserts an item to the queue
Definition wsq.hpp:470
size_t size() const noexcept
queries the number of items in the queue at the time of this call
Definition wsq.hpp:462
void bulk_push(I &first, size_t N)
pushes N consecutive items read from the iterator first into the queue
Definition wsq.hpp:491
std::conditional_t< std::is_pointer_v< T >, T, std::optional< T > > value_type
the return type of queue operations
Definition wsq.hpp:200
size_t capacity() const noexcept
queries the capacity of the queue
Definition wsq.hpp:602
static auto contended_value()
returns the contended sentinel value for pointer element types
Definition wsq.hpp:426
static constexpr auto empty_value()
returns the empty sentinel value for the queue element type
Definition wsq.hpp:414
~UnboundedWSQ()
destructs the queue
Definition wsq.hpp:445
value_type steal_with_feedback()
steals an item from the queue, signalling detected contention via the contended sentinel
Definition wsq.hpp:576
taskflow namespace
Definition small_vector.hpp:20
constexpr auto wsq_empty_value()
returns the empty sentinel for work-stealing steal operations
Definition wsq.hpp:70
auto wsq_contended_value()
returns the contended sentinel for work-stealing steal operations
Definition wsq.hpp:90