diff --git a/src/controllers/replica.rs b/src/controllers/replica.rs index 366bc2b..a042b96 100644 --- a/src/controllers/replica.rs +++ b/src/controllers/replica.rs @@ -46,6 +46,27 @@ mod tests; use resources::*; +/// True when no schema migration is currently in flight for this replica +/// — i.e. the sweep can safely delete the previous Active restore without +/// pulling the rug out from under a running `pg_dump | psql`. +/// +/// Returns `true` when `persistent_schemas` is unset (no migration ever +/// runs) or when `schemaMigrationPhase` is anything other than `active`. +/// Coded as "not in flight" rather than enumerating terminal phases so +/// future additions (the operator's set of phase labels has grown over +/// time: `complete`, `partial`, `failed: `, `timeout-skipped`) +/// don't silently block the sweep. +pub fn persistent_schemas_migration_settled(replica: &PostgresPhysicalReplica) -> bool { + if replica.spec.persistent_schemas.is_none() { + return true; + } + let phase = replica + .status + .as_ref() + .and_then(|s| s.schema_migration_phase.as_deref()); + !matches!(phase, Some("active")) +} + /// Generate a random password for analytics credentials. pub(crate) fn generate_password() -> String { let mut rng = rand::rng(); @@ -292,27 +313,22 @@ pub async fn reconcile(replica: Arc, ctx: Arc) .as_ref() .and_then(|s| s.current_restore.as_deref()); if let Some(current) = current_restore_name { - // Allow the sweep when no migration is in flight: phase=None means - // either no migration has run since this replica was first - // reconciled, or a previous sweep cleared it. In both cases the - // gate's purpose (don't delete the source while a migration depends - // on it) is satisfied. Without this branch, a replica with - // persistent_schemas configured that hits the too-many-restores - // guardrail can't recover automatically: the guardrail blocks new - // restores, no switchover runs, no path sets phase=complete, and - // the sweep stays gated forever. - let migration_complete = if replica.spec.persistent_schemas.is_some() { - let phase = replica - .status - .as_ref() - .and_then(|s| s.schema_migration_phase.as_deref()); - // "partial" counts as complete for sweep purposes: the - // migration Job ran and we accepted the result; we no longer - // depend on the previous restore being around. - matches!(phase, None | Some("complete") | Some("partial")) - } else { - true - }; + // Gate the sweep on "migration isn't actively in flight": the + // gate's only purpose is to keep us from deleting the migration's + // source restore while pg_dump | psql still has it open. Any + // terminal phase (`complete`, `partial`, `timeout-skipped`, + // `failed: …`) — and `None` (no migration has run since this + // replica was first reconciled, or a previous sweep cleared the + // field) — all mean "no migration in flight, sweep is safe". + // + // Coded as "not in flight" rather than enumerating every terminal + // phase so future additions don't have to remember to update this + // gate. Previously this was an allow-list of `complete`/`partial`, + // which silently blocked the sweep when `timeout-skipped` was + // added — the replica then accumulated stale Active restores, + // hit the too-many-restores guardrail, and couldn't recover + // automatically. + let migration_complete = persistent_schemas_migration_settled(&replica); let grace_period = SignedDuration::try_from(replica.spec.switchover_grace_period.0).unwrap_or_default(); diff --git a/src/controllers/replica/tests.rs b/src/controllers/replica/tests.rs index ff68259..80b7f69 100644 --- a/src/controllers/replica/tests.rs +++ b/src/controllers/replica/tests.rs @@ -3,7 +3,84 @@ use kube::api::ObjectMeta; use crate::{kopia::Snapshot, types::*, util::TimeSpan}; -use super::{generate_password, resources::build_snapshot_list_job}; +use super::{ + generate_password, persistent_schemas_migration_settled, resources::build_snapshot_list_job, +}; + +fn make_replica( + persistent_schemas: Option>, + schema_migration_phase: Option, +) -> PostgresPhysicalReplica { + PostgresPhysicalReplica { + metadata: ObjectMeta { + name: Some("test".into()), + namespace: Some("default".into()), + ..Default::default() + }, + spec: PostgresPhysicalReplicaSpec { + kopia_secret_ref: SecretReference { + name: Some("creds".into()), + namespace: None, + }, + snapshot_filter: None, + schedule: "0 * * * *".into(), + schedule_jitter: TimeSpan(jiff::Span::new()), + minimum_ttl: None, + switchover_grace_period: TimeSpan(jiff::Span::new()), + analytics_username: "analytics".into(), + storage_class: None, + storage_size_override: None, + resources: None, + service_annotations: None, + pod_annotations: None, + affinity: None, + tolerations: vec![], + read_only: true, + postgres_extra_config: None, + notifications: vec![], + persistent_schemas, + storage_size_maximum: k8s_openapi::apimachinery::pkg::api::resource::Quantity( + "2Ti".to_string(), + ), + }, + status: schema_migration_phase.map(|p| PostgresPhysicalReplicaStatus { + schema_migration_phase: Some(p), + ..Default::default() + }), + } +} + +#[test] +fn migration_settled_when_persistent_schemas_unset() { + let replica = make_replica(None, None); + assert!(persistent_schemas_migration_settled(&replica)); +} + +#[test] +fn migration_settled_when_no_status() { + let replica = make_replica(Some(vec!["dbt".into()]), None); + assert!(persistent_schemas_migration_settled(&replica)); +} + +#[test] +fn migration_settled_in_terminal_phases() { + for phase in ["complete", "partial", "timeout-skipped", "failed: stuff"] { + let replica = make_replica(Some(vec!["dbt".into()]), Some(phase.into())); + assert!( + persistent_schemas_migration_settled(&replica), + "phase {phase:?} should let sweep proceed" + ); + } +} + +#[test] +fn migration_blocks_sweep_only_when_active() { + let replica = make_replica(Some(vec!["dbt".into()]), Some("active".into())); + assert!( + !persistent_schemas_migration_settled(&replica), + "active phase must block sweep so we don't delete the migration source" + ); +} #[test] fn generate_password_length_and_charset() {