Skip to content

Commit d9ea6bd

Browse files
committed
policy: detect new streams for delta and file-based substriptions
Signed-off-by: Jarno Rajahalme <jarno@isovalent.com>
1 parent 11811c3 commit d9ea6bd

7 files changed

Lines changed: 901 additions & 177 deletions

WORKSPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ git_repository(
4343
"@//patches:0004-thread_local-reset-slot-in-worker-threads-first.patch",
4444
"@//patches:0005-http-header-expose-attribute.patch",
4545
"@//patches:0006-test-integration-Defer-fake-upstream-read-enable-un.patch",
46+
"@//patches:0007-config-add-grpc-mux-stream-event-callback.patch",
4647
],
4748
# // clang-format off: Envoy's format check: Only repository_locations.bzl may contains URL references
4849
remote = "https://github.com/envoyproxy/envoy.git",

cilium/grpc_subscription.cc

Lines changed: 103 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22

33
#include <fmt/format.h>
44

5-
#include <chrono>
5+
#include <functional>
66
#include <memory>
77
#include <string>
88
#include <utility>
99
#include <vector>
1010

1111
#include "envoy/annotations/resource.pb.h"
12+
#include "envoy/common/callback.h"
1213
#include "envoy/common/exception.h"
1314
#include "envoy/config/core/v3/config_source.pb.h"
1415
#include "envoy/config/custom_config_validators.h"
@@ -26,9 +27,12 @@
2627
#include "source/common/grpc/common.h"
2728
#include "source/common/protobuf/protobuf.h" // IWYU pragma: keep
2829
#include "source/extensions/config_subscription/grpc/grpc_mux_context.h"
30+
#include "source/extensions/config_subscription/grpc/grpc_mux_impl.h"
2931
#include "source/extensions/config_subscription/grpc/grpc_subscription_impl.h"
32+
#include "source/extensions/config_subscription/grpc/new_grpc_mux_impl.h"
3033

