Skip to content

Commit 4ed6666

Browse files
feng-yclaude
andcommitted
feat(backup_request): add rate-limited backup request policy (#3228)
Add ratio-based rate limiting for backup requests to prevent backup request storms under high QPS or downstream latency spikes. Design: - BackupRequestPolicy interface unchanged (ABI stable) - BackupRateLimiter: standalone statistics module tracking backup/total ratio within a sliding time window using bvar counters - RateLimitedBackupPolicy: internal implementation composing BackupRateLimiter, hidden in .cpp - CreateRateLimitedBackupPolicy() factory function in header - ChannelOptions.backup_request_max_ratio: per-channel configuration Priority: backup_request_policy > backup_request_max_ratio > backup_request_ms - Channel auto-creates internal policy when max_ratio > 0 and no user policy; uses max_ratio > 0 as ownership marker for cleanup - 3 gflags with validators: backup_request_max_ratio, backup_request_ratio_window_size_s, backup_request_ratio_update_interval_s Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent aa784b8 commit 4ed6666

5 files changed

Lines changed: 334 additions & 2 deletions

File tree

src/brpc/backup_request_policy.cpp

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "brpc/backup_request_policy.h"
19+
20+
#include <gflags/gflags.h>
21+
#include "brpc/reloadable_flags.h"
22+
#include "bvar/reducer.h"
23+
#include "bvar/window.h"
24+
#include "butil/atomicops.h"
25+
#include "butil/time.h"
26+
27+
namespace brpc {
28+
29+
DEFINE_double(backup_request_max_ratio, -1,
30+
"Maximum ratio of backup requests to total requests. "
31+
"Value in (0, 1]. -1 means no limit (default). Can be overridden "
32+
"per-channel via ChannelOptions.backup_request_max_ratio. "
33+
"Note: takes effect at Channel::Init() time; changing this flag "
34+
"at runtime does not affect already-created channels.");
35+
36+
static bool validate_backup_request_max_ratio(const char*, double v) {
37+
if (v <= 0) return true; // non-positive means disabled
38+
if (v <= 1.0) return true;
39+
LOG(ERROR) << "Invalid backup_request_max_ratio=" << v
40+
<< ", must be <= 0 (disabled) or in (0, 1]";
41+
return false;
42+
}
43+
BRPC_VALIDATE_GFLAG(backup_request_max_ratio,
44+
validate_backup_request_max_ratio);
45+
46+
DEFINE_int32(backup_request_ratio_window_size_s, 10,
47+
"Window size in seconds for computing the backup request ratio. "
48+
"Must be >= 1.");
49+
50+
static bool validate_backup_request_ratio_window_size_s(
51+
const char*, int32_t v) {
52+
if (v >= 1) return true;
53+
LOG(ERROR) << "Invalid backup_request_ratio_window_size_s=" << v
54+
<< ", must be >= 1";
55+
return false;
56+
}
57+
BRPC_VALIDATE_GFLAG(backup_request_ratio_window_size_s,
58+
validate_backup_request_ratio_window_size_s);
59+
60+
DEFINE_int32(backup_request_ratio_update_interval_s, 5,
61+
"Interval in seconds between ratio cache updates. Must be >= 1.");
62+
63+
static bool validate_backup_request_ratio_update_interval_s(
64+
const char*, int32_t v) {
65+
if (v >= 1) return true;
66+
LOG(ERROR) << "Invalid backup_request_ratio_update_interval_s=" << v
67+
<< ", must be >= 1";
68+
return false;
69+
}
70+
BRPC_VALIDATE_GFLAG(backup_request_ratio_update_interval_s,
71+
validate_backup_request_ratio_update_interval_s);
72+
73+
// Standalone statistics module for tracking backup/total request ratio
74+
// within a sliding time window.
75+
class BackupRateLimiter {
76+
public:
77+
BackupRateLimiter(double max_backup_ratio,
78+
int window_size_seconds,
79+
int update_interval_seconds)
80+
: _max_backup_ratio(max_backup_ratio)
81+
, _update_interval_us(update_interval_seconds * 1000000LL)
82+
, _total_window(&_total_count, window_size_seconds)
83+
, _backup_window(&_backup_count, window_size_seconds)
84+
, _cached_ratio(0.0)
85+
, _last_update_us(0) {
86+
}
87+
88+
// All atomic operations use relaxed ordering intentionally.
89+
// This is best-effort rate limiting: a slightly stale ratio is
90+
// acceptable for approximate throttling.
91+
bool ShouldAllow() const {
92+
const int64_t now_us = butil::cpuwide_time_us();
93+
int64_t last_us = _last_update_us.load(butil::memory_order_relaxed);
94+
double ratio = _cached_ratio.load(butil::memory_order_relaxed);
95+
96+
if (now_us - last_us >= _update_interval_us) {
97+
if (_last_update_us.compare_exchange_strong(
98+
last_us, now_us, butil::memory_order_relaxed)) {
99+
int64_t total = _total_window.get_value();
100+
int64_t backup = _backup_window.get_value();
101+
ratio = (total > 0) ? static_cast<double>(backup) / total : 0.0;
102+
_cached_ratio.store(ratio, butil::memory_order_relaxed);
103+
}
104+
}
105+
106+
return ratio < _max_backup_ratio;
107+
}
108+
109+
void OnRPCEnd(const Controller* controller) {
110+
_total_count << 1;
111+
if (controller->has_backup_request()) {
112+
_backup_count << 1;
113+
}
114+
}
115+
116+
private:
117+
double _max_backup_ratio;
118+
int64_t _update_interval_us;
119+
120+
bvar::Adder<int64_t> _total_count;
121+
bvar::Adder<int64_t> _backup_count;
122+
bvar::Window<bvar::Adder<int64_t>> _total_window;
123+
bvar::Window<bvar::Adder<int64_t>> _backup_window;
124+
125+
mutable butil::atomic<double> _cached_ratio;
126+
mutable butil::atomic<int64_t> _last_update_us;
127+
};
128+
129+
// Internal BackupRequestPolicy that composes a BackupRateLimiter
130+
// for ratio-based suppression.
131+
class RateLimitedBackupPolicy : public BackupRequestPolicy {
132+
public:
133+
RateLimitedBackupPolicy(int32_t backup_request_ms,
134+
double max_backup_ratio,
135+
int window_size_seconds,
136+
int update_interval_seconds)
137+
: _backup_request_ms(backup_request_ms)
138+
, _rate_limiter(max_backup_ratio, window_size_seconds,
139+
update_interval_seconds) {
140+
}
141+
142+
int32_t GetBackupRequestMs(const Controller* /*controller*/) const override {
143+
return _backup_request_ms;
144+
}
145+
146+
bool DoBackup(const Controller* /*controller*/) const override {
147+
return _rate_limiter.ShouldAllow();
148+
}
149+
150+
void OnRPCEnd(const Controller* controller) override {
151+
_rate_limiter.OnRPCEnd(controller);
152+
}
153+
154+
private:
155+
int32_t _backup_request_ms;
156+
BackupRateLimiter _rate_limiter;
157+
};
158+
159+
BackupRequestPolicy* CreateRateLimitedBackupPolicy(
160+
int32_t backup_request_ms,
161+
double max_backup_ratio,
162+
int window_size_seconds,
163+
int update_interval_seconds) {
164+
return new RateLimitedBackupPolicy(
165+
backup_request_ms, max_backup_ratio,
166+
window_size_seconds, update_interval_seconds);
167+
}
168+
169+
} // namespace brpc

src/brpc/backup_request_policy.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,16 @@ class BackupRequestPolicy {
3838
virtual void OnRPCEnd(const Controller* controller) = 0;
3939
};
4040

41+
// Create a BackupRequestPolicy that limits the ratio of backup requests
42+
// to total requests within a sliding time window. When the ratio reaches
43+
// or exceeds max_backup_ratio, DoBackup() returns false.
44+
// The caller owns the returned pointer.
45+
BackupRequestPolicy* CreateRateLimitedBackupPolicy(
46+
int32_t backup_request_ms,
47+
double max_backup_ratio,
48+
int window_size_seconds,
49+
int update_interval_seconds);
50+
4151
}
4252

