Skip to content

Commit e7d8207

Browse files
committed
Introduce two separate VssClients for async/blocking contexts
To avoid any blocking cross-runtime behavior that could arise from reusing a single client's TCP connections in different runtime contexts, we here split out the `VssStore` behavior to use one dedicated `VssClient` per context. I.e., we're now using two connections/connection pools and make sure only the `blocking_client` is used in `KVStoreSync` contexts, and `async_client` in `KVStore` contexts.
1 parent 52a3812 commit e7d8207

File tree

1 file changed

+84
-37
lines changed

1 file changed

+84
-37
lines changed

src/io/vss_store.rs

Lines changed: 84 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -102,24 +102,22 @@ impl VssStore {
102102
let (data_encryption_key, obfuscation_master_key) =
103103
derive_data_encryption_and_obfuscation_keys(&vss_seed);
104104
let key_obfuscator = KeyObfuscator::new(obfuscation_master_key);
105-
let retry_policy = ExponentialBackoffRetryPolicy::new(Duration::from_millis(10))
106-
.with_max_attempts(100)
107-
.with_max_total_delay(Duration::from_secs(180))
108-
.with_max_jitter(Duration::from_millis(100))
109-
.skip_retry_on_error(Box::new(|e: &VssError| {
110-
matches!(
111-
e,
112-
VssError::NoSuchKeyError(..)
113-
| VssError::InvalidRequestError(..)
114-
| VssError::ConflictError(..)
115-
)
116-
}) as _);
117105

118-
let client = VssClient::new_with_headers(base_url, retry_policy, header_provider);
106+
let sync_retry_policy = retry_policy();
107+
let blocking_client = VssClient::new_with_headers(
108+
base_url.clone(),
109+
sync_retry_policy,
110+
header_provider.clone(),
111+
);
112+
113+
let async_retry_policy = retry_policy();
114+
let async_client =
115+
VssClient::new_with_headers(base_url, async_retry_policy, header_provider);
119116

120117
let inner = Arc::new(VssStoreInner::new(
121118
schema_version,
122-
client,
119+
blocking_client,
120+
async_client,
123121
store_id,
124122
data_encryption_key,
125123
key_obfuscator,
@@ -168,8 +166,11 @@ impl KVStoreSync for VssStore {
168166
let secondary_namespace = secondary_namespace.to_string();
169167
let key = key.to_string();
170168
let inner = Arc::clone(&self.inner);
171-
let fut =
172-
async move { inner.read_internal(primary_namespace, secondary_namespace, key).await };
169+
let fut = async move {
170+
inner
171+
.read_internal(&inner.blocking_client, primary_namespace, secondary_namespace, key)
172+
.await
173+
};
173174
tokio::task::block_in_place(move || internal_runtime.block_on(fut))
174175
}
175176

@@ -190,6 +191,7 @@ impl KVStoreSync for VssStore {
190191
let fut = async move {
191192
inner
192193
.write_internal(
194+
&inner.blocking_client,
193195
inner_lock_ref,
194196
locking_key,
195197
version,
@@ -220,6 +222,7 @@ impl KVStoreSync for VssStore {
220222
let fut = async move {
221223
inner
222224
.remove_internal(
225+
&inner.blocking_client,
223226
inner_lock_ref,
224227
locking_key,
225228
version,
@@ -241,7 +244,11 @@ impl KVStoreSync for VssStore {
241244
let primary_namespace = primary_namespace.to_string();
242245
let secondary_namespace = secondary_namespace.to_string();
243246
let inner = Arc::clone(&self.inner);
244-
let fut = async move { inner.list_internal(primary_namespace, secondary_namespace).await };
247+
let fut = async move {
248+
inner
249+
.list_internal(&inner.blocking_client, primary_namespace, secondary_namespace)
250+
.await
251+
};
245252
tokio::task::block_in_place(move || internal_runtime.block_on(fut))
246253
}
247254
}
@@ -254,9 +261,11 @@ impl KVStore for VssStore {
254261
let secondary_namespace = secondary_namespace.to_string();
255262
let key = key.to_string();
256263
let inner = Arc::clone(&self.inner);
257-
Box::pin(
258-
async move { inner.read_internal(primary_namespace, secondary_namespace, key).await },
259-
)
264+
Box::pin(async move {
265+
inner
266+
.read_internal(&inner.async_client, primary_namespace, secondary_namespace, key)
267+
.await
268+
})
260269
}
261270
fn write(
262271
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
@@ -270,6 +279,7 @@ impl KVStore for VssStore {
270279
Box::pin(async move {
271280
inner
272281
.write_internal(
282+
&inner.async_client,
273283
inner_lock_ref,
274284
locking_key,
275285
version,
@@ -293,6 +303,7 @@ impl KVStore for VssStore {
293303
Box::pin(async move {
294304
inner
295305
.remove_internal(
306+
&inner.async_client,
296307
inner_lock_ref,
297308
locking_key,
298309
version,
@@ -309,7 +320,9 @@ impl KVStore for VssStore {
309320
let primary_namespace = primary_namespace.to_string();
310321
let secondary_namespace = secondary_namespace.to_string();
311322
let inner = Arc::clone(&self.inner);
312-
Box::pin(async move { inner.list_internal(primary_namespace, secondary_namespace).await })
323+
Box::pin(async move {
324+
inner.list_internal(&inner.async_client, primary_namespace, secondary_namespace).await
325+
})
313326
}
314327
}
315328

@@ -322,7 +335,10 @@ impl Drop for VssStore {
322335

323336
struct VssStoreInner {
324337
schema_version: VssSchemaVersion,
325-
client: VssClient<CustomRetryPolicy>,
338+
blocking_client: VssClient<CustomRetryPolicy>,
339+
// A secondary client that will only be used for async persistence via `KVStore`, to ensure TCP
340+
// connections aren't shared between our outer and the internal runtime.
341+
async_client: VssClient<CustomRetryPolicy>,
326342
store_id: String,
327343
data_encryption_key: [u8; 32],
328344
key_obfuscator: KeyObfuscator,
@@ -333,11 +349,20 @@ struct VssStoreInner {
333349

334350
impl VssStoreInner {
335351
pub(crate) fn new(
336-
schema_version: VssSchemaVersion, client: VssClient<CustomRetryPolicy>, store_id: String,
352+
schema_version: VssSchemaVersion, blocking_client: VssClient<CustomRetryPolicy>,
353+
async_client: VssClient<CustomRetryPolicy>, store_id: String,
337354
data_encryption_key: [u8; 32], key_obfuscator: KeyObfuscator,
338355
) -> Self {
339356
let locks = Mutex::new(HashMap::new());
340-
Self { schema_version, client, store_id, data_encryption_key, key_obfuscator, locks }
357+
Self {
358+
schema_version,
359+
blocking_client,
360+
async_client,
361+
store_id,
362+
data_encryption_key,
363+
key_obfuscator,
364+
locks,
365+
}
341366
}
342367

343368
fn get_inner_lock_ref(&self, locking_key: String) -> Arc<tokio::sync::Mutex<u64>> {
@@ -397,7 +422,8 @@ impl VssStoreInner {
397422
}
398423

399424
async fn list_all_keys(
400-
&self, primary_namespace: &str, secondary_namespace: &str,
425+
&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: &str,
426+
secondary_namespace: &str,
401427
) -> io::Result<Vec<String>> {
402428
let mut page_token = None;
403429
let mut keys = vec![];
@@ -410,7 +436,7 @@ impl VssStoreInner {
410436
page_size: None,
411437
};
412438

413-
let response = self.client.list_key_versions(&request).await.map_err(|e| {
439+
let response = client.list_key_versions(&request).await.map_err(|e| {
414440
let msg = format!(
415441
"Failed to list keys in {}/{}: {}",
416442
primary_namespace, secondary_namespace, e
@@ -427,13 +453,14 @@ impl VssStoreInner {
427453
}
428454

429455
async fn read_internal(
430-
&self, primary_namespace: String, secondary_namespace: String, key: String,
456+
&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: String,
457+
secondary_namespace: String, key: String,
431458
) -> io::Result<Vec<u8>> {
432459
check_namespace_key_validity(&primary_namespace, &secondary_namespace, Some(&key), "read")?;
433460

434461
let store_key = self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
435462
let request = GetObjectRequest { store_id: self.store_id.clone(), key: store_key.clone() };
436-
let resp = self.client.get_object(&request).await.map_err(|e| {
463+
let resp = client.get_object(&request).await.map_err(|e| {
437464
let msg = format!(
438465
"Failed to read from key {}/{}/{}: {}",
439466
primary_namespace, secondary_namespace, key, e
@@ -462,8 +489,9 @@ impl VssStoreInner {
462489
}
463490

464491
async fn write_internal(
465-
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
466-
primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>,
492+
&self, client: &VssClient<CustomRetryPolicy>, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>,
493+
locking_key: String, version: u64, primary_namespace: String, secondary_namespace: String,
494+
key: String, buf: Vec<u8>,
467495
) -> io::Result<()> {
468496
check_namespace_key_validity(
469497
&primary_namespace,
@@ -491,7 +519,7 @@ impl VssStoreInner {
491519
};
492520

493521
self.execute_locked_write(inner_lock_ref, locking_key, version, async move || {
494-
self.client.put_object(&request).await.map_err(|e| {
522+
client.put_object(&request).await.map_err(|e| {
495523
let msg = format!(
496524
"Failed to write to key {}/{}/{}: {}",
497525
primary_namespace, secondary_namespace, key, e
@@ -505,8 +533,9 @@ impl VssStoreInner {
505533
}
506534

507535
async fn remove_internal(
508-
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
509-
primary_namespace: String, secondary_namespace: String, key: String,
536+
&self, client: &VssClient<CustomRetryPolicy>, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>,
537+
locking_key: String, version: u64, primary_namespace: String, secondary_namespace: String,
538+
key: String,
510539
) -> io::Result<()> {
511540
check_namespace_key_validity(
512541
&primary_namespace,
@@ -523,7 +552,7 @@ impl VssStoreInner {
523552
key_value: Some(KeyValue { key: obfuscated_key, version: -1, value: vec![] }),
524553
};
525554

526-
self.client.delete_object(&request).await.map_err(|e| {
555+
client.delete_object(&request).await.map_err(|e| {
527556
let msg = format!(
528557
"Failed to delete key {}/{}/{}: {}",
529558
primary_namespace, secondary_namespace, key, e
@@ -537,12 +566,15 @@ impl VssStoreInner {
537566
}
538567

539568
async fn list_internal(
540-
&self, primary_namespace: String, secondary_namespace: String,
569+
&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: String,
570+
secondary_namespace: String,
541571
) -> io::Result<Vec<String>> {
542572
check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list")?;
543573

544-
let keys =
545-
self.list_all_keys(&primary_namespace, &secondary_namespace).await.map_err(|e| {
574+
let keys = self
575+
.list_all_keys(client, &primary_namespace, &secondary_namespace)
576+
.await
577+
.map_err(|e| {
546578
let msg = format!(
547579
"Failed to retrieve keys in namespace: {}/{} : {}",
548580
primary_namespace, secondary_namespace, e
@@ -611,6 +643,21 @@ fn derive_data_encryption_and_obfuscation_keys(vss_seed: &[u8; 32]) -> ([u8; 32]
611643
(k1, k2)
612644
}
613645

646+
fn retry_policy() -> CustomRetryPolicy {
647+
ExponentialBackoffRetryPolicy::new(Duration::from_millis(10))
648+
.with_max_attempts(100)
649+
.with_max_total_delay(Duration::from_secs(180))
650+
.with_max_jitter(Duration::from_millis(100))
651+
.skip_retry_on_error(Box::new(|e: &VssError| {
652+
matches!(
653+
e,
654+
VssError::NoSuchKeyError(..)
655+
| VssError::InvalidRequestError(..)
656+
| VssError::ConflictError(..)
657+
)
658+
}) as _)
659+
}
660+
614661
/// A source for generating entropy/randomness using [`rand`].
615662
pub(crate) struct RandEntropySource;
616663

0 commit comments

Comments
 (0)