
Commit 494bf6f

Merge pull request #373 from cipherstash/connection-scoped-cipher
Update cipherstash-client and add contention validation tests for scoped cipher
2 parents 7cdc851 + 7f2683d commit 494bf6f

4 files changed

Lines changed: 317 additions & 4 deletions


Cargo.lock

Lines changed: 2 additions & 3 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ debug = true
 
 [workspace.dependencies]
 sqltk = { version = "0.10.0" }
-cipherstash-client = { version = "0.33.1" }
+cipherstash-client = { version = "0.33.2" }
 cts-common = { version = "0.4.1" }
 
 thiserror = "2.0.9"
Lines changed: 313 additions & 0 deletions
@@ -0,0 +1,313 @@
/// Tests that validate mutex contention across concurrent multitenant connections.
///
/// All proxy connections share a single `Arc<ZerokmsClient>` which internally holds two mutexes:
/// - `Mutex<ChaChaRng>` in `ViturClient` — serializes IV generation during every encrypt call
/// - `Arc<Mutex<ServiceCredentials>>` in `AutoRefresh` — serializes token retrieval
///
/// In multitenant deployments, different tenants (keysets) encrypting concurrently all contend
/// on these same mutexes. These tests prove that contention exists and will validate the
/// per-connection cipher fix.
///
/// IMPORTANT: These tests require `CS_DEFAULT_KEYSET_ID` to be unset and tenant keyset
/// env vars to be set. They run in the multitenant integration test phase.
#[cfg(test)]
mod tests {
    use crate::common::{clear, connect_with_tls, random_id, random_string, trace, PROXY};
    use std::sync::Arc;
    use std::time::Instant;
    use tokio::sync::Notify;
    use tokio::task::JoinSet;

    /// Number of tenant connections per test phase.
    const TENANTS_PER_BATCH: usize = 10;

    /// Number of encrypted inserts each tenant performs.
    const INSERTS_PER_TENANT: usize = 50;

    /// Read tenant keyset IDs from environment, cycling through the 3 available keysets.
    fn tenant_keyset_ids(count: usize) -> Vec<String> {
        let keysets = [
            std::env::var("CS_TENANT_KEYSET_ID_1").unwrap(),
            std::env::var("CS_TENANT_KEYSET_ID_2").unwrap(),
            std::env::var("CS_TENANT_KEYSET_ID_3").unwrap(),
        ];
        (0..count)
            .map(|i| keysets[i % keysets.len()].clone())
            .collect()
    }

    /// Establish a connection and set the keyset for a tenant.
    /// Returns the ready-to-use client (connection setup is excluded from timing).
    async fn connect_as_tenant(keyset_id: &str) -> tokio_postgres::Client {
        let client = connect_with_tls(PROXY).await;
        // SET doesn't support parameterized values; keyset_id is from trusted env vars
        let sql = format!("SET CIPHERSTASH.KEYSET_ID = '{keyset_id}'");
        client.execute(&sql, &[]).await.unwrap();
        client
    }

    /// Perform N encrypted inserts on an already-connected client.
    /// Returns the wall-clock duration of the insert phase only.
    async fn do_encrypted_inserts(
        client: &tokio_postgres::Client,
        n: usize,
    ) -> std::time::Duration {
        let start = Instant::now();
        for _ in 0..n {
            let id = random_id();
            let val = random_string();
            client
                .execute(
                    "INSERT INTO encrypted (id, encrypted_text) VALUES ($1, $2)",
                    &[&id, &val],
                )
                .await
                .unwrap();
        }
        start.elapsed()
    }

    /// Measures whether concurrent multitenant encrypted inserts scale better than sequential.
    ///
    /// Sequential: 10 tenants in series, each doing 50 encrypted inserts.
    /// Concurrent: 10 tenants in parallel, each doing 50 encrypted inserts.
    ///
    /// Connection setup and keyset configuration happen before timing starts.
    /// Only the encrypt+insert phase is measured.
    ///
    /// With shared mutex contention, concurrent wall-clock will be ~same as sequential.
    /// After per-connection cipher fix, concurrent should be significantly faster.
    #[tokio::test]
    async fn multitenant_concurrent_encrypted_inserts_measure_scaling() {
        trace();
        clear().await;

        let keyset_ids = tenant_keyset_ids(TENANTS_PER_BATCH);

        // --- Sequential phase: establish all connections first, then measure inserts ---
        let mut seq_clients = Vec::with_capacity(TENANTS_PER_BATCH);
        for keyset_id in &keyset_ids {
            seq_clients.push(connect_as_tenant(keyset_id).await);
        }

        let seq_start = Instant::now();
        for client in &seq_clients {
            do_encrypted_inserts(client, INSERTS_PER_TENANT).await;
        }
        let sequential_duration = seq_start.elapsed();
        drop(seq_clients);

        clear().await;

        // --- Concurrent phase: establish all connections first, then measure inserts ---
        let mut conc_clients = Vec::with_capacity(TENANTS_PER_BATCH);
        for keyset_id in &keyset_ids {
            conc_clients.push(connect_as_tenant(keyset_id).await);
        }

        let conc_start = Instant::now();
        let mut join_set = JoinSet::new();

        for client in conc_clients {
            join_set.spawn(async move {
                do_encrypted_inserts(&client, INSERTS_PER_TENANT).await;
            });
        }

        while let Some(result) = join_set.join_next().await {
            result.unwrap();
        }
        let concurrent_duration = conc_start.elapsed();

        // --- Diagnostics ---
        let scaling_factor = concurrent_duration.as_secs_f64() / sequential_duration.as_secs_f64();

        eprintln!("=== multitenant_concurrent_encrypted_inserts_measure_scaling ===");
        eprintln!(
            " Sequential ({TENANTS_PER_BATCH} tenants x {INSERTS_PER_TENANT} inserts): {:.3}s",
            sequential_duration.as_secs_f64()
        );
        eprintln!(
            " Concurrent ({TENANTS_PER_BATCH} tenants x {INSERTS_PER_TENANT} inserts): {:.3}s",
            concurrent_duration.as_secs_f64()
        );
        eprintln!(" Scaling factor (concurrent / sequential): {scaling_factor:.3}");
        eprintln!(" (After fix: expect scaling_factor < 0.5)");
        eprintln!("================================================================");

        // Diagnostic only: scaling_factor < 0.5 indicates the per-connection cipher fix is effective.
        // Not asserted because CI runners exhibit variable performance under load.
        if scaling_factor >= 0.5 {
            eprintln!(
                " WARNING: scaling_factor >= 0.5 — concurrent inserts not scaling as expected"
            );
        }
    }

    /// Measures whether per-tenant latency increases under concurrent multitenant load.
    ///
    /// Solo: 1 tenant doing 50 encrypted inserts alone.
    /// Concurrent: 10 tenants each doing 50 encrypted inserts, measuring per-tenant duration.
    ///
    /// Connection setup is excluded from timing.
    ///
    /// With shared mutex contention, per-tenant latency will increase significantly.
    /// After per-connection cipher fix, latency should remain stable.
    #[tokio::test]
    async fn multitenant_per_connection_latency_increases_with_concurrency() {
        trace();
        clear().await;

        let keyset_ids = tenant_keyset_ids(TENANTS_PER_BATCH);

        // --- Solo phase ---
        let solo_client = connect_as_tenant(&keyset_ids[0]).await;
        let solo_duration = do_encrypted_inserts(&solo_client, INSERTS_PER_TENANT).await;
        drop(solo_client);

        clear().await;

        // --- Concurrent phase: establish all connections, then measure ---
        let mut clients = Vec::with_capacity(TENANTS_PER_BATCH);
        for keyset_id in &keyset_ids {
            clients.push(connect_as_tenant(keyset_id).await);
        }

        let mut join_set = JoinSet::new();
        for client in clients {
            join_set.spawn(async move { do_encrypted_inserts(&client, INSERTS_PER_TENANT).await });
        }

        let mut concurrent_durations = Vec::with_capacity(TENANTS_PER_BATCH);
        while let Some(result) = join_set.join_next().await {
            concurrent_durations.push(result.unwrap());
        }

        let avg_concurrent = concurrent_durations
            .iter()
            .map(|d| d.as_secs_f64())
            .sum::<f64>()
            / concurrent_durations.len() as f64;

        let max_concurrent = concurrent_durations
            .iter()
            .map(|d| d.as_secs_f64())
            .fold(0.0_f64, f64::max);

        // --- Diagnostics ---
        let latency_multiplier = avg_concurrent / solo_duration.as_secs_f64();

        eprintln!("=== multitenant_per_connection_latency_increases_with_concurrency ===");
        eprintln!(
            " Solo (1 tenant x {INSERTS_PER_TENANT} inserts): {:.3}s",
            solo_duration.as_secs_f64()
        );
        eprintln!(
            " Concurrent avg ({TENANTS_PER_BATCH} tenants x {INSERTS_PER_TENANT} inserts): {avg_concurrent:.3}s",
        );
        eprintln!(" Concurrent max: {max_concurrent:.3}s");
        eprintln!(" Latency multiplier (avg_concurrent / solo): {latency_multiplier:.3}");
        eprintln!(" (After fix: expect latency_multiplier < 2.0)");
        eprintln!("=====================================================================");

        // Diagnostic only: latency_multiplier < 2.0 indicates stable per-tenant latency.
        // Not asserted because CI runners exhibit variable performance under load.
        if latency_multiplier >= 2.0 {
            eprintln!(" WARNING: latency_multiplier >= 2.0 — per-tenant latency degraded under concurrency");
        }
    }

    /// Number of encrypted inserts for the slow-connection test.
    const SLOW_CONN_INSERTS: usize = 10;

    /// Verifies that a slow tenant connection does not block other tenants.
    ///
    /// First measures a solo baseline: one tenant doing N encrypted inserts alone.
    /// Then runs the contention scenario:
    ///   Tenant A: encrypted insert, signals readiness, then pg_sleep(2).
    ///   Tenant B (different keyset): waits for A's signal, then does N encrypted inserts, timed.
    ///
    /// Asserts that B's time under contention is within 2x of the solo baseline,
    /// proving B is not blocked by A's sleep. Uses a relative comparison instead
    /// of an absolute threshold to avoid CI environment speed sensitivity.
    ///
    /// Connection setup is excluded from timing. A `Notify` ensures B starts only after
    /// A has completed its encrypted insert and entered pg_sleep, avoiding timing fragility.
    #[tokio::test]
    async fn multitenant_slow_connection_does_not_block_other_tenants() {
        trace();
        clear().await;

        let keyset_ids = tenant_keyset_ids(2);

        // --- Solo baseline: measure how long N inserts take with no contention ---
        let baseline_client = connect_as_tenant(&keyset_ids[1]).await;
        let baseline_duration = do_encrypted_inserts(&baseline_client, SLOW_CONN_INSERTS).await;
        drop(baseline_client);

        clear().await;

        // --- Contention scenario ---
        // Establish both connections before timing
        let client_a = connect_as_tenant(&keyset_ids[0]).await;
        let client_b = connect_as_tenant(&keyset_ids[1]).await;

        // A signals after its encrypted insert completes, just before entering pg_sleep.
        let a_ready = Arc::new(Notify::new());
        let a_ready_tx = a_ready.clone();

        // Tenant A: encrypted insert, signal, then sleep (2s to be clearly longer than inserts)
        let a_handle = tokio::spawn(async move {
            let id = random_id();
            let val = random_string();
            client_a
                .execute(
                    "INSERT INTO encrypted (id, encrypted_text) VALUES ($1, $2)",
                    &[&id, &val],
                )
                .await
                .unwrap();

            // Signal that the encrypted insert is done; A is now entering pg_sleep
            a_ready_tx.notify_one();

            // Hold this connection busy with a long sleep
            client_a.simple_query("SELECT pg_sleep(2)").await.unwrap();
        });

        // Wait for A to complete its encrypted insert before starting B
        a_ready.notified().await;

        // Tenant B: encrypted inserts, timed
        let b_handle =
            tokio::spawn(async move { do_encrypted_inserts(&client_b, SLOW_CONN_INSERTS).await });

        // Wait for both
        let b_duration = b_handle.await.unwrap();
        a_handle.await.unwrap();

        // --- Diagnostics ---
        let contention_ratio = b_duration.as_secs_f64() / baseline_duration.as_secs_f64();

        eprintln!("=== multitenant_slow_connection_does_not_block_other_tenants ===");
        eprintln!(
            " Solo baseline ({SLOW_CONN_INSERTS} encrypted inserts): {:.3}s",
            baseline_duration.as_secs_f64()
        );
        eprintln!(
            " Tenant B ({SLOW_CONN_INSERTS} encrypted inserts while A sleeps): {:.3}s",
            b_duration.as_secs_f64()
        );
        eprintln!(" Contention ratio (B / baseline): {contention_ratio:.3}");
        eprintln!(" (After fix: expect ratio < 2.0, B completes independently of A's sleep)");
        eprintln!("=================================================================");

        assert!(
            contention_ratio < 2.0,
            "Tenant B should not be blocked by Tenant A's sleep, \
             contention ratio={contention_ratio:.3} (B={:.3}s, baseline={:.3}s)",
            b_duration.as_secs_f64(),
            baseline_duration.as_secs_f64()
        );
    }
}
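
The module comment above attributes the slowdown to locks shared by every proxy connection. As a standalone illustration (not part of this commit and not cipherstash-client code; the task counts, sleep duration, and names like simulated_encrypt are invented for the example), the sketch below shows the effect these tests are built to measure: tasks that must take one shared tokio::sync::Mutex for every operation are serialized, while tasks that own their own state overlap freely.

// Minimal illustrative sketch, NOT proxy or cipherstash-client code.
// Contrasts work gated by one shared mutex with the same work done
// against per-task state. Assumes a scratch crate with tokio ("full" features).
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;

/// Stand-in for per-value work such as IV generation during encrypt.
async fn simulated_encrypt() {
    tokio::time::sleep(Duration::from_millis(5)).await;
}

#[tokio::main]
async fn main() {
    const TASKS: usize = 4;
    const OPS: usize = 20;

    // Shared state: every task takes the same lock for every operation,
    // so the work is effectively serialized across tasks.
    let shared = Arc::new(Mutex::new(()));
    let start = Instant::now();
    let mut handles = Vec::new();
    for _ in 0..TASKS {
        let shared = Arc::clone(&shared);
        handles.push(tokio::spawn(async move {
            for _ in 0..OPS {
                let _guard = shared.lock().await;
                simulated_encrypt().await;
            }
        }));
    }
    for h in handles {
        h.await.unwrap();
    }
    println!("shared mutex:   {:?}", start.elapsed());

    // Per-task state: nothing is shared, so the tasks run concurrently.
    let start = Instant::now();
    let mut handles = Vec::new();
    for _ in 0..TASKS {
        handles.push(tokio::spawn(async move {
            for _ in 0..OPS {
                simulated_encrypt().await;
            }
        }));
    }
    for h in handles {
        h.await.unwrap();
    }
    println!("per-task state: {:?}", start.elapsed());
}

With the numbers above, the shared-mutex phase should take roughly TASKS times longer than the per-task phase, which is the same gap the scaling_factor and latency_multiplier diagnostics in the tests are meant to expose and the per-connection cipher change is meant to close.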
Lines changed: 1 addition & 0 deletions
@@ -1,2 +1,3 @@
+mod contention;
 mod set_keyset_id;
 mod set_keyset_name;
