Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 37 additions & 21 deletions src/controllers/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,27 @@ mod tests;

use resources::*;

/// True when no schema migration is currently in flight for this replica
/// β€” i.e. the sweep can safely delete the previous Active restore without
/// pulling the rug out from under a running `pg_dump | psql`.
///
/// Returns `true` when `persistent_schemas` is unset (no migration ever
/// runs) or when `schemaMigrationPhase` is anything other than `active`.
/// Coded as "not in flight" rather than enumerating terminal phases so
/// future additions (the operator's set of phase labels has grown over
/// time: `complete`, `partial`, `failed: <reason>`, `timeout-skipped`)
/// don't silently block the sweep.
pub fn persistent_schemas_migration_settled(replica: &PostgresPhysicalReplica) -> bool {
if replica.spec.persistent_schemas.is_none() {
return true;
}
let phase = replica
.status
.as_ref()
.and_then(|s| s.schema_migration_phase.as_deref());
!matches!(phase, Some("active"))
}

/// Generate a random password for analytics credentials.
pub(crate) fn generate_password() -> String {
let mut rng = rand::rng();
Expand Down Expand Up @@ -292,27 +313,22 @@ pub async fn reconcile(replica: Arc<PostgresPhysicalReplica>, ctx: Arc<Context>)
.as_ref()
.and_then(|s| s.current_restore.as_deref());
if let Some(current) = current_restore_name {
// Allow the sweep when no migration is in flight: phase=None means
// either no migration has run since this replica was first
// reconciled, or a previous sweep cleared it. In both cases the
// gate's purpose (don't delete the source while a migration depends
// on it) is satisfied. Without this branch, a replica with
// persistent_schemas configured that hits the too-many-restores
// guardrail can't recover automatically: the guardrail blocks new
// restores, no switchover runs, no path sets phase=complete, and
// the sweep stays gated forever.
let migration_complete = if replica.spec.persistent_schemas.is_some() {
let phase = replica
.status
.as_ref()
.and_then(|s| s.schema_migration_phase.as_deref());
// "partial" counts as complete for sweep purposes: the
// migration Job ran and we accepted the result; we no longer
// depend on the previous restore being around.
matches!(phase, None | Some("complete") | Some("partial"))
} else {
true
};
// Gate the sweep on "migration isn't actively in flight": the
// gate's only purpose is to keep us from deleting the migration's
// source restore while pg_dump | psql still has it open. Any
// terminal phase (`complete`, `partial`, `timeout-skipped`,
// `failed: …`) β€” and `None` (no migration has run since this
// replica was first reconciled, or a previous sweep cleared the
// field) β€” all mean "no migration in flight, sweep is safe".
//
// Coded as "not in flight" rather than enumerating every terminal
// phase so future additions don't have to remember to update this
// gate. Previously this was an allow-list of `complete`/`partial`,
// which silently blocked the sweep when `timeout-skipped` was
// added β€” the replica then accumulated stale Active restores,
// hit the too-many-restores guardrail, and couldn't recover
// automatically.
let migration_complete = persistent_schemas_migration_settled(&replica);

let grace_period =
SignedDuration::try_from(replica.spec.switchover_grace_period.0).unwrap_or_default();
Expand Down
79 changes: 78 additions & 1 deletion src/controllers/replica/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,84 @@ use kube::api::ObjectMeta;

use crate::{kopia::Snapshot, types::*, util::TimeSpan};

use super::{generate_password, resources::build_snapshot_list_job};
use super::{
generate_password, persistent_schemas_migration_settled, resources::build_snapshot_list_job,
};

fn make_replica(
persistent_schemas: Option<Vec<String>>,
schema_migration_phase: Option<String>,
) -> PostgresPhysicalReplica {
PostgresPhysicalReplica {
metadata: ObjectMeta {
name: Some("test".into()),
namespace: Some("default".into()),
..Default::default()
},
spec: PostgresPhysicalReplicaSpec {
kopia_secret_ref: SecretReference {
name: Some("creds".into()),
namespace: None,
},
snapshot_filter: None,
schedule: "0 * * * *".into(),
schedule_jitter: TimeSpan(jiff::Span::new()),
minimum_ttl: None,
switchover_grace_period: TimeSpan(jiff::Span::new()),
analytics_username: "analytics".into(),
storage_class: None,
storage_size_override: None,
resources: None,
service_annotations: None,
pod_annotations: None,
affinity: None,
tolerations: vec![],
read_only: true,
postgres_extra_config: None,
notifications: vec![],
persistent_schemas,
storage_size_maximum: k8s_openapi::apimachinery::pkg::api::resource::Quantity(
"2Ti".to_string(),
),
},
status: schema_migration_phase.map(|p| PostgresPhysicalReplicaStatus {
schema_migration_phase: Some(p),
..Default::default()
}),
}
}

#[test]
fn migration_settled_when_persistent_schemas_unset() {
let replica = make_replica(None, None);
assert!(persistent_schemas_migration_settled(&replica));
}

#[test]
fn migration_settled_when_no_status() {
let replica = make_replica(Some(vec!["dbt".into()]), None);
assert!(persistent_schemas_migration_settled(&replica));
}

#[test]
fn migration_settled_in_terminal_phases() {
for phase in ["complete", "partial", "timeout-skipped", "failed: stuff"] {
let replica = make_replica(Some(vec!["dbt".into()]), Some(phase.into()));
assert!(
persistent_schemas_migration_settled(&replica),
"phase {phase:?} should let sweep proceed"
);
}
}

#[test]
fn migration_blocks_sweep_only_when_active() {
let replica = make_replica(Some(vec!["dbt".into()]), Some("active".into()));
assert!(
!persistent_schemas_migration_settled(&replica),
"active phase must block sweep so we don't delete the migration source"
);
}

#[test]
fn generate_password_length_and_charset() {
Expand Down