Skip to content

Commit 759f3a3

Browse files
committed
refactor engine: simplify AsyncFlatCombiningQueue
No changelog entry intended: no API changes or measurable performance effects. Replace raw `TaskContext*` juggling in `AsyncFlatCombiningQueue` with two `engine::SingleConsumerEvent`, which brings it in line with other synchronization primitives. commit_hash:9e3b52bcdc19e2b14a6fa2243f203e457af41ad8
1 parent 070417c commit 759f3a3

2 files changed

Lines changed: 33 additions & 120 deletions

File tree

core/src/engine/impl/async_flat_combining_queue.cpp

Lines changed: 21 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,12 @@
11
#include <engine/impl/async_flat_combining_queue.hpp>
22

3-
#include <functional>
4-
53
#include <engine/task/task_context.hpp>
64
#include <userver/utils/assert.hpp>
75

86
USERVER_NAMESPACE_BEGIN
97

108
namespace engine::impl {
119

12-
// The consumer is not allowed to leave before being notified, thus this is only a WeakAwaitable.
13-
template <auto TryStartWaiting>
14-
class AsyncFlatCombiningQueue::Awaitable final : public impl::WeakAwaitable {
15-
public:
16-
explicit Awaitable(AsyncFlatCombiningQueue& queue)
17-
: queue_(queue)
18-
{}
19-
20-
bool IsReady() const noexcept override { return false; }
21-
22-
void TryAppendAwaiter(boost::intrusive_ptr<Awaiter>& awaiter, [[maybe_unused]] std::uintptr_t context) override {
23-
if (std::invoke(TryStartWaiting, queue_)) {
24-
// We committed to sleeping and will be woken up only via
25-
// NotifyAsyncConsumer. No deadlines or cancellations are allowed,
26-
// otherwise another thread or task may notify us later and wake up
27-
// a dead task.
28-
awaiter = nullptr;
29-
}
30-
}
31-
32-
boost::intrusive_ptr<Awaiter> RemoveAwaiter(
33-
[[maybe_unused]] Awaiter& awaiter,
34-
[[maybe_unused]] std::uintptr_t context
35-
) noexcept override {
36-
// The notification happened through consuming_task_context_, but we'll pretend that the awaitable did it.
37-
// We won't be notified anymore, since we are the sole consumer now.
38-
return nullptr;
39-
}
40-
41-
private:
42-
AsyncFlatCombiningQueue& queue_;
43-
};
44-
4510
AsyncFlatCombiningQueue::Consumer::Consumer(AsyncFlatCombiningQueue& queue)
4611
: queue_(&queue)
4712
{
@@ -78,22 +43,30 @@ AsyncFlatCombiningQueue::AsyncFlatCombiningQueue() { queue_.Push(consumer_node_)
7843
AsyncFlatCombiningQueue::~AsyncFlatCombiningQueue() {
7944
UASSERT(!DoTryPop());
8045
UASSERT(!has_consumer_);
81-
UASSERT(!has_waiter_);
8246
}
8347

8448
auto AsyncFlatCombiningQueue::WaitAndStartConsuming() -> Consumer {
85-
Wait<&AsyncFlatCombiningQueue::TryStartWaitingForConsumer>();
86-
// The async consumer role is acquired; it is now working.
87-
notification_state_->store(NotificationState::kWorking);
49+
if (TryStartWaitingForConsumer()) {
50+
// No deadlines or cancellations are allowed here, otherwise this task may
51+
// walk away, and DoTryStopConsuming will try to hand over the consumer
52+
// role to nobody.
53+
UASSERT(!engine::current_task::GetCurrentTaskContext().IsCancellable());
54+
[[maybe_unused]] const bool success = start_consuming_event_.WaitForEvent();
55+
UASSERT(success);
56+
}
57+
// The async consumer role is acquired.
8858
return Consumer{*this};
8959
}
9060

9161
void AsyncFlatCombiningQueue::WaitWhileEmpty(Consumer& consumer) noexcept {
9262
UASSERT(consumer.queue_ == this);
93-
Wait<&AsyncFlatCombiningQueue::TryStartWaitingWhileEmpty>();
94-
// Either we were woken up after sleeping, or we were already notified and did
95-
// not sleep. In both cases we are working again.
96-
notification_state_->store(NotificationState::kWorking);
63+
// No deadlines or cancellations are allowed here, otherwise this task may
64+
// walk away while still being registered as the queue's consumer.
65+
UASSERT(!engine::current_task::GetCurrentTaskContext().IsCancellable());
66+
// Auto-resets on success, guaranteeing an 'acquire' on the signal, so that
67+
// the newly pushed nodes are visible to the subsequent DoTryPop calls.
68+
[[maybe_unused]] const bool success = empty_queue_event_->WaitForEvent();
69+
UASSERT(success);
9770
}
9871

9972
AsyncFlatCombiningQueue::NodeBase* AsyncFlatCombiningQueue::DoTryPop() noexcept {
@@ -123,63 +96,22 @@ bool AsyncFlatCombiningQueue::DoTryStopConsuming() noexcept {
12396
UASSERT(has_consumer_.exchange(false));
12497
if (std::exchange(should_pass_consumer_to_waiter_, false)) {
12598
// The waiter will consume the remaining nodes.
126-
notification_state_->store(NotificationState::kWorkingNotified);
127-
NotifyAsyncConsumer();
99+
start_consuming_event_.Send();
128100
return true;
129101
} else if (queue_.PushIfEmpty(consumer_node_)) {
130102
// There is no async consumer anymore.
131-
notification_state_->store(NotificationState::kWorkingNotified);
132103
return true;
133104
} else {
134105
UASSERT(!has_consumer_.exchange(true));
135106
return false;
136107
}
137108
}
138109

139-
void AsyncFlatCombiningQueue::NotifyAsyncConsumer() noexcept {
140-
UASSERT(consuming_task_context_);
141-
TaskContext::Wakeup(
142-
boost::intrusive_ptr<TaskContext>{consuming_task_context_},
143-
TaskContext::WakeupSource::kNotify,
144-
NoEpoch{}
145-
);
146-
}
147-
148110
void AsyncFlatCombiningQueue::NotifyAsyncConsumerIfSleeping() noexcept {
149-
// If the consumer is sleeping, this wins the race against its CAS to
150-
// kSleeping and we wake it up. If it is (already) working, it will re-check
151-
// the queue before sleeping again, so no wakeup is needed.
152-
if (notification_state_->exchange(NotificationState::kWorkingNotified) == NotificationState::kSleeping) {
153-
NotifyAsyncConsumer();
154-
}
155-
}
156-
157-
template <auto TryStartWaiting>
158-
void AsyncFlatCombiningQueue::Wait() noexcept {
159-
UASSERT(!has_waiter_.exchange(true));
160-
auto& current = engine::current_task::GetCurrentTaskContext();
161-
// Check before writing to avoid excessive CPU cache invalidation.
162-
if (consuming_task_context_ != &current) {
163-
consuming_task_context_ = &current;
164-
}
165-
// No deadlines or cancellations are allowed, otherwise this task may walk away and be destroyed,
166-
// and the notification will be sent to a dead task.
167-
UASSERT(!current.IsCancellable());
168-
169-
Awaitable<TryStartWaiting> awaitable{*this};
170-
[[maybe_unused]] const auto wakeup_source = current.Sleep(awaitable, Deadline{});
171-
UASSERT(wakeup_source == TaskContext::WakeupSource::kNotify);
172-
173-
UASSERT(consuming_task_context_ == &current);
174-
UASSERT(has_waiter_.exchange(false));
175-
}
176-
177-
bool AsyncFlatCombiningQueue::TryStartWaitingWhileEmpty() {
178-
// Returns whether we are going to sleep. If a notification arrived while we
179-
// were working (state is kWorkingNotified), the CAS fails and we keep
180-
// working, re-checking the queue without sleeping.
181-
auto expected = NotificationState::kWorking;
182-
return notification_state_->compare_exchange_strong(expected, NotificationState::kSleeping);
111+
// If the consumer is sleeping, this wakes it up. If it is (already)
112+
// working, it will re-check the queue before sleeping again, so no wakeup
113+
// is needed; Send() only sets the signal flag in that case.
114+
empty_queue_event_->Send();
183115
}
184116

185117
bool AsyncFlatCombiningQueue::TryStartWaitingForConsumer() {

core/src/engine/impl/async_flat_combining_queue.hpp

Lines changed: 12 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
11
#pragma once
22

33
#include <atomic>
4-
#include <functional>
54

65
#include <userver/concurrent/impl/interference_shield.hpp>
76
#include <userver/concurrent/impl/intrusive_hooks.hpp>
87
#include <userver/concurrent/impl/intrusive_mpsc_queue.hpp>
8+
#include <userver/engine/single_consumer_event.hpp>
99

1010
USERVER_NAMESPACE_BEGIN
1111

1212
namespace engine::impl {
1313

14-
class TaskContext;
15-
1614
// An MPSC queue where the consumer is spontaneously chosen among producers.
1715
// Also, there is an option to provide a dedicated worker task, which will
1816
// become a permanent producer, until it decides to leave.
@@ -42,17 +40,6 @@ class AsyncFlatCombiningQueue final {
4240
void WaitWhileEmpty(Consumer& consumer) noexcept;
4341

4442
private:
45-
template <auto TryStartWaiting>
46-
class Awaitable;
47-
48-
enum class NotificationState {
49-
kWorking,
50-
kWorkingNotified,
51-
kSleeping,
52-
};
53-
54-
static_assert(std::atomic<NotificationState>::is_always_lock_free);
55-
5643
// Wakes up the async consumer if it is currently sleeping. Safe to call from
5744
// any thread, including non-coroutine ones.
5845
void NotifyAsyncConsumerIfSleeping() noexcept;
@@ -61,22 +48,15 @@ class AsyncFlatCombiningQueue final {
6148

6249
bool DoTryStopConsuming() noexcept;
6350

64-
void NotifyAsyncConsumer() noexcept;
65-
66-
template <auto TryStartWaiting>
67-
void Wait() noexcept;
68-
69-
bool TryStartWaitingWhileEmpty();
70-
7151
bool TryStartWaitingForConsumer();
7252

73-
// Tracks whether the async consumer is working or sleeping, decoupling the
74-
// wakeup decision from the queue contents (see NotifyConsumer / WaitWhileEmpty).
75-
// While there is no async consumer, the state is permanently kWorkingNotified.
76-
// Kept first to avoid excessive class padding (it is cache-line aligned).
77-
concurrent::impl::InterferenceShield<std::atomic<NotificationState>> notification_state_{
78-
NotificationState::kWorkingNotified
79-
};
53+
// Signaled whenever a new node is pushed while an async consumer exists (see
54+
// NotifyAsyncConsumerIfSleeping / WaitWhileEmpty), decoupling the wakeup
55+
// decision from the queue contents. Not used while there is no async
56+
// consumer at all (in that case, ad-hoc producer-consumers are used instead).
57+
// Wrapped in InterferenceShield and kept first to avoid excessive class
58+
// padding, since it is written on the hot path of every push.
59+
concurrent::impl::InterferenceShield<engine::SingleConsumerEvent> empty_queue_event_;
8060

8161
concurrent::impl::IntrusiveMpscQueueImpl queue_;
8262

@@ -85,12 +65,13 @@ class AsyncFlatCombiningQueue final {
8565
// Signals to the current consumer that it should hand over to the async task.
8666
concurrent::impl::SinglyLinkedBaseHook start_consuming_notifier_node_;
8767

88-
TaskContext* consuming_task_context_{nullptr};
68+
// Signaled by the current consumer to pass the consumer role to whoever is
69+
// waiting in WaitAndStartConsuming (see should_pass_consumer_to_waiter_).
70+
engine::SingleConsumerEvent start_consuming_event_;
71+
8972
bool should_pass_consumer_to_waiter_{false};
9073
// For diagnosing a multiple-consumers situation.
9174
std::atomic<bool> has_consumer_{false};
92-
// For diagnosing a multiple-waiters situation.
93-
std::atomic<bool> has_waiter_{false};
9475
};
9576

9677
class AsyncFlatCombiningQueue::Consumer final {

0 commit comments

Comments
 (0)