Skip to content

Commit 8f122cb

Browse files
committed
fixup! Add per-key version-locked writes to FfiDynStore
Route FfiDynStore sync calls through the async bridge Make `FfiDynStore` async-first for Rust-side store access so sync and async callers share the same ordered mutation path. `FfiDynStore` exists to preserve per-key write ordering guarantees across the FFI boundary regardless of how a foreign store chooses to implement its sync and async methods. Previously, the sync path used `blocking_lock()` on a Tokio mutex, which could panic when invoked from within a Tokio runtime. An earlier attempt to patch that by introducing a per-instance runtime also created a runtime-drop hazard in async contexts. This change makes the async path canonical and bridges Rust-side sync calls through it using a shared process-wide Tokio runtime. Concretely: - add a shared `OnceLock`-backed Tokio runtime for the sync bridge - add `run_sync_via_async` to execute Rust-side sync calls through the async implementation - route sync `read`/`write`/`remove`/`list` through the async internal methods - remove the old sync internal methods and the `blocking_lock()`-based path - document that Rust-side sync calls may invoke the foreign async methods rather than the foreign sync methods directly This preserves shared write-ordering guarantees across sync and async callers while avoiding runtime-sensitive blocking behavior and per-instance runtime teardown issues.
1 parent 04855d7 commit 8f122cb

File tree

1 file changed

+116
-113
lines changed

1 file changed

+116
-113
lines changed

src/ffi/io.rs

Lines changed: 116 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,11 @@
44
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66

7-
use std::{
8-
collections::HashMap,
9-
future::Future,
10-
pin::Pin,
11-
sync::{
12-
atomic::{AtomicU64, Ordering},
13-
Arc, Mutex,
14-
},
15-
};
7+
use std::collections::HashMap;
8+
use std::future::Future;
9+
use std::pin::Pin;
10+
use std::sync::atomic::{AtomicU64, Ordering};
11+
use std::sync::{Arc, Mutex, OnceLock};
1612

