Skip to content

Commit db12d3e

Browse files
committed
policy: Switch to delta mode when possible
Switch to delta mode more eagerly when we have evidence that the agent is capable of it, but switch to SotW mode only when the xDS stream transport has failed to connect or has closed. Signed-off-by: Jarno Rajahalme <jarno@isovalent.com>
1 parent e090d1d commit db12d3e

2 files changed

Lines changed: 152 additions & 24 deletions

File tree

cilium/network_policy.cc

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,8 @@ class ResourceMapOverlay {
253253
}
254254
const auto* selector_entry = entry->selectorResourceEntry();
255255
if (selector_entry == nullptr || selector_entry->handle == nullptr) {
256-
throw EnvoyException(
257-
fmt::format("NetworkPolicyResource rule references non-selector resource '{}'", selector));
256+
throw EnvoyException(fmt::format(
257+
"NetworkPolicyResource rule references non-selector resource '{}'", selector));
258258
}
259259
return selector_entry->handle;
260260
}
@@ -407,7 +407,7 @@ class NetworkPolicyMapImpl : public Envoy::Config::SubscriptionCallbacks,
407407
if (!subscription_connected_ && subscription_ != nullptr) {
408408
subscription_connected_ = grpcStreamConnected(subscription_.get());
409409
}
410-
maybeRecreateSubscriptionInDesiredMode();
410+
maybeRecreateSubscriptionInDesiredMode(/*transport_closed=*/false);
411411
}
412412

413413
protected:
@@ -450,27 +450,57 @@ class NetworkPolicyMapImpl : public Envoy::Config::SubscriptionCallbacks,
450450
}
451451

452452
void onSubscriptionTransportEstablished(uint64_t subscription_id) {
453-
++subscription_stream_generation_;
454-
453+
// skip stale notifications for earlier subscriptions
455454
if (subscription_id != subscription_id_) {
456455
return;
457456
}
457+
++subscription_stream_generation_;
458+
458459
subscription_connected_ = true;
459460
}
460461

461462
void onSubscriptionTransportClosed(uint64_t subscription_id) {
463+
// skip stale notifications for earlier subscriptions
462464
if (subscription_id != subscription_id_) {
463465
return;
464466
}
465467
subscription_connected_ = false;
466-
maybeRecreateSubscriptionInDesiredMode();
467-
}
468468

