Skip to content

Commit cd98c26

Browse files
committed
test(integration): add contention validation tests for connection-scoped cipher
Add three integration tests that prove shared mutex contention exists across concurrent proxy connections. Tests measure scaling factor, per-connection latency under concurrency, and cross-connection blocking. Assertions are set to fail under current shared-client architecture and will pass after the per-connection cipher fix.
1 parent 7cdc851 commit cd98c26

2 files changed

Lines changed: 226 additions & 0 deletions

File tree

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
#[cfg(test)]
2+
mod tests {
3+
use crate::common::{clear, connect_with_tls, random_id, random_string, trace, PROXY};
4+
use std::time::Instant;
5+
use tokio::task::JoinSet;
6+
7+
const CONNECTIONS: usize = 10;
8+
const INSERTS_PER_CONNECTION: usize = 5;
9+
10+
/// Perform N encrypted inserts on the given client, returning the wall-clock duration.
11+
async fn do_encrypted_inserts(
12+
client: &tokio_postgres::Client,
13+
n: usize,
14+
) -> std::time::Duration {
15+
let start = Instant::now();
16+
for _ in 0..n {
17+
let id = random_id();
18+
let val = random_string();
19+
client
20+
.query(
21+
"INSERT INTO encrypted (id, encrypted_text) VALUES ($1, $2)",
22+
&[&id, &val],
23+
)
24+
.await
25+
.unwrap();
26+
}
27+
start.elapsed()
28+
}
29+
30+
/// Measures whether concurrent encrypted inserts scale better than sequential.
31+
///
32+
/// Sequential: 10 serial connections, each doing 5 encrypted inserts.
33+
/// Concurrent: 10 parallel connections, each doing 5 encrypted inserts.
34+
///
35+
/// With shared mutex contention, concurrent will be ~same or slower than sequential.
36+
/// After per-connection cipher fix, concurrent should be significantly faster.
37+
#[tokio::test]
38+
async fn concurrent_encrypted_inserts_measure_scaling() {
39+
trace();
40+
clear().await;
41+
42+
// --- Sequential phase ---
43+
let seq_start = Instant::now();
44+
for _ in 0..CONNECTIONS {
45+
let client = connect_with_tls(PROXY).await;
46+
do_encrypted_inserts(&client, INSERTS_PER_CONNECTION).await;
47+
}
48+
let sequential_duration = seq_start.elapsed();
49+
50+
clear().await;
51+
52+
// --- Concurrent phase ---
53+
let conc_start = Instant::now();
54+
let mut join_set = JoinSet::new();
55+
56+
for _ in 0..CONNECTIONS {
57+
join_set.spawn(async move {
58+
let client = connect_with_tls(PROXY).await;
59+
do_encrypted_inserts(&client, INSERTS_PER_CONNECTION).await;
60+
});
61+
}
62+
63+
while let Some(result) = join_set.join_next().await {
64+
result.unwrap();
65+
}
66+
let concurrent_duration = conc_start.elapsed();
67+
68+
// --- Diagnostics ---
69+
let scaling_factor = concurrent_duration.as_secs_f64() / sequential_duration.as_secs_f64();
70+
71+
eprintln!("=== concurrent_encrypted_inserts_measure_scaling ===");
72+
eprintln!(
73+
" Sequential ({CONNECTIONS} serial connections x {INSERTS_PER_CONNECTION} inserts): {:.3}s",
74+
sequential_duration.as_secs_f64()
75+
);
76+
eprintln!(
77+
" Concurrent ({CONNECTIONS} parallel connections x {INSERTS_PER_CONNECTION} inserts): {:.3}s",
78+
concurrent_duration.as_secs_f64()
79+
);
80+
eprintln!(" Scaling factor (concurrent / sequential): {scaling_factor:.3}");
81+
eprintln!(" (After fix: expect scaling_factor < 0.5)");
82+
eprintln!("====================================================");
83+
84+
assert!(
85+
scaling_factor < 0.5,
86+
"Expected concurrent to be at least 2x faster than sequential, got scaling_factor={scaling_factor:.3}"
87+
);
88+
}
89+
90+
/// Measures whether per-connection latency increases under concurrency.
91+
///
92+
/// Solo: 1 connection doing 5 encrypted inserts.
93+
/// Concurrent: 10 connections each doing 5 encrypted inserts, measuring per-connection avg.
94+
///
95+
/// With shared mutex contention, per-connection latency will increase significantly.
96+
/// After per-connection cipher fix, latency should remain stable.
97+
#[tokio::test]
98+
async fn per_connection_latency_increases_with_concurrency() {
99+
trace();
100+
clear().await;
101+
102+
// --- Solo phase ---
103+
let solo_client = connect_with_tls(PROXY).await;
104+
let solo_duration = do_encrypted_inserts(&solo_client, INSERTS_PER_CONNECTION).await;
105+
106+
clear().await;
107+
108+
// --- Concurrent phase ---
109+
let mut join_set = JoinSet::new();
110+
111+
for _ in 0..CONNECTIONS {
112+
join_set.spawn(async move {
113+
let client = connect_with_tls(PROXY).await;
114+
do_encrypted_inserts(&client, INSERTS_PER_CONNECTION).await
115+
});
116+
}
117+
118+
let mut concurrent_durations = Vec::with_capacity(CONNECTIONS);
119+
while let Some(result) = join_set.join_next().await {
120+
concurrent_durations.push(result.unwrap());
121+
}
122+
123+
let avg_concurrent = concurrent_durations
124+
.iter()
125+
.map(|d| d.as_secs_f64())
126+
.sum::<f64>()
127+
/ concurrent_durations.len() as f64;
128+
129+
let max_concurrent = concurrent_durations
130+
.iter()
131+
.map(|d| d.as_secs_f64())
132+
.fold(0.0_f64, f64::max);
133+
134+
// --- Diagnostics ---
135+
let latency_multiplier = avg_concurrent / solo_duration.as_secs_f64();
136+
137+
eprintln!("=== per_connection_latency_increases_with_concurrency ===");
138+
eprintln!(
139+
" Solo (1 connection x {INSERTS_PER_CONNECTION} inserts): {:.3}s",
140+
solo_duration.as_secs_f64()
141+
);
142+
eprintln!(
143+
" Concurrent avg ({CONNECTIONS} connections x {INSERTS_PER_CONNECTION} inserts): {avg_concurrent:.3}s",
144+
);
145+
eprintln!(" Concurrent max: {max_concurrent:.3}s");
146+
eprintln!(" Latency multiplier (avg_concurrent / solo): {latency_multiplier:.3}");
147+
eprintln!(" (After fix: expect latency_multiplier < 2.0)");
148+
eprintln!("=========================================================");
149+
150+
assert!(
151+
latency_multiplier < 2.0,
152+
"Expected per-connection latency to stay stable under concurrency, got multiplier={latency_multiplier:.3}"
153+
);
154+
}
155+
156+
/// Verifies that a slow connection does not block other connections.
157+
///
158+
/// Connection A: encrypted insert then pg_sleep(0.5).
159+
/// Connection B (spawned 50ms after A): 3 encrypted inserts, measure total time.
160+
///
161+
/// With shared mutex contention, B may be blocked while A holds a lock during sleep.
162+
/// After per-connection cipher fix, B should complete independently of A's sleep.
163+
#[tokio::test]
164+
async fn slow_connection_does_not_block_other_connections() {
165+
trace();
166+
clear().await;
167+
168+
// Connection A: insert then sleep
169+
let a_handle = tokio::spawn(async move {
170+
let client = connect_with_tls(PROXY).await;
171+
let id = random_id();
172+
let val = random_string();
173+
client
174+
.query(
175+
"INSERT INTO encrypted (id, encrypted_text) VALUES ($1, $2)",
176+
&[&id, &val],
177+
)
178+
.await
179+
.unwrap();
180+
181+
// Hold this connection busy with a sleep
182+
client.simple_query("SELECT pg_sleep(0.5)").await.unwrap();
183+
});
184+
185+
// Small delay so A is likely in-flight before B starts
186+
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
187+
188+
// Connection B: 3 encrypted inserts, timed
189+
let b_handle = tokio::spawn(async move {
190+
let client = connect_with_tls(PROXY).await;
191+
let start = Instant::now();
192+
for _ in 0..3 {
193+
let id = random_id();
194+
let val = random_string();
195+
client
196+
.query(
197+
"INSERT INTO encrypted (id, encrypted_text) VALUES ($1, $2)",
198+
&[&id, &val],
199+
)
200+
.await
201+
.unwrap();
202+
}
203+
start.elapsed()
204+
});
205+
206+
// Wait for both
207+
let b_duration = b_handle.await.unwrap();
208+
a_handle.await.unwrap();
209+
210+
// --- Diagnostics ---
211+
eprintln!("=== slow_connection_does_not_block_other_connections ===");
212+
eprintln!(
213+
" Connection B (3 encrypted inserts while A sleeps): {:.3}s",
214+
b_duration.as_secs_f64()
215+
);
216+
eprintln!(" (After fix: expect B completes well under 0.5s, independent of A's sleep)");
217+
eprintln!("========================================================");
218+
219+
assert!(
220+
b_duration.as_secs_f64() < 0.5,
221+
"Connection B should not be blocked by A's sleep, took {:.3}s",
222+
b_duration.as_secs_f64()
223+
);
224+
}
225+
}

packages/cipherstash-proxy-integration/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod common;
2+
mod contention;
23
mod decrypt;
34
mod diagnostics;
45
mod disable_mapping;

0 commit comments

Comments
 (0)