Skip to content

Commit f2c684a

Browse files
committed
api: Add Delta NPHDS
Add Delta rpc to the API so that we can run NPHDS also via Delta xDS. Signed-off-by: Jarno Rajahalme <jarno@isovalent.com>
1 parent 4f13b40 commit f2c684a

7 files changed

Lines changed: 403 additions & 32 deletions

File tree

cilium/api/nphds.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ service NetworkPolicyHostsDiscoveryService {
2626
body: "*"
2727
};
2828
}
29+
30+
rpc DeltaNetworkPolicyHosts(stream envoy.service.discovery.v3.DeltaDiscoveryRequest)
31+
returns (stream envoy.service.discovery.v3.DeltaDiscoveryResponse) {
32+
}
2933
}
3034

3135
// The mapping of a network policy identifier to the IP addresses of all the

cilium/bpf_metadata.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ Config::Config(const ::cilium::BpfMetadata& config,
243243
map->startSubscription(context.serverFactoryContext(), config_source);
244244
return map;
245245
});
246+
hosts_->setConfigSource(config_source_);
246247
}
247248

248249
// Note: all instances use the bpf root of the first filter with non-empty

cilium/host_map.cc

Lines changed: 184 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,18 @@
44
#include <fmt/format.h>
55
#include <sys/socket.h>
66

7+
#include <charconv>
78
#include <cstdint>
89
#include <cstring>
910
#include <memory>
1011
#include <string>
12+
#include <system_error>
1113
#include <utility>
1214
#include <vector>
1315

1416
#include "envoy/common/exception.h"
1517
#include "envoy/config/core/v3/config_source.pb.h"
18+
#include "envoy/config/grpc_mux.h"
1619
#include "envoy/config/subscription.h"
1720
#include "envoy/event/dispatcher.h"
1821
#include "envoy/server/factory_context.h"
@@ -21,9 +24,11 @@
2124
#include "envoy/thread_local/thread_local.h"
2225
#include "envoy/thread_local/thread_local_object.h"
2326

27+
#include "source/common/common/assert.h"
2428
#include "source/common/common/logger.h"
2529
#include "source/common/common/macros.h"
2630

31+
#include "absl/container/flat_hash_set.h"
2732
#include "absl/numeric/int128.h"
2833
#include "absl/status/status.h"
2934
#include "absl/strings/str_cat.h"
@@ -58,9 +63,18 @@ unsigned int checkPrefix(T addr, bool have_prefix, unsigned int plen, absl::stri
5863
} // namespace
5964

6065
struct ThreadLocalHostMapInitializer : public PolicyHostMap::ThreadLocalHostMap {
61-
protected:
66+
public:
6267
friend class PolicyHostMap; // PolicyHostMap can insert();
6368

69+
ThreadLocalHostMapInitializer() = default;
70+
71+
explicit ThreadLocalHostMapInitializer(const PolicyHostMap::ThreadLocalHostMap* host_map) {
72+
if (host_map != nullptr) {
73+
static_cast<PolicyHostMap::ThreadLocalHostMap&>(*this) = *host_map;
74+
}
75+
}
76+
77+
protected:
6478
// find the map of the given prefix length, insert in the decreasing order if
6579
// it does not exist
6680
template <typename M>
@@ -159,6 +173,29 @@ struct ThreadLocalHostMapInitializer : public PolicyHostMap::ThreadLocalHostMap
159173
fmt::format("NetworkPolicyHosts: Invalid host entry \'{}\' for policy {}", host, policy));
160174
}
161175
}
176+
177+
template <typename MapVec>
178+
void prunePolicyMapVec(MapVec& maps, const absl::flat_hash_set<uint64_t>& nids) {
179+
for (auto vec_it = maps.begin(); vec_it != maps.end();) {
180+
auto& map = vec_it->second;
181+
for (auto map_it = map.begin(); map_it != map.end();) {
182+
auto it = map_it++;
183+
if (nids.contains(it->second)) {
184+
map.erase(it);
185+
}
186+
}
187+
if (map.empty()) {
188+
vec_it = maps.erase(vec_it);
189+
} else {
190+
++vec_it;
191+
}
192+
}
193+
}
194+
195+
void remove(const absl::flat_hash_set<uint64_t> removed_nids) {
196+
prunePolicyMapVec(ipv4_to_policy_, removed_nids);
197+
prunePolicyMapVec(ipv6_to_policy_, removed_nids);
198+
}
162199
};
163200