3134
#include "absl/container/flat_hash_map.h"
35+
#include "absl/container/flat_hash_set.h"
3236
#include "absl/status/statusor.h"
3337
#include "absl/strings/match.h"
3438
#include "absl/strings/string_view.h"
@@ -39,6 +43,31 @@ namespace Cilium {
3943

4044
namespace {
4145

46+
class StreamEventSubscription : public Config::Subscription {
47+
public:
48+
StreamEventSubscription(std::unique_ptr<Config::Subscription> subscription,
49+
Common::CallbackHandlePtr stream_event_handle)
50+
: subscription_(std::move(subscription)),
51+
stream_event_handle_(std::move(stream_event_handle)) {}
52+
53+
void start(const absl::flat_hash_set<std::string>& resource_names) override {
54+
subscription_->start(resource_names);
55+
}
56+
57+
void
58+
updateResourceInterest(const absl::flat_hash_set<std::string>& update_to_these_names) override {
59+
subscription_->updateResourceInterest(update_to_these_names);
60+
}
61+
62+
void requestOnDemandUpdate(const absl::flat_hash_set<std::string>& add_these_names) override {
63+
subscription_->requestOnDemandUpdate(add_these_names);
64+
}
65+
66+
private:
67+
std::unique_ptr<Config::Subscription> subscription_;
68+
Common::CallbackHandlePtr stream_event_handle_;
69+
};
70+
4271
// service RPC method fully qualified names.
4372
struct Service {
4473
std::string sotw_grpc_method_;
@@ -58,6 +87,7 @@ TypeUrlToServiceMap* buildTypeUrlToServiceMap() {
5887
// https://www.mail-archive.com/protobuf@googlegroups.com/msg04540.html.
5988
for (absl::string_view name : {
6089
"cilium.NetworkPolicyDiscoveryService",
90+
//"cilium.NetworkPolicyResourceDiscoveryService",
6191
"cilium.NetworkPolicyHostsDiscoveryService",
6292
}) {
6393
const auto* service_desc =
@@ -121,56 +151,85 @@ const Protobuf::MethodDescriptor& sotwGrpcMethod(absl::string_view type_url) {
121151

122152
std::unique_ptr<Config::Subscription>
123153
subscribe(const absl::string_view type_url,
124-
const envoy::config::core::v3::ConfigSource& npds_config,
154+
const envoy::config::core::v3::ConfigSource& config_source,
125155
Server::Configuration::CommonFactoryContext& context, Stats::Scope& scope,
126156
Config::SubscriptionCallbacks& callbacks,
127157
Config::OpaqueResourceDecoderSharedPtr resource_decoder,
128-
std::chrono::milliseconds init_fetch_timeout) {
129-
auto& api_config_source = npds_config.api_config_source();
130-
THROW_IF_NOT_OK(Config::Utility::checkApiConfigSourceSubscriptionBackingCluster(
131-
context.clusterManager().primaryClusters(), api_config_source));
132-
158+
Config::GrpcMuxStreamEventCallback on_stream_event) {
159+
auto initial_fetch_timeout = Config::Utility::configSourceInitialFetchTimeout(config_source);
133160
Config::SubscriptionStats stats = Config::Utility::generateStats(scope);
134161
Envoy::Config::SubscriptionOptions options;
135162

136-
// No-op custom validators
137-
Envoy::Config::CustomConfigValidatorsPtr nop_config_validators =
138-
std::make_unique<NopConfigValidatorsImpl>();
139-
auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
140-
context.clusterManager().grpcAsyncClientManager(), api_config_source, scope, true, 0, false);
141-
THROW_IF_NOT_OK_REF(factory_or_error.status());
142-
143-
absl::StatusOr<Config::RateLimitSettings> rate_limit_settings_or_error =
144-
Config::Utility::parseRateLimitSettings(api_config_source);
145-
THROW_IF_NOT_OK_REF(rate_limit_settings_or_error.status());
146-
147-
Config::GrpcMuxContext grpc_mux_context{
148-
/*async_client_=*/THROW_OR_RETURN_VALUE(
149-
factory_or_error.value()->createUncachedRawAsyncClient(), Grpc::RawAsyncClientPtr),
150-
/*failover_async_client_=*/nullptr,
151-
/*dispatcher_=*/context.mainThreadDispatcher(),
152-
/*service_method_=*/sotwGrpcMethod(type_url),
153-
/*local_info_=*/context.localInfo(),
154-
/*rate_limit_settings_=*/rate_limit_settings_or_error.value(),
155-
/*scope_=*/scope,
156-
/*config_validators_=*/std::move(nop_config_validators),
157-
/*xds_resources_delegate_=*/absl::nullopt,
158-
/*xds_config_tracker_=*/absl::nullopt,
159-
/*backoff_strategy_=*/
160-
std::make_unique<JitteredExponentialBackOffStrategy>(
161-
Config::SubscriptionFactory::RetryInitialDelayMs,
162-
Config::SubscriptionFactory::RetryMaxDelayMs, context.api().randomGenerator()),
163-
/*target_xds_authority_=*/"",
164-
/*eds_resources_cache_=*/nullptr // EDS cache is only used for ADS.
165-
};
166-
167-
std::shared_ptr<Config::GrpcMux> grpc_mux =
168-
std::static_pointer_cast<Config::GrpcMux>(std::make_shared<GrpcMuxImpl>(
169-
grpc_mux_context, api_config_source.set_node_on_first_message_only()));
170-
171-
return std::make_unique<Config::GrpcSubscriptionImpl>(
163+
std::shared_ptr<Config::GrpcMux> grpc_mux;
164+
bool is_aggregated =
165+
config_source.config_source_specifier_case() == envoy::config::core::v3::ConfigSource::kAds;
166+
if (is_aggregated) {
167+
grpc_mux = std::static_pointer_cast<Config::GrpcMux>(context.xdsManager().adsMux());
168+
} else {
169+
auto& api_config_source = config_source.api_config_source();
170+
THROW_IF_NOT_OK(Config::Utility::checkApiConfigSourceSubscriptionBackingCluster(
171+
context.clusterManager().primaryClusters(), api_config_source));
172+
173+
// No-op custom validators
174+
Envoy::Config::CustomConfigValidatorsPtr nop_config_validators =
175+
std::make_unique<NopConfigValidatorsImpl>();
176+
auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
177+
context.clusterManager().grpcAsyncClientManager(), api_config_source, scope, true, 0,
178+
false);
179+
THROW_IF_NOT_OK_REF(factory_or_error.status());
180+
181+
absl::StatusOr<Config::RateLimitSettings> rate_limit_settings_or_error =
182+
Config::Utility::parseRateLimitSettings(api_config_source);
183+
THROW_IF_NOT_OK_REF(rate_limit_settings_or_error.status());
184+
185+
const auto& api_type = api_config_source.api_type();
186+
bool use_delta = api_type == envoy::config::core::v3::ApiConfigSource::DELTA_GRPC ||
187+
api_type == envoy::config::core::v3::ApiConfigSource::AGGREGATED_DELTA_GRPC;
188+
const auto& service_method = use_delta ? deltaGrpcMethod(type_url) : sotwGrpcMethod(type_url);
189+
190+
Config::GrpcMuxContext grpc_mux_context{
191+
THROW_OR_RETURN_VALUE(factory_or_error.value()->createUncachedRawAsyncClient(),
192+
Grpc::RawAsyncClientPtr),
193+
/*failover_async_client_=*/nullptr,
194+
context.mainThreadDispatcher(),
195+
service_method,
196+
context.localInfo(),
197+
rate_limit_settings_or_error.value(),
198+
scope,
199+
std::move(nop_config_validators),
200+
/*xds_resources_delegate_=*/absl::nullopt,
201+
/*xds_config_tracker_=*/absl::nullopt,
202+
std::make_unique<JitteredExponentialBackOffStrategy>(
203+
Config::SubscriptionFactory::RetryInitialDelayMs,
204+
Config::SubscriptionFactory::RetryMaxDelayMs, context.api().randomGenerator()),
205+
/*target_xds_authority_=*/"",
206+
/*eds_resources_cache_=*/nullptr // EDS cache is only used for ADS.
207+
};
208+
209+
grpc_mux =
210+
use_delta ? std::static_pointer_cast<Config::GrpcMux>(
211+
std::make_shared<Config::NewGrpcMuxImpl>(grpc_mux_context))
212+
: std::static_pointer_cast<Config::GrpcMux>(std::make_shared<Config::GrpcMuxImpl>(
213+
grpc_mux_context, api_config_source.set_node_on_first_message_only()));
214+
}
215+
216+
Common::CallbackHandlePtr stream_event_handle;
217+
if (on_stream_event) {
218+
auto stream_event_callback = std::move(on_stream_event);
219+
stream_event_handle = grpc_mux->addStreamEventCallback(stream_event_callback);
220+
if (grpc_mux->grpcStreamConnected()) {
221+
stream_event_callback(Config::GrpcMuxStreamEvent::Established);
222+
}
223+
}
224+
225+
auto subscription = std::make_unique<Config::GrpcSubscriptionImpl>(
172226
grpc_mux, callbacks, resource_decoder, stats, type_url, context.mainThreadDispatcher(),
173-
init_fetch_timeout, /*is_aggregated*/ false, options);
227+
initial_fetch_timeout, is_aggregated, options);
228+
if (stream_event_handle) {
229+
return std::make_unique<StreamEventSubscription>(std::move(subscription),
230+
std::move(stream_event_handle));
231+
}
232+
return subscription;
174233
}
175234

176235
} // namespace Cilium

cilium/grpc_subscription.h

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,24 @@
11
#pragma once
22

3-
#include <chrono>
43
#include <memory>
54

65
#include "envoy/config/core/v3/config_source.pb.h"
6+
#include "envoy/config/grpc_mux.h"
77
#include "envoy/config/subscription.h"
8-
#include "envoy/server/factory_context.h"
8+
#include "envoy/ssl/context_manager.h"
99
#include "envoy/stats/scope.h"
1010

11-
#include "source/extensions/config_subscription/grpc/grpc_mux_context.h"
12-
#include "source/extensions/config_subscription/grpc/grpc_mux_impl.h"
13-
1411
#include "absl/strings/string_view.h"
1512

1613
namespace Envoy {
1714
namespace Cilium {
1815

19-
// GrpcMux wrapper to get access to control plane identifier
20-
class GrpcMuxImpl : public Config::GrpcMuxImpl {
21-
public:
22-
GrpcMuxImpl(Config::GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node)
23-
: Config::GrpcMuxImpl(grpc_mux_context, skip_subsequent_node) {}
24-
25-
~GrpcMuxImpl() override = default;
26-
27-
void onStreamEstablished() override {
28-
new_stream_ = true;
29-
Config::GrpcMuxImpl::onStreamEstablished();
30-
}
31-
32-
// isNewStream returns true for the first call after a new stream has been established
33-
bool isNewStream() {
34-
bool new_stream = new_stream_;
35-
new_stream_ = false;
36-
return new_stream;
37-
}
38-
39-
private:
40-
bool new_stream_ = true;
41-
};
42-
4316
std::unique_ptr<Config::Subscription>
4417
subscribe(const absl::string_view type_url,
45-
const envoy::config::core::v3::ConfigSource& npds_config,
18+
const envoy::config::core::v3::ConfigSource& config_source,
4619
Server::Configuration::CommonFactoryContext& context, Stats::Scope& scope,
4720
Config::SubscriptionCallbacks& callbacks,
4821
Config::OpaqueResourceDecoderSharedPtr resource_decoder,
49-
std::chrono::milliseconds init_fetch_timeout = std::chrono::milliseconds(0));
50-
22+
Config::GrpcMuxStreamEventCallback on_stream_event = {});
5123
} // namespace Cilium
5224
} // namespace Envoy

0 commit comments

Comments
 (0)