Skip to content

Commit 4c33f88

Browse files
authored
Support success limit of ParallelChannel (#2842)
* Support success limit of ParallelChannel * Update document of ParallelChannel
1 parent a18463f commit 4c33f88

5 files changed

Lines changed: 153 additions & 40 deletions

File tree

docs/cn/combo_channel.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@ ParallelChannel (有时被称为“pchan”)同时访问其包含的sub channel
1919

2020
示例代码见[example/parallel_echo_c++](https://github.com/apache/brpc/tree/master/example/parallel_echo_c++/)
2121

22-
任何brpc::ChannelBase的子类都可以加入ParallelChannel,包括ParallelChannel和其他组合Channel。用户可以设置ParallelChannelOptions.fail_limit来控制访问的最大失败次数,当失败的访问达到这个数目时,RPC会立刻结束而不等待超时。
22+
任何brpc::ChannelBase的子类都可以加入ParallelChannel,包括ParallelChannel和其他组合Channel。
23+
24+
用户可以设置ParallelChannelOptions.fail_limit来控制访问的最大失败次数,当失败的访问达到这个数目时,RPC会立刻结束而不等待超时。
25+
26+
用户可以设置ParallelChannelOptions.success_limit来控制访问的最大成功次数,当成功的访问达到这个数目时,RPC会立刻结束。ParallelChannelOptions.fail_limit的优先级高于ParallelChannelOptions.success_limit,只有未设置fail_limit时,success_limit才会生效。
2327

2428
一个sub channel可多次加入同一个ParallelChannel。当你需要对同一个服务发起多次异步访问并等待它们完成的话,这很有用。
2529

docs/en/combo_channel.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@ We need a better abstraction. If several channels are combined into a larger one
1919

2020
Check [example/parallel_echo_c++](https://github.com/apache/brpc/tree/master/example/parallel_echo_c++/) for an example.
2121

22-
Any subclasses of `brpc::ChannelBase` can be added into `ParallelChannel`, including `ParallelChannel` and other combo channels. Set `ParallelChannelOptions.fail_limit` to control maximum number of failures. When number of failed responses reaches the limit, the RPC is ended immediately rather than waiting for timeout.
22+
Any subclasses of `brpc::ChannelBase` can be added into `ParallelChannel`, including `ParallelChannel` and other combo channels.
23+
24+
Set `ParallelChannelOptions.fail_limit` to control the maximum number of failures. When the number of failed responses reaches the limit, the RPC is ended immediately rather than waiting for the timeout.
25+
26+
Set `ParallelChannelOptions.success_limit` to control the maximum number of successful responses. When the number of successful responses reaches the limit, the RPC is ended immediately. `ParallelChannelOptions.fail_limit` has a higher priority than `ParallelChannelOptions.success_limit`: `success_limit` takes effect only when `fail_limit` is not set.
2327

2428
A sub channel can be added to the same `ParallelChannel` more than once, which is useful when you need to initiate multiple asynchronous RPC to the same service and wait for their completions.
2529

src/brpc/parallel_channel.cpp

Lines changed: 45 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,8 @@
2424
#include "brpc/details/controller_private_accessor.h"
2525
#include "brpc/parallel_channel.h"
2626

27-
2827
namespace brpc {
2928

30-
ParallelChannelOptions::ParallelChannelOptions()
31-
: timeout_ms(500)
32-
, fail_limit(-1) {
33-
}
34-
3529
DECLARE_bool(usercode_in_pthread);
3630

3731
// Not see difference when memory is cached.
@@ -45,12 +39,15 @@ static __thread Memory tls_cached_pchan_mem = { 0, NULL };
4539

4640
class ParallelChannelDone : public google::protobuf::Closure {
4741
private:
48-
ParallelChannelDone(int fail_limit, int ndone, int nchan, int memsize,
42+
ParallelChannelDone(int fail_limit, int success_limit,
43+
int ndone, int nchan, int memsize,
4944
Controller* cntl, google::protobuf::Closure* user_done)
5045
: _fail_limit(fail_limit)
46+
, _success_limit(success_limit)
5147
, _ndone(ndone)
5248
, _nchan(nchan)
5349
, _memsize(memsize)
50+
, _current_success(0)
5451
, _current_fail(0)
5552
, _current_done(0)
5653
, _cntl(cntl)
@@ -59,15 +56,13 @@ class ParallelChannelDone : public google::protobuf::Closure {
5956
, _callmethod_pthread(0) {
6057
}
6158

62-
~ParallelChannelDone() { }
63-
6459
public:
6560
class SubDone : public google::protobuf::Closure {
6661
public:
6762
SubDone() : shared_data(NULL) {
6863
}
6964

70-
~SubDone() {
65+
~SubDone() override {
7166
// Can't delete request/response in ~SubCall because the
7267
// object is copyable.
7368
if (ap.flags & DELETE_REQUEST) {
@@ -78,7 +73,7 @@ class ParallelChannelDone : public google::protobuf::Closure {
7873
}
7974
}
8075

81-
void Run() {
76+
void Run() override {
8277
shared_data->OnSubDoneRun(this);
8378
}
8479

@@ -89,7 +84,8 @@ class ParallelChannelDone : public google::protobuf::Closure {
8984
};
9085

9186
static ParallelChannelDone* Create(
92-
int fail_limit, int ndone, const SubCall* aps, int nchan,
87+
int fail_limit, int success_limit,
88+
int ndone, const SubCall* aps, int nchan,
9389
Controller* cntl, google::protobuf::Closure* user_done) {
9490
// We need to create the object in this way because _sub_done is
9591
// dynamically allocated.
@@ -130,8 +126,8 @@ class ParallelChannelDone : public google::protobuf::Closure {
130126
return NULL;
131127
}
132128
#endif
133-
ParallelChannelDone* d = new (mem) ParallelChannelDone(
134-
fail_limit, ndone, nchan, memsize, cntl, user_done);
129+
auto d = new (mem) ParallelChannelDone(
130+
fail_limit, success_limit, ndone, nchan, memsize, cntl, user_done);
135131

136132
// Apply client settings of _cntl to controllers of sub calls, except
137133
// timeout. If we let sub channel do their timeout separately, when
@@ -183,7 +179,7 @@ class ParallelChannelDone : public google::protobuf::Closure {
183179
}
184180
}
185181

186-
void Run() {
182+
void Run() override {
187183
const int ec = _cntl->ErrorCode();
188184
if (ec == EPCHANFINISH) {
189185
// all sub calls finished. Clear the error and we'll set
@@ -220,14 +216,25 @@ class ParallelChannelDone : public google::protobuf::Closure {
220216
if (fin != NULL) {
221217
// [ called from SubDone::Run() ]
222218

223-
// Count failed sub calls, if fail_limit is reached, cancel others.
224-
if (fin->cntl.FailedInline() &&
225-
_current_fail.fetch_add(1, butil::memory_order_relaxed) + 1
226-
== _fail_limit) {
219+
int error_code = fin->cntl.ErrorCode();
220+
// EPCHANFINISH is not an error of sub calls.
221+
bool fail = 0 != error_code && EPCHANFINISH != error_code;
222+
bool cancel =
223+
// Count failed sub calls, if `fail_limit' is reached, cancel others.
224+
(fail && _current_fail.fetch_add(1, butil::memory_order_relaxed) + 1
225+
== _fail_limit) ||
226+
// Count successful sub calls, if `success_limit' is reached, cancel others.
227+
(0 == error_code &&
228+
_current_success.fetch_add(1, butil::memory_order_relaxed) + 1
229+
== _success_limit);
230+
231+
if (cancel) {
232+
// Only cancel once by `fail_limit' or `success_limit'.
227233
for (int i = 0; i < _ndone; ++i) {
228234
SubDone* sd = sub_done(i);
229235
if (fin != sd) {
230-
bthread_id_error(sd->cntl.call_id(), ECANCELED);
236+
bthread_id_error(
237+
sd->cntl.call_id(), fail ? ECANCELED : EPCHANFINISH);
231238
}
232239
}
233240
}
@@ -423,13 +430,15 @@ class ParallelChannelDone : public google::protobuf::Closure {
423430

424431
private:
425432
int _fail_limit;
433+
int _success_limit;
426434
int _ndone;
427435
int _nchan;
428436
#if defined(__clang__)
429437
int ALLOW_UNUSED _memsize;
430438
#else
431439
int _memsize;
432440
#endif
441+
butil::atomic<int> _current_success;
433442
butil::atomic<int> _current_fail;
434443
butil::atomic<uint32_t> _current_done;
435444
Controller* _cntl;
@@ -602,6 +611,7 @@ void ParallelChannel::CallMethod(
602611
ParallelChannelDone* d = NULL;
603612
int ndone = nchan;
604613
int fail_limit = 1;
614+
int success_limit = 1;
605615
DEFINE_SMALL_ARRAY(SubCall, aps, nchan, 64);
606616

607617
if (cntl->FailedInline()) {
@@ -655,9 +665,21 @@ void ParallelChannel::CallMethod(
655665
fail_limit = ndone;
656666
}
657667
}
658-
659-
d = ParallelChannelDone::Create(fail_limit, ndone, aps, nchan,
660-
cntl, done);
668+
669+
// `success_limit' is only valid when `fail_limit' is not set.
670+
if (_options.fail_limit >= 0 || _options.success_limit < 0) {
671+
success_limit = ndone;
672+
} else {
673+
success_limit = _options.success_limit;
674+
if (success_limit < 1) {
675+
success_limit = 1;
676+
} else if (success_limit > ndone) {
677+
success_limit = ndone;
678+
}
679+
}
680+
681+
d = ParallelChannelDone::Create(
682+
fail_limit, success_limit, ndone, aps, nchan, cntl, done);
661683
if (NULL == d) {
662684
cntl->SetFailed(ENOMEM, "Fail to new ParallelChannelDone");
663685
goto FAIL;

src/brpc/parallel_channel.h

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ class CallMapper : public SharedObject {
112112
}
113113

114114
// Only callable by subclasses and butil::intrusive_ptr
115-
virtual ~CallMapper() {}
115+
~CallMapper() override = default;
116116
};
117117

118118
// Clone req_base typed `Req'.
@@ -140,12 +140,11 @@ class ResponseMerger : public SharedObject {
140140
FAIL_ALL
141141
};
142142

143-
ResponseMerger() { }
144143
virtual Result Merge(google::protobuf::Message* response,
145144
const google::protobuf::Message* sub_response) = 0;
146145
protected:
147146
// Only callable by subclasses and butil::intrusive_ptr
148-
virtual ~ResponseMerger() { }
147+
~ResponseMerger() override = default;
149148
};
150149

151150
struct ParallelChannelOptions {
@@ -156,7 +155,7 @@ struct ParallelChannelOptions {
156155
// Overridable by Controller.set_timeout_ms().
157156
// Default: 500 (milliseconds)
158157
// Maximum: 0x7fffffff (roughly 30 days)
159-
int32_t timeout_ms;
158+
int32_t timeout_ms{500};
160159

161160
// The RPC is considered to be successful if number of failed sub RPC
162161
// does not reach this limit. Even if the RPC is timedout or canceled,
@@ -165,10 +164,14 @@ struct ParallelChannelOptions {
165164
// the timeout) when the limit is reached.
166165
// Default: number of sub channels, meaning that the RPC to ParallelChannel
167166
// does not fail unless all sub RPC failed.
168-
int fail_limit;
167+
int fail_limit{-1};
169168

170-
// Construct with default options.
171-
ParallelChannelOptions();
169+
// The RPC is considered to be successful when the number of successful sub
170+
// RPCs reaches this limit.
171+
// Default: number of sub channels, meaning that the RPC to ParallelChannel
172+
// does not return unless all sub RPC succeed.
173+
// Note: `success_limit' is only valid when `fail_limit' is not set.
174+
int success_limit{ -1};
172175
};
173176

174177
// ParallelChannel(aka "pchan") accesses all sub channels simultaneously with
@@ -185,8 +188,7 @@ struct ParallelChannelOptions {
185188
class ParallelChannel : public ChannelBase {
186189
friend class Controller;
187190
public:
188-
ParallelChannel() { }
189-
~ParallelChannel();
191+
~ParallelChannel() override;
190192

191193
// Initialize ParallelChannel with `options'.
192194
// NOTE: Currently this function always returns 0.
@@ -234,7 +236,7 @@ friend class Controller;
234236
google::protobuf::RpcController* controller,
235237
const google::protobuf::Message* request,
236238
google::protobuf::Message* response,
237-
google::protobuf::Closure* done);
239+
google::protobuf::Closure* done) override;
238240

239241
// Number of sub channels.
240242
size_t channel_count() const { return _chans.size(); }
@@ -245,10 +247,10 @@ friend class Controller;
245247

246248
// Minimum weight of sub channels.
247249
// FIXME(gejun): be minimum of top(nchan-fail_limit)
248-
int Weight();
250+
int Weight() override;
249251

250252
// Put description into `os'.
251-
void Describe(std::ostream& os, const DescribeOptions&) const;
253+
void Describe(std::ostream& os, const DescribeOptions&) const override;
252254

253255
public:
254256
struct SubChan {
@@ -263,7 +265,7 @@ friend class Controller;
263265

264266
protected:
265267
static void* RunDoneAndDestroy(void* arg);
266-
int CheckHealth();
268+
int CheckHealth() override;
267269

268270
ParallelChannelOptions _options;
269271
ChannelList _chans;

test/brpc_channel_unittest.cpp

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,24 @@ class ChannelTest : public ::testing::Test{
569569
}
570570
};
571571

572+
class SuccessLimitCallMapper : public brpc::CallMapper {
573+
public:
574+
brpc::SubCall Map(int channel_index,
575+
const google::protobuf::MethodDescriptor* method,
576+
const google::protobuf::Message* req_base,
577+
google::protobuf::Message* response) override {
578+
auto req = brpc::Clone<test::EchoRequest>(req_base);
579+
req->set_code(channel_index + 1/*non-zero*/);
580+
if (_index++ > 0) {
581+
req->set_sleep_us(5 * 1000);
582+
}
583+
return brpc::SubCall(method, req, response->New(),
584+
brpc::DELETE_REQUEST | brpc::DELETE_RESPONSE);
585+
}
586+
private:
587+
size_t _index{0};
588+
};
589+
572590
class MergeNothing : public brpc::ResponseMerger {
573591
Result Merge(google::protobuf::Message* /*response*/,
574592
const google::protobuf::Message* /*sub_response*/) {
@@ -826,7 +844,60 @@ class ChannelTest : public ::testing::Test{
826844
}
827845
StopAndJoin();
828846
}
829-
847+
848+
void TestSuccessLimitParallel(bool single_server, bool async, bool short_connection) {
849+
std::cout << " *** single=" << single_server
850+
<< " async=" << async
851+
<< " short=" << short_connection << std::endl;
852+
853+
ASSERT_EQ(0, StartAccept(_ep));
854+
const size_t NCHANS = 8;
855+
brpc::Channel subchans[NCHANS];
856+
brpc::ParallelChannel channel;
857+
brpc::ParallelChannelOptions options;
858+
// Only care about the first successful response.
859+
options.success_limit = 1;
860+
channel.Init(&options);
861+
butil::intrusive_ptr<brpc::CallMapper> fast_call_mapper(new SuccessLimitCallMapper);
862+
for (size_t i = 0; i < NCHANS; ++i) {
863+
SetUpChannel(&subchans[i], single_server, short_connection);
864+
ASSERT_EQ(0, channel.AddChannel(
865+
&subchans[i], brpc::DOESNT_OWN_CHANNEL, fast_call_mapper, NULL));
866+
}
867+
brpc::Controller cntl;
868+
test::EchoRequest req;
869+
test::EchoResponse res;
870+
req.set_message(__FUNCTION__);
871+
req.set_code(23);
872+
CallMethod(&channel, &cntl, &req, &res, async);
873+
874+
EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
875+
EXPECT_EQ(NCHANS, (size_t)cntl.sub_count());
876+
for (int i = 0; i < cntl.sub_count(); ++i) {
877+
EXPECT_TRUE(cntl.sub(i)) << "i=" << i;
878+
if (0 == i) {
879+
EXPECT_TRUE(!cntl.sub(i)->Failed()) << "i=" << i;
880+
} else {
881+
EXPECT_TRUE(cntl.sub(i)->Failed()) << "i=" << i;
882+
EXPECT_EQ(brpc::EPCHANFINISH, cntl.sub(i)->ErrorCode()) << "i=" << i;
883+
}
884+
}
885+
EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
886+
ASSERT_EQ(1, res.code_list_size());
887+
ASSERT_EQ((int)1, res.code_list(0));
888+
if (short_connection) {
889+
// Sleep to let `_messenger' detect `Socket' being `SetFailed'
890+
const int64_t start_time = butil::gettimeofday_us();
891+
while (_messenger.ConnectionCount() != 0) {
892+
EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
893+
bthread_usleep(1000);
894+
}
895+
} else {
896+
EXPECT_GE(1ul, _messenger.ConnectionCount());
897+
}
898+
StopAndJoin();
899+
}
900+
830901
struct CancelerArg {
831902
int64_t sleep_before_cancel_us;
832903
brpc::CallId cid;
@@ -2382,7 +2453,7 @@ TEST_F(ChannelTest, success_parallel) {
23822453
}
23832454

23842455
TEST_F(ChannelTest, success_duplicated_parallel) {
2385-
for (int i = 0; i <= 1; ++i) { // Flag SingleServer
2456+
for (int i = 0; i <= 1; ++i) { // Flag SingleServer
23862457
for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
23872458
for (int k = 0; k <=1; ++k) { // Flag ShortConnection
23882459
TestSuccessDuplicatedParallel(i, j, k);
@@ -2421,6 +2492,16 @@ TEST_F(ChannelTest, success_parallel2) {
24212492
}
24222493
}
24232494

2495+
TEST_F(ChannelTest, success_limit_parallel) {
2496+
for (int i = 0; i <= 1; ++i) { // Flag SingleServer
2497+
for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
2498+
for (int k = 0; k <=1; ++k) { // Flag ShortConnection
2499+
TestSuccessLimitParallel(i, j, k);
2500+
}
2501+
}
2502+
}
2503+
}
2504+
24242505
TEST_F(ChannelTest, cancel_before_callmethod) {
24252506
for (int i = 0; i <= 1; ++i) { // Flag SingleServer
24262507
for (int j = 0; j <= 1; ++j) { // Flag Asynchronous

0 commit comments

Comments
 (0)