22
33#include < fmt/format.h>
44
5- #include < chrono >
5+ #include < functional >
66#include < memory>
77#include < string>
88#include < utility>
99#include < vector>
1010
1111#include " envoy/annotations/resource.pb.h"
12+ #include " envoy/common/callback.h"
1213#include " envoy/common/exception.h"
1314#include " envoy/config/core/v3/config_source.pb.h"
1415#include " envoy/config/custom_config_validators.h"
2627#include " source/common/grpc/common.h"
2728#include " source/common/protobuf/protobuf.h" // IWYU pragma: keep
2829#include " source/extensions/config_subscription/grpc/grpc_mux_context.h"
30+ #include " source/extensions/config_subscription/grpc/grpc_mux_impl.h"
2931#include " source/extensions/config_subscription/grpc/grpc_subscription_impl.h"
32+ #include " source/extensions/config_subscription/grpc/new_grpc_mux_impl.h"
3033
3134#include " absl/container/flat_hash_map.h"
35+ #include " absl/container/flat_hash_set.h"
3236#include " absl/status/statusor.h"
3337#include " absl/strings/match.h"
3438#include " absl/strings/string_view.h"
@@ -39,6 +43,31 @@ namespace Cilium {
3943
4044namespace {
4145
46+ class StreamEventSubscription : public Config ::Subscription {
47+ public:
48+ StreamEventSubscription (std::unique_ptr<Config::Subscription> subscription,
49+ Common::CallbackHandlePtr stream_event_handle)
50+ : subscription_(std::move(subscription)),
51+ stream_event_handle_ (std::move(stream_event_handle)) {}
52+
53+ void start (const absl::flat_hash_set<std::string>& resource_names) override {
54+ subscription_->start (resource_names);
55+ }
56+
57+ void
58+ updateResourceInterest (const absl::flat_hash_set<std::string>& update_to_these_names) override {
59+ subscription_->updateResourceInterest (update_to_these_names);
60+ }
61+
62+ void requestOnDemandUpdate (const absl::flat_hash_set<std::string>& add_these_names) override {
63+ subscription_->requestOnDemandUpdate (add_these_names);
64+ }
65+
66+ private:
67+ std::unique_ptr<Config::Subscription> subscription_;
68+ Common::CallbackHandlePtr stream_event_handle_;
69+ };
70+
4271// service RPC method fully qualified names.
4372struct Service {
4473 std::string sotw_grpc_method_;
@@ -125,49 +154,81 @@ subscribe(const absl::string_view type_url,
125154 Server::Configuration::CommonFactoryContext& context, Stats::Scope& scope,
126155 Config::SubscriptionCallbacks& callbacks,
127156 Config::OpaqueResourceDecoderSharedPtr resource_decoder,
128- std::chrono::milliseconds init_fetch_timeout) {
129- auto & api_config_source = config_source.api_config_source ();
130- THROW_IF_NOT_OK (Config::Utility::checkApiConfigSourceSubscriptionBackingCluster (
131- context.clusterManager ().primaryClusters (), api_config_source));
132-
157+ Config::GrpcMuxStreamEventCallback on_stream_event) {
158+ auto initial_fetch_timeout = Config::Utility::configSourceInitialFetchTimeout (config_source);
133159 Config::SubscriptionStats stats = Config::Utility::generateStats (scope);
134160 Envoy::Config::SubscriptionOptions options;
135161
136- // No-op custom validators
137- Envoy::Config::CustomConfigValidatorsPtr nop_config_validators =
138- std::make_unique<NopConfigValidatorsImpl>();
139- auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource (
140- context.clusterManager ().grpcAsyncClientManager (), api_config_source, scope, true , 0 , false );
141- THROW_IF_NOT_OK_REF (factory_or_error.status ());
142-
143- absl::StatusOr<Config::RateLimitSettings> rate_limit_settings_or_error =
144- Config::Utility::parseRateLimitSettings (api_config_source);
145- THROW_IF_NOT_OK_REF (rate_limit_settings_or_error.status ());
146-
147- Config::GrpcMuxContext grpc_mux_context{
148- /* async_client_=*/ THROW_OR_RETURN_VALUE (
149- factory_or_error.value ()->createUncachedRawAsyncClient (), Grpc::RawAsyncClientPtr),
150- /* failover_async_client_=*/ nullptr ,
151- /* dispatcher_=*/ context.mainThreadDispatcher (),
152- /* service_method_=*/ sotwGrpcMethod (type_url),
153- /* local_info_=*/ context.localInfo (),
154- /* rate_limit_settings_=*/ rate_limit_settings_or_error.value (),
155- /* scope_=*/ scope,
156- /* config_validators_=*/ std::move (nop_config_validators),
157- /* xds_resources_delegate_=*/ absl::nullopt ,
158- /* xds_config_tracker_=*/ absl::nullopt ,
159- /* backoff_strategy_=*/
160- std::make_unique<JitteredExponentialBackOffStrategy>(
161- Config::SubscriptionFactory::RetryInitialDelayMs,
162- Config::SubscriptionFactory::RetryMaxDelayMs, context.api ().randomGenerator ()),
163- /* target_xds_authority_=*/ " " ,
164- /* eds_resources_cache_=*/ nullptr , // EDS cache is only used for ADS.
165- /* skip_subsequent_node_=*/ api_config_source.set_node_on_first_message_only (),
166- };
167-
168- return std::make_unique<Config::GrpcSubscriptionImpl>(
169- std::make_shared<GrpcMuxImpl>(grpc_mux_context), callbacks, resource_decoder, stats, type_url,
170- context.mainThreadDispatcher (), init_fetch_timeout, /* is_aggregated*/ false , options);
162+ std::shared_ptr<Config::GrpcMux> grpc_mux;
163+ bool is_aggregated =
164+ config_source.config_source_specifier_case () == envoy::config::core::v3::ConfigSource::kAds ;
165+ if (is_aggregated) {
166+ grpc_mux = std::static_pointer_cast<Config::GrpcMux>(context.xdsManager ().adsMux ());
167+ } else {
168+ auto & api_config_source = config_source.api_config_source ();
169+ THROW_IF_NOT_OK (Config::Utility::checkApiConfigSourceSubscriptionBackingCluster (
170+ context.clusterManager ().primaryClusters (), api_config_source));
171+
172+ // No-op custom validators
173+ Envoy::Config::CustomConfigValidatorsPtr nop_config_validators =
174+ std::make_unique<NopConfigValidatorsImpl>();
175+ auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource (
176+ context.clusterManager ().grpcAsyncClientManager (), api_config_source, scope, true , 0 ,
177+ false );
178+ THROW_IF_NOT_OK_REF (factory_or_error.status ());
179+
180+ absl::StatusOr<Config::RateLimitSettings> rate_limit_settings_or_error =
181+ Config::Utility::parseRateLimitSettings (api_config_source);
182+ THROW_IF_NOT_OK_REF (rate_limit_settings_or_error.status ());
183+
184+ const auto & api_type = api_config_source.api_type ();
185+ bool use_delta = api_type == envoy::config::core::v3::ApiConfigSource::DELTA_GRPC ||
186+ api_type == envoy::config::core::v3::ApiConfigSource::AGGREGATED_DELTA_GRPC;
187+ const auto & service_method = use_delta ? deltaGrpcMethod (type_url) : sotwGrpcMethod (type_url);
188+
189+ Config::GrpcMuxContext grpc_mux_context{
190+ THROW_OR_RETURN_VALUE (factory_or_error.value ()->createUncachedRawAsyncClient (),
191+ Grpc::RawAsyncClientPtr),
192+ /* failover_async_client_=*/ nullptr ,
193+ context.mainThreadDispatcher (),
194+ service_method,
195+ context.localInfo (),
196+ rate_limit_settings_or_error.value (),
197+ scope,
198+ std::move (nop_config_validators),
199+ /* xds_resources_delegate_=*/ absl::nullopt ,
200+ /* xds_config_tracker_=*/ absl::nullopt ,
201+ std::make_unique<JitteredExponentialBackOffStrategy>(
202+ Config::SubscriptionFactory::RetryInitialDelayMs,
203+ Config::SubscriptionFactory::RetryMaxDelayMs, context.api ().randomGenerator ()),
204+ /* target_xds_authority_=*/ " " ,
205+ /* eds_resources_cache_=*/ nullptr , // EDS cache is only used for ADS.
206+ /* skip_subsequent_node_=*/ api_config_source.set_node_on_first_message_only (),
207+ };
208+
209+ grpc_mux = use_delta ? std::static_pointer_cast<Config::GrpcMux>(
210+ std::make_shared<Config::NewGrpcMuxImpl>(grpc_mux_context))
211+ : std::static_pointer_cast<Config::GrpcMux>(
212+ std::make_shared<Config::GrpcMuxImpl>(grpc_mux_context));
213+ }
214+
215+ Common::CallbackHandlePtr stream_event_handle;
216+ if (on_stream_event) {
217+ auto stream_event_callback = std::move (on_stream_event);
218+ stream_event_handle = grpc_mux->addStreamEventCallback (stream_event_callback);
219+ if (grpc_mux->grpcStreamConnected ()) {
220+ stream_event_callback (Config::GrpcMuxStreamEvent::Established);
221+ }
222+ }
223+
224+ auto subscription = std::make_unique<Config::GrpcSubscriptionImpl>(
225+ grpc_mux, callbacks, resource_decoder, stats, type_url, context.mainThreadDispatcher (),
226+ initial_fetch_timeout, is_aggregated, options);
227+ if (stream_event_handle) {
228+ return std::make_unique<StreamEventSubscription>(std::move (subscription),
229+ std::move (stream_event_handle));
230+ }
231+ return subscription;
171232}
172233
173234} // namespace Cilium
0 commit comments