Skip to content

feat: backpressure offloading for ioloopv2 (memcache)#6929

Open
kostasrim wants to merge 11 commits into
mainfrom
memcache-offloading
Open

feat: backpressure offloading for ioloopv2 (memcache)#6929
kostasrim wants to merge 11 commits into
mainfrom
memcache-offloading

Conversation

@kostasrim
Copy link
Copy Markdown
Contributor

@kostasrim kostasrim commented Mar 19, 2026

  • add --pipeline_disk_offload_threshold (IoLoopV2 only) to trigger offloading once the parsed command queue exceeds a byte threshold.
  • add per-connection disk offload state (disk_queue_, in-flight flags) and helpers to push raw io_buf_ bytes to disk and later restore them.
  • add tests

resolves #6030

@kostasrim kostasrim self-assigned this Mar 19, 2026
Copilot AI review requested due to automatic review settings March 19, 2026 12:20
Comment thread src/facade/dragonfly_connection.cc Outdated
disk_push_in_flight_ = false;
if (ec) {
LOG(ERROR) << "Disk offload write failed: " << ec.message();
// Should we abort the connection ?
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How should we handle failing writes or reads ? Shall we shut down the connection ? Shall we retry ?

I guess a first step would be to just abort the connection ?

WDYT @romange

Comment thread src/facade/dragonfly_connection.cc Outdated
const size_t to_offload = io_buf_.InputLen();
disk_push_in_flight_ = true;

// We intentionally defer ConsumeInput until the write completes rather than deep-copying
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This or copy by value.

I chose to pin the memory, effectively deferring to consume it until the in-flight write completes.

The cost here is that we need to be careful to not empty or consume or resize the io buffer.

The reason I opted in for this approach is that I don't want yet another level of copy

  1. Copy for the callback so we don't have to pin memory (I did not want this)
  2. Copy when you Write(). The kernel doesn't just write to disk, it just writes to the page cache and marks the region as dirty
  3. Finally the page cache gets written (flushed to disk)

I explicitly decided to remove (1) copy

@romange if you disagree object or anything else to add I am all eyes 😄

Copy link
Copy Markdown
Contributor Author

@kostasrim kostasrim Mar 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok after having a little bit better understanding of the side effects of that decision, the answer is that it messes tcp ordering on io_buf_ and fixing those cases offloads the backpressure back on the tcp layer (which beats the purpose of it).

Using two buffers solves all the ordering issues and provides a good separation of concerns:

socket_read->io_buf->offloaded_to_disk
disk_read->io_buf_for_disk

This is a simple produer/consumer. socket reads go directly to disk and new parsed commands are first loaded/read from disk and then we fall back to io_buf_ read providing natural data flow and ordering.

@augmentcode
Copy link
Copy Markdown

augmentcode Bot commented Mar 19, 2026

🤖 Augment PR Summary

Summary: Adds optional disk-backed offload for IoLoopV2 when large Memcached pipelines create backpressure.

Changes:

  • Introduced --pipeline_disk_offload_threshold (IoLoopV2 only) to trigger offloading once the parsed command queue exceeds a byte threshold.
  • Added per-connection disk offload state (disk_queue_, in-flight flags) and helpers to push raw io_buf_ bytes to disk and later restore them.
  • Integrated offload/drain checks into ParseLoop() and IoLoopV2(), including recv-callback opportunistic offload and updated await conditions to avoid spinning while disk I/O is in flight.
  • Prevented CheckIoBufCapacity() from reallocating the I/O buffer while an async disk push/pop is pending to avoid UAF.
  • Added an async integration test that floods Memcached requests and asserts that offload/restore activity occurred.

Technical Notes: Offload defers ConsumeInput() until the async write completes so the in-flight buffer region remains stable for io_uring operations.

🤖 Was this summary useful? React with 👍 or 👎

Copy link
Copy Markdown

@augmentcode augmentcode Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review completed. 3 suggestions posted.

Fix All in Augment

Comment augment review to trigger a new review at any time.

Comment thread src/facade/dragonfly_connection.cc
Comment thread src/facade/dragonfly_connection.cc Outdated
io_event_.await([this]() {
bool cmd_ready =
parsed_head_ && parsed_head_ != parsed_to_execute_ && parsed_head_->CanReply();
return cmd_ready || !disk_push_in_flight_ || io_ec_;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the force_await spin-guard, this predicate is immediately true when disk_push_in_flight_ is false, so if HandleSocketBackpressure() can’t start a push (e.g., init failure / backing file full) the fiber may not actually yield and can busy-spin while still over the offload threshold.

Severity: medium

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

Comment thread tests/dragonfly/connection_test.py Outdated
@kostasrim kostasrim requested review from Copilot and removed request for Copilot March 19, 2026 12:25
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds disk-backed offloading for IoLoopV2 (memcache) to prevent unbounded in-memory growth under pipeline backpressure by spilling raw socket bytes to a per-connection backing file and restoring them once the parsed-command queue drains.

Changes:

  • Introduces --pipeline_disk_offload_threshold and integrates a per-connection DiskBackedQueue into facade::Connection.
  • Updates IoLoopV2/ParseLoop to trigger offload when parsed_cmd_q_bytes_ exceeds the threshold and to restore data when pressure subsides.
  • Adds a Python regression test that attempts to validate offload/restore via log scraping.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.

File Description
tests/dragonfly/connection_test.py Adds a memcache IoLoopV2 test intended to verify disk offload/restore via logs.
src/facade/dragonfly_connection.h Declares disk-queue members and helper methods for offload/drain.
src/facade/dragonfly_connection.cc Adds the offload threshold flag and implements offload/drain logic in IoLoopV2/ParseLoop and connection teardown.

Comment thread src/facade/dragonfly_connection.cc Outdated
Comment thread src/facade/dragonfly_connection.cc Outdated
Comment thread src/facade/dragonfly_connection.cc
Comment thread tests/dragonfly/connection_test.py Outdated
Signed-off-by: Kostas Kyrimis <kostas@dragonflydb.io>
@kostasrim kostasrim force-pushed the memcache-offloading branch from 8a90b31 to 882307f Compare March 19, 2026 13:39
@kostasrim kostasrim requested a review from Copilot March 19, 2026 13:40
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds disk-backed offloading for IoLoopV2 memcache connections when the per-connection parsed command queue exceeds a configurable byte threshold, with a regression test to validate offload/restore behavior (Issue #6030).

Changes:

  • Introduces --pipeline_disk_offload_threshold (IoLoopV2 only) to trigger disk offloading once parsed_cmd_q_bytes_ crosses a byte threshold.
  • Adds per-connection disk offload state (disk_queue_, in-flight flags) and logic to push io_buf_ bytes to disk and later restore them.
  • Adds a Python integration test asserting offload/restore activity via logs.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

File Description
tests/dragonfly/connection_test.py Adds an async integration test that floods memcache to trigger disk offload/restore and asserts logs contain both events.
src/facade/dragonfly_connection.h Declares disk offload helpers and adds per-connection disk queue/in-flight state.
src/facade/dragonfly_connection.cc Implements offload threshold flag, push/pop offload logic, and integrates it into IoLoopV2/ParseLoop.

Comment thread src/facade/dragonfly_connection.cc Outdated
Comment thread src/facade/dragonfly_connection.cc Outdated
Comment on lines +2622 to +2626
disk_push_in_flight_ = false;
if (ec) {
LOG(ERROR) << "Disk offload write failed: " << ec.message();
// Should we abort the connection ?
} else {
Comment thread src/facade/dragonfly_connection.cc Outdated
Comment on lines +2937 to +2941
io_event_.await([this]() {
bool cmd_ready =
parsed_head_ && parsed_head_ != parsed_to_execute_ && parsed_head_->CanReply();
return cmd_ready || !disk_push_in_flight_ || io_ec_;
});
@kostasrim kostasrim requested a review from romange March 26, 2026 06:41
@kostasrim
Copy link
Copy Markdown
Contributor Author

@romange plz look on my comments

Comment thread src/facade/dragonfly_connection.h Outdated
// Returns true if one or more commands were parsed from the read buffer,
// and false if no complete commands could be parsed (for example, when
// parsing is pending more input).
bool ParseMCBatch();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not pass mutable arge by reference. Why do you need to pass a buffer, it's not a data member?

util::FiberSocketBase::ProvidedBuffer recv_buf_;
io::IoBuf io_buf_; // used in io loop and parsers
// parser input: fed exclusively from disk pops (or socket_buf_ when disk empty)
io::IoBuf io_buf_;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need 2 instances?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Offload connection backpressure to disk + test

3 participants