Skip to content

feat(conn): add pipeline backpressure for IoLoopV2#7018

Merged
glevkovich merged 5 commits into
mainfrom
glevkovich/v2loop_backpressure
Apr 14, 2026
Merged

feat(conn): add pipeline backpressure for IoLoopV2#7018
glevkovich merged 5 commits into
mainfrom
glevkovich/v2loop_backpressure

Conversation

@glevkovich
Copy link
Copy Markdown
Contributor

@glevkovich glevkovich commented Mar 30, 2026

Park the IoLoopV2 fiber via io_event_.await() when the pipeline queue
exceeds limits, preventing busy-spinning. The fiber resumes when
memory is freed, commands are ready, the dispatch queue has items,
or the connection closes.

To prevent lost wakeups on thread-global limits, parked V2 connections
now register their io_event_ in a thread-local waiter list. This ensures
they reliably wake up when other connections free memory or when limits
are dynamically raised via CONFIG SET.

  • ParseRedis breaks early if over limit, leaving bytes in io_buf_.
  • ExecuteBatch and ReplyBatch notify io_event_ to wake the fiber.
  • Global memory relief safely wakes parked V2 fibers, utilizing an O(1)
    swap-and-pop unregister on cleanup.
  • Wakeups are optimized to avoid the "thundering herd" problem by only
    notifying waiters when thread-global pipeline budget is actually freed.
  • Updated logs with (V1)/(V2) labels for easier debugging.
  • Added tests for V2 correctness and safe disconnects.

Signed-off-by: Gil Levkovich 69595609+glevkovich@users.noreply.github.com

Copilot AI review requested due to automatic review settings March 30, 2026 06:46
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds pipeline backpressure handling for the experimental IoLoopV2 to avoid busy-spinning when the per-thread pipeline buffer/queue limits are exceeded, and expands Python integration tests to cover both V1 and V2 behavior.

Changes:

  • Stop parsing further commands when pipeline limits are exceeded (leave bytes in io_buf_ for later).
  • In IoLoopV2, drain queued work and park the fiber on an event until progress is possible; wake-ups are triggered from several execution/reply paths.
  • Extend connection pipeline tests to run for both IoLoop variants and add V2-specific correctness + disconnect coverage.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

File Description
src/facade/dragonfly_connection.cc Implements V2 backpressure parsing/parking logic and adds (V1)/(V2) log labeling + wake notifications.
tests/dragonfly/connection_test.py Adds/updates pipeline backpressure tests to cover V1+V2, response correctness, and safe disconnect behavior.

Comment thread src/facade/dragonfly_connection.cc
@augmentcode
Copy link
Copy Markdown

augmentcode Bot commented Mar 30, 2026

🤖 Augment PR Summary

Summary: Adds backpressure/parking to the experimental IoLoopV2 to avoid busy-spinning when the pipeline queue exceeds configured limits.

Changes:

  • Labels V1/V2 pipeline-overlimit warnings to simplify log-based debugging.
  • Stops RESP parsing early when running in enqueue-only mode and the pipeline queue crosses limits, leaving remaining bytes in io_buf_ for later iterations.
  • Adds a fast-path guard in ParseRedisBatch() to avoid parsing more data while already over the pipeline limits.
  • In IoLoopV2, drains existing work and then parks the fiber via io_event_.await() when still over the limit, resuming on replies/dispatch work/connection close.
  • Notifies io_event_ from ExecuteBatch() and ReplyBatch() (V2) so the parked fiber can resume after draining.
  • Extends Python integration tests for pipeline-overlimit behavior, adds additional correctness/disconnect coverage, and runs some cases with experimental_io_loop_v2 enabled.

Technical Notes: The backpressure condition is based on per-thread pipeline bytes (pipeline_buffer_limit) and per-connection queued command count (pipeline_queue_limit).

🤖 Was this summary useful? React with 👍 or 👎

Copy link
Copy Markdown

@augmentcode augmentcode Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review completed. 2 suggestions posted.

Fix All in Augment

Comment augment review to trigger a new review at any time.

Comment thread src/facade/dragonfly_connection.cc
Comment thread tests/dragonfly/connection_test.py Outdated
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

Comment thread tests/dragonfly/connection_test.py Outdated
Comment thread tests/dragonfly/connection_test.py
@glevkovich glevkovich force-pushed the glevkovich/v2loop_backpressure branch from 4f067f5 to b10ff25 Compare March 30, 2026 10:40
Comment thread src/facade/dragonfly_connection.cc Outdated
// V2 connections parked on backpressure register their io_event_ here so that
// global memory relief (from other connections or CONFIG SET) can wake them.
// Access is thread-local (per-proactor), no synchronization needed.
std::vector<util::fb2::EventCount*> v2_pipeline_waiters;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if access is indeed thread-local, then using CondVarAny with NoOp lock is better,
as EventCount is designed for multi-threaded coordination.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will answer here Since i think few comments are related to the same issue.


