41#if defined(ARCH_arm64)
42 static constexpr size_t stable_cache_alignment = 256U;
43#elif defined(ARCH_ppc32)
44 static constexpr size_t stable_cache_alignment = 128U;
46 static constexpr size_t stable_cache_alignment = 64U;
49#ifdef __GCC_DESTRUCTIVE_SIZE
50static_assert(stable_cache_alignment >= __GCC_DESTRUCTIVE_SIZE,
51 "Stable cache alignment not sufficient");
55static constexpr size_t string_buffer_size = 1024U;
65template<
typename SEQUENCE_TYPE>
69 using Sequence = SEQUENCE_TYPE;
70 using Generation = std::atomic<Sequence>;
73 static constexpr Sequence
const nil = 0U;
89 throw std::invalid_argument(
"Zero items not supported.");
91 if (items != std::bit_ceil(items))
92 throw std::invalid_argument(
"Number of items must be power of two.");
108 auto items = mask + 1;
109 if (items != std::bit_ceil(items))
110 throw std::length_error(
"Invalid mask.");
127template<
typename SEQUENCE_TYPE>
131 using Sequence = SEQUENCE_TYPE;
132 using Generation =
typename Ring_buffer<Sequence>::Generation;
134 template<
typename S,
typename T, auto G>
friend class Ring_buffer_producer;
135 template<
typename S,
typename T, auto G>
136 friend class Ring_buffer_consumer_raw;
156 {
return _alignment; }
164 {
return _mask + 1; }
174 alignas(stable_cache_alignment)
size_t _mask;
177 alignas(stable_cache_alignment) Generation _tail;
189template<
typename ITEM_TYPE>
190static constexpr size_t ring_slot_alignment = std::bit_ceil(
sizeof(ITEM_TYPE));
206template<
typename ITEM_TYPE, auto GENERATION_PTR>
211 using Item = ITEM_TYPE;
213 template<
typename S,
typename T, auto G>
friend class Ring_buffer_producer;
214 template<
typename S,
typename T, auto G>
215 friend class Ring_buffer_consumer_raw;
224 static size_t size(
size_t const items)
225 {
return items *
sizeof(This); }
230 auto generation() const noexcept
231 {
return std::atomic_ref(_data.*GENERATION_PTR); }
247template<
typename SEQUENCE_TYPE,
typename ITEM_TYPE, auto GENERATION_PTR>
254 using Item = ITEM_TYPE;
275 size_t const items) : _status(status), _slots(slots)
279 _status._version = version;
280 _status._alignment = stable_cache_alignment;
281 _status._mask =
items - 1;
284 for (
size_t i = 0; i <
items; ++i)
290 {
return _status.items(); }
302 auto next = ++_status._tail;
303 auto index = next & _status._mask;
305 auto &generation = _slots[index].generation();
306 auto &slot = _slots[index]._data;
308 generation.store(
Super::nil, std::memory_order_release);
310 generation.store(next, std::memory_order_release);
333template<
typename SEQUENCE_TYPE,
typename ITEM_TYPE, auto GENERATION_PTR>
340 using Sequence = SEQUENCE_TYPE;
341 using Item = ITEM_TYPE;
395 : _status(status), _slots(slots)
400 {
return _status.items(); }
436 Sequence *drops =
nullptr)
const
445 auto index = current & _status._mask;
447 auto const &generation = _slots[index].generation();
448 auto const &slot = _slots[index]._data;
450 auto seen = generation.load();
461 seen = generation.load();
474 Sequence head = _status._tail;
479 head -= _status._mask;
483 *drops = head - current;
486 return State::Dropped;
492 Status
const &_status;
509template<
typename SEQUENCE_TYPE,
typename ITEM_TYPE, auto GENERATION_PTR>
514 using Sequence = SEQUENCE_TYPE;
515 using Item = ITEM_TYPE;
524 using Yield = std::function<bool (Sequence)>;
536 : Super(status, slots)
560 State
peek(Item &item, Drop_policy policy, Sequence *drops =
nullptr)
562 const std::lock_guard guard(_lock);
596 Drop_policy policy, Yield
const &yield,
597 Sequence *drops =
nullptr)
603 size_t idle_cycles = 0;
607 Sequence current_drops;
608 auto status =
peek(
items[count], policy, &current_drops);
610 if (status == State::Dropped)
616 *drops = current_drops;
621 if (status == State::Ready)
627 if (count == capacity)
633 if (status == State::Idle)
643 if (yield(idle_cycles))
659 Sequence _current = 0;
State
Status of the dequeue operation.
Ring_buffer_consumer_raw(Status const &status, Slot const *slots)
Construct ring buffer consumer.
size_t items() const
Get the number of items.
State peek(Item &item, Drop_policy policy, Sequence &current, Sequence *drops=nullptr) const
Poll and possibly dequeue an item from the ring buffer.
Drop_policy
Item drop policy.
@ Minimal
Minimal item drop policy.
@ Conservative
Conservative item drop policy.
size_t dequeue(Item items[], size_t capacity, size_t burst, Drop_policy policy, Yield const &yield, Sequence *drops=nullptr)
Dequeue items from the ring buffer.
State peek(Item &item, Drop_policy policy, Sequence *drops=nullptr)
Poll and possibly dequeue an item from the ring buffer.
Ring_buffer_consumer(Status const &status, Slot const *slots)
Construct ring buffer consumer.
Ring_buffer_producer(Status &status, Slot &slots, unsigned const version, size_t const items)
Construct ring buffer producer.
void enqueue(Item const &item) const
Enqueue an item in the ring buffer.
size_t items() const
Get the number of items.
static void check_mask(size_t const mask)
Check that the item mask is a power of two minus one.
static constexpr Sequence const nil
Invalid (non-committed) items set their sequence counter to 0.
static void check_items(size_t const items)
Check that the number of items is a power of two.
static size_t size(size_t const items)
Get the size (in bytes) of the given count of items.
size_t mask() const
Get the item mask.
size_t items() const
Get the number of items.
size_t alignment() const
Get the stable alignment of the ring buffer status members.
unsigned version() const
Get the version of the ring buffer items.