Skip to content

Commit 2810ec2

Browse files
authored
Create pdb for clusters (#25)
* Create pdb for clusters Fixes #24 * Ensure deletions get reconciled
1 parent 659bcae commit 2810ec2

3 files changed

Lines changed: 75 additions & 0 deletions

File tree

charts/restate-operator-helm/templates/rbac.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ rules:
4747
- services
4848
- configmaps
4949
- serviceaccounts
50+
- poddisruptionbudgets
5051
- networkpolicies
5152
- statefulsets
5253
- persistentvolumeclaims
@@ -66,6 +67,7 @@ rules:
6667
- ''
6768
- batch
6869
- apps
70+
- policy
6971
- networking.k8s.io
7072
- vpcresources.k8s.aws
7173
- secrets-store.csi.x-k8s.io

src/controllers/restatecluster/controller.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use k8s_openapi::api::core::v1::{
1010
ConfigMap, Namespace, PersistentVolumeClaim, Service, ServiceAccount, ServiceSpec,
1111
};
1212
use k8s_openapi::api::networking::v1::NetworkPolicy;
13+
use k8s_openapi::api::policy::v1::PodDisruptionBudget;
1314
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{APIGroup, ObjectMeta};
1415

1516
use kube::core::object::HasStatus;
@@ -373,6 +374,7 @@ pub async fn run(client: Client, metrics: Metrics, state: State) {
373374
let pvc_api = Api::<PersistentVolumeClaim>::all(client.clone());
374375
let svc_api = Api::<Service>::all(client.clone());
375376
let svcacc_api = Api::<ServiceAccount>::all(client.clone());
377+
let pdb_api = Api::<PodDisruptionBudget>::all(client.clone());
376378
let cm_api = Api::<ConfigMap>::all(client.clone());
377379
let np_api = Api::<NetworkPolicy>::all(client.clone());
378380
let pia_api = Api::<PodIdentityAssociation>::all(client.clone());
@@ -401,28 +403,39 @@ pub async fn run(client: Client, metrics: Metrics, state: State) {
401403

402404
let (ss_store, ss_writer) = reflector::store();
403405
let ss_reflector = reflector(ss_writer, watcher(ss_api, cfg.clone()))
406+
.map(|event| ensure_deletion_change(event))
404407
.touched_objects()
405408
.default_backoff()
406409
.predicate_filter(changed_predicate.combine(status_predicate_serde));
407410

408411
let np_watcher = metadata_watcher(np_api, cfg.clone())
412+
.map(|event| ensure_deletion_change(event))
409413
.touched_objects()
410414
.predicate_filter(changed_predicate);
411415

412416
let ns_watcher = metadata_watcher(ns_api, cfg.clone())
417+
.map(|event| ensure_deletion_change(event))
413418
.touched_objects()
414419
.predicate_filter(changed_predicate);
415420

416421
let svcacc_watcher = metadata_watcher(svcacc_api, cfg.clone())
422+
.map(|event| ensure_deletion_change(event))
423+
.touched_objects()
424+
.predicate_filter(changed_predicate);
425+
426+
let pdb_watcher = metadata_watcher(pdb_api, cfg.clone())
427+
.map(|event| ensure_deletion_change(event))
417428
.touched_objects()
418429
.predicate_filter(changed_predicate);
419430

420431
let svc_watcher = watcher(svc_api, cfg.clone())
432+
.map(|event| ensure_deletion_change(event))
421433
.touched_objects()
422434
// svc has no generation so we hash the spec to check for changes
423435
.predicate_filter(changed_predicate.combine(spec_predicate_serde));
424436

425437
let cm_watcher = watcher(cm_api, cfg.clone())
438+
.map(|event| ensure_deletion_change(event))
426439
.touched_objects()
427440
// cm has no generation so we hash the data to check for changes
428441
.predicate_filter(changed_predicate.combine(spec_predicate));
@@ -433,6 +446,7 @@ pub async fn run(client: Client, metrics: Metrics, state: State) {
433446
.owns_stream(cm_watcher)
434447
.owns_stream(ns_watcher)
435448
.owns_stream(svcacc_watcher)
449+
.owns_stream(pdb_watcher)
436450
.owns_stream(np_watcher)
437451
.owns_stream(ss_reflector)
438452
.watches_stream(
@@ -451,6 +465,7 @@ pub async fn run(client: Client, metrics: Metrics, state: State) {
451465
);
452466
let controller = if pod_identity_association_installed {
453467
let pia_watcher = watcher(pia_api, cfg.clone())
468+
.map(|event| ensure_deletion_change(event))
454469
.touched_objects()
455470
// avoid apply loops that seem to happen with crds
456471
.predicate_filter(changed_predicate.combine(status_predicate));
@@ -461,6 +476,7 @@ pub async fn run(client: Client, metrics: Metrics, state: State) {
461476
job_api,
462477
Config::default().labels("app.kubernetes.io/name=restate-pia-canary"),
463478
)
479+
.map(|event| ensure_deletion_change(event))
464480
.touched_objects()
465481
.predicate_filter(changed_predicate);
466482

@@ -470,6 +486,7 @@ pub async fn run(client: Client, metrics: Metrics, state: State) {
470486
};
471487
let controller = if security_group_policy_installed {
472488
let sgp_watcher = metadata_watcher(sgp_api, cfg.clone())
489+
.map(|event| ensure_deletion_change(event))
473490
.touched_objects()
474491
// avoid apply loops that seem to happen with crds
475492
.predicate_filter(changed_predicate);
@@ -480,6 +497,7 @@ pub async fn run(client: Client, metrics: Metrics, state: State) {
480497
};
481498
let controller = if secret_provider_class_installed {
482499
let spc_watcher = metadata_watcher(spc_api, cfg.clone())
500+
.map(|event| ensure_deletion_change(event))
483501
.touched_objects()
484502
// avoid apply loops that seem to happen with crds
485503
.predicate_filter(changed_predicate);
@@ -507,6 +525,21 @@ pub async fn run(client: Client, metrics: Metrics, state: State) {
507525
.await;
508526
}
509527

528+
// deletion apparently doesn't lead to any change in metadata otherwise, which means the changed_predicate
529+
// would drop them.
530+
fn ensure_deletion_change<K: Resource, E>(
531+
mut event: Result<kube::runtime::watcher::Event<K>, E>,
532+
) -> Result<kube::runtime::watcher::Event<K>, E> {
533+
if let Ok(kube::runtime::watcher::Event::Delete(ref mut object)) = event {
534+
let meta = object.meta_mut();
535+
meta.generation = match meta.generation {
536+
Some(val) => Some(val + 1),
537+
None => Some(0),
538+
}
539+
}
540+
event
541+
}
542+
510543
fn changed_predicate<K: Resource>(obj: &K) -> Option<u64> {
511544
let mut hasher = DefaultHasher::new();
512545
if let Some(g) = obj.meta().generation {

src/controllers/restatecluster/reconcilers/compute.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use k8s_openapi::api::core::v1::{
1111
ServiceAccount, ServicePort, ServiceSpec, Toleration, Volume, VolumeMount,
1212
VolumeResourceRequirements,
1313
};
14+
use k8s_openapi::api::policy::v1::{PodDisruptionBudget, PodDisruptionBudgetSpec};
1415
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
1516
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
1617
use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
@@ -177,6 +178,23 @@ fn restate_cluster_service(base_metadata: &ObjectMeta) -> Service {
177178
}
178179
}
179180

181+
fn restate_pod_disruption_budget(base_metadata: &ObjectMeta) -> PodDisruptionBudget {
182+
PodDisruptionBudget {
183+
metadata: object_meta(base_metadata, "restate"),
184+
spec: Some(PodDisruptionBudgetSpec {
185+
// 1 is a sane default for clusters of all sizes:
186+
// cluster size one it will allow downtime, but this is unavoidable when draining nodes
187+
// cluster size of 3 with r=2, it will prevent rollouts leading to unavailability
188+
// cluster size of 5 with r=3, it is conservative but not unreasonable
189+
max_unavailable: Some(IntOrString::Int(1)),
190+
min_available: None,
191+
selector: Some(label_selector(base_metadata)),
192+
unhealthy_pod_eviction_policy: None,
193+
}),
194+
status: None,
195+
}
196+
}
197+
180198
fn env(cluster_name: &str, custom: Option<&[EnvVar]>) -> Vec<EnvVar> {
181199
let defaults = [
182200
("RESTATE_LOG_FORMAT", "json"),
@@ -439,6 +457,7 @@ pub async fn reconcile_compute(
439457
let job_api: Api<Job> = Api::namespaced(ctx.client.clone(), namespace);
440458
let pod_api: Api<Pod> = Api::namespaced(ctx.client.clone(), namespace);
441459
let sgp_api: Api<SecurityGroupPolicy> = Api::namespaced(ctx.client.clone(), namespace);
460+
let pdb_api: Api<PodDisruptionBudget> = Api::namespaced(ctx.client.clone(), namespace);
442461

443462
apply_service_account(
444463
namespace,
@@ -556,6 +575,13 @@ pub async fn reconcile_compute(
556575
let restate_cluster_service = restate_cluster_service(base_metadata);
557576
apply_service(namespace, &svc_api, restate_cluster_service).await?;
558577

578+
apply_pod_disruption_budget(
579+
namespace,
580+
&pdb_api,
581+
restate_pod_disruption_budget(base_metadata),
582+
)
583+
.await?;
584+
559585
resize_statefulset_storage(
560586
namespace,
561587
base_metadata,
@@ -1002,3 +1028,17 @@ fn validate_stateful_set_status(
10021028

10031029
Ok(())
10041030
}
1031+
1032+
async fn apply_pod_disruption_budget(
1033+
namespace: &str,
1034+
pdb_api: &Api<PodDisruptionBudget>,
1035+
pdb: PodDisruptionBudget,
1036+
) -> Result<PodDisruptionBudget, Error> {
1037+
let name = pdb.metadata.name.as_ref().unwrap();
1038+
let params: PatchParams = PatchParams::apply("restate-operator").force();
1039+
debug!(
1040+
"Applying Pod Disruption Budget {} in namespace {}",
1041+
name, namespace
1042+
);
1043+
Ok(pdb_api.patch(name, &params, &Patch::Apply(&pdb)).await?)
1044+
}

0 commit comments

Comments
 (0)