469-
void maybeRecreateSubscriptionInDesiredMode() {
470-
if (subscription_ == nullptr || subscription_connected_ ||
471-
desired_use_delta_xds_ == subscription_use_delta_xds_) {
469+
// Test code executes synchronously
470+
if (subscription_factory_for_test_) {
471+
maybeRecreateSubscriptionInDesiredMode(/*transport_closed=*/true);
472472
return;
473473
}
474+
475+
// The close callback runs on the subscription object's own stack, so defer any possible
476+
// recreation until after it unwinds to avoid destroying the current subscription mid-callback.
477+
context_.mainThreadDispatcher().post(
478+
[weak_this = weak_from_this(), subscription_id = subscription_id_]() {
479+
if (auto shared_this = weak_this.lock()) {
480+
// skip stale callbacks for earlier subscriptions
481+
if (subscription_id != shared_this->subscription_id_) {
482+
return;
483+
}
484+
shared_this->maybeRecreateSubscriptionInDesiredMode(/*transport_closed=*/true);
485+
}
486+
});
487+
}
488+
489+
void maybeRecreateSubscriptionInDesiredMode(bool transport_closed) {
490+
// only ever skip subscribe if we have a subscription already, and it is already connected in
491+
// delta or desired mode, or still connecting in desired mode.
492+
if (subscription_ && (subscription_connected_ || !transport_closed)) {
493+
if (subscription_connected_ && subscription_use_delta_xds_) {
494+
// Keep delta on a connected subscription until transport closes.
495+
return;
496+
}
497+
if (subscription_use_delta_xds_ == desired_use_delta_xds_) {
498+
// Let the current subscription keep going when it is already in the desired mode.
499+
return;
500+
}
501+
}
502+
503+
// Recreate the subscription in the latest desired mode.
474504
subscribe();
475505
}
476506

@@ -2618,10 +2648,9 @@ absl::Status NetworkPolicyMapImpl::onConfigUpdate(
26182648
ENVOY_LOG(trace, "Cilium removing NetworkPolicyResource selector {}", resource);
26192649
const auto* resource_entry = pending_resource_map.findEntry(resource);
26202650
if (resource_entry == nullptr) {
2621-
ENVOY_LOG(
2622-
debug,
2623-
"NetworkPolicyResource removed selector name '{}' not found from resource map",
2624-
resource);
2651+
ENVOY_LOG(debug,
2652+
"NetworkPolicyResource removed selector name '{}' not found from resource map",
2653+
resource);
26252654
continue;
26262655
}
26272656
if (resource_entry->isPolicyEndpointIpEntry()) {

tests/cilium_network_policy_test.cc

Lines changed: 109 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ TEST_F(CiliumNetworkPolicyDeltaTest, ManagedSubscriptionColdStartUsesConfiguredD
348348
EXPECT_THAT(state->start_resources_.front(), testing::ElementsAre(std::string("*")));
349349
}
350350

351-
TEST_F(CiliumNetworkPolicyTest, FlagFlipOnHealthySubscriptionWaitsForTransportClose) {
351+
TEST_F(CiliumNetworkPolicyTest, FlagFlipFromSotwToDeltaOnHealthySubscriptionRecreatesImmediately) {
352352
auto state = std::make_shared<FakeSubscriptionState>();
353353
std::vector<bool> created_modes;
354354
setSubscriptionFactoryForTest(
@@ -364,18 +364,57 @@ TEST_F(CiliumNetworkPolicyTest, FlagFlipOnHealthySubscriptionWaitsForTransportCl
364364
setUseDeltaXds(true);
365365

366366
EXPECT_TRUE(configuredUseDeltaXds());
367-
EXPECT_FALSE(subscriptionUseDeltaXdsForTest());
368-
EXPECT_TRUE(subscriptionConnectedForTest());
369-
EXPECT_THAT(created_modes, testing::ElementsAre(false));
370-
EXPECT_EQ(state->start_calls_, 1);
367+
EXPECT_TRUE(subscriptionUseDeltaXdsForTest());
368+
EXPECT_FALSE(subscriptionConnectedForTest());
369+
EXPECT_THAT(created_modes, testing::ElementsAre(false, true));
370+
EXPECT_EQ(state->start_calls_, 2);
371+
EXPECT_THAT(state->start_resources_.back(), testing::ElementsAre(std::string("*")));
372+
}
371373

372-
onSubscriptionTransportCloseForTest();
374+
TEST_F(CiliumNetworkPolicyTest, FlagFlipFromDeltaToSotwOnHealthySubscriptionWaitsForClose) {
375+
auto state = std::make_shared<FakeSubscriptionState>();
376+
std::vector<bool> created_modes;
377+
setSubscriptionFactoryForTest(
378+
[state, &created_modes](bool use_delta_xds) -> std::unique_ptr<Envoy::Config::Subscription> {
379+
created_modes.push_back(use_delta_xds);
380+
return std::make_unique<FakeSubscription>(state);
381+
});
373382

374-
EXPECT_FALSE(subscriptionConnectedForTest());
383+
startManagedSubscriptionForTest();
384+
onSubscriptionConnectedForTest();
385+
ASSERT_TRUE(subscriptionConnectedForTest());
386+
ASSERT_FALSE(subscriptionUseDeltaXdsForTest());
387+
388+
setUseDeltaXds(true);
389+
390+
EXPECT_TRUE(configuredUseDeltaXds());
375391
EXPECT_TRUE(subscriptionUseDeltaXdsForTest());
392+
EXPECT_FALSE(subscriptionConnectedForTest());
376393
EXPECT_THAT(created_modes, testing::ElementsAre(false, true));
377394
EXPECT_EQ(state->start_calls_, 2);
378395
EXPECT_THAT(state->start_resources_.back(), testing::ElementsAre(std::string("*")));
396+
397+
onSubscriptionConnectedForTest();
398+
ASSERT_TRUE(subscriptionConnectedForTest());
399+
ASSERT_TRUE(subscriptionUseDeltaXdsForTest());
400+
401+
setUseDeltaXds(false);
402+
403+
EXPECT_FALSE(configuredUseDeltaXds());
404+
EXPECT_TRUE(subscriptionUseDeltaXdsForTest());
405+
EXPECT_TRUE(subscriptionConnectedForTest());
406+
// Once we have an established delta subscription, keep it until transport close even if the
407+
// configured desired mode flips back to SotW.
408+
EXPECT_THAT(created_modes, testing::ElementsAre(false, true));
409+
EXPECT_EQ(state->start_calls_, 2);
410+
411+
onSubscriptionTransportCloseForTest();
412+
413+
EXPECT_FALSE(subscriptionConnectedForTest());
414+
EXPECT_FALSE(subscriptionUseDeltaXdsForTest());
415+
EXPECT_THAT(created_modes, testing::ElementsAre(false, true, false));
416+
EXPECT_EQ(state->start_calls_, 3);
417+
EXPECT_TRUE(state->start_resources_.back().empty());
379418
}
380419

381420
TEST_F(CiliumNetworkPolicyTest, FlagFlipWhileDisconnectedRecreatesImmediately) {
@@ -400,7 +439,66 @@ TEST_F(CiliumNetworkPolicyTest, FlagFlipWhileDisconnectedRecreatesImmediately) {
400439
EXPECT_THAT(state->start_resources_.back(), testing::ElementsAre(std::string("*")));
401440
}
402441

403-
TEST_F(CiliumNetworkPolicyTest, TransportCloseWithoutFlagFlipKeepsCurrentMode) {
442+
TEST_F(CiliumNetworkPolicyDeltaTest, FlagFlipFromDisconnectedDeltaToSotwRecreatesImmediately) {
443+
auto state = std::make_shared<FakeSubscriptionState>();
444+
std::vector<bool> created_modes;
445+
setSubscriptionFactoryForTest(
446+
[state, &created_modes](bool use_delta_xds) -> std::unique_ptr<Envoy::Config::Subscription> {
447+
created_modes.push_back(use_delta_xds);
448+
return std::make_unique<FakeSubscription>(state);
449+
});
450+
451+
startManagedSubscriptionForTest();
452+
ASSERT_FALSE(subscriptionConnectedForTest());
453+
ASSERT_TRUE(subscriptionUseDeltaXdsForTest());
454+
455+
setUseDeltaXds(false);
456+
457+
EXPECT_FALSE(configuredUseDeltaXds());
458+
EXPECT_FALSE(subscriptionUseDeltaXdsForTest());
459+
EXPECT_FALSE(subscriptionConnectedForTest());
460+
EXPECT_THAT(created_modes, testing::ElementsAre(true, false));
461+
EXPECT_EQ(state->start_calls_, 2);
462+
EXPECT_TRUE(state->start_resources_.back().empty());
463+
}
464+
465+
TEST_F(CiliumNetworkPolicyDeltaTest, DowngradeFromConnectedDeltaRecreatesDisconnectedRetryToSotw) {
466+
auto state = std::make_shared<FakeSubscriptionState>();
467+
std::vector<bool> created_modes;
468+
setSubscriptionFactoryForTest(
469+
[state, &created_modes](bool use_delta_xds) -> std::unique_ptr<Envoy::Config::Subscription> {
470+
created_modes.push_back(use_delta_xds);
471+
return std::make_unique<FakeSubscription>(state);
472+
});
473+
474+
startManagedSubscriptionForTest();
475+
onSubscriptionConnectedForTest();
476+
ASSERT_TRUE(subscriptionConnectedForTest());
477+
ASSERT_TRUE(subscriptionUseDeltaXdsForTest());
478+
479+
// Agent restart/downgrade drops the established delta transport. While the desired mode is still
480+
// delta we recreate immediately and begin retrying delta.
481+
onSubscriptionTransportCloseForTest();
482+
483+
ASSERT_FALSE(subscriptionConnectedForTest());
484+
ASSERT_TRUE(subscriptionUseDeltaXdsForTest());
485+
ASSERT_EQ(state->start_calls_, 2);
486+
EXPECT_THAT(created_modes, testing::ElementsAre(true, true));
487+
EXPECT_THAT(state->start_resources_.back(), testing::ElementsAre(std::string("*")));
488+
489+
// When listener metadata later reveals the downgraded agent no longer supports delta, flip to
490+
// SotW immediately rather than letting the disconnected delta retry loop forever.
491+
setUseDeltaXds(false);
492+
493+
EXPECT_FALSE(configuredUseDeltaXds());
494+
EXPECT_FALSE(subscriptionConnectedForTest());
495+
EXPECT_FALSE(subscriptionUseDeltaXdsForTest());
496+
EXPECT_THAT(created_modes, testing::ElementsAre(true, true, false));
497+
EXPECT_EQ(state->start_calls_, 3);
498+
EXPECT_TRUE(state->start_resources_.back().empty());
499+
}
500+
501+
TEST_F(CiliumNetworkPolicyTest, TransportCloseWithoutFlagFlipRecreatesInCurrentMode) {
404502
auto state = std::make_shared<FakeSubscriptionState>();
405503
std::vector<bool> created_modes;
406504
setSubscriptionFactoryForTest(
@@ -416,8 +514,9 @@ TEST_F(CiliumNetworkPolicyTest, TransportCloseWithoutFlagFlipKeepsCurrentMode) {
416514

417515
EXPECT_FALSE(subscriptionConnectedForTest());
418516
EXPECT_FALSE(subscriptionUseDeltaXdsForTest());
419-
EXPECT_THAT(created_modes, testing::ElementsAre(false));
420-
EXPECT_EQ(state->start_calls_, 1);
517+
EXPECT_THAT(created_modes, testing::ElementsAre(false, false));
518+
EXPECT_EQ(state->start_calls_, 2);
519+
EXPECT_TRUE(state->start_resources_.back().empty());
421520
}
422521

423522
TEST_F(CiliumNetworkPolicyTest, EmptyPolicyUpdate) {

0 commit comments

Comments
 (0)