feat: backpressure offloading for ioloopv2 (memcache) #6929
| disk_push_in_flight_ = false; | ||
| if (ec) { | ||
| LOG(ERROR) << "Disk offload write failed: " << ec.message(); | ||
| // Should we abort the connection ? |
How should we handle failing writes or reads? Should we shut down the connection, or should we retry?
I guess a first step would be to just abort the connection.
WDYT @romange?
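One way to read the "just abort" option: the write-completion callback records the error so that the I/O loop's wait predicate (which already checks `io_ec_`) trips and the loop exits. The following is a minimal sketch under that assumption. `OffloadState`, `OnDiskWriteDone`, and `ShouldAbort` are hypothetical names; only `disk_push_in_flight_` and `io_ec_` mirror identifiers from the diff, and this is not the PR's code.

```cpp
#include <cassert>
#include <system_error>

// Hypothetical stand-in for the connection fields touched by the callback.
// Only disk_push_in_flight_ and io_ec_ mirror names that appear in the diff.
struct OffloadState {
  bool disk_push_in_flight_ = false;
  std::error_code io_ec_;  // a non-zero value means the I/O loop should stop

  // Completion callback for an asynchronous disk write.
  void OnDiskWriteDone(std::error_code ec) {
    assert(disk_push_in_flight_);  // a completion implies a push was started
    disk_push_in_flight_ = false;
    if (ec && !io_ec_) {
      // Abort policy: keep the first error so the loop predicate observes it.
      io_ec_ = ec;
    }
  }

  // True once any error has been recorded.
  bool ShouldAbort() const { return static_cast<bool>(io_ec_); }
};
```

A retry policy would instead keep the pending bytes and re-issue the write, which needs a bound on attempts; aborting is the smaller first step.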
| const size_t to_offload = io_buf_.InputLen(); | ||
| disk_push_in_flight_ = true; | ||
| // We intentionally defer ConsumeInput until the write completes rather than deep-copying |
Either this, or copy by value.
I chose to pin the memory, which defers consuming it until the in-flight write completes.
The cost is that we must be careful not to empty, consume, or resize the io buffer while the write is in flight.
I opted for this approach because I don't want yet another level of copying:
1. A copy for the callback, so we don't have to pin memory (I did not want this).
2. A copy when you Write(). The kernel doesn't write straight to disk; it writes to the page cache and marks the region as dirty.
3. Finally, the page cache gets written (flushed) to disk.
I explicitly decided to remove copy (1).
@romange if you disagree, object, or have anything else to add, I am all eyes 😄
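The pin-instead-of-copy trade-off can be sketched as follows. This is an illustrative model only: `PinnedBuffer` and its methods are hypothetical and are not `io::IoBuf`. It assumes the simplest safe rule, that the buffer refuses any mutation while a write is in flight.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>

// Hypothetical buffer illustrating "pin instead of copy". Not io::IoBuf.
class PinnedBuffer {
 public:
  // Appending may reallocate and invalidate a pinned view,
  // so it is refused while a write is in flight.
  bool Append(std::string_view bytes) {
    if (pinned_len_ > 0) return false;
    data_.append(bytes);
    return true;
  }

  // Hands out a view of the current input and pins it; no bytes are copied.
  std::string_view BeginWrite() {
    assert(pinned_len_ == 0);  // only one write in flight at a time
    pinned_len_ = data_.size();
    return std::string_view(data_.data(), pinned_len_);
  }

  // Called from the write-completion callback: only now is the input consumed.
  void EndWrite() {
    data_.erase(0, pinned_len_);
    pinned_len_ = 0;
  }

  std::size_t InputLen() const { return data_.size(); }
  bool pinned() const { return pinned_len_ > 0; }

 private:
  std::string data_;
  std::size_t pinned_len_ = 0;
};
```

The refused `Append` is exactly the cost described above: while the region is pinned, new socket bytes have nowhere to go in this buffer.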
OK, after getting a better understanding of the side effects of that decision, the answer is that it messes up TCP ordering on io_buf_, and fixing those cases pushes the backpressure back onto the TCP layer (which defeats the purpose).
Using two buffers solves all the ordering issues and provides a good separation of concerns:
socket_read->io_buf->offloaded_to_disk
disk_read->io_buf_for_disk
This is a simple producer/consumer. Socket reads go directly to disk, and new commands to parse are read from disk first; then we fall back to reading io_buf_, which gives a natural data flow and ordering.
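A rough model of that producer/consumer flow is sketched below. It assumes that an offload always moves the whole socket-side buffer to disk, so everything on disk is older than anything still in memory. `TwoBufferFlow` and its methods are hypothetical, and a `std::deque` stands in for the on-disk queue.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <string_view>
#include <utility>

// Hypothetical model of the two-buffer flow; std::deque stands in for the
// disk-backed queue and std::string for the socket-side buffer.
class TwoBufferFlow {
 public:
  // Producer: bytes read from the socket always land in the socket buffer.
  void OnSocketRead(std::string_view bytes) { socket_buf_.append(bytes); }

  // Under backpressure the whole socket buffer is moved to disk, so the
  // disk queue only ever holds bytes older than what is in socket_buf_.
  void Offload() {
    if (socket_buf_.empty()) return;
    disk_queue_.push_back(std::move(socket_buf_));
    socket_buf_.clear();  // leave the moved-from string in a known state
  }

  // Consumer: the parser drains disk first, then falls back to the socket
  // buffer, which preserves arrival order.
  std::string NextParserInput() {
    if (!disk_queue_.empty()) {
      std::string front = std::move(disk_queue_.front());
      disk_queue_.pop_front();
      return front;
    }
    std::string out = std::move(socket_buf_);
    socket_buf_.clear();
    return out;
  }

 private:
  std::string socket_buf_;
  std::deque<std::string> disk_queue_;
};
```

Because the parser never reads the socket buffer while the disk queue is non-empty, no extra sequencing state is needed to keep commands in order.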
🤖 Augment PR Summary
Summary: Adds optional disk-backed offload for IoLoopV2 when large Memcached pipelines create backpressure.
Changes:
Technical Notes: Offload defers
| io_event_.await([this]() { | ||
| bool cmd_ready = | ||
| parsed_head_ && parsed_head_ != parsed_to_execute_ && parsed_head_->CanReply(); | ||
| return cmd_ready || !disk_push_in_flight_ || io_ec_; |
In the force_await spin-guard, this predicate is immediately true when disk_push_in_flight_ is false, so if HandleSocketBackpressure() can’t start a push (e.g., init failure / backing file full) the fiber may not actually yield and can busy-spin while still over the offload threshold.
Severity: medium
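The concern can be reproduced with the predicate alone. The sketch below models the state as plain booleans; `WaitState`, `OriginalPredicate`, and `GuardedPredicate` are hypothetical names, and the guarded variant is only one possible fix, not something proposed in the PR.

```cpp
#include <cassert>

// Hypothetical snapshot of the state that the wait predicate reads.
struct WaitState {
  bool cmd_ready = false;
  bool disk_push_in_flight = false;
  bool io_error = false;
};

// Predicate as written in the diff: returning true means "do not block".
bool OriginalPredicate(const WaitState& s) {
  return s.cmd_ready || !s.disk_push_in_flight || s.io_error;
}

// Illustrative alternative: "push finished" only counts as a wake-up reason
// when a push was actually started before waiting.
bool GuardedPredicate(const WaitState& s, bool push_was_started) {
  const bool push_done = push_was_started && !s.disk_push_in_flight;
  return s.cmd_ready || push_done || s.io_error;
}
```

With no push started and nothing else ready, `OriginalPredicate` is already true, so the await returns immediately; `GuardedPredicate` stays false in that case and the fiber would block until another condition fires.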
Pull request overview
Adds disk-backed offloading for IoLoopV2 (memcache) to prevent unbounded in-memory growth under pipeline backpressure by spilling raw socket bytes to a per-connection backing file and restoring them once the parsed-command queue drains.
Changes:
- Introduces `--pipeline_disk_offload_threshold` and integrates a per-connection `DiskBackedQueue` into `facade::Connection`.
- Updates IoLoopV2/ParseLoop to trigger offload when `parsed_cmd_q_bytes_` exceeds the threshold and to restore data when pressure subsides.
- Adds a Python regression test that attempts to validate offload/restore via log scraping.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| tests/dragonfly/connection_test.py | Adds a memcache IoLoopV2 test intended to verify disk offload/restore via logs. |
| src/facade/dragonfly_connection.h | Declares disk-queue members and helper methods for offload/drain. |
| src/facade/dragonfly_connection.cc | Adds the offload threshold flag and implements offload/drain logic in IoLoopV2/ParseLoop and connection teardown. |
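The threshold behavior summarized above can be expressed as a small decision function. This is a sketch under stated assumptions: `OffloadAction` and `Decide` are hypothetical, a threshold of 0 is assumed to mean "offload disabled", and the real logic in the PR may differ in when it restores.

```cpp
#include <cassert>
#include <cstddef>

enum class OffloadAction { kNone, kPushToDisk, kRestoreFromDisk };

// Decides what to do for one loop iteration.
// Assumption: threshold == 0 disables offloading entirely.
OffloadAction Decide(std::size_t parsed_cmd_q_bytes, std::size_t threshold,
                     std::size_t pending_socket_bytes,
                     std::size_t bytes_on_disk) {
  if (threshold == 0) return OffloadAction::kNone;
  if (parsed_cmd_q_bytes > threshold) {
    // Over the limit: spill raw socket bytes if there are any to spill.
    return pending_socket_bytes > 0 ? OffloadAction::kPushToDisk
                                    : OffloadAction::kNone;
  }
  // Pressure has subsided: bring offloaded bytes back if any exist.
  return bytes_on_disk > 0 ? OffloadAction::kRestoreFromDisk
                           : OffloadAction::kNone;
}
```

For example, with a threshold of 1024 bytes, a queue of 2048 bytes and 100 unread socket bytes yields `kPushToDisk`, while a queue of 10 bytes with 100 bytes on disk yields `kRestoreFromDisk`.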
Signed-off-by: Kostas Kyrimis <kostas@dragonflydb.io>
8a90b31 to 882307f
Pull request overview
Adds disk-backed offloading for IoLoopV2 memcache connections when the per-connection parsed command queue exceeds a configurable byte threshold, with a regression test to validate offload/restore behavior (Issue #6030).
Changes:
- Introduces `--pipeline_disk_offload_threshold` (IoLoopV2 only) to trigger disk offloading once `parsed_cmd_q_bytes_` crosses a byte threshold.
- Adds per-connection disk offload state (`disk_queue_`, in-flight flags) and logic to push `io_buf_` bytes to disk and later restore them.
- Adds a Python integration test asserting offload/restore activity via logs.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| tests/dragonfly/connection_test.py | Adds an async integration test that floods memcache to trigger disk offload/restore and asserts logs contain both events. |
| src/facade/dragonfly_connection.h | Declares disk offload helpers and adds per-connection disk queue/in-flight state. |
| src/facade/dragonfly_connection.cc | Implements offload threshold flag, push/pop offload logic, and integrates it into IoLoopV2/ParseLoop. |
| disk_push_in_flight_ = false; | ||
| if (ec) { | ||
| LOG(ERROR) << "Disk offload write failed: " << ec.message(); | ||
| // Should we abort the connection ? | ||
| } else { |
| io_event_.await([this]() { | ||
| bool cmd_ready = | ||
| parsed_head_ && parsed_head_ != parsed_to_execute_ && parsed_head_->CanReply(); | ||
| return cmd_ready || !disk_push_in_flight_ || io_ec_; | ||
| }); |
@romange please take a look at my comments
| // Returns true if one or more commands were parsed from the read buffer, | ||
| // and false if no complete commands could be parsed (for example, when | ||
| // parsing is pending more input). | ||
| bool ParseMCBatch(); |
We do not pass mutable args by reference. Why do you need to pass a buffer at all; isn't it a data member?
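The two options implied by this comment can be sketched as follows. `BatchParser` and its methods are hypothetical and unrelated to the PR's `ParseMCBatch`. The sketch assumes the usual convention in codebases that avoid mutable references, which is to pass mutable arguments by pointer.

```cpp
#include <cassert>
#include <string>
#include <string_view>

// Hypothetical parser; the names are illustrative and not from the PR.
class BatchParser {
 public:
  void Feed(std::string_view bytes) { buf_.append(bytes); }

  // Option A: the buffer is a data member, so no argument is needed.
  // Returns true if at least one complete "\r\n"-terminated line was consumed.
  bool ParseBatch() { return ConsumeLines(&buf_); }

 private:
  // Option B: a mutable argument is passed by pointer, which makes the
  // mutation visible at the call site (note the '&' above).
  static bool ConsumeLines(std::string* buf) {
    assert(buf != nullptr);
    bool parsed = false;
    for (auto pos = buf->find("\r\n"); pos != std::string::npos;
         pos = buf->find("\r\n")) {
      buf->erase(0, pos + 2);  // drop the line and its terminator
      parsed = true;
    }
    return parsed;
  }

  std::string buf_;
};
```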
| util::FiberSocketBase::ProvidedBuffer recv_buf_; | ||
| io::IoBuf io_buf_; // used in io loop and parsers | ||
| // parser input: fed exclusively from disk pops (or socket_buf_ when disk empty) | ||
| io::IoBuf io_buf_; |
Why do you need 2 instances?
Signed-off-by: Kostas Kyrimis <kostas@dragonflydb.io>
resolves #6030