Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/brpc/load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,13 @@ inline Extension<const LoadBalancer>* LoadBalancerExtension() {
return Extension<const LoadBalancer>::instance();
}

inline uint32_t GenRandomStride() {
uint32_t prime_offset[] = {
#include "bthread/offset_inl.list"
};
return prime_offset[butil::fast_rand_less_than(ARRAY_SIZE(prime_offset))];
}

} // namespace brpc


Expand Down
16 changes: 8 additions & 8 deletions src/brpc/policy/dynpart_load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ namespace policy {

class DynPartLoadBalancer : public LoadBalancer {
public:
bool AddServer(const ServerId& id);
bool RemoveServer(const ServerId& id);
size_t AddServersInBatch(const std::vector<ServerId>& servers);
size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
int SelectServer(const SelectIn& in, SelectOut* out);
DynPartLoadBalancer* New(const butil::StringPiece&) const;
void Destroy();
void Describe(std::ostream&, const DescribeOptions& options);
bool AddServer(const ServerId& id) override;
bool RemoveServer(const ServerId& id) override;
size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
int SelectServer(const SelectIn& in, SelectOut* out) override;
DynPartLoadBalancer* New(const butil::StringPiece&) const override;
void Destroy() override;
void Describe(std::ostream&, const DescribeOptions& options) override;

private:
struct Servers {
Expand Down
20 changes: 10 additions & 10 deletions src/brpc/policy/locality_aware_load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ DECLARE_double(punish_inflight_ratio);
class LocalityAwareLoadBalancer : public LoadBalancer {
public:
LocalityAwareLoadBalancer();
~LocalityAwareLoadBalancer();
bool AddServer(const ServerId& id);
bool RemoveServer(const ServerId& id);
size_t AddServersInBatch(const std::vector<ServerId>& servers);
size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
LocalityAwareLoadBalancer* New(const butil::StringPiece&) const;
void Destroy();
int SelectServer(const SelectIn& in, SelectOut* out);
void Feedback(const CallInfo& info);
void Describe(std::ostream& os, const DescribeOptions& options);
~LocalityAwareLoadBalancer() override;
bool AddServer(const ServerId& id) override;
bool RemoveServer(const ServerId& id) override;
size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
LocalityAwareLoadBalancer* New(const butil::StringPiece&) const override;
void Destroy() override;
int SelectServer(const SelectIn& in, SelectOut* out) override;
void Feedback(const CallInfo& info) override;
void Describe(std::ostream& os, const DescribeOptions& options) override;

private:
struct TimeInfo {
Expand Down
8 changes: 0 additions & 8 deletions src/brpc/policy/randomized_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,6 @@
namespace brpc {
namespace policy {

const uint32_t prime_offset[] = {
#include "bthread/offset_inl.list"
};

inline uint32_t GenRandomStride() {
return prime_offset[butil::fast_rand_less_than(ARRAY_SIZE(prime_offset))];
}

bool RandomizedLoadBalancer::Add(Servers& bg, const ServerId& id) {
if (bg.server_list.capacity() < 128) {
bg.server_list.reserve(128);
Expand Down
16 changes: 8 additions & 8 deletions src/brpc/policy/randomized_load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ namespace policy {
// than RoundRobinLoadBalancer.
class RandomizedLoadBalancer : public LoadBalancer {
public:
bool AddServer(const ServerId& id);
bool RemoveServer(const ServerId& id);
size_t AddServersInBatch(const std::vector<ServerId>& servers);
size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
int SelectServer(const SelectIn& in, SelectOut* out);
RandomizedLoadBalancer* New(const butil::StringPiece&) const;
void Destroy();
void Describe(std::ostream& os, const DescribeOptions&);
bool AddServer(const ServerId& id) override;
bool RemoveServer(const ServerId& id) override;
size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
int SelectServer(const SelectIn& in, SelectOut* out) override;
RandomizedLoadBalancer* New(const butil::StringPiece&) const override;
void Destroy() override;
void Describe(std::ostream& os, const DescribeOptions&) override;

private:
struct Servers {
Expand Down
8 changes: 0 additions & 8 deletions src/brpc/policy/round_robin_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,6 @@
namespace brpc {
namespace policy {

const uint32_t prime_offset[] = {
#include "bthread/offset_inl.list"
};

inline uint32_t GenRandomStride() {
return prime_offset[butil::fast_rand_less_than(ARRAY_SIZE(prime_offset))];
}

bool RoundRobinLoadBalancer::Add(Servers& bg, const ServerId& id) {
if (bg.server_list.capacity() < 128) {
bg.server_list.reserve(128);
Expand Down
16 changes: 8 additions & 8 deletions src/brpc/policy/round_robin_load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ namespace policy {
// at the same time) are very close.
class RoundRobinLoadBalancer : public LoadBalancer {
public:
bool AddServer(const ServerId& id);
bool RemoveServer(const ServerId& id);
size_t AddServersInBatch(const std::vector<ServerId>& servers);
size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
int SelectServer(const SelectIn& in, SelectOut* out);
RoundRobinLoadBalancer* New(const butil::StringPiece&) const;
void Destroy();
void Describe(std::ostream&, const DescribeOptions& options);
bool AddServer(const ServerId& id) override;
bool RemoveServer(const ServerId& id) override;
size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
int SelectServer(const SelectIn& in, SelectOut* out) override;
RoundRobinLoadBalancer* New(const butil::StringPiece&) const override;
void Destroy() override;
void Describe(std::ostream&, const DescribeOptions& options) override;

private:
struct Servers {
Expand Down
57 changes: 45 additions & 12 deletions src/brpc/policy/weighted_randomized_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@
namespace brpc {
namespace policy {

static bool server_compare(const WeightedRandomizedLoadBalancer::Server& lhs, const WeightedRandomizedLoadBalancer::Server& rhs) {
return (lhs.current_weight_sum < rhs.current_weight_sum);
static bool server_compare(const WeightedRandomizedLoadBalancer::Server& lhs,
const WeightedRandomizedLoadBalancer::Server& rhs) {
return lhs.current_weight_sum < rhs.current_weight_sum;
}

bool WeightedRandomizedLoadBalancer::Add(Servers& bg, const ServerId& id) {
Expand All @@ -38,15 +39,16 @@ bool WeightedRandomizedLoadBalancer::Add(Servers& bg, const ServerId& id) {
if (!butil::StringToUint(id.tag, &weight) || weight <= 0) {
if (FLAGS_default_weight_of_wlb > 0) {
LOG(WARNING) << "Invalid weight is set: " << id.tag
<< ". Now, 'weight' has been set to 'FLAGS_default_weight_of_wlb' by default.";
<< ". Now, 'weight' has been set to "
"FLAGS_default_weight_of_wlb by default.";
weight = FLAGS_default_weight_of_wlb;
} else {
LOG(ERROR) << "Invalid weight is set: " << id.tag;
return false;
}
}
bool insert_server =
bg.server_map.emplace(id.id, bg.server_list.size()).second;
bg.server_map.emplace(id.id, bg.server_list.size()).second;
if (insert_server) {
uint64_t current_weight_sum = bg.weight_sum + weight;
bg.server_list.emplace_back(id.id, weight, current_weight_sum);
Expand Down Expand Up @@ -114,6 +116,10 @@ size_t WeightedRandomizedLoadBalancer::RemoveServersInBatch(
return _db_servers.Modify(BatchRemove, servers);
}

bool WeightedRandomizedLoadBalancer::IsServerAvailable(SocketId id, SocketUniquePtr* out) {
return Socket::Address(id, out) == 0 && (*out)->IsAvailable();
}

int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
butil::DoublyBufferedData<Servers>::ScopedPtr s;
if (_db_servers.Read(&s) != 0) {
Expand All @@ -123,22 +129,49 @@ int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
if (n == 0) {
return ENODATA;
}

butil::FlatSet<SocketId> random_traversed;
uint64_t weight_sum = s->weight_sum;
for (size_t i = 0; i < n; ++i) {
uint64_t random_weight = butil::fast_rand_less_than(weight_sum);
const Server random_server(0, 0, random_weight);
const auto& server = std::lower_bound(s->server_list.begin(), s->server_list.end(), random_server, server_compare);
const auto& server =
std::lower_bound(s->server_list.begin(), s->server_list.end(),
random_server, server_compare);
const SocketId id = server->id;
if (((i + 1) == n // always take last chance
|| !ExcludedServers::IsExcluded(in.excluded, id))
&& Socket::Address(id, out->ptr) == 0
&& (*out->ptr)->IsAvailable()) {
// We found an available server
if (ExcludedServers::IsExcluded(in.excluded, id)) {
continue;
}
random_traversed.insert(id);
if (0 == IsServerAvailable(id, out->ptr)) {
// An available server is found.
return 0;
}
}
// After we traversed the whole server list, there is still no
// available server

if (random_traversed.size() == n) {
// Try to traverse the remaining servers to find an available server.
uint32_t offset = butil::fast_rand_less_than(n);
uint32_t stride = GenRandomStride();
for (size_t i = 0; i < n; ++i) {
offset = (offset + stride) % n;
SocketId id = s->server_list[offset].id;
if (NULL != random_traversed.seek(id)) {
continue;
}
if (IsServerAvailable(id, out->ptr)) {
// An available server is found.
return 0;
}
}
}

if (NULL != out->ptr) {
// Use the excluded but available server.
return 0;
}

// After traversing the whole server list, no available server is found.
return EHOSTDOWN;
}

Expand Down
20 changes: 11 additions & 9 deletions src/brpc/policy/weighted_randomized_load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,18 @@ namespace policy {
// Weight is got from tag of ServerId.
class WeightedRandomizedLoadBalancer : public LoadBalancer {
public:
bool AddServer(const ServerId& id);
bool RemoveServer(const ServerId& id);
size_t AddServersInBatch(const std::vector<ServerId>& servers);
size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
int SelectServer(const SelectIn& in, SelectOut* out);
LoadBalancer* New(const butil::StringPiece&) const;
void Destroy();
void Describe(std::ostream& os, const DescribeOptions&);
bool AddServer(const ServerId& id) override;
bool RemoveServer(const ServerId& id) override;
size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
int SelectServer(const SelectIn& in, SelectOut* out) override;
LoadBalancer* New(const butil::StringPiece&) const override;
void Destroy() override;
void Describe(std::ostream& os, const DescribeOptions&) override;

struct Server {
Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s = 0): id(s_id), weight(s_w), current_weight_sum(s_c_w_s) {}
Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s = 0)
: id(s_id), weight(s_w), current_weight_sum(s_c_w_s) {}
SocketId id;
uint32_t weight;
uint64_t current_weight_sum;
Expand All @@ -60,6 +61,7 @@ class WeightedRandomizedLoadBalancer : public LoadBalancer {
static bool Remove(Servers& bg, const ServerId& id);
static size_t BatchAdd(Servers& bg, const std::vector<ServerId>& servers);
static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& servers);
static bool IsServerAvailable(SocketId id, SocketUniquePtr* out);

butil::DoublyBufferedData<Servers> _db_servers;
};
Expand Down
6 changes: 3 additions & 3 deletions src/brpc/policy/weighted_round_robin_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ uint64_t GetStride(const uint64_t weight_sum, const size_t num) {
return 1;
}
uint32_t average_weight = weight_sum / num;
auto iter = std::lower_bound(prime_stride.begin(), prime_stride.end(),
average_weight);
auto iter = std::lower_bound(
prime_stride.begin(), prime_stride.end(), average_weight);
while (iter != prime_stride.end()
&& !IsCoprime(weight_sum, *iter)) {
++iter;
Expand Down Expand Up @@ -197,7 +197,7 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
}
filter.emplace(server_id);
remain_weight -= (s->server_list[s->server_map.at(server_id)]).weight;
// Select from begining status.
// Select from beginning status.
tls_temp.stride = GetStride(remain_weight, remain_servers);
tls_temp.position = tls.position;
tls_temp.remain_server = tls.remain_server;
Expand Down
16 changes: 8 additions & 8 deletions src/brpc/policy/weighted_round_robin_load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ namespace policy {
// Weight is got from tag of ServerId.
class WeightedRoundRobinLoadBalancer : public LoadBalancer {
public:
bool AddServer(const ServerId& id);
bool RemoveServer(const ServerId& id);
size_t AddServersInBatch(const std::vector<ServerId>& servers);
size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
int SelectServer(const SelectIn& in, SelectOut* out);
LoadBalancer* New(const butil::StringPiece&) const;
void Destroy();
void Describe(std::ostream&, const DescribeOptions& options);
bool AddServer(const ServerId& id) override;
bool RemoveServer(const ServerId& id) override;
size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
int SelectServer(const SelectIn& in, SelectOut* out) override;
LoadBalancer* New(const butil::StringPiece&) const override;
void Destroy() override;
void Describe(std::ostream&, const DescribeOptions& options) override;

private:
struct Server {
Expand Down
5 changes: 5 additions & 0 deletions test/bthread_countdown_event_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,22 @@ void *signaler(void *arg) {
}

TEST(CountdonwEventTest, sanity) {
std::vector<bthread_t> tids;
for (int n = 1; n < 10; ++n) {
Arg a;
a.num_sig = n;
a.event.reset(n);
for (int i = 0; i < n; ++i) {
bthread_t tid;
ASSERT_EQ(0, bthread_start_urgent(&tid, NULL, signaler, &a));
tids.push_back(tid);
}
a.event.wait();
ASSERT_EQ(0, a.num_sig.load(butil::memory_order_relaxed));
}
for (size_t i = 0; i < tids.size(); ++i) {
bthread_join(tids[i], NULL);
}
}

TEST(CountdonwEventTest, timed_wait) {
Expand Down
Loading