Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 71 additions & 25 deletions src/controllers/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,46 @@ fn migration_exceeded_budget(replica: &PostgresPhysicalReplica, job: &Job) -> bo
/// schema-less) database instead of being blocked indefinitely. The
/// next restore cycle re-attempts migration if the schemas reappear on
/// the source.
// NOTE(review): the `///` lines immediately above this point read like the
// documentation of `timeout_schema_migration`, not of this helper — confirm
// and move them back next to that function.
/// Drops every schema in `schemas` (`DROP SCHEMA … CASCADE`) on the restore
/// named `new_restore_name`.
///
/// Steps: read `username` / `password` from the replica's credentials
/// secret, discover the restore's database name, open a connection to it,
/// and hand the schema list to `postgres::drop_schemas_on`.
///
/// Kept as a standalone function so a caller can wrap the whole sequence in
/// `tokio::time::timeout`: nothing here enforces a deadline, and the drop can
/// hang indefinitely if a backend on the target holds a lock on the schema's
/// namespace.
///
/// # Errors
///
/// Propagates the first failure from the secret lookup, the secret field
/// reads, database discovery, the connection attempt, or the drop itself.
async fn drop_persistent_schemas_on_target(
    client: &Client,
    ctx: &Arc<Context>,
    replica: &PostgresPhysicalReplica,
    namespace: &str,
    new_restore_name: &str,
    schemas: &[String],
) -> Result<()> {
    // Credentials come from the secret named by the replica itself.
    let secret_name = replica.creds_secret_name();
    let creds = Api::<Secret>::namespaced(client.clone(), namespace)
        .get(&secret_name)
        .await?;
    let username = postgres::read_secret_field(&creds, "username")?;
    let password = postgres::read_secret_field(&creds, "password")?;

    // The database name on the restore is looked up first; the connection
    // below is then opened against exactly that database.
    let database = postgres::discover_restore_database(
        client,
        namespace,
        new_restore_name,
        &username,
        &password,
        ctx.use_port_forward(),
    )
    .await?;

    let restore = postgres::connect_to_restore(
        client,
        namespace,
        new_restore_name,
        &database,
        &username,
        &password,
        ctx.use_port_forward(),
    )
    .await?;

    // The drop's own result is the function's result; no extra handling here.
    postgres::drop_schemas_on(&restore.client, schemas).await
}

async fn timeout_schema_migration(
client: &Client,
ctx: &Arc<Context>,
Expand Down Expand Up @@ -1126,35 +1166,41 @@ async fn timeout_schema_migration(
warn!(job = %job_name, error = %e, "failed to delete timed-out migration Job");
}

// DROP SCHEMA … CASCADE the persistent_schemas on the new restore so
// the operator's "owned" schemas don't carry stale leftovers from the
// restored data. Best-effort per-schema via IF EXISTS.
// Opportunistically DROP SCHEMA … CASCADE the persistent_schemas on
// the new restore so the operator's "owned" schemas don't carry stale
// leftovers from the restored data. Bounded at 60 seconds: if a
// backend in the target restore is itself stuck (the exact failure
// mode that tripped the migration budget in the first place — e.g.
// CREATE TABLE spinning at 100% CPU and ignoring SIGTERM), our DROP
// queues on its lock and never completes. Better to leave leftover
// schemas than to wedge the switchover indefinitely on the cleanup.
const CLEANUP_TIMEOUT: Duration = Duration::from_secs(60);
if !schemas.is_empty() {
let reader_secret_name = replica.creds_secret_name();
let secrets: Api<Secret> = Api::namespaced(client.clone(), namespace);
let reader_secret = secrets.get(&reader_secret_name).await?;
let reader_user = postgres::read_secret_field(&reader_secret, "username")?;
let reader_password = postgres::read_secret_field(&reader_secret, "password")?;
let target_dbname = postgres::discover_restore_database(
let cleanup = drop_persistent_schemas_on_target(
client,
ctx,
replica,
namespace,
&new_restore_name,
&reader_user,
&reader_password,
ctx.use_port_forward(),
)
.await?;
let conn = postgres::connect_to_restore(
client,
namespace,
&new_restore_name,
&target_dbname,
&reader_user,
&reader_password,
ctx.use_port_forward(),
)
.await?;
postgres::drop_schemas_on(&conn.client, &schemas).await?;
&schemas,
);
match tokio::time::timeout(CLEANUP_TIMEOUT, cleanup).await {
Ok(Ok(())) => {}
Ok(Err(e)) => {
warn!(
replica = %replica_name,
error = %e,
"DROP SCHEMA cleanup errored in timeout-skip path; proceeding to switchover with leftover schemas"
);
}
Err(_) => {
warn!(
replica = %replica_name,
timeout = ?CLEANUP_TIMEOUT,
"DROP SCHEMA cleanup itself timed out (target postgres backend likely stuck); proceeding to switchover with leftover schemas"
);
}
}
}

// Surface as a Warning event so this is visible on the replica CR.
Expand Down