4353
#endif // BRPC_BACKUP_REQUEST_POLICY_H

src/brpc/channel.cpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ namespace brpc {
4343

4444
DECLARE_bool(enable_rpcz);
4545
DECLARE_bool(usercode_in_pthread);
46+
DECLARE_double(backup_request_max_ratio);
47+
DECLARE_int32(backup_request_ratio_window_size_s);
48+
DECLARE_int32(backup_request_ratio_update_interval_s);
4649
DEFINE_string(health_check_path, "", "Http path of health check call."
4750
"By default health check succeeds if the server is connectable."
4851
"If this flag is set, health check is not completed until a http "
@@ -63,6 +66,7 @@ ChannelOptions::ChannelOptions()
6366
, log_succeed_without_server(true)
6467
, socket_mode(SOCKET_MODE_TCP)
6568
, auth(NULL)
69+
, backup_request_max_ratio(-1)
6670
, backup_request_policy(NULL)
6771
, retry_policy(NULL)
6872
, ns_filter(NULL)
@@ -164,6 +168,7 @@ Channel::Channel(ProfilerLinker)
164168
, _serialize_request(NULL)
165169
, _pack_request(NULL)
166170
, _get_method_name(NULL)
171+
, _owns_backup_policy(false)
167172
, _preferred_index(-1) {
168173
}
169174

@@ -172,10 +177,20 @@ Channel::~Channel() {
172177
const ChannelSignature sig = ComputeChannelSignature(_options);
173178
SocketMapRemove(SocketMapKey(_server_address, sig));
174179
}
180+
if (_owns_backup_policy) {
181+
delete _options.backup_request_policy;
182+
}
175183
}
176184

177185

178186
int Channel::InitChannelOptions(const ChannelOptions* options) {
187+
// Clean up any previously created internal backup policy (re-Init case).
188+
if (_owns_backup_policy) {
189+
delete _options.backup_request_policy;
190+
_options.backup_request_policy = NULL;
191+
_owns_backup_policy = false;
192+
}
193+
179194
if (options) { // Override default options if user provided one.
180195
_options = *options;
181196
}
@@ -242,6 +257,32 @@ int Channel::InitChannelOptions(const ChannelOptions* options) {
242257
if (!cg.empty() && (::isspace(cg.front()) || ::isspace(cg.back()))) {
243258
butil::TrimWhitespace(cg, butil::TRIM_ALL, &cg);
244259
}
260+
261+
// Create rate-limited backup policy if configured.
262+
// User-provided backup_request_policy takes precedence.
263+
if (_options.backup_request_policy != NULL &&
264+
_options.backup_request_max_ratio > 0) {
265+
LOG(WARNING) << "backup_request_max_ratio is ignored because "
266+
"backup_request_policy is already set";
267+
}
268+
// Per-channel option takes precedence over the global gflag.
269+
double max_ratio = _options.backup_request_max_ratio;
270+
if (max_ratio < 0) {
271+
max_ratio = FLAGS_backup_request_max_ratio;
272+
}
273+
if (max_ratio > 1.0) {
274+
LOG(WARNING) << "backup_request_max_ratio=" << max_ratio
275+
<< " is out of range (0, 1], clamped to 1.0";
276+
max_ratio = 1.0;
277+
}
278+
if (max_ratio > 0 && _options.backup_request_policy == NULL &&
279+
_options.backup_request_ms >= 0) {
280+
_options.backup_request_policy = CreateRateLimitedBackupPolicy(
281+
_options.backup_request_ms, max_ratio,
282+
FLAGS_backup_request_ratio_window_size_s,
283+
FLAGS_backup_request_ratio_update_interval_s);
284+
_owns_backup_policy = true;
285+
}
245286
return 0;
246287
}
247288

src/brpc/channel.h

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,22 @@ struct ChannelOptions {
116116
// Default: NULL
117117
const Authenticator* auth;
118118

119+
// Maximum ratio of backup requests to total requests within a sliding
120+
// time window. When the ratio reaches or exceeds this value, backup
121+
// requests are suppressed. Value in (0, 1]. -1 means no limit.
122+
// Only effective when backup_request_ms >= 0 and backup_request_policy
123+
// is NULL (i.e. no custom policy). When effective, an internal
124+
// rate-limited BackupRequestPolicy is created and used automatically.
125+
// Default: -1 (no limit, same as FLAGS_backup_request_max_ratio)
126+
double backup_request_max_ratio;
127+
119128
// Customize the backup request time and whether to send backup request.
120-
// Priority: `backup_request_policy' > `backup_request_ms'.
129+
// Priority: `backup_request_policy' > `backup_request_max_ratio' > `backup_request_ms'.
121130
// Overridable by Controller.set_backup_request_ms() or
122131
// Controller.set_backup_request_policy().
123-
// This object is NOT owned by channel and should remain valid when channel is used.
132+
// When user-supplied, this object is NOT owned by channel and should
133+
// remain valid during channel's lifetime. When backup_request_max_ratio
134+
// creates an internal policy, that policy IS owned by channel.
124135
// Default: NULL
125136
BackupRequestPolicy* backup_request_policy;
126137

@@ -263,6 +274,7 @@ friend class SelectiveChannel;
263274
// the RPC above has finished
264275
butil::intrusive_ptr<SharedLoadBalancer> _lb;
265276
ChannelOptions _options;
277+
bool _owns_backup_policy;
266278
int _preferred_index;
267279
};
268280

0 commit comments

Comments
 (0)