Skip to content

Commit 32889be

Browse files
committed
api: Add Delta NPDS and NPHDS
Add Delta rpc to the APIs so that we can run NPDS and NPHDS also via Delta xDS. Signed-off-by: Jarno Rajahalme <jarno@isovalent.com>
1 parent daed257 commit 32889be

15 files changed

Lines changed: 1383 additions & 216 deletions

cilium/api/npds.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ import "validate/validate.proto";
2020
service NetworkPolicyDiscoveryService {
2121
option (envoy.annotations.resource).type = "cilium.NetworkPolicy";
2222

23+
rpc DeltaNetworkPolicies(stream envoy.service.discovery.v3.DeltaDiscoveryRequest)
24+
returns (stream envoy.service.discovery.v3.DeltaDiscoveryResponse) {
25+
}
26+
2327
rpc StreamNetworkPolicies(stream envoy.service.discovery.v3.DiscoveryRequest)
2428
returns (stream envoy.service.discovery.v3.DiscoveryResponse) {
2529
}

cilium/api/nphds.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ service NetworkPolicyHostsDiscoveryService {
2626
body: "*"
2727
};
2828
}
29+
30+
rpc DeltaNetworkPolicyHosts(stream envoy.service.discovery.v3.DeltaDiscoveryRequest)
31+
returns (stream envoy.service.discovery.v3.DeltaDiscoveryResponse) {
32+
}
2933
}
3034

3135
// The mapping of a network policy identifier to the IP addresses of all the

cilium/bpf_metadata.cc

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -235,14 +235,15 @@ Config::Config(const ::cilium::BpfMetadata& config,
235235
config.ipv6_source_address()));
236236
}
237237
if (config.use_nphds()) {
238-
hosts_ =
239-
context.serverFactoryContext().singletonManager().getTyped<const Cilium::PolicyHostMap>(
240-
SINGLETON_MANAGER_REGISTERED_NAME(cilium_host_map),
241-
[&context, config_source = config_source_] {
242-
auto map = std::make_shared<Cilium::PolicyHostMap>(context.serverFactoryContext());
243-
map->startSubscription(context.serverFactoryContext(), config_source);
244-
return map;
245-
});
238+
hosts_ = context.serverFactoryContext().singletonManager().getTyped<Cilium::PolicyHostMap>(
239+
SINGLETON_MANAGER_REGISTERED_NAME(cilium_host_map),
240+
[&context, config_source = config_source_] {
241+
auto map = std::make_shared<Cilium::PolicyHostMap>(context.serverFactoryContext());
242+
map->startSubscription(context.serverFactoryContext(), config_source);
243+
return map;
244+
});
245+
// update desired config source on the map
246+
hosts_->setConfigSource(config_source_);
246247
}
247248

248249
// Note: all instances use the bpf root of the first filter with non-empty
@@ -279,12 +280,13 @@ Config::Config(const ::cilium::BpfMetadata& config,
279280
// instances!
280281
// Only created if either ipcache_ or hosts_ map exists
281282
if (ipcache_ || hosts_) {
282-
npmap_ =
283-
context.serverFactoryContext().singletonManager().getTyped<const Cilium::NetworkPolicyMap>(
284-
SINGLETON_MANAGER_REGISTERED_NAME(cilium_network_policy),
285-
[&context, config_source = config_source_] {
286-
return std::make_shared<Cilium::NetworkPolicyMap>(context, config_source, true);
287-
});
283+
npmap_ = context.serverFactoryContext().singletonManager().getTyped<Cilium::NetworkPolicyMap>(
284+
SINGLETON_MANAGER_REGISTERED_NAME(cilium_network_policy),
285+
[&context, config_source = config_source_] {
286+
return std::make_shared<Cilium::NetworkPolicyMap>(context, config_source, true);
287+
});
288+
// update desired config source on the map
289+
npmap_->setConfigSource(config_source_);
288290
}
289291
}
290292

cilium/bpf_metadata.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,10 +175,10 @@ class Config : public Cilium::PolicyResolver,
175175
Random::RandomGenerator& random_;
176176
envoy::config::core::v3::ConfigSource config_source_;
177177

178-
std::shared_ptr<const Cilium::NetworkPolicyMap> npmap_;
178+
std::shared_ptr<Cilium::NetworkPolicyMap> npmap_;
179179
Cilium::CtMapSharedPtr ct_maps_;
180180
Cilium::IpCacheSharedPtr ipcache_;
181-
std::shared_ptr<const Cilium::PolicyHostMap> hosts_;
181+
std::shared_ptr<Cilium::PolicyHostMap> hosts_;
182182

