Skip to content

chore: pending chunks for disk backed queue#7025

Open
kostasrim wants to merge 1 commit into
mainfrom
disk-queue-chunks
Open

chore: pending chunks for disk backed queue#7025
kostasrim wants to merge 1 commit into
mainfrom
disk-queue-chunks

Conversation

@kostasrim
Copy link
Copy Markdown
Contributor

  • PushAsync with pending list
  • Add cancellation
  • Tests

@kostasrim kostasrim self-assigned this Mar 30, 2026
Copilot AI review requested due to automatic review settings March 30, 2026 15:00
@augmentcode
Copy link
Copy Markdown

augmentcode Bot commented Mar 30, 2026

🤖 Augment PR Summary

Summary: This PR refactors the disk-backed backpressure queue to support chunk-based pending writes, plus cancellation and a “draining” watermark.

Changes:

  • Add DiskBackedQueue::Chunk and change PushAsync to enqueue chunks in a pending deque and flush them sequentially to disk.
  • Track queued_bytes separately from total_backing_bytes and add IsActive/IsDraining and push/pop in-flight state accessors.
  • Introduce --disk_backpressure_drain_threshold (default 16KB) to detect the low-watermark “draining” phase.
  • Add Cancel() to mark the queue canceled and aim to stop further async submissions / cancel outstanding ops.
  • Update unit tests to use the new chunk API and helpers for buffer conversion.

Technical Notes: Uses io_uring LinuxFile async I/O and continues to punch holes in already-read pages to reclaim disk-backed cache.

🤖 Was this summary useful? React with 👍 or 👎

Copy link
Copy Markdown

@augmentcode augmentcode Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review completed. 5 suggestions posted.

Fix All in Augment

Comment augment review to trigger a new review at any time.


bool DiskBackedQueue::HasEnoughBackingSpaceFor(size_t bytes) const {
return (bytes + impl_->total_backing_bytes) < max_backing_size_;
void DiskBackedQueue::Cancel() {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

src/facade/disk_backed_queue.cc:162: Cancel() sets cancelled and zeroes counters but does not clear write_queue or notify callbacks for queued (not-yet-submitted) writes, so callers can hang waiting for AsyncPushCallback. Also IsActive() will remain true if write_queue is still non-empty after Cancel().

Severity: high

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

return (bytes + impl_->total_backing_bytes + impl_->queued_bytes) < max_backing_size_;
}

void DiskBackedQueue::PushAsync(Chunk chunk, AsyncPushCallback cb) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

src/facade/disk_backed_queue.cc:191: PushAsync() still enqueues chunks when impl_->cancelled is true, but MaybeFlushQueue() becomes a no-op in that state so the callback may never run. This can deadlock code that waits on the push callback after cancellation.

Severity: high

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

queued_bytes -= size;
}

if (cancelled)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

src/facade/disk_backed_queue.cc:103: On write failure (res < 0), the I/O error is logged but the callback ultimately receives operation_canceled because cancelled is set, which hides the root cause from callers. This makes it difficult to distinguish explicit Cancel() from disk I/O failures.

Severity: medium

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

