From 882307f2ab0ae2a35b42c833983247a98e03ba51 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Thu, 19 Mar 2026 14:19:11 +0200 Subject: [PATCH 1/9] feat: backpressure offloading for ioloopv2 Signed-off-by: Kostas Kyrimis --- src/facade/dragonfly_connection.cc | 215 +++++++++++++++++++++++++++-- src/facade/dragonfly_connection.h | 13 ++ tests/dragonfly/connection_test.py | 38 +++++ 3 files changed, 253 insertions(+), 13 deletions(-) diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 0022428ed54f..7b207b6cc3da 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -67,6 +67,11 @@ ABSL_FLAG(string, admin_bind, "", ABSL_FLAG(strings::MemoryBytesFlag, request_cache_limit, 64_MB, "Amount of memory to use for request cache in bytes - per IO thread."); +ABSL_FLAG(strings::MemoryBytesFlag, pipeline_disk_offload_threshold, 0, + "IoLoopV2 only: when the parsed command queue exceeds this many bytes, raw socket data " + "is offloaded to a disk-backed queue instead of being kept in memory. " + "0 means disabled (default)."); + ABSL_FLAG(strings::MemoryBytesFlag, pipeline_buffer_limit, 128_MB, "Amount of memory to use for storing pipeline requests - per IO thread." "Please note that clients that send excecissively huge pipelines, " @@ -1148,6 +1153,14 @@ void Connection::ConnectionFlow() { socket_->ResetOnRecvHook(); } + if (disk_queue_) { + if (disk_push_in_flight_ || disk_pop_in_flight_) { + io_event_.await([this] { return !disk_push_in_flight_ && !disk_pop_in_flight_; }); + } + std::ignore = disk_queue_->Close(); + disk_queue_.reset(); + } + // We wait for dispatch_fb to finish writing the previous replies before replying to the last // offending request. if (parse_status == ERROR) { @@ -1338,8 +1351,30 @@ auto Connection::ParseLoop() -> ParserStatus { auto parse_func = protocol_ == Protocol::MEMCACHE ? 
&Connection::ParseMCBatch : &Connection::ParseRedisBatch; + const size_t offload_threshold = + ioloop_v2_ ? size_t(GetFlag(FLAGS_pipeline_disk_offload_threshold)) : 0u; + bool commands_parsed = false; do { + // if the queue is already over threshold from previously dispatched async + // commands, offload remaining io_buf to disk and stop parsing new commands. + // We still execute and reply already-queued commands such that they + // eventually drain. io_buf_ should be at a clean command boundary here so no + // partial-command corruption (having half filled io_buf_ w + VLOG(4) << "ParseLoop check: parsed_cmd_q_bytes_=" << parsed_cmd_q_bytes_ + << " threshold=" << offload_threshold << " io_buf_len=" << io_buf_.InputLen(); + if (offload_threshold > 0 && parsed_cmd_q_bytes_ >= offload_threshold) { + VLOG(3) << "Offload triggered. offload_threshold=" << offload_threshold + << " io_buf_len=" << io_buf_.InputLen() << " parsed_head_=" << parsed_head_ + << " parsed_to_execute_=" << parsed_to_execute_; + HandleSocketBackpressure(offload_threshold); + if (!ExecuteBatch()) + return ERROR; + if (!ReplyBatch()) + return ERROR; + break; + } + commands_parsed = (this->*parse_func)(); if (!ExecuteBatch()) @@ -1347,6 +1382,7 @@ auto Connection::ParseLoop() -> ParserStatus { if (!ReplyBatch()) return ERROR; + } while (commands_parsed && io_buf_.InputLen() > 0); return commands_parsed ? 
OK : NEED_MORE; @@ -2555,6 +2591,81 @@ void Connection::EnsureMemoryBudget(unsigned tid) { thread_queue_backpressure[tid].EnsureBelowLimit(); } +void Connection::HandleSocketBackpressure(size_t offload_threshold) { + DCHECK_GT(offload_threshold, 0u); + + if (parsed_cmd_q_bytes_ < offload_threshold) + return; + + if (disk_push_in_flight_ || io_buf_.InputLen() == 0) + return; + + if (!InitDiskQueueIfNeeded()) + return; + + if (!disk_queue_->HasEnoughBackingSpaceFor(io_buf_.InputLen())) + return; + + const size_t to_offload = io_buf_.InputLen(); + disk_push_in_flight_ = true; + + // We intentionally defer ConsumeInput until the write completes rather than deep-copying + // the buffer. This is safe because: + // 1. io_buf_.AppendBuffer() starts at buf_+size_, which is after the in-flight bytes, so + // incoming recv data written via CommitWrite never overlaps the region io_uring is + // reading. + // 2. ParseLoop and CheckIoBufCapacity both check disk_push_in_flight_ and skip any + // operation that would re-parse or reallocate the buffer while the write is in flight. + // Calling ConsumeInput here would reset offs_/size_ to 0, making AppendBuffer() alias the + // same memory io_uring is still reading, and would allow Reserve() to free that memory. + disk_queue_->PushAsync(io_buf_.InputBuffer(), [this, to_offload](std::error_code ec) { + disk_push_in_flight_ = false; + if (ec) { + LOG(ERROR) << "Disk offload write failed: " << ec.message(); + // Should we abort the connection ? 
+ } else { + io_buf_.ConsumeInput(to_offload); + VLOG(3) << "Offloaded " << to_offload << " bytes to disk for connection " << id_; + } + io_event_.notify(); + }); +} + +void Connection::DrainDiskQueue(size_t offload_threshold) { + if (offload_threshold == 0 || !disk_queue_ || disk_queue_->Empty()) + return; + if (disk_pop_in_flight_ || io_buf_.AppendLen() == 0) + return; + if (parsed_cmd_q_bytes_ >= offload_threshold) + return; + + disk_pop_in_flight_ = true; + disk_queue_->PopAsync(io_buf_.AppendBuffer(), [this](io::Result res) { + disk_pop_in_flight_ = false; + if (res) { + io_buf_.CommitWrite(*res); + VLOG(3) << "Restored " << *res << " bytes from disk for connection " << id_; + } else { + LOG(ERROR) << "Disk offload read failed: " << res.error().message(); + // Should we abort the connection ? + } + io_event_.notify(); + }); +} + +bool Connection::InitDiskQueueIfNeeded() { + if (disk_queue_) + return true; + disk_queue_ = std::make_unique(id_); + if (auto ec = disk_queue_->Init(); ec) { + LOG(WARNING) << "Failed to init disk-backed queue for connection " << id_ << ": " + << ec.message(); + disk_queue_.reset(); + return false; + } + return true; +} + ConnectionRef::ConnectionRef(const std::shared_ptr& ptr, unsigned thread_id, uint32_t client_id) : ptr_{ptr}, last_known_thread_id_{thread_id}, client_id_{client_id} { @@ -2592,6 +2703,10 @@ void Connection::DoReadOnRecv(const util::FiberSocketBase::RecvNotification& n) return; } + // We are reading to io_buf_ concurrently + if (disk_pop_in_flight_) + return; + using RecvNoti = util::FiberSocketBase::RecvNotification::RecvCompletion; if (std::holds_alternative(n.read_result)) { if (!std::get(n.read_result)) { @@ -2640,6 +2755,11 @@ void Connection::DoReadOnRecv(const util::FiberSocketBase::RecvNotification& n) } void Connection::CheckIoBufCapacity(bool is_iobuf_full) { + // The io_buf_ backing allocation is pinned by an in-flight io_uring write (push) or + // read (pop). 
Reserve() would free it, causing a use-after-free. + if (disk_push_in_flight_ || disk_pop_in_flight_) + return; + auto& conn_stats = tl_facade_stats->conn_stats; size_t max_io_buf_len = GetFlag(FLAGS_max_client_iobuf_len); @@ -2696,9 +2816,31 @@ variant Connection::IoLoopV2() { #endif } - peer->RegisterOnRecv([this](const FiberSocketBase::RecvNotification& n) { + const size_t offload_threshold = GetFlag(FLAGS_pipeline_disk_offload_threshold); + + VLOG(2) << "IoLoopV2 starting: offload_threshold=" << offload_threshold << " conn_id=" << id_; + + // Pre-init disk queue in the fiber context (avoid blocking file-open inside the recv callback). + if (offload_threshold > 0) { + bool ok = InitDiskQueueIfNeeded(); + VLOG(2) << "IoLoopV2 disk queue pre-init: ok=" << ok + << " disk_queue_set=" << (disk_queue_ != nullptr); + } + + peer->RegisterOnRecv([this, offload_threshold](const FiberSocketBase::RecvNotification& n) { DVLOG(2) << "Calling DoReadOnRecv iobuf_len: " << io_buf_.InputLen(); DoReadOnRecv(n); + // The recv callback runs in the proactor event loop as a plain function (no fiber context). + // This cb can fire multiple times before io_event_.notify() ever reaches and wakes up + // the connection (we don't have any gurantees what runs next) so I opportunistically + // attempt to offload to disk here (if it's possible). + VLOG(4) << "recv callback fired: offload_threshold=" << offload_threshold + << " disk_queue_=" << (disk_queue_ != nullptr) + << " parsed_cmd_q_bytes_=" << parsed_cmd_q_bytes_ + << " io_buf_len=" << io_buf_.InputLen(); + if (offload_threshold > 0 && disk_queue_) { + HandleSocketBackpressure(offload_threshold); + } io_event_.notify(); }); @@ -2717,20 +2859,42 @@ variant Connection::IoLoopV2() { current_wait_.emplace(cmd, &ioevent_waiter); } - if (io_buf_.InputLen() == 0) { - // Poll again for readiness. 
The event handler registered above is edge triggered - // We should read from the socket until EAGAIN or EWOULDBLOCK - // to make sure we consume all available data. - // See "Do I need to continuously read/write" question - // under https://man7.org/linux/man-pages/man7/epoll.7.html - // The exception is when we use io_uring with multishot recv enabled, in which case - // we rely on the kernel to keep feeding us data until we multishot is disabled. - DoReadOnRecv(FiberSocketBase::RecvNotification{true}); - io_event_.await([this]() { + DrainDiskQueue(offload_threshold); + + // Enter the await block when io_buf_ is empty OR when a disk write is in-flight. + // The in-flight case: deferred ConsumeInput means io_buf_.InputLen() > 0 while io_uring + // still owns those bytes. We must yield here so the proactor can process the write CQE + // and fire the completion callback. Without this yield the fiber spins forever and the + // CQE is never dequeued. + if (io_buf_.InputLen() == 0 || disk_push_in_flight_ || disk_pop_in_flight_) { + if (io_buf_.InputLen() == 0 && !disk_pop_in_flight_) { + // Poll again for readiness. The event handler registered above is edge triggered. + // We should read from the socket until EAGAIN or EWOULDBLOCK to make sure we consume + // all available data. See "Do I need to continuously read/write" question under + // https://man7.org/linux/man-pages/man7/epoll.7.html + // The exception is when we use io_uring with multishot recv enabled, in which case + // we rely on the kernel to keep feeding us data until multishot is disabled. + // Skip while disk_pop_in_flight_: AppendBuffer() is owned by PopAsync. 
+ DoReadOnRecv(FiberSocketBase::RecvNotification{true}); + if (offload_threshold > 0 && disk_queue_) { + VLOG(2) << "post-manual-read check: parsed_cmd_q_bytes_=" << parsed_cmd_q_bytes_ + << " io_buf_len=" << io_buf_.InputLen() + << " disk_push_in_flight_=" << disk_push_in_flight_; + HandleSocketBackpressure(offload_threshold); + } + } + io_event_.await([this, offload_threshold]() { // TODO: optimize CanReply with looking up waiter key bool cmd_executable = parsed_head_ && parsed_head_ == parsed_to_execute_; bool cmd_ready = !cmd_executable && parsed_head_ && parsed_head_->CanReply(); - return io_buf_.InputLen() > 0 || cmd_ready || cmd_executable || io_ec_; + // Wake when disk push/pop completes so we can re-evaluate offload/drain. + bool disk_op_done = !disk_push_in_flight_ && !disk_pop_in_flight_; + // Wake to drain disk once the pipeline queue has room again. + bool can_drain_disk = offload_threshold > 0 && disk_queue_ && !disk_queue_->Empty() && + parsed_cmd_q_bytes_ < offload_threshold && disk_op_done; + // Wake to parse only when the write is done: io_buf_ bytes are safe to consume. + bool can_parse = io_buf_.InputLen() > 0 && !disk_push_in_flight_ && !disk_pop_in_flight_; + return can_parse || cmd_ready || cmd_executable || io_ec_ || can_drain_disk; }); } @@ -2742,7 +2906,13 @@ variant Connection::IoLoopV2() { phase_ = PROCESS; bool is_iobuf_full = io_buf_.AppendLen() == 0; - if (io_buf_.InputLen() > 0) { + // When over the offload threshold with no commands left to dispatch, force the + // await path so the fiber yields and lets shard completions be processed. + // Without this, recv keeps refilling io_buf_ and we spin forever. 
+ bool force_await = offload_threshold > 0 && parsed_cmd_q_bytes_ >= offload_threshold && + parsed_to_execute_ == nullptr; + + if (io_buf_.InputLen() > 0 && !force_await && !disk_push_in_flight_) { parse_status = ParseLoop(); } else { parse_status = NEED_MORE; @@ -2752,6 +2922,25 @@ variant Connection::IoLoopV2() { ExecuteBatch(); ReplyBatch(); } + + // When force_await is active and io_buf_ still has data we cannot parse + // (queue is over threshold) and we cannot offload it yet (push in flight), + // we must suspend here instead of spinning. Without this, the fiber loops + // without yielding and the write CQE for the in-flight push never completes, + // creating a livelock. + if (force_await && io_buf_.InputLen() > 0) { + HandleSocketBackpressure(offload_threshold); + if (io_buf_.InputLen() > 0) { + VLOG(2) << "force_await spin-guard: awaiting push completion or cmd_ready" + << " disk_push_in_flight_=" << disk_push_in_flight_ + << " parsed_cmd_q_bytes_=" << parsed_cmd_q_bytes_; + io_event_.await([this]() { + bool cmd_ready = + parsed_head_ && parsed_head_ != parsed_to_execute_ && parsed_head_->CanReply(); + return cmd_ready || !disk_push_in_flight_ || io_ec_; + }); + } + } } if (reply_builder_->GetError()) { diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 290b6058276d..3b7977a2b9b9 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -14,6 +14,7 @@ #include #include "facade/connection_ref.h" +#include "facade/disk_backed_queue.h" #include "facade/facade_types.h" #include "facade/parsed_command.h" #include "io/io_buf.h" @@ -336,6 +337,12 @@ class Connection : public util::Connection { std::pair GetClientInfoBeforeAfterTid() const; + // Lazily initialises disk_queue_. Returns false on error and disk_queue_ stays null. 
+ bool InitDiskQueueIfNeeded(); + + void HandleSocketBackpressure(size_t offload_threshold); + void DrainDiskQueue(size_t offload_threshold); + void IncreaseConnStats(); void DecreaseConnStats(); void BreakOnce(uint32_t ev_mask); @@ -407,6 +414,12 @@ class Connection : public util::Connection { util::fb2::EventCount io_event_; std::optional current_wait_; + // Disk-backed offload queue for IoLoopV2 pipeline backpressure. + // Lazily created when the parsed command queue exceeds the offload threshold. + std::unique_ptr disk_queue_; + bool disk_push_in_flight_ = false; + bool disk_pop_in_flight_ = false; + // how many bytes of the current request have been consumed size_t request_consumed_bytes_ = 0; diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py index a6d72c76c459..7fb5ac0e9aa5 100644 --- a/tests/dragonfly/connection_test.py +++ b/tests/dragonfly/connection_test.py @@ -1792,3 +1792,41 @@ async def flood(): await flood_task writer.close() await writer.wait_closed() + + +async def test_ioloopv2_disk_backpressure_offload(df_factory: DflyInstanceFactory): + MC_PORT = 11212 + # Use a very small threshold (1 byte) so offloading triggers immediately once + # any command is in-flight, making the test reliable without needing precise timing. 
+ server = df_factory.create( + proactor_threads=4, + memcached_port=MC_PORT, + experimental_io_loop_v2=True, + pipeline_disk_offload_threshold=1, + disk_backpressure_folder="/tmp/", + vmodule="dragonfly_connection=3", + ) + server.start() + + async def producer(): + _, writer = await asyncio.open_connection("localhost", MC_PORT) + cmds = [] + for i in range(10_000): + key = f"k{i % 100}" + val = f"val{i}" + cmds.append(f"set {key} 0 0 {len(val)} noreply\r\n{val}\r\n".encode()) + writer.write(b"".join(cmds)) + await writer.drain() + await asyncio.sleep(0.3) + writer.close() + await writer.wait_closed() + + await asyncio.gather(*[producer() for _ in range(5)]) + + server.stop() + + offload_lines = server.find_in_logs("Offloaded.*bytes to disk") + restore_lines = server.find_in_logs("Restored.*bytes from disk") + + assert len(offload_lines) > 1 + assert len(restore_lines) > 1 From 2922f23b299ff223fc9ff8c0eae35da8b8ad1ba8 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Thu, 19 Mar 2026 18:36:19 +0200 Subject: [PATCH 2/9] separate io_buf --- src/facade/dragonfly_connection.cc | 115 +++++++++++++++-------------- src/facade/dragonfly_connection.h | 4 +- 2 files changed, 64 insertions(+), 55 deletions(-) diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 7b207b6cc3da..0b4c93edbdda 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -1358,9 +1358,8 @@ auto Connection::ParseLoop() -> ParserStatus { do { // if the queue is already over threshold from previously dispatched async // commands, offload remaining io_buf to disk and stop parsing new commands. - // We still execute and reply already-queued commands such that they - // eventually drain. io_buf_ should be at a clean command boundary here so no - // partial-command corruption (having half filled io_buf_ w + // We still execute and reply already-queued commands such that the parsed_cmd_q_bytes + // drains. 
VLOG(4) << "ParseLoop check: parsed_cmd_q_bytes_=" << parsed_cmd_q_bytes_ << " threshold=" << offload_threshold << " io_buf_len=" << io_buf_.InputLen(); if (offload_threshold > 0 && parsed_cmd_q_bytes_ >= offload_threshold) { @@ -2597,34 +2596,33 @@ void Connection::HandleSocketBackpressure(size_t offload_threshold) { if (parsed_cmd_q_bytes_ < offload_threshold) return; - if (disk_push_in_flight_ || io_buf_.InputLen() == 0) + if (disk_push_in_flight_ || socket_buf_.InputLen() == 0) return; if (!InitDiskQueueIfNeeded()) return; - if (!disk_queue_->HasEnoughBackingSpaceFor(io_buf_.InputLen())) + if (!disk_queue_->HasEnoughBackingSpaceFor(socket_buf_.InputLen())) return; - const size_t to_offload = io_buf_.InputLen(); + const size_t to_offload = socket_buf_.InputLen(); disk_push_in_flight_ = true; // We intentionally defer ConsumeInput until the write completes rather than deep-copying // the buffer. This is safe because: - // 1. io_buf_.AppendBuffer() starts at buf_+size_, which is after the in-flight bytes, so - // incoming recv data written via CommitWrite never overlaps the region io_uring is - // reading. - // 2. ParseLoop and CheckIoBufCapacity both check disk_push_in_flight_ and skip any - // operation that would re-parse or reallocate the buffer while the write is in flight. + // 1. socket_buf_.AppendBuffer() starts after the in-flight bytes, so incoming recv data + // written via CommitWrite never overlaps the region io_uring is reading. + // 2. DoReadOnRecv checks disk_push_in_flight_ and skips writes to socket_buf_ while the + // push is in flight, preventing any mutation of the buffer io_uring is reading. // Calling ConsumeInput here would reset offs_/size_ to 0, making AppendBuffer() alias the // same memory io_uring is still reading, and would allow Reserve() to free that memory. 
- disk_queue_->PushAsync(io_buf_.InputBuffer(), [this, to_offload](std::error_code ec) { + disk_queue_->PushAsync(socket_buf_.InputBuffer(), [this, to_offload](std::error_code ec) { disk_push_in_flight_ = false; if (ec) { LOG(ERROR) << "Disk offload write failed: " << ec.message(); // Should we abort the connection ? } else { - io_buf_.ConsumeInput(to_offload); + socket_buf_.ConsumeInput(to_offload); VLOG(3) << "Offloaded " << to_offload << " bytes to disk for connection " << id_; } io_event_.notify(); @@ -2703,8 +2701,8 @@ void Connection::DoReadOnRecv(const util::FiberSocketBase::RecvNotification& n) return; } - // We are reading to io_buf_ concurrently - if (disk_pop_in_flight_) + // socket_buf_ is pinned by io_uring during a push; new writes would corrupt the in-flight read. + if (disk_push_in_flight_) return; using RecvNoti = util::FiberSocketBase::RecvNotification::RecvCompletion; @@ -2714,12 +2712,12 @@ void Connection::DoReadOnRecv(const util::FiberSocketBase::RecvNotification& n) return; } - if (io_buf_.AppendLen() == 0) { - // We will regrow in IoLoopV2 + if (socket_buf_.AppendLen() == 0) { + // We will try to offload socket_buf_ to disk in HandleSocketBackpressure. return; } - io::MutableBytes buf = io_buf_.AppendBuffer(); + io::MutableBytes buf = socket_buf_.AppendBuffer(); io::Result res = socket_->TryRecv(buf); if (res) { @@ -2729,7 +2727,7 @@ void Connection::DoReadOnRecv(const util::FiberSocketBase::RecvNotification& n) // TODO maybe worth looping here and try another recv call until it fails // with EAGAIN or EWOULDBLOCK. The problem there is that we need to handle // resizing if AppendBuffer is zero. - io_buf_.CommitWrite(*res); + socket_buf_.CommitWrite(*res); return; } // *res == 0 @@ -2747,17 +2745,16 @@ void Connection::DoReadOnRecv(const util::FiberSocketBase::RecvNotification& n) io_ec_ = ec; } else if (std::holds_alternative(n.read_result)) { // provided buffer. 
io::MutableBytes buf = std::get(n.read_result); - UpdateIoBufCapacity(io_buf_, &tl_facade_stats->conn_stats, - [&]() { io_buf_.WriteAndCommit(buf.data(), buf.size()); }); + UpdateIoBufCapacity(socket_buf_, &tl_facade_stats->conn_stats, + [&]() { socket_buf_.WriteAndCommit(buf.data(), buf.size()); }); } else { LOG(FATAL) << "Should not reach here"; } } void Connection::CheckIoBufCapacity(bool is_iobuf_full) { - // The io_buf_ backing allocation is pinned by an in-flight io_uring write (push) or - // read (pop). Reserve() would free it, causing a use-after-free. - if (disk_push_in_flight_ || disk_pop_in_flight_) + // io_buf_ backing allocation is pinned by an in-flight pop. Reserve() would free it. + if (disk_pop_in_flight_) return; auto& conn_stats = tl_facade_stats->conn_stats; @@ -2828,7 +2825,7 @@ variant Connection::IoLoopV2() { } peer->RegisterOnRecv([this, offload_threshold](const FiberSocketBase::RecvNotification& n) { - DVLOG(2) << "Calling DoReadOnRecv iobuf_len: " << io_buf_.InputLen(); + DVLOG(2) << "Calling DoReadOnRecv socket_buf_len: " << socket_buf_.InputLen(); DoReadOnRecv(n); // The recv callback runs in the proactor event loop as a plain function (no fiber context). // This cb can fire multiple times before io_event_.notify() ever reaches and wakes up @@ -2861,12 +2858,21 @@ variant Connection::IoLoopV2() { DrainDiskQueue(offload_threshold); - // Enter the await block when io_buf_ is empty OR when a disk write is in-flight. - // The in-flight case: deferred ConsumeInput means io_buf_.InputLen() > 0 while io_uring - // still owns those bytes. We must yield here so the proactor can process the write CQE - // and fire the completion callback. Without this yield the fiber spins forever and the - // CQE is never dequeued. - if (io_buf_.InputLen() == 0 || disk_push_in_flight_ || disk_pop_in_flight_) { + // Once disk is fully drained and the parser buffer is empty, promote buffered socket + // bytes into io_buf_ so the parser sees them next. 
This is always safe: disk is empty so + // there are no older bytes that could be violated by parsing socket_buf_ data now. + if ((!disk_queue_ || disk_queue_->Empty()) && !disk_pop_in_flight_ && io_buf_.InputLen() == 0 && + socket_buf_.InputLen() > 0) { + UpdateIoBufCapacity(io_buf_, &tl_facade_stats->conn_stats, [&]() { + io_buf_.Reserve(socket_buf_.InputLen()); + io_buf_.WriteAndCommit(socket_buf_.InputBuffer().data(), socket_buf_.InputLen()); + }); + socket_buf_.ConsumeInput(socket_buf_.InputLen()); + } + + // Enter the await block when io_buf_ is empty OR when a disk pop is in-flight. + // The in-flight case: PopAsync owns io_buf_.AppendBuffer() until the CQE fires. + if (io_buf_.InputLen() == 0 || disk_pop_in_flight_) { if (io_buf_.InputLen() == 0 && !disk_pop_in_flight_) { // Poll again for readiness. The event handler registered above is edge triggered. // We should read from the socket until EAGAIN or EWOULDBLOCK to make sure we consume @@ -2874,11 +2880,10 @@ variant Connection::IoLoopV2() { // https://man7.org/linux/man-pages/man7/epoll.7.html // The exception is when we use io_uring with multishot recv enabled, in which case // we rely on the kernel to keep feeding us data until multishot is disabled. - // Skip while disk_pop_in_flight_: AppendBuffer() is owned by PopAsync. 
DoReadOnRecv(FiberSocketBase::RecvNotification{true}); if (offload_threshold > 0 && disk_queue_) { VLOG(2) << "post-manual-read check: parsed_cmd_q_bytes_=" << parsed_cmd_q_bytes_ - << " io_buf_len=" << io_buf_.InputLen() + << " socket_buf_len=" << socket_buf_.InputLen() << " disk_push_in_flight_=" << disk_push_in_flight_; HandleSocketBackpressure(offload_threshold); } @@ -2887,14 +2892,17 @@ variant Connection::IoLoopV2() { // TODO: optimize CanReply with looking up waiter key bool cmd_executable = parsed_head_ && parsed_head_ == parsed_to_execute_; bool cmd_ready = !cmd_executable && parsed_head_ && parsed_head_->CanReply(); - // Wake when disk push/pop completes so we can re-evaluate offload/drain. + // Wake when disk pop completes so we can parse the restored bytes. bool disk_op_done = !disk_push_in_flight_ && !disk_pop_in_flight_; // Wake to drain disk once the pipeline queue has room again. bool can_drain_disk = offload_threshold > 0 && disk_queue_ && !disk_queue_->Empty() && parsed_cmd_q_bytes_ < offload_threshold && disk_op_done; - // Wake to parse only when the write is done: io_buf_ bytes are safe to consume. - bool can_parse = io_buf_.InputLen() > 0 && !disk_push_in_flight_ && !disk_pop_in_flight_; - return can_parse || cmd_ready || cmd_executable || io_ec_ || can_drain_disk; + // Wake to parse restored disk bytes. + bool can_parse = io_buf_.InputLen() > 0 && !disk_pop_in_flight_; + // Wake to promote socket bytes once disk is empty and io_buf_ is drained. 
+ bool disk_empty = !disk_queue_ || disk_queue_->Empty(); + bool can_promote = socket_buf_.InputLen() > 0 && disk_empty && !disk_pop_in_flight_; + return can_parse || can_promote || cmd_ready || cmd_executable || io_ec_ || can_drain_disk; }); } @@ -2912,7 +2920,7 @@ variant Connection::IoLoopV2() { bool force_await = offload_threshold > 0 && parsed_cmd_q_bytes_ >= offload_threshold && parsed_to_execute_ == nullptr; - if (io_buf_.InputLen() > 0 && !force_await && !disk_push_in_flight_) { + if (io_buf_.InputLen() > 0 && !force_await && !disk_pop_in_flight_) { parse_status = ParseLoop(); } else { parse_status = NEED_MORE; @@ -2923,23 +2931,22 @@ variant Connection::IoLoopV2() { ReplyBatch(); } - // When force_await is active and io_buf_ still has data we cannot parse - // (queue is over threshold) and we cannot offload it yet (push in flight), - // we must suspend here instead of spinning. Without this, the fiber loops - // without yielding and the write CQE for the in-flight push never completes, - // creating a livelock. - if (force_await && io_buf_.InputLen() > 0) { + // When force_await is active we cannot parse even if io_buf_ or socket_buf_ has data. + // Push any pending socket bytes to disk so TCP keeps flowing, then yield until either + // a command finishes (cmd_ready) or the queue drains below the threshold. Without this + // yield the fiber spins: the await block is skipped (io_buf_ non-empty), parse is + // skipped (force_await), and nothing makes progress. 
+ if (force_await && (io_buf_.InputLen() > 0 || socket_buf_.InputLen() > 0)) { HandleSocketBackpressure(offload_threshold); - if (io_buf_.InputLen() > 0) { - VLOG(2) << "force_await spin-guard: awaiting push completion or cmd_ready" - << " disk_push_in_flight_=" << disk_push_in_flight_ - << " parsed_cmd_q_bytes_=" << parsed_cmd_q_bytes_; - io_event_.await([this]() { - bool cmd_ready = - parsed_head_ && parsed_head_ != parsed_to_execute_ && parsed_head_->CanReply(); - return cmd_ready || !disk_push_in_flight_ || io_ec_; - }); - } + VLOG(2) << "force_await spin-guard: yielding for cmd completion or queue drain" + << " disk_push_in_flight_=" << disk_push_in_flight_ + << " parsed_cmd_q_bytes_=" << parsed_cmd_q_bytes_; + io_event_.await([this, offload_threshold]() { + bool cmd_ready = + parsed_head_ && parsed_head_ != parsed_to_execute_ && parsed_head_->CanReply(); + bool queue_has_room = parsed_cmd_q_bytes_ < offload_threshold; + return cmd_ready || queue_has_room || io_ec_; + }); } } diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 3b7977a2b9b9..8a9cc650101a 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -424,7 +424,9 @@ class Connection : public util::Connection { size_t request_consumed_bytes_ = 0; util::FiberSocketBase::ProvidedBuffer recv_buf_; - io::IoBuf io_buf_; // used in io loop and parsers + io::IoBuf + io_buf_; // parser input: fed exclusively from disk pops (or socket_buf_ when disk empty) + io::IoBuf socket_buf_; // recv target: new TCP bytes always land here std::unique_ptr redis_parser_; std::unique_ptr memcache_parser_; ParsedCommand* parsed_cmd_ = nullptr; From 5ed367c1f9fbd45af554a9cca38af7219236c8ba Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Fri, 20 Mar 2026 08:50:06 +0200 Subject: [PATCH 3/9] double buffer --- src/facade/dragonfly_connection.cc | 156 ++++++++++++++++------------- src/facade/dragonfly_connection.h | 14 +-- 2 files changed, 91 insertions(+), 
79 deletions(-) diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 0b4c93edbdda..bc261ae1dd02 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -829,8 +829,8 @@ void Connection::HandleRequests() { // We validate the http request using basic-auth inside HttpConnection::HandleSingleRequest. cc_->authenticated = true; - auto ec = http_conn.ParseFromBuffer(io_buf_.InputBuffer()); - io_buf_.ConsumeInput(io_buf_.InputLen()); + auto ec = http_conn.ParseFromBuffer(socket_buf_.InputBuffer()); + socket_buf_.ConsumeInput(socket_buf_.InputLen()); if (!ec) { http_conn.HandleRequests(); } @@ -1041,7 +1041,7 @@ io::Result Connection::CheckForHttpProto() { auto* peer = socket_.get(); auto& conn_stats = tl_facade_stats->conn_stats; do { - auto buf = io_buf_.AppendBuffer(); + auto buf = socket_buf_.AppendBuffer(); DCHECK(!buf.empty()); ::io::Result recv_sz = peer->Recv(buf); @@ -1053,8 +1053,8 @@ io::Result Connection::CheckForHttpProto() { return false; } - io_buf_.CommitWrite(*recv_sz); - string_view ib = io::View(io_buf_.InputBuffer()); + socket_buf_.CommitWrite(*recv_sz); + string_view ib = io::View(socket_buf_.InputBuffer()); if (ib.size() >= 2 && ib[0] == 22 && ib[1] == 3) { // We matched the TLS handshake raw data, which means "peer" is a TCP socket. // Reject the connection. 
@@ -1064,15 +1064,15 @@ io::Result Connection::CheckForHttpProto() { ib = ib.substr(last_len); size_t pos = ib.find('\n'); if (pos != string_view::npos) { - ib = io::View(io_buf_.InputBuffer().first(last_len + pos)); + ib = io::View(socket_buf_.InputBuffer().first(last_len + pos)); if (ib.size() < 10 || ib.back() != '\r') return false; ib.remove_suffix(1); return MatchHttp11Line(ib); } - last_len = io_buf_.InputLen(); - UpdateIoBufCapacity(io_buf_, &conn_stats, [&]() { io_buf_.EnsureCapacity(128); }); + last_len = socket_buf_.InputLen(); + UpdateIoBufCapacity(socket_buf_, &conn_stats, [&]() { socket_buf_.EnsureCapacity(128); }); } while (last_len < 1024); return false; @@ -1089,7 +1089,18 @@ void Connection::ConnectionFlow() { ++conn_stats.conn_received_cnt; ++local_stats_.read_cnt; - local_stats_.net_bytes_in += io_buf_.InputLen(); + + // For the old IoLoop path, promote CheckForHttpProto bytes from socket_buf_ to io_buf_. + // IoLoopV2 reads socket_buf_ directly via its active_buf pointer, so no copy is needed. + if (socket_buf_.InputLen() > 0 && !ioloop_v2_) { + UpdateIoBufCapacity(io_buf_, &conn_stats, [&]() { + io_buf_.Reserve(socket_buf_.InputLen()); + io_buf_.WriteAndCommit(socket_buf_.InputBuffer().data(), socket_buf_.InputLen()); + }); + socket_buf_.ConsumeInput(socket_buf_.InputLen()); + } + + local_stats_.net_bytes_in += io_buf_.InputLen() + socket_buf_.InputLen(); ParserStatus parse_status = OK; @@ -1101,7 +1112,7 @@ void Connection::ConnectionFlow() { parse_status = ParseRedis(10000); } else { DCHECK(memcache_parser_); - parse_status = ParseLoop(); + parse_status = ParseLoop(io_buf_); } } @@ -1186,8 +1197,8 @@ void Connection::ConnectionFlow() { LOG_IF(WARNING, ec2) << "Could not shutdown socket " << ec2; while (!ec2) { // Discard any received data. 
- io_buf_.Clear(); - auto recv_sz = socket_->Recv(io_buf_.AppendBuffer()); + socket_buf_.Clear(); + auto recv_sz = socket_->Recv(socket_buf_.AppendBuffer()); if (!recv_sz || *recv_sz == 0) { break; // Peer closed connection. } @@ -1347,7 +1358,7 @@ Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles, bool e return ERROR; } -auto Connection::ParseLoop() -> ParserStatus { +auto Connection::ParseLoop(io::IoBuf& buf) -> ParserStatus { auto parse_func = protocol_ == Protocol::MEMCACHE ? &Connection::ParseMCBatch : &Connection::ParseRedisBatch; @@ -1361,10 +1372,10 @@ auto Connection::ParseLoop() -> ParserStatus { // We still execute and reply already-queued commands such that the parsed_cmd_q_bytes // drains. VLOG(4) << "ParseLoop check: parsed_cmd_q_bytes_=" << parsed_cmd_q_bytes_ - << " threshold=" << offload_threshold << " io_buf_len=" << io_buf_.InputLen(); + << " threshold=" << offload_threshold << " buf_len=" << buf.InputLen(); if (offload_threshold > 0 && parsed_cmd_q_bytes_ >= offload_threshold) { VLOG(3) << "Offload triggered. offload_threshold=" << offload_threshold - << " io_buf_len=" << io_buf_.InputLen() << " parsed_head_=" << parsed_head_ + << " buf_len=" << buf.InputLen() << " parsed_head_=" << parsed_head_ << " parsed_to_execute_=" << parsed_to_execute_; HandleSocketBackpressure(offload_threshold); if (!ExecuteBatch()) @@ -1374,7 +1385,7 @@ auto Connection::ParseLoop() -> ParserStatus { break; } - commands_parsed = (this->*parse_func)(); + commands_parsed = (this->*parse_func)(buf); if (!ExecuteBatch()) return ERROR; @@ -1382,7 +1393,7 @@ auto Connection::ParseLoop() -> ParserStatus { if (!ReplyBatch()) return ERROR; - } while (commands_parsed && io_buf_.InputLen() > 0); + } while (commands_parsed && buf.InputLen() > 0); return commands_parsed ? 
OK : NEED_MORE; } @@ -1490,7 +1501,7 @@ variant Connection::IoLoop() { parse_status = ParseRedis(max_busy_read_cycles_cached); } else { DCHECK(memcache_parser_); - parse_status = ParseLoop(); + parse_status = ParseLoop(io_buf_); } if (reply_builder_->GetError()) { @@ -2301,12 +2312,14 @@ bool Connection::IsReplySizeOverLimit() const { return over_limit; } -bool Connection::ParseRedisBatch() { +bool Connection::ParseRedisBatch(io::IoBuf& buf) { + // IoLoopV2 is MEMCACHE only; for the Redis path buf is always io_buf_. + (void)buf; return ParseRedis(max_busy_read_cycles_cached, true) == ParserStatus::OK; } -bool Connection::ParseMCBatch() { - CHECK(io_buf_.InputLen() > 0); +bool Connection::ParseMCBatch(io::IoBuf& buf) { + CHECK(buf.InputLen() > 0); do { if (parsed_cmd_ == nullptr) { @@ -2316,9 +2329,9 @@ bool Connection::ParseMCBatch() { } uint32_t consumed = 0; memcache_parser_->set_last_unix_time(time(nullptr)); - MemcacheParser::Result result = memcache_parser_->Parse(io::View(io_buf_.InputBuffer()), - &consumed, parsed_cmd_->mc_command()); - io_buf_.ConsumeInput(consumed); + MemcacheParser::Result result = + memcache_parser_->Parse(io::View(buf.InputBuffer()), &consumed, parsed_cmd_->mc_command()); + buf.ConsumeInput(consumed); DVLOG(2) << "mc_result " << unsigned(result) << " consumed: " << consumed << " type " << unsigned(parsed_cmd_->mc_command()->type); @@ -2353,7 +2366,7 @@ bool Connection::ParseMCBatch() { break; } } - } while (parsed_cmd_q_len_ < 128 && io_buf_.InputLen() > 0); + } while (parsed_cmd_q_len_ < 128 && buf.InputLen() > 0); return true; } @@ -2609,13 +2622,12 @@ void Connection::HandleSocketBackpressure(size_t offload_threshold) { disk_push_in_flight_ = true; // We intentionally defer ConsumeInput until the write completes rather than deep-copying - // the buffer. This is safe because: - // 1. 
socket_buf_.AppendBuffer() starts after the in-flight bytes, so incoming recv data - // written via CommitWrite never overlaps the region io_uring is reading. - // 2. DoReadOnRecv checks disk_push_in_flight_ and skips writes to socket_buf_ while the - // push is in flight, preventing any mutation of the buffer io_uring is reading. - // Calling ConsumeInput here would reset offs_/size_ to 0, making AppendBuffer() alias the - // same memory io_uring is still reading, and would allow Reserve() to free that memory. + // the buffer. This is safe because socket_buf_.AppendBuffer() starts after the in-flight + // InputBuffer bytes, so incoming recv data written via CommitWrite never overlaps the region + // io_uring is reading. Calling ConsumeInput here would reset offs_/size_ to 0, making + // AppendBuffer() alias the same memory io_uring is still reading, and would allow Reserve() + // to free that memory. Any path that would Reserve socket_buf_ (WriteAndCommit with + // insufficient AppendLen, or CheckIoBufCapacity) checks disk_push_in_flight_ and skips. disk_queue_->PushAsync(socket_buf_.InputBuffer(), [this, to_offload](std::error_code ec) { disk_push_in_flight_ = false; if (ec) { @@ -2701,10 +2713,6 @@ void Connection::DoReadOnRecv(const util::FiberSocketBase::RecvNotification& n) return; } - // socket_buf_ is pinned by io_uring during a push; new writes would corrupt the in-flight read. - if (disk_push_in_flight_) - return; - using RecvNoti = util::FiberSocketBase::RecvNotification::RecvCompletion; if (std::holds_alternative(n.read_result)) { if (!std::get(n.read_result)) { @@ -2717,6 +2725,8 @@ void Connection::DoReadOnRecv(const util::FiberSocketBase::RecvNotification& n) return; } + // Writes go to AppendBuffer which starts after the in-flight InputBuffer region, + // so they never overlap the memory io_uring is reading during a push. Safe to proceed. 
io::MutableBytes buf = socket_buf_.AppendBuffer(); io::Result res = socket_->TryRecv(buf); @@ -2745,6 +2755,10 @@ void Connection::DoReadOnRecv(const util::FiberSocketBase::RecvNotification& n) io_ec_ = ec; } else if (std::holds_alternative(n.read_result)) { // provided buffer. io::MutableBytes buf = std::get(n.read_result); + // WriteAndCommit may call Reserve() if AppendLen() < buf.size(), which would free the + // backing allocation that io_uring is still reading during an in-flight push. Skip. + if (disk_push_in_flight_ && socket_buf_.AppendLen() < buf.size()) + return; UpdateIoBufCapacity(socket_buf_, &tl_facade_stats->conn_stats, [&]() { socket_buf_.WriteAndCommit(buf.data(), buf.size()); }); } else { @@ -2752,15 +2766,17 @@ void Connection::DoReadOnRecv(const util::FiberSocketBase::RecvNotification& n) } } -void Connection::CheckIoBufCapacity(bool is_iobuf_full) { - // io_buf_ backing allocation is pinned by an in-flight pop. Reserve() would free it. - if (disk_pop_in_flight_) +void Connection::CheckIoBufCapacity(bool is_iobuf_full, io::IoBuf& buf) { + // Reserve() frees the backing allocation, which is unsafe while io_uring has a reference: + // io_buf_ is pinned by an in-flight disk pop (PopAsync reads into it) + // socket_buf_ is pinned by an in-flight disk push (PushAsync reads from it) + if ((disk_pop_in_flight_ && &buf == &io_buf_) || (disk_push_in_flight_ && &buf == &socket_buf_)) return; auto& conn_stats = tl_facade_stats->conn_stats; size_t max_io_buf_len = GetFlag(FLAGS_max_client_iobuf_len); - size_t capacity = io_buf_.Capacity(); + size_t capacity = buf.Capacity(); if (capacity < max_io_buf_len) { size_t parser_hint = 0; if (redis_parser_) @@ -2772,23 +2788,23 @@ void Connection::CheckIoBufCapacity(bool is_iobuf_full) { // (Note: The buffer object is only working in power-of-2 sizes, // so there's no danger of accidental O(n^2) behavior.) 
if (parser_hint > capacity) { - UpdateIoBufCapacity(io_buf_, &conn_stats, - [&]() { io_buf_.Reserve(std::min(max_io_buf_len, parser_hint)); }); + UpdateIoBufCapacity(buf, &conn_stats, + [&]() { buf.Reserve(std::min(max_io_buf_len, parser_hint)); }); } // If we got a partial request because iobuf was full, grow it up to // a reasonable limit to save on Recv() calls. if (is_iobuf_full && capacity < max_io_buf_len / 2) { // Last io used most of the io_buf to the end. - UpdateIoBufCapacity(io_buf_, &conn_stats, [&]() { - io_buf_.Reserve(capacity * 2); // Valid growth range. + UpdateIoBufCapacity(buf, &conn_stats, [&]() { + buf.Reserve(capacity * 2); // Valid growth range. }); } - if (io_buf_.AppendLen() == 0U) { + if (buf.AppendLen() == 0U) { // it can happen with memcached but not for RedisParser, because RedisParser fully // consumes the passed buffer - LOG_EVERY_T(WARNING, 10) << "Maximum io_buf length reached " << io_buf_.Capacity() + LOG_EVERY_T(WARNING, 10) << "Maximum io_buf length reached " << buf.Capacity() << ", consider to increase max_client_iobuf_len flag"; } } @@ -2848,6 +2864,11 @@ variant Connection::IoLoopV2() { util::fb2::detail::Waiter ioevent_waiter{ioevent_cb}; // takes callback by reference absl::Cleanup waiter_cleanup = [this] { current_wait_.reset(); }; + // active_buf points to whichever buffer the parser should read from this iteration: + // &io_buf_ when disk data is present or a pop is in-flight (io_buf_ is the sink) + // &socket_buf_ otherwise (no disk involvement; parse socket bytes without copying) + io::IoBuf* active_buf = &socket_buf_; + do { HandleMigrateRequest(); @@ -2858,22 +2879,13 @@ variant Connection::IoLoopV2() { DrainDiskQueue(offload_threshold); - // Once disk is fully drained and the parser buffer is empty, promote buffered socket - // bytes into io_buf_ so the parser sees them next. This is always safe: disk is empty so - // there are no older bytes that could be violated by parsing socket_buf_ data now. 
- if ((!disk_queue_ || disk_queue_->Empty()) && !disk_pop_in_flight_ && io_buf_.InputLen() == 0 && - socket_buf_.InputLen() > 0) { - UpdateIoBufCapacity(io_buf_, &tl_facade_stats->conn_stats, [&]() { - io_buf_.Reserve(socket_buf_.InputLen()); - io_buf_.WriteAndCommit(socket_buf_.InputBuffer().data(), socket_buf_.InputLen()); - }); - socket_buf_.ConsumeInput(socket_buf_.InputLen()); - } + // Recompute active_buf every iteration after DrainDiskQueue may have changed disk state. + active_buf = (io_buf_.InputLen() > 0 || disk_pop_in_flight_) ? &io_buf_ : &socket_buf_; - // Enter the await block when io_buf_ is empty OR when a disk pop is in-flight. + // Enter the await block when the active buffer is empty OR when a disk pop is in-flight. // The in-flight case: PopAsync owns io_buf_.AppendBuffer() until the CQE fires. - if (io_buf_.InputLen() == 0 || disk_pop_in_flight_) { - if (io_buf_.InputLen() == 0 && !disk_pop_in_flight_) { + if (active_buf->InputLen() == 0 || disk_pop_in_flight_) { + if (active_buf->InputLen() == 0 && !disk_pop_in_flight_) { // Poll again for readiness. The event handler registered above is edge triggered. // We should read from the socket until EAGAIN or EWOULDBLOCK to make sure we consume // all available data. See "Do I need to continuously read/write" question under @@ -2897,12 +2909,12 @@ variant Connection::IoLoopV2() { // Wake to drain disk once the pipeline queue has room again. bool can_drain_disk = offload_threshold > 0 && disk_queue_ && !disk_queue_->Empty() && parsed_cmd_q_bytes_ < offload_threshold && disk_op_done; - // Wake to parse restored disk bytes. - bool can_parse = io_buf_.InputLen() > 0 && !disk_pop_in_flight_; - // Wake to promote socket bytes once disk is empty and io_buf_ is drained. + // Wake when there is parseable data: disk bytes in io_buf_, or socket bytes when + // disk is fully drained (active_buf will be re-evaluated on the next iteration). 
bool disk_empty = !disk_queue_ || disk_queue_->Empty(); - bool can_promote = socket_buf_.InputLen() > 0 && disk_empty && !disk_pop_in_flight_; - return can_parse || can_promote || cmd_ready || cmd_executable || io_ec_ || can_drain_disk; + bool can_parse = (io_buf_.InputLen() > 0 || (socket_buf_.InputLen() > 0 && disk_empty)) && + !disk_pop_in_flight_; + return can_parse || cmd_ready || cmd_executable || io_ec_ || can_drain_disk; }); } @@ -2912,16 +2924,16 @@ variant Connection::IoLoopV2() { } phase_ = PROCESS; - bool is_iobuf_full = io_buf_.AppendLen() == 0; + bool is_iobuf_full = active_buf->AppendLen() == 0; // When over the offload threshold with no commands left to dispatch, force the // await path so the fiber yields and lets shard completions be processed. - // Without this, recv keeps refilling io_buf_ and we spin forever. + // Without this, recv keeps refilling the buffer and we spin forever. bool force_await = offload_threshold > 0 && parsed_cmd_q_bytes_ >= offload_threshold && parsed_to_execute_ == nullptr; - if (io_buf_.InputLen() > 0 && !force_await && !disk_pop_in_flight_) { - parse_status = ParseLoop(); + if (active_buf->InputLen() > 0 && !force_await && !disk_pop_in_flight_) { + parse_status = ParseLoop(*active_buf); } else { parse_status = NEED_MORE; @@ -2931,10 +2943,10 @@ variant Connection::IoLoopV2() { ReplyBatch(); } - // When force_await is active we cannot parse even if io_buf_ or socket_buf_ has data. + // When force_await is active we cannot parse even if active_buf or socket_buf_ has data. // Push any pending socket bytes to disk so TCP keeps flowing, then yield until either // a command finishes (cmd_ready) or the queue drains below the threshold. Without this - // yield the fiber spins: the await block is skipped (io_buf_ non-empty), parse is + // yield the fiber spins: the await block is skipped (active_buf non-empty), parse is // skipped (force_await), and nothing makes progress. 
if (force_await && (io_buf_.InputLen() > 0 || socket_buf_.InputLen() > 0)) { HandleSocketBackpressure(offload_threshold); @@ -2956,7 +2968,7 @@ variant Connection::IoLoopV2() { if (parse_status == NEED_MORE) { parse_status = OK; - CheckIoBufCapacity(is_iobuf_full); + CheckIoBufCapacity(is_iobuf_full, *active_buf); } else if (parse_status != OK) { break; } diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 8a9cc650101a..e71e56288523 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -280,7 +280,7 @@ class Connection : public util::Connection { void DoReadOnRecv(const util::FiberSocketBase::RecvNotification& n); - void CheckIoBufCapacity(bool is_iobuf_full); + void CheckIoBufCapacity(bool is_iobuf_full, io::IoBuf& buf); // Main loop reading client messages and passing requests to dispatch queue. std::variant IoLoopV2(); @@ -365,12 +365,12 @@ class Connection : public util::Connection { // Returns true if one or more commands were parsed from the read buffer, // and false if no complete commands could be parsed (for example, when // parsing is pending more input). - bool ParseMCBatch(); + bool ParseMCBatch(io::IoBuf& buf); - bool ParseRedisBatch(); + bool ParseRedisBatch(io::IoBuf& buf); // Call appropriate ParseBatch function, proceed with Execute and Reply all why input is remaining - ParserStatus ParseLoop(); + ParserStatus ParseLoop(io::IoBuf& buf); // Loop over enqueued async commands and enqueue them for async execution. // If async execution is not possible, handle them in synchronous mode one by one. 
@@ -424,9 +424,9 @@ class Connection : public util::Connection { size_t request_consumed_bytes_ = 0; util::FiberSocketBase::ProvidedBuffer recv_buf_; - io::IoBuf - io_buf_; // parser input: fed exclusively from disk pops (or socket_buf_ when disk empty) - io::IoBuf socket_buf_; // recv target: new TCP bytes always land here + // parser input: fed exclusively from disk pops (or socket_buf_ when disk empty) + io::IoBuf io_buf_; + io::IoBuf socket_buf_; std::unique_ptr redis_parser_; std::unique_ptr memcache_parser_; ParsedCommand* parsed_cmd_ = nullptr; From a58d5b48b8a382b7d012ee1575c069db58c83be3 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Fri, 27 Mar 2026 14:35:37 +0200 Subject: [PATCH 4/9] replace two io_buf_ design with copy on Push + chaining Signed-off-by: Kostas Kyrimis --- src/facade/disk_backed_queue.cc | 149 +++++++++++++--- src/facade/disk_backed_queue.h | 37 +++- src/facade/disk_backed_queue_test.cc | 54 +++--- src/facade/dragonfly_connection.cc | 249 +++++++++++++-------------- src/facade/dragonfly_connection.h | 9 +- 5 files changed, 323 insertions(+), 175 deletions(-) diff --git a/src/facade/disk_backed_queue.cc b/src/facade/disk_backed_queue.cc index 81d1bd46d516..54275e2d6638 100644 --- a/src/facade/disk_backed_queue.cc +++ b/src/facade/disk_backed_queue.cc @@ -14,7 +14,9 @@ #include #include +#include #include +#include #include "base/flags.h" #include "facade/facade_types.h" @@ -22,6 +24,7 @@ #include "util/fibers/uring_proactor.h" using facade::operator""_MB; +using facade::operator""_KB; ABSL_FLAG(std::string, disk_backpressure_folder, "/tmp/", "Folder to store disk-backed connection backpressure"); @@ -30,16 +33,32 @@ ABSL_FLAG(size_t, disk_backpressure_file_max_bytes, 50_MB, "Maximum size of the backing file. 
When max size is reached, connection will " "stop offloading backpressure to disk and block on client read."); +ABSL_FLAG(size_t, disk_backpressure_drain_threshold, 16_KB, + "When remaining bytes in the disk-backed queue drop below this " + "threshold, queue is in draining phase (backpressure drains). " + "Defaults to 2x the max chunk size (16KB)."); + namespace facade { struct DiskBackedQueue::Impl { std::unique_ptr file; size_t write_offset = 0; - size_t total_backing_bytes = 0; + size_t total_backing_bytes = 0; // bytes actually on disk, available to pop + size_t queued_bytes = 0; // bytes in write_queue not yet written to disk size_t next_read_offset = 0; // Tracks how far into the file holes have been punched (always 4096-aligned). size_t punch_offset = 0; size_t in_flight_callbacks = 0; + bool write_in_flight = false; + bool pop_in_flight = false; + // Set by Cancel(). In-flight CQE callbacks skip accounting and chaining when true. + bool cancelled = false; + + struct PendingWrite { + Chunk chunk; + AsyncPushCallback cb; + }; + std::deque write_queue; // Punch holes over the aligned region we have fully read past so the OS can reclaim pages. 
void MaybePunchHole() { @@ -51,11 +70,53 @@ struct DiskBackedQueue::Impl { punch_offset = aligned_end; } } + + void MaybeFlushQueue() { + if (write_in_flight || write_queue.empty()) + return; + + write_in_flight = true; + + PendingWrite pw = std::move(write_queue.front()); + write_queue.pop_front(); + + const uint8_t* data_ptr = pw.chunk.data.data(); + const size_t size = pw.chunk.data.size(); + const size_t offset = write_offset; + + file->WriteAsync({data_ptr, size}, offset, [this, pw = std::move(pw), size](int res) mutable { + --in_flight_callbacks; + write_in_flight = false; + + std::error_code ec; + if (res < 0) { + ec = {-res, std::system_category()}; + LOG(ERROR) << "Failed to offload chunk of size " << size + << " to backing with error: " << ec; + } else if (!cancelled) { + write_offset += size; + total_backing_bytes += size; + queued_bytes -= size; + } + + if (cancelled) + ec = std::make_error_code(std::errc::operation_canceled); + + // Chain next write before invoking cb, so IsActive() reflects + // the true state when the callback notifies the connection. + if (!ec && !write_queue.empty()) + MaybeFlushQueue(); + + pw.cb(ec); + }); + ++in_flight_callbacks; + } }; DiskBackedQueue::DiskBackedQueue(uint32_t conn_id) : impl_(std::make_unique()), max_backing_size_(absl::GetFlag(FLAGS_disk_backpressure_file_max_bytes)), + drain_threshold_(absl::GetFlag(FLAGS_disk_backpressure_drain_threshold)), id_(conn_id) { } @@ -96,40 +157,69 @@ bool DiskBackedQueue::Empty() const { return impl_->total_backing_bytes == 0; } -bool DiskBackedQueue::HasEnoughBackingSpaceFor(size_t bytes) const { - return (bytes + impl_->total_backing_bytes) < max_backing_size_; +void DiskBackedQueue::Cancel() { + impl_->cancelled = true; + impl_->queued_bytes = 0; + impl_->total_backing_bytes = 0; + + // Drain all pending (not-yet-submitted) writes synchronously. 
+ // The at-most-one already-submitted write/pop CQE will still arrive; its + // callback checks cancelled and skips accounting, then clears the in-flight flag. + const auto cancel_ec = std::make_error_code(std::errc::operation_canceled); + while (!impl_->write_queue.empty()) { + AsyncPushCallback cb = std::move(impl_->write_queue.front().cb); + impl_->write_queue.pop_front(); + cb(cancel_ec); + } } -void DiskBackedQueue::PushAsync(io::Bytes bytes, AsyncPushCallback cb) { - const size_t offset = impl_->write_offset; - const size_t size = bytes.size(); - ++impl_->in_flight_callbacks; +bool DiskBackedQueue::IsActive() const { + return (!impl_->cancelled && impl_->total_backing_bytes > 0) || !impl_->write_queue.empty() || + impl_->write_in_flight || impl_->pop_in_flight; +} - impl_->file->WriteAsync(bytes, offset, [this, size, cb = std::move(cb)](int res) { - --impl_->in_flight_callbacks; - if (res < 0) { - std::error_code ec{-res, std::system_category()}; - VLOG(2) << "Failed to offload blob of size " << size << " to backing with error: " << ec; - cb(ec); - return; - } +bool DiskBackedQueue::IsDraining() const { + if (!IsActive()) + return false; + return (impl_->total_backing_bytes + impl_->queued_bytes) < drain_threshold_; +} - impl_->write_offset += size; - impl_->total_backing_bytes += size; - VLOG(2) << "Offload connection " << this << " backpressure of " << size; - cb({}); - }); +bool DiskBackedQueue::IsPopInFlight() const { + return impl_->pop_in_flight; +} + +bool DiskBackedQueue::HasEnoughBackingSpaceFor(size_t bytes) const { + return (bytes + impl_->total_backing_bytes + impl_->queued_bytes) < max_backing_size_; +} + +void DiskBackedQueue::PushAsync(Chunk chunk, AsyncPushCallback cb) { + DCHECK(!chunk.data.empty()); + + impl_->queued_bytes += chunk.data.size(); + impl_->write_queue.push_back({std::move(chunk), std::move(cb)}); + impl_->MaybeFlushQueue(); } void DiskBackedQueue::PopAsync(io::MutableBytes out, AsyncPopCallback cb) { + if (impl_->pop_in_flight 
|| impl_->total_backing_bytes == 0) + return; + const size_t to_read = std::min(impl_->total_backing_bytes, out.size()); const size_t offset = impl_->next_read_offset; ++impl_->in_flight_callbacks; + impl_->pop_in_flight = true; io::MutableBytes read_buf = out.subspan(0, to_read); impl_->file->ReadAsync(read_buf, offset, [this, to_read, offset, cb = std::move(cb)](int res) { --impl_->in_flight_callbacks; + impl_->pop_in_flight = false; + + if (impl_->cancelled) { + cb(nonstd::make_unexpected(std::make_error_code(std::errc::operation_canceled))); + return; + } + if (res < 0) { std::error_code ec{-res, std::system_category()}; LOG(ERROR) << "Could not load item at offset " << offset << " of size " << to_read @@ -160,7 +250,7 @@ namespace facade { struct DiskBackedQueue::Impl {}; DiskBackedQueue::DiskBackedQueue(uint32_t conn_id) - : impl_(std::make_unique()), max_backing_size_(0), id_(conn_id) { + : impl_(std::make_unique()), max_backing_size_(0), drain_threshold_(0), id_(conn_id) { } DiskBackedQueue::~DiskBackedQueue() = default; @@ -177,11 +267,26 @@ bool DiskBackedQueue::Empty() const { return true; } +bool DiskBackedQueue::IsActive() const { + return false; +} + +bool DiskBackedQueue::IsDraining() const { + return false; +} + +bool DiskBackedQueue::IsPopInFlight() const { + return false; +} + +void DiskBackedQueue::Cancel() { +} + std::error_code DiskBackedQueue::Close() { return {}; } -void DiskBackedQueue::PushAsync(io::Bytes, AsyncPushCallback cb) { +void DiskBackedQueue::PushAsync(Chunk, AsyncPushCallback cb) { cb(std::make_error_code(std::errc::function_not_supported)); } diff --git a/src/facade/disk_backed_queue.h b/src/facade/disk_backed_queue.h index d7d7ddfd81c6..5b2f2537dac8 100644 --- a/src/facade/disk_backed_queue.h +++ b/src/facade/disk_backed_queue.h @@ -7,6 +7,7 @@ #include #include #include +#include #include "io/io.h" @@ -17,26 +18,57 @@ namespace facade { // HasEnoughBackingSpaceFor() always returns false, so callers never push/pop. 
class DiskBackedQueue { public: + // Maximum number of bytes per chunk. Recvs read into a Chunk-sized buffer; + // actual chunk size is the number of bytes received (may be smaller). + static constexpr size_t kMaxChunkSize = 8192; + + struct Chunk { + // 0 < data.size() * 8 < kMaxChunkSize + std::vector data; + }; + explicit DiskBackedQueue(uint32_t conn_id); ~DiskBackedQueue(); std::error_code Init(); + // Returns true if the queue has any pending work: data on disk, chunks + // waiting to be written, a write CQE in-flight, or a pop CQE in-flight. + bool IsActive() const; + + // Returns true when remaining bytes have dropped below the drain + // threshold. The caller should block new socket reads to allow a clean + // drain-to-memory transition (TCP backpressure builds during this window). + bool IsDraining() const; + + // Returns true if there is a pop in flight + bool IsPopInFlight() const; + // Check if we can offload bytes to backing file. + // Counts both on-disk bytes and bytes queued for writing (not yet on disk). bool HasEnoughBackingSpaceFor(size_t bytes) const; using AsyncPushCallback = std::function; - void PushAsync(io::Bytes bytes, AsyncPushCallback cb); + // Takes ownership of chunk and enqueues it for async write to disk. + // The caller's buffer is free the instant PushAsync returns. + // cb is invoked after the chunk's write CQE fires. + void PushAsync(Chunk chunk, AsyncPushCallback cb); using AsyncPopCallback = std::function)>; // Async read variant. Callback is invoked with Result containing bytes read or error. + // No-op if a pop is already in-flight or the backing store is empty. void PopAsync(io::MutableBytes out, AsyncPopCallback cb); - // Check if backing file is empty, i.e. backing file has 0 bytes. + // Check if backing store (on-disk bytes) is empty. bool Empty() const; + // Cancel all pending (not-yet-submitted) writes, invoking their callbacks with + // operation_canceled. 
+ // Caller must also wait the in-flight operations to complete + void Cancel(); + std::error_code Close(); private: @@ -46,6 +78,7 @@ class DiskBackedQueue { // Read only constants const size_t max_backing_size_; + const size_t drain_threshold_; // same as connection id. Used to uniquely identify the backed file const size_t id_; diff --git a/src/facade/disk_backed_queue_test.cc b/src/facade/disk_backed_queue_test.cc index 1bf7ae59d2ca..03a325b82acf 100644 --- a/src/facade/disk_backed_queue_test.cc +++ b/src/facade/disk_backed_queue_test.cc @@ -53,11 +53,13 @@ TEST_F(DiskBackedQueueTest, PunchHoleReleasesSpace) { std::string data(12288, 'x'); { util::fb2::Done done; - backing.PushAsync(io::MutableBytes(reinterpret_cast(data.data()), data.size()), - [&done](std::error_code ec) { - ASSERT_FALSE(ec); - done.Notify(); - }); + DiskBackedQueue::Chunk chunk; + chunk.data.assign(reinterpret_cast(data.data()), + reinterpret_cast(data.data()) + data.size()); + backing.PushAsync(std::move(chunk), [&done](std::error_code ec) { + ASSERT_FALSE(ec); + done.Notify(); + }); done.Wait(); } @@ -99,11 +101,13 @@ TEST_F(DiskBackedQueueTest, PunchHoleAdvancesOffset) { std::string data(32768, 'y'); { util::fb2::Done done; - backing.PushAsync(io::MutableBytes(reinterpret_cast(data.data()), data.size()), - [&done](std::error_code ec) { - ASSERT_FALSE(ec); - done.Notify(); - }); + DiskBackedQueue::Chunk chunk; + chunk.data.assign(reinterpret_cast(data.data()), + reinterpret_cast(data.data()) + data.size()); + backing.PushAsync(std::move(chunk), [&done](std::error_code ec) { + ASSERT_FALSE(ec); + done.Notify(); + }); done.Wait(); } @@ -146,11 +150,13 @@ TEST_F(DiskBackedQueueTest, PunchHoleUnalignedReadsAndWrites) { std::string data(10000, 'z'); { util::fb2::Done done; - backing.PushAsync(io::MutableBytes(reinterpret_cast(data.data()), data.size()), - [&done](std::error_code ec) { - ASSERT_FALSE(ec); - done.Notify(); - }); + DiskBackedQueue::Chunk chunk; + 
chunk.data.assign(reinterpret_cast(data.data()), + reinterpret_cast(data.data()) + data.size()); + backing.PushAsync(std::move(chunk), [&done](std::error_code ec) { + ASSERT_FALSE(ec); + done.Notify(); + }); done.Wait(); } @@ -256,10 +262,12 @@ TEST_F(DiskBackedQueueTest, AsyncReadWrite) { util::fb2::Fiber write_fiber = util::fb2::Fiber("writer", [&]() { for (size_t i = 0; i < 100; ++i) { auto cmd = absl::StrCat("SET FOO", i, " BAR"); - auto bytes = io::MutableBytes(reinterpret_cast(cmd.data()), cmd.size()); + DiskBackedQueue::Chunk chunk; + chunk.data.assign(reinterpret_cast(cmd.data()), + reinterpret_cast(cmd.data()) + cmd.size()); util::fb2::Done done; - backing.PushAsync(bytes, [&done](std::error_code ec) { + backing.PushAsync(std::move(chunk), [&done](std::error_code ec) { EXPECT_FALSE(ec); done.Notify(); }); @@ -304,11 +312,13 @@ TEST_F(DiskBackedQueueTest, AsyncPunchHole) { std::string data(12288, 'x'); util::fb2::Done write_done; - backing.PushAsync(io::MutableBytes(reinterpret_cast(data.data()), data.size()), - [&write_done](std::error_code ec) { - ASSERT_FALSE(ec); - write_done.Notify(); - }); + DiskBackedQueue::Chunk chunk; + chunk.data.assign(reinterpret_cast(data.data()), + reinterpret_cast(data.data()) + data.size()); + backing.PushAsync(std::move(chunk), [&write_done](std::error_code ec) { + ASSERT_FALSE(ec); + write_done.Notify(); + }); write_done.Wait(); // Async read all data back in 4096-byte chunks diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index bc261ae1dd02..ac6fe5299f4b 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -1090,9 +1090,9 @@ void Connection::ConnectionFlow() { ++local_stats_.read_cnt; - // For the old IoLoop path, promote CheckForHttpProto bytes from socket_buf_ to io_buf_. - // IoLoopV2 reads socket_buf_ directly via its active_buf pointer, so no copy is needed. 
- if (socket_buf_.InputLen() > 0 && !ioloop_v2_) { + // Promote any bytes read during CheckForHttpProto from socket_buf_ to io_buf_. + // Both the old IoLoop and IoLoopV2 parse from io_buf_ exclusively. + if (socket_buf_.InputLen() > 0) { UpdateIoBufCapacity(io_buf_, &conn_stats, [&]() { io_buf_.Reserve(socket_buf_.InputLen()); io_buf_.WriteAndCommit(socket_buf_.InputBuffer().data(), socket_buf_.InputLen()); @@ -1165,8 +1165,12 @@ void Connection::ConnectionFlow() { } if (disk_queue_) { - if (disk_push_in_flight_ || disk_pop_in_flight_) { - io_event_.await([this] { return !disk_push_in_flight_ && !disk_pop_in_flight_; }); + // Cancel pending (queued but not-yet-submitted) writes immediately. + // This leaves at most one in-flight write CQE and one in-flight pop CQE, + // both of which will complete quickly without draining the full queue. + disk_queue_->Cancel(); + if (disk_queue_->IsActive()) { + io_event_.await([this] { return !disk_queue_->IsActive(); }); } std::ignore = disk_queue_->Close(); disk_queue_.reset(); @@ -1377,7 +1381,7 @@ auto Connection::ParseLoop(io::IoBuf& buf) -> ParserStatus { VLOG(3) << "Offload triggered. offload_threshold=" << offload_threshold << " buf_len=" << buf.InputLen() << " parsed_head_=" << parsed_head_ << " parsed_to_execute_=" << parsed_to_execute_; - HandleSocketBackpressure(offload_threshold); + // New recvs are routed to chunks by DoReadOnRecv; no explicit push needed here. 
if (!ExecuteBatch()) return ERROR; if (!ReplyBatch()) @@ -2603,61 +2607,20 @@ void Connection::EnsureMemoryBudget(unsigned tid) { thread_queue_backpressure[tid].EnsureBelowLimit(); } -void Connection::HandleSocketBackpressure(size_t offload_threshold) { - DCHECK_GT(offload_threshold, 0u); - - if (parsed_cmd_q_bytes_ < offload_threshold) - return; - - if (disk_push_in_flight_ || socket_buf_.InputLen() == 0) - return; - - if (!InitDiskQueueIfNeeded()) - return; - - if (!disk_queue_->HasEnoughBackingSpaceFor(socket_buf_.InputLen())) - return; - - const size_t to_offload = socket_buf_.InputLen(); - disk_push_in_flight_ = true; - - // We intentionally defer ConsumeInput until the write completes rather than deep-copying - // the buffer. This is safe because socket_buf_.AppendBuffer() starts after the in-flight - // InputBuffer bytes, so incoming recv data written via CommitWrite never overlaps the region - // io_uring is reading. Calling ConsumeInput here would reset offs_/size_ to 0, making - // AppendBuffer() alias the same memory io_uring is still reading, and would allow Reserve() - // to free that memory. Any path that would Reserve socket_buf_ (WriteAndCommit with - // insufficient AppendLen, or CheckIoBufCapacity) checks disk_push_in_flight_ and skips. - disk_queue_->PushAsync(socket_buf_.InputBuffer(), [this, to_offload](std::error_code ec) { - disk_push_in_flight_ = false; - if (ec) { - LOG(ERROR) << "Disk offload write failed: " << ec.message(); - // Should we abort the connection ? 
- } else { - socket_buf_.ConsumeInput(to_offload); - VLOG(3) << "Offloaded " << to_offload << " bytes to disk for connection " << id_; - } - io_event_.notify(); - }); -} - void Connection::DrainDiskQueue(size_t offload_threshold) { if (offload_threshold == 0 || !disk_queue_ || disk_queue_->Empty()) return; - if (disk_pop_in_flight_ || io_buf_.AppendLen() == 0) + if (disk_queue_->IsPopInFlight() || io_buf_.AppendLen() == 0) return; if (parsed_cmd_q_bytes_ >= offload_threshold) return; - disk_pop_in_flight_ = true; disk_queue_->PopAsync(io_buf_.AppendBuffer(), [this](io::Result res) { - disk_pop_in_flight_ = false; if (res) { io_buf_.CommitWrite(*res); VLOG(3) << "Restored " << *res << " bytes from disk for connection " << id_; } else { LOG(ERROR) << "Disk offload read failed: " << res.error().message(); - // Should we abort the connection ? } io_event_.notify(); }); @@ -2707,6 +2670,29 @@ bool ConnectionRef::operator==(const ConnectionRef& other) const { return client_id_ == other.client_id_; } +bool Connection::MaybeOffloadToDisk(size_t len, const uint8_t* data) { + if (offload_threshold_ == 0 || !disk_queue_) + return false; + if (!disk_queue_->IsActive() && parsed_cmd_q_bytes_ < offload_threshold_) + return false; + + const uint8_t* src = data; + size_t remaining = len; + while (remaining > 0) { + size_t chunk_size = std::min(remaining, DiskBackedQueue::kMaxChunkSize); + DiskBackedQueue::Chunk chunk; + chunk.data.assign(src, src + chunk_size); + src += chunk_size; + remaining -= chunk_size; + disk_queue_->PushAsync(std::move(chunk), [this](std::error_code ec) { + if (ec) + io_ec_ = ec; + io_event_.notify(); + }); + } + return true; +} + void Connection::DoReadOnRecv(const util::FiberSocketBase::RecvNotification& n) { if (std::holds_alternative(n.read_result)) { io_ec_ = std::get(n.read_result); @@ -2720,57 +2706,90 @@ void Connection::DoReadOnRecv(const util::FiberSocketBase::RecvNotification& n) return; } - if (socket_buf_.AppendLen() == 0) { - // We will try to 
offload socket_buf_ to disk in HandleSocketBackpressure. + // Check if we should offload before doing a TryRecv (IsDraining blocks the recv entirely). + auto should_offload = [this]() { + if (offload_threshold_ == 0 || !disk_queue_) + return false; + return disk_queue_->IsActive() || parsed_cmd_q_bytes_ >= offload_threshold_; + }; + + if (should_offload()) { + // During the drain window block socket reads (MaybeOffloadToDisk returns true + // without pushing), so TCP backpressure builds until the queue fully empties. + if (disk_queue_->IsDraining()) + return; + + // Don't recv if backing store is too full to accept a max-size chunk. + if (!disk_queue_->HasEnoughBackingSpaceFor(DiskBackedQueue::kMaxChunkSize)) + return; + + DiskBackedQueue::Chunk chunk; + chunk.data.resize(DiskBackedQueue::kMaxChunkSize); + io::Result res = socket_->TryRecv({chunk.data.data(), chunk.data.size()}); + + if (!res) { + auto ec = res.error(); + if (ec != errc::resource_unavailable_try_again && ec != errc::operation_would_block) + io_ec_ = ec; + return; + } + if (*res == 0) { + io_ec_ = make_error_code(errc::connection_aborted); + return; + } + chunk.data.resize(*res); + VLOG(3) << "Offloaded " << *res << " bytes to disk for connection " << id_; + disk_queue_->PushAsync(std::move(chunk), [this](std::error_code ec) { + if (ec) + io_ec_ = ec; + io_event_.notify(); + }); return; } - // Writes go to AppendBuffer which starts after the in-flight InputBuffer region, - // so they never overlap the memory io_uring is reading during a push. Safe to proceed. - io::MutableBytes buf = socket_buf_.AppendBuffer(); + // Normal path: recv directly into io_buf_. + // A recv call can return fewer bytes than requested even if the socket buffer actually + // contains enough data to satisfy the full request. + // TODO: maybe worth looping here and trying another recv call until it fails with + // EAGAIN or EWOULDBLOCK. The problem there is that we need to handle resizing if + // AppendBuffer is zero. 
+ if (io_buf_.AppendLen() == 0) + return; + + io::MutableBytes buf = io_buf_.AppendBuffer(); io::Result res = socket_->TryRecv(buf); if (res) { if (*res > 0) { - // A recv call can return fewer bytes than requested even if the - // socket buffer actually contains enough data to satisfy the full request. - // TODO maybe worth looping here and try another recv call until it fails - // with EAGAIN or EWOULDBLOCK. The problem there is that we need to handle - // resizing if AppendBuffer is zero. - socket_buf_.CommitWrite(*res); + io_buf_.CommitWrite(*res); return; } - // *res == 0 io_ec_ = make_error_code(errc::connection_aborted); return; } - // error path (!res) auto ec = res.error(); - // EAGAIN and EWOULDBLOCK - if (ec == errc::resource_unavailable_try_again || ec == errc::operation_would_block) { - return; - } + if (ec != errc::resource_unavailable_try_again && ec != errc::operation_would_block) + io_ec_ = ec; - io_ec_ = ec; - } else if (std::holds_alternative(n.read_result)) { // provided buffer. + } else if (std::holds_alternative(n.read_result)) { // provided buffer io::MutableBytes buf = std::get(n.read_result); - // WriteAndCommit may call Reserve() if AppendLen() < buf.size(), which would free the - // backing allocation that io_uring is still reading during an in-flight push. Skip. - if (disk_push_in_flight_ && socket_buf_.AppendLen() < buf.size()) - return; - UpdateIoBufCapacity(socket_buf_, &tl_facade_stats->conn_stats, - [&]() { socket_buf_.WriteAndCommit(buf.data(), buf.size()); }); + + if (MaybeOffloadToDisk(buf.size(), buf.data())) { + } else { + // TODO: if a pop CQE is in-flight writing into io_buf_, WriteAndCommit races with it. 
+ UpdateIoBufCapacity(io_buf_, &tl_facade_stats->conn_stats, + [&]() { io_buf_.WriteAndCommit(buf.data(), buf.size()); }); + } } else { LOG(FATAL) << "Should not reach here"; } } void Connection::CheckIoBufCapacity(bool is_iobuf_full, io::IoBuf& buf) { - // Reserve() frees the backing allocation, which is unsafe while io_uring has a reference: - // io_buf_ is pinned by an in-flight disk pop (PopAsync reads into it) - // socket_buf_ is pinned by an in-flight disk push (PushAsync reads from it) - if ((disk_pop_in_flight_ && &buf == &io_buf_) || (disk_push_in_flight_ && &buf == &socket_buf_)) + // Reserve() frees the backing allocation, which is unsafe while io_uring is writing + // into io_buf_.AppendBuffer() during an in-flight pop CQE. + if (disk_queue_ && disk_queue_->IsPopInFlight() && &buf == &io_buf_) return; auto& conn_stats = tl_facade_stats->conn_stats; @@ -2829,7 +2848,8 @@ variant Connection::IoLoopV2() { #endif } - const size_t offload_threshold = GetFlag(FLAGS_pipeline_disk_offload_threshold); + offload_threshold_ = GetFlag(FLAGS_pipeline_disk_offload_threshold); + const size_t offload_threshold = offload_threshold_; VLOG(2) << "IoLoopV2 starting: offload_threshold=" << offload_threshold << " conn_id=" << id_; @@ -2840,20 +2860,15 @@ variant Connection::IoLoopV2() { << " disk_queue_set=" << (disk_queue_ != nullptr); } - peer->RegisterOnRecv([this, offload_threshold](const FiberSocketBase::RecvNotification& n) { - DVLOG(2) << "Calling DoReadOnRecv socket_buf_len: " << socket_buf_.InputLen(); + peer->RegisterOnRecv([this](const FiberSocketBase::RecvNotification& n) { + // DoReadOnRecv routes bytes to io_buf_ (queue inactive) or a Chunk (queue active). + // Offload routing is decided inside DoReadOnRecv via offload_threshold_. + DVLOG(2) << "Calling DoReadOnRecv io_buf_len: " << io_buf_.InputLen(); DoReadOnRecv(n); - // The recv callback runs in the proactor event loop as a plain function (no fiber context). 
- // This cb can fire multiple times before io_event_.notify() ever reaches and wakes up - // the connection (we don't have any gurantees what runs next) so I opportunistically - // attempt to offload to disk here (if it's possible). - VLOG(4) << "recv callback fired: offload_threshold=" << offload_threshold + VLOG(4) << "recv callback fired: offload_threshold_=" << offload_threshold_ << " disk_queue_=" << (disk_queue_ != nullptr) << " parsed_cmd_q_bytes_=" << parsed_cmd_q_bytes_ << " io_buf_len=" << io_buf_.InputLen(); - if (offload_threshold > 0 && disk_queue_) { - HandleSocketBackpressure(offload_threshold); - } io_event_.notify(); }); @@ -2864,11 +2879,6 @@ variant Connection::IoLoopV2() { util::fb2::detail::Waiter ioevent_waiter{ioevent_cb}; // takes callback by reference absl::Cleanup waiter_cleanup = [this] { current_wait_.reset(); }; - // active_buf points to whichever buffer the parser should read from this iteration: - // &io_buf_ when disk data is present or a pop is in-flight (io_buf_ is the sink) - // &socket_buf_ otherwise (no disk involvement; parse socket bytes without copying) - io::IoBuf* active_buf = &socket_buf_; - do { HandleMigrateRequest(); @@ -2879,13 +2889,13 @@ variant Connection::IoLoopV2() { DrainDiskQueue(offload_threshold); - // Recompute active_buf every iteration after DrainDiskQueue may have changed disk state. - active_buf = (io_buf_.InputLen() > 0 || disk_pop_in_flight_) ? &io_buf_ : &socket_buf_; + const bool pop_in_flight = disk_queue_ && disk_queue_->IsPopInFlight(); - // Enter the await block when the active buffer is empty OR when a disk pop is in-flight. - // The in-flight case: PopAsync owns io_buf_.AppendBuffer() until the CQE fires. - if (active_buf->InputLen() == 0 || disk_pop_in_flight_) { - if (active_buf->InputLen() == 0 && !disk_pop_in_flight_) { + // Enter the await block when io_buf_ is empty or a pop CQE is in-flight. 
+ // The in-flight case: PopAsync owns io_buf_.AppendBuffer() until the CQE fires; + // we must not parse the partially-written region. + if (io_buf_.InputLen() == 0 || pop_in_flight) { + if (io_buf_.InputLen() == 0 && !pop_in_flight) { // Poll again for readiness. The event handler registered above is edge triggered. // We should read from the socket until EAGAIN or EWOULDBLOCK to make sure we consume // all available data. See "Do I need to continuously read/write" question under @@ -2893,27 +2903,17 @@ variant Connection::IoLoopV2() { // The exception is when we use io_uring with multishot recv enabled, in which case // we rely on the kernel to keep feeding us data until multishot is disabled. DoReadOnRecv(FiberSocketBase::RecvNotification{true}); - if (offload_threshold > 0 && disk_queue_) { - VLOG(2) << "post-manual-read check: parsed_cmd_q_bytes_=" << parsed_cmd_q_bytes_ - << " socket_buf_len=" << socket_buf_.InputLen() - << " disk_push_in_flight_=" << disk_push_in_flight_; - HandleSocketBackpressure(offload_threshold); - } } io_event_.await([this, offload_threshold]() { // TODO: optimize CanReply with looking up waiter key bool cmd_executable = parsed_head_ && parsed_head_ == parsed_to_execute_; bool cmd_ready = !cmd_executable && parsed_head_ && parsed_head_->CanReply(); - // Wake when disk pop completes so we can parse the restored bytes. - bool disk_op_done = !disk_push_in_flight_ && !disk_pop_in_flight_; + bool pop_in_flight = disk_queue_ && disk_queue_->IsPopInFlight(); // Wake to drain disk once the pipeline queue has room again. bool can_drain_disk = offload_threshold > 0 && disk_queue_ && !disk_queue_->Empty() && - parsed_cmd_q_bytes_ < offload_threshold && disk_op_done; - // Wake when there is parseable data: disk bytes in io_buf_, or socket bytes when - // disk is fully drained (active_buf will be re-evaluated on the next iteration). 
- bool disk_empty = !disk_queue_ || disk_queue_->Empty(); - bool can_parse = (io_buf_.InputLen() > 0 || (socket_buf_.InputLen() > 0 && disk_empty)) && - !disk_pop_in_flight_; + parsed_cmd_q_bytes_ < offload_threshold && !pop_in_flight; + // Wake when there is parseable data in io_buf_ and no pop is writing into it. + bool can_parse = io_buf_.InputLen() > 0 && !pop_in_flight; return can_parse || cmd_ready || cmd_executable || io_ec_ || can_drain_disk; }); } @@ -2924,16 +2924,17 @@ variant Connection::IoLoopV2() { } phase_ = PROCESS; - bool is_iobuf_full = active_buf->AppendLen() == 0; + bool is_iobuf_full = io_buf_.AppendLen() == 0; // When over the offload threshold with no commands left to dispatch, force the // await path so the fiber yields and lets shard completions be processed. - // Without this, recv keeps refilling the buffer and we spin forever. + // Without this, new recvs keep routing to chunks but we spin without yielding. bool force_await = offload_threshold > 0 && parsed_cmd_q_bytes_ >= offload_threshold && parsed_to_execute_ == nullptr; - if (active_buf->InputLen() > 0 && !force_await && !disk_pop_in_flight_) { - parse_status = ParseLoop(*active_buf); + const bool cur_pop_in_flight = disk_queue_ && disk_queue_->IsPopInFlight(); + if (io_buf_.InputLen() > 0 && !force_await && !cur_pop_in_flight) { + parse_status = ParseLoop(io_buf_); } else { parse_status = NEED_MORE; @@ -2943,15 +2944,11 @@ variant Connection::IoLoopV2() { ReplyBatch(); } - // When force_await is active we cannot parse even if active_buf or socket_buf_ has data. - // Push any pending socket bytes to disk so TCP keeps flowing, then yield until either - // a command finishes (cmd_ready) or the queue drains below the threshold. Without this - // yield the fiber spins: the await block is skipped (active_buf non-empty), parse is - // skipped (force_await), and nothing makes progress. 
- if (force_await && (io_buf_.InputLen() > 0 || socket_buf_.InputLen() > 0)) { - HandleSocketBackpressure(offload_threshold); + // When force_await is active we cannot parse even if io_buf_ has data. + // New recvs are already routed to chunks by DoReadOnRecv (no explicit push needed). + // Yield until a command finishes or the queue drains below the threshold. + if (force_await) { VLOG(2) << "force_await spin-guard: yielding for cmd completion or queue drain" - << " disk_push_in_flight_=" << disk_push_in_flight_ << " parsed_cmd_q_bytes_=" << parsed_cmd_q_bytes_; io_event_.await([this, offload_threshold]() { bool cmd_ready = @@ -2968,7 +2965,7 @@ variant Connection::IoLoopV2() { if (parse_status == NEED_MORE) { parse_status = OK; - CheckIoBufCapacity(is_iobuf_full, *active_buf); + CheckIoBufCapacity(is_iobuf_full, io_buf_); } else if (parse_status != OK) { break; } diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index e71e56288523..69581414bf6f 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -280,6 +280,10 @@ class Connection : public util::Connection { void DoReadOnRecv(const util::FiberSocketBase::RecvNotification& n); + // Pushes [data, data+len) to the disk queue in ≤kMaxChunkSize chunks. + // Returns true if data was offloaded; false means caller should fall through to io_buf_. + bool MaybeOffloadToDisk(size_t len, const uint8_t* data); + void CheckIoBufCapacity(bool is_iobuf_full, io::IoBuf& buf); // Main loop reading client messages and passing requests to dispatch queue. @@ -340,7 +344,6 @@ class Connection : public util::Connection { // Lazily initialises disk_queue_. Returns false on error and disk_queue_ stays null. 
bool InitDiskQueueIfNeeded(); - void HandleSocketBackpressure(size_t offload_threshold); void DrainDiskQueue(size_t offload_threshold); void IncreaseConnStats(); @@ -417,8 +420,8 @@ class Connection : public util::Connection { // Disk-backed offload queue for IoLoopV2 pipeline backpressure. // Lazily created when the parsed command queue exceeds the offload threshold. std::unique_ptr disk_queue_; - bool disk_push_in_flight_ = false; - bool disk_pop_in_flight_ = false; + // Cached value of FLAGS_pipeline_disk_offload_threshold, set at IoLoopV2 startup. + size_t offload_threshold_ = 0; // how many bytes of the current request have been consumed size_t request_consumed_bytes_ = 0; From 48a108161fb41141d8eb3ffd9260bf95432dc29d Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Fri, 27 Mar 2026 15:43:08 +0200 Subject: [PATCH 5/9] clean up --- src/facade/dragonfly_connection.cc | 84 +++++++++++++----------------- src/facade/dragonfly_connection.h | 12 ++--- 2 files changed, 41 insertions(+), 55 deletions(-) diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index ac6fe5299f4b..f5db58ae581e 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -829,8 +829,8 @@ void Connection::HandleRequests() { // We validate the http request using basic-auth inside HttpConnection::HandleSingleRequest. 
cc_->authenticated = true; - auto ec = http_conn.ParseFromBuffer(socket_buf_.InputBuffer()); - socket_buf_.ConsumeInput(socket_buf_.InputLen()); + auto ec = http_conn.ParseFromBuffer(io_buf_.InputBuffer()); + io_buf_.ConsumeInput(io_buf_.InputLen()); if (!ec) { http_conn.HandleRequests(); } @@ -1041,7 +1041,7 @@ io::Result Connection::CheckForHttpProto() { auto* peer = socket_.get(); auto& conn_stats = tl_facade_stats->conn_stats; do { - auto buf = socket_buf_.AppendBuffer(); + auto buf = io_buf_.AppendBuffer(); DCHECK(!buf.empty()); ::io::Result recv_sz = peer->Recv(buf); @@ -1053,8 +1053,8 @@ io::Result Connection::CheckForHttpProto() { return false; } - socket_buf_.CommitWrite(*recv_sz); - string_view ib = io::View(socket_buf_.InputBuffer()); + io_buf_.CommitWrite(*recv_sz); + string_view ib = io::View(io_buf_.InputBuffer()); if (ib.size() >= 2 && ib[0] == 22 && ib[1] == 3) { // We matched the TLS handshake raw data, which means "peer" is a TCP socket. // Reject the connection. @@ -1064,15 +1064,15 @@ io::Result Connection::CheckForHttpProto() { ib = ib.substr(last_len); size_t pos = ib.find('\n'); if (pos != string_view::npos) { - ib = io::View(socket_buf_.InputBuffer().first(last_len + pos)); + ib = io::View(io_buf_.InputBuffer().first(last_len + pos)); if (ib.size() < 10 || ib.back() != '\r') return false; ib.remove_suffix(1); return MatchHttp11Line(ib); } - last_len = socket_buf_.InputLen(); - UpdateIoBufCapacity(socket_buf_, &conn_stats, [&]() { socket_buf_.EnsureCapacity(128); }); + last_len = io_buf_.InputLen(); + UpdateIoBufCapacity(io_buf_, &conn_stats, [&]() { io_buf_.EnsureCapacity(128); }); } while (last_len < 1024); return false; @@ -1090,17 +1090,7 @@ void Connection::ConnectionFlow() { ++local_stats_.read_cnt; - // Promote any bytes read during CheckForHttpProto from socket_buf_ to io_buf_. - // Both the old IoLoop and IoLoopV2 parse from io_buf_ exclusively. 
- if (socket_buf_.InputLen() > 0) { - UpdateIoBufCapacity(io_buf_, &conn_stats, [&]() { - io_buf_.Reserve(socket_buf_.InputLen()); - io_buf_.WriteAndCommit(socket_buf_.InputBuffer().data(), socket_buf_.InputLen()); - }); - socket_buf_.ConsumeInput(socket_buf_.InputLen()); - } - - local_stats_.net_bytes_in += io_buf_.InputLen() + socket_buf_.InputLen(); + local_stats_.net_bytes_in += io_buf_.InputLen(); ParserStatus parse_status = OK; @@ -1112,7 +1102,7 @@ void Connection::ConnectionFlow() { parse_status = ParseRedis(10000); } else { DCHECK(memcache_parser_); - parse_status = ParseLoop(io_buf_); + parse_status = ParseLoop(); } } @@ -1201,8 +1191,8 @@ void Connection::ConnectionFlow() { LOG_IF(WARNING, ec2) << "Could not shutdown socket " << ec2; while (!ec2) { // Discard any received data. - socket_buf_.Clear(); - auto recv_sz = socket_->Recv(socket_buf_.AppendBuffer()); + io_buf_.Clear(); + auto recv_sz = socket_->Recv(io_buf_.AppendBuffer()); if (!recv_sz || *recv_sz == 0) { break; // Peer closed connection. } @@ -1362,7 +1352,7 @@ Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles, bool e return ERROR; } -auto Connection::ParseLoop(io::IoBuf& buf) -> ParserStatus { +auto Connection::ParseLoop() -> ParserStatus { auto parse_func = protocol_ == Protocol::MEMCACHE ? &Connection::ParseMCBatch : &Connection::ParseRedisBatch; @@ -1376,10 +1366,10 @@ auto Connection::ParseLoop(io::IoBuf& buf) -> ParserStatus { // We still execute and reply already-queued commands such that the parsed_cmd_q_bytes // drains. VLOG(4) << "ParseLoop check: parsed_cmd_q_bytes_=" << parsed_cmd_q_bytes_ - << " threshold=" << offload_threshold << " buf_len=" << buf.InputLen(); + << " threshold=" << offload_threshold << " buf_len=" << io_buf_.InputLen(); if (offload_threshold > 0 && parsed_cmd_q_bytes_ >= offload_threshold) { VLOG(3) << "Offload triggered. 
offload_threshold=" << offload_threshold - << " buf_len=" << buf.InputLen() << " parsed_head_=" << parsed_head_ + << " buf_len=" << io_buf_.InputLen() << " parsed_head_=" << parsed_head_ << " parsed_to_execute_=" << parsed_to_execute_; // New recvs are routed to chunks by DoReadOnRecv; no explicit push needed here. if (!ExecuteBatch()) @@ -1389,7 +1379,7 @@ auto Connection::ParseLoop(io::IoBuf& buf) -> ParserStatus { break; } - commands_parsed = (this->*parse_func)(buf); + commands_parsed = (this->*parse_func)(); if (!ExecuteBatch()) return ERROR; @@ -1397,7 +1387,7 @@ auto Connection::ParseLoop(io::IoBuf& buf) -> ParserStatus { if (!ReplyBatch()) return ERROR; - } while (commands_parsed && buf.InputLen() > 0); + } while (commands_parsed && io_buf_.InputLen() > 0); return commands_parsed ? OK : NEED_MORE; } @@ -1505,7 +1495,7 @@ variant Connection::IoLoop() { parse_status = ParseRedis(max_busy_read_cycles_cached); } else { DCHECK(memcache_parser_); - parse_status = ParseLoop(io_buf_); + parse_status = ParseLoop(); } if (reply_builder_->GetError()) { @@ -2316,14 +2306,12 @@ bool Connection::IsReplySizeOverLimit() const { return over_limit; } -bool Connection::ParseRedisBatch(io::IoBuf& buf) { - // IoLoopV2 is MEMCACHE only; for the Redis path buf is always io_buf_. 
- (void)buf; +bool Connection::ParseRedisBatch() { return ParseRedis(max_busy_read_cycles_cached, true) == ParserStatus::OK; } -bool Connection::ParseMCBatch(io::IoBuf& buf) { - CHECK(buf.InputLen() > 0); +bool Connection::ParseMCBatch() { + CHECK(io_buf_.InputLen() > 0); do { if (parsed_cmd_ == nullptr) { @@ -2333,9 +2321,9 @@ bool Connection::ParseMCBatch(io::IoBuf& buf) { } uint32_t consumed = 0; memcache_parser_->set_last_unix_time(time(nullptr)); - MemcacheParser::Result result = - memcache_parser_->Parse(io::View(buf.InputBuffer()), &consumed, parsed_cmd_->mc_command()); - buf.ConsumeInput(consumed); + MemcacheParser::Result result = memcache_parser_->Parse(io::View(io_buf_.InputBuffer()), + &consumed, parsed_cmd_->mc_command()); + io_buf_.ConsumeInput(consumed); DVLOG(2) << "mc_result " << unsigned(result) << " consumed: " << consumed << " type " << unsigned(parsed_cmd_->mc_command()->type); @@ -2370,7 +2358,7 @@ bool Connection::ParseMCBatch(io::IoBuf& buf) { break; } } - } while (parsed_cmd_q_len_ < 128 && buf.InputLen() > 0); + } while (parsed_cmd_q_len_ < 128 && io_buf_.InputLen() > 0); return true; } @@ -2786,16 +2774,16 @@ void Connection::DoReadOnRecv(const util::FiberSocketBase::RecvNotification& n) } } -void Connection::CheckIoBufCapacity(bool is_iobuf_full, io::IoBuf& buf) { +void Connection::CheckIoBufCapacity(bool is_iobuf_full) { // Reserve() frees the backing allocation, which is unsafe while io_uring is writing // into io_buf_.AppendBuffer() during an in-flight pop CQE. 
- if (disk_queue_ && disk_queue_->IsPopInFlight() && &buf == &io_buf_) + if (disk_queue_ && disk_queue_->IsPopInFlight()) return; auto& conn_stats = tl_facade_stats->conn_stats; size_t max_io_buf_len = GetFlag(FLAGS_max_client_iobuf_len); - size_t capacity = buf.Capacity(); + size_t capacity = io_buf_.Capacity(); if (capacity < max_io_buf_len) { size_t parser_hint = 0; if (redis_parser_) @@ -2807,23 +2795,23 @@ void Connection::CheckIoBufCapacity(bool is_iobuf_full, io::IoBuf& buf) { // (Note: The buffer object is only working in power-of-2 sizes, // so there's no danger of accidental O(n^2) behavior.) if (parser_hint > capacity) { - UpdateIoBufCapacity(buf, &conn_stats, - [&]() { buf.Reserve(std::min(max_io_buf_len, parser_hint)); }); + UpdateIoBufCapacity(io_buf_, &conn_stats, + [&]() { io_buf_.Reserve(std::min(max_io_buf_len, parser_hint)); }); } // If we got a partial request because iobuf was full, grow it up to // a reasonable limit to save on Recv() calls. if (is_iobuf_full && capacity < max_io_buf_len / 2) { // Last io used most of the io_buf to the end. - UpdateIoBufCapacity(buf, &conn_stats, [&]() { - buf.Reserve(capacity * 2); // Valid growth range. + UpdateIoBufCapacity(io_buf_, &conn_stats, [&]() { + io_buf_.Reserve(capacity * 2); // Valid growth range. 
}); } - if (buf.AppendLen() == 0U) { + if (io_buf_.AppendLen() == 0U) { // it can happen with memcached but not for RedisParser, because RedisParser fully // consumes the passed buffer - LOG_EVERY_T(WARNING, 10) << "Maximum io_buf length reached " << buf.Capacity() + LOG_EVERY_T(WARNING, 10) << "Maximum io_buf length reached " << io_buf_.Capacity() << ", consider to increase max_client_iobuf_len flag"; } } @@ -2934,7 +2922,7 @@ variant Connection::IoLoopV2() { const bool cur_pop_in_flight = disk_queue_ && disk_queue_->IsPopInFlight(); if (io_buf_.InputLen() > 0 && !force_await && !cur_pop_in_flight) { - parse_status = ParseLoop(io_buf_); + parse_status = ParseLoop(); } else { parse_status = NEED_MORE; @@ -2965,7 +2953,7 @@ variant Connection::IoLoopV2() { if (parse_status == NEED_MORE) { parse_status = OK; - CheckIoBufCapacity(is_iobuf_full, io_buf_); + CheckIoBufCapacity(is_iobuf_full); } else if (parse_status != OK) { break; } diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 69581414bf6f..e0ec7bdb7dc9 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -284,7 +284,7 @@ class Connection : public util::Connection { // Returns true if data was offloaded; false means caller should fall through to io_buf_. bool MaybeOffloadToDisk(size_t len, const uint8_t* data); - void CheckIoBufCapacity(bool is_iobuf_full, io::IoBuf& buf); + void CheckIoBufCapacity(bool is_iobuf_full); // Main loop reading client messages and passing requests to dispatch queue. std::variant IoLoopV2(); @@ -368,12 +368,12 @@ class Connection : public util::Connection { // Returns true if one or more commands were parsed from the read buffer, // and false if no complete commands could be parsed (for example, when // parsing is pending more input). 
- bool ParseMCBatch(io::IoBuf& buf); + bool ParseMCBatch(); - bool ParseRedisBatch(io::IoBuf& buf); + bool ParseRedisBatch(); - // Call appropriate ParseBatch function, proceed with Execute and Reply all why input is remaining - ParserStatus ParseLoop(io::IoBuf& buf); + // Call appropriate ParseBatch function, proceed with Execute and Reply all while input remains. + ParserStatus ParseLoop(); // Loop over enqueued async commands and enqueue them for async execution. // If async execution is not possible, handle them in synchronous mode one by one. @@ -427,9 +427,7 @@ class Connection : public util::Connection { size_t request_consumed_bytes_ = 0; util::FiberSocketBase::ProvidedBuffer recv_buf_; - // parser input: fed exclusively from disk pops (or socket_buf_ when disk empty) io::IoBuf io_buf_; - io::IoBuf socket_buf_; std::unique_ptr redis_parser_; std::unique_ptr memcache_parser_; ParsedCommand* parsed_cmd_ = nullptr; From 039886b0a64b89a03dce9212348461423cc36081 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Fri, 27 Mar 2026 17:39:00 +0200 Subject: [PATCH 6/9] clean up --- src/facade/disk_backed_queue.cc | 20 ++++------- src/facade/disk_backed_queue.h | 2 +- src/facade/disk_backed_queue_test.cc | 52 ++++++++++++---------------- 3 files changed, 31 insertions(+), 43 deletions(-) diff --git a/src/facade/disk_backed_queue.cc b/src/facade/disk_backed_queue.cc index 54275e2d6638..0e7073cd1cbe 100644 --- a/src/facade/disk_backed_queue.cc +++ b/src/facade/disk_backed_queue.cc @@ -72,7 +72,7 @@ struct DiskBackedQueue::Impl { } void MaybeFlushQueue() { - if (write_in_flight || write_queue.empty()) + if (cancelled || write_in_flight || write_queue.empty()) return; write_in_flight = true; @@ -93,6 +93,7 @@ struct DiskBackedQueue::Impl { ec = {-res, std::system_category()}; LOG(ERROR) << "Failed to offload chunk of size " << size << " to backing with error: " << ec; + cancelled = true; } else if (!cancelled) { write_offset += size; total_backing_bytes += size; @@ 
-104,6 +105,7 @@ struct DiskBackedQueue::Impl { // Chain next write before invoking cb, so IsActive() reflects // the true state when the callback notifies the connection. + // MaybeFlushQueue is a no-op when cancelled. if (!ec && !write_queue.empty()) MaybeFlushQueue(); @@ -161,16 +163,6 @@ void DiskBackedQueue::Cancel() { impl_->cancelled = true; impl_->queued_bytes = 0; impl_->total_backing_bytes = 0; - - // Drain all pending (not-yet-submitted) writes synchronously. - // The at-most-one already-submitted write/pop CQE will still arrive; its - // callback checks cancelled and skips accounting, then clears the in-flight flag. - const auto cancel_ec = std::make_error_code(std::errc::operation_canceled); - while (!impl_->write_queue.empty()) { - AsyncPushCallback cb = std::move(impl_->write_queue.front().cb); - impl_->write_queue.pop_front(); - cb(cancel_ec); - } } bool DiskBackedQueue::IsActive() const { @@ -201,8 +193,9 @@ void DiskBackedQueue::PushAsync(Chunk chunk, AsyncPushCallback cb) { } void DiskBackedQueue::PopAsync(io::MutableBytes out, AsyncPopCallback cb) { - if (impl_->pop_in_flight || impl_->total_backing_bytes == 0) - return; + DCHECK(!impl_->pop_in_flight); + DCHECK(!impl_->cancelled); + DCHECK_GT(impl_->total_backing_bytes, 0u); const size_t to_read = std::min(impl_->total_backing_bytes, out.size()); const size_t offset = impl_->next_read_offset; @@ -224,6 +217,7 @@ void DiskBackedQueue::PopAsync(io::MutableBytes out, AsyncPopCallback cb) { std::error_code ec{-res, std::system_category()}; LOG(ERROR) << "Could not load item at offset " << offset << " of size " << to_read << " from disk with error: " << ec.value() << " " << ec.message(); + impl_->cancelled = true; cb(nonstd::make_unexpected(ec)); return; } diff --git a/src/facade/disk_backed_queue.h b/src/facade/disk_backed_queue.h index 5b2f2537dac8..4d09bc7e7d55 100644 --- a/src/facade/disk_backed_queue.h +++ b/src/facade/disk_backed_queue.h @@ -58,7 +58,7 @@ class DiskBackedQueue { using 
AsyncPopCallback = std::function)>; // Async read variant. Callback is invoked with Result containing bytes read or error. - // No-op if a pop is already in-flight or the backing store is empty. + // Must only be called when no pop is already in-flight and the backing store is non-empty. void PopAsync(io::MutableBytes out, AsyncPopCallback cb); // Check if backing store (on-disk bytes) is empty. diff --git a/src/facade/disk_backed_queue_test.cc b/src/facade/disk_backed_queue_test.cc index 03a325b82acf..530e2936b361 100644 --- a/src/facade/disk_backed_queue_test.cc +++ b/src/facade/disk_backed_queue_test.cc @@ -26,6 +26,16 @@ namespace { using namespace facade; +DiskBackedQueue::Chunk MakeChunk(const std::string& s) { + DiskBackedQueue::Chunk chunk; + chunk.data.assign(s.begin(), s.end()); + return chunk; +} + +io::MutableBytes AsMutableBytes(std::string& s) { + return io::MutableBytes(reinterpret_cast(s.data()), s.size()); +} + class DiskBackedQueueTest : public testing::Test { protected: void SetUp() override { @@ -53,10 +63,7 @@ TEST_F(DiskBackedQueueTest, PunchHoleReleasesSpace) { std::string data(12288, 'x'); { util::fb2::Done done; - DiskBackedQueue::Chunk chunk; - chunk.data.assign(reinterpret_cast(data.data()), - reinterpret_cast(data.data()) + data.size()); - backing.PushAsync(std::move(chunk), [&done](std::error_code ec) { + backing.PushAsync(MakeChunk(data), [&done](std::error_code ec) { ASSERT_FALSE(ec); done.Notify(); }); @@ -67,7 +74,7 @@ TEST_F(DiskBackedQueueTest, PunchHoleReleasesSpace) { std::string results; while (!backing.Empty()) { std::string buf(4096, '\0'); - auto out = io::MutableBytes(reinterpret_cast(buf.data()), buf.size()); + auto out = AsMutableBytes(buf); util::fb2::Done done; backing.PopAsync(out, [&done, &results, &buf](io::Result res) { ASSERT_TRUE(res); @@ -101,10 +108,7 @@ TEST_F(DiskBackedQueueTest, PunchHoleAdvancesOffset) { std::string data(32768, 'y'); { util::fb2::Done done; - DiskBackedQueue::Chunk chunk; - 
chunk.data.assign(reinterpret_cast(data.data()), - reinterpret_cast(data.data()) + data.size()); - backing.PushAsync(std::move(chunk), [&done](std::error_code ec) { + backing.PushAsync(MakeChunk(data), [&done](std::error_code ec) { ASSERT_FALSE(ec); done.Notify(); }); @@ -114,7 +118,7 @@ TEST_F(DiskBackedQueueTest, PunchHoleAdvancesOffset) { // Read exactly 4096 bytes (1 page). { std::string buf(4096, '\0'); - auto out = io::MutableBytes(reinterpret_cast(buf.data()), buf.size()); + auto out = AsMutableBytes(buf); util::fb2::Done done; backing.PopAsync(out, [&done](io::Result res) { ASSERT_TRUE(res); @@ -150,10 +154,7 @@ TEST_F(DiskBackedQueueTest, PunchHoleUnalignedReadsAndWrites) { std::string data(10000, 'z'); { util::fb2::Done done; - DiskBackedQueue::Chunk chunk; - chunk.data.assign(reinterpret_cast(data.data()), - reinterpret_cast(data.data()) + data.size()); - backing.PushAsync(std::move(chunk), [&done](std::error_code ec) { + backing.PushAsync(MakeChunk(data), [&done](std::error_code ec) { ASSERT_FALSE(ec); done.Notify(); }); @@ -166,7 +167,7 @@ TEST_F(DiskBackedQueueTest, PunchHoleUnalignedReadsAndWrites) { std::string results; { std::string buf(3000, '\0'); - auto out = io::MutableBytes(reinterpret_cast(buf.data()), buf.size()); + auto out = AsMutableBytes(buf); util::fb2::Done done; backing.PopAsync(out, [&done, &results, &buf](io::Result res) { ASSERT_TRUE(res); @@ -189,7 +190,7 @@ TEST_F(DiskBackedQueueTest, PunchHoleUnalignedReadsAndWrites) { // Now the first page (0-4095) should be punched. { std::string buf(2000, '\0'); - auto out = io::MutableBytes(reinterpret_cast(buf.data()), buf.size()); + auto out = AsMutableBytes(buf); util::fb2::Done done; backing.PopAsync(out, [&done, &results, &buf](io::Result res) { ASSERT_TRUE(res); @@ -212,7 +213,7 @@ TEST_F(DiskBackedQueueTest, PunchHoleUnalignedReadsAndWrites) { // Now the first two pages (0-8191) should be punched. 
{ std::string buf(3500, '\0'); - auto out = io::MutableBytes(reinterpret_cast(buf.data()), buf.size()); + auto out = AsMutableBytes(buf); util::fb2::Done done; backing.PopAsync(out, [&done, &results, &buf](io::Result res) { ASSERT_TRUE(res); @@ -232,7 +233,7 @@ TEST_F(DiskBackedQueueTest, PunchHoleUnalignedReadsAndWrites) { // Read remaining data and verify results match. while (!backing.Empty()) { std::string buf(4096, '\0'); - auto out = io::MutableBytes(reinterpret_cast(buf.data()), buf.size()); + auto out = AsMutableBytes(buf); util::fb2::Done done; backing.PopAsync(out, [&done, &results, &buf](io::Result res) { ASSERT_TRUE(res); @@ -262,12 +263,8 @@ TEST_F(DiskBackedQueueTest, AsyncReadWrite) { util::fb2::Fiber write_fiber = util::fb2::Fiber("writer", [&]() { for (size_t i = 0; i < 100; ++i) { auto cmd = absl::StrCat("SET FOO", i, " BAR"); - DiskBackedQueue::Chunk chunk; - chunk.data.assign(reinterpret_cast(cmd.data()), - reinterpret_cast(cmd.data()) + cmd.size()); - util::fb2::Done done; - backing.PushAsync(std::move(chunk), [&done](std::error_code ec) { + backing.PushAsync(MakeChunk(cmd), [&done](std::error_code ec) { EXPECT_FALSE(ec); done.Notify(); }); @@ -282,7 +279,7 @@ TEST_F(DiskBackedQueueTest, AsyncReadWrite) { util::fb2::Fiber read_fiber = util::fb2::Fiber("reader", [&]() { while (!backing.Empty()) { std::string buf(1024, 'c'); - auto bytes = io::MutableBytes(reinterpret_cast(buf.data()), buf.size()); + auto bytes = AsMutableBytes(buf); util::fb2::Done done; backing.PopAsync(bytes, [&done, &results, &buf](io::Result res) { @@ -312,10 +309,7 @@ TEST_F(DiskBackedQueueTest, AsyncPunchHole) { std::string data(12288, 'x'); util::fb2::Done write_done; - DiskBackedQueue::Chunk chunk; - chunk.data.assign(reinterpret_cast(data.data()), - reinterpret_cast(data.data()) + data.size()); - backing.PushAsync(std::move(chunk), [&write_done](std::error_code ec) { + backing.PushAsync(MakeChunk(data), [&write_done](std::error_code ec) { ASSERT_FALSE(ec); 
write_done.Notify(); }); @@ -325,7 +319,7 @@ TEST_F(DiskBackedQueueTest, AsyncPunchHole) { std::string results; while (!backing.Empty()) { std::string buf(4096, '\0'); - auto out = io::MutableBytes(reinterpret_cast(buf.data()), buf.size()); + auto out = AsMutableBytes(buf); util::fb2::Done read_done; backing.PopAsync(out, [&read_done, &results, &buf](io::Result res) { From b2f8e61d7f46e6da1d068344110d5dee04ba3e8e Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Thu, 21 May 2026 08:57:30 +0300 Subject: [PATCH 7/9] hysterisis --- src/facade/disk_backed_queue.cc | 71 ++++++++++++++++++---- src/facade/disk_backed_queue.h | 12 ++-- src/facade/dragonfly_connection.cc | 94 +++++++++++++++--------------- src/facade/dragonfly_connection.h | 4 -- 4 files changed, 116 insertions(+), 65 deletions(-) diff --git a/src/facade/disk_backed_queue.cc b/src/facade/disk_backed_queue.cc index 0e7073cd1cbe..2e801c54e697 100644 --- a/src/facade/disk_backed_queue.cc +++ b/src/facade/disk_backed_queue.cc @@ -33,10 +33,19 @@ ABSL_FLAG(size_t, disk_backpressure_file_max_bytes, 50_MB, "Maximum size of the backing file. When max size is reached, connection will " "stop offloading backpressure to disk and block on client read."); -ABSL_FLAG(size_t, disk_backpressure_drain_threshold, 16_KB, - "When remaining bytes in the disk-backed queue drop below this " - "threshold, queue is in draining phase (backpressure drains). " - "Defaults to 2x the max chunk size (16KB)."); +ABSL_FLAG(size_t, disk_backpressure_hysteresis_trigger, 16_KB, + "Low-water mark for the disk-backed queue hysteresis band. Once the high-water " + "mark (disk_backpressure_hysteresis_arm) has been crossed, new socket reads are " + "blocked when the queue shrinks below this threshold, ensuring a clean drain-to-" + "memory transition without new writes racing the tail reads. 
Defaults to 2× " + "kMaxChunkSize (16 KB)."); + +ABSL_FLAG(size_t, disk_backpressure_hysteresis_arm, 256_KB, + "High-water mark for the disk-backed queue hysteresis band. The queue must " + "accumulate at least this many bytes before the low-water drain phase can " + "activate. Prevents a tiny transient queue from ever triggering the drain-to-" + "memory block. Must be larger than disk_backpressure_hysteresis_trigger. " + "Defaults to 256 KB."); namespace facade { @@ -53,6 +62,11 @@ struct DiskBackedQueue::Impl { bool pop_in_flight = false; // Set by Cancel(). In-flight CQE callbacks skip accounting and chaining when true. bool cancelled = false; + // Set when total bytes cross hysteresis_arm on the way up; cleared when the + // queue fully empties. Together with hysteresis_trigger_ this forms the two-level + // hysteresis band that governs draining mode. + bool hysteresis_armed = false; + size_t hysteresis_arm = 0; struct PendingWrite { Chunk chunk; @@ -98,11 +112,16 @@ struct DiskBackedQueue::Impl { write_offset += size; total_backing_bytes += size; queued_bytes -= size; + if (!hysteresis_armed && (total_backing_bytes + queued_bytes) >= hysteresis_arm) + hysteresis_armed = true; } if (cancelled) ec = std::make_error_code(std::errc::operation_canceled); + VLOG(2) << "Write CQE done: cancelled=" << cancelled << " write_in_flight=" << write_in_flight + << " write_queue.size=" << write_queue.size() << " pop_in_flight=" << pop_in_flight; + // Chain next write before invoking cb, so IsActive() reflects // the true state when the callback notifies the connection. // MaybeFlushQueue is a no-op when cancelled. 
@@ -118,8 +137,9 @@ struct DiskBackedQueue::Impl { DiskBackedQueue::DiskBackedQueue(uint32_t conn_id) : impl_(std::make_unique()), max_backing_size_(absl::GetFlag(FLAGS_disk_backpressure_file_max_bytes)), - drain_threshold_(absl::GetFlag(FLAGS_disk_backpressure_drain_threshold)), + hysteresis_trigger_(absl::GetFlag(FLAGS_disk_backpressure_hysteresis_trigger)), id_(conn_id) { + impl_->hysteresis_arm = absl::GetFlag(FLAGS_disk_backpressure_hysteresis_arm); } DiskBackedQueue::~DiskBackedQueue() { @@ -163,6 +183,16 @@ void DiskBackedQueue::Cancel() { impl_->cancelled = true; impl_->queued_bytes = 0; impl_->total_backing_bytes = 0; + // Drain items that were queued but never submitted to io_uring. + // MaybeFlushQueue won't process them once cancelled is set, so we must + // invoke their callbacks here to avoid leaving IsActive() permanently true. + VLOG(2) << "DiskQueue Cancel: flushing write_queue.size=" << impl_->write_queue.size() + << " write_in_flight=" << impl_->write_in_flight + << " pop_in_flight=" << impl_->pop_in_flight; + auto ec = std::make_error_code(std::errc::operation_canceled); + for (auto& pw : impl_->write_queue) + pw.cb(ec); + impl_->write_queue.clear(); } bool DiskBackedQueue::IsActive() const { @@ -171,9 +201,9 @@ bool DiskBackedQueue::IsActive() const { } bool DiskBackedQueue::IsDraining() const { - if (!IsActive()) + if (!impl_->hysteresis_armed) return false; - return (impl_->total_backing_bytes + impl_->queued_bytes) < drain_threshold_; + return (impl_->total_backing_bytes + impl_->queued_bytes) < hysteresis_trigger_; } bool DiskBackedQueue::IsPopInFlight() const { @@ -184,11 +214,22 @@ bool DiskBackedQueue::HasEnoughBackingSpaceFor(size_t bytes) const { return (bytes + impl_->total_backing_bytes + impl_->queued_bytes) < max_backing_size_; } +bool DiskBackedQueue::CanPush(size_t bytes) const { + if (impl_->cancelled) + return false; + if (IsDraining()) + return false; + return HasEnoughBackingSpaceFor(bytes); +} + void 
DiskBackedQueue::PushAsync(Chunk chunk, AsyncPushCallback cb) { DCHECK(!chunk.data.empty()); impl_->queued_bytes += chunk.data.size(); impl_->write_queue.push_back({std::move(chunk), std::move(cb)}); + VLOG(3) << "PushAsync: queued_bytes=" << impl_->queued_bytes + << " write_queue.size=" << impl_->write_queue.size() + << " write_in_flight=" << impl_->write_in_flight << " armed=" << impl_->hysteresis_armed; impl_->MaybeFlushQueue(); } @@ -226,8 +267,14 @@ void DiskBackedQueue::PopAsync(io::MutableBytes out, AsyncPopCallback cb) { impl_->next_read_offset += bytes_read; impl_->total_backing_bytes -= bytes_read; - VLOG(2) << "Loaded item with offset " << offset << " of size " << bytes_read - << " for connection " << this; + // Reset hysteresis once the queue is fully drained (nothing on disk, nothing pending write). + // This re-arms the high-water mark check so the next growth cycle starts fresh. + if (impl_->total_backing_bytes == 0 && impl_->write_queue.empty() && !impl_->write_in_flight) + impl_->hysteresis_armed = false; + + VLOG(2) << "PopAsync done: bytes_read=" << bytes_read + << " total_backing=" << impl_->total_backing_bytes + << " armed=" << impl_->hysteresis_armed << " cancelled=" << impl_->cancelled; impl_->MaybePunchHole(); @@ -244,7 +291,7 @@ namespace facade { struct DiskBackedQueue::Impl {}; DiskBackedQueue::DiskBackedQueue(uint32_t conn_id) - : impl_(std::make_unique()), max_backing_size_(0), drain_threshold_(0), id_(conn_id) { + : impl_(std::make_unique()), max_backing_size_(0), hysteresis_trigger_(0), id_(conn_id) { } DiskBackedQueue::~DiskBackedQueue() = default; @@ -273,6 +320,10 @@ bool DiskBackedQueue::IsPopInFlight() const { return false; } +bool DiskBackedQueue::CanPush(size_t) const { + return false; +} + void DiskBackedQueue::Cancel() { } diff --git a/src/facade/disk_backed_queue.h b/src/facade/disk_backed_queue.h index 4d09bc7e7d55..94ee967440b1 100644 --- a/src/facade/disk_backed_queue.h +++ b/src/facade/disk_backed_queue.h @@ -36,14 
+36,18 @@ class DiskBackedQueue { // waiting to be written, a write CQE in-flight, or a pop CQE in-flight. bool IsActive() const; - // Returns true when remaining bytes have dropped below the drain - // threshold. The caller should block new socket reads to allow a clean - // drain-to-memory transition (TCP backpressure builds during this window). + // Returns true when remaining bytes have dropped below the drain threshold after + // previously exceeding the hysteresis threshold. The caller should block new + // socket reads to allow a clean drain-to-memory transition. bool IsDraining() const; // Returns true if there is a pop in flight bool IsPopInFlight() const; + // Returns true if the queue can accept new data right now. + // False when draining, at capacity, or cancelled. + bool CanPush(size_t bytes) const; + // Check if we can offload bytes to backing file. // Counts both on-disk bytes and bytes queued for writing (not yet on disk). bool HasEnoughBackingSpaceFor(size_t bytes) const; @@ -78,7 +82,7 @@ class DiskBackedQueue { // Read only constants const size_t max_backing_size_; - const size_t drain_threshold_; + const size_t hysteresis_trigger_; // same as connection id. Used to uniquely identify the backed file const size_t id_; diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 46301985bece..767dd61da037 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -1285,8 +1285,12 @@ void Connection::ConnectionFlow() { // This leaves at most one in-flight write CQE and one in-flight pop CQE, // both of which will complete quickly without draining the full queue. 
disk_queue_->Cancel(); + VLOG(2) << "DiskQueue post-Cancel: IsActive=" << disk_queue_->IsActive() + << " IsPopInFlight=" << disk_queue_->IsPopInFlight() << " conn=" << id_; if (disk_queue_->IsActive()) { + VLOG(2) << "Waiting for in-flight disk CQEs to complete conn=" << id_; io_event_.await([this] { return !disk_queue_->IsActive(); }); + VLOG(2) << "In-flight disk CQEs done conn=" << id_; } std::ignore = disk_queue_->Close(); disk_queue_.reset(); @@ -2811,6 +2815,10 @@ void Connection::DrainDiskQueue(size_t offload_threshold) { return; if (disk_queue_->IsPopInFlight() || io_buf_.AppendLen() == 0) return; + VLOG(3) << "DrainDiskQueue: parsed_cmd_q_bytes_=" << parsed_cmd_q_bytes_ + << " threshold=" << offload_threshold << " pop_in_flight=" << disk_queue_->IsPopInFlight() + << " append_len=" << io_buf_.AppendLen() << " IsDraining=" << disk_queue_->IsDraining() + << " CanPush=" << disk_queue_->CanPush(0); if (parsed_cmd_q_bytes_ >= offload_threshold) return; @@ -2869,35 +2877,19 @@ bool ConnectionRef::operator==(const ConnectionRef& other) const { return client_id_ == other.client_id_; } -bool Connection::MaybeOffloadToDisk(size_t len, const uint8_t* data) { - if (offload_threshold_ == 0 || !disk_queue_) - return false; - if (!disk_queue_->IsActive() && parsed_cmd_q_bytes_ < offload_threshold_) - return false; - - const uint8_t* src = data; - size_t remaining = len; - while (remaining > 0) { - size_t chunk_size = std::min(remaining, DiskBackedQueue::kMaxChunkSize); - DiskBackedQueue::Chunk chunk; - chunk.data.assign(src, src + chunk_size); - src += chunk_size; - remaining -= chunk_size; - disk_queue_->PushAsync(std::move(chunk), [this](std::error_code ec) { - if (ec) - io_ec_ = ec; - io_event_.notify(); - }); - } - return true; -} - void Connection::NotifyOnRecv(const util::FiberSocketBase::RecvNotification& n) { if (std::holds_alternative(n.read_result)) { io_ec_ = std::get(n.read_result); return; } + // True when incoming socket bytes should be routed to the disk 
queue rather than io_buf_. + auto should_offload = [this]() { + if (offload_threshold_ == 0 || !disk_queue_) + return false; + return disk_queue_->IsActive() || parsed_cmd_q_bytes_ >= offload_threshold_; + }; + using RecvNoti = util::FiberSocketBase::RecvNotification::RecvCompletion; if (std::holds_alternative(n.read_result)) { if (!std::get(n.read_result)) { // false - connection aborted @@ -2905,21 +2897,11 @@ void Connection::NotifyOnRecv(const util::FiberSocketBase::RecvNotification& n) return; } - // Check if we should offload before doing a TryRecv (IsDraining blocks the recv entirely). - auto should_offload = [this]() { - if (offload_threshold_ == 0 || !disk_queue_) - return false; - return disk_queue_->IsActive() || parsed_cmd_q_bytes_ >= offload_threshold_; - }; - if (should_offload()) { - // During the drain window block socket reads (MaybeOffloadToDisk returns true - // without pushing), so TCP backpressure builds until the queue fully empties. - if (disk_queue_->IsDraining()) - return; - - // Don't recv if backing store is too full to accept a max-size chunk. - if (!disk_queue_->HasEnoughBackingSpaceFor(DiskBackedQueue::kMaxChunkSize)) + // CanPush covers all queue-side reasons to block a recv: draining window, + // backing store full, or cancelled. TCP backpressure builds naturally when + // we stop reading from the socket. + if (!disk_queue_->CanPush(DiskBackedQueue::kMaxChunkSize)) return; DiskBackedQueue::Chunk chunk; @@ -2941,7 +2923,12 @@ void Connection::NotifyOnRecv(const util::FiberSocketBase::RecvNotification& n) disk_queue_->PushAsync(std::move(chunk), [this](std::error_code ec) { if (ec) io_ec_ = ec; - io_event_.notify(); + // Don't notify while a pop CQE is in flight: the pop callback will wake the + // fiber when io_buf_ data is ready. Notifying on every write CQE causes a + // cooperative-scheduler starvation spin — the fiber loops checking can_parse + // (always false while pop is in flight) and starves the proactor. 
+ if (!disk_queue_->IsPopInFlight()) + io_event_.notify(); }); return; } @@ -2974,12 +2961,10 @@ void Connection::NotifyOnRecv(const util::FiberSocketBase::RecvNotification& n) } else if (std::holds_alternative(n.read_result)) { // provided buffer io::MutableBytes buf = std::get(n.read_result); - if (MaybeOffloadToDisk(buf.size(), buf.data())) { - } else { - // TODO: if a pop CQE is in-flight writing into io_buf_, WriteAndCommit races with it. - ReadBufTracker tracker(io_buf_); - io_buf_.WriteAndCommit(buf.data(), buf.size()); - } + // TODO: if a pop CQE is in-flight writing into io_buf_, WriteAndCommit races with it. + // TODO: disk offload for multishot recv (provided-buffer) path. + ReadBufTracker tracker(io_buf_); + io_buf_.WriteAndCommit(buf.data(), buf.size()); last_interaction_ = time(nullptr); } else { LOG(FATAL) << "Should not reach here"; @@ -3108,7 +3093,12 @@ variant Connection::IoLoopV2() { << " disk_queue_=" << (disk_queue_ != nullptr) << " parsed_cmd_q_bytes_=" << parsed_cmd_q_bytes_ << " io_buf_len=" << io_buf_.InputLen(); - io_event_.notify(); + // Suppress notify while a pop CQE owns io_buf_.AppendBuffer() — the pop callback + // will wake the fiber when data is ready. Without this guard every socket recv wakes + // the await, the predicate (can_parse=false while pop is in flight) is false, and + // we spin without ever yielding to the proactor that needs to fire the pop CQE. + if (!disk_queue_ || !disk_queue_->IsPopInFlight()) + io_event_.notify(); }); ParserStatus parse_status = OK; @@ -3175,8 +3165,14 @@ variant Connection::IoLoopV2() { parsed_cmd_q_bytes_ < offload_threshold && !pop_in_flight; // Wake when there is parseable data in io_buf_ and no pop is writing into it. bool can_parse = io_buf_.InputLen() > 0 && !pop_in_flight; - return can_parse || cmd_ready || cmd_executable || io_ec_ || can_drain_disk || - pending_input_ || !dispatch_q_.empty() || is_ready_to_migrate(); + // Only poll the socket when no pop CQE owns io_buf_.AppendBuffer(). 
Mirrors the + // guard at the ReadPendingInput() call site above: if we let pending_input_ wake + // the fiber while pop_in_flight=true, the fiber spins (ReadPendingInput is + // skipped, pending_input_ stays true) and starves the proactor that must fire + // the pop CQE. + bool can_poll = pending_input_ && !pop_in_flight; + return can_parse || cmd_ready || cmd_executable || io_ec_ || can_drain_disk || can_poll || + !dispatch_q_.empty() || is_ready_to_migrate(); }); } @@ -3215,6 +3211,10 @@ variant Connection::IoLoopV2() { // When over the offload threshold with no commands left to dispatch, force the // await path so the fiber yields and lets shard completions be processed. // Without this, new recvs keep routing to chunks but we spin without yielding. + // This also prevents ReadPendingInput() from bypassing the disk offload path: + // ReadPendingInput() calls TryRecv directly into io_buf_ with no should_offload() + // check; with force_await set, the parse guard below skips the io_buf_ parse path + // and we fall through to the await, which only wakes when the queue drains. bool force_await = offload_threshold > 0 && parsed_cmd_q_bytes_ >= offload_threshold && parsed_to_execute_ == nullptr; diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index ffb83308e500..7ec50b39c502 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -312,10 +312,6 @@ class Connection : public util::Connection { void NotifyOnRecv(const util::FiberSocketBase::RecvNotification& n); - // Pushes [data, data+len) to the disk queue in ≤kMaxChunkSize chunks. - // Returns true if data was offloaded; false means caller should fall through to io_buf_. - bool MaybeOffloadToDisk(size_t len, const uint8_t* data); - // Enables io_uring multishot receives for the connection if the current thread supports it. 
// This is required during initial setup or after migrating to a new thread/proactor, // provided the buffer ring is configured and the connection is not using TLS. From 8658727192620530648e7aabb41aa3d2b5756200 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Thu, 21 May 2026 19:52:05 +0300 Subject: [PATCH 8/9] docs --- docs/disk-offload-state-machine.md | 215 +++++++++++++++++++++++++++++ 1 file changed, 215 insertions(+) create mode 100644 docs/disk-offload-state-machine.md diff --git a/docs/disk-offload-state-machine.md b/docs/disk-offload-state-machine.md new file mode 100644 index 000000000000..f8491cb275f4 --- /dev/null +++ b/docs/disk-offload-state-machine.md @@ -0,0 +1,215 @@ +# Connection Disk Offload + +--- + +## 1. Watermark Flags + +Three thresholds form a three-level band: + +| Flag | Default | Role | +|----------------------------------------|--------------|------| +| `pipeline_disk_offload_threshold` | 0 (disabled) | **Offload trigger** – `parsed_cmd_q_bytes_ >= threshold` → incoming bytes go to disk instead of `io_buf_`. | +| `disk_backpressure_hysteresis_arm` | 256 KB | **High-water mark** – once `total_backing + queued >= arm`, `hysteresis_armed` is set, enabling the drain phase. | +| `disk_backpressure_hysteresis_trigger` | 16 KB | **Low-water mark** – while armed and `total_backing + queued < trigger`, `IsDraining()=true`. New socket reads are blocked until the queue fully drains to memory. | + + +**Why hysteresis?** + +We want to keep the disk queue active as long as it's busy and we also want to avoid the disk tax for pipelines that are being drained. Think of DrainDiskQueue reads to io_buf_, RecvNotification fires and we are forced to write +to disk. With hysterisis, we allow backpressure to fall naturally to tcp buffers while we drain the last chunks from the queue. + +--- + +## 2. 
`DiskBackedQueue` State Machine + +```mermaid +stateDiagram-v2 + [*] --> IDLE + IDLE --> ACTIVE : PushAsync called + ACTIVE --> IDLE : backing empty, both tracks idle, hysteresis_armed reset + ACTIVE --> CANCELLED : Cancel or I/O error + + state ACTIVE { + state WriteTrack { + [*] --> WriteIdle + WriteIdle --> Writing : PushAsync - MaybeFlushQueue + Writing --> WriteIdle : write CQE fires, queue empty + Writing --> Writing : write CQE fires, more queued - chain next write + } + -- + state ReadTrack { + [*] --> ReadIdle + ReadIdle --> Draining : hysteresis armed AND total+queued below trigger + ReadIdle --> Popping : DrainDiskQueue calls PopAsync + Draining --> Popping : DrainDiskQueue calls PopAsync - CanPush false during drain + Draining --> ReadIdle : backing empty - hysteresis_armed reset + Popping --> ReadIdle : read CQE fires, bytes to io_buf + } + } + + CANCELLED --> [*] : ConnectionFlow teardown waits IsActive = false +``` + +> **Note**: The two tracks run concurrently. A write CQE and a read CQE can be in-flight +> simultaneously — they always target different file offsets (writes append at `write_offset`, +> reads consume from `next_read_offset`). `IsActive() = false` only when **both** tracks +> are idle and `total_backing_bytes = 0`. + +### Key predicates + +``` +IsActive() = (!cancelled && total_backing_bytes > 0) + || !write_queue.empty() + || write_in_flight + || pop_in_flight + +IsDraining() = hysteresis_armed + && (total_backing_bytes + queued_bytes) < hysteresis_trigger + +CanPush(n) = !cancelled + && !IsDraining() + && (total_backing_bytes + queued_bytes + n) < max_backing_size +``` + +--- + +## 3. `DrainDiskQueue` – per-loop drain step + +Called once per `IoLoopV2` iteration, before `ReadPendingInput`. 
+ +```mermaid +flowchart TD + A(["DrainDiskQueue called"]) --> B{"threshold=0 or disk empty?"} + B -- yes --> Z(["return – no-op"]) + B -- no --> C{"pop already in-flight?"} + C -- yes --> Z + C -- no --> D{"io_buf_ append space = 0?"} + D -- yes --> Z + D -- no --> E{"parsed_cmd_q_bytes_ >= threshold?"} + E -- yes --> Z2(["return – pipeline full, wait for cmd drain"]) + E -- no --> F["PopAsync(io_buf_.AppendBuffer)"] + F --> G(["CQE fires: io_buf_.CommitWrite, io_event_.notify"]) +``` + +The guard on `parsed_cmd_q_bytes_ >= threshold` is the back-pressure gate: the fiber +must wait for shard threads to execute commands and free memory before draining more +disk data into the parser. + +--- + +## 4. `NotifyOnRecv` – recv routing + +Called from the io_uring recv callback (edge-triggered). + +```mermaid +flowchart TD + A(["NotifyOnRecv"]) --> B{"socket error?"} + B -- yes --> Z(["set io_ec_, return"]) + B -- no --> C{"should_offload?\ndisk active OR q_bytes >= threshold"} + C -- no --> D["TryRecv into io_buf_, CommitWrite"] + C -- yes --> E{"CanPush(kMaxChunkSize)?"} + E -- no --> Z2(["return – TCP backpressure builds naturally"]) + E -- yes --> F["TryRecv into Chunk buffer"] + F --> G{"recv error?"} + G -- yes --> Z3(["set io_ec_ or drop EAGAIN, return"]) + G -- no --> H["PushAsync chunk to disk"] + H --> I{"pop_in_flight?"} + I -- yes --> Z4(["return – pop CQE callback will notify"]) + I -- no --> J(["io_event_.notify"]) +``` + +--- + +## 5. 
`IoLoopV2` – main loop + +```mermaid +flowchart TD + START(["IoLoopV2 start"]) --> INIT["read offload_threshold\npre-init disk queue\nregister RecvOnNotify CB"] + INIT --> LOOP + + LOOP(["loop iteration"]) --> MIG["HandleMigrateRequest"] + MIG --> WAITER["subscribe cmd_completion_waiter if in-flight commands"] + WAITER --> DRAIN["DrainDiskQueue"] + + DRAIN --> PIP{"pending_input_ AND NOT pop_in_flight?"} + PIP -- yes --> READ["ReadPendingInput: TryRecv into io_buf_"] + PIP -- no --> BUFCHECK + READ --> BUFCHECK + + BUFCHECK{"io_buf_ empty OR pop_in_flight?"} -- no --> PARSE_GATE + BUFCHECK -- yes --> POLL["if empty AND NOT pop_in_flight: NotifyOnRecv poll"] + POLL --> FLUSH["reply_builder_.Flush"] + FLUSH --> AWAIT["io_event_.await — wakes on any of:\n• can_parse: InputLen gt 0 AND NOT pop_in_flight\n• cmd_ready: head can reply\n• cmd_exec: head ready to execute\n• can_drain: disk non-empty AND q_bytes lt thr AND NOT pop_in_flight\n• can_poll: pending_input_ AND NOT pop_in_flight\n• dispatch_q non-empty\n• migration requested\n• io_ec_ set"] + AWAIT --> LOOP + + PARSE_GATE --> DISPATCH{"dispatch_q non-empty?"} + DISPATCH -- yes --> CTRL["drain control messages, continue"] + CTRL --> LOOP + + DISPATCH -- no --> FA{"force_await?\nq_bytes >= thr AND no cmd to execute"} + FA -- no --> OVERLIMIT{"pre_over_limit OR pop_in_flight OR io_buf_ empty?"} + OVERLIMIT -- no --> PARSE["ParseLoop: parse → execute → reply"] + PARSE --> LOOP + + OVERLIMIT -- yes --> EXEC["ExecuteBatch / ReplyBatch"] + FA -- yes --> EXEC + EXEC --> FA2{"force_await?"} + FA2 -- yes --> AWAIT2["io_event_.await:\ncmd_ready OR q_bytes < thr OR io_ec_ set"] + AWAIT2 --> BPL + FA2 -- no --> BPL + BPL{"post_over_limit?"} -- yes --> AWAIT3["io_event_.await pipeline backpressure relief"] + AWAIT3 --> LOOP + BPL -- no --> LOOP +``` + +### `force_await` dual role + +`force_await = threshold > 0 AND parsed_cmd_q_bytes_ >= threshold AND parsed_to_execute_ = null` + +1. 
**Spin guard** – prevents the fiber from busy-looping when `io_buf_` has data but the + pipeline is full. Forces a yield so shard threads can run and drain commands. +2. **Offload bypass guard** – `ReadPendingInput` calls `TryRecv` directly into `io_buf_` + with no `should_offload()` check. With `force_await` set, the parse guard skips + the `io_buf_` parse path and falls through to the await, so those bytes are not + parsed into the already-full pipeline until the command queue drains. + +### The pop-in-flight `io_event_` notify guards + +Two sites suppress `io_event_.notify()` while a pop CQE owns `io_buf_.AppendBuffer()`: + +| Site | Guard | +|------|-------| +| `RegisterOnRecv` lambda | `if (!disk_queue_ \|\| !IsPopInFlight()) notify()` | +| `PushAsync` write-CQE callback | `if (!IsPopInFlight()) notify()` | + +Without both guards the fiber wakes, sees `can_parse=false` (buffer half-written), +loops again, and starves the proactor that needs to fire the pop CQE. + +The await predicate adds the matching guard on `can_poll`: +```cpp +bool can_poll = pending_input_ && !pop_in_flight; +``` +This prevents spinning on `ReadPendingInput` (which is guarded by `!pop_in_flight` +at its call site) while `pending_input_` stays true. + +--- + +## 6. End-of-connection teardown + +```mermaid +sequenceDiagram + participant CF as ConnectionFlow + participant DQ as DiskBackedQueue + participant IO as io_event + + CF->>DQ: Cancel() + Note over DQ: cancelled=true, flush pending write CBs with operation_canceled + CF->>DQ: IsActive()? 
+ alt in-flight CQEs still pending + CF->>IO: await NOT IsActive() + DQ-->>IO: write/pop CQE lands, notify + IO-->>CF: wakes + end + CF->>DQ: Close() / reset() + Note over DQ: unlink backing file +``` From df801a309504c17e6ac47e4386ad671f80954438 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Thu, 21 May 2026 19:53:38 +0300 Subject: [PATCH 9/9] small comment --- docs/disk-offload-state-machine.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/disk-offload-state-machine.md b/docs/disk-offload-state-machine.md index f8491cb275f4..4908571bbcf9 100644 --- a/docs/disk-offload-state-machine.md +++ b/docs/disk-offload-state-machine.md @@ -16,7 +16,7 @@ Three thresholds form a three-level band: **Why hysteresis?** We want to keep the disk queue active as long as it's busy and we also want to avoid the disk tax for pipelines that are being drained. Think of DrainDiskQueue reads to io_buf_, RecvNotification fires and we are forced to write -to disk. With hysterisis, we allow backpressure to fall naturally to tcp buffers while we drain the last chunks from the queue. +to disk. With hysteresis, we allow backpressure to fall naturally to TCP buffers while we drain the last chunks from the queue. Also note, we can use a staging buffer within the disk queue but I don't think it's worth it right now. ---