183183
private:
184184
uint32_t resolveSourceIdentity(const Network::Address::Ip* sip, const Network::Address::Ip* dip,

cilium/host_map.cc

Lines changed: 182 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,18 @@
44
#include <fmt/format.h>
55
#include <sys/socket.h>
66

7+
#include <charconv>
78
#include <cstdint>
89
#include <cstring>
910
#include <memory>
1011
#include <string>
12+
#include <system_error>
1113
#include <utility>
1214
#include <vector>
1315

1416
#include "envoy/common/exception.h"
1517
#include "envoy/config/core/v3/config_source.pb.h"
18+
#include "envoy/config/grpc_mux.h"
1619
#include "envoy/config/subscription.h"
1720
#include "envoy/event/dispatcher.h"
1821
#include "envoy/server/factory_context.h"
@@ -21,9 +24,11 @@
2124
#include "envoy/thread_local/thread_local.h"
2225
#include "envoy/thread_local/thread_local_object.h"
2326

27+
#include "source/common/common/assert.h"
2428
#include "source/common/common/logger.h"
2529
#include "source/common/common/macros.h"
2630

31+
#include "absl/container/flat_hash_set.h"
2732
#include "absl/numeric/int128.h"
2833
#include "absl/status/status.h"
2934
#include "absl/strings/str_cat.h"
@@ -58,9 +63,18 @@ unsigned int checkPrefix(T addr, bool have_prefix, unsigned int plen, absl::stri
5863
} // namespace
5964

6065
struct ThreadLocalHostMapInitializer : public PolicyHostMap::ThreadLocalHostMap {
61-
protected:
66+
public:
6267
friend class PolicyHostMap; // PolicyHostMap can insert();
6368

69+
ThreadLocalHostMapInitializer() = default;
70+
71+
explicit ThreadLocalHostMapInitializer(const PolicyHostMap::ThreadLocalHostMap* host_map) {
72+
if (host_map != nullptr) {
73+
static_cast<PolicyHostMap::ThreadLocalHostMap&>(*this) = *host_map;
74+
}
75+
}
76+
77+
protected:
6478
// find the map of the given prefix length, insert in the decreasing order if
6579
// it does not exist
6680
template <typename M>
@@ -159,6 +173,29 @@ struct ThreadLocalHostMapInitializer : public PolicyHostMap::ThreadLocalHostMap
159173
fmt::format("NetworkPolicyHosts: Invalid host entry \'{}\' for policy {}", host, policy));
160174
}
161175
}
176+
177+
template <typename MapVec>
178+
void prunePolicyMapVec(MapVec& maps, const absl::flat_hash_set<uint64_t>& nids) {
179+
for (auto vec_it = maps.begin(); vec_it != maps.end();) {
180+
auto& map = vec_it->second;
181+
for (auto map_it = map.begin(); map_it != map.end();) {
182+
auto it = map_it++;
183+
if (nids.contains(it->second)) {
184+
map.erase(it);
185+
}
186+
}
187+
if (map.empty()) {
188+
vec_it = maps.erase(vec_it);
189+
} else {
190+
++vec_it;
191+
}
192+
}
193+
}
194+
195+
void remove(const absl::flat_hash_set<uint64_t>& removed_nids) {
196+
prunePolicyMapVec(ipv4_to_policy_, removed_nids);
197+
prunePolicyMapVec(ipv6_to_policy_, removed_nids);
198+
}
162199
};
163200

164201
uint64_t PolicyHostMap::instance_id_ = 0;
@@ -196,22 +233,155 @@ PolicyHostMap::PolicyHostMap(Server::Configuration::CommonFactoryContext& contex
196233
}
197234

