Skip to content

Commit 038c1b4

Browse files
committed
pattern: stable.
1 parent 48ab32c commit 038c1b4

1 file changed

Lines changed: 25 additions & 22 deletions

File tree

include/daking/MPSC_queue.hpp

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ namespace daking {
111111

112112
union {
113113
value_type value_;
114-
MPSC_node* next_chunk_; // for low_jitter
114+
MPSC_node* next_chunk_; // for stable
115115
};
116116
std::atomic<node_t*> next_;
117117
};
@@ -177,7 +177,7 @@ namespace daking {
177177
}
178178
new_top.node_ = old_top.node_->next_chunk_;
179179
new_top.tag_ = old_top.tag_ + 1;
180-
// For low_jitter pattern:
180+
// For stable pattern:
181181
// If TA and TB reach here at the same time
182182
// And A pop the chunk successfully, then it will construct object at old_top.node_->next_chunk_,
183183
// so that B will read a invalid value, but this value will not pass the next CAS.(old_top have been updated by A)
@@ -201,15 +201,16 @@ namespace daking {
201201
std::size_t Align = 64, /* std::hardware_destructive_interference_size */
202202
std::size_t ExpansionFactor = 2
203203
>
204-
struct low_jitter {
205-
static_assert((ThreadLocalCapacity & (ThreadLocalCapacity - 1)) == 0, "ThreadLocalCapacity must be a power of 2.");
204+
struct stable {
205+
static_assert(ThreadLocalCapacity && (ThreadLocalCapacity & (ThreadLocalCapacity - 1)) == 0, "ThreadLocalCapacity must be a power of 2.");
206+
static_assert(ExpansionFactor > 1, "ExpansionFactor must be greater than 1.");
206207
static constexpr std::size_t thread_local_capacity = ThreadLocalCapacity;
207208
static constexpr std::size_t align = Align;
208209
static constexpr std::size_t expansion_factor = ExpansionFactor;
209210
};
210211

211212
template <typename Ty, typename LowJitter, typename Alloc>
212-
struct MPSC_low_jitter_impl : public std::allocator_traits<Alloc>::template rebind_alloc<MPSC_node<Ty>> {
213+
struct MPSC_stable_impl : public std::allocator_traits<Alloc>::template rebind_alloc<MPSC_node<Ty>> {
213214
using value_type = Ty;
214215
using size_type = std::size_t;
215216

@@ -234,11 +235,11 @@ namespace daking {
234235

235236
struct thread_hook_t {
236237
thread_hook_t() {
237-
control_block_ = MPSC_low_jitter_impl::get_global_manager().get_control_block();
238+
control_block_ = MPSC_stable_impl::get_global_manager().get_control_block();
238239
}
239240

240241
~thread_hook_t() {
241-
MPSC_low_jitter_impl::get_global_manager().return_control_block(control_block_);
242+
MPSC_stable_impl::get_global_manager().return_control_block(control_block_);
242243
}
243244

244245
DAKING_ALWAYS_INLINE node_t*& node_list() noexcept {
@@ -259,7 +260,7 @@ namespace daking {
259260
using alloc_page_t = typename std::allocator_traits<Alloc>::template rebind_alloc<page_t>;
260261
using altraits_page_t = std::allocator_traits<alloc_page_t>;
261262

262-
// meta datas use new/delete
263+
// new/delete meta data
263264
using control_block_page_t = MPSC_page<control_block_t>;
264265
using control_block_recycler_t = MPSC_chunk_stack<control_block_t>;
265266

@@ -296,20 +297,20 @@ namespace daking {
296297

297298
for (size_type i = 0; i < count; i++) {
298299
new_nodes[i].next_ = new_nodes + i + 1; // seq_cst
299-
if ((i & (MPSC_low_jitter_impl::thread_local_capacity - 1)) == MPSC_low_jitter_impl::thread_local_capacity - 1) DAKING_UNLIKELY {
300+
if ((i & (MPSC_stable_impl::thread_local_capacity - 1)) == MPSC_stable_impl::thread_local_capacity - 1) DAKING_UNLIKELY {
300301
// chunk_count = count / ThreadLocalCapacity
301302
new_nodes[i].next_ = nullptr;
302303
std::atomic_thread_fence(std::memory_order_acq_rel);
303304
// mutex don't protect global_chunk_stack_
304-
MPSC_low_jitter_impl::global_chunk_stack_.push(&new_nodes[i - MPSC_low_jitter_impl::thread_local_capacity + 1]);
305+
MPSC_stable_impl::global_chunk_stack_.push(&new_nodes[i - MPSC_stable_impl::thread_local_capacity + 1]);
305306
}
306307
}
307308

308309
global_node_count_.store(global_node_count_ + count, std::memory_order_release);
309310
}
310311

311312
DAKING_ALWAYS_INLINE control_block_t* allocate_control_block() {
312-
std::lock_guard guard(MPSC_low_jitter_impl::global_mutex_);
313+
std::lock_guard guard(MPSC_stable_impl::global_mutex_);
313314
control_block_t* control_block = new control_block_t();
314315
control_block_page_t* new_page = new control_block_page_t(control_block, 1, global_control_block_page_list_);
315316
global_control_block_page_list_ = new_page;
@@ -349,20 +350,20 @@ namespace daking {
349350
using altraits_page_t = typename manager_t::altraits_page_t;
350351

351352
static_assert(std::is_empty_v<Alloc>,
352-
"In the low_jitter global manager design, Alloc must be stateless to avoid dangling references. "
353+
"In the stable global manager design, Alloc must be stateless to avoid dangling references. "
353354
);
354355
static_assert(
355356
std::is_constructible_v<alloc_node_t, Alloc> && std::is_constructible_v<alloc_page_t, Alloc>,
356357
"Alloc should have a template constructor like 'Alloc(const Alloc<T>& alloc)' to meet internal conversion."
357358
);
358359

359-
MPSC_low_jitter_impl(const Alloc& alloc) : alloc_node_t(alloc) {
360+
MPSC_stable_impl(const Alloc& alloc) : alloc_node_t(alloc) {
360361
global_instance_count_++;
361362
std::lock_guard<std::mutex> guard(global_mutex_);
362363
global_manager_instance_ = manager_t::create_global_manager(alloc); // single instance
363364
}
364365

365-
~MPSC_low_jitter_impl() {
366+
~MPSC_stable_impl() {
366367
if (--global_instance_count_ == 0) {
367368
// only the last instance free the global resource
368369
std::lock_guard<std::mutex> lock(global_mutex_);
@@ -442,8 +443,10 @@ namespace daking {
442443
// if anyone have already allocate chunks, I return.
443444
return;
444445
}
445-
446-
get_global_manager().reserve(std::max(thread_local_capacity, get_global_manager().node_count()), *this);
446+
447+
constexpr size_type mask = thread_local_capacity - 1;
448+
size_type count = (expansion_factor - 1) * get_global_manager().node_count();
449+
get_global_manager().reserve(std::max(thread_local_capacity, count), *this);
447450
}
448451

449452
DAKING_ALWAYS_INLINE void free_global() {
@@ -473,18 +476,18 @@ namespace daking {
473476
struct MPSC_base;
474477

475478
template <typename Ty, std::size_t...Args, typename Alloc>
476-
struct MPSC_base<Ty, low_jitter<Args...>, Alloc>
477-
: MPSC_low_jitter_impl<Ty, low_jitter<Args...>, Alloc> {
478-
using memory_policy = MPSC_low_jitter_impl<Ty, low_jitter<Args...>, Alloc>;
479+
struct MPSC_base<Ty, stable<Args...>, Alloc>
480+
: MPSC_stable_impl<Ty, stable<Args...>, Alloc> {
481+
using memory_policy = MPSC_stable_impl<Ty, stable<Args...>, Alloc>;
479482

480483
MPSC_base(const Alloc& alloc) : memory_policy(alloc) {}
481484
~MPSC_base() = default;
482485
};
483486
}
484487

485-
using detail::low_jitter;
488+
using detail::stable;
486489
/*
487-
low_jitter pattern will not free memory to OS at runtime, but have a stable jitter performance.
490+
stable pattern will not free memory to OS at runtime, but have a stable jitter performance.
488491
489492
In this mode:
490493
@@ -536,7 +539,7 @@ namespace daking {
536539

537540
template <
538541
typename Ty,
539-
typename MemoryPolicy = low_jitter<>,
542+
typename MemoryPolicy = stable<>,
540543
typename Alloc = std::allocator<Ty>
541544
>
542545
class MPSC_queue : private detail::MPSC_base<Ty, MemoryPolicy, Alloc> {

0 commit comments

Comments
 (0)