-
Notifications
You must be signed in to change notification settings - Fork 1.2k
feat(conn): add pipeline backpressure for IoLoopV2 #7018
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
35413b6
92cf4a0
8e9a818
23e550d
0f38d3a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| +21 −3 | .github/workflows/ci.yml | |
| +25 −4 | base/file_log_sink.cc | |
| +4 −2 | base/file_log_sink.h | |
| +6 −2 | examples/gcs_demo.cc | |
| +160 −0 | tests/test_azure.py | |
| +76 −80 | util/cloud/azure/azure.cc | |
| +25 −8 | util/cloud/azure/creds_provider.h | |
| +28 −19 | util/cloud/azure/storage.cc | |
| +6 −7 | util/cloud/azure/storage.h | |
| +30 −2 | util/fibers/detail/wait_queue.cc | |
| +28 −2 | util/fibers/detail/wait_queue.h | |
| +178 −0 | util/fibers/fibers_test.cc | |
| +44 −6 | util/fibers/synchronization.h |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,7 @@ | |
| #include <absl/strings/str_cat.h> | ||
| #include <absl/time/time.h> | ||
|
|
||
| #include <algorithm> | ||
| #include <numeric> | ||
| #include <variant> | ||
|
|
||
|
|
@@ -361,6 +362,18 @@ struct QueueBackpressure { | |
| // Used together with pipeline_buffer_limit to limit the pipeline usage per thread. | ||
| util::fb2::CondVarAny pipeline_cnd; | ||
|
|
||
| // V2 connections subscribe to this EventCount when parked on backpressure. | ||
| // When global memory is freed, notifyAll() calls for each subscriber's callback, | ||
| // which in turn calls io_event_.notify() to wake the V2 fiber. | ||
| // Registration and unregistration are O(1) via intrusive linked list. | ||
| util::fb2::EventCount v2_pipeline_backpressure_ec; | ||
|
|
||
| // Notifies both V1 waiters (pipeline_cnd) and V2 subscribers (v2_pipeline_backpressure_ec). | ||
| void NotifyPipelineWaiters() { | ||
| pipeline_cnd.notify_all(); | ||
| v2_pipeline_backpressure_ec.notifyAll(); | ||
| } | ||
|
|
||
| size_t publish_buffer_limit = 0; // cached flag publish_buffer_limit | ||
| size_t pipeline_cache_limit = 0; // cached flag pipeline_cache_limit | ||
| size_t pipeline_buffer_limit = 0; // cached flag for buffer size in bytes | ||
|
|
@@ -1202,7 +1215,7 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_ | |
| if ((optimize_for_async || !can_dispatch_sync) && | ||
| qbp.IsPipelineBufferOverLimit(conn_stats->pipeline_queue_bytes, parsed_cmd_q_len_)) { | ||
| conn_stats->pipeline_throttle_count++; | ||
| LOG_EVERY_T(WARNING, 10) << "Pipeline buffer over limit." | ||
| LOG_EVERY_T(WARNING, 10) << "Pipeline buffer over limit (V1)." | ||
| << ", Thread pipeline_queue_bytes: " | ||
| << conn_stats->pipeline_queue_bytes | ||
| << ", Thread pipeline_queue_entries: " | ||
|
|
@@ -1292,10 +1305,19 @@ Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles, bool e | |
| LogTraffic(id_, has_more, *parsed_cmd_, service_->GetContextInfo(cc_.get())); | ||
| } | ||
|
|
||
| if (enqueue_only) | ||
| if (enqueue_only) { | ||
| dispatch_async(); | ||
| else | ||
|
|
||
| // Stop parsing the current buffer if we crossed the limit. | ||
| // Unparsed bytes remain in io_buf_ for the next ParseLoop iteration. | ||
| if (GetQueueBackpressure().IsPipelineBufferOverLimit( | ||
| GetLocalConnStats().pipeline_queue_bytes, parsed_cmd_q_len_)) { | ||
| DVLOG(2) << "Pipeline buffer over limit, breaking from parsing loop."; | ||
| break; | ||
| } | ||
| } else { | ||
| DispatchSingle(has_more, dispatch_sync, dispatch_async); | ||
| } | ||
| } | ||
| if (result != RespSrvParser::OK && result != RespSrvParser::INPUT_PENDING) { | ||
| // We do not expect that a replica sends an invalid command so we log if it happens. | ||
|
|
@@ -1632,7 +1654,7 @@ void Connection::ClearPipelinedMessages() { | |
| parsed_to_execute_ = nullptr; | ||
|
|
||
| QueueBackpressure& qbp = GetQueueBackpressure(); | ||
| qbp.pipeline_cnd.notify_all(); | ||
| qbp.NotifyPipelineWaiters(); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am trying to understand how it worked before - we did not have
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will answer here Since i think few comments are related to the same issue. |
||
| qbp.pubsub_ec.notifyAll(); | ||
| } | ||
|
|
||
|
|
@@ -1681,7 +1703,7 @@ bool Connection::ProcessAdminMessage(MessageHandle* msg, AsyncOperations* async_ | |
| // Fiber Termination Check | ||
| if (ShouldEndAsyncFiber(*msg)) { | ||
| CHECK(!HasPendingMessages()) << DebugInfo(); | ||
| GetQueueBackpressure().pipeline_cnd.notify_all(); | ||
| GetQueueBackpressure().NotifyPipelineWaiters(); | ||
| return true; // Signal to terminate AsyncFiber | ||
| } | ||
|
|
||
|
|
@@ -1876,7 +1898,7 @@ void Connection::AsyncFiber() { | |
| // 2. Local queue (length) is under limit -> Wakes up this connection's producer. | ||
| if (qbp.IsPipelineBufferUnderLimit(conn_stats.pipeline_queue_bytes, parsed_cmd_q_len_) || | ||
| !HasPendingMessages()) { | ||
| qbp.pipeline_cnd.notify_all(); | ||
| qbp.NotifyPipelineWaiters(); | ||
| } | ||
|
|
||
| if (subscriber_over_limit && | ||
|
|
@@ -1887,7 +1909,7 @@ void Connection::AsyncFiber() { | |
| DCHECK(cc_->conn_closing || reply_builder_->GetError()); | ||
|
|
||
| cc_->conn_closing = true; | ||
| qbp.pipeline_cnd.notify_all(); | ||
| qbp.NotifyPipelineWaiters(); | ||
|
|
||
| // If shutdown was requested, we need to break the receive call in case the i/o fiber | ||
| // is blocked there. With io loop v2, we can have a different mechanism to break from recv flow. | ||
|
|
@@ -2271,7 +2293,18 @@ bool Connection::IsReplySizeOverLimit() const { | |
| } | ||
|
|
||
| bool Connection::ParseRedisBatch() { | ||
| // TODO: Handle pipeline backpressure | ||
| QueueBackpressure& qbp = GetQueueBackpressure(); | ||
|
|
||
| // Only throttle parsing if this connection is actively contributing to the queue. | ||
| // Connections with parsed_cmd_q_len_ == 0 must always be allowed to parse so that | ||
| // administrative commands (CONFIG SET, etc.) can execute and relieve backpressure. | ||
| if ((parsed_cmd_q_len_ > 0) && | ||
| qbp.IsPipelineBufferOverLimit(GetLocalConnStats().pipeline_queue_bytes, parsed_cmd_q_len_)) { | ||
| // Signal ParseLoop to stop. IoLoopV2 will drain before resuming. | ||
| DVLOG(2) << "Pipeline buffer over limit. Avoid parsing Redis batch."; | ||
| GetLocalConnStats().pipeline_throttle_count++; | ||
| return false; | ||
|
glevkovich marked this conversation as resolved.
|
||
| } | ||
| return ParseRedis(max_busy_read_cycles_cached, true) == ParserStatus::OK; | ||
| } | ||
|
|
||
|
|
@@ -2395,13 +2428,19 @@ bool Connection::ExecuteBatch() { | |
|
|
||
| if (parsed_head_ == nullptr) | ||
| parsed_tail_ = nullptr; | ||
|
|
||
| // Since we are done executing a batch, and advance_head might be called which release commands, | ||
| // notify waiters that backpressure might be relieved. | ||
| if (ioloop_v2_) { | ||
| io_event_.notify(); | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| bool Connection::ReplyBatch() { | ||
| reply_builder_->SetBatchMode(true); | ||
| while (HasDispatchedCommands() && parsed_head_->CanReply()) { | ||
| current_wait_.reset(); // we must free waiter before proceeding with other commands | ||
| current_wait_.reset(); // Clear the subscription before moving to the next command | ||
| auto* cmd = parsed_head_; | ||
| parsed_head_ = cmd->next; | ||
| cmd->SendReply(); | ||
|
|
@@ -2413,6 +2452,12 @@ bool Connection::ReplyBatch() { | |
| if (parsed_head_ == nullptr) | ||
| parsed_tail_ = nullptr; | ||
|
|
||
| // Since we are done replying a batch, and ReleaseParsedCommand might be called which release | ||
| // commands, notify waiters that backpressure might be relieved. | ||
| if (ioloop_v2_) { | ||
|
glevkovich marked this conversation as resolved.
|
||
| io_event_.notify(); | ||
| } | ||
|
|
||
| reply_builder_->SetBatchMode(false); | ||
| reply_builder_->Flush(); | ||
| return !reply_builder_->GetError(); | ||
|
|
@@ -2525,7 +2570,7 @@ void Connection::UpdateFromFlags() { | |
| unsigned tid = fb2::ProactorBase::me()->GetPoolIndex(); | ||
| thread_queue_backpressure[tid].pipeline_queue_max_len = GetFlag(FLAGS_pipeline_queue_limit); | ||
| thread_queue_backpressure[tid].pipeline_buffer_limit = GetFlag(FLAGS_pipeline_buffer_limit); | ||
| thread_queue_backpressure[tid].pipeline_cnd.notify_all(); | ||
| thread_queue_backpressure[tid].NotifyPipelineWaiters(); | ||
|
|
||
| max_busy_read_cycles_cached = base::CycleClock::FromUsec(GetFlag(FLAGS_max_busy_read_usec)); | ||
| always_flush_pipeline_cached = GetFlag(FLAGS_always_flush_pipeline); | ||
|
|
@@ -2709,23 +2754,33 @@ variant<error_code, Connection::ParserStatus> Connection::IoLoopV2() { | |
|
|
||
| ParserStatus parse_status = OK; | ||
|
|
||
| // Waiter that is passed to the current async command head to be notified on completion | ||
| // Callback that wakes the currrent V2 fiber by bumping the io_event_ epoch. | ||
| // Multiple waiters (e.g command completion, backpressure relief) can use the same callback since | ||
| // they all wake the same fiber. | ||
| auto ioevent_cb = [this]() { io_event_.notify(); }; | ||
| util::fb2::detail::Waiter ioevent_waiter{ioevent_cb}; // takes callback by reference | ||
|
|
||
| // Waiter used to establish a mandatory subscription to the head command's blocker, | ||
| // ensuring the fiber wakes immediately upon async command completion. | ||
| util::fb2::detail::Waiter cmd_completion_waiter{ioevent_cb}; | ||
| absl::Cleanup waiter_cleanup = [this] { current_wait_.reset(); }; | ||
|
|
||
| // Waiter used for transient, conditional subscriptions (via check_or_subscribe) | ||
| // to global pipeline-backpressure relief notifications. | ||
| util::fb2::detail::Waiter backpressure_waiter{ioevent_cb}; | ||
|
|
||
| do { | ||
| HandleMigrateRequest(); | ||
|
|
||
| // Register completion for current head if its pending and we don't wait on current_wait_. | ||
| if (HasDispatchedCommands() && !current_wait_.has_value()) { | ||
| current_wait_.emplace(parsed_head_, &ioevent_waiter); | ||
| current_wait_.emplace(parsed_head_, &cmd_completion_waiter); | ||
| } | ||
|
|
||
| if (pending_input_) { | ||
| ReadPendingInput(); | ||
| } | ||
|
|
||
| // await block (no data to read) | ||
| if (io_buf_.InputLen() == 0) { | ||
| io_event_.await([this]() { | ||
| // TODO: optimize CanReply with looking up waiter key | ||
|
|
@@ -2738,7 +2793,7 @@ variant<error_code, Connection::ParserStatus> Connection::IoLoopV2() { | |
| phase_ = PROCESS; | ||
| bool is_iobuf_full = io_buf_.AppendLen() == 0; | ||
|
|
||
| // Temporary: Handle dispatch queue items one by one blocking command execution | ||
| // Temporary: Handle dispatch queue items (Control Path) one by one blocking command execution | ||
| if (!dispatch_q_.empty()) { | ||
| while (!dispatch_q_.empty()) { | ||
| auto msg = std::move(dispatch_q_.front()); | ||
|
|
@@ -2758,16 +2813,80 @@ variant<error_code, Connection::ParserStatus> Connection::IoLoopV2() { | |
| continue; | ||
| } | ||
|
|
||
| if (io_buf_.InputLen() > 0) { | ||
| // Handle Parsed Commands Queue (Data Path) | ||
| auto& conn_stats = GetLocalConnStats(); | ||
| QueueBackpressure& qbp = GetQueueBackpressure(); | ||
|
|
||
| // Only parse data if we are under the memory limit (backpressure). | ||
| // Exception: If the queue is empty, we always parse to allow admin commands | ||
| // (like CONFIG SET) to run so they can fix the memory limits if needed. | ||
| bool pre_over_limit = | ||
| (parsed_cmd_q_len_ > 0) && | ||
| qbp.IsPipelineBufferOverLimit(conn_stats.pipeline_queue_bytes, parsed_cmd_q_len_); | ||
| if (io_buf_.InputLen() > 0 && !pre_over_limit) { // Parse, execute and reply | ||
| size_t mem_before = conn_stats.pipeline_queue_bytes; | ||
| parse_status = ParseLoop(); | ||
| } else { | ||
|
|
||
| // Executing and replying to commands (in ParseLoop()) frees up memory. Because those internal | ||
| // functions only wake up this specific connection, we need to manually notify | ||
| // other connections on this thread that there is now room to resume. | ||
| if (conn_stats.pipeline_queue_bytes < mem_before) { | ||
| qbp.NotifyPipelineWaiters(); | ||
| } | ||
| } else { // Execute and reply what we have, then wait if we are over the limit | ||
| parse_status = NEED_MORE; | ||
|
|
||
| size_t mem_before = conn_stats.pipeline_queue_bytes; | ||
|
|
||
| if (parsed_head_) { | ||
| if (HeadReadyToDispatch()) | ||
| ExecuteBatch(); | ||
| ReplyBatch(); | ||
| } | ||
|
|
||
| // After draining commands, notify all connections parked on backpressure relief | ||
| if (conn_stats.pipeline_queue_bytes < mem_before) { | ||
| qbp.NotifyPipelineWaiters(); | ||
| } | ||
|
|
||
| // await block (backpressure) | ||
| // Re-check if pipeline buffer over limit after draining - ExecuteBatch/ReplyBatch may have | ||
| // freed memory. If still over limit, sleep to prevent busy-spin. | ||
| // Only park if this connection is actively contributing (parsed_cmd_q_len_ > 0). | ||
| // Connections with an empty queue must stay in the read loop. | ||
| bool post_over_limit = | ||
| (parsed_cmd_q_len_ > 0) && | ||
|
Comment on lines
+2847
to
+2858
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My comment still applies. IoLoopV2 function now becomes veeeery long, it doesn't even fit on my screen. Maybe we can refactor it, at least in the future?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Noted, sure, wrote also in the previous comment. I will add a task. For sure not part of this PR. |
||
| qbp.IsPipelineBufferOverLimit(conn_stats.pipeline_queue_bytes, parsed_cmd_q_len_); | ||
| if (post_over_limit) { | ||
| conn_stats.pipeline_throttle_count++; | ||
| LOG_EVERY_T(WARNING, 10) | ||
| << "Pipeline buffer over limit (V2)." | ||
| << ", Thread pipeline_queue_bytes: " << conn_stats.pipeline_queue_bytes | ||
| << ", Thread pipeline_queue_entries: " << conn_stats.pipeline_queue_entries | ||
| << ", Connection parsed_cmd_q_bytes_: " << parsed_cmd_q_bytes_ | ||
| << ", Connection parsed commands queue size: " << parsed_cmd_q_len_ | ||
| << ", consider increasing pipeline_buffer_limit/pipeline_queue_limit"; | ||
|
|
||
| // Subscribe persistently to the global backpressure EventCount so that when another | ||
| // connection frees memory (or CONFIG SET raises limits), our backpressure_waiter callback | ||
| // fires io_event_.notify(), waking this fiber. Must be persistent because | ||
| // io_event_.await()'s internal loop may re-sleep if the predicate is still false after the | ||
| // first notification. A one-shot subscription would be consumed on the first wake, leaving | ||
| // us "deaf" to future memory relief. | ||
| auto sub_key = qbp.v2_pipeline_backpressure_ec.subscribe_persistent(&backpressure_waiter); | ||
|
glevkovich marked this conversation as resolved.
|
||
|
|
||
| io_event_.await([this]() { | ||
| bool cmd_ready = parsed_head_ && parsed_head_->CanReply(); | ||
| bool under_limit = !GetQueueBackpressure().IsPipelineBufferOverLimit( | ||
| GetLocalConnStats().pipeline_queue_bytes, parsed_cmd_q_len_); | ||
|
|
||
| // We wake up and exit the backpressure wait if: | ||
| // 1. Memory is freed (under_limit) or we can free it ourselves (cmd_ready). | ||
| // 2. Control-plane messages need processing (!dispatch_q_.empty()). | ||
| // 3. The connection is terminating (io_ec_). | ||
| return under_limit || cmd_ready || !dispatch_q_.empty() || io_ec_; | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| if (reply_builder_->GetError()) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
todo: unite somewhen in the future, but maybe even not if we abolish v1 fast
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Noted, will consider add a task.