Skip to content

Commit 30aafc6

Browse files
authored
Merge pull request #379 from cipherstash/connection-error-handling
Connection error handling
2 parents 0a412c9 + 5ebf5a8 commit 30aafc6

File tree

12 files changed

+379
-8
lines changed

12 files changed

+379
-8
lines changed

CLAUDE.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ mise run postgres:up --extra-args "--detach --wait"
5656
mise run postgres:setup # Install EQL and schema
5757
```
5858

59+
> **macOS Note:** If you hit file descriptor limits during development (e.g. "Too many open files"), you may need to increase the limit:
60+
> ```bash
61+
> ulimit -n 10240
62+
> ```
63+
> To make this persistent, add it to your shell profile (e.g. `~/.zshrc`).
64+
5965
### Core Development Workflow
6066
```bash
6167
# Build and run Proxy as a process (development)

DEVELOPMENT.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,9 @@ mise run proxy:down
321321
Running Proxy in a container cross-compiles a binary for Linux and the current architecture (`amd64`, `arm64`), then copies the binary into the container.
322322
We cross-compile the binary outside the container because it's generally faster: packages are already cached on the host, and network and disk IO are slower inside Docker.
323323

324+
> [!IMPORTANT]
325+
> **Proxy must always be built from source for testing.** The `proxy:up` task builds a binary from source (`build:binary`), packages it into a Docker image tagged `cipherstash/proxy:latest` (`build:docker`), then starts it via `docker compose up`. The `tests/docker-compose.yml` file uses `pull_policy: never` on the proxy services to ensure Docker never pulls the released image from Docker Hub. If you see an error like `pull access denied` or `image not found`, run `mise run build` first to build the local image.
326+
324327
### Building
325328

326329
Build a binary and Docker image:
@@ -460,6 +463,8 @@ This project uses `docker compose` to manage containers and networking.
460463

461464
The configuration for those containers is in `tests/docker-compose.yml`.
462465

466+
The proxy services in `tests/docker-compose.yml` use `pull_policy: never` to ensure Docker never pulls the released `cipherstash/proxy:latest` image from Docker Hub. The image must be built locally from source via `mise run proxy:up` (or `mise run build`). This guarantees integration tests always run against the current source code.
467+
463468
The integration tests use the `proxy:up` and `proxy:down` commands documented above to run containers in different configurations.
464469

465470
#### Configuration: configuring PostgreSQL containers in integration tests

