Skip to content

Commit 3b6648b

Browse files
maikelccotter
authored and committed
Improve implementation to be near to Dmitry's
1 parent 0a9df03 commit 3b6648b

4 files changed

Lines changed: 49 additions & 34 deletions

File tree

include/exec/timed_thread_scheduler.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ namespace exec {
5353
, set_value_{set_value} {
5454
}
5555

56-
STDEXEC::__std::atomic<void*> next_{nullptr};
56+
STDEXEC::__std::atomic<timed_thread_operation_base*> next_{nullptr};
5757
command_type command_;
5858
void (*set_value_)(timed_thread_operation_base*) noexcept;
5959
};

include/stdexec/__detail/__intrusive_mpsc_queue.hpp

Lines changed: 44 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) Dmitiy V'jukov
2+
* Copyright (c) Dmitry V'jukov
33
* Copyright (c) 2024 Maikel Nadolski
44
* Copyright (c) 2024 NVIDIA Corporation
55
*
@@ -24,6 +24,8 @@
2424

2525
#include "__atomic.hpp"
2626

27+
#include "stdexec/__detail/__config.hpp"
28+
2729
#include "./__spin_loop_pause.hpp"
2830

2931
namespace STDEXEC {
@@ -32,13 +34,13 @@ namespace STDEXEC {
3234

3335
// _Node must be default_initializable only for the queue to construct an
3436
// internal "stub" node - only the _Next data element is accessed internally.
35-
template <class _Node, __std::atomic<void*> _Node::* _Next>
37+
template <class _Node, __std::atomic<_Node*> _Node::* _Next>
3638
requires __std::default_initializable<_Node>
3739
class __intrusive_mpsc_queue<_Next> {
3840

39-
__std::atomic<void*> __back_{&__stub_};
40-
__std::atomic<void*> __head_{&__stub_};
41-
_Node __stub_;
41+
__std::atomic<_Node*> __head_{&__stub_};
42+
_Node* __tail_{&__stub_};
43+
_Node __stub_{};
4244

4345
public:
4446

@@ -47,38 +49,51 @@ namespace STDEXEC {
4749
}
4850

4951
constexpr auto push_back(_Node* __new_node) noexcept -> bool {
50-
(__new_node->*_Next).store(nullptr, __std::memory_order_release);
51-
_Node* __prev = static_cast<_Node*>(
52-
__head_.exchange(static_cast<void*>(__new_node), __std::memory_order_acq_rel)
53-
);
54-
bool was_stub = __prev == &__stub_;
55-
(__prev->*_Next).store(static_cast<void*>(__new_node), __std::memory_order_release);
56-
return was_stub;
52+
(__new_node->*_Next).store(nullptr, __std::memory_order_relaxed);
53+
_Node* __prev = __head_.exchange(__new_node, __std::memory_order_acq_rel);
54+
(__prev->*_Next).store(__new_node, __std::memory_order_release);
55+
return __prev == &__stub_;
5756
}
5857

5958
constexpr auto pop_front() noexcept -> _Node* {
60-
_Node* __back = static_cast<_Node*>(__back_.load(__std::memory_order_relaxed));
61-
_Node* __next = static_cast<_Node*>((__back->*_Next).load(__std::memory_order_acquire));
62-
if (__back == &__stub_) {
63-
if (nullptr == __next)
64-
return nullptr;
65-
__back_.store(static_cast<void*>(__next), __std::memory_order_relaxed);
66-
__back = __next;
67-
__next = static_cast<_Node*>((__next->*_Next).load(__std::memory_order_acquire));
59+
_Node* __tail = this->__tail_;
60+
STDEXEC_ASSERT(__tail != nullptr);
61+
_Node* __next = (__tail->*_Next).load(__std::memory_order_acquire);
62+
// If tail is pointing to the stub node we need to advance it once more
63+
if (&__stub_ == __tail) {
64+
if (nullptr == __next) {
65+
return nullptr;
66+
}
67+
this->__tail_ = __next;
68+
__tail = __next;
69+
__next = (__next->*_Next).load(__std::memory_order_acquire);
6870
}
69-
if (__next) {
70-
__back_.store(static_cast<void*>(__next), __std::memory_order_relaxed);
71-
return __back;
71+
// Normal case: there is a next node and we can just advance the tail
72+
if (nullptr != __next) {
73+
this->__tail_ = __next;
74+
return __tail;
7275
}
73-
_Node* __head = static_cast<_Node*>(__head_.load(__std::memory_order_relaxed));
74-
if (__back != __head)
76+
// If next is nullptr here, it means that either:
77+
// 1) There are no more nodes in the queue
78+
// 2) A producer is in the middle of adding a new node
79+
const _Node* __head = this->__head_.load(__std::memory_order_acquire);
80+
// A producer is in the middle of adding a new node
81+
// we cannot return tail as we cannot link the next node yet
82+
if (__tail != __head) {
7583
return nullptr;
84+
}
85+
// No more nodes in the queue - we need to insert a stub node
86+
// to be able to link to an eventual empty state (or new nodes)
7687
push_back(&__stub_);
77-
__next = static_cast<_Node*>((__back->*_Next).load(__std::memory_order_acquire));
78-
if (__next) {
79-
__back_.store(static_cast<void*>(__next), __std::memory_order_relaxed);
80-
return __back;
88+
// Now re-attempt to load next
89+
__next = (__tail->*_Next).load(__std::memory_order_acquire);
90+
if (nullptr != __next) {
91+
// Successfully linked either a new node or the stub node
92+
this->__tail_ = __next;
93+
return __tail;
8194
}
95+
// A producer is in the middle of adding a new node since next is still nullptr
96+
// and not our stub node, thus we cannot link the next node yet
8297
return nullptr;
8398
}
8499
};

test/rrd/intrusive_mpsc_queue.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
#include <stdexec/__detail/__intrusive_mpsc_queue.hpp>
2020

2121
struct test_node {
22-
std::atomic<void*> next_{nullptr};
22+
std::atomic<test_node*> next_{nullptr};
2323
int value_{0};
2424

2525
test_node() = default;
@@ -271,11 +271,11 @@ struct mpsc_five_producers_ordered : rl::test_suite<mpsc_five_producers_ordered,
271271
queue.push_back(&nodes[start_idx + i]);
272272
}
273273
} else {
274-
int count = 0;
274+
// int count = 0;
275275
while (consumed_count < TOTAL_ITEMS) {
276276
test_node* node = queue.pop_front();
277277
if (node) {
278-
consumed_values[count] = node->value_;
278+
consumed_values[consumed_count] = node->value_;
279279
++consumed_count;
280280
}
281281
}

test/stdexec/detail/test_intrusive_mpsc_queue.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
namespace {
1111

1212
struct test_node {
13-
std::atomic<void*> next_{nullptr};
13+
std::atomic<test_node*> next_{nullptr};
1414
int value_{0};
1515

1616
test_node() = default;

0 commit comments

Comments
 (0)