Skip to content

Commit bf88565

Browse files
authored
support dynamic update method concurrency (#2923)
1 parent c7add74 commit bf88565

10 files changed

Lines changed: 39 additions & 8 deletions

src/brpc/adaptive_max_concurrency.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "butil/logging.h"
2222
#include "butil/strings/string_number_conversions.h"
2323
#include "brpc/adaptive_max_concurrency.h"
24+
#include "brpc/concurrency_limiter.h"
2425

2526
namespace brpc {
2627

@@ -72,6 +73,9 @@ void AdaptiveMaxConcurrency::operator=(const butil::StringPiece& value) {
7273
value.CopyToString(&_value);
7374
_max_concurrency = -1;
7475
}
76+
if (_cl) {
77+
_cl->ResetMaxConcurrency(*this);
78+
}
7579
}
7680

7781
void AdaptiveMaxConcurrency::operator=(int max_concurrency) {
@@ -82,12 +86,18 @@ void AdaptiveMaxConcurrency::operator=(int max_concurrency) {
8286
_value = butil::string_printf("%d", max_concurrency);
8387
_max_concurrency = max_concurrency;
8488
}
89+
if (_cl) {
90+
_cl->ResetMaxConcurrency(*this);
91+
}
8592
}
8693

8794
void AdaptiveMaxConcurrency::operator=(const TimeoutConcurrencyConf& value) {
8895
_value = "timeout";
8996
_max_concurrency = -1;
9097
_timeout_conf = value;
98+
if (_cl) {
99+
_cl->ResetMaxConcurrency(*this);
100+
}
91101
}
92102

93103
const std::string& AdaptiveMaxConcurrency::type() const {

src/brpc/adaptive_max_concurrency.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ struct TimeoutConcurrencyConf {
3232
int max_concurrency;
3333
};
3434

35+
class ConcurrencyLimiter;
3536
class AdaptiveMaxConcurrency{
3637
public:
3738
explicit AdaptiveMaxConcurrency();
@@ -68,11 +69,14 @@ class AdaptiveMaxConcurrency{
6869
static const std::string& UNLIMITED();
6970
static const std::string& CONSTANT();
7071

72+
void SetConcurrencyLimiter(ConcurrencyLimiter* cl) { _cl = cl; }
73+
7174
private:
7275
std::string _value;
7376
int _max_concurrency;
7477
TimeoutConcurrencyConf
7578
_timeout_conf; // TODO std::varient for different type
79+
ConcurrencyLimiter* _cl{nullptr};
7680
};
7781

7882
inline std::ostream& operator<<(std::ostream& os, const AdaptiveMaxConcurrency& amc) {

src/brpc/concurrency_limiter.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ class ConcurrencyLimiter {
4747
// The return value is only for logging.
4848
virtual int MaxConcurrency() = 0;
4949

50+
// Reset max_concurrency
51+
virtual int ResetMaxConcurrency(const AdaptiveMaxConcurrency& amc) = 0;
52+
5053
// Create an instance from the amc
5154
// Caller is responsible for delete the instance after usage.
5255
virtual ConcurrencyLimiter* New(const AdaptiveMaxConcurrency& amc) const = 0;

src/brpc/policy/auto_concurrency_limiter.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@ int AutoConcurrencyLimiter::MaxConcurrency() {
134134
return _max_concurrency;
135135
}
136136

137+
int AutoConcurrencyLimiter::ResetMaxConcurrency(const AdaptiveMaxConcurrency&) {
138+
return -1;
139+
}
140+
137141
int64_t AutoConcurrencyLimiter::NextResetTime(int64_t sampling_time_us) {
138142
int64_t reset_start_us = sampling_time_us +
139143
(FLAGS_auto_cl_noload_latency_remeasure_interval_ms / 2 +

src/brpc/policy/auto_concurrency_limiter.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ class AutoConcurrencyLimiter : public ConcurrencyLimiter {
3535

3636
int MaxConcurrency() override;
3737

38+
int ResetMaxConcurrency(const AdaptiveMaxConcurrency&) override;
39+
3840
AutoConcurrencyLimiter* New(const AdaptiveMaxConcurrency&) const override;
3941

4042
private:

src/brpc/policy/constant_concurrency_limiter.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ int ConstantConcurrencyLimiter::MaxConcurrency() {
3535
return _max_concurrency.load(butil::memory_order_relaxed);
3636
}
3737

38+
int ConstantConcurrencyLimiter::ResetMaxConcurrency(
39+
const AdaptiveMaxConcurrency& amc) {
40+
_max_concurrency.store(static_cast<int>(amc), butil::memory_order_relaxed);
41+
return 0;
42+
}
43+
3844
ConstantConcurrencyLimiter*
3945
ConstantConcurrencyLimiter::New(const AdaptiveMaxConcurrency& amc) const {
4046
CHECK_EQ(amc.type(), AdaptiveMaxConcurrency::CONSTANT());

src/brpc/policy/constant_concurrency_limiter.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ class ConstantConcurrencyLimiter : public ConcurrencyLimiter {
3333

3434
int MaxConcurrency() override;
3535

36+
int ResetMaxConcurrency(const AdaptiveMaxConcurrency&) override;
37+
3638
ConstantConcurrencyLimiter* New(const AdaptiveMaxConcurrency&) const override;
3739

3840
private:

src/brpc/policy/timeout_concurrency_limiter.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,11 @@ int TimeoutConcurrencyLimiter::MaxConcurrency() {
117117
return FLAGS_timeout_cl_max_concurrency;
118118
}
119119

120+
int TimeoutConcurrencyLimiter::ResetMaxConcurrency(
121+
const AdaptiveMaxConcurrency &) {
122+
return -1;
123+
}
124+
120125
bool TimeoutConcurrencyLimiter::AddSample(int error_code, int64_t latency_us,
121126
int64_t sampling_time_us) {
122127
std::unique_lock<butil::Mutex> lock_guard(_sw_mutex);

src/brpc/policy/timeout_concurrency_limiter.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ class TimeoutConcurrencyLimiter : public ConcurrencyLimiter {
3434

3535
int MaxConcurrency() override;
3636

37+
int ResetMaxConcurrency(const AdaptiveMaxConcurrency&) override;
38+
3739
TimeoutConcurrencyLimiter* New(
3840
const AdaptiveMaxConcurrency&) const override;
3941

src/brpc/server.cpp

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1095,6 +1095,7 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
10951095
return -1;
10961096
}
10971097
it->second.status->SetConcurrencyLimiter(cl);
1098+
it->second.max_concurrency.SetConcurrencyLimiter(cl);
10981099
}
10991100
}
11001101
if (0 != SetServiceMaxConcurrency(_options.nshead_service)) {
@@ -2221,10 +2222,6 @@ int Server::ResetMaxConcurrency(int max_concurrency) {
22212222
}
22222223

22232224
AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(MethodProperty* mp) {
2224-
if (IsRunning()) {
2225-
LOG(WARNING) << "MaxConcurrencyOf is only allowed before Server started";
2226-
return g_default_max_concurrency_of_method;
2227-
}
22282225
if (mp->status == NULL) {
22292226
LOG(ERROR) << "method=" << mp->method->full_name()
22302227
<< " does not support max_concurrency";
@@ -2235,10 +2232,6 @@ AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(MethodProperty* mp) {
22352232
}
22362233

22372234
int Server::MaxConcurrencyOf(const MethodProperty* mp) const {
2238-
if (IsRunning()) {
2239-
LOG(WARNING) << "MaxConcurrencyOf is only allowed before Server started";
2240-
return g_default_max_concurrency_of_method;
2241-
}
22422235
if (mp == NULL || mp->status == NULL) {
22432236
return 0;
22442237
}

0 commit comments

Comments
 (0)