Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions src/controllers/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ pub fn persistent_schemas_migration_settled(replica: &PostgresPhysicalReplica) -
if replica.spec.persistent_schemas.is_none() {
return true;
}
let phase = replica
replica
.status
.as_ref()
.and_then(|s| s.schema_migration_phase.as_deref());
!matches!(phase, Some("active"))
.and_then(|s| s.schema_migration_phase.as_ref())
.is_none_or(SchemaMigrationPhase::is_settled)
}

/// Generate a random password for analytics credentials.
Expand Down Expand Up @@ -1077,7 +1077,7 @@ async fn mark_schema_migration_complete(
let patch = serde_json::json!({
"status": {
"schemaMigrationJob": null,
"schemaMigrationPhase": "complete",
"schemaMigrationPhase": SchemaMigrationPhase::Complete,
}
});
replicas
Expand Down Expand Up @@ -1249,7 +1249,7 @@ async fn timeout_schema_migration(
let patch = serde_json::json!({
"status": {
"schemaMigrationJob": null,
"schemaMigrationPhase": "timeout-skipped",
"schemaMigrationPhase": SchemaMigrationPhase::TimeoutSkipped,
}
});
replicas
Expand Down Expand Up @@ -1307,7 +1307,7 @@ async fn reconcile_schema_migration(
.status
.as_ref()
.and_then(|s| s.schema_migration_phase.as_ref())
.is_some_and(|p| p == "complete")
.is_some_and(|p| matches!(p, SchemaMigrationPhase::Complete))
{
debug!(replica = %replica_name, "migration already complete in status");
return Ok(true);
Expand Down Expand Up @@ -1373,7 +1373,11 @@ async fn reconcile_schema_migration(
info!(replica = %replica_name, "migration Job succeeded");
}

let phase = if is_partial { "partial" } else { "complete" };
let phase = if is_partial {
SchemaMigrationPhase::Partial
} else {
SchemaMigrationPhase::Complete
};
let replicas: Api<PostgresPhysicalReplica> =
Api::namespaced(client.clone(), namespace);
let patch = serde_json::json!({
Expand Down Expand Up @@ -1422,7 +1426,7 @@ async fn reconcile_schema_migration(
let patch = serde_json::json!({
"status": {
"schemaMigrationJob": null,
"schemaMigrationPhase": format!("failed: {}", last_error),
"schemaMigrationPhase": SchemaMigrationPhase::Failed(last_error.clone()),
}
});
replicas
Expand Down Expand Up @@ -1662,7 +1666,7 @@ async fn reconcile_schema_migration(
let patch = serde_json::json!({
"status": {
"schemaMigrationJob": job_name,
"schemaMigrationPhase": "active",
"schemaMigrationPhase": SchemaMigrationPhase::Active,
}
});
replicas
Expand Down
14 changes: 10 additions & 4 deletions src/controllers/replica/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use super::{

fn make_replica(
persistent_schemas: Option<Vec<String>>,
schema_migration_phase: Option<String>,
schema_migration_phase: Option<SchemaMigrationPhase>,
) -> PostgresPhysicalReplica {
PostgresPhysicalReplica {
metadata: ObjectMeta {
Expand Down Expand Up @@ -64,8 +64,14 @@ fn migration_settled_when_no_status() {

#[test]
fn migration_settled_in_terminal_phases() {
for phase in ["complete", "partial", "timeout-skipped", "failed: stuff"] {
let replica = make_replica(Some(vec!["dbt".into()]), Some(phase.into()));
let terminal = [
SchemaMigrationPhase::Complete,
SchemaMigrationPhase::Partial,
SchemaMigrationPhase::TimeoutSkipped,
SchemaMigrationPhase::Failed("stuff".into()),
];
for phase in terminal {
let replica = make_replica(Some(vec!["dbt".into()]), Some(phase.clone()));
assert!(
persistent_schemas_migration_settled(&replica),
"phase {phase:?} should let sweep proceed"
Expand All @@ -75,7 +81,7 @@ fn migration_settled_in_terminal_phases() {

#[test]
fn migration_blocks_sweep_only_when_active() {
let replica = make_replica(Some(vec!["dbt".into()]), Some("active".into()));
let replica = make_replica(Some(vec!["dbt".into()]), Some(SchemaMigrationPhase::Active));
assert!(
!persistent_schemas_migration_settled(&replica),
"active phase must block sweep so we don't delete the migration source"
Expand Down
165 changes: 163 additions & 2 deletions src/types/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,9 @@ pub struct PostgresPhysicalReplicaStatus {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub schema_migration_job: Option<String>,

/// Phase of schema migration: pending, active, complete, failed
/// Phase of schema migration. See [`SchemaMigrationPhase`].
#[serde(default, skip_serializing_if = "Option::is_none")]
pub schema_migration_phase: Option<String>,
pub schema_migration_phase: Option<SchemaMigrationPhase>,

/// Measured size of persistent schema data from the last successful migration (bytes).
/// Used to size the next restore PVC.
Expand All @@ -306,6 +306,106 @@ pub enum ReplicaPhase {
Failed,
}

/// Where the operator's schema-migration step currently stands.
///
/// The step runs during a switchover, and only when `persistent_schemas`
/// is configured on the replica. On the wire the phase is a single flat
/// string in the historical format (`active` / `complete` / `partial` /
/// `timeout-skipped` / `failed: <reason>`), so replica status objects
/// written before this type existed keep working.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum SchemaMigrationPhase {
    /// The migration Job is still running. While in this state the sweep
    /// must leave the source restore alone.
    Active,
    /// The migration Job finished cleanly and the persistent schemas were
    /// carried across to the new restore.
    Complete,
    /// The migration Job finished, but psql logged statement errors
    /// (typically dbt views that reference renamed or dropped upstream
    /// columns). Some `persistent_schemas` objects may need regenerating
    /// upstream.
    Partial,
    /// The migration ran past its per-cycle wall-clock budget (20% of the
    /// cron interval). The operator dropped the persistent schemas on the
    /// new restore and went ahead with the switchover, preferring a usable
    /// replica over carrying the schema through indefinitely. The next
    /// cycle retries the migration if the schemas have regenerated.
    TimeoutSkipped,
    /// The migration Job failed. The old restore stays Active and the new
    /// restore stays in Switching. The payload is the reason surfaced from
    /// the Job's callback body (or "no callback received").
    Failed(String),
}

impl SchemaMigrationPhase {
    /// Reports whether the migration is no longer running, i.e. the phase
    /// is anything other than [`Self::Active`].
    ///
    /// The sweep gate relies on this: once the migration is not in flight,
    /// nothing depends on the previous Active restore, so deleting it is
    /// safe. The check is deliberately phrased as "differs from Active"
    /// instead of listing the terminal variants, so that a variant added
    /// later cannot silently bring back the deadlock that originally
    /// motivated this enum.
    pub fn is_settled(&self) -> bool {
        *self != Self::Active
    }
}

impl std::fmt::Display for SchemaMigrationPhase {
    /// Writes the flat wire string for this phase: a fixed lowercase label
    /// for the payload-free variants, or `failed: <reason>` for
    /// [`Self::Failed`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            // The only variant with a payload; it is formatted on the spot.
            Self::Failed(reason) => return write!(f, "failed: {reason}"),
            Self::Active => "active",
            Self::Complete => "complete",
            Self::Partial => "partial",
            Self::TimeoutSkipped => "timeout-skipped",
        };
        f.write_str(label)
    }
}

impl std::str::FromStr for SchemaMigrationPhase {
    type Err = String;

    /// Parses the flat wire string back into a phase.
    ///
    /// `active`, `complete`, `partial` and `timeout-skipped` must match
    /// exactly. Any input starting with `failed:` becomes
    /// [`Self::Failed`]; the text after the prefix is the reason.
    ///
    /// Only the single separator space that `Display` writes after
    /// `failed:` is removed. The rest of the reason is kept verbatim, so a
    /// reason with leading or trailing whitespace (for example an error
    /// message ending in a newline) survives a format/parse round trip
    /// instead of being silently trimmed.
    ///
    /// # Errors
    ///
    /// Returns a message quoting the input when it is neither one of the
    /// fixed labels nor prefixed with `failed:`.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s {
            "active" => Ok(Self::Active),
            "complete" => Ok(Self::Complete),
            "partial" => Ok(Self::Partial),
            "timeout-skipped" => Ok(Self::TimeoutSkipped),
            other => match other.strip_prefix("failed:") {
                Some(rest) => {
                    // Tolerate a missing separator ("failed:boom") but never
                    // eat more than the one space the serializer emits.
                    let reason = rest.strip_prefix(' ').unwrap_or(rest);
                    Ok(Self::Failed(reason.to_string()))
                }
                None => Err(format!("unknown schema migration phase: {other:?}")),
            },
        }
    }
}

impl Serialize for SchemaMigrationPhase {
    /// Serializes the phase as its flat `Display` string (`active`,
    /// `failed: <reason>`, ...).
    ///
    /// `collect_str` hands the `Display` impl straight to the serializer,
    /// which yields the same string as `serialize_str(&self.to_string())`
    /// while letting serializers that support it skip the temporary
    /// `String`.
    fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        s.collect_str(self)
    }
}

impl<'de> Deserialize<'de> for SchemaMigrationPhase {
fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
let s = String::deserialize(d)?;
s.parse().map_err(serde::de::Error::custom)
}
}

impl JsonSchema for SchemaMigrationPhase {
    /// Name under which this type appears in the generated schema.
    fn schema_name() -> Cow<'static, str> {
        Cow::Borrowed("SchemaMigrationPhase")
    }

    /// Describes the phase as a plain string, matching the flat wire
    /// format, with a description that enumerates the accepted values.
    fn json_schema(_: &mut SchemaGenerator) -> Schema {
        const DESCRIPTION: &str = "Schema migration phase: 'active' (Job running), 'complete' (succeeded), 'partial' (succeeded with statement errors), 'timeout-skipped' (budget exceeded; persistent schemas dropped and switchover proceeded), or 'failed: <reason>' (Job failed).";

        json_schema!({
            "type": "string",
            "description": DESCRIPTION,
        })
    }
}

#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ConnectionInfo {
Expand Down Expand Up @@ -349,3 +449,64 @@ impl PostgresPhysicalReplica {
format!("{name}-creds", name = self.name_any())
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Each payload-free variant must come back unchanged after a JSON
    /// encode/decode cycle.
    #[test]
    fn schema_migration_phase_roundtrips_terminal_variants() {
        let unit_variants = [
            SchemaMigrationPhase::Active,
            SchemaMigrationPhase::Complete,
            SchemaMigrationPhase::Partial,
            SchemaMigrationPhase::TimeoutSkipped,
        ];
        for phase in unit_variants {
            let encoded = serde_json::to_string(&phase).expect("serialize");
            let decoded: SchemaMigrationPhase =
                serde_json::from_str(&encoded).expect("deserialize");
            assert_eq!(phase, decoded, "round-trip mismatch for {phase:?}");
        }
    }

    /// The failure reason is embedded in the JSON string and recovered on
    /// the way back.
    #[test]
    fn schema_migration_phase_failed_preserves_reason() {
        let original = SchemaMigrationPhase::Failed("connection refused".into());

        let encoded = serde_json::to_string(&original).expect("serialize");
        assert_eq!(encoded, "\"failed: connection refused\"");

        let decoded: SchemaMigrationPhase = serde_json::from_str(&encoded).expect("deserialize");
        assert_eq!(original, decoded);
    }

    /// Pins the exact wire strings.
    #[test]
    fn schema_migration_phase_wire_strings_match_history() {
        // These strings are documented in the README and read by external
        // tooling (dashboards, alerts). They are part of pgro's public
        // contract, so renaming any of them is a breaking change.
        let expected_wire = [
            (SchemaMigrationPhase::Active, "active"),
            (SchemaMigrationPhase::Complete, "complete"),
            (SchemaMigrationPhase::Partial, "partial"),
            (SchemaMigrationPhase::TimeoutSkipped, "timeout-skipped"),
            (SchemaMigrationPhase::Failed("boom".into()), "failed: boom"),
        ];
        for (phase, wire) in expected_wire {
            assert_eq!(phase.to_string(), wire);
        }
    }

    /// Strings outside the known set are a parse error.
    #[test]
    fn schema_migration_phase_rejects_unknown_string() {
        assert!("what".parse::<SchemaMigrationPhase>().is_err());
    }

    /// Only `Active` counts as unsettled; every other phase is settled.
    #[test]
    fn schema_migration_phase_is_settled() {
        assert!(!SchemaMigrationPhase::Active.is_settled());

        let settled = [
            SchemaMigrationPhase::Complete,
            SchemaMigrationPhase::Partial,
            SchemaMigrationPhase::TimeoutSkipped,
            SchemaMigrationPhase::Failed("x".into()),
        ];
        for phase in settled {
            assert!(phase.is_settled());
        }
    }
}
14 changes: 7 additions & 7 deletions tests/persistent_schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use k8s_openapi::api::core::v1::{LocalObjectReference, Secret};
use kube::{Api, ResourceExt as _, api::PostParams};
use postgres_restore_operator::types::{
PostgresPhysicalReplica, PostgresPhysicalRestore, PostgresPhysicalRestoreSpec, ReplicaPhase,
RestorePhase,
RestorePhase, SchemaMigrationPhase,
};
use tokio::time::{sleep, timeout};

Expand Down Expand Up @@ -144,9 +144,9 @@ async fn persistent_schemas_migration() {
let phase = replica
.status
.as_ref()
.and_then(|s| s.schema_migration_phase.as_deref());
.and_then(|s| s.schema_migration_phase.as_ref());
println!("[{replica_name}] schemaMigrationPhase: {phase:?}");
if phase == Some("complete") {
if matches!(phase, Some(SchemaMigrationPhase::Complete)) {
return;
}
}
Expand Down Expand Up @@ -571,9 +571,9 @@ async fn persistent_schemas_skip_missing_on_source() {
let phase = replica
.status
.as_ref()
.and_then(|s| s.schema_migration_phase.as_deref());
.and_then(|s| s.schema_migration_phase.as_ref());
println!("[{replica_name}] schemaMigrationPhase: {phase:?}");
if phase == Some("complete") {
if matches!(phase, Some(SchemaMigrationPhase::Complete)) {
return;
}
}
Expand Down Expand Up @@ -767,8 +767,8 @@ async fn persistent_schemas_all_missing_prunes_previous_restore() {
post_switchover
.status
.as_ref()
.and_then(|s| s.schema_migration_phase.as_deref()),
Some("complete"),
.and_then(|s| s.schema_migration_phase.as_ref()),
Some(&SchemaMigrationPhase::Complete),
"schemaMigrationPhase must be set to complete even when all schemas are skipped"
);

Expand Down