Skip to content

Commit 3d386c8

Browse files
wbpcode and Copilot authored
lb: unified the lifetime mechanism of worker local lb (envoyproxy#45100)
Commit Message: lb: unify the lifetime mechanism of worker local lb Additional Description: In the previous implementation, the thread-aware LB recreated the worker-local LB whenever the endpoint set changed. This PR updates the thread-aware LB implementation to refresh the worker-local LB in place rather than creating a new one. The new behavior is the same as that of all other LB implementations. Risk Level: mid. Testing: unit. Docs Changes: n/a. Release Notes: n/a. Platform Specific Features: n/a. --------- Signed-off-by: wbpcode/wangbaiping <wbphub@gmail.com> Signed-off-by: code <wbphub@gmail.com> Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
1 parent 86ae80b commit 3d386c8

4 files changed

Lines changed: 129 additions & 20 deletions

File tree

source/extensions/load_balancing_policies/common/thread_aware_lb_impl.cc

Lines changed: 24 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -236,16 +236,30 @@ ThreadAwareLoadBalancerBase::LoadBalancerImpl::chooseHost(LoadBalancerContext* c
236236
return host;
237237
}
238238

239-
LoadBalancerPtr ThreadAwareLoadBalancerBase::LoadBalancerFactoryImpl::create(LoadBalancerParams) {
240-
auto lb = std::make_unique<LoadBalancerImpl>(stats_, random_, hash_policy_);
241-
242-
// We must protect current_lb_ via a RW lock since it is accessed and written to by multiple
243-
// threads. All complex processing has already been precalculated however.
244-
absl::ReaderMutexLock lock(mutex_);
245-
lb->healthy_per_priority_load_ = healthy_per_priority_load_;
246-
lb->degraded_per_priority_load_ = degraded_per_priority_load_;
247-
lb->per_priority_state_ = per_priority_state_;
248-
return lb;
239+
void ThreadAwareLoadBalancerBase::LoadBalancerImpl::refresh() {
240+
// The per priority state is shared across all threads and refreshed on main thread. We need to
241+
// copy the latest per priority state to the worker thread load balancer instance under lock.
242+
absl::ReaderMutexLock lock(factory_->mutex_);
243+
healthy_per_priority_load_ = factory_->healthy_per_priority_load_;
244+
degraded_per_priority_load_ = factory_->degraded_per_priority_load_;
245+
per_priority_state_ = factory_->per_priority_state_;
246+
}
247+
248+
ThreadAwareLoadBalancerBase::LoadBalancerImpl::LoadBalancerImpl(
249+
std::shared_ptr<LoadBalancerFactoryImpl> factory, ClusterLbStats& stats,
250+
Random::RandomGenerator& random, HashPolicySharedPtr hash_policy,
251+
const Upstream::PrioritySet& priority_set)
252+
: factory_(std::move(factory)), stats_(stats), random_(random),
253+
hash_policy_(std::move(hash_policy)) {
254+
member_update_cb_ =
255+
priority_set.addMemberUpdateCb([this](const HostVector&, const HostVector&) { refresh(); });
256+
refresh();
257+
}
258+
259+
LoadBalancerPtr
260+
ThreadAwareLoadBalancerBase::LoadBalancerFactoryImpl::create(LoadBalancerParams params) {
261+
return std::make_unique<LoadBalancerImpl>(shared_from_this(), stats_, random_, hash_policy_,
262+
params.priority_set);
249263
}
250264

251265
double ThreadAwareLoadBalancerBase::BoundedLoadHashingLoadBalancer::hostOverloadFactor(

source/extensions/load_balancing_policies/common/thread_aware_lb_impl.h

Lines changed: 15 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -126,10 +126,11 @@ class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareL
126126
};
127127
using PerPriorityStatePtr = std::unique_ptr<PerPriorityState>;
128128

129+
struct LoadBalancerFactoryImpl;
129130
struct LoadBalancerImpl : public LoadBalancer {
130-
LoadBalancerImpl(ClusterLbStats& stats, Random::RandomGenerator& random,
131-
HashPolicySharedPtr hash_policy)
132-
: stats_(stats), random_(random), hash_policy_(std::move(hash_policy)) {}
131+
LoadBalancerImpl(std::shared_ptr<LoadBalancerFactoryImpl> factory, ClusterLbStats& stats,
132+
Random::RandomGenerator& random, HashPolicySharedPtr hash_policy,
133+
const Upstream::PrioritySet& priority_set);
133134

134135
// Upstream::LoadBalancer
135136
HostSelectionResponse chooseHost(LoadBalancerContext* context) override;
@@ -145,23 +146,32 @@ class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareL
145146
return {};
146147
}
147148

149+
void refresh();
150+
151+
std::shared_ptr<LoadBalancerFactoryImpl> factory_;
152+
148153
ClusterLbStats& stats_;
149154
Random::RandomGenerator& random_;
150155
HashPolicySharedPtr hash_policy_;
151156

152157
std::shared_ptr<std::vector<PerPriorityStatePtr>> per_priority_state_;
153158
std::shared_ptr<HealthyLoad> healthy_per_priority_load_;
154159
std::shared_ptr<DegradedLoad> degraded_per_priority_load_;
160+
161+
Common::CallbackHandlePtr member_update_cb_;
155162
};
156163

157-
struct LoadBalancerFactoryImpl : public LoadBalancerFactory {
164+
struct LoadBalancerFactoryImpl : public LoadBalancerFactory,
165+
public std::enable_shared_from_this<LoadBalancerFactoryImpl> {
158166
LoadBalancerFactoryImpl(ClusterLbStats& stats, Random::RandomGenerator& random,
159167
std::shared_ptr<Http::HashPolicy> hash_policy)
160168
: stats_(stats), random_(random), hash_policy_(std::move(hash_policy)) {}
161169

162170
// Upstream::LoadBalancerFactory
163-
// Ignore the params for the thread-aware LB.
171+
// Uses the per-worker params to create the thread-aware LB instance, including the worker
172+
// priority_set used to register member-update callbacks.
164173
LoadBalancerPtr create(LoadBalancerParams) override;
174+
bool recreateOnHostChange() const override { return false; }
165175

166176
ClusterLbStats& stats_;
167177
Random::RandomGenerator& random_;

test/common/upstream/deferred_cluster_initialization_test.cc

Lines changed: 3 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -546,11 +546,9 @@ TEST_P(EdsTest, ShouldNotMergeAddingHostsForDifferentClustersWithSameName) {
546546
addEndpoint(cluster_load_assignment, 1000);
547547
doOnConfigUpdateVerifyNoThrow(cluster_load_assignment);
548548

549-
auto initiailization_instance =
549+
auto initialization_instance =
550550
cluster_manager_->clusterInitializationMap().find("cluster_1")->second;
551-
EXPECT_NE(nullptr, initiailization_instance->load_balancer_factory_);
552-
// RING_HASH lb policy requires Envoy re-create the load balancer when the cluster is updated.
553-
EXPECT_TRUE(initiailization_instance->load_balancer_factory_->recreateOnHostChange());
551+
EXPECT_NE(nullptr, initialization_instance->load_balancer_factory_);
554552

555553
// Update the cluster with a different lb policy. Now it's a different cluster and should
556554
// not be merged.
@@ -565,7 +563,7 @@ TEST_P(EdsTest, ShouldNotMergeAddingHostsForDifferentClustersWithSameName) {
565563

566564
auto new_initialization_instance =
567565
cluster_manager_->clusterInitializationMap().find("cluster_1")->second;
568-
EXPECT_NE(initiailization_instance.get(), new_initialization_instance.get());
566+
EXPECT_NE(initialization_instance.get(), new_initialization_instance.get());
569567

570568
EXPECT_EQ(1, new_initialization_instance->per_priority_state_.at(1).hosts_added_.size());
571569
// Ensure the hosts_added_ is empty for priority 0. Because if unexpected merge happens,

test/extensions/load_balancing_policies/maglev/maglev_lb_test.cc

Lines changed: 87 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -135,6 +135,93 @@ TEST_F(MaglevLoadBalancerTest, LbDestructedBeforeFactory) {
135135
EXPECT_NE(nullptr, factory->create(lb_params_));
136136
}
137137

138+
// The thread-aware factory updates the worker LB in place via a member update callback on the
139+
// worker priority set, so the cluster manager should not recreate the worker LB on host changes.
140+
TEST_F(MaglevLoadBalancerTest, FactoryDoesNotRecreateOnHostChange) {
141+
init(7);
142+
EXPECT_FALSE(lb_->factory()->recreateOnHostChange());
143+
}
144+
145+
// Worker LB instances pick up new factory state when the worker priority set fires a member
146+
// update, without needing to be recreated.
147+
TEST_F(MaglevLoadBalancerTest, WorkerLbRefreshesOnMemberUpdate) {
148+
host_set_.hosts_ = {makeTestHost(info_, "tcp://127.0.0.1:90")};
149+
host_set_.healthy_hosts_ = host_set_.hosts_;
150+
host_set_.runCallbacks({}, {});
151+
init(7);
152+
153+
// The worker LB is created once and keeps a reference to the factory.
154+
LoadBalancerPtr lb = lb_->factory()->create(lb_params_);
155+
EXPECT_EQ(host_set_.hosts_[0], lb->chooseHost(nullptr).host);
156+
157+
// Replace the host set. host_set_.runCallbacks fires the main priority set's callbacks, which
158+
// recomputes the factory state on the main thread.
159+
auto new_host = makeTestHost(info_, "tcp://127.0.0.1:91");
160+
host_set_.hosts_ = {new_host};
161+
host_set_.healthy_hosts_ = host_set_.hosts_;
162+
host_set_.runCallbacks({}, {});
163+
164+
// The worker LB still references the previous per-priority state until its own priority set
165+
// fires the member update.
166+
EXPECT_EQ("127.0.0.1:90", lb->chooseHost(nullptr).host->address()->asString());
167+
168+
// Simulate the worker priority set firing the member update on the worker. The worker LB must
169+
// refresh its cached state from the factory.
170+
worker_priority_set_.member_update_cb_helper_.runCallbacks({}, {});
171+
172+
EXPECT_EQ(new_host, lb->chooseHost(nullptr).host);
173+
}
174+
175+
// Multiple worker LBs share the same factory; each refreshes independently when its own priority
176+
// set fires a member update.
177+
TEST_F(MaglevLoadBalancerTest, MultipleWorkerLbsRefreshIndependently) {
178+
host_set_.hosts_ = {makeTestHost(info_, "tcp://127.0.0.1:90")};
179+
host_set_.healthy_hosts_ = host_set_.hosts_;
180+
host_set_.runCallbacks({}, {});
181+
init(7);
182+
183+
NiceMock<MockPrioritySet> worker_priority_set_a;
184+
NiceMock<MockPrioritySet> worker_priority_set_b;
185+
LoadBalancerParams params_a{worker_priority_set_a, {}};
186+
LoadBalancerParams params_b{worker_priority_set_b, {}};
187+
188+
LoadBalancerPtr lb_a = lb_->factory()->create(params_a);
189+
LoadBalancerPtr lb_b = lb_->factory()->create(params_b);
190+
191+
EXPECT_EQ(host_set_.hosts_[0], lb_a->chooseHost(nullptr).host);
192+
EXPECT_EQ(host_set_.hosts_[0], lb_b->chooseHost(nullptr).host);
193+
194+
auto new_host = makeTestHost(info_, "tcp://127.0.0.1:91");
195+
host_set_.hosts_ = {new_host};
196+
host_set_.healthy_hosts_ = host_set_.hosts_;
197+
host_set_.runCallbacks({}, {});
198+
199+
// Only refresh worker A; worker B keeps the previous state until its own priority set fires.
200+
worker_priority_set_a.member_update_cb_helper_.runCallbacks({}, {});
201+
EXPECT_EQ(new_host, lb_a->chooseHost(nullptr).host);
202+
EXPECT_EQ("127.0.0.1:90", lb_b->chooseHost(nullptr).host->address()->asString());
203+
204+
worker_priority_set_b.member_update_cb_helper_.runCallbacks({}, {});
205+
EXPECT_EQ(new_host, lb_b->chooseHost(nullptr).host);
206+
}
207+
208+
// The worker LB unregisters its member update callback on destruction; firing the worker priority
209+
// set's callback after the LB is gone must not touch freed memory.
210+
TEST_F(MaglevLoadBalancerTest, WorkerLbCallbackUnregisteredOnDestruction) {
211+
host_set_.hosts_ = {makeTestHost(info_, "tcp://127.0.0.1:90")};
212+
host_set_.healthy_hosts_ = host_set_.hosts_;
213+
host_set_.runCallbacks({}, {});
214+
init(7);
215+
216+
LoadBalancerPtr lb = lb_->factory()->create(lb_params_);
217+
EXPECT_EQ(host_set_.hosts_[0], lb->chooseHost(nullptr).host);
218+
219+
lb.reset();
220+
221+
// Must be a no-op rather than calling into freed memory.
222+
worker_priority_set_.member_update_cb_helper_.runCallbacks({}, {});
223+
}
224+
138225
// Throws an exception if table size is not a prime number.
139226
TEST_F(MaglevLoadBalancerTest, NoPrimeNumber) {
140227
EXPECT_THROW_WITH_MESSAGE(init(8), EnvoyException,

0 commit comments

Comments
 (0)