5#include "../utility/macros.hpp"
6#include "../utility/traits.hpp"
13#ifndef TF_DEFAULT_BOUNDED_TASK_QUEUE_LOG_SIZE
21 #define TF_DEFAULT_BOUNDED_TASK_QUEUE_LOG_SIZE 8
24#ifndef TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE
32 #define TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE 10
// --- tf::UnboundedWSQ<T>::Array (fragmentary extract) --------------------
// NOTE(review): the original line numbering embedded in this extract jumps
// (75, 78, 89, ...), so interior lines of every member below are missing.
// Do not treat any function here as complete; verify against upstream.
//
// Constructor: allocates the atomic slot storage `S`; the member-init list
// that sets C/M from `c` (orig. lines 76-77) is not visible — TODO confirm
// C is the capacity and M the index mask.
75 explicit Array(
size_t c) :
78 S {
new std::atomic<T>[C]} {
// Store item `o` at ring slot (i & M). Relaxed order is sufficient here:
// publication to thieves is done by the caller's release fence/store on
// _bottom (see push()).
89 void push(int64_t i, T o)
noexcept {
90 S[i & M].store(o, std::memory_order_relaxed);
// Load the item at ring slot (i & M); relaxed — synchronization external.
93 T
pop(int64_t i)
noexcept {
94 return S[i & M].load(std::memory_order_relaxed);
// Grow to a doubled ring, copying the live index range [t, b) over.
// The tail of this function (presumably `return ptr;`) is not visible.
97 Array* resize(int64_t b, int64_t t) {
98 Array* ptr =
new Array {2*C};
99 for(int64_t i=t; i!=b; ++i) {
100 ptr->push(i,
pop(i));
// Bulk-grow: make room for N extra items, rounding capacity up to a
// power of two with std::bit_ceil (C++20), then copy [t, b).
105 Array* resize(int64_t b, int64_t t,
size_t N) {
107 Array* ptr =
new Array {std::bit_ceil(C + N)};
108 for(int64_t i=t; i!=b; ++i) {
109 ptr->push(i,
pop(i));
// Queue members: _top (thief side) and _bottom (owner side) are placed on
// separate cache lines via alignas to avoid false sharing between the
// owning thread and stealing threads.
116 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _top;
117 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _bottom;
118 std::atomic<Array*> _array;
// Retired rings are kept here after a resize rather than freed —
// presumably so in-flight steals can still read the old storage; confirm
// they are reclaimed only in the destructor.
119 std::vector<Array*> _garbage;
// --- tf::UnboundedWSQ<T> declarations + constructor (fragmentary) --------
// Empty-sentinel scheme: pointer element types return nullptr for "empty";
// all other types are wrapped in std::optional.
139 using value_type = std::conditional_t<std::is_pointer_v<T>, T, std::optional<T>>;
// LogSize is the base-2 logarithm of the initial capacity (default 2^10).
151 explicit UnboundedWSQ(int64_t LogSize = TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE);
246 template <typename I>
// empty_value() fragment: only the non-pointer branch is visible here;
// the pointer branch's return (presumably nullptr) is missing.
336 if constexpr (std::is_pointer_v<T>) {
339 return std::optional<T>{std::nullopt};
// Internal growth helpers; the second overload reserves room for a bulk
// push of N additional items.
345 Array* _resize_array(Array* a, int64_t b, int64_t t);
346 Array* _resize_array(Array* a, int64_t b, int64_t t,
size_t N);
// Constructor body: both indices start at 0 and the initial ring holds
// 2^LogSize slots. Relaxed stores are fine before the queue is shared.
352 _top.store(0, std::memory_order_relaxed);
353 _bottom.store(0, std::memory_order_relaxed);
354 _array.store(
new Array{(
size_t{1} << LogSize)}, std::memory_order_relaxed);
// Pre-reserve the retired-ring list to limit reallocation on growth.
355 _garbage.reserve(32);
// --- destructor / empty / size / push (fragmentary extract) --------------
// Destructor: reclaim every retired ring in _garbage, then the current
// ring (the `delete a;` inside the loop is not visible in this extract).
361 for(
auto a : _garbage) {
364 delete _array.load();
// empty(): relaxed snapshot of both indices — the result may be stale
// under concurrent access (the `return b <= t;` tail is missing — confirm).
370 int64_t t = _top.load(std::memory_order_relaxed);
371 int64_t b = _bottom.load(std::memory_order_relaxed);
// size(): relaxed snapshot, clamped to 0 because a concurrent pop can
// make b transiently smaller than t.
378 int64_t t = _top.load(std::memory_order_relaxed);
379 int64_t b = _bottom.load(std::memory_order_relaxed);
380 return static_cast<size_t>(b >= t ? b - t : 0);
// push(): owner-thread only. Grow when the ring cannot hold one more
// item; the slot store itself (orig. ~396) is not visible. The release
// fence + release store of _bottom publishes the element to thieves that
// acquire-load _bottom.
387 int64_t b = _bottom.load(std::memory_order_relaxed);
388 int64_t t = _top.load(std::memory_order_acquire);
389 Array* a = _array.load(std::memory_order_relaxed);
392 if (a->capacity() <
static_cast<size_t>(b - t + 1)) [[unlikely]] {
393 a = _resize_array(a, b, t);
397 std::atomic_thread_fence(std::memory_order_release);
400 _bottom.store(b + 1, std::memory_order_release);
// --- bulk_push / pop (fragmentary extract) -------------------------------
// bulk_push(): owner-thread only. Reserve room for all N items at once
// (the N-ary resize rounds capacity up via bit_ceil), write them with
// relaxed stores, then publish the advanced _bottom with fence + release.
410 int64_t b = _bottom.load(std::memory_order_relaxed);
411 int64_t t = _top.load(std::memory_order_acquire);
412 Array* a = _array.load(std::memory_order_relaxed);
415 if ( (b - t + N) > a->capacity() ) [[unlikely]] {
416 a = _resize_array(a, b, t, N);
419 for(
size_t i=0; i<N; ++i) {
420 a->push(b++, *first++);
422 std::atomic_thread_fence(std::memory_order_release);
425 _bottom.store(b, std::memory_order_release);
// pop(): owner-thread only, Chase–Lev-style. Tentatively reserve the
// bottom slot, then a seq_cst fence orders the _bottom store before the
// _top load so owner and thieves agree on who gets the last element.
433 int64_t b = _bottom.load(std::memory_order_relaxed) - 1;
434 Array* a = _array.load(std::memory_order_relaxed);
435 _bottom.store(b, std::memory_order_relaxed);
436 std::atomic_thread_fence(std::memory_order_seq_cst);
437 int64_t t = _top.load(std::memory_order_relaxed);
// Last-element race: the CAS on _top decides between owner and a thief;
// the surrounding control flow (orig. 438-445, 448-454) is not visible.
446 if(!_top.compare_exchange_strong(t, t+1, std::memory_order_seq_cst,
447 std::memory_order_relaxed)) {
// Both exit paths restore _bottom to b + 1 (queue logically unchanged or
// the popped slot released back).
451 _bottom.store(b + 1, std::memory_order_relaxed);
455 _bottom.store(b + 1, std::memory_order_relaxed);
// --- steal / steal_with_feedback / capacity / _resize_array (fragmentary)
// steal(): thief side. Acquire _top, seq_cst fence, acquire _bottom; the
// fence pairs with the one in pop() for the last-element race.
466 int64_t t = _top.load(std::memory_order_acquire);
467 std::atomic_thread_fence(std::memory_order_seq_cst);
468 int64_t b = _bottom.load(std::memory_order_acquire);
// NOTE(review): memory_order_consume is effectively promoted to acquire
// by mainstream compilers; usage here matches the classic Chase–Lev
// formulation but is worth confirming against upstream.
474 Array* a = _array.load(std::memory_order_consume);
// CAS claims slot t; on failure another thief (or the owner) won the race
// (the empty-return and item-load lines are not visible in this extract).
476 if(!_top.compare_exchange_strong(t, t+1,
477 std::memory_order_seq_cst,
478 std::memory_order_relaxed)) {
// steal_with_feedback(): same protocol, but resets the caller's
// consecutive-empty-steal counter whenever the queue is observed
// non-empty.
492 int64_t t = _top.load(std::memory_order_acquire);
493 std::atomic_thread_fence(std::memory_order_seq_cst);
494 int64_t b = _bottom.load(std::memory_order_acquire);
500 num_empty_steals = 0;
501 Array* a = _array.load(std::memory_order_consume);
503 if(!_top.compare_exchange_strong(t, t+1,
504 std::memory_order_seq_cst,
505 std::memory_order_relaxed)) {
// capacity(): delegates to the current ring.
519 return _array.load(std::memory_order_relaxed)->capacity();
// _resize_array (doubling overload): the old ring goes to _garbage
// instead of being deleted — racing thieves may still be reading it; the
// new ring is published with a release store. `return tmp;` (orig. 529)
// is not visible.
523typename UnboundedWSQ<T>::Array*
524UnboundedWSQ<T>::_resize_array(Array* a, int64_t b, int64_t t) {
525 Array* tmp = a->resize(b, t);
526 _garbage.push_back(a);
528 _array.store(tmp, std::memory_order_release);
// _resize_array (bulk overload): identical protocol, sized for N extras.
533typename UnboundedWSQ<T>::Array*
534UnboundedWSQ<T>::_resize_array(Array* a, int64_t b, int64_t t,
size_t N) {
535 Array* tmp = a->resize(b, t, N);
536 _garbage.push_back(a);
538 _array.store(tmp, std::memory_order_release);
// --- tf::BoundedWSQ<T, LogSize> head + static members (fragmentary) ------
// Fixed-capacity variant: capacity is 2^LogSize, chosen at compile time.
571template <
typename T,
size_t LogSize = TF_DEFAULT_BOUNDED_TASK_QUEUE_LOG_SIZE>
574 constexpr static size_t BufferSize =
size_t{1} << LogSize;
// Index mask — valid only because BufferSize is a power of two, which the
// static_assert below enforces (along with a minimum size of 2).
575 constexpr static size_t BufferMask = (BufferSize - 1);
577 static_assert((BufferSize >= 2) && ((BufferSize & (BufferSize - 1)) == 0));
// _top, _bottom, and the slot buffer each sit on their own cache line to
// avoid false sharing between the owner and stealing threads.
579 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _top {0};
580 alignas(TF_CACHELINE_SIZE) std::atomic<int64_t> _bottom {0};
581 alignas(TF_CACHELINE_SIZE) std::atomic<T> _buffer[BufferSize];
// Same empty-sentinel scheme as UnboundedWSQ: raw nullptr for pointer T,
// std::optional otherwise.
601 using value_type = std::conditional_t<std::is_pointer_v<T>, T, std::optional<T>>;
// --- BoundedWSQ: empty_value / empty / size / try_push (fragmentary) -----
676 template <typename O>
705 template <typename I>
// empty_value() fragment: only the non-pointer branch's return is visible.
794 if constexpr (std::is_pointer_v<T>) {
797 return std::optional<T>{std::nullopt};
// empty(): relaxed snapshot of both indices (return statement missing in
// this extract — presumably `return b <= t;`; confirm upstream).
803template <
typename T,
size_t LogSize>
805 int64_t t = _top.load(std::memory_order_relaxed);
806 int64_t b = _bottom.load(std::memory_order_relaxed);
// size(): relaxed snapshot, clamped to 0 against a racing pop.
811template <
typename T,
size_t LogSize>
813 int64_t t = _top.load(std::memory_order_relaxed);
814 int64_t b = _bottom.load(std::memory_order_relaxed);
815 return static_cast<size_t>(b >= t ? b - t : 0);
// try_push(): owner-thread only. Unlike the unbounded queue there is no
// growth path — when the ring is full the push is refused (the
// `return false;` inside the if is not visible here). Otherwise store the
// slot and publish via release fence + release store of _bottom.
819template <
typename T,
size_t LogSize>
823 int64_t b = _bottom.load(std::memory_order_relaxed);
824 int64_t t = _top.load(std::memory_order_acquire);
827 if(
static_cast<size_t>(b - t + 1) > BufferSize) [[unlikely]] {
831 _buffer[b & BufferMask].store(std::forward<O>(o), std::memory_order_relaxed);
833 std::atomic_thread_fence(std::memory_order_release);
836 _bottom.store(b + 1, std::memory_order_release);
// --- BoundedWSQ: try_bulk_push / pop (fragmentary extract) ---------------
// try_bulk_push(): owner-thread only. Pushes as many of the N items as
// fit in the remaining space r and (per the Doxygen index) returns the
// number actually pushed — presumably n; confirm the missing return line.
842template <
typename T,
size_t LogSize>
848 int64_t b = _bottom.load(std::memory_order_relaxed);
849 int64_t t = _top.load(std::memory_order_acquire);
851 size_t r = BufferSize - (b - t);
852 size_t n = std::min(N, r);
856 for(
size_t i=0; i<n; ++i) {
857 _buffer[b++ & BufferMask].store(*first++, std::memory_order_relaxed);
859 std::atomic_thread_fence(std::memory_order_release);
861 _bottom.store(b, std::memory_order_release);
// pop(): owner-thread only, same Chase–Lev protocol as the unbounded
// queue: reserve the bottom slot, seq_cst fence, read _top, and resolve
// the last-element race with a CAS on _top; both paths restore _bottom.
868template <
typename T,
size_t LogSize>
872 int64_t b = _bottom.load(std::memory_order_relaxed) - 1;
873 _bottom.store(b, std::memory_order_relaxed);
874 std::atomic_thread_fence(std::memory_order_seq_cst);
875 int64_t t = _top.load(std::memory_order_relaxed);
881 item = _buffer[b & BufferMask].load(std::memory_order_relaxed);
884 if(!_top.compare_exchange_strong(t, t+1,
885 std::memory_order_seq_cst,
886 std::memory_order_relaxed)) {
890 _bottom.store(b + 1, std::memory_order_relaxed);
894 _bottom.store(b + 1, std::memory_order_relaxed);
// --- BoundedWSQ: steal / steal_with_feedback (fragmentary extract) -------
// steal(): thief side. Acquire _top, seq_cst fence (pairs with pop()'s
// fence), acquire _bottom; speculatively read slot t, then claim it with
// a CAS on _top — on CAS failure another thread won the race (the
// empty/return lines are not visible in this extract).
901template <
typename T,
size_t LogSize>
904 int64_t t = _top.load(std::memory_order_acquire);
905 std::atomic_thread_fence(std::memory_order_seq_cst);
906 int64_t b = _bottom.load(std::memory_order_acquire);
912 item = _buffer[t & BufferMask].load(std::memory_order_relaxed);
913 if(!_top.compare_exchange_strong(t, t+1,
914 std::memory_order_seq_cst,
915 std::memory_order_relaxed)) {
// steal_with_feedback(): identical protocol; additionally zeroes the
// caller's consecutive-empty-steal counter when the queue is observed
// non-empty (used by callers to adapt their stealing strategy).
925template <
typename T,
size_t LogSize>
928 int64_t t = _top.load(std::memory_order_acquire);
929 std::atomic_thread_fence(std::memory_order_seq_cst);
930 int64_t b = _bottom.load(std::memory_order_acquire);
936 num_empty_steals = 0;
937 item = _buffer[t & BufferMask].load(std::memory_order_relaxed);
938 if(!_top.compare_exchange_strong(t, t+1,
939 std::memory_order_seq_cst,
940 std::memory_order_relaxed)) {
// Template head of the next definition (capacity(), per the Doxygen
// index); its body lies past this extract.
952template <
typename T,
size_t LogSize>
value_type pop()
Definition wsq.hpp:870
size_t try_bulk_push(I &first, size_t N)
Definition wsq.hpp:844
~BoundedWSQ()=default
destructs the queue
value_type steal()
Definition wsq.hpp:903
constexpr size_t capacity() const
Definition wsq.hpp:953
BoundedWSQ()=default
constructs the queue; the capacity is fixed at compile time by the LogSize template parameter
bool try_push(O &&item)
Definition wsq.hpp:821
value_type steal_with_feedback(size_t &num_empty_steals)
Definition wsq.hpp:927
std::conditional_t< std::is_pointer_v< T >, T, std::optional< T > > value_type
the return type of queue operations
Definition wsq.hpp:601
size_t size() const noexcept
Definition wsq.hpp:812
static constexpr auto empty_value()
returns the empty sentinel value for the queue element type
Definition wsq.hpp:793
bool empty() const noexcept
queries if the queue is empty at the time of this call
Definition wsq.hpp:804
bool empty() const noexcept
queries if the queue is empty at the time of this call
Definition wsq.hpp:369
value_type pop()
pops out an item from the queue
Definition wsq.hpp:431
value_type steal()
Definition wsq.hpp:464
UnboundedWSQ(int64_t LogSize=TF_DEFAULT_UNBOUNDED_TASK_QUEUE_LOG_SIZE)
constructs the queue with an initial capacity of 2^LogSize (the size is given as a base-2 logarithm)
Definition wsq.hpp:351
void push(T item)
inserts an item to the queue
Definition wsq.hpp:385
size_t size() const noexcept
Definition wsq.hpp:377
void bulk_push(I &first, size_t N)
Definition wsq.hpp:406
std::conditional_t< std::is_pointer_v< T >, T, std::optional< T > > value_type
the return type of queue operations
Definition wsq.hpp:139
size_t capacity() const noexcept
queries the capacity of the queue
Definition wsq.hpp:518
value_type steal_with_feedback(size_t &num_empty_steals)
Definition wsq.hpp:490
static constexpr auto empty_value()
returns the empty sentinel value for the queue element type
Definition wsq.hpp:335
~UnboundedWSQ()
destructs the queue
Definition wsq.hpp:360
taskflow namespace
Definition small_vector.hpp:20