Skip to content

Commit 5da48cc

Browse files
committed
feat(operator): bound schema migration at 20% of cron interval
The schema migration step inside switchover used to wait indefinitely for its Job to complete. A genuinely-stuck postgres backend on the target restore (e.g. a CREATE TABLE spinning at 100% CPU forever) therefore wedged the entire replica: switchover blocked, replica stuck in Switching, the old Active restore serves increasingly stale data, and every subsequent scheduled restore queues behind the wedge. Cap the migration at 20% of the time between consecutive cron firings (72 min for a 6-hourly schedule, 5h for daily). On timeout the operator drops the persistent_schemas on the new restore via DROP SCHEMA … CASCADE, records a SchemaMigrationTimedOut Warning event, sets schemaMigrationPhase = timeout-skipped, and proceeds with the switchover. The replica comes up serving snapshot contents without the persistent schemas; the next cycle re-attempts migration if the schemas have regenerated on the source. A usable replica beats carrying the schema through indefinitely. Document the budget + timeout behaviour in the README's new "Persistent schemas" section.
1 parent 3c3c36d commit 5da48cc

3 files changed

Lines changed: 222 additions & 2 deletions

File tree

README.md

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ Defines a continuously-refreshed replica of a PostgreSQL database restored from
102102
| `readOnly` | `bool` | No | `true` | Set the restored database to read-only mode. |
103103
| `postgresExtraConfig` | `string` | No | — | Extra lines appended to `postgresql.conf` (e.g. `shared_preload_libraries`). |
104104
| `notifications` | `[]NotificationConfig` | No | `[]` | Notification targets called on restore events. |
105-
| `persistentSchemas` | `[]string` | No | — | List of schema names to migrate from the previous restore to the new restore on each switchover. |
105+
| `persistentSchemas` | `[]string` | No | — | List of schema names to migrate from the previous restore to the new restore on each switchover. See [Persistent schemas](#persistent-schemas) below for the migration time budget and what happens on timeout. |
106106

107107
The cron expression is parsed using the [cronexpr](https://docs.rs/cronexpr) crate.
108108
It has two interesting features:
@@ -114,6 +114,24 @@ The jitter is a random duration between -time/2 and +time/2.
114114
For example, `10m` will result in a jitter between -5m and 5m.
115115
When using `H` in the cron expression, you might want to set the jitter to zero to properly take advantage of the spread-but-stable behaviour.
116116

117+
#### Persistent schemas
118+
119+
Each switchover normally drops the new restore (so it carries only what was in the snapshot) and is fast.
120+
The `persistentSchemas` field opts a schema (e.g. `dbt`) into being **carried across restores** via a `pg_dump | psql` migration Job that runs between the previous restore and the new one.
121+
A healthy migration takes seconds.
122+
123+
The migration has a **hard time budget of 20% of the cron interval** (e.g. ~72 min on a 6-hourly schedule, ~5 h on a daily one).
124+
If the budget is exceeded — most realistically because some external upstream condition wedges postgres mid-migration — the operator:
125+
126+
1. Cancels the migration Job.
127+
2. Runs `DROP SCHEMA <name> CASCADE` for each persistent schema on the new restore.
128+
3. Records a `SchemaMigrationTimedOut` Warning event on the replica.
129+
4. Sets `status.schemaMigrationPhase = "timeout-skipped"`.
130+
5. Proceeds with the switchover.
131+
132+
The intent is that **a usable replica beats carrying the schema through**.
133+
The next restore cycle will re-attempt the migration if the schemas have been regenerated on the source in the meantime; until then the replica is up and serving the snapshot contents.
134+
117135
#### SnapshotFilter
118136

119137
| Field | Type | Required | Description |
@@ -163,7 +181,7 @@ Additional fields for `target: graphQL`:
163181
| `notifications` | `[]NotificationStatus` | Status of each configured notification target. |
164182
| `conditions` | `[]Condition` | Standard Kubernetes conditions. |
165183
| `schemaMigrationJob` | `string` | Name of the active schema migration Job (set while migration is in progress). |
166-
| `schemaMigrationPhase` | `string` | Phase of the schema migration (`active`, `complete`, or `failed: <reason>`). |
184+
| `schemaMigrationPhase` | `string` | Phase of the schema migration (`active`, `complete`, `partial`, `timeout-skipped`, or `failed: <reason>`). See [Persistent schemas](#persistent-schemas). |
167185
| `persistentSchemaDataSize` | `Quantity` | Measured size of persistent schema data from the last successful migration. Used to size the next restore PVC. |
168186
| `consecutiveRestoreFailures` | `uint32` | Number of consecutive restore failures. Reset to 0 on success. After 3 consecutive failures the operator stops scheduling new restores until the counter is reset (automatically on next successful restore, or manually via `kubectl patch --subresource=status`). |
169187

src/controllers/replica.rs

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,6 +1074,133 @@ async fn mark_schema_migration_complete(
10741074
Ok(())
10751075
}
10761076

1077+
/// True if the running schema-migration Job has been alive longer than the
1078+
/// replica's per-cycle migration budget (see
1079+
/// [`PostgresPhysicalReplica::schema_migration_timeout`]). Uses the Job's
1080+
/// `creationTimestamp` as the start; falls back to "not exceeded" if the
1081+
/// Job has no creation timestamp (which shouldn't happen in practice).
1082+
fn migration_exceeded_budget(replica: &PostgresPhysicalReplica, job: &Job) -> bool {
1083+
let Some(created) = job
1084+
.metadata
1085+
.creation_timestamp
1086+
.as_ref()
1087+
.map(|t| Timestamp::from(t.0))
1088+
else {
1089+
return false;
1090+
};
1091+
let elapsed = Timestamp::now().duration_since(created);
1092+
elapsed > replica.schema_migration_timeout()
1093+
}
1094+
1095+
/// Abandon a stuck schema migration: drop the Job, DROP SCHEMA … CASCADE
1096+
/// the configured `persistent_schemas` on the new restore, record a
1097+
/// Warning event, and mark the migration phase as `timeout-skipped`.
1098+
/// Lets switchover proceed so the replica gets a usable (if
1099+
/// schema-less) database instead of being blocked indefinitely. The
1100+
/// next restore cycle re-attempts migration if the schemas reappear on
1101+
/// the source.
1102+
async fn timeout_schema_migration(
1103+
client: &Client,
1104+
ctx: &Arc<Context>,
1105+
replica: &PostgresPhysicalReplica,
1106+
namespace: &str,
1107+
new_restore: &PostgresPhysicalRestore,
1108+
job_name: &str,
1109+
) -> Result<()> {
1110+
let replica_name = replica.name_any();
1111+
let new_restore_name = new_restore.name_any();
1112+
let schemas: Vec<String> = replica.spec.persistent_schemas.clone().unwrap_or_default();
1113+
1114+
warn!(
1115+
replica = %replica_name,
1116+
restore = %new_restore_name,
1117+
schemas = ?schemas,
1118+
timeout = ?replica.schema_migration_timeout(),
1119+
"schema migration exceeded budget; dropping persistent schemas on new restore and proceeding to switchover"
1120+
);
1121+
1122+
// Cancel the Job (background propagation so its pods are GC'd too).
1123+
let jobs: Api<Job> = Api::namespaced(client.clone(), namespace);
1124+
let dp = kube::api::DeleteParams::background();
1125+
if let Err(e) = jobs.delete(job_name, &dp).await {
1126+
warn!(job = %job_name, error = %e, "failed to delete timed-out migration Job");
1127+
}
1128+
1129+
// DROP SCHEMA … CASCADE the persistent_schemas on the new restore so
1130+
// the operator's "owned" schemas don't carry stale leftovers from the
1131+
// restored data. Best-effort per-schema via IF EXISTS.
1132+
if !schemas.is_empty() {
1133+
let reader_secret_name = replica.creds_secret_name();
1134+
let secrets: Api<Secret> = Api::namespaced(client.clone(), namespace);
1135+
let reader_secret = secrets.get(&reader_secret_name).await?;
1136+
let reader_user = postgres::read_secret_field(&reader_secret, "username")?;
1137+
let reader_password = postgres::read_secret_field(&reader_secret, "password")?;
1138+
let target_dbname = postgres::discover_restore_database(
1139+
client,
1140+
namespace,
1141+
&new_restore_name,
1142+
&reader_user,
1143+
&reader_password,
1144+
ctx.use_port_forward(),
1145+
)
1146+
.await?;
1147+
let conn = postgres::connect_to_restore(
1148+
client,
1149+
namespace,
1150+
&new_restore_name,
1151+
&target_dbname,
1152+
&reader_user,
1153+
&reader_password,
1154+
ctx.use_port_forward(),
1155+
)
1156+
.await?;
1157+
postgres::drop_schemas_on(&conn.client, &schemas).await?;
1158+
}
1159+
1160+
// Surface as a Warning event so this is visible on the replica CR.
1161+
let note = format!(
1162+
"Schema migration exceeded its time budget (20% of cron interval). \
1163+
Persistent schemas [{}] were dropped on the new restore so it can come up. \
1164+
The next restore cycle will reattempt migration if the schemas have been regenerated upstream.",
1165+
schemas.join(", ")
1166+
);
1167+
if let Err(e) = ctx
1168+
.recorder
1169+
.publish(
1170+
&Event {
1171+
type_: EventType::Warning,
1172+
reason: "SchemaMigrationTimedOut".into(),
1173+
note: Some(note),
1174+
action: "Restore".into(),
1175+
secondary: Some(new_restore.object_ref(&())),
1176+
},
1177+
&replica.object_ref(&()),
1178+
)
1179+
.await
1180+
{
1181+
warn!(replica = %replica_name, error = %e, "failed to publish SchemaMigrationTimedOut event");
1182+
}
1183+
1184+
// Status: phase = timeout-skipped so it's distinguishable from
1185+
// complete/partial/failed.
1186+
let replicas: Api<PostgresPhysicalReplica> = Api::namespaced(client.clone(), namespace);
1187+
let patch = serde_json::json!({
1188+
"status": {
1189+
"schemaMigrationJob": null,
1190+
"schemaMigrationPhase": "timeout-skipped",
1191+
}
1192+
});
1193+
replicas
1194+
.patch_status(
1195+
&replica_name,
1196+
&PatchParams::apply("postgres-restore-operator"),
1197+
&Patch::Merge(&patch),
1198+
)
1199+
.await?;
1200+
1201+
Ok(())
1202+
}
1203+
10771204
async fn reconcile_schema_migration(
10781205
client: &Client,
10791206
ctx: &Arc<Context>,
@@ -1131,6 +1258,18 @@ async fn reconcile_schema_migration(
11311258
if let Some(job) = jobs.get_opt(&job_name).await? {
11321259
match classify_job(&job) {
11331260
JobStatus::Active => {
1261+
if migration_exceeded_budget(replica, &job) {
1262+
timeout_schema_migration(
1263+
client,
1264+
ctx,
1265+
replica,
1266+
namespace,
1267+
new_restore,
1268+
&job_name,
1269+
)
1270+
.await?;
1271+
return Ok(true);
1272+
}
11341273
debug!(replica = %replica_name, job = %job_name, "migration Job still running");
11351274
return Ok(false);
11361275
}

src/controllers/replica/scheduling.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,46 @@ impl PostgresPhysicalReplica {
5353
format!("{:016x}", hasher.finish())
5454
}
5555

56+
/// Wall-clock budget for the schema migration step inside a single
57+
/// restore cycle. Returns 20% of the interval between consecutive
58+
/// cron firings; e.g. a `0 */6 * * *` schedule (every 6h) gets a
59+
/// ~72 min budget. A healthy migration completes in seconds, so this
60+
/// is a generous backstop, not a tight SLA — the goal is to keep a
61+
/// pathological migration (postgres backend stuck on a single DDL,
62+
/// for example) from blocking the replica from coming up at all.
63+
/// Falls back to 1h if the cron expression can't be parsed or there
64+
/// is no schedule. When the timeout fires, the operator drops the
65+
/// `persistent_schemas` on the new restore (DROP SCHEMA … CASCADE)
66+
/// and proceeds to switchover. The next restore reattempts the
67+
/// migration if the schemas were regenerated upstream in between.
68+
pub fn schema_migration_timeout(&self) -> SignedDuration {
69+
const FALLBACK: SignedDuration = SignedDuration::from_secs(3600);
70+
const BUDGET_FRACTION_DENOMINATOR: i64 = 5; // 1/5 == 20%
71+
let Some(interval) = self.cron_interval(Timestamp::now()) else {
72+
return FALLBACK;
73+
};
74+
SignedDuration::from_secs(interval.as_secs() / BUDGET_FRACTION_DENOMINATOR)
75+
}
76+
77+
/// Interval between two consecutive cron firings of this replica's
78+
/// schedule, measured from `now`. Returns `None` when the schedule
79+
/// can't be parsed or doesn't have a second next-fire.
80+
fn cron_interval(&self, now: Timestamp) -> Option<SignedDuration> {
81+
let schedule = &self.spec.schedule;
82+
let cron = parse_crontab_with(schedule, {
83+
let mut options = ParseOptions::default();
84+
options.fallback_timezone_option = cronexpr::FallbackTimezoneOption::UTC;
85+
options
86+
})
87+
.ok()?;
88+
let next = cron.find_next(now).ok()?;
89+
let next_ts = next.timestamp();
90+
let after = cron
91+
.find_next(next_ts + SignedDuration::from_secs(1))
92+
.ok()?;
93+
Some(after.timestamp().duration_since(next_ts))
94+
}
95+
5696
pub fn compute_next_scheduled_restore(&self, now: Timestamp) -> Option<Timestamp> {
5797
let schedule = &self.spec.schedule;
5898

@@ -348,6 +388,29 @@ mod tests {
348388
assert!(next.unwrap() > now);
349389
}
350390

391+
#[test]
392+
fn schema_migration_timeout_six_hourly_cron_is_twenty_percent() {
393+
// `0 */6 * * *` fires every 6h → 21600s → 20% = 4320s = 72min.
394+
let replica = make_replica("0 */6 * * *", None, None, None);
395+
let timeout = replica.schema_migration_timeout();
396+
assert_eq!(timeout, SignedDuration::from_secs(4320));
397+
}
398+
399+
#[test]
400+
fn schema_migration_timeout_daily_cron_is_twenty_percent() {
401+
// Daily at midnight → 86400s → 20% = 17280s = 288min.
402+
let replica = make_replica("0 0 * * *", None, None, None);
403+
let timeout = replica.schema_migration_timeout();
404+
assert_eq!(timeout, SignedDuration::from_secs(17280));
405+
}
406+
407+
#[test]
408+
fn schema_migration_timeout_falls_back_on_invalid_cron() {
409+
let replica = make_replica("not a cron", None, None, None);
410+
let timeout = replica.schema_migration_timeout();
411+
assert_eq!(timeout, SignedDuration::from_secs(3600));
412+
}
413+
351414
#[test]
352415
fn compute_next_scheduled_restore_invalid_cron() {
353416
let replica = make_replica("not a cron", None, None, None);

0 commit comments

Comments
 (0)