Commit 6d5279d

Add idle connection timeout to pgwire proxy
Parent commit: 1194e9c

7 files changed

Lines changed: 315 additions & 4 deletions

.claude/CLAUDE.md

Lines changed: 2 additions & 0 deletions
@@ -9,6 +9,7 @@ QueryProxy is a Rust PostgreSQL wire protocol proxy. See `README.md` for full de
 Key versions: pgwire 0.38, DataFusion 51, axum 0.8, SeaORM 1, tokio-postgres 0.7, argon2 0.5, aes-gcm 0.10, jsonwebtoken 9. Admin UI: React 19, Vite 6, Tailwind 4, TanStack Query 5, react-router-dom 7.

 ## Key Files
+- `proxy/src/server.rs`: `process_socket_with_idle_timeout()` (replaces pgwire's `process_socket`; adds idle + startup timeouts)
 - `proxy/src/admin/mod.rs`: `AdminState`, `ApiErr`, `admin_router()`
 - `proxy/src/admin/jwt.rs`: `AdminClaims` / `AuthClaims` extractors
 - `proxy/src/admin/datasource_types.rs`: `split_config`, `merge_config`, `get_type_defs`
@@ -51,6 +52,7 @@ Always get Arrow types from the library's `get_schema()` during discovery — th
 - Cache invalidation: `engine_cache.invalidate(name)` after catalog operations (keeps shared pool). `engine_cache.invalidate_all(name)` after datasource edit/delete (removes pool too). Never swap these — see README § Performance.
 - Discovery jobs: one active job per datasource enforced by `JobStore.active_by_ds`; cancellation via `CancellationToken` passed through all `DiscoveryProvider` methods
 - Catalog UUID v5 key format: `"{parent_uuid}:{child_name}"` — same natural key → same ID → re-discovery is a safe upsert
+- Idle timeout: `process_socket_with_idle_timeout` in `server.rs` replaces `pgwire::tokio::process_socket`. Env var `BR_IDLE_TIMEOUT_SECS` (default 900). Tests use `tokio::time::pause()` + `advance()` — do not add real `sleep()` calls in server tests.

 ## Pre-commit Hook
 A `.githooks/pre-commit` script runs `cargo fmt --check` and `cargo clippy -p proxy -- -D warnings`. Enable it once per clone:

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

README.md

Lines changed: 16 additions & 1 deletion
@@ -155,6 +155,7 @@ betweenrows/
 │   └── types/           TypeScript interfaces
 └── proxy/src/
     ├── main.rs          entry point: CLI, DB init, EngineCache, servers
+    ├── server.rs        process_socket_with_idle_timeout (idle + startup timeouts)
     ├── handler.rs       pgwire StartupHandler + query handlers
     ├── auth.rs          Argon2 auth, user creation
     ├── crypto.rs        AES-256-GCM encrypt/decrypt
@@ -210,6 +211,7 @@ cargo run -p proxy -- user create --username alice --password secret --tenant ac
 | `BR_ADMIN_USER` | `admin` | Auto-seed username |
 | `BR_ADMIN_PASSWORD` | *(required on first boot)* | Auto-seed password. **Must be set** when no users exist in DB. |
 | `BR_ADMIN_TENANT` | `default` | Auto-seed tenant |
+| `BR_IDLE_TIMEOUT_SECS` | `900` (15 min) | Close pgwire connections that receive no messages for this many seconds. Enables Fly.io `auto_stop_machines` to work correctly with GUI clients (e.g. TablePlus) that hold idle connections indefinitely. Set to `0` to disable (not recommended on Fly.io). |
 | `BR_CORS_ALLOWED_ORIGINS` | *(empty, same-origin only)* | Comma-separated list of allowed CORS origins for the Admin API |
 | `RUST_LOG` | `info` | Log filter (standard Rust/tracing convention) |

@@ -225,6 +227,19 @@ Data sources are configured via the Admin UI (`/datasources`) or REST API — no
 ## Performance

+### Idle Connection Timeout
+
+pgwire 0.38 has no built-in idle timeout — `socket.next().await` blocks indefinitely after authentication. This prevents Fly.io `auto_stop_machines` from ever triggering when a GUI client like TablePlus is open, because the VM only stops when it has zero connections.
+
+`proxy/src/server.rs` replaces pgwire's `process_socket` with a custom message loop (`process_socket_with_idle_timeout`) that adds a `tokio::select!` branch racing each `socket.next()` against a `sleep(idle_timeout)`. The timer resets after every received message — a running query does not count as idle.
+
+Default timeout is 15 minutes (`BR_IDLE_TIMEOUT_SECS=900`). TCP keepalive (60 s time, 10 s interval) is also set on each accepted socket to detect dead connections from crashed clients or network failures.
+
+When idle timeout fires, a log line is emitted at `INFO` level:
+```
+Idle connection timed out after 900s
+```
+
 ### Arrow Type Alignment (query time)

 During catalog discovery, column types are captured using `datafusion-table-providers`' own `get_schema()` function rather than a manual PG-to-Arrow mapping. This guarantees that the stored schema matches exactly what the library produces at query time.
@@ -406,6 +421,6 @@ Catalog entity IDs (schemas, tables, columns) are deterministic UUID v5 fingerpr
 ```bash
 cargo build -p proxy # compile
-cargo test -p proxy # run tests (84 unit tests)
+cargo test -p proxy # run tests (101 unit tests)
 cd admin-ui && npm run build # production UI bundle
 ```
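
The `server.rs` implementation is not shown in this view, so the following is only a minimal, std-only sketch of the timer-reset behaviour that the new README section describes. It swaps the async `tokio::select!` race for a blocking `std::sync::mpsc` channel with `recv_timeout`; `run_until_idle` and `CloseReason` are hypothetical names used for illustration and are not part of pgwire or the proxy.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Why a connection loop ended (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
pub enum CloseReason {
    /// No message arrived within `idle_timeout`.
    IdleTimeout,
    /// The peer went away (the sending half was dropped).
    PeerClosed,
}

/// Handles messages until the peer disconnects or stays silent for
/// `idle_timeout`. Returns the close reason and the number of messages handled.
pub fn run_until_idle<T>(
    rx: &Receiver<T>,
    idle_timeout: Duration,
    mut handle: impl FnMut(T),
) -> (CloseReason, usize) {
    let mut handled = 0;
    loop {
        // Every call starts a fresh wait, so the idle timer resets per message.
        match rx.recv_timeout(idle_timeout) {
            Ok(msg) => {
                // The next wait begins only after `handle` returns, so time
                // spent processing (a running query) is never counted as idle.
                handle(msg);
                handled += 1;
            }
            Err(RecvTimeoutError::Timeout) => return (CloseReason::IdleTimeout, handled),
            Err(RecvTimeoutError::Disconnected) => return (CloseReason::PeerClosed, handled),
        }
    }
}
```

Calling `run_until_idle(&rx, Duration::from_secs(900), |msg| { /* process msg */ })` on a channel whose sender stays open but silent would return `CloseReason::IdleTimeout` after the wait elapses. The async version described in the README has the same shape, with the two `Err` arms replaced by the `sleep` branch and the end-of-stream case of the socket.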

proxy/Cargo.toml

Lines changed: 3 additions & 0 deletions
@@ -41,6 +41,9 @@ rand_core = { version = "0.6", features = ["std"] }
 aes-gcm = "0.10"
 base64 = "0.22"

+# TCP socket options (keepalive)
+socket2 = { version = "0.5", features = ["all"] }
+
 # Logging
 tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["env-filter"] }

proxy/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -11,4 +11,5 @@ pub mod engine;
 pub mod entity;
 pub mod handler;
 pub mod hooks;
+pub mod server;
 pub mod sql_rewrite;

proxy/src/main.rs

Lines changed: 26 additions & 3 deletions
@@ -1,13 +1,15 @@
 use clap::{Parser, Subcommand};
 use migration::{Migrator, MigratorTrait};
-use pgwire::tokio::process_socket;
 use proxy::admin::{AdminState, admin_router};
 use proxy::auth::Auth;
 use proxy::engine::EngineCache;
 use proxy::handler::ProxyHandler;
+use proxy::server::process_socket_with_idle_timeout;
 use rand_core::RngCore;
 use sea_orm::Database;
+use socket2::{SockRef, TcpKeepalive};
 use std::sync::Arc;
+use std::time::Duration;
 use tokio::net::TcpListener;

 #[derive(Parser)]
@@ -229,15 +231,36 @@ async fn serve(
     let bind_addr =
         std::env::var("BR_PROXY_BIND_ADDR").unwrap_or_else(|_| "127.0.0.1:5434".to_string());
+
+    let idle_timeout_secs: u64 = std::env::var("BR_IDLE_TIMEOUT_SECS")
+        .ok()
+        .and_then(|v| v.parse().ok())
+        .unwrap_or(900);
+    let idle_timeout = Duration::from_secs(idle_timeout_secs);
+    tracing::info!(
+        secs = idle_timeout_secs,
+        "Idle connection timeout configured"
+    );
+
+    let keepalive = TcpKeepalive::new()
+        .with_time(Duration::from_secs(60))
+        .with_interval(Duration::from_secs(10));
+
     let listener = TcpListener::bind(&bind_addr).await?;
     tracing::info!(addr = %bind_addr, "Proxy online");

     loop {
         let (incoming_socket, _) = listener.accept().await?;
-        let handler_clone = handler.clone();

+        if let Err(e) = SockRef::from(&incoming_socket).set_tcp_keepalive(&keepalive) {
+            tracing::warn!(error = %e, "Failed to set TCP keepalive");
+        }
+
+        let handler_clone = handler.clone();
         tokio::spawn(async move {
-            if let Err(e) = process_socket(incoming_socket, None, handler_clone).await {
+            if let Err(e) =
+                process_socket_with_idle_timeout(incoming_socket, handler_clone, idle_timeout).await
+            {
                 tracing::error!(error = %e, "Connection error");
             }
         });
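
The parsing added to `serve()` falls back to 900 without any warning when `BR_IDLE_TIMEOUT_SECS` is missing or is not a valid `u64`. The sketch below restates that behaviour as a pure function so it can be exercised without touching the process environment. `idle_timeout_secs` and `idle_timeout` are hypothetical helpers, and mapping `0` to `None` is an assumption about how "disabled" might be represented: the diff passes the `Duration` through unchanged, and `server.rs` is not shown here.

```rust
use std::time::Duration;

/// Default from the README table: 900 seconds (15 minutes).
const DEFAULT_IDLE_TIMEOUT_SECS: u64 = 900;

/// Same fallback rule as the parsing in `serve()`: a missing or unparsable
/// value yields the default. Takes the raw value instead of reading the
/// environment, which keeps the function easy to test.
pub fn idle_timeout_secs(raw: Option<&str>) -> u64 {
    raw.and_then(|v| v.parse::<u64>().ok())
        .unwrap_or(DEFAULT_IDLE_TIMEOUT_SECS)
}

/// Hypothetical helper: represents "disabled" (`0` in the README) as `None`.
/// This mapping is an assumption made for illustration only.
pub fn idle_timeout(raw: Option<&str>) -> Option<Duration> {
    match idle_timeout_secs(raw) {
        0 => None,
        secs => Some(Duration::from_secs(secs)),
    }
}
```

In a caller this might be wired up as `idle_timeout(std::env::var("BR_IDLE_TIMEOUT_SECS").ok().as_deref())`. One consequence of the fallback rule is that a typo such as `15m` yields the 900-second default instead of an error, so logging the effective value at startup, as the commit does, is what makes a misconfiguration visible.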
