Skip to content

Commit 735f85d

Browse files
authored
refactor(types): replace schemaMigrationPhase magic strings with an enum (#65)
2 parents a5395e4 + 6ab323b commit 735f85d

4 files changed

Lines changed: 193 additions & 22 deletions

File tree

src/controllers/replica.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,11 @@ pub fn persistent_schemas_migration_settled(replica: &PostgresPhysicalReplica) -
6060
if replica.spec.persistent_schemas.is_none() {
6161
return true;
6262
}
63-
let phase = replica
63+
replica
6464
.status
6565
.as_ref()
66-
.and_then(|s| s.schema_migration_phase.as_deref());
67-
!matches!(phase, Some("active"))
66+
.and_then(|s| s.schema_migration_phase.as_ref())
67+
.is_none_or(SchemaMigrationPhase::is_settled)
6868
}
6969

7070
/// Generate a random password for analytics credentials.
@@ -1077,7 +1077,7 @@ async fn mark_schema_migration_complete(
10771077
let patch = serde_json::json!({
10781078
"status": {
10791079
"schemaMigrationJob": null,
1080-
"schemaMigrationPhase": "complete",
1080+
"schemaMigrationPhase": SchemaMigrationPhase::Complete,
10811081
}
10821082
});
10831083
replicas
@@ -1249,7 +1249,7 @@ async fn timeout_schema_migration(
12491249
let patch = serde_json::json!({
12501250
"status": {
12511251
"schemaMigrationJob": null,
1252-
"schemaMigrationPhase": "timeout-skipped",
1252+
"schemaMigrationPhase": SchemaMigrationPhase::TimeoutSkipped,
12531253
}
12541254
});
12551255
replicas
@@ -1307,7 +1307,7 @@ async fn reconcile_schema_migration(
13071307
.status
13081308
.as_ref()
13091309
.and_then(|s| s.schema_migration_phase.as_ref())
1310-
.is_some_and(|p| p == "complete")
1310+
.is_some_and(|p| matches!(p, SchemaMigrationPhase::Complete))
13111311
{
13121312
debug!(replica = %replica_name, "migration already complete in status");
13131313
return Ok(true);
@@ -1373,7 +1373,11 @@ async fn reconcile_schema_migration(
13731373
info!(replica = %replica_name, "migration Job succeeded");
13741374
}
13751375

1376-
let phase = if is_partial { "partial" } else { "complete" };
1376+
let phase = if is_partial {
1377+
SchemaMigrationPhase::Partial
1378+
} else {
1379+
SchemaMigrationPhase::Complete
1380+
};
13771381
let replicas: Api<PostgresPhysicalReplica> =
13781382
Api::namespaced(client.clone(), namespace);
13791383
let patch = serde_json::json!({
@@ -1422,7 +1426,7 @@ async fn reconcile_schema_migration(
14221426
let patch = serde_json::json!({
14231427
"status": {
14241428
"schemaMigrationJob": null,
1425-
"schemaMigrationPhase": format!("failed: {}", last_error),
1429+
"schemaMigrationPhase": SchemaMigrationPhase::Failed(last_error.clone()),
14261430
}
14271431
});
14281432
replicas
@@ -1662,7 +1666,7 @@ async fn reconcile_schema_migration(
16621666
let patch = serde_json::json!({
16631667
"status": {
16641668
"schemaMigrationJob": job_name,
1665-
"schemaMigrationPhase": "active",
1669+
"schemaMigrationPhase": SchemaMigrationPhase::Active,
16661670
}
16671671
});
16681672
replicas

src/controllers/replica/tests.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use super::{
99

1010
fn make_replica(
1111
persistent_schemas: Option<Vec<String>>,
12-
schema_migration_phase: Option<String>,
12+
schema_migration_phase: Option<SchemaMigrationPhase>,
1313
) -> PostgresPhysicalReplica {
1414
PostgresPhysicalReplica {
1515
metadata: ObjectMeta {
@@ -64,8 +64,14 @@ fn migration_settled_when_no_status() {
6464

6565
#[test]
6666
fn migration_settled_in_terminal_phases() {
67-
for phase in ["complete", "partial", "timeout-skipped", "failed: stuff"] {
68-
let replica = make_replica(Some(vec!["dbt".into()]), Some(phase.into()));
67+
let terminal = [
68+
SchemaMigrationPhase::Complete,
69+
SchemaMigrationPhase::Partial,
70+
SchemaMigrationPhase::TimeoutSkipped,
71+
SchemaMigrationPhase::Failed("stuff".into()),
72+
];
73+
for phase in terminal {
74+
let replica = make_replica(Some(vec!["dbt".into()]), Some(phase.clone()));
6975
assert!(
7076
persistent_schemas_migration_settled(&replica),
7177
"phase {phase:?} should let sweep proceed"
@@ -75,7 +81,7 @@ fn migration_settled_in_terminal_phases() {
7581

7682
#[test]
7783
fn migration_blocks_sweep_only_when_active() {
78-
let replica = make_replica(Some(vec!["dbt".into()]), Some("active".into()));
84+
let replica = make_replica(Some(vec!["dbt".into()]), Some(SchemaMigrationPhase::Active));
7985
assert!(
8086
!persistent_schemas_migration_settled(&replica),
8187
"active phase must block sweep so we don't delete the migration source"

src/types/replica.rs

Lines changed: 163 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,9 +281,9 @@ pub struct PostgresPhysicalReplicaStatus {
281281
#[serde(default, skip_serializing_if = "Option::is_none")]
282282
pub schema_migration_job: Option<String>,
283283

284-
/// Phase of schema migration: pending, active, complete, failed
284+
/// Phase of schema migration. See [`SchemaMigrationPhase`].
285285
#[serde(default, skip_serializing_if = "Option::is_none")]
286-
pub schema_migration_phase: Option<String>,
286+
pub schema_migration_phase: Option<SchemaMigrationPhase>,
287287

288288
/// Measured size of persistent schema data from the last successful migration (bytes).
289289
/// Used to size the next restore PVC.
@@ -306,6 +306,106 @@ pub enum ReplicaPhase {
306306
Failed,
307307
}
308308

309+
/// Where the operator's schema-migration step currently stands.
///
/// The step runs during a switchover when `persistent_schemas` is
/// configured on the replica. On the wire the phase is a flat string in
/// the historical format (`active` / `complete` / `partial` /
/// `timeout-skipped` / `failed: <reason>`), so replica status objects
/// that already exist keep round-tripping unchanged.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SchemaMigrationPhase {
    /// The migration Job is still running. While in this phase the sweep
    /// must leave the source restore alone.
    Active,

    /// The migration Job finished cleanly and the persistent schemas were
    /// carried across to the new restore.
    Complete,

    /// The migration Job finished, but psql logged statement errors
    /// (typically dbt views that reference renamed or dropped upstream
    /// columns). Some `persistent_schemas` objects may need regenerating
    /// upstream.
    Partial,

    /// The migration ran past its per-cycle wall-clock budget (20% of the
    /// cron interval). The operator dropped the `persistent_schemas` on
    /// the new restore and went ahead with the switchover anyway: a usable
    /// replica is preferred over carrying the schema through indefinitely.
    /// The next cycle retries the migration if the schemas have
    /// regenerated.
    TimeoutSkipped,

    /// The migration Job failed. The old restore stays Active and the new
    /// restore stays in Switching. The payload is the reason surfaced from
    /// the Job's callback body (or "no callback received").
    Failed(String),
}
338+
339+
impl SchemaMigrationPhase {
340+
/// True for every phase except [`Self::Active`]. Used by the sweep
341+
/// gate: as long as the migration isn't currently running, deleting
342+
/// the previous Active restore is safe (nothing depends on it being
343+
/// around). Coded as "not Active" rather than enumerating terminal
344+
/// variants so adding a new variant doesn't risk silently
345+
/// reintroducing the deadlock that originally motivated this enum.
346+
pub fn is_settled(&self) -> bool {
347+
!matches!(self, Self::Active)
348+
}
349+
}
350+
351+
impl std::fmt::Display for SchemaMigrationPhase {
352+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
353+
match self {
354+
Self::Active => f.write_str("active"),
355+
Self::Complete => f.write_str("complete"),
356+
Self::Partial => f.write_str("partial"),
357+
Self::TimeoutSkipped => f.write_str("timeout-skipped"),
358+
Self::Failed(reason) => write!(f, "failed: {reason}"),
359+
}
360+
}
361+
}
362+
363+
impl std::str::FromStr for SchemaMigrationPhase {
364+
type Err = String;
365+
366+
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
367+
match s {
368+
"active" => Ok(Self::Active),
369+
"complete" => Ok(Self::Complete),
370+
"partial" => Ok(Self::Partial),
371+
"timeout-skipped" => Ok(Self::TimeoutSkipped),
372+
other => {
373+
if let Some(reason) = other.strip_prefix("failed:") {
374+
Ok(Self::Failed(reason.trim().to_string()))
375+
} else {
376+
Err(format!("unknown schema migration phase: {other:?}"))
377+
}
378+
}
379+
}
380+
}
381+
}
382+
383+
impl Serialize for SchemaMigrationPhase {
384+
fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
385+
s.serialize_str(&self.to_string())
386+
}
387+
}
388+
389+
impl<'de> Deserialize<'de> for SchemaMigrationPhase {
390+
fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
391+
let s = String::deserialize(d)?;
392+
s.parse().map_err(serde::de::Error::custom)
393+
}
394+
}
395+
396+
impl JsonSchema for SchemaMigrationPhase {
397+
fn schema_name() -> Cow<'static, str> {
398+
"SchemaMigrationPhase".into()
399+
}
400+
401+
fn json_schema(_: &mut SchemaGenerator) -> Schema {
402+
json_schema!({
403+
"type": "string",
404+
"description": "Schema migration phase: 'active' (Job running), 'complete' (succeeded), 'partial' (succeeded with statement errors), 'timeout-skipped' (budget exceeded; persistent schemas dropped and switchover proceeded), or 'failed: <reason>' (Job failed).",
405+
})
406+
}
407+
}
408+
309409
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
310410
#[serde(rename_all = "camelCase")]
311411
pub struct ConnectionInfo {
@@ -349,3 +449,64 @@ impl PostgresPhysicalReplica {
349449
format!("{name}-creds", name = self.name_any())
350450
}
351451
}
452+
453+
#[cfg(test)]
mod tests {
    use super::*;

    /// Every payload-free variant — including the non-terminal `Active` —
    /// must survive a serialize/deserialize round trip.
    #[test]
    fn schema_migration_phase_roundtrips_unit_variants() {
        for phase in [
            SchemaMigrationPhase::Active,
            SchemaMigrationPhase::Complete,
            SchemaMigrationPhase::Partial,
            SchemaMigrationPhase::TimeoutSkipped,
        ] {
            let s = serde_json::to_string(&phase).expect("serialize");
            let back: SchemaMigrationPhase = serde_json::from_str(&s).expect("deserialize");
            assert_eq!(phase, back, "round-trip mismatch for {phase:?}");
        }
    }

    /// The failure reason is embedded in the wire string and recovered
    /// intact on the way back.
    #[test]
    fn schema_migration_phase_failed_preserves_reason() {
        let phase = SchemaMigrationPhase::Failed("connection refused".into());
        let s = serde_json::to_string(&phase).expect("serialize");
        assert_eq!(s, "\"failed: connection refused\"");
        let back: SchemaMigrationPhase = serde_json::from_str(&s).expect("deserialize");
        assert_eq!(phase, back);
    }

    #[test]
    fn schema_migration_phase_wire_strings_match_history() {
        // The wire format is documented in the README and consumed by
        // external tooling (dashboards, alerts). These strings are part
        // of pgro's public contract; renaming them is a breaking change.
        assert_eq!(SchemaMigrationPhase::Active.to_string(), "active");
        assert_eq!(SchemaMigrationPhase::Complete.to_string(), "complete");
        assert_eq!(SchemaMigrationPhase::Partial.to_string(), "partial");
        assert_eq!(
            SchemaMigrationPhase::TimeoutSkipped.to_string(),
            "timeout-skipped"
        );
        assert_eq!(
            SchemaMigrationPhase::Failed("boom".into()).to_string(),
            "failed: boom"
        );
    }

    #[test]
    fn schema_migration_phase_rejects_unknown_string() {
        assert!("what".parse::<SchemaMigrationPhase>().is_err());
    }

    /// The serde path must reject unknown strings too, not only `FromStr`.
    #[test]
    fn schema_migration_phase_deserialize_rejects_unknown_string() {
        assert!(serde_json::from_str::<SchemaMigrationPhase>("\"what\"").is_err());
    }

    /// Only `Active` blocks the sweep; every other phase counts as settled.
    #[test]
    fn schema_migration_phase_is_settled() {
        assert!(!SchemaMigrationPhase::Active.is_settled());
        assert!(SchemaMigrationPhase::Complete.is_settled());
        assert!(SchemaMigrationPhase::Partial.is_settled());
        assert!(SchemaMigrationPhase::TimeoutSkipped.is_settled());
        assert!(SchemaMigrationPhase::Failed("x".into()).is_settled());
    }
}

tests/persistent_schemas.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use k8s_openapi::api::core::v1::{LocalObjectReference, Secret};
22
use kube::{Api, ResourceExt as _, api::PostParams};
33
use postgres_restore_operator::types::{
44
PostgresPhysicalReplica, PostgresPhysicalRestore, PostgresPhysicalRestoreSpec, ReplicaPhase,
5-
RestorePhase,
5+
RestorePhase, SchemaMigrationPhase,
66
};
77
use tokio::time::{sleep, timeout};
88

@@ -144,9 +144,9 @@ async fn persistent_schemas_migration() {
144144
let phase = replica
145145
.status
146146
.as_ref()
147-
.and_then(|s| s.schema_migration_phase.as_deref());
147+
.and_then(|s| s.schema_migration_phase.as_ref());
148148
println!("[{replica_name}] schemaMigrationPhase: {phase:?}");
149-
if phase == Some("complete") {
149+
if matches!(phase, Some(SchemaMigrationPhase::Complete)) {
150150
return;
151151
}
152152
}
@@ -571,9 +571,9 @@ async fn persistent_schemas_skip_missing_on_source() {
571571
let phase = replica
572572
.status
573573
.as_ref()
574-
.and_then(|s| s.schema_migration_phase.as_deref());
574+
.and_then(|s| s.schema_migration_phase.as_ref());
575575
println!("[{replica_name}] schemaMigrationPhase: {phase:?}");
576-
if phase == Some("complete") {
576+
if matches!(phase, Some(SchemaMigrationPhase::Complete)) {
577577
return;
578578
}
579579
}
@@ -767,8 +767,8 @@ async fn persistent_schemas_all_missing_prunes_previous_restore() {
767767
post_switchover
768768
.status
769769
.as_ref()
770-
.and_then(|s| s.schema_migration_phase.as_deref()),
771-
Some("complete"),
770+
.and_then(|s| s.schema_migration_phase.as_ref()),
771+
Some(&SchemaMigrationPhase::Complete),
772772
"schemaMigrationPhase must be set to complete even when all schemas are skipped"
773773
);
774774

0 commit comments

Comments
 (0)