198235
void PolicyHostMap::startSubscription(Server::Configuration::CommonFactoryContext& context,
199-
const envoy::config::core::v3::ConfigSource& config_source) {
200-
if (config_source.config_source_specifier_case() == envoy::config::core::v3::ConfigSource::kAds) {
201-
auto ads_mux = context.xdsManager().adsMux();
202-
subscription_ = THROW_OR_RETURN_VALUE(
203-
context.clusterManager().subscriptionFactory().subscriptionOverAdsGrpcMux(
204-
ads_mux, config_source, NetworkPolicyHostsTypeUrl, *scope_, *this,
205-
std::make_shared<Cilium::PolicyHostDecoder>(), {}),
206-
Config::SubscriptionPtr);
207-
} else {
208-
subscription_ = subscribe(NetworkPolicyHostsTypeUrl, config_source, context, *scope_, *this,
209-
std::make_shared<Cilium::PolicyHostDecoder>());
236+
const envoy::config::core::v3::ConfigSource& npds_config) {
237+
context_ = &context;
238+
desired_config_source_ = npds_config;
239+
subscribe();
240+
}
241+
242+
void PolicyHostMap::setConfigSource(const envoy::config::core::v3::ConfigSource& config_source) {
243+
desired_config_source_ = config_source;
244+
if (context_ != nullptr) {
245+
maybeRecreateSubscriptionInDesiredMode(/*transport_closed=*/false);
246+
}
247+
}
248+
249+
bool PolicyHostMap::subscriptionUseDeltaXds() const {
250+
if (!config_source_.has_api_config_source()) {
251+
return false;
210252
}
253+
const auto& api_type = config_source_.api_config_source().api_type();
254+
return api_type == envoy::config::core::v3::ApiConfigSource::DELTA_GRPC ||
255+
api_type == envoy::config::core::v3::ApiConfigSource::AGGREGATED_DELTA_GRPC;
256+
}
211257

258+
void PolicyHostMap::subscribe() {
259+
ASSERT(context_ != nullptr);
260+
subscription_connected_ = false;
261+
config_source_ = desired_config_source_;
262+
++subscription_id_;
263+
264+
auto on_stream_event = [weak_this = weak_from_this(),
265+
id = subscription_id_](Config::GrpcMuxStreamEvent event) {
266+
if (auto shared_this = weak_this.lock()) {
267+
shared_this->onSubscriptionStreamEvent(id, event);
268+
}
269+
};
270+
271+
subscription_ =
272+
Cilium::subscribe(NetworkPolicyHostsTypeUrl, config_source_, *context_, *scope_, *this,
273+
std::make_shared<Cilium::PolicyHostDecoder>(), std::move(on_stream_event));
212274
subscription_->start({});
213275
}
214276

277+
void PolicyHostMap::onSubscriptionStreamEvent(uint64_t subscription_id,
278+
Config::GrpcMuxStreamEvent event) {
279+
if (subscription_id != subscription_id_) {
280+
return;
281+
}
282+
283+
switch (event) {
284+
case Config::GrpcMuxStreamEvent::Established:
285+
subscription_connected_ = true;
286+
break;
287+
case Config::GrpcMuxStreamEvent::Closed:
288+
if (!subscription_connected_) {
289+
return;
290+
}
291+
subscription_connected_ = false;
292+
293+
if (context_ == nullptr) {
294+
return;
295+
}
296+
297+
context_->mainThreadDispatcher().post(
298+
[weak_this = weak_from_this(), subscription_id = subscription_id_]() {
299+
if (auto shared_this = weak_this.lock()) {
300+
if (subscription_id != shared_this->subscription_id_) {
301+
return;
302+
}
303+
shared_this->maybeRecreateSubscriptionInDesiredMode(/*transport_closed=*/true);
304+
}
305+
});
306+
break;
307+
}
308+
}
309+
310+
void PolicyHostMap::maybeRecreateSubscriptionInDesiredMode(bool transport_closed) {
311+
if (subscription_ && (subscription_connected_ || !transport_closed)) {
312+
if (subscription_connected_ && subscriptionUseDeltaXds()) {
313+
return;
314+
}
315+
if (Protobuf::util::MessageDifferencer::Equals(config_source_, desired_config_source_)) {
316+
return;
317+
}
318+
}
319+
320+
subscribe();
321+
}
322+
323+
absl::Status
324+
PolicyHostMap::onConfigUpdate(const std::vector<Envoy::Config::DecodedResourceRef>& added_resources,
325+
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
326+
const std::string& system_version_info) {
327+
const bool is_new_stream = subscription_id_ != accepted_subscription_id_;
328+
ENVOY_LOG(
329+
debug,
330+
"PolicyHostMap::onConfigUpdate({}), {} added_resources, {} removed_resources, version: {}, "
331+
"subscription_id: {}, accepted_subscription_id: {}, is_new_stream: {}",
332+
name_, added_resources.size(), removed_resources.size(), system_version_info,
333+
subscription_id_, accepted_subscription_id_, is_new_stream);
334+
335+
auto newmap =
336+
std::make_shared<ThreadLocalHostMapInitializer>(is_new_stream ? nullptr : getHostMap());
337+
338+
absl::flat_hash_set<uint64_t> to_remove;
339+
to_remove.reserve(added_resources.size() + removed_resources.size());
340+
341+
for (const auto& name : removed_resources) {
342+
uint64_t nid = 0;
343+
auto [ptr, ec] = std::from_chars(name.data(), name.data() + name.size(), nid);
344+
if (ec != std::errc{} || ptr != name.data() + name.size()) {
345+
throw EnvoyException(fmt::format("Invalid removed resource name '{}'", name));
346+
}
347+
ENVOY_LOG(trace,
348+
"Removing NetworkPolicyHosts for policy {} in delta onConfigUpdate() version {}", nid,
349+
system_version_info);
350+
to_remove.insert(nid);
351+
}
352+
for (const auto& resource : added_resources) {
353+
const auto& config = dynamic_cast<const cilium::NetworkPolicyHosts&>(resource.get().resource());
354+
to_remove.insert(config.policy());
355+
}
356+
newmap->remove(to_remove);
357+
358+
for (const auto& resource : added_resources) {
359+
const auto& config = dynamic_cast<const cilium::NetworkPolicyHosts&>(resource.get().resource());
360+
ENVOY_LOG(trace,
361+
"Received NetworkPolicyHosts for policy {} in delta onConfigUpdate() version {}",
362+
config.policy(), system_version_info);
363+
newmap->insert(config);
364+
}
365+
366+
// Force 'this' to be not deleted for as long as the lambda stays
367+
// alive. Note that generally capturing a shared pointer is
368+
// dangerous as it may happen that there is a circular reference
369+
// from 'this' to itself via the lambda capture, leading to 'this'
370+
// never being released. It should happen in this case, though.
371+
std::shared_ptr<PolicyHostMap> shared_this = shared_from_this();
372+
373+
// Assign the new map to all threads.
374+
tls_->set([shared_this, newmap](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr {
375+
UNREFERENCED_PARAMETER(shared_this);
376+
ENVOY_LOG(trace, "PolicyHostMap: Assigning new map");
377+
return newmap;
378+
});
379+
logmaps("delta onConfigUpdate");
380+
accepted_subscription_id_ = subscription_id_;
381+
stats_.update_success_.inc();
382+
return absl::OkStatus();
383+
}
384+
215385
absl::Status
216386
PolicyHostMap::onConfigUpdate(const std::vector<Envoy::Config::DecodedResourceRef>& resources,
217387
const std::string& version_info) {

cilium/host_map.h

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
#include "envoy/common/exception.h"
1717
#include "envoy/config/core/v3/config_source.pb.h"
18+
#include "envoy/config/grpc_mux.h"
1819
#include "envoy/config/subscription.h"
1920
#include "envoy/network/address.h"
2021
#include "envoy/protobuf/message_validator.h"
@@ -26,7 +27,6 @@
2627
#include "envoy/thread_local/thread_local_object.h"
2728

2829
#include "source/common/common/logger.h"
29-
#include "source/common/common/macros.h"
3030
#include "source/common/network/utility.h"
3131
#include "source/common/protobuf/message_validator_impl.h"
3232
#include "source/common/protobuf/protobuf.h"
@@ -118,6 +118,8 @@ class PolicyHostMap : public Singleton::Instance,
118118
void startSubscription(Server::Configuration::CommonFactoryContext& context,
119119
const envoy::config::core::v3::ConfigSource& config_source);
120120

121+
void setConfigSource(const envoy::config::core::v3::ConfigSource& config_source);
122+
121123
// This is used for testing with a file-based subscription
122124
void startSubscription(std::unique_ptr<Envoy::Config::Subscription>&& subscription) {
123125
subscription_ = std::move(subscription);
@@ -229,22 +231,30 @@ class PolicyHostMap : public Singleton::Instance,
229231
const std::string& version_info) override;
230232
absl::Status onConfigUpdate(const std::vector<Envoy::Config::DecodedResourceRef>& added_resources,
231233
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
232-
const std::string& system_version_info) override {
233-
// NOT IMPLEMENTED YET.
234-
UNREFERENCED_PARAMETER(added_resources);
235-
UNREFERENCED_PARAMETER(removed_resources);
236-
UNREFERENCED_PARAMETER(system_version_info);
237-
return absl::OkStatus();
238-
}
234+
const std::string& system_version_info) override;
239235
void onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason,
240236
const EnvoyException* e) override;
241237

242238
private:
239+
bool subscriptionUseDeltaXds() const;
240+
void subscribe();
241+
void onSubscriptionStreamEvent(uint64_t subscription_id, Config::GrpcMuxStreamEvent event);
242+
void maybeRecreateSubscriptionInDesiredMode(bool transport_closed);
243+
243244
ThreadLocal::SlotPtr tls_;
244245
std::string name_;
245246
Stats::ScopeSharedPtr scope_;
246247
Stats::ScopeSharedPtr stats_scope_;
247248
std::unique_ptr<Envoy::Config::Subscription> subscription_;
249+
Server::Configuration::CommonFactoryContext* context_{nullptr};
250+
// We need a separate desired_config_source_ as it may be set to a pessimistic value via explicit
251+
// BpfMetadata config in CiliumEnvoyConfig CRD, and we should not change to a "worse" (e.g., SotW)
252+
// ConfigSource if "better" (e.g., Delta) is already up-and-running (in config_source_).
253+
envoy::config::core::v3::ConfigSource desired_config_source_;
254+
envoy::config::core::v3::ConfigSource config_source_;
255+
uint64_t subscription_id_{0};
256+
uint64_t accepted_subscription_id_{0};
257+
bool subscription_connected_{false};
248258
static uint64_t instance_id_;
249259
PolicyHostsStats stats_;
250260
};

0 commit comments

Comments
 (0)