Skip to content

Commit 7501ec0

Browse files
committed
Make sure WeightedRandomizedLoadBalancer can traverse the whole server list
1 parent 3eb4041 commit 7501ec0

10 files changed

Lines changed: 108 additions & 74 deletions

src/brpc/load_balancer.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,13 @@ inline Extension<const LoadBalancer>* LoadBalancerExtension() {
184184
return Extension<const LoadBalancer>::instance();
185185
}
186186

187+
inline uint32_t GenRandomStride() {
188+
uint32_t prime_offset[] = {
189+
#include "bthread/offset_inl.list"
190+
};
191+
return prime_offset[butil::fast_rand_less_than(ARRAY_SIZE(prime_offset))];
192+
}
193+
187194
} // namespace brpc
188195

189196

src/brpc/policy/dynpart_load_balancer.h

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,14 @@ namespace policy {
3333

3434
class DynPartLoadBalancer : public LoadBalancer {
3535
public:
36-
bool AddServer(const ServerId& id);
37-
bool RemoveServer(const ServerId& id);
38-
size_t AddServersInBatch(const std::vector<ServerId>& servers);
39-
size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
40-
int SelectServer(const SelectIn& in, SelectOut* out);
41-
DynPartLoadBalancer* New(const butil::StringPiece&) const;
42-
void Destroy();
43-
void Describe(std::ostream&, const DescribeOptions& options);
36+
bool AddServer(const ServerId& id) override;
37+
bool RemoveServer(const ServerId& id) override;
38+
size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
39+
size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
40+
int SelectServer(const SelectIn& in, SelectOut* out) override;
41+
DynPartLoadBalancer* New(const butil::StringPiece&) const override;
42+
void Destroy() override;
43+
void Describe(std::ostream&, const DescribeOptions& options) override;
4444

4545
private:
4646
struct Servers {

src/brpc/policy/locality_aware_load_balancer.h

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,16 @@ DECLARE_double(punish_inflight_ratio);
4141
class LocalityAwareLoadBalancer : public LoadBalancer {
4242
public:
4343
LocalityAwareLoadBalancer();
44-
~LocalityAwareLoadBalancer();
45-
bool AddServer(const ServerId& id);
46-
bool RemoveServer(const ServerId& id);
47-
size_t AddServersInBatch(const std::vector<ServerId>& servers);
48-
size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
49-
LocalityAwareLoadBalancer* New(const butil::StringPiece&) const;
50-
void Destroy();
51-
int SelectServer(const SelectIn& in, SelectOut* out);
52-
void Feedback(const CallInfo& info);
53-
void Describe(std::ostream& os, const DescribeOptions& options);
44+
~LocalityAwareLoadBalancer() override;
45+
bool AddServer(const ServerId& id) override;
46+
bool RemoveServer(const ServerId& id) override;
47+
size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
48+
size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
49+
LocalityAwareLoadBalancer* New(const butil::StringPiece&) const override;
50+
void Destroy() override;
51+
int SelectServer(const SelectIn& in, SelectOut* out) override;
52+
void Feedback(const CallInfo& info) override;
53+
void Describe(std::ostream& os, const DescribeOptions& options) override;
5454

5555
private:
5656
struct TimeInfo {

src/brpc/policy/randomized_load_balancer.h

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,14 @@ namespace policy {
3333
// than RoundRobinLoadBalancer.
3434
class RandomizedLoadBalancer : public LoadBalancer {
3535
public:
36-
bool AddServer(const ServerId& id);
37-
bool RemoveServer(const ServerId& id);
38-
size_t AddServersInBatch(const std::vector<ServerId>& servers);
39-
size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
40-
int SelectServer(const SelectIn& in, SelectOut* out);
41-
RandomizedLoadBalancer* New(const butil::StringPiece&) const;
42-
void Destroy();
43-
void Describe(std::ostream& os, const DescribeOptions&);
36+
bool AddServer(const ServerId& id) override;
37+
bool RemoveServer(const ServerId& id) override;
38+
size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
39+
size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
40+
int SelectServer(const SelectIn& in, SelectOut* out) override;
41+
RandomizedLoadBalancer* New(const butil::StringPiece&) const override;
42+
void Destroy() override;
43+
void Describe(std::ostream& os, const DescribeOptions&) override;
4444

4545
private:
4646
struct Servers {

src/brpc/policy/round_robin_load_balancer.cpp

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,6 @@
2525
namespace brpc {
2626
namespace policy {
2727

28-
const uint32_t prime_offset[] = {
29-
#include "bthread/offset_inl.list"
30-
};
31-
32-
inline uint32_t GenRandomStride() {
33-
return prime_offset[butil::fast_rand_less_than(ARRAY_SIZE(prime_offset))];
34-
}
35-
3628
bool RoundRobinLoadBalancer::Add(Servers& bg, const ServerId& id) {
3729
if (bg.server_list.capacity() < 128) {
3830
bg.server_list.reserve(128);

src/brpc/policy/round_robin_load_balancer.h

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,14 @@ namespace policy {
3232
// at the same time) are very close.
3333
class RoundRobinLoadBalancer : public LoadBalancer {
3434
public:
35-
bool AddServer(const ServerId& id);
36-
bool RemoveServer(const ServerId& id);
37-
size_t AddServersInBatch(const std::vector<ServerId>& servers);
38-
size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
39-
int SelectServer(const SelectIn& in, SelectOut* out);
40-
RoundRobinLoadBalancer* New(const butil::StringPiece&) const;
41-
void Destroy();
42-
void Describe(std::ostream&, const DescribeOptions& options);
35+
bool AddServer(const ServerId& id) override;
36+
bool RemoveServer(const ServerId& id) override;
37+
size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
38+
size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
39+
int SelectServer(const SelectIn& in, SelectOut* out) override;
40+
RoundRobinLoadBalancer* New(const butil::StringPiece&) const override;
41+
void Destroy() override;
42+
void Describe(std::ostream&, const DescribeOptions& options) override;
4343

4444
private:
4545
struct Servers {

src/brpc/policy/weighted_randomized_load_balancer.cpp

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@
2626
namespace brpc {
2727
namespace policy {
2828

29-
static bool server_compare(const WeightedRandomizedLoadBalancer::Server& lhs, const WeightedRandomizedLoadBalancer::Server& rhs) {
30-
return (lhs.current_weight_sum < rhs.current_weight_sum);
29+
static bool server_compare(const WeightedRandomizedLoadBalancer::Server& lhs,
30+
const WeightedRandomizedLoadBalancer::Server& rhs) {
31+
return lhs.current_weight_sum < rhs.current_weight_sum;
3132
}
3233

3334
bool WeightedRandomizedLoadBalancer::Add(Servers& bg, const ServerId& id) {
@@ -38,15 +39,16 @@ bool WeightedRandomizedLoadBalancer::Add(Servers& bg, const ServerId& id) {
3839
if (!butil::StringToUint(id.tag, &weight) || weight <= 0) {
3940
if (FLAGS_default_weight_of_wlb > 0) {
4041
LOG(WARNING) << "Invalid weight is set: " << id.tag
41-
<< ". Now, 'weight' has been set to 'FLAGS_default_weight_of_wlb' by default.";
42+
<< ". Now, 'weight' has been set to "
43+
"FLAGS_default_weight_of_wlb by default.";
4244
weight = FLAGS_default_weight_of_wlb;
4345
} else {
4446
LOG(ERROR) << "Invalid weight is set: " << id.tag;
4547
return false;
4648
}
4749
}
4850
bool insert_server =
49-
bg.server_map.emplace(id.id, bg.server_list.size()).second;
51+
bg.server_map.emplace(id.id, bg.server_list.size()).second;
5052
if (insert_server) {
5153
uint64_t current_weight_sum = bg.weight_sum + weight;
5254
bg.server_list.emplace_back(id.id, weight, current_weight_sum);
@@ -114,6 +116,10 @@ size_t WeightedRandomizedLoadBalancer::RemoveServersInBatch(
114116
return _db_servers.Modify(BatchRemove, servers);
115117
}
116118

119+
bool WeightedRandomizedLoadBalancer::IsServerAvailable(SocketId id, SocketUniquePtr* out) {
120+
return Socket::Address(id, out) == 0 && (*out)->IsAvailable();
121+
}
122+
117123
int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
118124
butil::DoublyBufferedData<Servers>::ScopedPtr s;
119125
if (_db_servers.Read(&s) != 0) {
@@ -123,22 +129,49 @@ int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
123129
if (n == 0) {
124130
return ENODATA;
125131
}
132+
133+
butil::FlatSet<SocketId> random_traversed;
126134
uint64_t weight_sum = s->weight_sum;
127135
for (size_t i = 0; i < n; ++i) {
128136
uint64_t random_weight = butil::fast_rand_less_than(weight_sum);
129137
const Server random_server(0, 0, random_weight);
130-
const auto& server = std::lower_bound(s->server_list.begin(), s->server_list.end(), random_server, server_compare);
138+
const auto& server =
139+
std::lower_bound(s->server_list.begin(), s->server_list.end(),
140+
random_server, server_compare);
131141
const SocketId id = server->id;
132-
if (((i + 1) == n // always take last chance
133-
|| !ExcludedServers::IsExcluded(in.excluded, id))
134-
&& Socket::Address(id, out->ptr) == 0
135-
&& (*out->ptr)->IsAvailable()) {
136-
// We found an available server
142+
random_traversed.insert(id);
143+
if (0 != IsServerAvailable(id, out->ptr)) {
144+
continue;
145+
}
146+
if (!ExcludedServers::IsExcluded(in.excluded, id)) {
147+
// An available server is found.
137148
return 0;
138149
}
139150
}
140-
// After we traversed the whole server list, there is still no
141-
// available server
151+
152+
if (random_traversed.size() == n) {
153+
// Try to traverse the remaining servers to find an available server.
154+
uint32_t offset = butil::fast_rand_less_than(n);
155+
uint32_t stride = GenRandomStride();
156+
for (size_t i = 0; i < n; ++i) {
157+
offset = (offset + stride) % n;
158+
SocketId id = s->server_list[offset].id;
159+
if (NULL != random_traversed.seek(id)) {
160+
continue;
161+
}
162+
if (IsServerAvailable(id, out->ptr)) {
163+
// An available server is found.
164+
return 0;
165+
}
166+
}
167+
}
168+
169+
if (NULL != out->ptr) {
170+
// Use the excluded but available server.
171+
return 0;
172+
}
173+
174+
// After traversing the whole server list, no available server is found.
142175
return EHOSTDOWN;
143176
}
144177

src/brpc/policy/weighted_randomized_load_balancer.h

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,18 @@ namespace policy {
3131
// Weight is got from tag of ServerId.
3232
class WeightedRandomizedLoadBalancer : public LoadBalancer {
3333
public:
34-
bool AddServer(const ServerId& id);
35-
bool RemoveServer(const ServerId& id);
36-
size_t AddServersInBatch(const std::vector<ServerId>& servers);
37-
size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
38-
int SelectServer(const SelectIn& in, SelectOut* out);
39-
LoadBalancer* New(const butil::StringPiece&) const;
40-
void Destroy();
41-
void Describe(std::ostream& os, const DescribeOptions&);
34+
bool AddServer(const ServerId& id) override;
35+
bool RemoveServer(const ServerId& id) override;
36+
size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
37+
size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
38+
int SelectServer(const SelectIn& in, SelectOut* out) override;
39+
LoadBalancer* New(const butil::StringPiece&) const override;
40+
void Destroy() override;
41+
void Describe(std::ostream& os, const DescribeOptions&) override;
4242

4343
struct Server {
44-
Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s = 0): id(s_id), weight(s_w), current_weight_sum(s_c_w_s) {}
44+
Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s = 0)
45+
: id(s_id), weight(s_w), current_weight_sum(s_c_w_s) {}
4546
SocketId id;
4647
uint32_t weight;
4748
uint64_t current_weight_sum;
@@ -60,6 +61,7 @@ class WeightedRandomizedLoadBalancer : public LoadBalancer {
6061
static bool Remove(Servers& bg, const ServerId& id);
6162
static size_t BatchAdd(Servers& bg, const std::vector<ServerId>& servers);
6263
static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& servers);
64+
static bool IsServerAvailable(SocketId id, SocketUniquePtr* out);
6365

6466
butil::DoublyBufferedData<Servers> _db_servers;
6567
};

src/brpc/policy/weighted_round_robin_load_balancer.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ uint64_t GetStride(const uint64_t weight_sum, const size_t num) {
5858
return 1;
5959
}
6060
uint32_t average_weight = weight_sum / num;
61-
auto iter = std::lower_bound(prime_stride.begin(), prime_stride.end(),
62-
average_weight);
61+
auto iter = std::lower_bound(
62+
prime_stride.begin(), prime_stride.end(), average_weight);
6363
while (iter != prime_stride.end()
6464
&& !IsCoprime(weight_sum, *iter)) {
6565
++iter;
@@ -197,7 +197,7 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
197197
}
198198
filter.emplace(server_id);
199199
remain_weight -= (s->server_list[s->server_map.at(server_id)]).weight;
200-
// Select from begining status.
200+
// Select from beginning status.
201201
tls_temp.stride = GetStride(remain_weight, remain_servers);
202202
tls_temp.position = tls.position;
203203
tls_temp.remain_server = tls.remain_server;

src/brpc/policy/weighted_round_robin_load_balancer.h

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,14 @@ namespace policy {
3232
// Weight is got from tag of ServerId.
3333
class WeightedRoundRobinLoadBalancer : public LoadBalancer {
3434
public:
35-
bool AddServer(const ServerId& id);
36-
bool RemoveServer(const ServerId& id);
37-
size_t AddServersInBatch(const std::vector<ServerId>& servers);
38-
size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
39-
int SelectServer(const SelectIn& in, SelectOut* out);
40-
LoadBalancer* New(const butil::StringPiece&) const;
41-
void Destroy();
42-
void Describe(std::ostream&, const DescribeOptions& options);
35+
bool AddServer(const ServerId& id) override;
36+
bool RemoveServer(const ServerId& id) override;
37+
size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
38+
size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
39+
int SelectServer(const SelectIn& in, SelectOut* out) override;
40+
LoadBalancer* New(const butil::StringPiece&) const override;
41+
void Destroy() override;
42+
void Describe(std::ostream&, const DescribeOptions& options) override;
4343

4444
private:
4545
struct Server {

0 commit comments

Comments
 (0)