chore: pending chunks for disk backed queue#7025
Conversation
kostasrim
commented
Mar 30, 2026
- PushAsync with pending list
- Add cancellation
- Tests
🤖 Augment PR SummarySummary: This PR refactors the disk-backed backpressure queue to support chunk-based pending writes, plus cancellation and a “draining” watermark. Changes:
Technical Notes: Uses io_uring 🤖 Was this summary useful? React with 👍 or 👎 |
|
|
||
| bool DiskBackedQueue::HasEnoughBackingSpaceFor(size_t bytes) const { | ||
| return (bytes + impl_->total_backing_bytes) < max_backing_size_; | ||
| void DiskBackedQueue::Cancel() { |
There was a problem hiding this comment.
src/facade/disk_backed_queue.cc:162: Cancel() sets cancelled and zeroes counters but does not clear write_queue or notify callbacks for queued (not-yet-submitted) writes, so callers can hang waiting for AsyncPushCallback. Also IsActive() will remain true if write_queue is still non-empty after Cancel().
Severity: high
🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
| return (bytes + impl_->total_backing_bytes + impl_->queued_bytes) < max_backing_size_; | ||
| } | ||
|
|
||
| void DiskBackedQueue::PushAsync(Chunk chunk, AsyncPushCallback cb) { |
There was a problem hiding this comment.
src/facade/disk_backed_queue.cc:191: PushAsync() still enqueues chunks when impl_->cancelled is true, but MaybeFlushQueue() becomes a no-op in that state so the callback may never run. This can deadlock code that waits on the push callback after cancellation.
Severity: high
🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
| queued_bytes -= size; | ||
| } | ||
|
|
||
| if (cancelled) |
There was a problem hiding this comment.
src/facade/disk_backed_queue.cc:103: On write failure (res < 0), the I/O error is logged but the callback ultimately receives operation_canceled because cancelled is set, which hides the root cause from callers. This makes it difficult to distinguish explicit Cancel() from disk I/O failures.
Severity: medium
🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
| << " to backing with error: " << ec; | ||
| cancelled = true; | ||
| } else if (!cancelled) { | ||
| write_offset += size; |
There was a problem hiding this comment.
src/facade/disk_backed_queue.cc:98: LinuxFile::WriteAsync() returns the number of bytes written in res, but the bookkeeping assumes a full write and advances write_offset/total_backing_bytes by size. If a partial write occurs, offsets/counters can become inconsistent and later reads may be corrupted.
Severity: high
🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
| static constexpr size_t kMaxChunkSize = 8192; | ||
|
|
||
| struct Chunk { | ||
| // 0 < data.size() <= kMaxChunkSize |
There was a problem hiding this comment.
src/facade/disk_backed_queue.h:25: Chunk is documented as data.size() <= kMaxChunkSize, but the current tests push much larger chunks and PushAsync() only checks non-empty, so the public contract is inconsistent. This can mislead future callers about the expected chunking behavior.
Severity: low
Other Locations
src/facade/disk_backed_queue_test.cc:66src/facade/disk_backed_queue_test.cc:111src/facade/disk_backed_queue_test.cc:157src/facade/disk_backed_queue_test.cc:312
🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
There was a problem hiding this comment.
Pull request overview
Updates facade::DiskBackedQueue to support queued/pending async writes (rather than submitting every push immediately), adds cancellation/drain state APIs, and adjusts unit tests to the new PushAsync(Chunk) interface.
Changes:
- Changed
PushAsyncto take an ownedChunkand introduced an internal pending write queue withqueued_bytesaccounting. - Added queue state APIs (
IsActive,IsDraining,IsPopInFlight,IsPushInFlight) plus aCancel()API and a drain watermark flag. - Updated
disk_backed_queue_test.ccto push owned chunks and to use a helper forio::MutableBytescreation.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| src/facade/disk_backed_queue_test.cc | Migrates tests to the new Chunk-based push API and adds helpers for chunk/byte conversions. |
| src/facade/disk_backed_queue.h | Introduces Chunk, max chunk size constant, new state APIs, and Cancel() declaration/docs. |
| src/facade/disk_backed_queue.cc | Implements pending write queue/flush chaining, queued-bytes accounting, drain threshold flag, and cancellation/state tracking. |
| DiskBackedQueue::Chunk MakeChunk(const std::string& s) { | ||
| DiskBackedQueue::Chunk chunk; | ||
| chunk.data.assign(s.begin(), s.end()); | ||
| return chunk; | ||
| } |
There was a problem hiding this comment.
kMaxChunkSize/Chunk contract says data.size() <= kMaxChunkSize, but MakeChunk() blindly copies the whole input string (e.g. 10KB/12KB/32KB in these tests), so tests now violate the documented API and would mask size-related bugs. Either split test writes into <=kMaxChunkSize chunks or adjust/remove the max-size contract.
| // Cancel all pending (not-yet-submitted) writes and mark the queue as cancelled. | ||
| // In-flight write/read CQEs will complete with operation_canceled. | ||
| // Caller must wait for in-flight operations to complete before destroying the queue. | ||
| void Cancel(); |
There was a problem hiding this comment.
Cancel() introduces new externally visible behavior (cancelling queued writes and in-flight CQEs), but there are no unit tests covering cancellation paths (queued-only, in-flight write, in-flight read). Adding targeted tests in disk_backed_queue_test.cc would prevent hangs/regressions in this control-flow-heavy logic.
| void DiskBackedQueue::Cancel() { | ||
| impl_->cancelled = true; | ||
| impl_->queued_bytes = 0; | ||
| impl_->total_backing_bytes = 0; |
There was a problem hiding this comment.
Cancel() sets cancelled and zeroes counters but does not clear write_queue or resolve callbacks for queued (not-yet-submitted) writes. Because MaybeFlushQueue() is a no-op when cancelled, any queued pushes will never complete (potential hang) and the queued chunks remain allocated.
| impl_->total_backing_bytes = 0; | |
| impl_->total_backing_bytes = 0; | |
| impl_->write_queue.clear(); | |
| impl_->push_in_flight = false; | |
| impl_->pop_in_flight = false; |
|
|
||
| void DiskBackedQueue::PushAsync(Chunk chunk, AsyncPushCallback cb) { | ||
| DCHECK(!chunk.data.empty()); | ||
|
|
There was a problem hiding this comment.
PushAsync() does not check cancelled. After Cancel(), this will still enqueue into write_queue and increment queued_bytes, but MaybeFlushQueue() will never submit writes, so the callback is never invoked. Consider returning operation_canceled immediately (and not enqueueing) when cancelled.
| if (impl_->cancelled) { | |
| cb(nonstd::make_unexpected(std::make_error_code(std::errc::operation_canceled))); | |
| return; | |
| } |
| if (res < 0) { | ||
| ec = {-res, std::system_category()}; | ||
| LOG(ERROR) << "Failed to offload chunk of size " << size | ||
| << " to backing with error: " << ec; | ||
| cancelled = true; | ||
| } else if (!cancelled) { | ||
| write_offset += size; | ||
| total_backing_bytes += size; | ||
| queued_bytes -= size; | ||
| } | ||
|
|
||
| if (cancelled) |
There was a problem hiding this comment.
On WriteAsync failure (res < 0), the callback builds the real I/O ec but then sets cancelled = true, which later forces ec = operation_canceled. This hides the original error from the caller; return the actual write error for that operation and only cancel/drain the remaining queued work.
| if (res < 0) { | |
| ec = {-res, std::system_category()}; | |
| LOG(ERROR) << "Failed to offload chunk of size " << size | |
| << " to backing with error: " << ec; | |
| cancelled = true; | |
| } else if (!cancelled) { | |
| write_offset += size; | |
| total_backing_bytes += size; | |
| queued_bytes -= size; | |
| } | |
| if (cancelled) | |
| bool local_error = false; | |
| if (res < 0) { | |
| ec = {-res, std::system_category()}; | |
| LOG(ERROR) << "Failed to offload chunk of size " << size | |
| << " to backing with error: " << ec; | |
| cancelled = true; | |
| local_error = true; | |
| } else if (!cancelled) { | |
| write_offset += size; | |
| total_backing_bytes += size; | |
| queued_bytes -= size; | |
| } | |
| if (cancelled && !local_error) |
| LOG(ERROR) << "Failed to offload chunk of size " << size | ||
| << " to backing with error: " << ec; | ||
| cancelled = true; | ||
| } else if (!cancelled) { |
There was a problem hiding this comment.
If a write fails (res < 0) and cancelled becomes true, any remaining write_queue entries are never flushed and their callbacks are never invoked (since MaybeFlushQueue() becomes a no-op). To avoid deadlocks, drain the pending queue on cancellation/error (invoke each cb with an error, and reconcile queued_bytes).
| "Maximum size of the backing file. When max size is reached, connection will " | ||
| "stop offloading backpressure to disk and block on client read."); | ||
|
|
||
| ABSL_FLAG(size_t, disk_backpressure_drain_threshold, 16_KB, |
There was a problem hiding this comment.
@romange I added this watermark flag to allow shifting disk backpressure back to the socket. This allows a smooth transition from disk to memory without ping ponging the disk when draining. That way socket reads are deferred while the last few bytes of the disk queue are being drained by the connection
There was a problem hiding this comment.
When remaining bytes in the disk-backed queue drop below this - what does it mean remaining bytes?
| // Cancel all pending (not-yet-submitted) writes and mark the queue as cancelled. | ||
| // In-flight write/read CQEs will complete with operation_canceled. | ||
| // Caller must wait for in-flight operations to complete before destroying the queue. | ||
| void Cancel(); |
There was a problem hiding this comment.
I can introduce a blocking Shutdown but Cancel() + wait if in-flight is more than enough for the users of this class
| } | ||
|
|
||
| void DiskBackedQueue::PushAsync(Chunk chunk, AsyncPushCallback cb) { | ||
| DCHECK(!chunk.data.empty()); |
There was a problem hiding this comment.
I did not introduce a fiber similar to let's say the journal streamer because it's not really needed. AsyncWrites on pending entries can continue from the async cb. Also we don't need any of the extra complexity of the context
| } | ||
|
|
||
| void DiskBackedQueue::PopAsync(io::MutableBytes out, AsyncPopCallback cb) { | ||
| DCHECK(!impl_->pop_in_flight); |
There was a problem hiding this comment.
@romange we have a pending write list on the write path. we could similarly do the same with read path. That way CPU can be busy parsing while the disk is reading the next chunk.
I leave it as is for now since performance is not the main goal here
Signed-off-by: Kostas Kyrimis <kostas@dragonflydb.io>
9422df9 to
5948181
Compare
|
|
||
| struct PendingWrite { | ||
| Chunk chunk; | ||
| AsyncPushCallback cb; |
There was a problem hiding this comment.
We do not really need to store cb since it's stateless so maybe we can pass it during the construction of the queue and we don;t have to store it each time
|
toy e2e #6929 |
|
@romange before I finalize anything let's make sure we agree on the direction:
|
| // Returns true when remaining bytes have dropped below the drain threshold. | ||
| // The caller should block new socket reads to allow a clean drain-to-memory | ||
| // transition (TCP backpressure builds during this window). | ||
| bool IsDraining() const; |
There was a problem hiding this comment.
Is "draining" the right term here?
what the queue actually does during this state?