Skip to content

Commit fc301c0

Browse files
committed
Make sure WeightedRandomizedLoadBalancer can traverse the whole server list
1 parent ba9e838 commit fc301c0

2 files changed

Lines changed: 37 additions & 24 deletions

File tree

src/brpc/policy/weighted_randomized_load_balancer.cpp

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@
2626
namespace brpc {
2727
namespace policy {
2828

29-
static bool server_compare(const WeightedRandomizedLoadBalancer::Server& lhs, const WeightedRandomizedLoadBalancer::Server& rhs) {
30-
return (lhs.current_weight_sum < rhs.current_weight_sum);
29+
static bool server_compare(const WeightedRandomizedLoadBalancer::Server& lhs,
30+
const WeightedRandomizedLoadBalancer::Server& rhs) {
31+
return lhs.current_weight_sum < rhs.current_weight_sum;
3132
}
3233

3334
bool WeightedRandomizedLoadBalancer::Add(Servers& bg, const ServerId& id) {
@@ -38,15 +39,16 @@ bool WeightedRandomizedLoadBalancer::Add(Servers& bg, const ServerId& id) {
3839
if (!butil::StringToUint(id.tag, &weight) || weight <= 0) {
3940
if (FLAGS_default_weight_of_wlb > 0) {
4041
LOG(WARNING) << "Invalid weight is set: " << id.tag
41-
<< ". Now, 'weight' has been set to 'FLAGS_default_weight_of_wlb' by default.";
42+
<< ". Now, 'weight' has been set to "
43+
"FLAGS_default_weight_of_wlb by default.";
4244
weight = FLAGS_default_weight_of_wlb;
4345
} else {
4446
LOG(ERROR) << "Invalid weight is set: " << id.tag;
4547
return false;
4648
}
4749
}
4850
bool insert_server =
49-
bg.server_map.emplace(id.id, bg.server_list.size()).second;
51+
bg.server_map.emplace(id.id, bg.server_list.size()).second;
5052
if (insert_server) {
5153
uint64_t current_weight_sum = bg.weight_sum + weight;
5254
bg.server_list.emplace_back(id.id, weight, current_weight_sum);
@@ -124,21 +126,27 @@ int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
124126
return ENODATA;
125127
}
126128
uint64_t weight_sum = s->weight_sum;
127-
for (size_t i = 0; i < n; ++i) {
128-
uint64_t random_weight = butil::fast_rand_less_than(weight_sum);
129-
const Server random_server(0, 0, random_weight);
130-
const auto& server = std::lower_bound(s->server_list.begin(), s->server_list.end(), random_server, server_compare);
131-
const SocketId id = server->id;
132-
if (((i + 1) == n // always take last chance
133-
|| !ExcludedServers::IsExcluded(in.excluded, id))
134-
&& Socket::Address(id, out->ptr) == 0
135-
&& (*out->ptr)->IsAvailable()) {
129+
uint64_t random_weight = butil::fast_rand_less_than(weight_sum);
130+
const Server random_server(0, 0, random_weight);
131+
const auto& server = std::lower_bound(
132+
s->server_list.begin(), s->server_list.end(), random_server, server_compare);
133+
if (IsIdSelected(in, server->id, out->ptr)) {
134+
// We found an available server
135+
return 0;
136+
}
137+
138+
uint32_t offset = server - s->server_list.begin();
139+
uint32_t stride = butil::fast_rand_less_than(n);
140+
for (size_t i = 1; i < n; ++i) {
141+
offset = (offset + stride) % n;
142+
SocketId id = s->server_list[offset].id;
143+
if ((i + 1) == n // always take last chance
144+
|| IsIdSelected(in, id, out->ptr)) {
136145
// We found an available server
137146
return 0;
138147
}
139148
}
140-
// After we traversed the whole server list, there is still no
141-
// available server
149+
// After we traversed the whole server list, there is still no available server.
142150
return EHOSTDOWN;
143151
}
144152

src/brpc/policy/weighted_randomized_load_balancer.h

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,18 @@ namespace policy {
3131
// Weight is got from tag of ServerId.
3232
class WeightedRandomizedLoadBalancer : public LoadBalancer {
3333
public:
34-
bool AddServer(const ServerId& id);
35-
bool RemoveServer(const ServerId& id);
36-
size_t AddServersInBatch(const std::vector<ServerId>& servers);
37-
size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
38-
int SelectServer(const SelectIn& in, SelectOut* out);
39-
LoadBalancer* New(const butil::StringPiece&) const;
40-
void Destroy();
41-
void Describe(std::ostream& os, const DescribeOptions&);
34+
bool AddServer(const ServerId& id) override;
35+
bool RemoveServer(const ServerId& id) override;
36+
size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
37+
size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
38+
int SelectServer(const SelectIn& in, SelectOut* out) override;
39+
LoadBalancer* New(const butil::StringPiece&) const override;
40+
void Destroy() override;
41+
void Describe(std::ostream& os, const DescribeOptions&) override;
4242

4343
struct Server {
44-
Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s = 0): id(s_id), weight(s_w), current_weight_sum(s_c_w_s) {}
44+
Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s = 0)
45+
: id(s_id), weight(s_w), current_weight_sum(s_c_w_s) {}
4546
SocketId id;
4647
uint32_t weight;
4748
uint64_t current_weight_sum;
@@ -60,6 +61,10 @@ class WeightedRandomizedLoadBalancer : public LoadBalancer {
6061
static bool Remove(Servers& bg, const ServerId& id);
6162
static size_t BatchAdd(Servers& bg, const std::vector<ServerId>& servers);
6263
static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& servers);
64+
static bool IsIdSelected(const SelectIn& in, SocketId id, SocketUniquePtr* out) {
65+
return !ExcludedServers::IsExcluded(in.excluded, id) &&
66+
Socket::Address(id, out) == 0 && (*out)->IsAvailable();
67+
}
6368

6469
butil::DoublyBufferedData<Servers> _db_servers;
6570
};

0 commit comments

Comments
 (0)