diff --git a/src/controllers/replica.rs b/src/controllers/replica.rs index a042b96..a7168c8 100644 --- a/src/controllers/replica.rs +++ b/src/controllers/replica.rs @@ -60,11 +60,11 @@ pub fn persistent_schemas_migration_settled(replica: &PostgresPhysicalReplica) - if replica.spec.persistent_schemas.is_none() { return true; } - let phase = replica + replica .status .as_ref() - .and_then(|s| s.schema_migration_phase.as_deref()); - !matches!(phase, Some("active")) + .and_then(|s| s.schema_migration_phase.as_ref()) + .is_none_or(SchemaMigrationPhase::is_settled) } /// Generate a random password for analytics credentials. @@ -1077,7 +1077,7 @@ async fn mark_schema_migration_complete( let patch = serde_json::json!({ "status": { "schemaMigrationJob": null, - "schemaMigrationPhase": "complete", + "schemaMigrationPhase": SchemaMigrationPhase::Complete, } }); replicas @@ -1249,7 +1249,7 @@ async fn timeout_schema_migration( let patch = serde_json::json!({ "status": { "schemaMigrationJob": null, - "schemaMigrationPhase": "timeout-skipped", + "schemaMigrationPhase": SchemaMigrationPhase::TimeoutSkipped, } }); replicas @@ -1307,7 +1307,7 @@ async fn reconcile_schema_migration( .status .as_ref() .and_then(|s| s.schema_migration_phase.as_ref()) - .is_some_and(|p| p == "complete") + .is_some_and(|p| matches!(p, SchemaMigrationPhase::Complete)) { debug!(replica = %replica_name, "migration already complete in status"); return Ok(true); @@ -1373,7 +1373,11 @@ async fn reconcile_schema_migration( info!(replica = %replica_name, "migration Job succeeded"); } - let phase = if is_partial { "partial" } else { "complete" }; + let phase = if is_partial { + SchemaMigrationPhase::Partial + } else { + SchemaMigrationPhase::Complete + }; let replicas: Api = Api::namespaced(client.clone(), namespace); let patch = serde_json::json!({ @@ -1422,7 +1426,7 @@ async fn reconcile_schema_migration( let patch = serde_json::json!({ "status": { "schemaMigrationJob": null, - "schemaMigrationPhase": format!("failed: {}", last_error), + "schemaMigrationPhase": SchemaMigrationPhase::Failed(last_error.clone()), } }); replicas @@ -1662,7 +1666,7 @@ async fn reconcile_schema_migration( let patch = serde_json::json!({ "status": { "schemaMigrationJob": job_name, - "schemaMigrationPhase": "active", + "schemaMigrationPhase": SchemaMigrationPhase::Active, } }); replicas diff --git a/src/controllers/replica/tests.rs b/src/controllers/replica/tests.rs index 80b7f69..2f575d8 100644 --- a/src/controllers/replica/tests.rs +++ b/src/controllers/replica/tests.rs @@ -9,7 +9,7 @@ use super::{ fn make_replica( persistent_schemas: Option>, - schema_migration_phase: Option, + schema_migration_phase: Option, ) -> PostgresPhysicalReplica { PostgresPhysicalReplica { metadata: ObjectMeta { @@ -64,8 +64,14 @@ fn migration_settled_when_no_status() { #[test] fn migration_settled_in_terminal_phases() { - for phase in ["complete", "partial", "timeout-skipped", "failed: stuff"] { - let replica = make_replica(Some(vec!["dbt".into()]), Some(phase.into())); + let terminal = [ + SchemaMigrationPhase::Complete, + SchemaMigrationPhase::Partial, + SchemaMigrationPhase::TimeoutSkipped, + SchemaMigrationPhase::Failed("stuff".into()), + ]; + for phase in terminal { + let replica = make_replica(Some(vec!["dbt".into()]), Some(phase.clone())); assert!( persistent_schemas_migration_settled(&replica), "phase {phase:?} should let sweep proceed" @@ -75,7 +81,7 @@ fn migration_settled_in_terminal_phases() { #[test] fn migration_blocks_sweep_only_when_active() { - let replica = make_replica(Some(vec!["dbt".into()]), Some("active".into())); + let replica = make_replica(Some(vec!["dbt".into()]), Some(SchemaMigrationPhase::Active)); assert!( !persistent_schemas_migration_settled(&replica), "active phase must block sweep so we don't delete the migration source" diff --git a/src/types/replica.rs b/src/types/replica.rs index 3886a6e..1d9c4f1 100644 --- a/src/types/replica.rs +++ b/src/types/replica.rs @@ -281,9 +281,9 @@ pub struct PostgresPhysicalReplicaStatus { #[serde(default, skip_serializing_if = "Option::is_none")] pub schema_migration_job: Option, - /// Phase of schema migration: pending, active, complete, failed + /// Phase of schema migration. See [`SchemaMigrationPhase`]. #[serde(default, skip_serializing_if = "Option::is_none")] - pub schema_migration_phase: Option, + pub schema_migration_phase: Option, /// Measured size of persistent schema data from the last successful migration (bytes). /// Used to size the next restore PVC. @@ -306,6 +306,106 @@ pub enum ReplicaPhase { Failed, } +/// Lifecycle phase of the operator's schema-migration step that runs +/// during a switchover when `persistent_schemas` is configured on the +/// replica. The serialized form is a flat string matching the historical +/// wire format (`active` / `complete` / `partial` / `timeout-skipped` / +/// `failed: `) so existing replica status objects round-trip +/// unchanged. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum SchemaMigrationPhase { + /// Migration Job is running. The sweep must not delete the source + /// restore while we're in this state. + Active, + /// Migration Job finished cleanly; persistent schemas were carried + /// across to the new restore. + Complete, + /// Migration Job finished but psql logged statement errors (typical + /// when dbt views reference renamed/dropped upstream columns). Some + /// persistent_schemas objects may need regenerating upstream. + Partial, + /// Migration exceeded the per-cycle wall-clock budget (20% of the + /// cron interval). The operator dropped the persistent_schemas on + /// the new restore and proceeded to switchover anyway — a usable + /// replica beats carrying the schema through indefinitely. The next + /// cycle re-attempts migration if the schemas have regenerated. + TimeoutSkipped, + /// Migration Job failed. The old restore stays Active; the new + /// restore stays in Switching. The wrapped string is the reason + /// surfaced from the Job's callback body (or "no callback received"). + Failed(String), +} + +impl SchemaMigrationPhase { + /// True for every phase except [`Self::Active`]. Used by the sweep + /// gate: as long as the migration isn't currently running, deleting + /// the previous Active restore is safe (nothing depends on it being + /// around). Coded as "not Active" rather than enumerating terminal + /// variants so adding a new variant doesn't risk silently + /// reintroducing the deadlock that originally motivated this enum. + pub fn is_settled(&self) -> bool { + !matches!(self, Self::Active) + } +} + +impl std::fmt::Display for SchemaMigrationPhase { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Active => f.write_str("active"), + Self::Complete => f.write_str("complete"), + Self::Partial => f.write_str("partial"), + Self::TimeoutSkipped => f.write_str("timeout-skipped"), + Self::Failed(reason) => write!(f, "failed: {reason}"), + } + } +} + +impl std::str::FromStr for SchemaMigrationPhase { + type Err = String; + + fn from_str(s: &str) -> std::result::Result { + match s { + "active" => Ok(Self::Active), + "complete" => Ok(Self::Complete), + "partial" => Ok(Self::Partial), + "timeout-skipped" => Ok(Self::TimeoutSkipped), + other => { + if let Some(reason) = other.strip_prefix("failed:") { + Ok(Self::Failed(reason.trim().to_string())) + } else { + Err(format!("unknown schema migration phase: {other:?}")) + } + } + } + } +} + +impl Serialize for SchemaMigrationPhase { + fn serialize(&self, s: S) -> Result { + s.serialize_str(&self.to_string()) + } +} + +impl<'de> Deserialize<'de> for SchemaMigrationPhase { + fn deserialize>(d: D) -> Result { + let s = String::deserialize(d)?; + s.parse().map_err(serde::de::Error::custom) + } +} + +impl JsonSchema for SchemaMigrationPhase { + fn schema_name() -> Cow<'static, str> { + "SchemaMigrationPhase".into() + } + + fn json_schema(_: &mut SchemaGenerator) -> Schema { + json_schema!({ + "type": "string", + "description": "Schema migration phase: 'active' (Job running), 'complete' (succeeded), 'partial' (succeeded with statement errors), 'timeout-skipped' (budget exceeded; persistent schemas dropped and switchover proceeded), or 'failed: ' (Job failed).", + }) + } +} + #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct ConnectionInfo { @@ -349,3 +449,64 @@ impl PostgresPhysicalReplica { format!("{name}-creds", name = self.name_any()) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn schema_migration_phase_roundtrips_terminal_variants() { + for phase in [ + SchemaMigrationPhase::Active, + SchemaMigrationPhase::Complete, + SchemaMigrationPhase::Partial, + SchemaMigrationPhase::TimeoutSkipped, + ] { + let s = serde_json::to_string(&phase).expect("serialize"); + let back: SchemaMigrationPhase = serde_json::from_str(&s).expect("deserialize"); + assert_eq!(phase, back, "round-trip mismatch for {phase:?}"); + } + } + + #[test] + fn schema_migration_phase_failed_preserves_reason() { + let phase = SchemaMigrationPhase::Failed("connection refused".into()); + let s = serde_json::to_string(&phase).expect("serialize"); + assert_eq!(s, "\"failed: connection refused\""); + let back: SchemaMigrationPhase = serde_json::from_str(&s).expect("deserialize"); + assert_eq!(phase, back); + } + + #[test] + fn schema_migration_phase_wire_strings_match_history() { + // The wire format is documented in the README and consumed by + // external tooling (dashboards, alerts). These strings are part + // of pgro's public contract; renaming them is a breaking change. + assert_eq!(SchemaMigrationPhase::Active.to_string(), "active"); + assert_eq!(SchemaMigrationPhase::Complete.to_string(), "complete"); + assert_eq!(SchemaMigrationPhase::Partial.to_string(), "partial"); + assert_eq!( + SchemaMigrationPhase::TimeoutSkipped.to_string(), + "timeout-skipped" + ); + assert_eq!( + SchemaMigrationPhase::Failed("boom".into()).to_string(), + "failed: boom" + ); + } + + #[test] + fn schema_migration_phase_rejects_unknown_string() { + let r: Result = "what".parse(); + assert!(r.is_err()); + } + + #[test] + fn schema_migration_phase_is_settled() { + assert!(!SchemaMigrationPhase::Active.is_settled()); + assert!(SchemaMigrationPhase::Complete.is_settled()); + assert!(SchemaMigrationPhase::Partial.is_settled()); + assert!(SchemaMigrationPhase::TimeoutSkipped.is_settled()); + assert!(SchemaMigrationPhase::Failed("x".into()).is_settled()); + } +} diff --git a/tests/persistent_schemas.rs b/tests/persistent_schemas.rs index 012b50a..92f7cc4 100644 --- a/tests/persistent_schemas.rs +++ b/tests/persistent_schemas.rs @@ -2,7 +2,7 @@ use k8s_openapi::api::core::v1::{LocalObjectReference, Secret}; use kube::{Api, ResourceExt as _, api::PostParams}; use postgres_restore_operator::types::{ PostgresPhysicalReplica, PostgresPhysicalRestore, PostgresPhysicalRestoreSpec, ReplicaPhase, - RestorePhase, + RestorePhase, SchemaMigrationPhase, }; use tokio::time::{sleep, timeout}; @@ -144,9 +144,9 @@ async fn persistent_schemas_migration() { let phase = replica .status .as_ref() - .and_then(|s| s.schema_migration_phase.as_deref()); + .and_then(|s| s.schema_migration_phase.as_ref()); println!("[{replica_name}] schemaMigrationPhase: {phase:?}"); - if phase == Some("complete") { + if matches!(phase, Some(SchemaMigrationPhase::Complete)) { return; } } @@ -571,9 +571,9 @@ async fn persistent_schemas_skip_missing_on_source() { let phase = replica .status .as_ref() - .and_then(|s| s.schema_migration_phase.as_deref()); + .and_then(|s| s.schema_migration_phase.as_ref()); println!("[{replica_name}] schemaMigrationPhase: {phase:?}"); - if phase == Some("complete") { + if matches!(phase, Some(SchemaMigrationPhase::Complete)) { return; } } @@ -767,8 +767,8 @@ async fn persistent_schemas_all_missing_prunes_previous_restore() { post_switchover .status .as_ref() - .and_then(|s| s.schema_migration_phase.as_deref()), - Some("complete"), + .and_then(|s| s.schema_migration_phase.as_ref()), + Some(&SchemaMigrationPhase::Complete), "schemaMigrationPhase must be set to complete even when all schemas are skipped" );