feat(conn): add pipeline backpressure for IoLoopV2 #7018
Pull request overview
Adds pipeline backpressure handling for the experimental IoLoopV2 to avoid busy-spinning when the per-thread pipeline buffer/queue limits are exceeded, and expands Python integration tests to cover both V1 and V2 behavior.
Changes:
- Stop parsing further commands when pipeline limits are exceeded (leave bytes in io_buf_ for later).
- In IoLoopV2, drain queued work and park the fiber on an event until progress is possible; wake-ups are triggered from several execution/reply paths.
- Extend connection pipeline tests to run for both IoLoop variants and add V2-specific correctness + disconnect coverage.
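A minimal sketch of the first change, assuming a much simplified parser. ParseSketch, its fields, and the newline-delimited framing are hypothetical stand-ins, not the code in dragonfly_connection.cc. It only illustrates stopping the parse loop once queued bytes exceed the limit, so the unparsed tail stays in the input buffer until the queue drains.

```cpp
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical stand-in for a connection's parse stage (not Dragonfly code).
struct ParseSketch {
  std::string io_buf;             // unparsed input bytes
  std::deque<std::string> queue;  // parsed commands awaiting execution
  std::size_t queued_bytes = 0;   // bytes currently held by `queue`
  std::size_t limit_bytes;        // backpressure threshold

  explicit ParseSketch(std::size_t limit) : limit_bytes(limit) {}

  // Parses newline-terminated commands until the buffer has no complete
  // command left or the queue exceeds the limit. Returns how many were parsed.
  std::size_t ParseSome() {
    std::size_t parsed = 0;
    while (queued_bytes <= limit_bytes) {
      std::size_t eol = io_buf.find('\n');
      if (eol == std::string::npos) break;  // incomplete command, wait for more input
      queue.push_back(io_buf.substr(0, eol));
      queued_bytes += eol;
      io_buf.erase(0, eol + 1);
      ++parsed;
    }
    return parsed;  // anything still in io_buf is left for a later call
  }

  // Simulates executing and replying to the oldest command, freeing its bytes.
  void PopFront() {
    queued_bytes -= queue.front().size();
    queue.pop_front();
  }
};
```

With a limit of 5 and three 4-byte commands in the buffer, the first call parses two commands and leaves the third in io_buf; it is only parsed after PopFront() brings the queue back under the limit.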
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| src/facade/dragonfly_connection.cc | Implements V2 backpressure parsing/parking logic and adds (V1)/(V2) log labeling + wake notifications. |
| tests/dragonfly/connection_test.py | Adds/updates pipeline backpressure tests to cover V1+V2, response correctness, and safe disconnect behavior. |
🤖 Augment PR Summary
Summary: Adds backpressure/parking to the experimental
Changes:
Technical Notes: The backpressure condition is based on per-thread pipeline bytes (
7225762 to 4f067f5
4f067f5 to b10ff25
// V2 connections parked on backpressure register their io_event_ here so that
// global memory relief (from other connections or CONFIG SET) can wake them.
// Access is thread-local (per-proactor), no synchronization needed.
std::vector<util::fb2::EventCount*> v2_pipeline_waiters;
There was a problem hiding this comment.
if access is indeed thread-local, then using CondVarAny with NoOp lock is better,
as EventCount is designed for multi-threaded coordination.
There was a problem hiding this comment.
I will answer here, since I think a few comments are related to the same issue.
QueueBackpressure& qbp = GetQueueBackpressure();
qbp.pipeline_cnd.notify_all();
qbp.NotifyPipelineWaiters();
There was a problem hiding this comment.
I am trying to understand how it worked before - we did not have O(n) array of fibers parked.
so what changed now?
There was a problem hiding this comment.
I will answer here, since I think a few comments are related to the same issue.
qbp.v2_pipeline_waiters.push_back(&io_event_);
absl::Cleanup v2_unregister = [&qbp, this] {
  auto& w = qbp.v2_pipeline_waiters;
  auto it = std::find(w.begin(), w.end(), &io_event_);
There was a problem hiding this comment.
I have a problem with this and also with NotifyPipelineWaiters, which runs in O(N).
I want to understand what has changed so that we now need O(n) runtime for these operations.
Specifically, how does this work in V1? I would guess we have a single CondVar that you block on. Why do we need to change the approach here?
There was a problem hiding this comment.
Answering three of your comments here:
In V1:
- We have 2 fibers per connection: a producer (IoLoop) and a consumer (the fiber that runs Connection::AsyncFiber()).
- If memory got too full, the IoLoop fiber could block on a thread-global CondVarAny (pipeline_cnd) in DispatchSingle, expecting AsyncFiber, which was still awake, to eventually wake it up.
- Connections waiting on backpressure all park on the same thread-level pipeline_cnd (CondVarAny). One notify_all() reaches them all because they share a single wait object.
In V2:
- There is only a single fiber now (no AsyncFiber). It uses io_event_ as a unified notification object for receive completions, connection shutdown, async message dispatch, and command completion.
- While parked, the fiber must be reachable by all of these events through io_event_. We can't park on 2 different objects at the same time, and that includes backpressure relief events. If we park on pipeline_cnd we will be "deaf" to all other events.
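To make the single-wait-object constraint concrete, here is a rough sketch under loud assumptions: EventHub and its Reason flags are hypothetical, and std::condition_variable stands in for the fiber-level io_event_ (the real V2 loop parks a fiber, not an OS thread). Every event source funnels into one object, and the waiter checks a combined condition when it wakes.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical single-waiter hub: every event source notifies the same object.
class EventHub {
 public:
  enum Reason : unsigned { kRecv = 1u, kShutdown = 2u, kBackpressureRelief = 4u };

  // Called by any event source; records the reason and wakes the waiter.
  void Notify(Reason r) {
    {
      std::lock_guard<std::mutex> lk(mu_);
      pending_ |= r;
    }
    cv_.notify_one();
  }

  // Blocks until at least one reason is pending, then returns and clears
  // the accumulated mask so the caller can handle all of them.
  unsigned Await() {
    std::unique_lock<std::mutex> lk(mu_);
    cv_.wait(lk, [this] { return pending_ != 0; });
    unsigned got = pending_;
    pending_ = 0;
    return got;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  unsigned pending_ = 0;
};
```

If backpressure relief were delivered through a second wait object instead of a flag on this one, a waiter blocked there would not observe kRecv or kShutdown, which is the "deaf" case described above.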
Regarding your comments:
- "if access is indeed thread-local, then using CondVarAny with NoOp lock is better, as EventCount is designed for multi-threaded coordination.": You are right, but as I wrote, the reason I can't use pipeline_cnd for V2 is that the fiber can only be parked on a single object at a time and must be able to receive all possible events, and the code was written to pass events (e.g. socket receive completions) via this io_event_.
- "I am trying to understand how it worked before - we did not have O(n) array of fibers parked. so what changed now?": If you look at the implementation of pipeline_cnd.notify_all(), it is also O(n); it is just hidden in the internal implementation of CondVarAny:
bool WaitQueue::NotifyAll(FiberInterface* active) {
bool notified = false;
while (NotifyOne(active))
notified = true;
return notified;
}
To summarize:
What actually changed is the way we notify and how we organize members logically:
- In the V1 loop, all connections share one CondVarAny, so a single notify_all() works.
- V2 connections each have their own io_event_, so you need to explicitly reach each one in the dragonfly_connection code. The vector is the minimal mechanism to do so.
Why, in my opinion, can't we use a new or existing CondVarAny?
Because it has the same fundamental issue: during the backpressure wait, the fiber must remain parked on io_event_ because socket completions, shutdowns, and async dispatches all arrive exclusively via io_event_.notify(). A fiber can only wait in one place at a time, so we cannot park on both io_event_ and pipeline_cnd, or any new CondVarAny, simultaneously.
Let me know if you'd prefer to explore a different architectural approach for V2's event multiplexing. I think that this might require expanding the scope of this PR to refactor how V2 waits for events.
There was a problem hiding this comment.
I think the difference here is that unregistering after waking up is now an O(n) operation due to the find, so if you wake up m connections at once, it will take O(m * n) list operations in total. The current helio waiters unlink in O(1) time. What can be done is:
1. Making the vector a std::list, manually like helio does internally
2. EventCount supports subscribe with a callback-based waiter; support can also be extended to other primitives
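For illustration only, the O(1) unlink can be sketched with a plain std::list whose iterator serves as the unsubscribe handle. WaiterList and its method names are hypothetical and this is not helio's implementation (which this sketch does not attempt to reproduce); it only shows why keeping a handle avoids the std::find on unregister.

```cpp
#include <cstddef>
#include <functional>
#include <list>
#include <utility>

// Hypothetical callback registry with constant-time unsubscribe.
class WaiterList {
 public:
  using Callback = std::function<void()>;
  using Handle = std::list<Callback>::iterator;

  // O(1): appends the callback and returns a handle that stays valid
  // until Unsubscribe is called with it.
  Handle Subscribe(Callback cb) {
    return waiters_.insert(waiters_.end(), std::move(cb));
  }

  // O(1): no search is needed because the handle points at the node.
  void Unsubscribe(Handle h) { waiters_.erase(h); }

  // O(n) in the number of subscribers, as any broadcast must be.
  // Callbacks must not unsubscribe from inside the loop in this sketch.
  std::size_t NotifyAll() {
    std::size_t n = 0;
    for (auto& cb : waiters_) {
      cb();
      ++n;
    }
    return n;
  }

  std::size_t size() const { return waiters_.size(); }

 private:
  std::list<Callback> waiters_;
};
```

Waking m subscribers and then unsubscribing each costs O(m) here, compared with O(m * n) when every unregister has to search a vector of n entries.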
There was a problem hiding this comment.
(2) is already used to wait for command execution
There was a problem hiding this comment.
Thanks for the advice. I think I will go with option 2 (a new shared EventCount): it's faster and simpler, and there is no need to modify helio at this stage. I'm working on it.
dcf9fa6 to 30f22c6
// Notifies both V1 waiters (pipeline_cnd) and V2 subscribers (v2_pipeline_backpressure_ec).
void NotifyPipelineWaiters() {
  pipeline_cnd.notify_all();
  v2_pipeline_backpressure_ec.notifyAll();
}
There was a problem hiding this comment.
todo: unite these at some point in the future, though maybe not even that if we abolish V1 soon
There was a problem hiding this comment.
Noted, will consider adding a task.
Let's hold off on merging this until we release 1.38.
auto sub_key = qbp.v2_pipeline_backpressure_ec.check_or_subscribe(under_limit_pred,
                                                                   &backpressure_waiter);

if (sub_key) {  // we had to subscribe, so need to wait
  io_event_.await([this, under_limit_pred]() {
    bool cmd_ready = parsed_head_ && parsed_head_->CanReply();
    // We wake up if:
    // 1. Memory is freed (under_limit_pred()) or we can free it ourselves (cmd_ready).
    // 2. Control-plane messages need processing (!dispatch_q_.empty()).
    // 3. The connection is terminating (io_ec_).
    return under_limit_pred() || cmd_ready || !dispatch_q_.empty() || io_ec_;
  });
}
}
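The pattern in this hunk can be approximated as follows. This is a sketch under stated assumptions, not helio's EventCount: SharedRelief, CheckOrSubscribe, and ConnEvent are hypothetical names, and the code assumes single-threaded, cooperative access (as on one proactor thread) and C++17 for std::optional. It shows the two ideas involved: subscribe only if the condition does not already hold, and have the shared object forward relief to each connection's own event.

```cpp
#include <functional>
#include <list>
#include <optional>
#include <utility>

// Hypothetical stand-in for a connection's own wait object (io_event_).
struct ConnEvent {
  int wakeups = 0;
  void Notify() { ++wakeups; }
};

// Hypothetical shared backpressure object, one per thread.
class SharedRelief {
 public:
  using Callback = std::function<void()>;
  using Key = std::list<Callback>::iterator;

  // Returns std::nullopt if pred() already holds (no need to wait);
  // otherwise registers cb and returns a key for later Unsubscribe.
  // Nothing can run between the check and the insert in this
  // single-threaded sketch, so a relief cannot slip through unnoticed.
  template <typename Pred>
  std::optional<Key> CheckOrSubscribe(Pred pred, Callback cb) {
    if (pred()) return std::nullopt;
    return subs_.insert(subs_.end(), std::move(cb));
  }

  void Unsubscribe(Key key) { subs_.erase(key); }

  // Forwards relief to every subscribed connection event.
  void NotifyAll() {
    for (auto& cb : subs_) cb();
  }

 private:
  std::list<Callback> subs_;
};
```

A connection would subscribe a callback that calls Notify() on its own event, park on that event with a predicate covering all wake-up reasons, and unsubscribe once it resumes.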
There was a problem hiding this comment.
maybe we can unite it somehow in the future, like I did with the command waiter
There was a problem hiding this comment.
also this function is really big now, maybe we can move some parts out of it
There was a problem hiding this comment.
Noted. I won't do it in this PR.
This PR is currently on hold; I won't merge it until we release 1.38.
Park the IoLoopV2 fiber via io_event_.await() when the pipeline queue exceeds limits, preventing busy-spinning. The fiber resumes when memory is freed, commands are ready, the dispatch queue has items, or the connection closes.

To prevent lost wakeups on thread-global limits, parked V2 connections now register their io_event_ in a thread-local waiter list. This ensures they reliably wake up when other connections free memory or when limits are dynamically raised via CONFIG SET.

- ParseRedis breaks early if over limit, leaving bytes in io_buf_.
- ExecuteBatch and ReplyBatch notify io_event_ to wake the fiber.
- Global memory relief safely wakes parked V2 fibers, utilizing an O(1) swap-and-pop unregister on cleanup.
- Wakeups are optimized to avoid the "thundering herd" problem by only notifying waiters when thread-global pipeline budget is actually freed.
- Updated logs with (V1)/(V2) labels for easier debugging.
- Added tests for V2 correctness and safe disconnects.

Signed-off-by: Gil Levkovich <69595609+glevkovich@users.noreply.github.com>
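The swap-and-pop unregister and the "notify only when budget is freed" check mentioned in this commit message could look roughly like the sketch below. Waiter, WaiterVec, and OnBytesChanged are hypothetical names chosen for illustration; the actual commit may organize this differently.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical parked connection; it remembers its index in the waiter vector.
struct Waiter {
  static constexpr std::size_t kNoSlot = static_cast<std::size_t>(-1);
  std::size_t slot = kNoSlot;
  bool notified = false;
};

class WaiterVec {
 public:
  void Register(Waiter* w) {
    w->slot = waiters_.size();
    waiters_.push_back(w);
  }

  // O(1) swap-and-pop: move the last waiter into the vacated slot
  // instead of searching for w or shifting elements.
  void Unregister(Waiter* w) {
    if (w->slot == Waiter::kNoSlot) return;  // not registered
    Waiter* last = waiters_.back();
    waiters_[w->slot] = last;
    last->slot = w->slot;
    waiters_.pop_back();
    w->slot = Waiter::kNoSlot;
  }

  // Wakes waiters only if the pipeline byte count actually went down,
  // which avoids waking everyone when nothing was freed.
  void OnBytesChanged(std::size_t before, std::size_t after) {
    if (after < before) {
      for (Waiter* w : waiters_) w->notified = true;
    }
  }

  std::size_t size() const { return waiters_.size(); }

 private:
  std::vector<Waiter*> waiters_;
};
```

Swap-and-pop does not preserve registration order, which is acceptable here because every registered waiter is notified on relief.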
baed79b to 0f38d3a
// After draining commands, notify all connections parked on backpressure relief
if (conn_stats.pipeline_queue_bytes < mem_before) {
  qbp.NotifyPipelineWaiters();
}

// await block (backpressure)
// Re-check if pipeline buffer over limit after draining - ExecuteBatch/ReplyBatch may have
// freed memory. If still over limit, sleep to prevent busy-spin.
// Only park if this connection is actively contributing (parsed_cmd_q_len_ > 0).
// Connections with an empty queue must stay in the read loop.
bool post_over_limit =
    (parsed_cmd_q_len_ > 0) &&
There was a problem hiding this comment.
My comment still applies. IoLoopV2 function now becomes veeeery long, it doesn't even fit on my screen. Maybe we can refactor it, at least in the future?
There was a problem hiding this comment.
Noted, sure; I wrote the same in the previous comment. I will add a task. It is definitely not part of this PR.