Skip to content

Commit ecba368

Browse files
authored
fix(operator): terminate other client backends before drop_schemas_on (#57)
2 parents 29f3933 + 9e011f7 commit ecba368

1 file changed

Lines changed: 31 additions & 0 deletions

File tree

src/controllers/postgres.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,9 +309,40 @@ pub async fn existing_schemas_on(
309309
.collect())
310310
}
311311

312+
/// Terminate every other client backend connected to the current database.
313+
/// The operator owns each restore's database fully until handover, so any
314+
/// other session is a transient or stray client whose lock-holding would
315+
/// block our DDL. Best-effort: rows where `pg_terminate_backend` returns
316+
/// false (already gone, raced with another terminator) are ignored.
317+
///
318+
/// Note: this only dislodges sessions PG can interrupt. A backend stuck in
319+
/// an uninterruptible kernel wait (rare but it happens — e.g. a long-dead
320+
/// client whose TCP socket the kernel hasn't reaped) will not exit. For
321+
/// those, the restore pod must be restarted.
322+
pub async fn terminate_other_backends(pg: &tokio_postgres::Client) -> Result<u64> {
323+
let row = pg
324+
.query_one(
325+
"SELECT count(*) FILTER (WHERE pg_terminate_backend(pid)) \
326+
FROM pg_stat_activity \
327+
WHERE datname = current_database() \
328+
AND backend_type = 'client backend' \
329+
AND pid <> pg_backend_pid()",
330+
&[],
331+
)
332+
.await?;
333+
let killed: i64 = row.get(0);
334+
if killed > 0 {
335+
info!(count = killed, "terminated other client backends");
336+
}
337+
Ok(killed as u64)
338+
}
339+
312340
/// Drop the given schemas (and all contents) on an already-open connection.
341+
/// Terminates other client backends on the database first so stray sessions
342+
/// holding locks on the target schemas don't queue our DDL behind them.
313343
/// Idempotent: missing schemas are skipped via `DROP SCHEMA IF EXISTS`.
314344
pub async fn drop_schemas_on(pg: &tokio_postgres::Client, schemas: &[String]) -> Result<()> {
345+
terminate_other_backends(pg).await?;
315346
for schema in schemas {
316347
let stmt = format!("DROP SCHEMA IF EXISTS {} CASCADE", quote_ident(schema));
317348
debug!(schema = schema, "dropping schema");

0 commit comments

Comments
 (0)