Skip to content

Commit 6a33ff7

Browse files
committed
Fix a corner case.
1 parent c4bda57 commit 6a33ff7

1 file changed

Lines changed: 21 additions & 7 deletions

File tree

include/daking/MPSC_queue.hpp

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -262,8 +262,10 @@ namespace daking {
262262
~MPSC_thread_hook() {
263263
// If this is consumer hook, release the queue tail to help destructor thread.
264264
std::atomic_thread_fence(std::memory_order_release);
265-
std::lock_guard<std::mutex> guard(Queue::global_mutex_);
266-
Queue::_get_global_manager().unregister_for(tid_);
265+
if (Queue::_is_global_manager_alive()) {
266+
std::lock_guard<std::mutex> guard(Queue::global_mutex_);
267+
Queue::_get_global_manager().unregister_for(tid_);
268+
}
267269
}
268270

269271
DAKING_ALWAYS_INLINE node_t*& node_list() noexcept {
@@ -298,9 +300,14 @@ namespace daking {
298300
using altraits_page_t = std::allocator_traits<alloc_page_t>;
299301

300302
MPSC_manager(const Alloc& alloc)
301-
: alloc_node_t(alloc), alloc_page_t(alloc) {}
303+
: alloc_node_t(alloc)
304+
, alloc_page_t(alloc) {}
302305

303-
~MPSC_manager() = default;
306+
~MPSC_manager() {
307+
reset();
308+
Queue::global_manager_instance_ = nullptr;
309+
std::atomic_thread_fence(std::memory_order_release);
310+
}
304311

305312
void reset() {
306313
/* Already locked */
@@ -637,18 +644,23 @@ namespace daking {
637644
}
638645

639646
DAKING_ALWAYS_INLINE static size_type global_node_size_apprx() noexcept {
640-
return global_manager_instance_ ? _get_global_manager().node_count() : 0;
647+
return _is_global_manager_alive() ? _get_global_manager().node_count() : 0;
641648
}
642649

643650
DAKING_ALWAYS_INLINE static bool reserve_global_chunk(size_type chunk_count) {
644-
return global_manager_instance_ ? _reserve_global_external(chunk_count) : false;
651+
return _is_global_manager_alive() ? _reserve_global_external(chunk_count) : false;
645652
}
646653

647654
private:
648655
DAKING_ALWAYS_INLINE static manager_t& _get_global_manager() noexcept {
649656
return *global_manager_instance_;
650657
}
651658

659+
DAKING_ALWAYS_INLINE static bool _is_global_manager_alive() noexcept {
660+
std::atomic_thread_fence(std::memory_order_acquire);
661+
return global_manager_instance_ != nullptr;
662+
}
663+
652664
DAKING_ALWAYS_INLINE thread_hook_t& _get_thread_hook() {
653665
static thread_local thread_hook_t thread_hook;
654666
return thread_hook;
@@ -732,7 +744,9 @@ namespace daking {
732744
DAKING_ALWAYS_INLINE void _free_global() {
733745
/* Already locked */
734746
global_chunk_stack_.reset();
735-
_get_global_manager().reset();
747+
if (_is_global_manager_alive()) {
748+
_get_global_manager().reset();
749+
}
736750
}
737751

738752
/* Global LockFree*/

0 commit comments

Comments
 (0)