QueueBackpressure& qbp = GetQueueBackpressure();
qbp.pipeline_cnd.notify_all();
qbp.NotifyPipelineWaiters();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am trying to understand how it worked before - we did not have O(n) array of fibers parked.
so what changed now?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will answer here Since i think few comments are related to the same issue.

Comment thread src/facade/dragonfly_connection.cc
Comment thread src/facade/dragonfly_connection.cc Outdated
qbp.v2_pipeline_waiters.push_back(&io_event_);
absl::Cleanup v2_unregister = [&qbp, this] {
auto& w = qbp.v2_pipeline_waiters;
auto it = std::find(w.begin(), w.end(), &io_event_);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i have a problem with this and also with NotifyPipelineWaiters that runs in O(N).

I want to understand what has changed that now we need O(n) runtime for these operations.
Specifically how this works in V1? I would guess we have a single cndVar that you block on. Why do we need to change the approach here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Answering here to 3 of your comments:

In V1:

  • We have 2 fibers per connection, one is a producer (IoLoop) and one is a consumer (the one is the consumer that runs Connection::AsyncFiber()).
  • If memory got too full, the IoLoop fiber could block on a thread-global CondVarAny (pipeline_cnd) in Dispatchsingle, while expecting AsyncFiber which was still awake, to eventually wake it up.
  • Connections waiting on backpressure all park on the same thread-level pipeline_cnd (CondVarAny) . One notify_all() reaches them all because they share a single wait object.

in V2:

  • There is only a single fiber now (no AsyncFiber). The single fiber uses io_event as a unified notification object for receive completions, connection shutdown, async message dispatch, and command completion.
  • When parked on io_event_ we must park on all, we can;t park on 2 different objects at the same time, and that includes parking on backpressure relief events. If we park on pipeline_cnd_ we will be "deaf" to all other events.

Regarding your comments:

  • "if access is indeed thread-local, then using CondVarAny with NoOp lock is better,
    as EventCount is designed for multi-threaded coordination.": you are right but as I wrote, the reason I can;t use pipeline_cndd for V2 is because the fiber can only be parked on a single object at a time and must expect all possible events, and the code was written to support passing events (e.g socket receive completions) via this io_event_.

  • "I am trying to understand how it worked before - we did not have O(n) array of fibers parked.
    so what changed now?" If you look at the implementation of the pipeline_cnd.notiftall(), it is also O(n), it's just in the internal implementation of CondVarAny:

bool WaitQueue::NotifyAll(FiberInterface* active) {
  bool notified = false;
  while (NotifyOne(active))
    notified = true;
  return notified;
}

To summarize:

what actually change is the way we notify and how we organize members logically:

  • V1 loop has all connections sharing one CondVarAny with a single notify_all() works.
  • V2 connections each have their own io_event_, and you need to explicitly reach each one in dragonfly_connection code. The vector is the minimal mechanism to do so.

Why we can't use a new or existing CondVarAny in my opinion?
Since it has the same fundamental issue - during the back-pressure wait, the fiber must remain parked on io_event_ because socket completions, shutdowns, and async dispatches all arrive exclusively via io_event_.notify(). A fiber can only wait in one place at a time, so we cannot park on both io_event_ and pipeline_cnd or any new CondVarAny simultaneously.

Let me know if you'd prefer to explore a different architectural approach for V2's event multiplexing. I think that this might require expanding the scope of this PR to refactor how V2 waits for events.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the difference here is that unregistering after waking up is now a O(n) operation due to the find, so if you wake up m connections at once, it will be O(m * n) list operations done asymptotically. The current helio waiters unlink in O(1) time. What can be done, is:

  1. Making the vector and std::list manually like helio does internally
  2. EventCount supports subscribe with a callback based waiter, support can also be extended to other primitives

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(2) is already used to wait for command execution

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the advice, I think I will go with option 2 (new sharted EventCount), it's faster / simpler and no need to modify helio at this stage. I'm working on it.

@glevkovich glevkovich force-pushed the glevkovich/v2loop_backpressure branch 2 times, most recently from dcf9fa6 to 30f22c6 Compare March 31, 2026 16:51
@glevkovich glevkovich requested a review from Copilot March 31, 2026 16:51
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

Comment thread src/facade/dragonfly_connection.cc
Comment thread tests/dragonfly/connection_test.py
@glevkovich glevkovich requested a review from romange March 31, 2026 17:16
dranikpg
dranikpg previously approved these changes Apr 3, 2026
Comment on lines +371 to +376
// Notifies both V1 waiters (pipeline_cnd) and V2 subscribers (v2_pipeline_backpressure_ec).
void NotifyPipelineWaiters() {
pipeline_cnd.notify_all();
v2_pipeline_backpressure_ec.notifyAll();
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: unite somewhen in the future, but maybe even not if we abolish v1 fast

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted, will consider add a task.

@romange
Copy link
Copy Markdown
Collaborator

romange commented Apr 3, 2026

Lets hold on with merging it until we release 1.38

Comment thread src/facade/dragonfly_connection.cc Outdated
Comment on lines +2856 to +2869
auto sub_key = qbp.v2_pipeline_backpressure_ec.check_or_subscribe(under_limit_pred,
&backpressure_waiter);

if (sub_key) { // we had to subscribe, so need to wait
io_event_.await([this, under_limit_pred]() {
bool cmd_ready = parsed_head_ && parsed_head_->CanReply();
// We wake up if:
// 1. Memory is freed (under_limit_pred()) or we can free it ourselves (cmd_ready).
// 2. Control-plane messages need processing (!dispatch_q_.empty()).
// 3. The connection is terminating (io_ec_).
return under_limit_pred() || cmd_ready || !dispatch_q_.empty() || io_ec_;
});
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can unite it somehow in the future like I did with the with the command waiter

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also this function is really big now, maybe we can move some parts out of it

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted. I won't do it in this PR.

@glevkovich
Copy link
Copy Markdown
Contributor Author

Lets hold on with merging it until we release 1.38

This PR is currently on hold, I won't merge it till we release 1.38.

Park the IoLoopV2 fiber via io_event_.await() when the pipeline queue
exceeds limits, preventing busy-spinning. The fiber resumes when
memory is freed, commands are ready, the dispatch queue has items,
or the connection closes.

To prevent lost wakeups on thread-global limits, parked V2 connections
now register their io_event_ in a thread-local waiter list. This ensures
they reliably wake up when other connections free memory or when limits
are dynamically raised via CONFIG SET.

- ParseRedis breaks early if over limit, leaving bytes in io_buf_.
- ExecuteBatch and ReplyBatch notify io_event_ to wake the fiber.
- Global memory relief safely wakes parked V2 fibers, utilizing an O(1)
  swap-and-pop unregister on cleanup.
- Wakeups are optimized to avoid the "thundering herd" problem by only
  notifying waiters when thread-global pipeline budget is actually freed.
- Updated logs with (V1)/(V2) labels for easier debugging.
- Added tests for V2 correctness and safe disconnects.

Signed-off-by: Gil Levkovich <69595609+glevkovich@users.noreply.github.com>
Signed-off-by: Gil Levkovich <69595609+glevkovich@users.noreply.github.com>
Signed-off-by: Gil Levkovich <69595609+glevkovich@users.noreply.github.com>
Signed-off-by: Gil Levkovich <69595609+glevkovich@users.noreply.github.com>
Signed-off-by: Gil Levkovich <69595609+glevkovich@users.noreply.github.com>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.

Comment thread src/facade/dragonfly_connection.cc
Comment on lines +2847 to +2858
// After draining commands, notify all connections parked on backpressure relief
if (conn_stats.pipeline_queue_bytes < mem_before) {
qbp.NotifyPipelineWaiters();
}

// await block (backpressure)
// Re-check if pipeline buffer over limit after draining - ExecuteBatch/ReplyBatch may have
// freed memory. If still over limit, sleep to prevent busy-spin.
// Only park if this connection is actively contributing (parsed_cmd_q_len_ > 0).
// Connections with an empty queue must stay in the read loop.
bool post_over_limit =
(parsed_cmd_q_len_ > 0) &&
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My comment still applies. IoLoopV2 function now becomes veeeery long, it doesn't even fit on my screen. Maybe we can refactor it, at least in the future?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted, sure, wrote also in the previous comment. I will add a task. For sure not part of this PR.

@glevkovich glevkovich merged commit a8ff43d into main Apr 14, 2026
17 checks passed
@glevkovich glevkovich deleted the glevkovich/v2loop_backpressure branch April 14, 2026 15:37
@glevkovich
Copy link
Copy Markdown
Contributor Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants