Skip to content

Commit 3b293b1

Browse files
committed
test(integration): add contention validation tests for connection-scoped cipher
Add three integration tests that prove shared mutex contention exists across concurrent proxy connections. Tests measure scaling factor, per-connection latency under concurrency, and cross-connection blocking. Assertions are set to fail under current shared-client architecture and will pass after the per-connection cipher fix.
1 parent 7cdc851 commit 3b293b1

2 files changed

Lines changed: 226 additions & 0 deletions

File tree

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
#[cfg(test)]
2+
mod tests {
3+
use crate::common::{clear, connect_with_tls, random_id, random_string, trace, PROXY};
4+
use std::time::Instant;
5+
use tokio::task::JoinSet;
6+
7+
const CONNECTIONS: usize = 10;
8+
const INSERTS_PER_CONNECTION: usize = 5;
9+
10+
/// Perform N encrypted inserts on the given client, returning the wall-clock duration.
11+
async fn do_encrypted_inserts(client: &tokio_postgres::Client, n: usize) -> std::time::Duration {
12+
let start = Instant::now();
13+
for _ in 0..n {
14+
let id = random_id();
15+
let val = random_string();
16+
client
17+
.query(
18+
"INSERT INTO encrypted (id, encrypted_text) VALUES ($1, $2)",
19+
&[&id, &val],
20+
)
21+
.await
22+
.unwrap();
23+
}
24+
start.elapsed()
25+
}
26+
27+
/// Measures whether concurrent encrypted inserts scale better than sequential.
28+
///
29+
/// Sequential: 10 serial connections, each doing 5 encrypted inserts.
30+
/// Concurrent: 10 parallel connections, each doing 5 encrypted inserts.
31+
///
32+
/// With shared mutex contention, concurrent will be ~same or slower than sequential.
33+
/// After per-connection cipher fix, concurrent should be significantly faster.
34+
#[tokio::test]
35+
async fn concurrent_encrypted_inserts_measure_scaling() {
36+
trace();
37+
clear().await;
38+
39+
// --- Sequential phase ---
40+
let seq_start = Instant::now();
41+
for _ in 0..CONNECTIONS {
42+
let client = connect_with_tls(PROXY).await;
43+
do_encrypted_inserts(&client, INSERTS_PER_CONNECTION).await;
44+
}
45+
let sequential_duration = seq_start.elapsed();
46+
47+
clear().await;
48+
49+
// --- Concurrent phase ---
50+
let conc_start = Instant::now();
51+
let mut join_set = JoinSet::new();
52+
53+
for _ in 0..CONNECTIONS {
54+
join_set.spawn(async move {
55+
let client = connect_with_tls(PROXY).await;
56+
do_encrypted_inserts(&client, INSERTS_PER_CONNECTION).await;
57+
});
58+
}
59+
60+
while let Some(result) = join_set.join_next().await {
61+
result.unwrap();
62+
}
63+
let concurrent_duration = conc_start.elapsed();
64+
65+
// --- Diagnostics ---
66+
let scaling_factor = concurrent_duration.as_secs_f64() / sequential_duration.as_secs_f64();
67+
68+
eprintln!("=== concurrent_encrypted_inserts_measure_scaling ===");
69+
eprintln!(
70+
" Sequential ({CONNECTIONS} serial connections x {INSERTS_PER_CONNECTION} inserts): {:.3}s",
71+
sequential_duration.as_secs_f64()
72+
);
73+
eprintln!(
74+
" Concurrent ({CONNECTIONS} parallel connections x {INSERTS_PER_CONNECTION} inserts): {:.3}s",
75+
concurrent_duration.as_secs_f64()
76+
);
77+
eprintln!(" Scaling factor (concurrent / sequential): {scaling_factor:.3}");
78+
eprintln!(" (After fix: expect scaling_factor < 0.5)");
79+
eprintln!("====================================================");
80+
81+
assert!(
82+
scaling_factor < 0.5,
83+
"Expected concurrent to be at least 2x faster than sequential, got scaling_factor={scaling_factor:.3}"
84+
);
85+
}
86+
87+
/// Measures whether per-connection latency increases under concurrency.
88+
///
89+
/// Solo: 1 connection doing 5 encrypted inserts.
90+
/// Concurrent: 10 connections each doing 5 encrypted inserts, measuring per-connection avg.
91+
///
92+
/// With shared mutex contention, per-connection latency will increase significantly.
93+
/// After per-connection cipher fix, latency should remain stable.
94+
#[tokio::test]
95+
async fn per_connection_latency_increases_with_concurrency() {
96+
trace();
97+
clear().await;
98+
99+
// --- Solo phase ---
100+
let solo_client = connect_with_tls(PROXY).await;
101+
let solo_duration = do_encrypted_inserts(&solo_client, INSERTS_PER_CONNECTION).await;
102+
103+
clear().await;
104+
105+
// --- Concurrent phase ---
106+
let mut join_set = JoinSet::new();
107+
108+
for _ in 0..CONNECTIONS {
109+
join_set.spawn(async move {
110+
let client = connect_with_tls(PROXY).await;
111+
do_encrypted_inserts(&client, INSERTS_PER_CONNECTION).await
112+
});
113+
}
114+
115+
let mut concurrent_durations = Vec::with_capacity(CONNECTIONS);
116+
while let Some(result) = join_set.join_next().await {
117+
concurrent_durations.push(result.unwrap());
118+
}
119+
120+
let avg_concurrent = concurrent_durations
121+
.iter()
122+
.map(|d| d.as_secs_f64())
123+
.sum::<f64>()
124+
/ concurrent_durations.len() as f64;
125+
126+
let max_concurrent = concurrent_durations
127+
.iter()
128+
.map(|d| d.as_secs_f64())
129+
.fold(0.0_f64, f64::max);
130+
131+
// --- Diagnostics ---
132+
let latency_multiplier = avg_concurrent / solo_duration.as_secs_f64();
133+
134+
eprintln!("=== per_connection_latency_increases_with_concurrency ===");
135+
eprintln!(
136+
" Solo (1 connection x {INSERTS_PER_CONNECTION} inserts): {:.3}s",
137+
solo_duration.as_secs_f64()
138+
);
139+
eprintln!(
140+
" Concurrent avg ({CONNECTIONS} connections x {INSERTS_PER_CONNECTION} inserts): {avg_concurrent:.3}s",
141+
);
142+
eprintln!(" Concurrent max: {max_concurrent:.3}s");
143+
eprintln!(" Latency multiplier (avg_concurrent / solo): {latency_multiplier:.3}");
144+
eprintln!(" (After fix: expect latency_multiplier < 2.0)");
145+
eprintln!("=========================================================");
146+
147+
assert!(
148+
latency_multiplier < 2.0,
149+
"Expected per-connection latency to stay stable under concurrency, got multiplier={latency_multiplier:.3}"
150+
);
151+
}
152+
153+
/// Verifies that a slow connection does not block other connections.
154+
///
155+
/// Connection A: encrypted insert then pg_sleep(0.5).
156+
/// Connection B (spawned 50ms after A): 3 encrypted inserts, measure total time.
157+
///
158+
/// With shared mutex contention, B may be blocked while A holds a lock during sleep.
159+
/// After per-connection cipher fix, B should complete independently of A's sleep.
160+
#[tokio::test]
161+
async fn slow_connection_does_not_block_other_connections() {
162+
trace();
163+
clear().await;
164+
165+
// Connection A: insert then sleep
166+
let a_handle = tokio::spawn(async move {
167+
let client = connect_with_tls(PROXY).await;
168+
let id = random_id();
169+
let val = random_string();
170+
client
171+
.query(
172+
"INSERT INTO encrypted (id, encrypted_text) VALUES ($1, $2)",
173+
&[&id, &val],
174+
)
175+
.await
176+
.unwrap();
177+
178+
// Hold this connection busy with a sleep
179+
client
180+
.simple_query("SELECT pg_sleep(0.5)")
181+
.await
182+
.unwrap();
183+
});
184+
185+
// Small delay so A is likely in-flight before B starts
186+
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
187+
188+
// Connection B: 3 encrypted inserts, timed
189+
let b_handle = tokio::spawn(async move {
190+
let client = connect_with_tls(PROXY).await;
191+
let start = Instant::now();
192+
for _ in 0..3 {
193+
let id = random_id();
194+
let val = random_string();
195+
client
196+
.query(
197+
"INSERT INTO encrypted (id, encrypted_text) VALUES ($1, $2)",
198+
&[&id, &val],
199+
)
200+
.await
201+
.unwrap();
202+
}
203+
start.elapsed()
204+
});
205+
206+
// Wait for both
207+
let b_duration = b_handle.await.unwrap();
208+
a_handle.await.unwrap();
209+
210+
// --- Diagnostics ---
211+
eprintln!("=== slow_connection_does_not_block_other_connections ===");
212+
eprintln!(
213+
" Connection B (3 encrypted inserts while A sleeps): {:.3}s",
214+
b_duration.as_secs_f64()
215+
);
216+
eprintln!(" (After fix: expect B completes well under 0.5s, independent of A's sleep)");
217+
eprintln!("========================================================");
218+
219+
assert!(
220+
b_duration.as_secs_f64() < 0.5,
221+
"Connection B should not be blocked by A's sleep, took {:.3}s",
222+
b_duration.as_secs_f64()
223+
);
224+
}
225+
}

packages/cipherstash-proxy-integration/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod common;
2+
mod contention;
23
mod decrypt;
34
mod diagnostics;
45
mod disable_mapping;

0 commit comments

Comments
 (0)