Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 135 additions & 16 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <absl/strings/str_cat.h>
#include <absl/time/time.h>

#include <algorithm>
#include <numeric>
#include <variant>

Expand Down Expand Up @@ -361,6 +362,18 @@ struct QueueBackpressure {
// Used together with pipeline_buffer_limit to limit the pipeline usage per thread.
util::fb2::CondVarAny pipeline_cnd;

// V2 connections subscribe to this EventCount when parked on backpressure.
// When global memory is freed, notifyAll() invokes each subscriber's callback,
// which in turn calls io_event_.notify() to wake the V2 fiber.
// Registration and unregistration are O(1) via intrusive linked list.
util::fb2::EventCount v2_pipeline_backpressure_ec;

// Wakes everyone blocked on pipeline backpressure for this queue:
//  - pipeline_cnd.notify_all() signals the V1 waiters on the condition variable;
//  - v2_pipeline_backpressure_ec.notifyAll() signals the V2 subscribers of the EventCount.
// Intended to be called whenever pipeline capacity may have been released or the limits changed,
// so that neither kind of waiter is left parked.
void NotifyPipelineWaiters() {
pipeline_cnd.notify_all();
v2_pipeline_backpressure_ec.notifyAll();
}

Comment on lines +371 to +376
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: unify these at some point in the future, though maybe not even that if we abolish v1 quickly

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted, will consider adding a task.

size_t publish_buffer_limit = 0; // cached flag publish_buffer_limit
size_t pipeline_cache_limit = 0; // cached flag pipeline_cache_limit
size_t pipeline_buffer_limit = 0; // cached flag for buffer size in bytes
Expand Down Expand Up @@ -1202,7 +1215,7 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_
if ((optimize_for_async || !can_dispatch_sync) &&
qbp.IsPipelineBufferOverLimit(conn_stats->pipeline_queue_bytes, parsed_cmd_q_len_)) {
conn_stats->pipeline_throttle_count++;
LOG_EVERY_T(WARNING, 10) << "Pipeline buffer over limit."
LOG_EVERY_T(WARNING, 10) << "Pipeline buffer over limit (V1)."
<< ", Thread pipeline_queue_bytes: "
<< conn_stats->pipeline_queue_bytes
<< ", Thread pipeline_queue_entries: "
Expand Down Expand Up @@ -1292,10 +1305,19 @@ Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles, bool e
LogTraffic(id_, has_more, *parsed_cmd_, service_->GetContextInfo(cc_.get()));
}

if (enqueue_only)
if (enqueue_only) {
dispatch_async();
else

// Stop parsing the current buffer if we crossed the limit.
// Unparsed bytes remain in io_buf_ for the next ParseLoop iteration.
if (GetQueueBackpressure().IsPipelineBufferOverLimit(
GetLocalConnStats().pipeline_queue_bytes, parsed_cmd_q_len_)) {
DVLOG(2) << "Pipeline buffer over limit, breaking from parsing loop.";
break;
}
} else {
DispatchSingle(has_more, dispatch_sync, dispatch_async);
}
}
if (result != RespSrvParser::OK && result != RespSrvParser::INPUT_PENDING) {
// We do not expect that a replica sends an invalid command so we log if it happens.
Expand Down Expand Up @@ -1632,7 +1654,7 @@ void Connection::ClearPipelinedMessages() {
parsed_to_execute_ = nullptr;

QueueBackpressure& qbp = GetQueueBackpressure();
qbp.pipeline_cnd.notify_all();
qbp.NotifyPipelineWaiters();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am trying to understand how it worked before - we did not have an O(n) array of parked fibers.
So what changed now?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will answer here, since I think a few comments are related to the same issue.

qbp.pubsub_ec.notifyAll();
}

Expand Down Expand Up @@ -1681,7 +1703,7 @@ bool Connection::ProcessAdminMessage(MessageHandle* msg, AsyncOperations* async_
// Fiber Termination Check
if (ShouldEndAsyncFiber(*msg)) {
CHECK(!HasPendingMessages()) << DebugInfo();
GetQueueBackpressure().pipeline_cnd.notify_all();
GetQueueBackpressure().NotifyPipelineWaiters();
return true; // Signal to terminate AsyncFiber
}

Expand Down Expand Up @@ -1876,7 +1898,7 @@ void Connection::AsyncFiber() {
// 2. Local queue (length) is under limit -> Wakes up this connection's producer.
if (qbp.IsPipelineBufferUnderLimit(conn_stats.pipeline_queue_bytes, parsed_cmd_q_len_) ||
!HasPendingMessages()) {
qbp.pipeline_cnd.notify_all();
qbp.NotifyPipelineWaiters();
}

if (subscriber_over_limit &&
Expand All @@ -1887,7 +1909,7 @@ void Connection::AsyncFiber() {
DCHECK(cc_->conn_closing || reply_builder_->GetError());

cc_->conn_closing = true;
qbp.pipeline_cnd.notify_all();
qbp.NotifyPipelineWaiters();

// If shutdown was requested, we need to break the receive call in case the i/o fiber
// is blocked there. With io loop v2, we can have a different mechanism to break from recv flow.
Expand Down Expand Up @@ -2271,7 +2293,18 @@ bool Connection::IsReplySizeOverLimit() const {
}

// Parses buffered input for the batch (V2) path, applying pipeline backpressure first.
// Returns false without parsing when this connection already has queued commands
// (parsed_cmd_q_len_ > 0) and the pipeline buffer is over its limit; in that case
// pipeline_throttle_count is incremented. Otherwise calls ParseRedis with
// max_busy_read_cycles_cached and `true` as the second argument (presumably enqueue_only)
// and returns true only if it reports ParserStatus::OK.
bool Connection::ParseRedisBatch() {
// TODO: Handle pipeline backpressure
QueueBackpressure& qbp = GetQueueBackpressure();

// Only throttle parsing if this connection is actively contributing to the queue.
// Connections with parsed_cmd_q_len_ == 0 must always be allowed to parse so that
// administrative commands (CONFIG SET, etc.) can execute and relieve backpressure.
if ((parsed_cmd_q_len_ > 0) &&
qbp.IsPipelineBufferOverLimit(GetLocalConnStats().pipeline_queue_bytes, parsed_cmd_q_len_)) {
// Signal ParseLoop to stop. IoLoopV2 will drain before resuming.
DVLOG(2) << "Pipeline buffer over limit. Avoid parsing Redis batch.";
GetLocalConnStats().pipeline_throttle_count++;
return false;
Comment thread
glevkovich marked this conversation as resolved.
}
return ParseRedis(max_busy_read_cycles_cached, true) == ParserStatus::OK;
}

Expand Down Expand Up @@ -2395,13 +2428,19 @@ bool Connection::ExecuteBatch() {

if (parsed_head_ == nullptr)
parsed_tail_ = nullptr;

// Since we are done executing a batch, and advance_head might have been called (which releases
// commands), notify waiters that backpressure might be relieved.
if (ioloop_v2_) {
io_event_.notify();
}
return true;
}

bool Connection::ReplyBatch() {
reply_builder_->SetBatchMode(true);
while (HasDispatchedCommands() && parsed_head_->CanReply()) {
current_wait_.reset(); // we must free waiter before proceeding with other commands
current_wait_.reset(); // Clear the subscription before moving to the next command
auto* cmd = parsed_head_;
parsed_head_ = cmd->next;
cmd->SendReply();
Expand All @@ -2413,6 +2452,12 @@ bool Connection::ReplyBatch() {
if (parsed_head_ == nullptr)
parsed_tail_ = nullptr;

// Since we are done replying to a batch, and ReleaseParsedCommand might have been called (which
// releases commands), notify waiters that backpressure might be relieved.
if (ioloop_v2_) {
Comment thread
glevkovich marked this conversation as resolved.
io_event_.notify();
}

reply_builder_->SetBatchMode(false);
reply_builder_->Flush();
return !reply_builder_->GetError();
Expand Down Expand Up @@ -2525,7 +2570,7 @@ void Connection::UpdateFromFlags() {
unsigned tid = fb2::ProactorBase::me()->GetPoolIndex();
thread_queue_backpressure[tid].pipeline_queue_max_len = GetFlag(FLAGS_pipeline_queue_limit);
thread_queue_backpressure[tid].pipeline_buffer_limit = GetFlag(FLAGS_pipeline_buffer_limit);
thread_queue_backpressure[tid].pipeline_cnd.notify_all();
thread_queue_backpressure[tid].NotifyPipelineWaiters();

max_busy_read_cycles_cached = base::CycleClock::FromUsec(GetFlag(FLAGS_max_busy_read_usec));
always_flush_pipeline_cached = GetFlag(FLAGS_always_flush_pipeline);
Expand Down Expand Up @@ -2709,23 +2754,33 @@ variant<error_code, Connection::ParserStatus> Connection::IoLoopV2() {

ParserStatus parse_status = OK;

// Waiter that is passed to the current async command head to be notified on completion
// Callback that wakes the current V2 fiber by bumping the io_event_ epoch.
// Multiple waiters (e.g. command completion, backpressure relief) can use the same callback since
// they all wake the same fiber.
auto ioevent_cb = [this]() { io_event_.notify(); };
util::fb2::detail::Waiter ioevent_waiter{ioevent_cb}; // takes callback by reference

// Waiter used to establish a mandatory subscription to the head command's blocker,
// ensuring the fiber wakes immediately upon async command completion.
util::fb2::detail::Waiter cmd_completion_waiter{ioevent_cb};
absl::Cleanup waiter_cleanup = [this] { current_wait_.reset(); };

// Waiter used for transient, conditional subscriptions (via check_or_subscribe)
// to global pipeline-backpressure relief notifications.
util::fb2::detail::Waiter backpressure_waiter{ioevent_cb};

do {
HandleMigrateRequest();

// Register completion for the current head if it's pending and we don't already wait on current_wait_.
if (HasDispatchedCommands() && !current_wait_.has_value()) {
current_wait_.emplace(parsed_head_, &ioevent_waiter);
current_wait_.emplace(parsed_head_, &cmd_completion_waiter);
}

if (pending_input_) {
ReadPendingInput();
}

// await block (no data to read)
if (io_buf_.InputLen() == 0) {
io_event_.await([this]() {
// TODO: optimize CanReply with looking up waiter key
Expand All @@ -2738,7 +2793,7 @@ variant<error_code, Connection::ParserStatus> Connection::IoLoopV2() {
phase_ = PROCESS;
bool is_iobuf_full = io_buf_.AppendLen() == 0;

// Temporary: Handle dispatch queue items one by one blocking command execution
// Temporary: Handle dispatch queue items (Control Path) one by one blocking command execution
if (!dispatch_q_.empty()) {
while (!dispatch_q_.empty()) {
auto msg = std::move(dispatch_q_.front());
Expand All @@ -2758,16 +2813,80 @@ variant<error_code, Connection::ParserStatus> Connection::IoLoopV2() {
continue;
}

if (io_buf_.InputLen() > 0) {
// Handle Parsed Commands Queue (Data Path)
auto& conn_stats = GetLocalConnStats();
QueueBackpressure& qbp = GetQueueBackpressure();

// Only parse data if we are under the memory limit (backpressure).
// Exception: If the queue is empty, we always parse to allow admin commands
// (like CONFIG SET) to run so they can fix the memory limits if needed.
bool pre_over_limit =
(parsed_cmd_q_len_ > 0) &&
qbp.IsPipelineBufferOverLimit(conn_stats.pipeline_queue_bytes, parsed_cmd_q_len_);
if (io_buf_.InputLen() > 0 && !pre_over_limit) { // Parse, execute and reply
size_t mem_before = conn_stats.pipeline_queue_bytes;
parse_status = ParseLoop();
} else {

// Executing and replying to commands (in ParseLoop()) frees up memory. Because those internal
// functions only wake up this specific connection, we need to manually notify
// other connections on this thread that there is now room to resume.
if (conn_stats.pipeline_queue_bytes < mem_before) {
qbp.NotifyPipelineWaiters();
}
} else { // Execute and reply what we have, then wait if we are over the limit
parse_status = NEED_MORE;

size_t mem_before = conn_stats.pipeline_queue_bytes;

if (parsed_head_) {
if (HeadReadyToDispatch())
ExecuteBatch();
ReplyBatch();
}

// After draining commands, notify all connections parked on backpressure relief
if (conn_stats.pipeline_queue_bytes < mem_before) {
qbp.NotifyPipelineWaiters();
}

// await block (backpressure)
// Re-check if pipeline buffer over limit after draining - ExecuteBatch/ReplyBatch may have
// freed memory. If still over limit, sleep to prevent busy-spin.
// Only park if this connection is actively contributing (parsed_cmd_q_len_ > 0).
// Connections with an empty queue must stay in the read loop.
bool post_over_limit =
(parsed_cmd_q_len_ > 0) &&
Comment on lines +2847 to +2858
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My comment still applies. IoLoopV2 function now becomes veeeery long, it doesn't even fit on my screen. Maybe we can refactor it, at least in the future?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted, sure, wrote also in the previous comment. I will add a task. For sure not part of this PR.

qbp.IsPipelineBufferOverLimit(conn_stats.pipeline_queue_bytes, parsed_cmd_q_len_);
if (post_over_limit) {
conn_stats.pipeline_throttle_count++;
LOG_EVERY_T(WARNING, 10)
<< "Pipeline buffer over limit (V2)."
<< ", Thread pipeline_queue_bytes: " << conn_stats.pipeline_queue_bytes
<< ", Thread pipeline_queue_entries: " << conn_stats.pipeline_queue_entries
<< ", Connection parsed_cmd_q_bytes_: " << parsed_cmd_q_bytes_
<< ", Connection parsed commands queue size: " << parsed_cmd_q_len_
<< ", consider increasing pipeline_buffer_limit/pipeline_queue_limit";

// Subscribe persistently to the global backpressure EventCount so that when another
// connection frees memory (or CONFIG SET raises limits), our backpressure_waiter callback
// fires io_event_.notify(), waking this fiber. Must be persistent because
// io_event_.await()'s internal loop may re-sleep if the predicate is still false after the
// first notification. A one-shot subscription would be consumed on the first wake, leaving
// us "deaf" to future memory relief.
auto sub_key = qbp.v2_pipeline_backpressure_ec.subscribe_persistent(&backpressure_waiter);
Comment thread
glevkovich marked this conversation as resolved.

io_event_.await([this]() {
bool cmd_ready = parsed_head_ && parsed_head_->CanReply();
bool under_limit = !GetQueueBackpressure().IsPipelineBufferOverLimit(
GetLocalConnStats().pipeline_queue_bytes, parsed_cmd_q_len_);

// We wake up and exit the backpressure wait if:
// 1. Memory is freed (under_limit) or we can free it ourselves (cmd_ready).
// 2. Control-plane messages need processing (!dispatch_q_.empty()).
// 3. The connection is terminating (io_ec_).
return under_limit || cmd_ready || !dispatch_q_.empty() || io_ec_;
});
}
}

if (reply_builder_->GetError()) {
Expand Down
2 changes: 1 addition & 1 deletion src/server/detail/snapshot_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ io::Result<vector<string>, GenericError> AzureSnapshotStorage::ExpandFromPath(
auto paths = proactor->Await(
[&, &bucket_name = bucket_name]() -> io::Result<vector<string>, GenericError> {
vector<string> res;
cloud::azure::Storage azure(creds_provider_.get());
cloud::azure::Storage azure(static_cast<cloud::azure::Credentials*>(creds_provider_.get()));

error_code ec =
azure.List(bucket_name, prefix, false, 500, [&](const cloud::StorageListItem& item) {
Expand Down
Loading
Loading