Skip to content

Commit 18261d2

Browse files
committed
policy: Fix stream generation accounting
Stream generation accounting has to be shared between NPDS and NPRDS streams, so that the handoff works as designed, but no other xDS protocols (e.g., NPHDS) should interfere with the stream generation accounting. Solve this by defining the stream generation number as a static member of NetworkPolicyMapImpl and updating it from the already established transport connected/closed callbacks. Adjust tests to work with the new shape where the generation numbers do not start from 1 for each NetworkPolicyMapImpl instance, but increase monotonically for each established NPDS/NPRDS stream. Signed-off-by: Jarno Rajahalme <jarno@isovalent.com>
1 parent bbbc246 commit 18261d2

4 files changed

Lines changed: 47 additions & 82 deletions

File tree

cilium/grpc_subscription.cc

Lines changed: 31 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,34 @@ namespace Cilium {
4343

4444
namespace {
4545

46-
constexpr uint64_t FirstStreamGeneration = 1;
47-
4846
class StreamTrackedGrpcMux {
4947
public:
48+
StreamTrackedGrpcMux(std::function<void()> on_transport_established,
49+
std::function<void()> on_transport_close)
50+
: on_transport_established_(std::move(on_transport_established)),
51+
on_transport_close_(std::move(on_transport_close)) {}
52+
5053
virtual ~StreamTrackedGrpcMux() = default;
51-
virtual uint64_t streamGeneration() const = 0;
52-
virtual bool streamConnected() const = 0;
54+
55+
bool streamConnected() const { return stream_connected_; }
56+
57+
void onStreamEstablished() {
58+
stream_connected_ = true;
59+
on_transport_established_();
60+
}
61+
62+
void onEstablishmentFailure() {
63+
const bool was_connected = stream_connected_;
64+
stream_connected_ = false;
65+
if (was_connected) {
66+
on_transport_close_();
67+
}
68+
}
69+
70+
private:
71+
bool stream_connected_{false};
72+
std::function<void()> on_transport_established_;
73+
std::function<void()> on_transport_close_;
5374
};
5475

5576
class SotwGrpcMuxImpl : public Config::GrpcMuxImpl, public StreamTrackedGrpcMux {
@@ -58,37 +79,18 @@ class SotwGrpcMuxImpl : public Config::GrpcMuxImpl, public StreamTrackedGrpcMux
5879
std::function<void()> on_transport_established,
5980
std::function<void()> on_transport_close)
6081
: Config::GrpcMuxImpl(grpc_mux_context, skip_subsequent_node),
61-
on_transport_established_(std::move(on_transport_established)),
62-
on_transport_close_(std::move(on_transport_close)) {}
63-
82+
StreamTrackedGrpcMux(std::move(on_transport_established), std::move(on_transport_close)) {}
6483
~SotwGrpcMuxImpl() override = default;
6584

6685
void onStreamEstablished() override {
67-
stream_connected_ = true;
68-
++stream_generation_;
6986
Config::GrpcMuxImpl::onStreamEstablished();
70-
if (on_transport_established_) {
71-
on_transport_established_();
72-
}
87+
StreamTrackedGrpcMux::onStreamEstablished();
7388
}
7489

7590
void onEstablishmentFailure(bool next_attempt_may_send_initial_resource_version) override {
76-
const bool was_connected = stream_connected_;
77-
stream_connected_ = false;
7891
Config::GrpcMuxImpl::onEstablishmentFailure(next_attempt_may_send_initial_resource_version);
79-
if (was_connected && on_transport_close_) {
80-
on_transport_close_();
81-
}
92+
StreamTrackedGrpcMux::onEstablishmentFailure();
8293
}
83-
84-
uint64_t streamGeneration() const override { return stream_generation_; }
85-
bool streamConnected() const override { return stream_connected_; }
86-
87-
private:
88-
uint64_t stream_generation_{0};
89-
bool stream_connected_{false};
90-
std::function<void()> on_transport_established_;
91-
std::function<void()> on_transport_close_;
9294
};
9395

9496
class DeltaGrpcMuxImpl : public Config::NewGrpcMuxImpl, public StreamTrackedGrpcMux {
@@ -97,37 +99,19 @@ class DeltaGrpcMuxImpl : public Config::NewGrpcMuxImpl, public StreamTrackedGrpc
9799
std::function<void()> on_transport_established,
98100
std::function<void()> on_transport_close)
99101
: Config::NewGrpcMuxImpl(grpc_mux_context),
100-
on_transport_established_(std::move(on_transport_established)),
101-
on_transport_close_(std::move(on_transport_close)) {}
102+
StreamTrackedGrpcMux(std::move(on_transport_established), std::move(on_transport_close)) {}
102103

103104
~DeltaGrpcMuxImpl() override = default;
104105

105106
void onStreamEstablished() override {
106-
stream_connected_ = true;
107-
++stream_generation_;
108107
Config::NewGrpcMuxImpl::onStreamEstablished();
109-
if (on_transport_established_) {
110-
on_transport_established_();
111-
}
108+
StreamTrackedGrpcMux::onStreamEstablished();
112109
}
113110

114111
void onEstablishmentFailure(bool next_attempt_may_send_initial_resource_version) override {
115-
const bool was_connected = stream_connected_;
116-
stream_connected_ = false;
117112
Config::NewGrpcMuxImpl::onEstablishmentFailure(next_attempt_may_send_initial_resource_version);
118-
if (was_connected && on_transport_close_) {
119-
on_transport_close_();
120-
}
113+
StreamTrackedGrpcMux::onEstablishmentFailure();
121114
}
122-
123-
uint64_t streamGeneration() const override { return stream_generation_; }
124-
bool streamConnected() const override { return stream_connected_; }
125-
126-
private:
127-
uint64_t stream_generation_{0};
128-
bool stream_connected_{false};
129-
std::function<void()> on_transport_established_;
130-
std::function<void()> on_transport_close_;
131115
};
132116