164201
uint64_t PolicyHostMap::instance_id_ = 0;
@@ -196,22 +233,157 @@ PolicyHostMap::PolicyHostMap(Server::Configuration::CommonFactoryContext& contex
196233
}
197234

198235
void PolicyHostMap::startSubscription(Server::Configuration::CommonFactoryContext& context,
199-
const envoy::config::core::v3::ConfigSource& config_source) {
200-
if (config_source.config_source_specifier_case() == envoy::config::core::v3::ConfigSource::kAds) {
201-
auto ads_mux = context.xdsManager().adsMux();
202-
subscription_ = THROW_OR_RETURN_VALUE(
203-
context.clusterManager().subscriptionFactory().subscriptionOverAdsGrpcMux(
204-
ads_mux, config_source, NetworkPolicyHostsTypeUrl, *scope_, *this,
205-
std::make_shared<Cilium::PolicyHostDecoder>(), {}),
206-
Config::SubscriptionPtr);
207-
} else {
208-
subscription_ = subscribe(NetworkPolicyHostsTypeUrl, config_source, context, *scope_, *this,
209-
std::make_shared<Cilium::PolicyHostDecoder>());
236+
const envoy::config::core::v3::ConfigSource& npds_config) {
237+
context_ = &context;
238+
desired_config_source_ = npds_config;
239+
subscribe();
240+
}
241+
242+
void PolicyHostMap::setConfigSource(
243+
const envoy::config::core::v3::ConfigSource& config_source) const {
244+
auto* self = const_cast<PolicyHostMap*>(this);
245+
self->desired_config_source_ = config_source;
246+
if (self->context_ != nullptr) {
247+
self->maybeRecreateSubscriptionInDesiredMode(/*transport_closed=*/false);
248+
}
249+
}
250+
251+
bool PolicyHostMap::subscriptionUseDeltaXds() const {
252+
if (!config_source_.has_api_config_source()) {
253+
return false;
210254
}
255+
const auto& api_type = config_source_.api_config_source().api_type();
256+
return api_type == envoy::config::core::v3::ApiConfigSource::DELTA_GRPC ||
257+
api_type == envoy::config::core::v3::ApiConfigSource::AGGREGATED_DELTA_GRPC;
258+
}
211259

260+
void PolicyHostMap::subscribe() {
261+
ASSERT(context_ != nullptr);
262+
subscription_connected_ = false;
263+
config_source_ = desired_config_source_;
264+
++subscription_id_;
265+
266+
auto on_stream_event = [weak_this = weak_from_this(),
267+
id = subscription_id_](Config::GrpcMuxStreamEvent event) {
268+
if (auto shared_this = weak_this.lock()) {
269+
shared_this->onSubscriptionStreamEvent(id, event);
270+
}
271+
};
272+
273+
subscription_ =
274+
Cilium::subscribe(NetworkPolicyHostsTypeUrl, config_source_, *context_, *scope_, *this,
275+
std::make_shared<Cilium::PolicyHostDecoder>(), std::move(on_stream_event));
212276
subscription_->start({});
213277
}
214278

279+
void PolicyHostMap::onSubscriptionStreamEvent(uint64_t subscription_id,
280+
Config::GrpcMuxStreamEvent event) {
281+
if (subscription_id != subscription_id_) {
282+
return;
283+
}
284+
285+
switch (event) {
286+
case Config::GrpcMuxStreamEvent::Established:
287+
subscription_connected_ = true;
288+
break;
289+
case Config::GrpcMuxStreamEvent::Closed:
290+
if (!subscription_connected_) {
291+
return;
292+
}
293+
subscription_connected_ = false;
294+
295+
if (context_ == nullptr) {
296+
return;
297+
}
298+
299+
context_->mainThreadDispatcher().post(
300+
[weak_this = weak_from_this(), subscription_id = subscription_id_]() {
301+
if (auto shared_this = weak_this.lock()) {
302+
if (subscription_id != shared_this->subscription_id_) {
303+
return;
304+
}
305+
shared_this->maybeRecreateSubscriptionInDesiredMode(/*transport_closed=*/true);
306+
}
307+
});
308+
break;
309+
}
310+
}
311+
312+
void PolicyHostMap::maybeRecreateSubscriptionInDesiredMode(bool transport_closed) {
313+
if (subscription_ && (subscription_connected_ || !transport_closed)) {
314+
if (subscription_connected_ && subscriptionUseDeltaXds()) {
315+
return;
316+
}
317+
if (Protobuf::util::MessageDifferencer::Equals(config_source_, desired_config_source_)) {
318+
return;
319+
}
320+
}
321+
322+
subscribe();
323+
}
324+
325+
absl::Status
326+
PolicyHostMap::onConfigUpdate(const std::vector<Envoy::Config::DecodedResourceRef>& added_resources,
327+
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
328+
const std::string& system_version_info) {
329+
const bool is_new_stream = subscription_id_ != accepted_subscription_id_;
330+
ENVOY_LOG(
331+
debug,
332+
"PolicyHostMap::onConfigUpdate({}), {} added_resources, {} removed_resources, version: {}, "
333+
"subscription_id: {}, accepted_subscription_id: {}, is_new_stream: {}",
334+
name_, added_resources.size(), removed_resources.size(), system_version_info,
335+
subscription_id_, accepted_subscription_id_, is_new_stream);
336+
337+
auto newmap =
338+
std::make_shared<ThreadLocalHostMapInitializer>(is_new_stream ? nullptr : getHostMap());
339+
340+
absl::flat_hash_set<uint64_t> to_remove;
341+
to_remove.reserve(added_resources.size() + removed_resources.size());
342+
343+
for (const auto& name : removed_resources) {
344+
uint64_t nid = 0;
345+
auto [ptr, ec] = std::from_chars(name.data(), name.data() + name.size(), nid);
346+
if (ec != std::errc{} || ptr != name.data() + name.size()) {
347+
throw EnvoyException(fmt::format("Invalid removed resource name '{}'", name));
348+
}
349+
ENVOY_LOG(trace,
350+
"Removing NetworkPolicyHosts for policy {} in delta onConfigUpdate() version {}", nid,
351+
system_version_info);
352+
to_remove.insert(nid);
353+
}
354+
for (const auto& resource : added_resources) {
355+
const auto& config = dynamic_cast<const cilium::NetworkPolicyHosts&>(resource.get().resource());
356+
to_remove.insert(config.policy());
357+
}
358+
newmap->remove(to_remove);
359+
360+
for (const auto& resource : added_resources) {
361+
const auto& config = dynamic_cast<const cilium::NetworkPolicyHosts&>(resource.get().resource());
362+
ENVOY_LOG(trace,
363+
"Received NetworkPolicyHosts for policy {} in delta onConfigUpdate() version {}",
364+
config.policy(), system_version_info);
365+
newmap->insert(config);
366+
}
367+
368+
// Force 'this' to be not deleted for as long as the lambda stays
369+
// alive. Note that generally capturing a shared pointer is
370+
// dangerous as it may happen that there is a circular reference
371+
// from 'this' to itself via the lambda capture, leading to 'this'
372+
// never being released. It should happen in this case, though.
373+
std::shared_ptr<PolicyHostMap> shared_this = shared_from_this();
374+
375+
// Assign the new map to all threads.
376+
tls_->set([shared_this, newmap](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr {
377+
UNREFERENCED_PARAMETER(shared_this);
378+
ENVOY_LOG(trace, "PolicyHostMap: Assigning new map");
379+
return newmap;
380+
});
381+
logmaps("delta onConfigUpdate");
382+
accepted_subscription_id_ = subscription_id_;
383+
stats_.update_success_.inc();
384+
return absl::OkStatus();
385+
}
386+
215387
absl::Status
216388
PolicyHostMap::onConfigUpdate(const std::vector<Envoy::Config::DecodedResourceRef>& resources,
217389
const std::string& version_info) {

cilium/host_map.h

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
#include "envoy/common/exception.h"
1717
#include "envoy/config/core/v3/config_source.pb.h"
18+
#include "envoy/config/grpc_mux.h"
1819
#include "envoy/config/subscription.h"
1920
#include "envoy/network/address.h"
2021
#include "envoy/protobuf/message_validator.h"
@@ -26,7 +27,6 @@
2627
#include "envoy/thread_local/thread_local_object.h"
2728

2829
#include "source/common/common/logger.h"
29-
#include "source/common/common/macros.h"
3030
#include "source/common/network/utility.h"
3131
#include "source/common/protobuf/message_validator_impl.h"
3232
#include "source/common/protobuf/protobuf.h"
@@ -118,6 +118,8 @@ class PolicyHostMap : public Singleton::Instance,
118118
void startSubscription(Server::Configuration::CommonFactoryContext& context,
119119
const envoy::config::core::v3::ConfigSource& config_source);
120120

121+
void setConfigSource(const envoy::config::core::v3::ConfigSource& config_source) const;
122+
121123
// This is used for testing with a file-based subscription
122124
void startSubscription(std::unique_ptr<Envoy::Config::Subscription>&& subscription) {
123125
subscription_ = std::move(subscription);
@@ -229,22 +231,27 @@ class PolicyHostMap : public Singleton::Instance,
229231
const std::string& version_info) override;
230232
absl::Status onConfigUpdate(const std::vector<Envoy::Config::DecodedResourceRef>& added_resources,
231233
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
232-
const std::string& system_version_info) override {
233-
// NOT IMPLEMENTED YET.
234-
UNREFERENCED_PARAMETER(added_resources);
235-
UNREFERENCED_PARAMETER(removed_resources);
236-
UNREFERENCED_PARAMETER(system_version_info);
237-
return absl::OkStatus();
238-
}
234+
const std::string& system_version_info) override;
239235
void onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason,
240236
const EnvoyException* e) override;
241237

242238
private:
239+
bool subscriptionUseDeltaXds() const;
240+
void subscribe();
241+
void onSubscriptionStreamEvent(uint64_t subscription_id, Config::GrpcMuxStreamEvent event);
242+
void maybeRecreateSubscriptionInDesiredMode(bool transport_closed);
243+
243244
ThreadLocal::SlotPtr tls_;
244245
std::string name_;
245246
Stats::ScopeSharedPtr scope_;
246247
Stats::ScopeSharedPtr stats_scope_;
247248
std::unique_ptr<Envoy::Config::Subscription> subscription_;
249+
Server::Configuration::CommonFactoryContext* context_{nullptr};
250+
envoy::config::core::v3::ConfigSource desired_config_source_;
251+
envoy::config::core::v3::ConfigSource config_source_;
252+
uint64_t subscription_id_{0};
253+
uint64_t accepted_subscription_id_{0};
254+
bool subscription_connected_{false};
248255
static uint64_t instance_id_;
249256
PolicyHostsStats stats_;
250257
};

go/cilium/api/nphds.pb.go

Lines changed: 14 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)