Skip to content

Commit c77d91d

Browse files
committed
policy: detect new streams for delta and file-based substriptions
Replace Config::GrpcMuxImpl wrapper with stream event callback patch on upstream so that new stream detection works on all the needed Mux types for SotW, Delta, and ADS. New stream detection is the means by which we detect Cilium Agent restarts, which generally requires the ipcache bpf map to be reopened. Delta updates also depend on this detection to force synchronization as the restarted agent may not know which resources to remove. Signed-off-by: Jarno Rajahalme <jarno@isovalent.com>
1 parent 11811c3 commit c77d91d

7 files changed

Lines changed: 901 additions & 178 deletions

WORKSPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ git_repository(
4343
"@//patches:0004-thread_local-reset-slot-in-worker-threads-first.patch",
4444
"@//patches:0005-http-header-expose-attribute.patch",
4545
"@//patches:0006-test-integration-Defer-fake-upstream-read-enable-un.patch",
46+
"@//patches:0007-config-add-grpc-mux-stream-event-callback.patch",
4647
],
4748
# // clang-format off: Envoy's format check: Only repository_locations.bzl may contains URL references
4849
remote = "https://github.com/envoyproxy/envoy.git",

cilium/grpc_subscription.cc

Lines changed: 103 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22

33
#include <fmt/format.h>
44

5-
#include <chrono>
5+
#include <functional>
66
#include <memory>
77
#include <string>
88
#include <utility>
99
#include <vector>
1010

1111
#include "envoy/annotations/resource.pb.h"
12+
#include "envoy/common/callback.h"
1213
#include "envoy/common/exception.h"
1314
#include "envoy/config/core/v3/config_source.pb.h"
1415
#include "envoy/config/custom_config_validators.h"
@@ -26,9 +27,12 @@
2627
#include "source/common/grpc/common.h"
2728
#include "source/common/protobuf/protobuf.h" // IWYU pragma: keep
2829
#include "source/extensions/config_subscription/grpc/grpc_mux_context.h"
30+
#include "source/extensions/config_subscription/grpc/grpc_mux_impl.h"
2931
#include "source/extensions/config_subscription/grpc/grpc_subscription_impl.h"
32+
#include "source/extensions/config_subscription/grpc/new_grpc_mux_impl.h"
3033

