Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ git_repository(
"@//patches:0004-thread_local-reset-slot-in-worker-threads-first.patch",
"@//patches:0005-http-header-expose-attribute.patch",
"@//patches:0006-test-integration-Defer-fake-upstream-read-enable-un.patch",
"@//patches:0007-config-add-grpc-mux-stream-event-callback.patch",
"@//patches:0008-repo-Make-yq-dependency-optional-for-CI-config-parsi.patch",
],
# // clang-format off: Envoy's format check: Only repository_locations.bzl may contains URL references
Expand Down
12 changes: 12 additions & 0 deletions cilium/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "versioned_lib",
hdrs = ["versioned.h"],
repository = "@envoy",
deps = [
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/container:flat_hash_set",
"@envoy//source/common/common:assert_lib",
],
)

envoy_cc_library(
name = "network_policy_lib",
srcs = [
Expand All @@ -45,6 +56,7 @@ envoy_cc_library(
"//cilium:conntrack_lib",
"//cilium:grpc_subscription_lib",
"//cilium:ipcache_lib",
"//cilium:versioned_lib",
"//cilium/api:npds_cc_proto",
"@envoy//envoy/singleton:manager_interface",
"@envoy//source/common/common:logger_lib",
Expand Down
7 changes: 7 additions & 0 deletions cilium/api/bpf_metadata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,11 @@ message BpfMetadata {
// Configuration for the source of Cilium xDS updates.
// Used for all cilium-specific xDS protocol, e.g., NPHDS, NPDS, and Secrets (SDS) therein.
envoy.config.core.v3.ConfigSource cilium_config_source = 16;

// Policy type to use
enum PolicyType {
NPDS = 0; // Legacy NPDS (default)
NPRDS = 1; // New NetworkPolicyResource (NPRDS)
}
bool policy_type = 17;
}
41 changes: 41 additions & 0 deletions cilium/api/npds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@ import "validate/validate.proto";
// [#protodoc-title: Network policy management and NPDS]

// Each resource name is a network policy identifier.
// Deprecated: This service will be removed when Cilium 1.20 is the oldest supported release.
service NetworkPolicyDiscoveryService {
option (envoy.annotations.resource).type = "cilium.NetworkPolicy";

rpc DeltaNetworkPolicies(stream envoy.service.discovery.v3.DeltaDiscoveryRequest)
returns (stream envoy.service.discovery.v3.DeltaDiscoveryResponse) {
}

rpc StreamNetworkPolicies(stream envoy.service.discovery.v3.DiscoveryRequest)
returns (stream envoy.service.discovery.v3.DiscoveryResponse) {
}
Expand All @@ -33,6 +38,36 @@ service NetworkPolicyDiscoveryService {
}
}

// Policy and selector resource names are exact-match identifiers in NPRDS.
service NetworkPolicyResourceDiscoveryService {
option (envoy.annotations.resource).type = "cilium.NetworkPolicyResource";

rpc StreamNetworkPolicyResources(stream envoy.service.discovery.v3.DiscoveryRequest)
returns (stream envoy.service.discovery.v3.DiscoveryResponse) {
}

rpc DeltaNetworkPolicyResources(stream envoy.service.discovery.v3.DeltaDiscoveryRequest)
returns (stream envoy.service.discovery.v3.DeltaDiscoveryResponse) {
}
}

// An NPRDS resource that carries either an endpoint policy or a shared selector.
message NetworkPolicyResource {
oneof resource {
NetworkPolicy policy = 1;
Selector selector = 2;
}
}

// A shared set of remote identities referenced by selector resource name.
// Unlike the old state-of-the-world remote identity lists, an empty selector
// matches nothing.
message Selector {
// The set of numeric remote security IDs selected by this selector.
// If empty, this selector selects no remote identities.
repeated uint32 remote_identities = 1;
}

// A network policy that is enforced by a filter on the network flows to/from
// associated hosts.
message NetworkPolicy {
Expand Down Expand Up @@ -153,6 +188,12 @@ message PortNetworkPolicyRule {
// Optional. If not specified, any remote host is matched by this predicate.
repeated uint32 remote_policies = 7;

// Optional selector resource names that can be resolved to shared remote
// policy sets in delta NPDS.
// Selector references are matched by exact selector resource name.
// Optional. If not specified, any remote host is matched by this predicate.
repeated string selectors = 11;

// Optional downstream TLS context. If present, the incoming connection must
// be a TLS connection.
TLSContext downstream_tls_context = 3;
Expand Down
4 changes: 4 additions & 0 deletions cilium/api/nphds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ service NetworkPolicyHostsDiscoveryService {
body: "*"
};
}

rpc DeltaNetworkPolicyHosts(stream envoy.service.discovery.v3.DeltaDiscoveryRequest)
returns (stream envoy.service.discovery.v3.DeltaDiscoveryResponse) {
}
}

// The mapping of a network policy identifier to the IP addresses of all the
Expand Down
32 changes: 18 additions & 14 deletions cilium/bpf_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,15 @@ Config::Config(const ::cilium::BpfMetadata& config,
config.ipv6_source_address()));
}
if (config.use_nphds()) {
hosts_ =
context.serverFactoryContext().singletonManager().getTyped<const Cilium::PolicyHostMap>(
SINGLETON_MANAGER_REGISTERED_NAME(cilium_host_map),
[&context, config_source = config_source_] {
auto map = std::make_shared<Cilium::PolicyHostMap>(context.serverFactoryContext());
map->startSubscription(context.serverFactoryContext(), config_source);
return map;
});
hosts_ = context.serverFactoryContext().singletonManager().getTyped<Cilium::PolicyHostMap>(
SINGLETON_MANAGER_REGISTERED_NAME(cilium_host_map),
[&context, config_source = config_source_] {
auto map = std::make_shared<Cilium::PolicyHostMap>(context.serverFactoryContext());
map->startSubscription(context.serverFactoryContext(), config_source);
return map;
});
// update desired config source on the map
hosts_->setConfigSource(config_source_);
}

// Note: all instances use the bpf root of the first filter with non-empty
Expand Down Expand Up @@ -279,12 +280,15 @@ Config::Config(const ::cilium::BpfMetadata& config,
// instances!
// Only created if either ipcache_ or hosts_ map exists
if (ipcache_ || hosts_) {
npmap_ =
context.serverFactoryContext().singletonManager().getTyped<const Cilium::NetworkPolicyMap>(
SINGLETON_MANAGER_REGISTERED_NAME(cilium_network_policy),
[&context, config_source = config_source_] {
return std::make_shared<Cilium::NetworkPolicyMap>(context, config_source, true);
});
bool use_nprds = config.policy_type() == cilium::BpfMetadata::NPRDS;
npmap_ = context.serverFactoryContext().singletonManager().getTyped<Cilium::NetworkPolicyMap>(
SINGLETON_MANAGER_REGISTERED_NAME(cilium_network_policy),
[&context, use_nprds, config_source = config_source_] {
return std::make_shared<Cilium::NetworkPolicyMap>(context, use_nprds, config_source,
true);
});
// update desired config on the map
npmap_->setConfig(use_nprds, config_source_);
}
}

Expand Down
4 changes: 2 additions & 2 deletions cilium/bpf_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,10 @@ class Config : public Cilium::PolicyResolver,
Random::RandomGenerator& random_;
envoy::config::core::v3::ConfigSource config_source_;

std::shared_ptr<const Cilium::NetworkPolicyMap> npmap_;
std::shared_ptr<Cilium::NetworkPolicyMap> npmap_;
Cilium::CtMapSharedPtr ct_maps_;
Cilium::IpCacheSharedPtr ipcache_;
std::shared_ptr<const Cilium::PolicyHostMap> hosts_;
std::shared_ptr<Cilium::PolicyHostMap> hosts_;

private:
uint32_t resolveSourceIdentity(const Network::Address::Ip* sip, const Network::Address::Ip* dip,
Expand Down
145 changes: 104 additions & 41 deletions cilium/grpc_subscription.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@

#include <fmt/format.h>

#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "envoy/annotations/resource.pb.h"
#include "envoy/common/callback.h"
#include "envoy/common/exception.h"
#include "envoy/config/core/v3/config_source.pb.h"
#include "envoy/config/custom_config_validators.h"
#include "envoy/config/grpc_mux.h"
#include "envoy/config/subscription.h"
#include "envoy/config/subscription_factory.h"
#include "envoy/grpc/async_client.h"
Expand All @@ -25,9 +27,12 @@
#include "source/common/grpc/common.h"
#include "source/common/protobuf/protobuf.h" // IWYU pragma: keep
#include "source/extensions/config_subscription/grpc/grpc_mux_context.h"
#include "source/extensions/config_subscription/grpc/grpc_mux_impl.h"
#include "source/extensions/config_subscription/grpc/grpc_subscription_impl.h"
#include "source/extensions/config_subscription/grpc/new_grpc_mux_impl.h"

#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/status/statusor.h"
#include "absl/strings/match.h"
#include "absl/strings/string_view.h"
Expand All @@ -38,6 +43,31 @@ namespace Cilium {

namespace {

class StreamEventSubscription : public Config::Subscription {
public:
StreamEventSubscription(std::unique_ptr<Config::Subscription> subscription,
Common::CallbackHandlePtr stream_event_handle)
: subscription_(std::move(subscription)),
stream_event_handle_(std::move(stream_event_handle)) {}

void start(const absl::flat_hash_set<std::string>& resource_names) override {
subscription_->start(resource_names);
}

void
updateResourceInterest(const absl::flat_hash_set<std::string>& update_to_these_names) override {
subscription_->updateResourceInterest(update_to_these_names);
}

void requestOnDemandUpdate(const absl::flat_hash_set<std::string>& add_these_names) override {
subscription_->requestOnDemandUpdate(add_these_names);
}

private:
std::unique_ptr<Config::Subscription> subscription_;
Common::CallbackHandlePtr stream_event_handle_;
};

// service RPC method fully qualified names.
struct Service {
std::string sotw_grpc_method_;
Expand All @@ -57,6 +87,7 @@ TypeUrlToServiceMap* buildTypeUrlToServiceMap() {
// https://www.mail-archive.com/protobuf@googlegroups.com/msg04540.html.
for (absl::string_view name : {
"cilium.NetworkPolicyDiscoveryService",
"cilium.NetworkPolicyResourceDiscoveryService",
"cilium.NetworkPolicyHostsDiscoveryService",
}) {
const auto* service_desc =
Expand Down Expand Up @@ -124,49 +155,81 @@ subscribe(const absl::string_view type_url,
Server::Configuration::CommonFactoryContext& context, Stats::Scope& scope,
Config::SubscriptionCallbacks& callbacks,
Config::OpaqueResourceDecoderSharedPtr resource_decoder,
std::chrono::milliseconds init_fetch_timeout) {
auto& api_config_source = config_source.api_config_source();
THROW_IF_NOT_OK(Config::Utility::checkApiConfigSourceSubscriptionBackingCluster(
context.clusterManager().primaryClusters(), api_config_source));

Config::GrpcMuxStreamEventCallback on_stream_event) {
auto initial_fetch_timeout = Config::Utility::configSourceInitialFetchTimeout(config_source);
Config::SubscriptionStats stats = Config::Utility::generateStats(scope);
Envoy::Config::SubscriptionOptions options;

// No-op custom validators
Envoy::Config::CustomConfigValidatorsPtr nop_config_validators =
std::make_unique<NopConfigValidatorsImpl>();
auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
context.clusterManager().grpcAsyncClientManager(), api_config_source, scope, true, 0, false);
THROW_IF_NOT_OK_REF(factory_or_error.status());

absl::StatusOr<Config::RateLimitSettings> rate_limit_settings_or_error =
Config::Utility::parseRateLimitSettings(api_config_source);
THROW_IF_NOT_OK_REF(rate_limit_settings_or_error.status());

Config::GrpcMuxContext grpc_mux_context{
/*async_client_=*/THROW_OR_RETURN_VALUE(
factory_or_error.value()->createUncachedRawAsyncClient(), Grpc::RawAsyncClientPtr),
/*failover_async_client_=*/nullptr,
/*dispatcher_=*/context.mainThreadDispatcher(),
/*service_method_=*/sotwGrpcMethod(type_url),
/*local_info_=*/context.localInfo(),
/*rate_limit_settings_=*/rate_limit_settings_or_error.value(),
/*scope_=*/scope,
/*config_validators_=*/std::move(nop_config_validators),
/*xds_resources_delegate_=*/absl::nullopt,
/*xds_config_tracker_=*/absl::nullopt,
/*backoff_strategy_=*/
std::make_unique<JitteredExponentialBackOffStrategy>(
Config::SubscriptionFactory::RetryInitialDelayMs,
Config::SubscriptionFactory::RetryMaxDelayMs, context.api().randomGenerator()),
/*target_xds_authority_=*/"",
/*eds_resources_cache_=*/nullptr, // EDS cache is only used for ADS.
/*skip_subsequent_node_=*/api_config_source.set_node_on_first_message_only(),
};

return std::make_unique<Config::GrpcSubscriptionImpl>(
std::make_shared<GrpcMuxImpl>(grpc_mux_context), callbacks, resource_decoder, stats, type_url,
context.mainThreadDispatcher(), init_fetch_timeout, /*is_aggregated*/ false, options);
std::shared_ptr<Config::GrpcMux> grpc_mux;
bool is_aggregated =
config_source.config_source_specifier_case() == envoy::config::core::v3::ConfigSource::kAds;
if (is_aggregated) {
grpc_mux = std::static_pointer_cast<Config::GrpcMux>(context.xdsManager().adsMux());
} else {
auto& api_config_source = config_source.api_config_source();
THROW_IF_NOT_OK(Config::Utility::checkApiConfigSourceSubscriptionBackingCluster(
context.clusterManager().primaryClusters(), api_config_source));

// No-op custom validators
Envoy::Config::CustomConfigValidatorsPtr nop_config_validators =
std::make_unique<NopConfigValidatorsImpl>();
auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
context.clusterManager().grpcAsyncClientManager(), api_config_source, scope, true, 0,
false);
THROW_IF_NOT_OK_REF(factory_or_error.status());

absl::StatusOr<Config::RateLimitSettings> rate_limit_settings_or_error =
Config::Utility::parseRateLimitSettings(api_config_source);
THROW_IF_NOT_OK_REF(rate_limit_settings_or_error.status());

const auto& api_type = api_config_source.api_type();
bool use_delta = api_type == envoy::config::core::v3::ApiConfigSource::DELTA_GRPC ||
api_type == envoy::config::core::v3::ApiConfigSource::AGGREGATED_DELTA_GRPC;
const auto& service_method = use_delta ? deltaGrpcMethod(type_url) : sotwGrpcMethod(type_url);

Config::GrpcMuxContext grpc_mux_context{
THROW_OR_RETURN_VALUE(factory_or_error.value()->createUncachedRawAsyncClient(),
Grpc::RawAsyncClientPtr),
/*failover_async_client_=*/nullptr,
context.mainThreadDispatcher(),
service_method,
context.localInfo(),
rate_limit_settings_or_error.value(),
scope,
std::move(nop_config_validators),
/*xds_resources_delegate_=*/absl::nullopt,
/*xds_config_tracker_=*/absl::nullopt,
std::make_unique<JitteredExponentialBackOffStrategy>(
Config::SubscriptionFactory::RetryInitialDelayMs,
Config::SubscriptionFactory::RetryMaxDelayMs, context.api().randomGenerator()),
/*target_xds_authority_=*/"",
/*eds_resources_cache_=*/nullptr, // EDS cache is only used for ADS.
/*skip_subsequent_node_=*/api_config_source.set_node_on_first_message_only(),
};

grpc_mux = use_delta ? std::static_pointer_cast<Config::GrpcMux>(
std::make_shared<Config::NewGrpcMuxImpl>(grpc_mux_context))
: std::static_pointer_cast<Config::GrpcMux>(
std::make_shared<Config::GrpcMuxImpl>(grpc_mux_context));
}

Common::CallbackHandlePtr stream_event_handle;
if (on_stream_event) {
auto stream_event_callback = std::move(on_stream_event);
stream_event_handle = grpc_mux->addStreamEventCallback(stream_event_callback);
if (grpc_mux->grpcStreamConnected()) {
stream_event_callback(Config::GrpcMuxStreamEvent::Established);
}
}

auto subscription = std::make_unique<Config::GrpcSubscriptionImpl>(
grpc_mux, callbacks, resource_decoder, stats, type_url, context.mainThreadDispatcher(),
initial_fetch_timeout, is_aggregated, options);
if (stream_event_handle) {
return std::make_unique<StreamEventSubscription>(std::move(subscription),
std::move(stream_event_handle));
}
return subscription;
}

} // namespace Cilium
Expand Down
Loading
Loading