mise.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -663,6 +663,9 @@ cp -v {{config_root}}/target/{{ target }}/release/cipherstash-proxy {{config_roo
663663

664664
[tasks."build:docker"]
665665
depends = ["eql:download"]
666+
# Tags the image as cipherstash/proxy:latest locally.
667+
# tests/docker-compose.yml uses pull_policy: never to ensure this local image
668+
# is always used instead of the released image on Docker Hub.
666669
description = "Build a Docker image for cipherstash-proxy"
667670
run = """
668671
{% set default_platform = "linux/" ~ arch() | replace(from="x86_64", to="amd64") %}

packages/cipherstash-proxy-integration/src/common.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,8 @@ pub fn connection_config(port: u16) -> tokio_postgres::Config {
165165
.port(port)
166166
.user(&username)
167167
.password(&password)
168-
.dbname(&name);
168+
.dbname(&name)
169+
.connect_timeout(std::time::Duration::from_secs(10));
169170

170171
db_config
171172
}
@@ -271,7 +272,7 @@ where
271272
}
272273

273274
/// Get database port from environment or use default.
274-
fn get_database_port() -> u16 {
275+
pub fn get_database_port() -> u16 {
275276
std::env::var("CS_DATABASE__PORT")
276277
.ok()
277278
.and_then(|s| s.parse().ok())
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/// Tests that validate proxy connection isolation under load.
2+
///
3+
/// These tests verify that:
4+
/// - Slow queries on one connection don't block other connections
5+
/// - The proxy accepts new connections after client disconnect
6+
/// - Concurrent connections under load remain responsive
7+
/// - Blocked backend connections don't affect other proxy connections
8+
#[cfg(test)]
9+
mod tests {
10+
use crate::common::{connect_with_tls, get_database_port, PROXY};
11+
use std::time::Instant;
12+
use tokio::task::JoinSet;
13+
use tokio::time::{timeout, Duration};
14+
use tokio_postgres::SimpleQueryMessage;
15+
16+
/// Advisory lock ID used in isolation tests. Arbitrary value — just needs to be
17+
/// unique across concurrently running test suites against the same database.
18+
const ADVISORY_LOCK_ID: i64 = 99_001;
19+
20+
/// A slow query on one connection does not block other connections through the proxy.
21+
#[tokio::test]
22+
async fn slow_query_does_not_block_other_connections() {
23+
let result = timeout(Duration::from_secs(30), async {
24+
let client_a = connect_with_tls(PROXY).await;
25+
let client_b = connect_with_tls(PROXY).await;
26+
27+
// Connection A: run a slow query
28+
let a_handle = tokio::spawn(async move {
29+
client_a.simple_query("SELECT pg_sleep(5)").await.unwrap();
30+
});
31+
32+
// Brief pause to ensure A's query is in flight
33+
tokio::time::sleep(Duration::from_millis(200)).await;
34+
35+
// Connection B: run a fast query, should complete promptly
36+
let start = Instant::now();
37+
let rows = client_b.simple_query("SELECT 1").await.unwrap();
38+
let elapsed = start.elapsed();
39+
40+
assert!(!rows.is_empty(), "Expected result from SELECT 1");
41+
assert!(
42+
elapsed < Duration::from_secs(2),
43+
"Fast query took {elapsed:?}, expected < 2s — proxy may be blocking"
44+
);
45+
46+
a_handle.await.unwrap();
47+
})
48+
.await;
49+
50+
result.expect("Test timed out after 30s");
51+
}
52+
53+
/// Proxy accepts new connections after a client disconnects.
54+
#[tokio::test]
55+
async fn proxy_accepts_new_connections_after_client_disconnect() {
56+
let result = timeout(Duration::from_secs(10), async {
57+
// First connection: query, then drop
58+
{
59+
let client = connect_with_tls(PROXY).await;
60+
let rows = client.simple_query("SELECT 1").await.unwrap();
61+
assert!(!rows.is_empty());
62+
}
63+
// Client dropped here
64+
65+
// Brief pause
66+
tokio::time::sleep(Duration::from_millis(100)).await;
67+
68+
// Second connection: should work fine
69+
let client = connect_with_tls(PROXY).await;
70+
let rows = client.simple_query("SELECT 1").await.unwrap();
71+
assert!(!rows.is_empty());
72+
})
73+
.await;
74+
75+
result.expect("Test timed out after 10s");
76+
}
77+
78+
/// Concurrent slow and fast connections: fast queries complete promptly under slow load.
79+
#[tokio::test]
80+
async fn concurrent_connections_under_slow_load() {
81+
let result = timeout(Duration::from_secs(30), async {
82+
let mut join_set = JoinSet::new();
83+
84+
// 5 slow connections
85+
for _ in 0..5 {
86+
join_set.spawn(async {
87+
let client = connect_with_tls(PROXY).await;
88+
client.simple_query("SELECT pg_sleep(3)").await.unwrap();
89+
});
90+
}
91+
92+
// Brief pause to let slow queries start
93+
tokio::time::sleep(Duration::from_millis(300)).await;
94+
95+
// 5 fast connections, each should complete promptly
96+
for _ in 0..5 {
97+
join_set.spawn(async {
98+
let start = Instant::now();
99+
let client = connect_with_tls(PROXY).await;
100+
let rows = client.simple_query("SELECT 1").await.unwrap();
101+
let elapsed = start.elapsed();
102+
103+
assert!(!rows.is_empty());
104+
assert!(
105+
elapsed < Duration::from_secs(5),
106+
"Fast query took {elapsed:?} under slow load, expected < 5s"
107+
);
108+
});
109+
}
110+
111+
while let Some(result) = join_set.join_next().await {
112+
result.unwrap();
113+
}
114+
})
115+
.await;
116+
117+
result.expect("Test timed out after 30s");
118+
}
119+
120+
/// An advisory-lock-blocked connection through the proxy does not block other proxy connections.
121+
///
122+
/// Uses pg_locks polling to deterministically wait for client_b to be blocked on the
123+
/// advisory lock, rather than relying on a fixed sleep.
124+
#[tokio::test]
125+
async fn advisory_lock_blocked_connection_does_not_block_proxy() {
126+
let lock_query = format!("SELECT pg_advisory_lock({ADVISORY_LOCK_ID})");
127+
let unlock_query = format!("SELECT pg_advisory_unlock({ADVISORY_LOCK_ID})");
128+
129+
let result = timeout(Duration::from_secs(30), async {
130+
// Connection A: hold an advisory lock (connect directly to PG to avoid proxy interference)
131+
let pg_port = get_database_port();
132+
let client_a = connect_with_tls(pg_port).await;
133+
client_a
134+
.simple_query(&lock_query)
135+
.await
136+
.unwrap();
137+
138+
let b_lock_query = lock_query.clone();
139+
let b_unlock_query = unlock_query.clone();
140+
141+
// Connection B: through proxy, attempt to acquire the same lock (will block)
142+
let b_handle = tokio::spawn(async move {
143+
let client_b = connect_with_tls(PROXY).await;
144+
// This will block until A releases the lock
145+
client_b
146+
.simple_query(&b_lock_query)
147+
.await
148+
.unwrap();
149+
// Release after acquiring
150+
client_b
151+
.simple_query(&b_unlock_query)
152+
.await
153+
.unwrap();
154+
});
155+
156+
// Poll pg_locks until client_b is observed waiting for the advisory lock
157+
let poll_query = format!(
158+
"SELECT 1 FROM pg_locks WHERE locktype = 'advisory' AND NOT granted AND classid = 0 AND objid = {ADVISORY_LOCK_ID}"
159+
);
160+
let deadline = Instant::now() + Duration::from_secs(10);
161+
loop {
162+
let result = client_a.simple_query(&poll_query).await.unwrap();
163+
let has_waiting = result.iter().any(|m| matches!(m, SimpleQueryMessage::Row(_)));
164+
if has_waiting {
165+
break;
166+
}
167+
assert!(
168+
Instant::now() < deadline,
169+
"Timed out waiting for client_b to be blocked on advisory lock"
170+
);
171+
tokio::time::sleep(Duration::from_millis(50)).await;
172+
}
173+
174+
// Connection C: through proxy, should complete immediately despite B being blocked
175+
let start = Instant::now();
176+
let client_c = connect_with_tls(PROXY).await;
177+
let rows = client_c.simple_query("SELECT 1").await.unwrap();
178+
let elapsed = start.elapsed();
179+
180+
assert!(!rows.is_empty());
181+
assert!(
182+
elapsed < Duration::from_secs(2),
183+
"Connection C took {elapsed:?}, expected < 2s — blocked connection may be affecting proxy"
184+
);
185+
186+
// Release the lock so B can complete
187+
client_a
188+
.simple_query(&unlock_query)
189+
.await
190+
.unwrap();
191+
192+
b_handle.await.unwrap();
193+
})
194+
.await;
195+
196+
result.expect("Test timed out after 30s");
197+
}
198+
}

packages/cipherstash-proxy-integration/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod common;
2+
mod connection_resilience;
23
mod decrypt;
34
mod diagnostics;
45
mod disable_mapping;

packages/cipherstash-proxy/src/config/database.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,14 @@ impl DatabaseConfig {
7575
self.password.to_owned().risky_unwrap()
7676
}
7777

78+
const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 120_000;
79+
7880
pub fn connection_timeout(&self) -> Option<Duration> {
79-
self.connection_timeout.map(Duration::from_millis)
81+
match self.connection_timeout {
82+
Some(0) => None,
83+
Some(ms) => Some(Duration::from_millis(ms)),
84+
None => Some(Duration::from_millis(Self::DEFAULT_CONNECTION_TIMEOUT_MS)),
85+
}
8086
}
8187

8288
pub fn server_name(&self) -> Result<ServerName<'_>, Error> {
@@ -116,3 +122,31 @@ impl Display for DatabaseConfig {
116122
)
117123
}
118124
}
125+
126+
#[cfg(test)]
127+
mod tests {
128+
use super::*;
129+
130+
#[test]
131+
fn connection_timeout_defaults_to_120_seconds() {
132+
let config = DatabaseConfig::for_testing();
133+
assert_eq!(config.connection_timeout(), Some(Duration::from_secs(120)));
134+
}
135+
136+
#[test]
137+
fn connection_timeout_zero_disables_timeout() {
138+
let mut config = DatabaseConfig::for_testing();
139+
config.connection_timeout = Some(0);
140+
assert_eq!(config.connection_timeout(), None);
141+
}
142+
143+
#[test]
144+
fn connection_timeout_custom_value_in_millis() {
145+
let mut config = DatabaseConfig::for_testing();
146+
config.connection_timeout = Some(5000);
147+
assert_eq!(
148+
config.connection_timeout(),
149+
Some(Duration::from_millis(5000))
150+
);
151+
}
152+
}

packages/cipherstash-proxy/src/error.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ pub enum Error {
2424
#[error("Connection closed by client")]
2525
ConnectionClosed,
2626

27-
#[error("Connection timed out after {} ms", duration.as_secs())]
27+
#[error("Connection timed out after {} ms", duration.as_millis())]
2828
ConnectionTimeout { duration: Duration },
2929

3030
#[error("Error creating connection")]
@@ -531,4 +531,12 @@ mod tests {
531531

532532
assert_eq!(format!("Statement encountered an internal error. This may be a bug in the statement mapping module of CipherStash Proxy. Please visit {ERROR_DOC_BASE_URL}#mapping-internal-error for more information."), message);
533533
}
534+
535+
#[test]
536+
fn connection_timeout_message_shows_millis() {
537+
let error = Error::ConnectionTimeout {
538+
duration: Duration::from_millis(5000),
539+
};
540+
assert_eq!(error.to_string(), "Connection timed out after 5000 ms");
541+
}
534542
}

0 commit comments

Comments
 (0)