22
33#include < fmt/format.h>
44
5- #include < chrono >
5+ #include < functional >
66#include < memory>
77#include < string>
88#include < utility>
99#include < vector>
1010
1111#include " envoy/annotations/resource.pb.h"
12+ #include " envoy/common/callback.h"
1213#include " envoy/common/exception.h"
1314#include " envoy/config/core/v3/config_source.pb.h"
1415#include " envoy/config/custom_config_validators.h"
16+ #include " envoy/config/grpc_mux.h"
1517#include " envoy/config/subscription.h"
1618#include " envoy/config/subscription_factory.h"
1719#include " envoy/grpc/async_client.h"
2527#include " source/common/grpc/common.h"
2628#include " source/common/protobuf/protobuf.h" // IWYU pragma: keep
2729#include " source/extensions/config_subscription/grpc/grpc_mux_context.h"
30+ #include " source/extensions/config_subscription/grpc/grpc_mux_impl.h"
2831#include " source/extensions/config_subscription/grpc/grpc_subscription_impl.h"
32+ #include " source/extensions/config_subscription/grpc/new_grpc_mux_impl.h"
2933
3034#include " absl/container/flat_hash_map.h"
35+ #include " absl/container/flat_hash_set.h"
3136#include " absl/status/statusor.h"
3237#include " absl/strings/match.h"
3338#include " absl/strings/string_view.h"
@@ -38,6 +43,31 @@ namespace Cilium {
3843
3944namespace {
4045
46+ class StreamEventSubscription : public Config ::Subscription {
47+ public:
48+ StreamEventSubscription (std::unique_ptr<Config::Subscription> subscription,
49+ Common::CallbackHandlePtr stream_event_handle)
50+ : subscription_(std::move(subscription)),
51+ stream_event_handle_ (std::move(stream_event_handle)) {}
52+
53+ void start (const absl::flat_hash_set<std::string>& resource_names) override {
54+ subscription_->start (resource_names);
55+ }
56+
57+ void
58+ updateResourceInterest (const absl::flat_hash_set<std::string>& update_to_these_names) override {
59+ subscription_->updateResourceInterest (update_to_these_names);
60+ }
61+
62+ void requestOnDemandUpdate (const absl::flat_hash_set<std::string>& add_these_names) override {
63+ subscription_->requestOnDemandUpdate (add_these_names);
64+ }
65+
66+ private:
67+ std::unique_ptr<Config::Subscription> subscription_;
68+ Common::CallbackHandlePtr stream_event_handle_;
69+ };
70+
4171// service RPC method fully qualified names.
4272struct Service {
4373 std::string sotw_grpc_method_;
@@ -124,49 +154,81 @@ subscribe(const absl::string_view type_url,
124154 Server::Configuration::CommonFactoryContext& context, Stats::Scope& scope,
125155 Config::SubscriptionCallbacks& callbacks,
126156 Config::OpaqueResourceDecoderSharedPtr resource_decoder,
127- std::chrono::milliseconds init_fetch_timeout) {
128- auto & api_config_source = config_source.api_config_source ();
129- THROW_IF_NOT_OK (Config::Utility::checkApiConfigSourceSubscriptionBackingCluster (
130- context.clusterManager ().primaryClusters (), api_config_source));
131-
157+ Config::GrpcMuxStreamEventCallback on_stream_event) {
158+ auto initial_fetch_timeout = Config::Utility::configSourceInitialFetchTimeout (config_source);
132159 Config::SubscriptionStats stats = Config::Utility::generateStats (scope);
133160 Envoy::Config::SubscriptionOptions options;
134161
135- // No-op custom validators
136- Envoy::Config::CustomConfigValidatorsPtr nop_config_validators =
137- std::make_unique<NopConfigValidatorsImpl>();
138- auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource (
139- context.clusterManager ().grpcAsyncClientManager (), api_config_source, scope, true , 0 , false );
140- THROW_IF_NOT_OK_REF (factory_or_error.status ());
141-
142- absl::StatusOr<Config::RateLimitSettings> rate_limit_settings_or_error =
143- Config::Utility::parseRateLimitSettings (api_config_source);
144- THROW_IF_NOT_OK_REF (rate_limit_settings_or_error.status ());
145-
146- Config::GrpcMuxContext grpc_mux_context{
147- /* async_client_=*/ THROW_OR_RETURN_VALUE (
148- factory_or_error.value ()->createUncachedRawAsyncClient (), Grpc::RawAsyncClientPtr),
149- /* failover_async_client_=*/ nullptr ,
150- /* dispatcher_=*/ context.mainThreadDispatcher (),
151- /* service_method_=*/ sotwGrpcMethod (type_url),
152- /* local_info_=*/ context.localInfo (),
153- /* rate_limit_settings_=*/ rate_limit_settings_or_error.value (),
154- /* scope_=*/ scope,
155- /* config_validators_=*/ std::move (nop_config_validators),
156- /* xds_resources_delegate_=*/ absl::nullopt ,
157- /* xds_config_tracker_=*/ absl::nullopt ,
158- /* backoff_strategy_=*/
159- std::make_unique<JitteredExponentialBackOffStrategy>(
160- Config::SubscriptionFactory::RetryInitialDelayMs,
161- Config::SubscriptionFactory::RetryMaxDelayMs, context.api ().randomGenerator ()),
162- /* target_xds_authority_=*/ " " ,
163- /* eds_resources_cache_=*/ nullptr , // EDS cache is only used for ADS.
164- /* skip_subsequent_node_=*/ api_config_source.set_node_on_first_message_only (),
165- };
166-
167- return std::make_unique<Config::GrpcSubscriptionImpl>(
168- std::make_shared<GrpcMuxImpl>(grpc_mux_context), callbacks, resource_decoder, stats, type_url,
169- context.mainThreadDispatcher (), init_fetch_timeout, /* is_aggregated*/ false , options);
162+ std::shared_ptr<Config::GrpcMux> grpc_mux;
163+ bool is_aggregated =
164+ config_source.config_source_specifier_case () == envoy::config::core::v3::ConfigSource::kAds ;
165+ if (is_aggregated) {
166+ grpc_mux = std::static_pointer_cast<Config::GrpcMux>(context.xdsManager ().adsMux ());
167+ } else {
168+ auto & api_config_source = config_source.api_config_source ();
169+ THROW_IF_NOT_OK (Config::Utility::checkApiConfigSourceSubscriptionBackingCluster (
170+ context.clusterManager ().primaryClusters (), api_config_source));
171+
172+ // No-op custom validators
173+ Envoy::Config::CustomConfigValidatorsPtr nop_config_validators =
174+ std::make_unique<NopConfigValidatorsImpl>();
175+ auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource (
176+ context.clusterManager ().grpcAsyncClientManager (), api_config_source, scope, true , 0 ,
177+ false );
178+ THROW_IF_NOT_OK_REF (factory_or_error.status ());
179+
180+ absl::StatusOr<Config::RateLimitSettings> rate_limit_settings_or_error =
181+ Config::Utility::parseRateLimitSettings (api_config_source);
182+ THROW_IF_NOT_OK_REF (rate_limit_settings_or_error.status ());
183+
184+ const auto & api_type = api_config_source.api_type ();
185+ bool use_delta = api_type == envoy::config::core::v3::ApiConfigSource::DELTA_GRPC ||
186+ api_type == envoy::config::core::v3::ApiConfigSource::AGGREGATED_DELTA_GRPC;
187+ const auto & service_method = use_delta ? deltaGrpcMethod (type_url) : sotwGrpcMethod (type_url);
188+
189+ Config::GrpcMuxContext grpc_mux_context{
190+ THROW_OR_RETURN_VALUE (factory_or_error.value ()->createUncachedRawAsyncClient (),
191+ Grpc::RawAsyncClientPtr),
192+ /* failover_async_client_=*/ nullptr ,
193+ context.mainThreadDispatcher (),
194+ service_method,
195+ context.localInfo (),
196+ rate_limit_settings_or_error.value (),
197+ scope,
198+ std::move (nop_config_validators),
199+ /* xds_resources_delegate_=*/ absl::nullopt ,
200+ /* xds_config_tracker_=*/ absl::nullopt ,
201+ std::make_unique<JitteredExponentialBackOffStrategy>(
202+ Config::SubscriptionFactory::RetryInitialDelayMs,
203+ Config::SubscriptionFactory::RetryMaxDelayMs, context.api ().randomGenerator ()),
204+ /* target_xds_authority_=*/ " " ,
205+ /* eds_resources_cache_=*/ nullptr , // EDS cache is only used for ADS.
206+ /* skip_subsequent_node_=*/ api_config_source.set_node_on_first_message_only (),
207+ };
208+
209+ grpc_mux = use_delta ? std::static_pointer_cast<Config::GrpcMux>(
210+ std::make_shared<Config::NewGrpcMuxImpl>(grpc_mux_context))
211+ : std::static_pointer_cast<Config::GrpcMux>(
212+ std::make_shared<Config::GrpcMuxImpl>(grpc_mux_context));
213+ }
214+
215+ Common::CallbackHandlePtr stream_event_handle;
216+ if (on_stream_event) {
217+ auto stream_event_callback = std::move (on_stream_event);
218+ stream_event_handle = grpc_mux->addStreamEventCallback (stream_event_callback);
219+ if (grpc_mux->grpcStreamConnected ()) {
220+ stream_event_callback (Config::GrpcMuxStreamEvent::Established);
221+ }
222+ }
223+
224+ auto subscription = std::make_unique<Config::GrpcSubscriptionImpl>(
225+ grpc_mux, callbacks, resource_decoder, stats, type_url, context.mainThreadDispatcher (),
226+ initial_fetch_timeout, is_aggregated, options);
227+ if (stream_event_handle) {
228+ return std::make_unique<StreamEventSubscription>(std::move (subscription),
229+ std::move (stream_event_handle));
230+ }
231+ return subscription;
170232}
171233
172234} // namespace Cilium
0 commit comments