Skip to content

Commit 9b44d94

Browse files
committed
Fix race in MPSC algo
Relacy helped identify a race in the existing MPSC algo. I am having a hard time explaining exactly what's going on, but in the newly added unit test (the non-Relacy one), I am able to observe three different odd behaviors:
- a consumer consuming the same element in an infinite loop, apparently due to the internal next pointers pointing in some sort of cycle
- consumer returning &__nil_!
- consumer never able to consume a produced value (node is lost)

With the non-Relacy unit test, in the existing algo, if I insert a random sleep of 0-10 microseconds in push_back after __back_ is exchanged, I can observe one of the above behaviors nearly every single time. The most common was the first behavior.

The existing algo claims it came from Dmitry Vyukov's implementation, though one key difference is that the existing one uses an atomic pointer to a Node for the "nil" object, whereas Dmitry's stores an actual Node object embedded in the queue. I re-implemented the version in stdexec exactly as it appears on Dmitry's website (which I had to dig up on archive.org), and it passes the newly added Relacy tests (which explore many thread interleavings) and the non-Relacy unit tests.

I originally tracked down a bug in timed_thread_scheduler.cpp, where sometimes `STDEXEC_ASSERT(op->command_ == command_type::command_type::stop);` failed.
1 parent 514f239 commit 9b44d94

6 files changed

Lines changed: 479 additions & 36 deletions

File tree

include/exec/timed_thread_scheduler.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ namespace exec {
4343
stop
4444
};
4545

46+
// Default ctor for __intrusive_mpsc_queue's internal stub node
47+
constexpr timed_thread_operation_base() = default;
48+
4649
constexpr timed_thread_operation_base(
4750
void (*set_value)(timed_thread_operation_base*) noexcept,
4851
command_type command = command_type::schedule) noexcept

include/stdexec/__detail/__intrusive_mpsc_queue.hpp

Lines changed: 39 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -30,53 +30,57 @@ namespace STDEXEC {
3030
template <auto _Ptr>
3131
class __intrusive_mpsc_queue;
3232

33+
// _Node must be default_initializable only for the queue to construct an
34+
// internal "stub" node - only the _Next data element is accessed internally.
3335
template <class _Node, __std::atomic<void*> _Node::* _Next>
36+
requires __std::default_initializable<_Node>
3437
class __intrusive_mpsc_queue<_Next> {
35-
__std::atomic<void*> __back_{&__nil_};
36-
void* __front_{&__nil_};
37-
__std::atomic<_Node*> __nil_ = nullptr;
3838

39-
constexpr void push_back_nil() {
40-
__nil_.store(nullptr, __std::memory_order_relaxed);
41-
auto* __prev = static_cast<_Node*>(__back_.exchange(&__nil_, __std::memory_order_acq_rel));
42-
(__prev->*_Next).store(&__nil_, __std::memory_order_release);
43-
}
39+
__std::atomic<void*> __back_{&__stub_};
40+
__std::atomic<void*> __head_{&__stub_};
41+
_Node __stub_;
4442

4543
public:
44+
45+
__intrusive_mpsc_queue() {
46+
(__stub_.*_Next).store(nullptr, __std::memory_order_release);
47+
}
48+
4649
constexpr auto push_back(_Node* __new_node) noexcept -> bool {
47-
(__new_node->*_Next).store(nullptr, __std::memory_order_relaxed);
48-
void* __prev_back = __back_.exchange(__new_node, __std::memory_order_acq_rel);
49-
bool __is_nil = __prev_back == static_cast<void*>(&__nil_);
50-
if (__is_nil) {
51-
__nil_.store(__new_node, __std::memory_order_release);
52-
} else {
53-
(static_cast<_Node*>(__prev_back)->*_Next).store(__new_node, __std::memory_order_release);
54-
}
55-
return __is_nil;
50+
(__new_node->*_Next).store(nullptr, __std::memory_order_release);
51+
_Node* __prev = static_cast<_Node*>(
52+
__head_.exchange(static_cast<void*>(__new_node), __std::memory_order_acq_rel)
53+
);
54+
bool was_stub = __prev == &__stub_;
55+
(__prev->*_Next).store(static_cast<void*>(__new_node), __std::memory_order_release);
56+
return was_stub;
5657
}
5758

5859
constexpr auto pop_front() noexcept -> _Node* {
59-
if (__front_ == static_cast<void*>(&__nil_)) {
60-
_Node* __next = __nil_.load(__std::memory_order_acquire);
61-
if (!__next) {
60+
_Node* __back = static_cast<_Node*>(__back_.load(__std::memory_order_relaxed));
61+
_Node* __next = static_cast<_Node*>((__back->*_Next).load(__std::memory_order_acquire));
62+
if (__back == &__stub_) {
63+
if (nullptr == __next)
6264
return nullptr;
63-
}
64-
__front_ = __next;
65+
__back_.store(static_cast<void*>(__next), __std::memory_order_relaxed);
66+
__back = __next;
67+
__next = static_cast<_Node*>((__next->*_Next).load(__std::memory_order_acquire));
68+
}
69+
if (__next) {
70+
__back_.store(static_cast<void*>(__next), __std::memory_order_relaxed);
71+
return __back;
6572
}
66-
auto* __front = static_cast<_Node*>(__front_);
67-
void* __next = (__front->*_Next).load(__std::memory_order_acquire);
73+
_Node* __head = static_cast<_Node*>(__head_.load(__std::memory_order_relaxed));
74+
if (__back != __head)
75+
return nullptr;
76+
push_back(&__stub_);
77+
__next = static_cast<_Node*>((__back->*_Next).load(__std::memory_order_acquire));
6878
if (__next) {
69-
__front_ = __next;
70-
return __front;
79+
__back_.store(static_cast<void*>(__next), __std::memory_order_relaxed);
80+
return __back;
7181
}
72-
STDEXEC_ASSERT(!__next);
73-
push_back_nil();
74-
do {
75-
__spin_loop_pause();
76-
__next = (__front->*_Next).load(__std::memory_order_acquire);
77-
} while (!__next);
78-
__front_ = __next;
79-
return __front;
82+
return nullptr;
8083
}
8184
};
82-
} // namespace STDEXEC
85+
86+
} // namespace STDEXEC

test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ set(stdexec_test_sources
6262
stdexec/algos/other/test_execute.cpp
6363
stdexec/detail/test_completion_signatures.cpp
6464
stdexec/detail/test_utility.cpp
65+
stdexec/detail/test_intrusive_mpsc_queue.cpp
6566
stdexec/schedulers/test_task_scheduler.cpp
6667
stdexec/queries/test_env.cpp
6768
stdexec/queries/test_get_forward_progress_guarantee.cpp

test/rrd/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ build_dir = build
77

88
.SECONDARY:
99

10-
test_programs = split async_scope sync_wait
10+
test_programs = split async_scope sync_wait intrusive_mpsc_queue
1111

1212
test_exe_files = $(foreach name,$(test_programs),$(build_dir)/$(name))
1313

0 commit comments

Comments
 (0)