diff --git a/docs/disk-offload-state-machine.md b/docs/disk-offload-state-machine.md new file mode 100644 index 000000000000..4908571bbcf9 --- /dev/null +++ b/docs/disk-offload-state-machine.md @@ -0,0 +1,215 @@ +# Connection Disk Offload + +--- + +## 1. Watermark Flags + +Three thresholds form a three-level band: + +| Flag | Default | Role | +|----------------------------------------|--------------|------| +| `pipeline_disk_offload_threshold` | 0 (disabled) | **Offload trigger** – `parsed_cmd_q_bytes_ >= threshold` → incoming bytes go to disk instead of `io_buf_`. | +| `disk_backpressure_hysteresis_arm` | 256 KB | **High-water mark** – once `total_backing + queued >= arm`, `hysteresis_armed` is set, enabling the drain phase. | +| `disk_backpressure_hysteresis_trigger` | 16 KB | **Low-water mark** – while armed and `total_backing + queued < trigger`, `IsDraining()=true`. New socket reads are blocked until the queue fully drains to memory. | + + +**Why hysteresis?** + +We want to keep the disk queue active as long as it's busy, and we also want to avoid the disk tax for pipelines that are being drained. Consider the case where DrainDiskQueue reads into io_buf_, a RecvNotification fires, and we are forced to write +to disk again. With hysteresis, we allow backpressure to fall naturally to the TCP buffers while we drain the last chunks from the queue. Also note that we could use a staging buffer within the disk queue, but it does not seem worth it right now. + +--- + +## 2. 
`DiskBackedQueue` State Machine + +```mermaid +stateDiagram-v2 + [*] --> IDLE + IDLE --> ACTIVE : PushAsync called + ACTIVE --> IDLE : backing empty, both tracks idle, hysteresis_armed reset + ACTIVE --> CANCELLED : Cancel or I/O error + + state ACTIVE { + state WriteTrack { + [*] --> WriteIdle + WriteIdle --> Writing : PushAsync - MaybeFlushQueue + Writing --> WriteIdle : write CQE fires, queue empty + Writing --> Writing : write CQE fires, more queued - chain next write + } + -- + state ReadTrack { + [*] --> ReadIdle + ReadIdle --> Draining : hysteresis armed AND total+queued below trigger + ReadIdle --> Popping : DrainDiskQueue calls PopAsync + Draining --> Popping : DrainDiskQueue calls PopAsync - CanPush false during drain + Draining --> ReadIdle : backing empty - hysteresis_armed reset + Popping --> ReadIdle : read CQE fires, bytes to io_buf + } + } + + CANCELLED --> [*] : ConnectionFlow teardown waits IsActive = false +``` + +> **Note**: The two tracks run concurrently. A write CQE and a read CQE can be in-flight +> simultaneously — they always target different file offsets (writes append at `write_offset`, +> reads consume from `next_read_offset`). `IsActive() = false` only when **both** tracks +> are idle and `total_backing_bytes = 0`. + +### Key predicates + +``` +IsActive() = (!cancelled && total_backing_bytes > 0) + || !write_queue.empty() + || write_in_flight + || pop_in_flight + +IsDraining() = hysteresis_armed + && (total_backing_bytes + queued_bytes) < hysteresis_trigger + +CanPush(n) = !cancelled + && !IsDraining() + && (total_backing_bytes + queued_bytes + n) < max_backing_size +``` + +--- + +## 3. `DrainDiskQueue` – per-loop drain step + +Called once per `IoLoopV2` iteration, before `ReadPendingInput`. 
+ +```mermaid +flowchart TD + A(["DrainDiskQueue called"]) --> B{"threshold=0 or disk empty?"} + B -- yes --> Z(["return – no-op"]) + B -- no --> C{"pop already in-flight?"} + C -- yes --> Z + C -- no --> D{"io_buf_ append space = 0?"} + D -- yes --> Z + D -- no --> E{"parsed_cmd_q_bytes_ >= threshold?"} + E -- yes --> Z2(["return – pipeline full, wait for cmd drain"]) + E -- no --> F["PopAsync(io_buf_.AppendBuffer)"] + F --> G(["CQE fires: io_buf_.CommitWrite, io_event_.notify"]) +``` + +The guard on `parsed_cmd_q_bytes_ >= threshold` is the back-pressure gate: the fiber +must wait for shard threads to execute commands and free memory before draining more +disk data into the parser. + +--- + +## 4. `NotifyOnRecv` – recv routing + +Called from the io_uring recv callback (edge-triggered). + +```mermaid +flowchart TD + A(["NotifyOnRecv"]) --> B{"socket error?"} + B -- yes --> Z(["set io_ec_, return"]) + B -- no --> C{"should_offload?\ndisk active OR q_bytes >= threshold"} + C -- no --> D["TryRecv into io_buf_, CommitWrite"] + C -- yes --> E{"CanPush(kMaxChunkSize)?"} + E -- no --> Z2(["return – TCP backpressure builds naturally"]) + E -- yes --> F["TryRecv into Chunk buffer"] + F --> G{"recv error?"} + G -- yes --> Z3(["set io_ec_ or drop EAGAIN, return"]) + G -- no --> H["PushAsync chunk to disk"] + H --> I{"pop_in_flight?"} + I -- yes --> Z4(["return – pop CQE callback will notify"]) + I -- no --> J(["io_event_.notify"]) +``` + +--- + +## 5. 
`IoLoopV2` – main loop + +```mermaid +flowchart TD + START(["IoLoopV2 start"]) --> INIT["read offload_threshold\npre-init disk queue\nregister NotifyOnRecv CB"] + INIT --> LOOP + + LOOP(["loop iteration"]) --> MIG["HandleMigrateRequest"] + MIG --> WAITER["subscribe cmd_completion_waiter if in-flight commands"] + WAITER --> DRAIN["DrainDiskQueue"] + + DRAIN --> PIP{"pending_input_ AND NOT pop_in_flight?"} + PIP -- yes --> READ["ReadPendingInput: TryRecv into io_buf_"] + PIP -- no --> BUFCHECK + READ --> BUFCHECK + + BUFCHECK{"io_buf_ empty OR pop_in_flight?"} -- no --> PARSE_GATE + BUFCHECK -- yes --> POLL["if empty AND NOT pop_in_flight: NotifyOnRecv poll"] + POLL --> FLUSH["reply_builder_.Flush"] + FLUSH --> AWAIT["io_event_.await — wakes on any of:\n• can_parse: InputLen gt 0 AND NOT pop_in_flight\n• cmd_ready: head can reply\n• cmd_exec: head ready to execute\n• can_drain: disk non-empty AND q_bytes lt thr AND NOT pop_in_flight\n• can_poll: pending_input_ AND NOT pop_in_flight\n• dispatch_q non-empty\n• migration requested\n• io_ec_ set"] + AWAIT --> LOOP + + PARSE_GATE --> DISPATCH{"dispatch_q non-empty?"} + DISPATCH -- yes --> CTRL["drain control messages, continue"] + CTRL --> LOOP + + DISPATCH -- no --> FA{"force_await?\nq_bytes >= thr AND no cmd to execute"} + FA -- no --> OVERLIMIT{"pre_over_limit OR pop_in_flight OR io_buf_ empty?"} + OVERLIMIT -- no --> PARSE["ParseLoop: parse → execute → reply"] + PARSE --> LOOP + + OVERLIMIT -- yes --> EXEC["ExecuteBatch / ReplyBatch"] + FA -- yes --> EXEC + EXEC --> FA2{"force_await?"} + FA2 -- yes --> AWAIT2["io_event_.await:\ncmd_ready OR q_bytes < thr OR io_ec_ set"] + AWAIT2 --> BPL + FA2 -- no --> BPL + BPL{"post_over_limit?"} -- yes --> AWAIT3["io_event_.await pipeline backpressure relief"] + AWAIT3 --> LOOP + BPL -- no --> LOOP +``` + +### `force_await` dual role + +`force_await = threshold > 0 AND parsed_cmd_q_bytes_ >= threshold AND parsed_to_execute_ = null` + +1. 
**Spin guard** – prevents the fiber from busy-looping when `io_buf_` has data but the + pipeline is full. Forces a yield so shard threads can run and drain commands. +2. **Offload bypass guard** – `ReadPendingInput` calls `TryRecv` directly into `io_buf_` + with no `should_offload()` check. With `force_await` set, the parse guard skips + `ReadPendingInput`, ensuring bytes continue going to disk rather than leaking into + the in-memory buffer. + +### The pop-in-flight `io_event_` notify guards + +Two sites suppress `io_event_.notify()` while a pop CQE owns `io_buf_.AppendBuffer()`: + +| Site | Guard | +|------|-------| +| `RegisterOnRecv` lambda | `if (!disk_queue_ \|\| !IsPopInFlight()) notify()` | +| `PushAsync` write-CQE callback | `if (!IsPopInFlight()) notify()` | + +Without both guards the fiber wakes, sees `can_parse=false` (buffer half-written), +loops again, and starves the proactor that needs to fire the pop CQE. + +The await predicate adds the matching guard on `can_poll`: +```cpp +bool can_poll = pending_input_ && !pop_in_flight; +``` +This prevents spinning on `ReadPendingInput` (which is guarded by `!pop_in_flight` +at its call site) while `pending_input_` stays true. + +--- + +## 6. End-of-connection teardown + +```mermaid +sequenceDiagram + participant CF as ConnectionFlow + participant DQ as DiskBackedQueue + participant IO as io_event + + CF->>DQ: Cancel() + Note over DQ: cancelled=true, flush pending write CBs with operation_canceled + CF->>DQ: IsActive()? 
+ alt in-flight CQEs still pending + CF->>IO: await NOT IsActive() + DQ-->>IO: write/pop CQE lands, notify + IO-->>CF: wakes + end + CF->>DQ: reset() / Close() + Note over DQ: unlink backing file +``` diff --git a/src/facade/disk_backed_queue.cc b/src/facade/disk_backed_queue.cc index 81d1bd46d516..2e801c54e697 100644 --- a/src/facade/disk_backed_queue.cc +++ b/src/facade/disk_backed_queue.cc @@ -14,7 +14,9 @@ #include #include +#include #include +#include #include "base/flags.h" #include "facade/facade_types.h" @@ -22,6 +24,7 @@ #include "util/fibers/uring_proactor.h" using facade::operator""_MB; +using facade::operator""_KB; ABSL_FLAG(std::string, disk_backpressure_folder, "/tmp/", "Folder to store disk-backed connection backpressure"); @@ -30,16 +33,46 @@ ABSL_FLAG(size_t, disk_backpressure_file_max_bytes, 50_MB, "Maximum size of the backing file. When max size is reached, connection will " "stop offloading backpressure to disk and block on client read."); +ABSL_FLAG(size_t, disk_backpressure_hysteresis_trigger, 16_KB, + "Low-water mark for the disk-backed queue hysteresis band. Once the high-water " + "mark (disk_backpressure_hysteresis_arm) has been crossed, new socket reads are " + "blocked when the queue shrinks below this threshold, ensuring a clean drain-to-" + "memory transition without new writes racing the tail reads. Defaults to 2× " + "kMaxChunkSize (16 KB)."); + +ABSL_FLAG(size_t, disk_backpressure_hysteresis_arm, 256_KB, + "High-water mark for the disk-backed queue hysteresis band. The queue must " + "accumulate at least this many bytes before the low-water drain phase can " + "activate. Prevents a tiny transient queue from ever triggering the drain-to-" + "memory block. Must be larger than disk_backpressure_hysteresis_trigger. 
" + "Defaults to 256 KB."); + namespace facade { struct DiskBackedQueue::Impl { std::unique_ptr file; size_t write_offset = 0; - size_t total_backing_bytes = 0; + size_t total_backing_bytes = 0; // bytes actually on disk, available to pop + size_t queued_bytes = 0; // bytes in write_queue not yet written to disk size_t next_read_offset = 0; // Tracks how far into the file holes have been punched (always 4096-aligned). size_t punch_offset = 0; size_t in_flight_callbacks = 0; + bool write_in_flight = false; + bool pop_in_flight = false; + // Set by Cancel(). In-flight CQE callbacks skip accounting and chaining when true. + bool cancelled = false; + // Set when total bytes cross hysteresis_arm on the way up; cleared when the + // queue fully empties. Together with hysteresis_trigger_ this forms the two-level + // hysteresis band that governs draining mode. + bool hysteresis_armed = false; + size_t hysteresis_arm = 0; + + struct PendingWrite { + Chunk chunk; + AsyncPushCallback cb; + }; + std::deque write_queue; // Punch holes over the aligned region we have fully read past so the OS can reclaim pages. 
void MaybePunchHole() { @@ -51,12 +84,62 @@ struct DiskBackedQueue::Impl { punch_offset = aligned_end; } } + + void MaybeFlushQueue() { + if (cancelled || write_in_flight || write_queue.empty()) + return; + + write_in_flight = true; + + PendingWrite pw = std::move(write_queue.front()); + write_queue.pop_front(); + + const uint8_t* data_ptr = pw.chunk.data.data(); + const size_t size = pw.chunk.data.size(); + const size_t offset = write_offset; + + file->WriteAsync({data_ptr, size}, offset, [this, pw = std::move(pw), size](int res) mutable { + --in_flight_callbacks; + write_in_flight = false; + + std::error_code ec; + if (res < 0) { + ec = {-res, std::system_category()}; + LOG(ERROR) << "Failed to offload chunk of size " << size + << " to backing with error: " << ec; + cancelled = true; + } else if (!cancelled) { + write_offset += size; + total_backing_bytes += size; + queued_bytes -= size; + if (!hysteresis_armed && (total_backing_bytes + queued_bytes) >= hysteresis_arm) + hysteresis_armed = true; + } + + if (cancelled) + ec = std::make_error_code(std::errc::operation_canceled); + + VLOG(2) << "Write CQE done: cancelled=" << cancelled << " write_in_flight=" << write_in_flight + << " write_queue.size=" << write_queue.size() << " pop_in_flight=" << pop_in_flight; + + // Chain next write before invoking cb, so IsActive() reflects + // the true state when the callback notifies the connection. + // MaybeFlushQueue is a no-op when cancelled. 
+ if (!ec && !write_queue.empty()) + MaybeFlushQueue(); + + pw.cb(ec); + }); + ++in_flight_callbacks; + } }; DiskBackedQueue::DiskBackedQueue(uint32_t conn_id) : impl_(std::make_unique()), max_backing_size_(absl::GetFlag(FLAGS_disk_backpressure_file_max_bytes)), + hysteresis_trigger_(absl::GetFlag(FLAGS_disk_backpressure_hysteresis_trigger)), id_(conn_id) { + impl_->hysteresis_arm = absl::GetFlag(FLAGS_disk_backpressure_hysteresis_arm); } DiskBackedQueue::~DiskBackedQueue() { @@ -96,44 +179,86 @@ bool DiskBackedQueue::Empty() const { return impl_->total_backing_bytes == 0; } +void DiskBackedQueue::Cancel() { + impl_->cancelled = true; + impl_->queued_bytes = 0; + impl_->total_backing_bytes = 0; + // Drain items that were queued but never submitted to io_uring. + // MaybeFlushQueue won't process them once cancelled is set, so we must + // invoke their callbacks here to avoid leaving IsActive() permanently true. + VLOG(2) << "DiskQueue Cancel: flushing write_queue.size=" << impl_->write_queue.size() + << " write_in_flight=" << impl_->write_in_flight + << " pop_in_flight=" << impl_->pop_in_flight; + auto ec = std::make_error_code(std::errc::operation_canceled); + for (auto& pw : impl_->write_queue) + pw.cb(ec); + impl_->write_queue.clear(); +} + +bool DiskBackedQueue::IsActive() const { + return (!impl_->cancelled && impl_->total_backing_bytes > 0) || !impl_->write_queue.empty() || + impl_->write_in_flight || impl_->pop_in_flight; +} + +bool DiskBackedQueue::IsDraining() const { + if (!impl_->hysteresis_armed) + return false; + return (impl_->total_backing_bytes + impl_->queued_bytes) < hysteresis_trigger_; +} + +bool DiskBackedQueue::IsPopInFlight() const { + return impl_->pop_in_flight; +} + bool DiskBackedQueue::HasEnoughBackingSpaceFor(size_t bytes) const { - return (bytes + impl_->total_backing_bytes) < max_backing_size_; + return (bytes + impl_->total_backing_bytes + impl_->queued_bytes) < max_backing_size_; } -void DiskBackedQueue::PushAsync(io::Bytes bytes, 
AsyncPushCallback cb) { - const size_t offset = impl_->write_offset; - const size_t size = bytes.size(); - ++impl_->in_flight_callbacks; +bool DiskBackedQueue::CanPush(size_t bytes) const { + if (impl_->cancelled) + return false; + if (IsDraining()) + return false; + return HasEnoughBackingSpaceFor(bytes); +} - impl_->file->WriteAsync(bytes, offset, [this, size, cb = std::move(cb)](int res) { - --impl_->in_flight_callbacks; - if (res < 0) { - std::error_code ec{-res, std::system_category()}; - VLOG(2) << "Failed to offload blob of size " << size << " to backing with error: " << ec; - cb(ec); - return; - } +void DiskBackedQueue::PushAsync(Chunk chunk, AsyncPushCallback cb) { + DCHECK(!chunk.data.empty()); - impl_->write_offset += size; - impl_->total_backing_bytes += size; - VLOG(2) << "Offload connection " << this << " backpressure of " << size; - cb({}); - }); + impl_->queued_bytes += chunk.data.size(); + impl_->write_queue.push_back({std::move(chunk), std::move(cb)}); + VLOG(3) << "PushAsync: queued_bytes=" << impl_->queued_bytes + << " write_queue.size=" << impl_->write_queue.size() + << " write_in_flight=" << impl_->write_in_flight << " armed=" << impl_->hysteresis_armed; + impl_->MaybeFlushQueue(); } void DiskBackedQueue::PopAsync(io::MutableBytes out, AsyncPopCallback cb) { + DCHECK(!impl_->pop_in_flight); + DCHECK(!impl_->cancelled); + DCHECK_GT(impl_->total_backing_bytes, 0u); + const size_t to_read = std::min(impl_->total_backing_bytes, out.size()); const size_t offset = impl_->next_read_offset; ++impl_->in_flight_callbacks; + impl_->pop_in_flight = true; io::MutableBytes read_buf = out.subspan(0, to_read); impl_->file->ReadAsync(read_buf, offset, [this, to_read, offset, cb = std::move(cb)](int res) { --impl_->in_flight_callbacks; + impl_->pop_in_flight = false; + + if (impl_->cancelled) { + cb(nonstd::make_unexpected(std::make_error_code(std::errc::operation_canceled))); + return; + } + if (res < 0) { std::error_code ec{-res, std::system_category()}; 
LOG(ERROR) << "Could not load item at offset " << offset << " of size " << to_read << " from disk with error: " << ec.value() << " " << ec.message(); + impl_->cancelled = true; cb(nonstd::make_unexpected(ec)); return; } @@ -142,8 +267,14 @@ void DiskBackedQueue::PopAsync(io::MutableBytes out, AsyncPopCallback cb) { impl_->next_read_offset += bytes_read; impl_->total_backing_bytes -= bytes_read; - VLOG(2) << "Loaded item with offset " << offset << " of size " << bytes_read - << " for connection " << this; + // Reset hysteresis once the queue is fully drained (nothing on disk, nothing pending write). + // This re-arms the high-water mark check so the next growth cycle starts fresh. + if (impl_->total_backing_bytes == 0 && impl_->write_queue.empty() && !impl_->write_in_flight) + impl_->hysteresis_armed = false; + + VLOG(2) << "PopAsync done: bytes_read=" << bytes_read + << " total_backing=" << impl_->total_backing_bytes + << " armed=" << impl_->hysteresis_armed << " cancelled=" << impl_->cancelled; impl_->MaybePunchHole(); @@ -160,7 +291,7 @@ namespace facade { struct DiskBackedQueue::Impl {}; DiskBackedQueue::DiskBackedQueue(uint32_t conn_id) - : impl_(std::make_unique()), max_backing_size_(0), id_(conn_id) { + : impl_(std::make_unique()), max_backing_size_(0), hysteresis_trigger_(0), id_(conn_id) { } DiskBackedQueue::~DiskBackedQueue() = default; @@ -177,11 +308,30 @@ bool DiskBackedQueue::Empty() const { return true; } +bool DiskBackedQueue::IsActive() const { + return false; +} + +bool DiskBackedQueue::IsDraining() const { + return false; +} + +bool DiskBackedQueue::IsPopInFlight() const { + return false; +} + +bool DiskBackedQueue::CanPush(size_t) const { + return false; +} + +void DiskBackedQueue::Cancel() { +} + std::error_code DiskBackedQueue::Close() { return {}; } -void DiskBackedQueue::PushAsync(io::Bytes, AsyncPushCallback cb) { +void DiskBackedQueue::PushAsync(Chunk, AsyncPushCallback cb) { cb(std::make_error_code(std::errc::function_not_supported)); } 
diff --git a/src/facade/disk_backed_queue.h b/src/facade/disk_backed_queue.h index d7d7ddfd81c6..94ee967440b1 100644 --- a/src/facade/disk_backed_queue.h +++ b/src/facade/disk_backed_queue.h @@ -7,6 +7,7 @@ #include #include #include +#include #include "io/io.h" @@ -17,26 +18,61 @@ namespace facade { // HasEnoughBackingSpaceFor() always returns false, so callers never push/pop. class DiskBackedQueue { public: + // Maximum number of bytes per chunk. Recvs read into a Chunk-sized buffer; + // actual chunk size is the number of bytes received (may be smaller). + static constexpr size_t kMaxChunkSize = 8192; + + struct Chunk { + // Must be non-empty; the recv path stores at most kMaxChunkSize bytes per chunk. + std::vector data; + }; + explicit DiskBackedQueue(uint32_t conn_id); ~DiskBackedQueue(); std::error_code Init(); + // Returns true if the queue has any pending work: data on disk, chunks + // waiting to be written, a write CQE in-flight, or a pop CQE in-flight. + bool IsActive() const; + + // Returns true when remaining bytes have dropped below the drain threshold after + // previously exceeding the hysteresis threshold. The caller should block new + // socket reads to allow a clean drain-to-memory transition. + bool IsDraining() const; + + // Returns true if there is a pop in flight + bool IsPopInFlight() const; + + // Returns true if the queue can accept new data right now. + // False when draining, at capacity, or cancelled. + bool CanPush(size_t bytes) const; + // Check if we can offload bytes to backing file. + // Counts both on-disk bytes and bytes queued for writing (not yet on disk). bool HasEnoughBackingSpaceFor(size_t bytes) const; using AsyncPushCallback = std::function; - void PushAsync(io::Bytes bytes, AsyncPushCallback cb); + // Takes ownership of chunk and enqueues it for async write to disk. + // The caller's buffer is free the instant PushAsync returns. + // cb is invoked after the chunk's write CQE fires. 
+ void PushAsync(Chunk chunk, AsyncPushCallback cb); using AsyncPopCallback = std::function)>; // Async read variant. Callback is invoked with Result containing bytes read or error. + // Must only be called when no pop is already in-flight and the backing store is non-empty. void PopAsync(io::MutableBytes out, AsyncPopCallback cb); - // Check if backing file is empty, i.e. backing file has 0 bytes. + // Check if backing store (on-disk bytes) is empty. bool Empty() const; + // Cancel all pending (not-yet-submitted) writes, invoking their callbacks with + // operation_canceled. + // The caller must also wait for any in-flight operations to complete. + void Cancel(); + std::error_code Close(); private: @@ -46,6 +82,7 @@ class DiskBackedQueue { // Read only constants const size_t max_backing_size_; + const size_t hysteresis_trigger_; // same as connection id. Used to uniquely identify the backed file const size_t id_; diff --git a/src/facade/disk_backed_queue_test.cc b/src/facade/disk_backed_queue_test.cc index 1bf7ae59d2ca..530e2936b361 100644 --- a/src/facade/disk_backed_queue_test.cc +++ b/src/facade/disk_backed_queue_test.cc @@ -26,6 +26,16 @@ namespace { using namespace facade; +DiskBackedQueue::Chunk MakeChunk(const std::string& s) { + DiskBackedQueue::Chunk chunk; + chunk.data.assign(s.begin(), s.end()); + return chunk; +} + +io::MutableBytes AsMutableBytes(std::string& s) { + return io::MutableBytes(reinterpret_cast(s.data()), s.size()); +} + class DiskBackedQueueTest : public testing::Test { protected: void SetUp() override { @@ -53,11 +63,10 @@ TEST_F(DiskBackedQueueTest, PunchHoleReleasesSpace) { std::string data(12288, 'x'); { util::fb2::Done done; - backing.PushAsync(io::MutableBytes(reinterpret_cast(data.data()), data.size()), - [&done](std::error_code ec) { - ASSERT_FALSE(ec); - done.Notify(); - }); + backing.PushAsync(MakeChunk(data), [&done](std::error_code ec) { + ASSERT_FALSE(ec); + done.Notify(); + }); done.Wait(); } @@ -65,7 +74,7 @@ 
TEST_F(DiskBackedQueueTest, PunchHoleReleasesSpace) { std::string results; while (!backing.Empty()) { std::string buf(4096, '\0'); - auto out = io::MutableBytes(reinterpret_cast(buf.data()), buf.size()); + auto out = AsMutableBytes(buf); util::fb2::Done done; backing.PopAsync(out, [&done, &results, &buf](io::Result res) { ASSERT_TRUE(res); @@ -99,18 +108,17 @@ TEST_F(DiskBackedQueueTest, PunchHoleAdvancesOffset) { std::string data(32768, 'y'); { util::fb2::Done done; - backing.PushAsync(io::MutableBytes(reinterpret_cast(data.data()), data.size()), - [&done](std::error_code ec) { - ASSERT_FALSE(ec); - done.Notify(); - }); + backing.PushAsync(MakeChunk(data), [&done](std::error_code ec) { + ASSERT_FALSE(ec); + done.Notify(); + }); done.Wait(); } // Read exactly 4096 bytes (1 page). { std::string buf(4096, '\0'); - auto out = io::MutableBytes(reinterpret_cast(buf.data()), buf.size()); + auto out = AsMutableBytes(buf); util::fb2::Done done; backing.PopAsync(out, [&done](io::Result res) { ASSERT_TRUE(res); @@ -146,11 +154,10 @@ TEST_F(DiskBackedQueueTest, PunchHoleUnalignedReadsAndWrites) { std::string data(10000, 'z'); { util::fb2::Done done; - backing.PushAsync(io::MutableBytes(reinterpret_cast(data.data()), data.size()), - [&done](std::error_code ec) { - ASSERT_FALSE(ec); - done.Notify(); - }); + backing.PushAsync(MakeChunk(data), [&done](std::error_code ec) { + ASSERT_FALSE(ec); + done.Notify(); + }); done.Wait(); } @@ -160,7 +167,7 @@ TEST_F(DiskBackedQueueTest, PunchHoleUnalignedReadsAndWrites) { std::string results; { std::string buf(3000, '\0'); - auto out = io::MutableBytes(reinterpret_cast(buf.data()), buf.size()); + auto out = AsMutableBytes(buf); util::fb2::Done done; backing.PopAsync(out, [&done, &results, &buf](io::Result res) { ASSERT_TRUE(res); @@ -183,7 +190,7 @@ TEST_F(DiskBackedQueueTest, PunchHoleUnalignedReadsAndWrites) { // Now the first page (0-4095) should be punched. 
{ std::string buf(2000, '\0'); - auto out = io::MutableBytes(reinterpret_cast(buf.data()), buf.size()); + auto out = AsMutableBytes(buf); util::fb2::Done done; backing.PopAsync(out, [&done, &results, &buf](io::Result res) { ASSERT_TRUE(res); @@ -206,7 +213,7 @@ TEST_F(DiskBackedQueueTest, PunchHoleUnalignedReadsAndWrites) { // Now the first two pages (0-8191) should be punched. { std::string buf(3500, '\0'); - auto out = io::MutableBytes(reinterpret_cast(buf.data()), buf.size()); + auto out = AsMutableBytes(buf); util::fb2::Done done; backing.PopAsync(out, [&done, &results, &buf](io::Result res) { ASSERT_TRUE(res); @@ -226,7 +233,7 @@ TEST_F(DiskBackedQueueTest, PunchHoleUnalignedReadsAndWrites) { // Read remaining data and verify results match. while (!backing.Empty()) { std::string buf(4096, '\0'); - auto out = io::MutableBytes(reinterpret_cast(buf.data()), buf.size()); + auto out = AsMutableBytes(buf); util::fb2::Done done; backing.PopAsync(out, [&done, &results, &buf](io::Result res) { ASSERT_TRUE(res); @@ -256,10 +263,8 @@ TEST_F(DiskBackedQueueTest, AsyncReadWrite) { util::fb2::Fiber write_fiber = util::fb2::Fiber("writer", [&]() { for (size_t i = 0; i < 100; ++i) { auto cmd = absl::StrCat("SET FOO", i, " BAR"); - auto bytes = io::MutableBytes(reinterpret_cast(cmd.data()), cmd.size()); - util::fb2::Done done; - backing.PushAsync(bytes, [&done](std::error_code ec) { + backing.PushAsync(MakeChunk(cmd), [&done](std::error_code ec) { EXPECT_FALSE(ec); done.Notify(); }); @@ -274,7 +279,7 @@ TEST_F(DiskBackedQueueTest, AsyncReadWrite) { util::fb2::Fiber read_fiber = util::fb2::Fiber("reader", [&]() { while (!backing.Empty()) { std::string buf(1024, 'c'); - auto bytes = io::MutableBytes(reinterpret_cast(buf.data()), buf.size()); + auto bytes = AsMutableBytes(buf); util::fb2::Done done; backing.PopAsync(bytes, [&done, &results, &buf](io::Result res) { @@ -304,18 +309,17 @@ TEST_F(DiskBackedQueueTest, AsyncPunchHole) { std::string data(12288, 'x'); util::fb2::Done 
write_done; - backing.PushAsync(io::MutableBytes(reinterpret_cast(data.data()), data.size()), - [&write_done](std::error_code ec) { - ASSERT_FALSE(ec); - write_done.Notify(); - }); + backing.PushAsync(MakeChunk(data), [&write_done](std::error_code ec) { + ASSERT_FALSE(ec); + write_done.Notify(); + }); write_done.Wait(); // Async read all data back in 4096-byte chunks std::string results; while (!backing.Empty()) { std::string buf(4096, '\0'); - auto out = io::MutableBytes(reinterpret_cast(buf.data()), buf.size()); + auto out = AsMutableBytes(buf); util::fb2::Done read_done; backing.PopAsync(out, [&read_done, &results, &buf](io::Result res) { diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index cf402b59d869..767dd61da037 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -69,6 +69,11 @@ ABSL_FLAG(string, admin_bind, "", ABSL_FLAG(strings::MemoryBytesFlag, request_cache_limit, 64_MB, "Amount of memory to use for request cache in bytes - per IO thread."); +ABSL_FLAG(strings::MemoryBytesFlag, pipeline_disk_offload_threshold, 0, + "IoLoopV2 only: when the parsed command queue exceeds this many bytes, raw socket data " + "is offloaded to a disk-backed queue instead of being kept in memory. " + "0 means disabled (default)."); + ABSL_FLAG(strings::MemoryBytesFlag, pipeline_buffer_limit, 128_MB, "Amount of memory to use for storing pipeline requests - per IO thread." "Please note that clients that send excecissively huge pipelines, " @@ -1212,6 +1217,7 @@ void Connection::ConnectionFlow() { ++conn_stats.conn_received_cnt; ++local_stats_.read_cnt; + local_stats_.net_bytes_in += io_buf_.InputLen(); ParserStatus parse_status = OK; @@ -1274,6 +1280,22 @@ void Connection::ConnectionFlow() { socket_->ResetOnRecvHook(); } + if (disk_queue_) { + // Cancel pending (queued but not-yet-submitted) writes immediately. 
+ // This leaves at most one in-flight write CQE and one in-flight pop CQE, + // both of which will complete quickly without draining the full queue. + disk_queue_->Cancel(); + VLOG(2) << "DiskQueue post-Cancel: IsActive=" << disk_queue_->IsActive() + << " IsPopInFlight=" << disk_queue_->IsPopInFlight() << " conn=" << id_; + if (disk_queue_->IsActive()) { + VLOG(2) << "Waiting for in-flight disk CQEs to complete conn=" << id_; + io_event_.await([this] { return !disk_queue_->IsActive(); }); + VLOG(2) << "In-flight disk CQEs done conn=" << id_; + } + std::ignore = disk_queue_->Close(); + disk_queue_.reset(); + } + // We wait for dispatch_fb to finish writing the previous replies before replying to the last // offending request. if (parse_status == ERROR) { @@ -1474,8 +1496,29 @@ auto Connection::ParseLoop() -> ParserStatus { auto parse_func = protocol_ == Protocol::MEMCACHE ? &Connection::ParseMCBatch : &Connection::ParseRedisBatch; + const size_t offload_threshold = + ioloop_v2_ ? size_t(GetFlag(FLAGS_pipeline_disk_offload_threshold)) : 0u; + bool commands_parsed = false; do { + // if the queue is already over threshold from previously dispatched async + // commands, offload remaining io_buf to disk and stop parsing new commands. + // We still execute and reply already-queued commands such that the parsed_cmd_q_bytes + // drains. + VLOG(4) << "ParseLoop check: parsed_cmd_q_bytes_=" << parsed_cmd_q_bytes_ + << " threshold=" << offload_threshold << " buf_len=" << io_buf_.InputLen(); + if (offload_threshold > 0 && parsed_cmd_q_bytes_ >= offload_threshold) { + VLOG(3) << "Offload triggered. offload_threshold=" << offload_threshold + << " buf_len=" << io_buf_.InputLen() << " parsed_head_=" << parsed_head_ + << " parsed_to_execute_=" << parsed_to_execute_; + // New recvs are routed to chunks by NotifyOnRecv; no explicit push needed here. 
+ if (!ExecuteBatch()) + return ERROR; + if (!ReplyBatch()) + return ERROR; + break; + } + commands_parsed = (this->*parse_func)(io_buf_); if (!ExecuteBatch()) @@ -1483,6 +1526,7 @@ auto Connection::ParseLoop() -> ParserStatus { if (!ReplyBatch()) return ERROR; + } while (commands_parsed && io_buf_.InputLen() > 0); return commands_parsed ? OK : NEED_MORE; @@ -2766,6 +2810,42 @@ void Connection::EnsureMemoryBudget(unsigned tid) { thread_queue_backpressure[tid].EnsureBelowLimit(); } +void Connection::DrainDiskQueue(size_t offload_threshold) { + if (offload_threshold == 0 || !disk_queue_ || disk_queue_->Empty()) + return; + if (disk_queue_->IsPopInFlight() || io_buf_.AppendLen() == 0) + return; + VLOG(3) << "DrainDiskQueue: parsed_cmd_q_bytes_=" << parsed_cmd_q_bytes_ + << " threshold=" << offload_threshold << " pop_in_flight=" << disk_queue_->IsPopInFlight() + << " append_len=" << io_buf_.AppendLen() << " IsDraining=" << disk_queue_->IsDraining() + << " CanPush=" << disk_queue_->CanPush(0); + if (parsed_cmd_q_bytes_ >= offload_threshold) + return; + + disk_queue_->PopAsync(io_buf_.AppendBuffer(), [this](io::Result res) { + if (res) { + io_buf_.CommitWrite(*res); + VLOG(3) << "Restored " << *res << " bytes from disk for connection " << id_; + } else { + LOG(ERROR) << "Disk offload read failed: " << res.error().message(); + } + io_event_.notify(); + }); +} + +bool Connection::InitDiskQueueIfNeeded() { + if (disk_queue_) + return true; + disk_queue_ = std::make_unique(id_); + if (auto ec = disk_queue_->Init(); ec) { + LOG(WARNING) << "Failed to init disk-backed queue for connection " << id_ << ": " + << ec.message(); + disk_queue_.reset(); + return false; + } + return true; +} + ConnectionRef::ConnectionRef(const std::shared_ptr& ptr, unsigned thread_id, uint32_t client_id) : ptr_{ptr}, last_known_thread_id_{thread_id}, client_id_{client_id} { @@ -2803,6 +2883,13 @@ void Connection::NotifyOnRecv(const util::FiberSocketBase::RecvNotification& n) return; } + // True when 
incoming socket bytes should be routed to the disk queue rather than io_buf_. + auto should_offload = [this]() { + if (offload_threshold_ == 0 || !disk_queue_) + return false; + return disk_queue_->IsActive() || parsed_cmd_q_bytes_ >= offload_threshold_; + }; + using RecvNoti = util::FiberSocketBase::RecvNotification::RecvCompletion; if (std::holds_alternative(n.read_result)) { if (!std::get(n.read_result)) { // false - connection aborted @@ -2810,13 +2897,74 @@ void Connection::NotifyOnRecv(const util::FiberSocketBase::RecvNotification& n) return; } - pending_input_ = true; - } else if (std::holds_alternative(n.read_result)) { // provided buffer. - io::MutableBytes buf = std::get(n.read_result); - { - ReadBufTracker tracker(io_buf_); - io_buf_.WriteAndCommit(buf.data(), buf.size()); + if (should_offload()) { + // CanPush covers all queue-side reasons to block a recv: draining window, + // backing store full, or cancelled. TCP backpressure builds naturally when + // we stop reading from the socket. + if (!disk_queue_->CanPush(DiskBackedQueue::kMaxChunkSize)) + return; + + DiskBackedQueue::Chunk chunk; + chunk.data.resize(DiskBackedQueue::kMaxChunkSize); + io::Result res = socket_->TryRecv({chunk.data.data(), chunk.data.size()}); + + if (!res) { + auto ec = res.error(); + if (ec != errc::resource_unavailable_try_again && ec != errc::operation_would_block) + io_ec_ = ec; + return; + } + if (*res == 0) { + io_ec_ = make_error_code(errc::connection_aborted); + return; + } + chunk.data.resize(*res); + VLOG(3) << "Offloaded " << *res << " bytes to disk for connection " << id_; + disk_queue_->PushAsync(std::move(chunk), [this](std::error_code ec) { + if (ec) + io_ec_ = ec; + // Don't notify while a pop CQE is in flight: the pop callback will wake the + // fiber when io_buf_ data is ready. Notifying on every write CQE causes a + // cooperative-scheduler starvation spin — the fiber loops checking can_parse + // (always false while pop is in flight) and starves the proactor. 
+ if (!disk_queue_->IsPopInFlight()) + io_event_.notify(); + }); + return; } + + // Normal path: recv directly into io_buf_. + // A recv call can return fewer bytes than requested even if the socket buffer actually + // contains enough data to satisfy the full request. + // TODO: maybe worth looping here and trying another recv call until it fails with + // EAGAIN or EWOULDBLOCK. The problem there is that we need to handle resizing if + // AppendBuffer is zero. + if (io_buf_.AppendLen() == 0) + return; + + io::MutableBytes buf = io_buf_.AppendBuffer(); + io::Result res = socket_->TryRecv(buf); + + if (res) { + if (*res > 0) { + io_buf_.CommitWrite(*res); + return; + } + io_ec_ = make_error_code(errc::connection_aborted); + return; + } + + auto ec = res.error(); + if (ec != errc::resource_unavailable_try_again && ec != errc::operation_would_block) + io_ec_ = ec; + + } else if (std::holds_alternative(n.read_result)) { // provided buffer + io::MutableBytes buf = std::get(n.read_result); + + // TODO: if a pop CQE is in-flight writing into io_buf_, WriteAndCommit races with it. + // TODO: disk offload for multishot recv (provided-buffer) path. + ReadBufTracker tracker(io_buf_); + io_buf_.WriteAndCommit(buf.data(), buf.size()); last_interaction_ = time(nullptr); } else { LOG(FATAL) << "Should not reach here"; @@ -2853,6 +3001,11 @@ void Connection::ReadPendingInput() { } void Connection::CheckIoBufCapacity(bool reached_capacity, base::IoBuf* io_buf) { + // Reserve() frees the backing allocation, which is unsafe while io_uring is writing + // into io_buf_.AppendBuffer() during an in-flight pop CQE. 
+ if (disk_queue_ && disk_queue_->IsPopInFlight()) + return; + size_t max_io_buf_len = GetFlag(FLAGS_max_client_iobuf_len); size_t capacity = io_buf->Capacity(); @@ -2919,10 +3072,33 @@ variant Connection::IoLoopV2() { MaybeEnableRecvMultishot(); + offload_threshold_ = GetFlag(FLAGS_pipeline_disk_offload_threshold); + const size_t offload_threshold = offload_threshold_; + + VLOG(2) << "IoLoopV2 starting: offload_threshold=" << offload_threshold << " conn_id=" << id_; + + // Pre-init disk queue in the fiber context (avoid blocking file-open inside the recv callback). + if (offload_threshold > 0) { + bool ok = InitDiskQueueIfNeeded(); + VLOG(2) << "IoLoopV2 disk queue pre-init: ok=" << ok + << " disk_queue_set=" << (disk_queue_ != nullptr); + } + peer->RegisterOnRecv([this](const FiberSocketBase::RecvNotification& n) { - DVLOG(2) << "Calling DoReadOnRecv iobuf_len: " << io_buf_.InputLen(); + // NotifyOnRecv routes bytes to io_buf_ (queue inactive) or a Chunk (queue active). + // Offload routing is decided inside NotifyOnRecv via offload_threshold_. + DVLOG(2) << "Calling NotifyOnRecv io_buf_len: " << io_buf_.InputLen(); NotifyOnRecv(n); - io_event_.notify(); + VLOG(4) << "recv callback fired: offload_threshold_=" << offload_threshold_ + << " disk_queue_=" << (disk_queue_ != nullptr) + << " parsed_cmd_q_bytes_=" << parsed_cmd_q_bytes_ + << " io_buf_len=" << io_buf_.InputLen(); + // Suppress notify while a pop CQE owns io_buf_.AppendBuffer() — the pop callback + // will wake the fiber when data is ready. Without this guard every socket recv wakes + // the await, the predicate (can_parse=false while pop is in flight) is false, and + // we spin without ever yielding to the proactor that needs to fire the pop CQE. 
+ if (!disk_queue_ || !disk_queue_->IsPopInFlight()) + io_event_.notify(); }); ParserStatus parse_status = OK; @@ -2949,12 +3125,27 @@ variant Connection::IoLoopV2() { current_wait_.emplace(parsed_head_, &cmd_completion_waiter); } - if (pending_input_) { + DrainDiskQueue(offload_threshold); + + const bool pop_in_flight = disk_queue_ && disk_queue_->IsPopInFlight(); + + if (pending_input_ && !pop_in_flight) { ReadPendingInput(); } - // await block (no data to read) - if (io_buf_.InputLen() == 0) { + // Enter the await block when io_buf_ is empty or a pop CQE is in-flight. + // The in-flight case: PopAsync owns io_buf_.AppendBuffer() until the CQE fires; + // we must not parse the partially-written region. + if (io_buf_.InputLen() == 0 || pop_in_flight) { + if (io_buf_.InputLen() == 0 && !pop_in_flight) { + // Poll again for readiness. The event handler registered above is edge triggered. + // We should read from the socket until EAGAIN or EWOULDBLOCK to make sure we consume + // all available data. See "Do I need to continuously read/write" question under + // https://man7.org/linux/man-pages/man7/epoll.7.html + // The exception is when we use io_uring with multishot recv enabled, in which case + // we rely on the kernel to keep feeding us data until multishot is disabled. + NotifyOnRecv(FiberSocketBase::RecvNotification{true}); + } phase_ = READ_SOCKET; // Flush replies deferred by ReplyBatch before sleeping - ensures the client @@ -2964,21 +3155,24 @@ variant Connection::IoLoopV2() { return err; } - io_event_.await([this, &is_ready_to_migrate]() { + io_event_.await([this, offload_threshold, &is_ready_to_migrate]() { // TODO: optimize CanReply with looking up waiter key - // io_buf_.InputLen() > 0 is still needed for multishot flow. - - // We wake up if: - // 1. New data arrived or is pending (io_buf_.InputLen() > 0 || pending_input_). - // 2. A parsed command is ready to execute (HasCommandToExecute()). - // 3. 
An executed command is ready to send its reply (parsed_head_ && - // parsed_head_->CanReply()). - // 4. Control-plane messages arrived (!dispatch_q_.empty()). - // 5. The socket encountered an error/closed (io_ec_). - // 6. A migration to another thread was requested AND is actionable now (no subscriptions). - return io_buf_.InputLen() > 0 || pending_input_ || HasCommandToExecute() || - (parsed_head_ && parsed_head_->CanReply()) || !dispatch_q_.empty() || io_ec_ || - is_ready_to_migrate(); + bool cmd_executable = parsed_head_ && parsed_head_ == parsed_to_execute_; + bool cmd_ready = !cmd_executable && parsed_head_ && parsed_head_->CanReply(); + bool pop_in_flight = disk_queue_ && disk_queue_->IsPopInFlight(); + // Wake to drain disk once the pipeline queue has room again. + bool can_drain_disk = offload_threshold > 0 && disk_queue_ && !disk_queue_->Empty() && + parsed_cmd_q_bytes_ < offload_threshold && !pop_in_flight; + // Wake when there is parseable data in io_buf_ and no pop is writing into it. + bool can_parse = io_buf_.InputLen() > 0 && !pop_in_flight; + // Only poll the socket when no pop CQE owns io_buf_.AppendBuffer(). Mirrors the + // guard at the ReadPendingInput() call site above: if we let pending_input_ wake + // the fiber while pop_in_flight=true, the fiber spins (ReadPendingInput is + // skipped, pending_input_ stays true) and starves the proactor that must fire + // the pop CQE. + bool can_poll = pending_input_ && !pop_in_flight; + return can_parse || cmd_ready || cmd_executable || io_ec_ || can_drain_disk || can_poll || + !dispatch_q_.empty() || is_ready_to_migrate(); }); } @@ -3014,13 +3208,25 @@ variant Connection::IoLoopV2() { auto& conn_stats = GetLocalConnStats(); QueueBackpressure& qbp = GetQueueBackpressure(); + // When over the offload threshold with no commands left to dispatch, force the + // await path so the fiber yields and lets shard completions be processed. 
+ // Without this, new recvs keep routing to chunks but we spin without yielding. + // This also prevents ReadPendingInput() from bypassing the disk offload path: + // ReadPendingInput() calls TryRecv directly into io_buf_ with no should_offload() + // check; with force_await set, the parse guard below skips the io_buf_ parse path + // and we fall through to the await, which only wakes when the queue drains. + bool force_await = offload_threshold > 0 && parsed_cmd_q_bytes_ >= offload_threshold && + parsed_to_execute_ == nullptr; + + const bool cur_pop_in_flight = disk_queue_ && disk_queue_->IsPopInFlight(); // Only parse data if we are under the memory limit (backpressure). // Exception: If the queue is empty, we always parse to allow admin commands // (like CONFIG SET) to run so they can fix the memory limits if needed. bool pre_over_limit = (parsed_cmd_q_len_ > 0) && qbp.IsPipelineBufferOverLimit(conn_stats.pipeline_queue_bytes, parsed_cmd_q_len_); - if (io_buf_.InputLen() > 0 && !pre_over_limit) { // Parse, execute and reply + if (io_buf_.InputLen() > 0 && !force_await && !cur_pop_in_flight && + !pre_over_limit) { // Parse, execute and reply size_t mem_before = conn_stats.pipeline_queue_bytes; parse_status = ParseLoop(); @@ -3041,6 +3247,20 @@ variant Connection::IoLoopV2() { ReplyBatch(); } + // When force_await is active we cannot parse even if io_buf_ has data. + // New recvs are already routed to chunks by NotifyOnRecv (no explicit push needed). + // Yield until a command finishes or the queue drains below the threshold. 
+ if (force_await) { + VLOG(2) << "force_await spin-guard: yielding for cmd completion or queue drain" + << " parsed_cmd_q_bytes_=" << parsed_cmd_q_bytes_; + io_event_.await([this, offload_threshold]() { + bool cmd_ready = + parsed_head_ && parsed_head_ != parsed_to_execute_ && parsed_head_->CanReply(); + bool queue_has_room = parsed_cmd_q_bytes_ < offload_threshold; + return cmd_ready || queue_has_room || io_ec_; + }); + } + // After draining commands, notify all connections parked on backpressure relief if (conn_stats.pipeline_queue_bytes < mem_before) { qbp.NotifyPipelineWaiters(); diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index b8e123a192f6..7ec50b39c502 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -14,6 +14,7 @@ #include #include "facade/connection_ref.h" +#include "facade/disk_backed_queue.h" #include "facade/facade_types.h" #include "facade/parsed_command.h" #include "io/io_buf.h" @@ -376,6 +377,11 @@ class Connection : public util::Connection { std::pair GetClientInfoBeforeAfterTid() const; + // Lazily initialises disk_queue_. Returns false on error and disk_queue_ stays null. + bool InitDiskQueueIfNeeded(); + + void DrainDiskQueue(size_t offload_threshold); + void IncreaseConnStats(); void DecreaseConnStats(); void BreakOnce(uint32_t ev_mask); @@ -406,7 +412,7 @@ class Connection : public util::Connection { // Only CPU-bound work; must not perform I/O or fiber suspension. void ParseFromBuffer(base::IoBuf& buf); - // Call appropriate ParseBatch function, proceed with Execute and Reply all why input is remaining + // Call appropriate ParseBatch function, proceed with Execute and Reply all while input remains. ParserStatus ParseLoop(); // Loop over enqueued async commands and enqueue them for async execution. 
@@ -451,11 +457,17 @@ class Connection : public util::Connection { util::fb2::EventCount io_event_; std::optional current_wait_; + // Disk-backed offload queue for IoLoopV2 pipeline backpressure. + // Created at IoLoopV2 startup when the offload threshold is non-zero; null if disabled or init fails. + std::unique_ptr disk_queue_; + // Cached value of FLAGS_pipeline_disk_offload_threshold, set at IoLoopV2 startup. + size_t offload_threshold_ = 0; + // how many bytes of the current request have been consumed size_t request_consumed_bytes_ = 0; util::FiberSocketBase::ProvidedBuffer recv_buf_; - io::IoBuf io_buf_; // used in io loop and parsers + io::IoBuf io_buf_; std::unique_ptr redis_parser_; std::unique_ptr memcache_parser_; ParsedCommand* parsed_cmd_ = nullptr; diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py index d87a7c9228e9..62c8191d82c1 100644 --- a/tests/dragonfly/connection_test.py +++ b/tests/dragonfly/connection_test.py @@ -2160,6 +2160,44 @@ async def flood(): await writer.wait_closed() +async def test_ioloopv2_disk_backpressure_offload(df_factory: DflyInstanceFactory): + MC_PORT = 11212 + # Use a very small threshold (1 byte) so offloading triggers immediately once + # any command is in-flight, making the test reliable without needing precise timing. 
+ server = df_factory.create( + proactor_threads=4, + memcached_port=MC_PORT, + experimental_io_loop_v2=True, + pipeline_disk_offload_threshold=1, + disk_backpressure_folder="/tmp/", + vmodule="dragonfly_connection=3", + ) + server.start() + + async def producer(): + _, writer = await asyncio.open_connection("localhost", MC_PORT) + cmds = [] + for i in range(10_000): + key = f"k{i % 100}" + val = f"val{i}" + cmds.append(f"set {key} 0 0 {len(val)} noreply\r\n{val}\r\n".encode()) + writer.write(b"".join(cmds)) + await writer.drain() + await asyncio.sleep(0.3) + writer.close() + await writer.wait_closed() + + await asyncio.gather(*[producer() for _ in range(5)]) + + server.stop() + + offload_lines = server.find_in_logs("Offloaded.*bytes to disk") + restore_lines = server.find_in_logs("Restored.*bytes from disk") + + assert len(offload_lines) > 1 + assert len(restore_lines) > 1 + + @dfly_args({"proactor_threads": 1}) async def test_multi_exec_phantom_connections(df_server: DflyInstance): """Reproduce the addr=0.0.0.0 phantom connections from issue #7272.