5#include "../utility/macros.hpp"
6#include "../utility/traits.hpp"
13#ifndef TF_DEFAULT_BOUNDED_TASK_QUEUE_LOG_SIZE
21 #define TF_DEFAULT_BOUNDED_TASK_QUEUE_LOG_SIZE 8
24#ifndef TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE
32 #define TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE 10
68 if constexpr (std::is_pointer_v<T>) {
71 return std::optional<T>{std::nullopt};
113 explicit Array(
size_t c) :
116 S {
new std::atomic<T>[C]} {
127 void push(int64_t i, T o)
noexcept {
128 S[i & M].store(o, std::memory_order_relaxed);
131 T
pop(int64_t i)
noexcept {
132 return S[i & M].load(std::memory_order_relaxed);
135 Array* resize(int64_t b, int64_t t) {
136 Array* ptr =
new Array(2*C);
137 for(int64_t i=t; i!=b; ++i) {
138 ptr->push(i,
pop(i));
143 Array* resize(int64_t b, int64_t t,
size_t N) {
145 Array* ptr =
new Array(std::bit_ceil(C + N));
146 for(int64_t i=t; i!=b; ++i) {
147 ptr->push(i,
pop(i));
154 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _top;
155 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _bottom;
161 int64_t _cached_top {0};
165 alignas(TF_CACHELINE_SIZE) std::atomic<Array*> _array;
166 std::vector<Array*> _garbage;
186 using value_type = std::conditional_t<std::is_pointer_v<T>, T, std::optional<T>>;
198 explicit UnboundedWSQ(int64_t LogSize = TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE);
293 template <typename I>
356 Array* _resize_array(Array* a, int64_t b, int64_t t);
357 Array* _resize_array(Array* a, int64_t b, int64_t t,
size_t N);
363 _top.store(0, std::memory_order_relaxed);
364 _bottom.store(0, std::memory_order_relaxed);
365 _array.store(
new Array{(
size_t{1} << LogSize)}, std::memory_order_relaxed);
366 _garbage.reserve(32);
372 for(
auto a : _garbage) {
375 delete _array.load();
381 int64_t t = _top.load(std::memory_order_relaxed);
382 int64_t b = _bottom.load(std::memory_order_relaxed);
389 int64_t t = _top.load(std::memory_order_relaxed);
390 int64_t b = _bottom.load(std::memory_order_relaxed);
391 return static_cast<size_t>(b >= t ? b - t : 0);
398 int64_t b = _bottom.load(std::memory_order_relaxed);
399 Array* a = _array.load(std::memory_order_relaxed);
402 if(a->capacity() <
static_cast<size_t>(b - _cached_top + 1)) [[unlikely]] {
403 _cached_top = _top.load(std::memory_order_acquire);
404 if(a->capacity() <
static_cast<size_t>(b - _cached_top + 1)) [[unlikely]] {
405 a = _resize_array(a, b, _cached_top);
410 std::atomic_thread_fence(std::memory_order_release);
413 _bottom.store(b + 1, std::memory_order_release);
423 int64_t b = _bottom.load(std::memory_order_relaxed);
424 Array* a = _array.load(std::memory_order_relaxed);
427 if((b - _cached_top + N) > a->capacity()) [[unlikely]] {
428 _cached_top = _top.load(std::memory_order_acquire);
429 if((b - _cached_top + N) > a->capacity()) [[unlikely]] {
430 a = _resize_array(a, b, _cached_top, N);
434 for(
size_t i=0; i<N; ++i) {
435 a->push(b++, *first++);
437 std::atomic_thread_fence(std::memory_order_release);
440 _bottom.store(b, std::memory_order_release);
448 int64_t b = _bottom.load(std::memory_order_relaxed) - 1;
449 Array* a = _array.load(std::memory_order_relaxed);
450 _bottom.store(b, std::memory_order_relaxed);
451 std::atomic_thread_fence(std::memory_order_seq_cst);
452 int64_t t = _top.load(std::memory_order_relaxed);
461 if(!_top.compare_exchange_strong(t, t+1, std::memory_order_seq_cst,
462 std::memory_order_relaxed)) {
466 _bottom.store(b + 1, std::memory_order_relaxed);
470 _bottom.store(b + 1, std::memory_order_relaxed);
481 int64_t t = _top.load(std::memory_order_acquire);
482 std::atomic_thread_fence(std::memory_order_seq_cst);
483 int64_t b = _bottom.load(std::memory_order_acquire);
489 Array* a = _array.load(std::memory_order_consume);
491 if(!_top.compare_exchange_strong(t, t+1,
492 std::memory_order_seq_cst,
493 std::memory_order_relaxed)) {
506 return _array.load(std::memory_order_relaxed)->capacity();
510typename UnboundedWSQ<T>::Array*
511UnboundedWSQ<T>::_resize_array(Array* a, int64_t b, int64_t t) {
512 Array* tmp = a->resize(b, t);
513 _garbage.push_back(a);
515 _array.store(tmp, std::memory_order_release);
520typename UnboundedWSQ<T>::Array*
521UnboundedWSQ<T>::_resize_array(Array* a, int64_t b, int64_t t,
size_t N) {
522 Array* tmp = a->resize(b, t, N);
523 _garbage.push_back(a);
525 _array.store(tmp, std::memory_order_release);
558template <
typename T,
size_t LogSize = TF_DEFAULT_BOUNDED_TASK_QUEUE_LOG_SIZE>
561 constexpr static size_t BufferSize =
size_t{1} << LogSize;
562 constexpr static size_t BufferMask = (BufferSize - 1);
564 static_assert((BufferSize >= 2) && ((BufferSize & (BufferSize - 1)) == 0));
566 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _top {0};
567 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _bottom {0};
568 alignas(TF_CACHELINE_SIZE) std::atomic<T> _buffer[BufferSize];
588 using value_type = std::conditional_t<std::is_pointer_v<T>, T, std::optional<T>>;
663 template <typename O>
692 template <typename I>
755template <
typename T,
size_t LogSize>
757 int64_t t = _top.load(std::memory_order_relaxed);
758 int64_t b = _bottom.load(std::memory_order_relaxed);
763template <
typename T,
size_t LogSize>
765 int64_t t = _top.load(std::memory_order_relaxed);
766 int64_t b = _bottom.load(std::memory_order_relaxed);
767 return static_cast<size_t>(b >= t ? b - t : 0);
771template <
typename T,
size_t LogSize>
775 int64_t b = _bottom.load(std::memory_order_relaxed);
776 int64_t t = _top.load(std::memory_order_acquire);
779 if(
static_cast<size_t>(b - t + 1) > BufferSize) [[unlikely]] {
783 _buffer[b & BufferMask].store(std::forward<O>(o), std::memory_order_relaxed);
785 std::atomic_thread_fence(std::memory_order_release);
788 _bottom.store(b + 1, std::memory_order_release);
794template <
typename T,
size_t LogSize>
800 int64_t b = _bottom.load(std::memory_order_relaxed);
801 int64_t t = _top.load(std::memory_order_acquire);
803 size_t r = BufferSize - (b - t);
804 size_t n = std::min(N, r);
808 for(
size_t i=0; i<n; ++i) {
809 _buffer[b++ & BufferMask].store(*first++, std::memory_order_relaxed);
811 std::atomic_thread_fence(std::memory_order_release);
813 _bottom.store(b, std::memory_order_release);
820template <
typename T,
size_t LogSize>
824 int64_t b = _bottom.load(std::memory_order_relaxed) - 1;
825 _bottom.store(b, std::memory_order_relaxed);
826 std::atomic_thread_fence(std::memory_order_seq_cst);
827 int64_t t = _top.load(std::memory_order_relaxed);
833 item = _buffer[b & BufferMask].load(std::memory_order_relaxed);
836 if(!_top.compare_exchange_strong(t, t+1,
837 std::memory_order_seq_cst,
838 std::memory_order_relaxed)) {
842 _bottom.store(b + 1, std::memory_order_relaxed);
846 _bottom.store(b + 1, std::memory_order_relaxed);
853template <
typename T,
size_t LogSize>
856 int64_t t = _top.load(std::memory_order_acquire);
857 std::atomic_thread_fence(std::memory_order_seq_cst);
858 int64_t b = _bottom.load(std::memory_order_acquire);
864 item = _buffer[t & BufferMask].load(std::memory_order_relaxed);
865 if(!_top.compare_exchange_strong(t, t+1,
866 std::memory_order_seq_cst,
867 std::memory_order_relaxed)) {
878template <
typename T,
size_t LogSize>
size_t try_bulk_push(I &first, size_t N)
tries to insert up to N items starting at first, limited by the free space remaining in the queue
~BoundedWSQ()=default
destructs the queue
constexpr size_t capacity() const
BoundedWSQ()=default
constructs an empty queue; its capacity is fixed at compile time by the LogSize template parameter
std::conditional_t< std::is_pointer_v< T >, T, std::optional< T > > value_type
the return type of queue operations
Definition wsq.hpp:588
size_t size() const noexcept
queries the number of items in the queue at the time of this call
static constexpr auto empty_value()
Definition wsq.hpp:751
bool empty() const noexcept
queries if the queue is empty at the time of this call
Definition wsq.hpp:756
bool empty() const noexcept
queries if the queue is empty at the time of this call
Definition wsq.hpp:380
value_type pop()
pops out an item from the queue
Definition wsq.hpp:446
value_type steal()
tries to steal an item from the top of the queue
Definition wsq.hpp:479
UnboundedWSQ(int64_t LogSize=TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE)
constructs the queue with the given size in the base-2 logarithm
Definition wsq.hpp:362
void push(T item)
inserts an item into the queue
Definition wsq.hpp:396
size_t size() const noexcept
queries the number of items in the queue at the time of this call
Definition wsq.hpp:388
void bulk_push(I &first, size_t N)
inserts N items starting at first into the queue, advancing first past the inserted items
Definition wsq.hpp:419
std::conditional_t< std::is_pointer_v< T >, T, std::optional< T > > value_type
the return type of queue operations
Definition wsq.hpp:186
size_t capacity() const noexcept
queries the capacity of the queue
Definition wsq.hpp:505
static constexpr auto empty_value()
returns the empty sentinel value for the queue element type
Definition wsq.hpp:352
~UnboundedWSQ()
destructs the queue
Definition wsq.hpp:371
taskflow namespace
Definition small_vector.hpp:20
constexpr auto wsq_empty_value()
returns the empty sentinel for work-stealing steal operations
Definition wsq.hpp:67