3134
#include "absl/container/flat_hash_map.h"
35+
#include "absl/container/flat_hash_set.h"
3236
#include "absl/status/statusor.h"
3337
#include "absl/strings/match.h"
3438
#include "absl/strings/string_view.h"
@@ -39,6 +43,31 @@ namespace Cilium {
3943

4044
namespace {
4145

46+
class StreamEventSubscription : public Config::Subscription {
47+
public:
48+
StreamEventSubscription(std::unique_ptr<Config::Subscription> subscription,
49+
Common::CallbackHandlePtr stream_event_handle)
50+
: subscription_(std::move(subscription)),
51+
stream_event_handle_(std::move(stream_event_handle)) {}
52+
53+
void start(const absl::flat_hash_set<std::string>& resource_names) override {
54+
subscription_->start(resource_names);
55+
}
56+
57+
void
58+
updateResourceInterest(const absl::flat_hash_set<std::string>& update_to_these_names) override {
59+
subscription_->updateResourceInterest(update_to_these_names);
60+
}
61+
62+
void requestOnDemandUpdate(const absl::flat_hash_set<std::string>& add_these_names) override {
63+
subscription_->requestOnDemandUpdate(add_these_names);
64+
}
65+
66+
private:
67+
std::unique_ptr<Config::Subscription> subscription_;
68+
Common::CallbackHandlePtr stream_event_handle_;
69+
};
70+
4271
// service RPC method fully qualified names.
4372
struct Service {
4473
std::string sotw_grpc_method_;
@@ -58,6 +87,7 @@ TypeUrlToServiceMap* buildTypeUrlToServiceMap() {
5887
// https://www.mail-archive.com/protobuf@googlegroups.com/msg04540.html.
5988
for (absl::string_view name : {
6089
"cilium.NetworkPolicyDiscoveryService",
90+
//"cilium.NetworkPolicyResourceDiscoveryService",
6191
"cilium.NetworkPolicyHostsDiscoveryService",
6292
}) {
6393
const auto* service_desc =
@@ -121,56 +151,85 @@ const Protobuf::MethodDescriptor& sotwGrpcMethod(absl::string_view type_url) {
121151

122152
std::unique_ptr<Config::Subscription>
123153
subscribe(const absl::string_view type_url,
124-
const envoy::config::core::v3::ConfigSource& npds_config,
154+
const envoy::config::core::v3::ConfigSource& config_source,
125155
Server::Configuration::CommonFactoryContext& context, Stats::Scope& scope,
126156
Config::SubscriptionCallbacks& callbacks,
127157
Config::OpaqueResourceDecoderSharedPtr resource_decoder,
128-
std::chrono::milliseconds init_fetch_timeout) {
129-
auto& api_config_source = npds_config.api_config_source();
130-
THROW_IF_NOT_OK(Config::Utility::checkApiConfigSourceSubscriptionBackingCluster(
131-
context.clusterManager().primaryClusters(), api_config_source));
132-
158+
Config::GrpcMuxStreamEventCallback on_stream_event) {
159+
auto initial_fetch_timeout = Config::Utility::configSourceInitialFetchTimeout(config_source);
133160
Config::SubscriptionStats stats = Config::Utility::generateStats(scope);
134161
Envoy::Config::SubscriptionOptions options;
135162

136-
// No-op custom validators
137-
Envoy::Config::CustomConfigValidatorsPtr nop_config_validators =
138-
std::make_unique<NopConfigValidatorsImpl>();
139-
auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
140-
context.clusterManager().grpcAsyncClientManager(), api_config_source, scope, true, 0, false);
141-
THROW_IF_NOT_OK_REF(factory_or_error.status());
142-
143-
absl::StatusOr<Config::RateLimitSettings> rate_limit_settings_or_error =
144-
Config::Utility::parseRateLimitSettings(api_config_source);
145-
THROW_IF_NOT_OK_REF(rate_limit_settings_or_error.status());
146-
147-
Config::GrpcMuxContext grpc_mux_context{
148-
/*async_client_=*/THROW_OR_RETURN_VALUE(
149-
factory_or_error.value()->createUncachedRawAsyncClient(), Grpc::RawAsyncClientPtr),
150-
/*failover_async_client_=*/nullptr,
151-
/*dispatcher_=*/context.mainThreadDispatcher(),
152-
/*service_method_=*/sotwGrpcMethod(type_url),
153-
/*local_info_=*/context.localInfo(),
154-
/*rate_limit_settings_=*/rate_limit_settings_or_error.value(),
155-
/*scope_=*/scope,
156-
/*config_validators_=*/std::move(nop_config_validators),
157-
/*xds_resources_delegate_=*/absl::nullopt,
158-
/*xds_config_tracker_=*/absl::nullopt,
159-
/*backoff_strategy_=*/
160-
std::make_unique<JitteredExponentialBackOffStrategy>(
161-
Config::SubscriptionFactory::RetryInitialDelayMs,
162-
Config::SubscriptionFactory::RetryMaxDelayMs, context.api().randomGenerator()),
163-
/*target_xds_authority_=*/"",
164-
/*eds_resources_cache_=*/nullptr // EDS cache is only used for ADS.
165-
};
166-
167-
std::shared_ptr<Config::GrpcMux> grpc_mux =
168-
std::static_pointer_cast<Config::GrpcMux>(std::make_shared<GrpcMuxImpl>(
169-
grpc_mux_context, api_config_source.set_node_on_first_message_only()));
170-
171-
return std::make_unique<Config::GrpcSubscriptionImpl>(
163+
std::shared_ptr<Config::GrpcMux> grpc_mux;
164+
bool is_aggregated =
165+
config_source.config_source_specifier_case() == envoy::config::core::v3::ConfigSource::kAds;
166+
if (is_aggregated) {
167+
grpc_mux = std::static_pointer_cast<Config::GrpcMux>(context.xdsManager().adsMux());
168+
} else {
169+
auto& api_config_source = config_source.api_config_source();
170+
THROW_IF_NOT_OK(Config::Utility::checkApiConfigSourceSubscriptionBackingCluster(
171+
context.clusterManager().primaryClusters(), api_config_source));
172+
173+
// No-op custom validators
174+
Envoy::Config::CustomConfigValidatorsPtr nop_config_validators =
175+
std::make_unique<NopConfigValidatorsImpl>();
176+
auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
177+
context.clusterManager().grpcAsyncClientManager(), api_config_source, scope, true, 0,
178+
false);
179+
THROW_IF_NOT_OK_REF(factory_or_error.status());
180+
181+
absl::StatusOr<Config::RateLimitSettings> rate_limit_settings_or_error =
182+
Config::Utility::parseRateLimitSettings(api_config_source);
183+
THROW_IF_NOT_OK_REF(rate_limit_settings_or_error.status());
184+
185+
const auto& api_type = api_config_source.api_type();
186+
bool use_delta = api_type == envoy::config::core::v3::ApiConfigSource::DELTA_GRPC ||
187+
api_type == envoy::config::core::v3::ApiConfigSource::AGGREGATED_DELTA_GRPC;
188+
const auto& service_method = use_delta ? deltaGrpcMethod(type_url) : sotwGrpcMethod(type_url);
189+
190+
Config::GrpcMuxContext grpc_mux_context{
191+
THROW_OR_RETURN_VALUE(factory_or_error.value()->createUncachedRawAsyncClient(),
192+
Grpc::RawAsyncClientPtr),
193+
/*failover_async_client_=*/nullptr,
194+
context.mainThreadDispatcher(),
195+
service_method,
196+
context.localInfo(),
197+
rate_limit_settings_or_error.value(),
198+
scope,
199+
std::move(nop_config_validators),
200+
/*xds_resources_delegate_=*/absl::nullopt,
201+
/*xds_config_tracker_=*/absl::nullopt,
202+
std::make_unique<JitteredExponentialBackOffStrategy>(
203+
Config::SubscriptionFactory::RetryInitialDelayMs,
204+
Config::SubscriptionFactory::RetryMaxDelayMs, context.api().randomGenerator()),
205+
/*target_xds_authority_=*/"",
206+
/*eds_resources_cache_=*/nullptr // EDS cache is only used for ADS.
207+
};
208+
209+
grpc_mux =
210+
use_delta ? std::static_pointer_cast<Config::GrpcMux>(
211+
std::make_shared<Config::NewGrpcMuxImpl>(grpc_mux_context))
212+
: std::static_pointer_cast<Config::GrpcMux>(std::make_shared<Config::GrpcMuxImpl>(
213+
grpc_mux_context, api_config_source.set_node_on_first_message_only()));
214+
}
215+
216+
Common::CallbackHandlePtr stream_event_handle;
217+
if (on_stream_event) {
218+
auto stream_event_callback = std::move(on_stream_event);
219+
stream_event_handle = grpc_mux->addStreamEventCallback(stream_event_callback);
220+
if (grpc_mux->grpcStreamConnected()) {
221+
stream_event_callback(Config::GrpcMuxStreamEvent::Established);
222+
}
223+
}
224+
225+
auto subscription = std::make_unique<Config::GrpcSubscriptionImpl>(
172226
grpc_mux, callbacks, resource_decoder, stats, type_url, context.mainThreadDispatcher(),
173-
init_fetch_timeout, /*is_aggregated*/ false, options);
227+
initial_fetch_timeout, is_aggregated, options);
228+
if (stream_event_handle) {
229+
return std::make_unique<StreamEventSubscription>(std::move(subscription),
230+
std::move(stream_event_handle));
231+
}
232+
return subscription;
174233
}
175234

176235
} // namespace Cilium

cilium/grpc_subscription.h

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,24 @@
11
#pragma once
22

3-
#include <chrono>
43
#include <memory>
54

65
#include "envoy/config/core/v3/config_source.pb.h"
6+
#include "envoy/config/grpc_mux.h"
77
#include "envoy/config/subscription.h"
8-
#include "envoy/server/factory_context.h"
8+
#include "envoy/ssl/context_manager.h"
99
#include "envoy/stats/scope.h"
1010

11-
#include "source/extensions/config_subscription/grpc/grpc_mux_context.h"
12-
#include "source/extensions/config_subscription/grpc/grpc_mux_impl.h"
13-
1411
#include "absl/strings/string_view.h"
1512

1613
namespace Envoy {
1714
namespace Cilium {
1815

19-
// GrpcMux wrapper to get access to control plane identifier
20-
class GrpcMuxImpl : public Config::GrpcMuxImpl {
21-
public:
22-
GrpcMuxImpl(Config::GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node)
23-
: Config::GrpcMuxImpl(grpc_mux_context, skip_subsequent_node) {}
24-
25-
~GrpcMuxImpl() override = default;
26-
27-
void onStreamEstablished() override {
28-
new_stream_ = true;
29-
Config::GrpcMuxImpl::onStreamEstablished();
30-
}
31-
32-
// isNewStream returns true for the first call after a new stream has been established
33-
bool isNewStream() {
34-
bool new_stream = new_stream_;
35-
new_stream_ = false;
36-
return new_stream;
37-
}
38-
39-
private:
40-
bool new_stream_ = true;
41-
};
42-
4316
std::unique_ptr<Config::Subscription>
4417
subscribe(const absl::string_view type_url,
45-
const envoy::config::core::v3::ConfigSource& npds_config,
18+
const envoy::config::core::v3::ConfigSource& config_source,
4619
Server::Configuration::CommonFactoryContext& context, Stats::Scope& scope,
4720
Config::SubscriptionCallbacks& callbacks,
4821
Config::OpaqueResourceDecoderSharedPtr resource_decoder,
49-
std::chrono::milliseconds init_fetch_timeout = std::chrono::milliseconds(0));
50-
22+
Config::GrpcMuxStreamEventCallback on_stream_event = {});
5123
} // namespace Cilium
5224
} // namespace Envoy

0 commit comments

Comments
 (0)