<< " to backing with error: " << ec;
cancelled = true;
} else if (!cancelled) {
write_offset += size;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

src/facade/disk_backed_queue.cc:98: LinuxFile::WriteAsync() returns the number of bytes written in res, but the bookkeeping assumes a full write and advances write_offset/total_backing_bytes by size. If a partial write occurs, offsets/counters can become inconsistent and later reads may be corrupted.

Severity: high

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

static constexpr size_t kMaxChunkSize = 8192;

struct Chunk {
// 0 < data.size() <= kMaxChunkSize
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

src/facade/disk_backed_queue.h:25: Chunk is documented as data.size() <= kMaxChunkSize, but the current tests push much larger chunks and PushAsync() only checks non-empty, so the public contract is inconsistent. This can mislead future callers about the expected chunking behavior.

Severity: low

Other Locations
  • src/facade/disk_backed_queue_test.cc:66
  • src/facade/disk_backed_queue_test.cc:111
  • src/facade/disk_backed_queue_test.cc:157
  • src/facade/disk_backed_queue_test.cc:312

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Updates facade::DiskBackedQueue to support queued/pending async writes (rather than submitting every push immediately), adds cancellation/drain state APIs, and adjusts unit tests to the new PushAsync(Chunk) interface.

Changes:

  • Changed PushAsync to take an owned Chunk and introduced an internal pending write queue with queued_bytes accounting.
  • Added queue state APIs (IsActive, IsDraining, IsPopInFlight, IsPushInFlight) plus a Cancel() API and a drain watermark flag.
  • Updated disk_backed_queue_test.cc to push owned chunks and to use a helper for io::MutableBytes creation.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.

File Description
src/facade/disk_backed_queue_test.cc Migrates tests to the new Chunk-based push API and adds helpers for chunk/byte conversions.
src/facade/disk_backed_queue.h Introduces Chunk, max chunk size constant, new state APIs, and Cancel() declaration/docs.
src/facade/disk_backed_queue.cc Implements pending write queue/flush chaining, queued-bytes accounting, drain threshold flag, and cancellation/state tracking.

Comment on lines +29 to +33
DiskBackedQueue::Chunk MakeChunk(const std::string& s) {
DiskBackedQueue::Chunk chunk;
chunk.data.assign(s.begin(), s.end());
return chunk;
}
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kMaxChunkSize/Chunk contract says data.size() <= kMaxChunkSize, but MakeChunk() blindly copies the whole input string (e.g. 10KB/12KB/32KB in these tests), so tests now violate the documented API and would mask size-related bugs. Either split test writes into <=kMaxChunkSize chunks or adjust/remove the max-size contract.

Copilot uses AI. Check for mistakes.
Comment on lines +68 to +71
// Cancel all pending (not-yet-submitted) writes and mark the queue as cancelled.
// In-flight write/read CQEs will complete with operation_canceled.
// Caller must wait for in-flight operations to complete before destroying the queue.
void Cancel();
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cancel() introduces new externally visible behavior (cancelling queued writes and in-flight CQEs), but there are no unit tests covering cancellation paths (queued-only, in-flight write, in-flight read). Adding targeted tests in disk_backed_queue_test.cc would prevent hangs/regressions in this control-flow-heavy logic.

Copilot generated this review using guidance from repository custom instructions.
void DiskBackedQueue::Cancel() {
impl_->cancelled = true;
impl_->queued_bytes = 0;
impl_->total_backing_bytes = 0;
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cancel() sets cancelled and zeroes counters but does not clear write_queue or resolve callbacks for queued (not-yet-submitted) writes. Because MaybeFlushQueue() is a no-op when cancelled, any queued pushes will never complete (potential hang) and the queued chunks remain allocated.

Suggested change
impl_->total_backing_bytes = 0;
impl_->total_backing_bytes = 0;
impl_->write_queue.clear();
impl_->push_in_flight = false;
impl_->pop_in_flight = false;

Copilot uses AI. Check for mistakes.

void DiskBackedQueue::PushAsync(Chunk chunk, AsyncPushCallback cb) {
DCHECK(!chunk.data.empty());

Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PushAsync() does not check cancelled. After Cancel(), this will still enqueue into write_queue and increment queued_bytes, but MaybeFlushQueue() will never submit writes, so the callback is never invoked. Consider returning operation_canceled immediately (and not enqueueing) when cancelled.

Suggested change
if (impl_->cancelled) {
cb(nonstd::make_unexpected(std::make_error_code(std::errc::operation_canceled)));
return;
}

Copilot uses AI. Check for mistakes.
Comment on lines +92 to +103
if (res < 0) {
ec = {-res, std::system_category()};
LOG(ERROR) << "Failed to offload chunk of size " << size
<< " to backing with error: " << ec;
cancelled = true;
} else if (!cancelled) {
write_offset += size;
total_backing_bytes += size;
queued_bytes -= size;
}

if (cancelled)
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On WriteAsync failure (res < 0), the callback builds the real I/O ec but then sets cancelled = true, which later forces ec = operation_canceled. This hides the original error from the caller; return the actual write error for that operation and only cancel/drain the remaining queued work.

Suggested change
if (res < 0) {
ec = {-res, std::system_category()};
LOG(ERROR) << "Failed to offload chunk of size " << size
<< " to backing with error: " << ec;
cancelled = true;
} else if (!cancelled) {
write_offset += size;
total_backing_bytes += size;
queued_bytes -= size;
}
if (cancelled)
bool local_error = false;
if (res < 0) {
ec = {-res, std::system_category()};
LOG(ERROR) << "Failed to offload chunk of size " << size
<< " to backing with error: " << ec;
cancelled = true;
local_error = true;
} else if (!cancelled) {
write_offset += size;
total_backing_bytes += size;
queued_bytes -= size;
}
if (cancelled && !local_error)

Copilot uses AI. Check for mistakes.
Comment on lines +94 to +97
LOG(ERROR) << "Failed to offload chunk of size " << size
<< " to backing with error: " << ec;
cancelled = true;
} else if (!cancelled) {
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a write fails (res < 0) and cancelled becomes true, any remaining write_queue entries are never flushed and their callbacks are never invoked (since MaybeFlushQueue() becomes a no-op). To avoid deadlocks, drain the pending queue on cancellation/error (invoke each cb with an error, and reconcile queued_bytes).

Copilot uses AI. Check for mistakes.
"Maximum size of the backing file. When max size is reached, connection will "
"stop offloading backpressure to disk and block on client read.");

ABSL_FLAG(size_t, disk_backpressure_drain_threshold, 16_KB,
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@romange I added this watermark flag to allow shifting disk backpressure back to the socket. This allows a smooth transition from disk to memory without ping ponging the disk when draining. That way socket reads are deferred while the last few bytes of the disk queue are being drained by the connection

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When remaining bytes in the disk-backed queue drop below this - what does it mean remaining bytes?

// Cancel all pending (not-yet-submitted) writes and mark the queue as cancelled.
// In-flight write/read CQEs will complete with operation_canceled.
// Caller must wait for in-flight operations to complete before destroying the queue.
void Cancel();
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can introduce a blocking Shutdown but Cancel() + wait if in-flight is more than enough for the users of this class

}

void DiskBackedQueue::PushAsync(Chunk chunk, AsyncPushCallback cb) {
DCHECK(!chunk.data.empty());
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not introduce a fiber similar to let's say the journal streamer because it's not really needed. AsyncWrites on pending entries can continue from the async cb. Also we don't need any of the extra complexity of the context

}

void DiskBackedQueue::PopAsync(io::MutableBytes out, AsyncPopCallback cb) {
DCHECK(!impl_->pop_in_flight);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@romange we have a pending write list on the write path. we could similarly do the same with read path. That way CPU can be busy parsing while the disk is reading the next chunk.

I leave it as is for now since performance is not the main goal here

Signed-off-by: Kostas Kyrimis <kostas@dragonflydb.io>

struct PendingWrite {
Chunk chunk;
AsyncPushCallback cb;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not really need to store cb since it's stateless so maybe we can pass it during the construction of the queue and we don;t have to store it each time

@kostasrim
Copy link
Copy Markdown
Contributor Author

toy e2e #6929

@kostasrim kostasrim requested a review from romange March 30, 2026 15:18
@kostasrim
Copy link
Copy Markdown
Contributor Author

@romange before I finalize anything let's make sure we agree on the direction:

  1. No extra fiber, let the async cb progress the state machine --> simpler than streamer
  2. Pending list for Push
  3. I did not add a pending list for Pop but can be done (since disk reads are non-cpu)
  4. Simple bool cancellation without explicit Wait() -- caller must wait the current in-flight pop/push before destroying the queue

// Returns true when remaining bytes have dropped below the drain threshold.
// The caller should block new socket reads to allow a clean drain-to-memory
// transition (TCP backpressure builds during this window).
bool IsDraining() const;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is "draining" the right term here?
what the queue actually does during this state?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants