Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/brpc/policy/locality_aware_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

#include <limits> // numeric_limits
#include <gflags/gflags.h>
#include "butil/time.h" // cpuwide_time_us
#include "butil/time.h" // gettimeofday_us
#include "butil/fast_rand.h"
#include "brpc/log.h"
#include "brpc/socket.h"
Expand Down Expand Up @@ -376,7 +376,7 @@ void LocalityAwareLoadBalancer::Feedback(const CallInfo& info) {

int64_t LocalityAwareLoadBalancer::Weight::Update(
const CallInfo& ci, size_t index) {
const int64_t end_time_us = butil::cpuwide_time_us();
const int64_t end_time_us = butil::gettimeofday_us();
const int64_t latency = end_time_us - ci.begin_time_us;
BAIDU_SCOPED_LOCK(_mutex);
if (Disabled()) {
Expand Down Expand Up @@ -524,7 +524,7 @@ void LocalityAwareLoadBalancer::Describe(
if (_db_servers.Read(&s) != 0) {
os << "fail to read _db_servers";
} else {
const int64_t now = butil::cpuwide_time_us();
const int64_t now = butil::gettimeofday_us();
const size_t n = s->weight_tree.size();
os << '[';
for (size_t i = 0; i < n; ++i) {
Expand Down
77 changes: 77 additions & 0 deletions test/brpc_load_balancer_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1303,6 +1303,83 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_intergrated) {
}
#endif // BUTIL_USE_ASAN

// Regression for #3268's incomplete migration of LocalityAwareLoadBalancer.
//
// #3268 switched `LocalityAwareLoadBalancer::Weight::Update::end_time_us` and
// `LocalityAwareLoadBalancer::Describe::now` to `butil::cpuwide_time_us()`
// while every caller that supplies `CallInfo::begin_time_us` (the RPC entry
// in `Channel::CallMethod` and the retry sites in
// `Controller::OnVersionedRPCReturned`) still uses `butil::gettimeofday_us()`.
// The resulting time-source mismatch makes
//
// latency = end_time_us - ci.begin_time_us
// = cpuwide_now - wallclock_begin
// ~= -1.7e15 us (huge negative)
//
// trigger the
//
// if (latency <= 0) { /* time skews, ignore the sample */ return 0; }
//
// short-circuit on every call. `_time_q` never accumulates samples,
// `_avg_latency` stays at 0, and locality-aware weight feedback is silently
// disabled. Visible downstream symptom: cold-start `list://` channels with
// `lb=la` and 2 backends occasionally fail RPCs with `EHOSTDOWN`
// ("Fail to select server") on retry even when one backend is healthy.
//
// This commit reverts the LA side of #3268, so `Weight::Update` and
// `Describe` once again use `butil::gettimeofday_us()` to match every
// existing caller of `CallInfo::begin_time_us`.
//
// The test below runs entirely against `LocalityAwareLoadBalancer` (no
// Server / Channel is involved), so it is hermetic. It supplies a
// gettimeofday-based `begin_time_us` (matching what `Channel::CallMethod`
// passes today) and asserts that the LB records a positive `_avg_latency`,
// rather than tripping the time-skew short-circuit.
TEST_F(LoadBalancerTest, la_records_latency_with_consistent_time_source) {
LALB lalb;
char addr[] = "192.168.1.1:8080";
butil::EndPoint dummy;
ASSERT_EQ(0, str2endpoint(addr, &dummy));
brpc::ServerId id(8888);
brpc::SocketOptions options;
options.remote_side = dummy;
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
ASSERT_TRUE(lalb.AddServer(id));

auto avg_latency = [&]() -> int64_t {
std::ostringstream os;
brpc::DescribeOptions opts;
opts.verbose = true;
lalb.Describe(os, opts);
const std::string s = os.str();
const size_t p = s.find("avg_latency=");
if (p == std::string::npos) return -1;
return strtoll(s.c_str() + p + strlen("avg_latency="), NULL, 10);
};

// Drive a few "RPCs": pick a server, sleep ~2ms, feed back. begin_time_us
// comes from gettimeofday_us(), matching what Channel::CallMethod and the
// retry sites in Controller::OnVersionedRPCReturned pass on every RPC.
for (int i = 0; i < 8; ++i) {
const int64_t begin_us = butil::gettimeofday_us();
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { begin_us, true, false, 0u, NULL };
brpc::LoadBalancer::SelectOut out(&ptr);
ASSERT_EQ(0, lalb.SelectServer(in, &out));
bthread_usleep(2000);
brpc::LoadBalancer::CallInfo ci = { begin_us, id.id, 0, NULL };
lalb.Feedback(ci);
}

// _avg_latency must reflect actual elapsed time. If this is 0, either
// Weight::Update::end_time_us was changed away from gettimeofday_us
// again (re-introducing the time-source mismatch) or some caller of
// CallInfo::begin_time_us drifted to a different clock domain.
EXPECT_GT(avg_latency(), 0);

ASSERT_EQ(0, brpc::Socket::SetFailed(id.id));
}

TEST_F(LoadBalancerTest, la_selection_too_long) {
brpc::GlobalInitializeOrDie();
brpc::LoadBalancerWithNaming lb;
Expand Down
Loading