133117
// service RPC method fully qualified names.
@@ -233,20 +217,6 @@ envoy::config::core::v3::ConfigSource getCiliumXDSAPIConfig(bool use_delta_xds =
233217

234218
envoy::config::core::v3::ConfigSource cilium_xds_api_config = getCiliumXDSAPIConfig();
235219

236-
uint64_t grpcStreamGeneration(Config::Subscription* subscription) {
237-
auto* sub = dynamic_cast<Config::GrpcSubscriptionImpl*>(subscription);
238-
if (!sub) {
239-
return FirstStreamGeneration;
240-
}
241-
242-
auto* grpc_mux = dynamic_cast<StreamTrackedGrpcMux*>(sub->grpcMux().get());
243-
if (grpc_mux == nullptr) {
244-
return FirstStreamGeneration;
245-
}
246-
247-
return grpc_mux->streamGeneration();
248-
}
249-
250220
bool grpcStreamConnected(Config::Subscription* subscription) {
251221
auto* sub = dynamic_cast<Config::GrpcSubscriptionImpl*>(subscription);
252222
if (!sub) {

cilium/grpc_subscription.h

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,6 @@ subscribe(const std::string& type_url, Server::Configuration::CommonFactoryConte
2525
std::function<void()> on_transport_established = {},
2626
std::function<void()> on_transport_close = {});
2727

28-
// Returns a monotonic stream generation for Cilium subscriptions.
29-
// Value 0 is reserved for policy-map detection of the initial stream and may be returned for
30-
// tracked gRPC subscriptions before any stream has been established.
31-
// Non-gRPC subscriptions and subscriptions without stream tracking are treated as generation 1.
32-
uint64_t grpcStreamGeneration(Config::Subscription* subscription);
33-
3428
// Returns whether a tracked gRPC subscription currently has an established transport.
3529
bool grpcStreamConnected(Config::Subscription* subscription);
3630

cilium/network_policy.cc

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -411,12 +411,8 @@ class NetworkPolicyMapImpl : public Envoy::Config::SubscriptionCallbacks,
411411
}
412412

413413
protected:
414-
uint64_t streamGeneration() const {
415-
return stream_generation_override_for_test_ != 0 ? stream_generation_override_for_test_
416-
: grpcStreamGeneration(subscription_.get());
417-
}
418-
419-
void resetStreamForTest() { stream_generation_override_for_test_ = streamGeneration() + 1; }
414+
uint64_t streamGeneration() const { return subscription_stream_generation_; }
415+
void resetStreamForTest() { subscription_stream_generation_++; }
420416

421417
// run the given function after all the threads have scheduled
422418
void runAfterAllThreads(std::function<void()> cb) const {
@@ -454,6 +450,8 @@ class NetworkPolicyMapImpl : public Envoy::Config::SubscriptionCallbacks,
454450
}
455451

456452
void onSubscriptionTransportEstablished(uint64_t subscription_id) {
453+
++subscription_stream_generation_;
454+
457455
if (subscription_id != subscription_id_) {
458456
return;
459457
}
@@ -561,10 +559,8 @@ class NetworkPolicyMapImpl : public Envoy::Config::SubscriptionCallbacks,
561559
transport_factory_context_;
562560

563561
std::unique_ptr<Envoy::Config::Subscription> subscription_;
562+
static uint64_t subscription_stream_generation_;
564563
NetworkPolicyMap::SubscriptionFactoryForTest subscription_factory_for_test_;
565-
// Test-only override used to simulate a restarted NPDS stream when the test subscription does
566-
// not expose a new underlying gRPC stream generation.
567-
uint64_t stream_generation_override_for_test_{0};
568564

569565
ProtobufTypes::MessagePtr dumpNetworkPolicyConfigs(const Matchers::StringMatcher& name_matcher);
570566
Server::ConfigTracker::EntryOwnerPtr config_tracker_entry_;
@@ -576,6 +572,7 @@ class NetworkPolicyMapImpl : public Envoy::Config::SubscriptionCallbacks,
576572
};
577573

578574
uint64_t NetworkPolicyMapImpl::instance_id_ = 0;
575+
uint64_t NetworkPolicyMapImpl::subscription_stream_generation_ = 1;
579576

580577
IpAddressPair::IpAddressPair(const cilium::NetworkPolicy& proto) {
581578
for (const auto& ip_addr : proto.endpoint_ips()) {

tests/cilium_network_policy_test.cc

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1801,8 +1801,10 @@ TEST_F(CiliumNetworkPolicyDeltaTest, SameStreamSelectorOnlyUpdateUsesLatestSelec
18011801

18021802
const auto old_policy = policyInstanceShared("10.1.2.3");
18031803
ASSERT_NE(nullptr, old_policy);
1804+
const auto initial_stream_generation = selectorStreamGenerationForTest(*old_policy);
18041805

1805-
EXPECT_EQ(1, selectorStreamGenerationForTest(*old_policy));
1806+
EXPECT_GT(initial_stream_generation, 0);
1807+
EXPECT_EQ(initial_stream_generation, selectorStreamGenerationForTest(*old_policy));
18061808
EXPECT_EQ(1, selectorVersionForTest(*old_policy));
18071809

18081810
EXPECT_NO_THROW(deltaUpdateFromYaml(R"EOF(system_version_info: "2"
@@ -1815,7 +1817,7 @@ TEST_F(CiliumNetworkPolicyDeltaTest, SameStreamSelectorOnlyUpdateUsesLatestSelec
18151817
remote_identities: [ 44 ]
18161818
)EOF"));
18171819

1818-
EXPECT_EQ(1, selectorStreamGenerationForTest(*old_policy));
1820+
EXPECT_EQ(initial_stream_generation, selectorStreamGenerationForTest(*old_policy));
18191821
EXPECT_EQ(2, selectorVersionForTest(*old_policy));
18201822
EXPECT_TRUE(ingressAllowed("10.1.2.3", 44, 80));
18211823
EXPECT_FALSE(ingressAllowed("10.1.2.3", 43, 80));
@@ -1852,8 +1854,10 @@ TEST_F(CiliumNetworkPolicyDeltaTest, NewStreamKeepsOldPolicyPinnedToOldSelectorS
18521854

18531855
const auto old_policy = policyInstanceShared("10.1.2.3");
18541856
ASSERT_NE(nullptr, old_policy);
1857+
const auto initial_stream_generation = selectorStreamGenerationForTest(*old_policy);
18551858

1856-
EXPECT_EQ(1, selectorStreamGenerationForTest(*old_policy));
1859+
EXPECT_GT(initial_stream_generation, 0);
1860+
EXPECT_EQ(initial_stream_generation, selectorStreamGenerationForTest(*old_policy));
18571861
EXPECT_EQ(1, selectorVersionForTest(*old_policy));
18581862

18591863
EXPECT_NO_THROW(deltaUpdateFromYaml(R"EOF(system_version_info: "1"
@@ -1866,7 +1870,7 @@ TEST_F(CiliumNetworkPolicyDeltaTest, NewStreamKeepsOldPolicyPinnedToOldSelectorS
18661870
remote_identities: [ 45 ]
18671871
)EOF"));
18681872

1869-
EXPECT_EQ(1, selectorStreamGenerationForTest(*old_policy));
1873+
EXPECT_EQ(initial_stream_generation, selectorStreamGenerationForTest(*old_policy));
18701874
EXPECT_EQ(2, selectorVersionForTest(*old_policy));
18711875

18721876
resetStreamForTest();
@@ -1903,9 +1907,9 @@ TEST_F(CiliumNetworkPolicyDeltaTest, NewStreamKeepsOldPolicyPinnedToOldSelectorS
19031907
ASSERT_NE(nullptr, new_policy);
19041908
EXPECT_NE(old_policy.get(), new_policy.get());
19051909

1906-
EXPECT_EQ(1, selectorStreamGenerationForTest(*old_policy));
1910+
EXPECT_EQ(initial_stream_generation, selectorStreamGenerationForTest(*old_policy));
19071911
EXPECT_EQ(2, selectorVersionForTest(*old_policy));
1908-
EXPECT_EQ(2, selectorStreamGenerationForTest(*new_policy));
1912+
EXPECT_EQ(initial_stream_generation + 1, selectorStreamGenerationForTest(*new_policy));
19091913
EXPECT_EQ(3, selectorVersionForTest(*new_policy));
19101914
EXPECT_TRUE(ingressAllowed("10.1.2.3", 44, 80));
19111915
EXPECT_FALSE(ingressAllowed("10.1.2.3", 43, 80));

0 commit comments

Comments
 (0)