Skip to content

Commit 3abc7fb

Browse files
authored
fix(operator): drop wall-clock query timeout, keep TCP keepalives (#56)
2 parents b7a71d7 + 5248e3e commit 3abc7fb

2 files changed

Lines changed: 30 additions & 84 deletions

File tree

AGENTS.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,10 @@
2222
- Always work from a branch. If you're on `main`, create a new branch.
2323
- If you can at all do something from the operator, instead of in a script that runs in a job, do so. For example, if you need to query something in a database, you should be able to do so from the operator, instead of writing SQL code in bash logic.
2424
<!-- end rules -->
25+
26+
## Lessons learned
27+
28+
- Detect dead Postgres connections with **TCP keepalives** (`socket2::TcpKeepalive` on the raw `TcpStream` before handing it to `tokio_postgres::Config::connect_raw`), not with a wall-clock `tokio::time::timeout` around the query. Keepalives fire on the kernel's own timer regardless of in-flight queries, so a legitimately long-running statement (e.g. `DROP SCHEMA … CASCADE` on a populated dbt schema, which can run for tens of minutes) keeps working, while a NAT-evicted / silently-dead socket still errors within ~90s and the reconcile retries.
29+
- Wall-clock timeouts on a long-tail-distributed operation make a retry loop **never converge**: every attempt hits the cap, error policy retries, every retry hits the cap again. Don't add a timeout unless you have a real p99 number for the operation it wraps.
30+
- The operator's own libpq sockets are separate from the migration Job's libpq URI. Keepalive fixes in the Job (`keepalives=1&keepalives_idle=...`) do **not** cover the operator — set both.
31+
- The kube port-forward path is not a raw TCP socket; socket-level keepalives don't apply. That path keeps its own liveness via the API server WebSocket — only `connect_via_tcp` needs the socket2 setup.

src/controllers/postgres.rs

Lines changed: 23 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,20 @@ use kube::{
77
};
88
use socket2::{SockRef, TcpKeepalive};
99
use tokio::net::TcpStream;
10-
use tokio_postgres::{NoTls, Row, ToStatement, types::ToSql};
10+
use tokio_postgres::NoTls;
1111
use tracing::{debug, info, warn};
1212

1313
use crate::error::{Error, Result};
1414

1515
pub const DEFAULT_PG_VERSION: i32 = 18;
1616

17-
/// Per-query timeout. Backstop for hung connections that survive TCP
18-
/// keepalives (e.g. a server that ACKs keepalive probes but the query
19-
/// itself wedges). Generous because `DROP SCHEMA ... CASCADE` on a
20-
/// populated dbt schema can take minutes; intent is runaway detection,
21-
/// not a query SLA.
22-
const QUERY_TIMEOUT: Duration = Duration::from_secs(5 * 60);
23-
2417
/// TCP keepalive parameters for operator-side libpq sockets. Matches the
2518
/// settings the migration Job uses in its libpq URI (see [[fix(migration)
2619
/// TCP keepalives]]). Detects NAT-evicted / silently-dead sockets within
2720
/// ~90s so reconciles can fail and retry instead of hanging forever.
21+
/// Keepalives fire on the kernel's own timer regardless of in-flight
22+
/// queries, so a legitimately long-running `DROP SCHEMA ... CASCADE` on
23+
/// a populated dbt schema is fine — only dead sockets are killed.
2824
const KEEPALIVE_IDLE: Duration = Duration::from_secs(60);
2925
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(10);
3026
const KEEPALIVE_RETRIES: u32 = 3;
@@ -40,65 +36,6 @@ fn set_tcp_keepalive(stream: &TcpStream) -> Result<()> {
4036
Ok(())
4137
}
4238

43-
/// Run a parameterised query with the global [`QUERY_TIMEOUT`]. Returns a
44-
/// timeout-shaped error if the query doesn't complete in time, so the
45-
/// reconcile fails fast instead of hanging on a wedged connection.
46-
pub async fn query_timeout<T>(
47-
pg: &tokio_postgres::Client,
48-
stmt: &T,
49-
params: &[&(dyn ToSql + Sync)],
50-
) -> Result<Vec<Row>>
51-
where
52-
T: ?Sized + ToStatement,
53-
{
54-
tokio::time::timeout(QUERY_TIMEOUT, pg.query(stmt, params))
55-
.await
56-
.map_err(|_| Error::MissingField(format!("query timed out after {QUERY_TIMEOUT:?}")))?
57-
.map_err(Into::into)
58-
}
59-
60-
pub async fn query_one_timeout<T>(
61-
pg: &tokio_postgres::Client,
62-
stmt: &T,
63-
params: &[&(dyn ToSql + Sync)],
64-
) -> Result<Row>
65-
where
66-
T: ?Sized + ToStatement,
67-
{
68-
tokio::time::timeout(QUERY_TIMEOUT, pg.query_one(stmt, params))
69-
.await
70-
.map_err(|_| Error::MissingField(format!("query timed out after {QUERY_TIMEOUT:?}")))?
71-
.map_err(Into::into)
72-
}
73-
74-
pub async fn query_opt_timeout<T>(
75-
pg: &tokio_postgres::Client,
76-
stmt: &T,
77-
params: &[&(dyn ToSql + Sync)],
78-
) -> Result<Option<Row>>
79-
where
80-
T: ?Sized + ToStatement,
81-
{
82-
tokio::time::timeout(QUERY_TIMEOUT, pg.query_opt(stmt, params))
83-
.await
84-
.map_err(|_| Error::MissingField(format!("query timed out after {QUERY_TIMEOUT:?}")))?
85-
.map_err(Into::into)
86-
}
87-
88-
pub async fn execute_timeout<T>(
89-
pg: &tokio_postgres::Client,
90-
stmt: &T,
91-
params: &[&(dyn ToSql + Sync)],
92-
) -> Result<u64>
93-
where
94-
T: ?Sized + ToStatement,
95-
{
96-
tokio::time::timeout(QUERY_TIMEOUT, pg.execute(stmt, params))
97-
.await
98-
.map_err(|_| Error::MissingField(format!("execute timed out after {QUERY_TIMEOUT:?}")))?
99-
.map_err(Into::into)
100-
}
101-
10239
/// Holds a Postgres client and any resources that must stay alive for the
10340
/// duration of the connection (e.g. a port-forwarder).
10441
pub struct PgConnection {
@@ -282,15 +219,15 @@ pub async fn discover_restore_database(
282219
.await?;
283220
let pg = &conn.client;
284221

285-
let row = query_opt_timeout(
286-
pg,
287-
"SELECT datname FROM pg_database \
288-
WHERE datname NOT IN ('postgres', 'template0', 'template1') \
289-
ORDER BY pg_database_size(datname) DESC \
290-
LIMIT 1",
291-
&[],
292-
)
293-
.await?;
222+
let row = pg
223+
.query_opt(
224+
"SELECT datname FROM pg_database \
225+
WHERE datname NOT IN ('postgres', 'template0', 'template1') \
226+
ORDER BY pg_database_size(datname) DESC \
227+
LIMIT 1",
228+
&[],
229+
)
230+
.await?;
294231

295232
match row {
296233
Some(r) => {
@@ -314,7 +251,9 @@ pub async fn discover_restore_database(
314251
/// you're going to make multiple queries on the same database so the
315252
/// connection can be reused.
316253
pub async fn database_size_on(pg: &tokio_postgres::Client) -> Result<u64> {
317-
let row = query_one_timeout(pg, "SELECT pg_database_size(current_database())", &[]).await?;
254+
let row = pg
255+
.query_one("SELECT pg_database_size(current_database())", &[])
256+
.await?;
318257
let size: i64 = row.get(0);
319258
Ok(size as u64)
320259
}
@@ -356,12 +295,12 @@ pub async fn existing_schemas_on(
356295
if candidates.is_empty() {
357296
return Ok(Vec::new());
358297
}
359-
let rows = query_timeout(
360-
pg,
361-
"SELECT nspname FROM pg_catalog.pg_namespace WHERE nspname = ANY($1)",
362-
&[&candidates],
363-
)
364-
.await?;
298+
let rows = pg
299+
.query(
300+
"SELECT nspname FROM pg_catalog.pg_namespace WHERE nspname = ANY($1)",
301+
&[&candidates],
302+
)
303+
.await?;
365304
let found: HashSet<String> = rows.iter().map(|r| r.get::<_, String>(0)).collect();
366305
Ok(candidates
367306
.iter()
@@ -376,7 +315,7 @@ pub async fn drop_schemas_on(pg: &tokio_postgres::Client, schemas: &[String]) ->
376315
for schema in schemas {
377316
let stmt = format!("DROP SCHEMA IF EXISTS {} CASCADE", quote_ident(schema));
378317
debug!(schema = schema, "dropping schema");
379-
execute_timeout(pg, stmt.as_str(), &[]).await?;
318+
pg.execute(stmt.as_str(), &[]).await?;
380319
}
381320
Ok(())
382321
}

0 commit comments

Comments
 (0)