1713
use crate::{
1814
io::utils::check_namespace_key_validity, DynStoreTrait, DynStoreWrapper, SyncAndAsyncKVStore,
@@ -118,6 +114,23 @@ impl std::fmt::Display for IOError {
118114

119115
/// FFI-safe version of [`DynStoreTrait`].
120116
///
117+
/// Foreign implementations must provide both synchronous and asynchronous store
118+
/// methods so they can be used across the Rust and language-binding surfaces.
119+
///
120+
/// Internally, [`FfiDynStore`] treats the asynchronous write/remove path as the
121+
/// canonical implementation for ordered mutations. Its synchronous methods are
122+
/// bridged through that async path to preserve per-key write ordering guarantees
123+
/// across both sync and async callers.
124+
///
125+
/// As a result, Rust-side synchronous calls routed through [`FfiDynStore`] may
126+
/// invoke the async methods of a foreign implementation rather than its sync
127+
/// methods directly.
128+
///
129+
/// Note: Foreign implementations are free to make either the sync or async methods
130+
/// their primary implementation and have the other delegate to it. [`FfiDynStore`]
131+
/// does not assume which side is primary, but its Rust-side sync bridge executes
132+
/// through the async interface.
133+
///
121134
/// [`DynStoreTrait`]: crate::types::DynStoreTrait
122135
#[uniffi::export(with_foreign)]
123136
#[async_trait::async_trait]
@@ -149,6 +162,16 @@ pub trait FfiDynStoreTrait: Send + Sync {
149162
) -> Result<Vec<String>, IOError>;
150163
}
151164

165+
/// Bridges a foreign [`FfiDynStoreTrait`] implementation into Rust's
166+
/// [`DynStoreTrait`] while enforcing per-key write ordering.
167+
///
168+
/// Ordered writes/removals are coordinated within [`FfiDynStore`] so the
169+
/// underlying foreign store does not need to provide its own cross-call ordering
170+
/// guarantees.
171+
///
172+
/// The async mutation path is canonical. Sync operations are executed by running
173+
/// the corresponding async operation on an internal runtime, which preserves the
174+
/// same ordering guarantees for both sync and async callers.
152175
#[derive(Clone, uniffi::Object)]
153176
pub struct FfiDynStore {
154177
inner: Arc<FfiDynStoreInner>,
@@ -160,6 +183,7 @@ impl FfiDynStore {
160183
#[uniffi::constructor]
161184
pub fn from_store(store: Arc<dyn FfiDynStoreTrait>) -> Self {
162185
let inner = Arc::new(FfiDynStoreInner::new(store));
186+
163187
Self { inner, next_write_version: Arc::new(AtomicU64::new(1)) }
164188
}
165189
}
@@ -187,6 +211,34 @@ impl FfiDynStore {
187211

188212
(inner_lock_ref, version)
189213
}
214+
215+
/// Runs a synchronous Rust-side store call via FfiDynStore's canonical async path.
216+
///
217+
/// This keeps sync callers aligned with the same ordered mutation logic used by
218+
/// async callers and avoids blocking directly on async coordination primitives.
219+
///
220+
/// If already inside a Tokio runtime, the async operation is run from a
221+
/// dedicated OS thread using a shared process-wide runtime.
222+
fn run_sync_via_async<T, F>(&self, fut: F) -> Result<T, bitcoin::io::Error>
223+
where
224+
T: Send + 'static,
225+
F: Future<Output = Result<T, bitcoin::io::Error>> + Send + 'static,
226+
{
227+
let runtime = ffi_dynstore_runtime();
228+
229+
// If we are already inside a Tokio runtime, avoid blocking the current Tokio runtime
230+
// thread directly and run the async operation on a dedicated OS thread.
231+
if tokio::runtime::Handle::try_current().is_ok() {
232+
std::thread::spawn(move || runtime.block_on(fut)).join().unwrap_or_else(|_| {
233+
Err(bitcoin::io::Error::new(
234+
bitcoin::io::ErrorKind::Other,
235+
"FfiDynStore sync bridge thread panicked",
236+
))
237+
})
238+
} else {
239+
runtime.block_on(fut)
240+
}
241+
}
190242
}
191243

192244
impl DynStoreTrait for FfiDynStore {
@@ -284,7 +336,12 @@ impl DynStoreTrait for FfiDynStore {
284336
let secondary_namespace = secondary_namespace.to_string();
285337
let key = key.to_string();
286338

287-
store.read_internal(primary_namespace, secondary_namespace, key)
339+
self.run_sync_via_async(async move {
340+
store
341+
.read_internal_async(primary_namespace, secondary_namespace, key)
342+
.await
343+
.map_err(|e| e.into())
344+
})
288345
}
289346

290347
fn write(
@@ -299,15 +356,20 @@ impl DynStoreTrait for FfiDynStore {
299356
let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key);
300357
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
301358

302-
store.write_internal(
303-
inner_lock_ref,
304-
locking_key,
305-
version,
306-
primary_namespace,
307-
secondary_namespace,
308-
key,
309-
buf,
310-
)
359+
self.run_sync_via_async(async move {
360+
store
361+
.write_internal_async(
362+
inner_lock_ref,
363+
locking_key,
364+
version,
365+
primary_namespace,
366+
secondary_namespace,
367+
key,
368+
buf,
369+
)
370+
.await
371+
.map_err(|e| e.into())
372+
})
311373
}
312374

313375
fn remove(
@@ -322,15 +384,20 @@ impl DynStoreTrait for FfiDynStore {
322384
let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key);
323385
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
324386

325-
store.remove_internal(
326-
inner_lock_ref,
327-
locking_key,
328-
version,
329-
primary_namespace,
330-
secondary_namespace,
331-
key,
332-
lazy,
333-
)
387+
self.run_sync_via_async(async move {
388+
store
389+
.remove_internal_async(
390+
inner_lock_ref,
391+
locking_key,
392+
version,
393+
primary_namespace,
394+
secondary_namespace,
395+
key,
396+
lazy,
397+
)
398+
.await
399+
.map_err(|e| e.into())
400+
})
334401
}
335402

336403
fn list(
@@ -341,7 +408,12 @@ impl DynStoreTrait for FfiDynStore {
341408
let primary_namespace = primary_namespace.to_string();
342409
let secondary_namespace = secondary_namespace.to_string();
343410

344-
store.list_internal(primary_namespace, secondary_namespace)
411+
self.run_sync_via_async(async move {
412+
store
413+
.list_internal_async(primary_namespace, secondary_namespace)
414+
.await
415+
.map_err(|e| e.into())
416+
})
345417
}
346418
}
347419

@@ -370,13 +442,6 @@ impl FfiDynStoreInner {
370442
.map_err(|e| e.into())
371443
}
372444

373-
fn read_internal(
374-
&self, primary_namespace: String, secondary_namespace: String, key: String,
375-
) -> bitcoin::io::Result<Vec<u8>> {
376-
check_namespace_key_validity(&primary_namespace, &secondary_namespace, Some(&key), "read")?;
377-
self.ffi_store.read(primary_namespace, secondary_namespace, key).map_err(|e| e.into())
378-
}
379-
380445
async fn write_internal_async(
381446
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
382447
primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>,
@@ -401,34 +466,6 @@ impl FfiDynStoreInner {
401466
.await
402467
}
403468

404-
fn write_internal(
405-
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
406-
primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>,
407-
) -> bitcoin::io::Result<()> {
408-
check_namespace_key_validity(
409-
&primary_namespace,
410-
&secondary_namespace,
411-
Some(&key),
412-
"write",
413-
)?;
414-
415-
let res = self.with_blocking_lock(&inner_lock_ref, |last_written_version| {
416-
if version <= *last_written_version {
417-
Ok(())
418-
} else {
419-
self.ffi_store
420-
.write(primary_namespace, secondary_namespace, key, buf)
421-
.map_err(|e| e.into())
422-
.map(|_| {
423-
*last_written_version = version;
424-
})
425-
}
426-
});
427-
428-
self.clean_locks(&inner_lock_ref, locking_key);
429-
res
430-
}
431-
432469
async fn remove_internal_async(
433470
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
434471
primary_namespace: String, secondary_namespace: String, key: String, lazy: bool,
@@ -453,34 +490,6 @@ impl FfiDynStoreInner {
453490
.await
454491
}
455492

456-
fn remove_internal(
457-
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
458-
primary_namespace: String, secondary_namespace: String, key: String, lazy: bool,
459-
) -> bitcoin::io::Result<()> {
460-
check_namespace_key_validity(
461-
&primary_namespace,
462-
&secondary_namespace,
463-
Some(&key),
464-
"remove",
465-
)?;
466-
467-
let res = self.with_blocking_lock(&inner_lock_ref, |last_written_version| {
468-
if version <= *last_written_version {
469-
Ok(())
470-
} else {
471-
self.ffi_store
472-
.remove(primary_namespace, secondary_namespace, key, lazy)
473-
.map_err(|e| <IOError as Into<bitcoin::io::Error>>::into(e))
474-
.map(|_| {
475-
*last_written_version = version;
476-
})
477-
}
478-
});
479-
480-
self.clean_locks(&inner_lock_ref, locking_key);
481-
res
482-
}
483-
484493
async fn list_internal_async(
485494
&self, primary_namespace: String, secondary_namespace: String,
486495
) -> bitcoin::io::Result<Vec<String>> {
@@ -491,13 +500,6 @@ impl FfiDynStoreInner {
491500
.map_err(|e| e.into())
492501
}
493502

494-
fn list_internal(
495-
&self, primary_namespace: String, secondary_namespace: String,
496-
) -> bitcoin::io::Result<Vec<String>> {
497-
check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list")?;
498-
self.ffi_store.list(primary_namespace, secondary_namespace).map_err(|e| e.into())
499-
}
500-
501503
async fn execute_locked_write<
502504
F: Future<Output = Result<(), bitcoin::io::Error>>,
503505
FN: FnOnce() -> F,
@@ -541,20 +543,6 @@ impl FfiDynStoreInner {
541543
outer_lock.remove(&locking_key);
542544
}
543545
}
544-
545-
fn with_blocking_lock<T, F: FnOnce(&mut u64) -> T>(
546-
&self, inner_lock_ref: &Arc<tokio::sync::Mutex<u64>>, f: F,
547-
) -> T {
548-
if tokio::runtime::Handle::try_current().is_ok() {
549-
tokio::task::block_in_place(|| {
550-
let mut last_written_version = inner_lock_ref.blocking_lock();
551-
f(&mut last_written_version)
552-
})
553-
} else {
554-
let mut last_written_version = inner_lock_ref.blocking_lock();
555-
f(&mut last_written_version)
556-
}
557-
}
558546
}
559547

560548
#[async_trait::async_trait]
@@ -624,3 +612,18 @@ impl<T: SyncAndAsyncKVStore + Send + Sync + 'static> From<T> for FfiDynStore {
624612
Self::from_store(Arc::new(DynStoreWrapper(store)))
625613
}
626614
}
615+
616+
/// Returns the process-wide runtime used to bridge Rust-side synchronous
617+
/// `FfiDynStore` calls through the canonical async store path.
618+
fn ffi_dynstore_runtime() -> &'static tokio::runtime::Runtime {
619+
static RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
620+
621+
RUNTIME.get_or_init(|| {
622+
tokio::runtime::Builder::new_multi_thread()
623+
.enable_all()
624+
.worker_threads(2)
625+
.max_blocking_threads(2)
626+
.build()
627+
.expect("Failed to build FfiDynStore runtime")
628+
})
629+
}

0 commit comments

Comments
 (0)