Skip to content

Commit 7155c40

Browse files
committed
fixup! Add per-key version-locked writes to FfiDynStore
Route FfiDynStore sync calls through the async bridge Make `FfiDynStore` async-first for Rust-side store access so sync and async callers share the same ordered mutation path. `FfiDynStore` exists to preserve per-key write ordering guarantees across the FFI boundary regardless of how a foreign store chooses to implement its sync and async methods. Previously, the sync path used `blocking_lock()` on a Tokio mutex, which could panic when invoked from within a Tokio runtime. An earlier attempt to patch that by introducing a per-instance runtime also created a runtime-drop hazard in async contexts. This change makes the async path canonical and bridges Rust-side sync calls through it using a shared process-wide Tokio runtime. Concretely: - add a shared `OnceLock`-backed Tokio runtime for the sync bridge - add `run_sync_via_async` to execute Rust-side sync calls through the async implementation - route sync `read`/`write`/`remove`/`list` through the async internal methods - remove the old sync internal methods and the `blocking_lock()`-based path - document that Rust-side sync calls may invoke the foreign async methods rather than the foreign sync methods directly This preserves shared write-ordering guarantees across sync and async callers while avoiding runtime-sensitive blocking behavior and per-instance runtime teardown issues.
1 parent d52cd8b commit 7155c40

1 file changed

Lines changed: 112 additions & 105 deletions

File tree

src/ffi/io.rs

Lines changed: 112 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::collections::HashMap;
88
use std::future::Future;
99
use std::pin::Pin;
1010
use std::sync::atomic::{AtomicU64, Ordering};
11-
use std::sync::{Arc, Mutex};
11+
use std::sync::{Arc, Mutex, OnceLock};
1212

1313
use crate::{
1414
io::utils::check_namespace_key_validity, DynStoreTrait, DynStoreWrapper, SyncAndAsyncKVStore,
@@ -114,6 +114,23 @@ impl std::fmt::Display for IOError {
114114

115115
/// FFI-safe version of [`DynStoreTrait`].
116116
///
117+
/// Foreign implementations must provide both synchronous and asynchronous store
118+
/// methods so they can be used across the Rust and language-binding surfaces.
119+
///
120+
/// Internally, [`FfiDynStore`] treats the asynchronous write/remove path as the
121+
/// canonical implementation for ordered mutations. Its synchronous methods are
122+
/// bridged through that async path to preserve per-key write ordering guarantees
123+
/// across both sync and async callers.
124+
///
125+
/// As a result, Rust-side synchronous calls routed through [`FfiDynStore`] may
126+
/// invoke the async methods of a foreign implementation rather than its sync
127+
/// methods directly.
128+
///
129+
/// Note: Foreign implementations are free to make either the sync or async methods
130+
/// their primary implementation and have the other delegate to it. [`FfiDynStore`]
131+
/// does not assume which side is primary, but its Rust-side sync bridge executes
132+
/// through the async interface.
133+
///
117134
/// [`DynStoreTrait`]: crate::types::DynStoreTrait
118135
#[uniffi::export(with_foreign)]
119136
#[async_trait::async_trait]
@@ -145,6 +162,16 @@ pub trait FfiDynStoreTrait: Send + Sync {
145162
) -> Result<Vec<String>, IOError>;
146163
}
147164

165+
/// Bridges a foreign [`FfiDynStoreTrait`] implementation into Rust's
166+
/// [`DynStoreTrait`] while enforcing per-key write ordering.
167+
///
168+
/// Ordered writes/removals are coordinated within [`FfiDynStore`] so the
169+
/// underlying foreign store does not need to provide its own cross-call ordering
170+
/// guarantees.
171+
///
172+
/// The async mutation path is canonical. Sync operations are executed by running
173+
/// the corresponding async operation on an internal runtime, which preserves the
174+
/// same ordering guarantees for both sync and async callers.
148175
#[derive(Clone, uniffi::Object)]
149176
pub struct FfiDynStore {
150177
inner: Arc<FfiDynStoreInner>,
@@ -156,6 +183,7 @@ impl FfiDynStore {
156183
#[uniffi::constructor]
157184
pub fn from_store(store: Arc<dyn FfiDynStoreTrait>) -> Self {
158185
let inner = Arc::new(FfiDynStoreInner::new(store));
186+
159187
Self { inner, next_write_version: Arc::new(AtomicU64::new(1)) }
160188
}
161189
}
@@ -183,6 +211,34 @@ impl FfiDynStore {
183211

184212
(inner_lock_ref, version)
185213
}
214+
215+
/// Runs a synchronous Rust-side store call via FfiDynStore's canonical async path.
216+
///
217+
/// This keeps sync callers aligned with the same ordered mutation logic used by
218+
/// async callers and avoids blocking directly on async coordination primitives.
219+
///
220+
/// If already inside a Tokio runtime, the async operation is run from a
221+
/// dedicated OS thread using a shared process-wide runtime.
222+
fn run_sync_via_async<T, F>(&self, fut: F) -> Result<T, bitcoin::io::Error>
223+
where
224+
T: Send + 'static,
225+
F: Future<Output = Result<T, bitcoin::io::Error>> + Send + 'static,
226+
{
227+
let runtime = ffi_dynstore_runtime();
228+
229+
// If we are already inside a Tokio runtime, avoid blocking the current Tokio runtime
230+
// thread directly and run the async operation on a dedicated OS thread.
231+
if tokio::runtime::Handle::try_current().is_ok() {
232+
std::thread::spawn(move || runtime.block_on(fut)).join().unwrap_or_else(|_| {
233+
Err(bitcoin::io::Error::new(
234+
bitcoin::io::ErrorKind::Other,
235+
"FfiDynStore sync bridge thread panicked",
236+
))
237+
})
238+
} else {
239+
runtime.block_on(fut)
240+
}
241+
}
186242
}
187243

188244
impl DynStoreTrait for FfiDynStore {
@@ -280,7 +336,12 @@ impl DynStoreTrait for FfiDynStore {
280336
let secondary_namespace = secondary_namespace.to_string();
281337
let key = key.to_string();
282338

283-
store.read_internal(primary_namespace, secondary_namespace, key)
339+
self.run_sync_via_async(async move {
340+
store
341+
.read_internal_async(primary_namespace, secondary_namespace, key)
342+
.await
343+
.map_err(|e| e.into())
344+
})
284345
}
285346

286347
fn write(
@@ -295,15 +356,20 @@ impl DynStoreTrait for FfiDynStore {
295356
let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key);
296357
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
297358

298-
store.write_internal(
299-
inner_lock_ref,
300-
locking_key,
301-
version,
302-
primary_namespace,
303-
secondary_namespace,
304-
key,
305-
buf,
306-
)
359+
self.run_sync_via_async(async move {
360+
store
361+
.write_internal_async(
362+
inner_lock_ref,
363+
locking_key,
364+
version,
365+
primary_namespace,
366+
secondary_namespace,
367+
key,
368+
buf,
369+
)
370+
.await
371+
.map_err(|e| e.into())
372+
})
307373
}
308374

309375
fn remove(
@@ -318,15 +384,20 @@ impl DynStoreTrait for FfiDynStore {
318384
let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key);
319385
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
320386

321-
store.remove_internal(
322-
inner_lock_ref,
323-
locking_key,
324-
version,
325-
primary_namespace,
326-
secondary_namespace,
327-
key,
328-
lazy,
329-
)
387+
self.run_sync_via_async(async move {
388+
store
389+
.remove_internal_async(
390+
inner_lock_ref,
391+
locking_key,
392+
version,
393+
primary_namespace,
394+
secondary_namespace,
395+
key,
396+
lazy,
397+
)
398+
.await
399+
.map_err(|e| e.into())
400+
})
330401
}
331402

332403
fn list(
@@ -337,7 +408,12 @@ impl DynStoreTrait for FfiDynStore {
337408
let primary_namespace = primary_namespace.to_string();
338409
let secondary_namespace = secondary_namespace.to_string();
339410

340-
store.list_internal(primary_namespace, secondary_namespace)
411+
self.run_sync_via_async(async move {
412+
store
413+
.list_internal_async(primary_namespace, secondary_namespace)
414+
.await
415+
.map_err(|e| e.into())
416+
})
341417
}
342418
}
343419

@@ -366,13 +442,6 @@ impl FfiDynStoreInner {
366442
.map_err(|e| e.into())
367443
}
368444

369-
fn read_internal(
370-
&self, primary_namespace: String, secondary_namespace: String, key: String,
371-
) -> bitcoin::io::Result<Vec<u8>> {
372-
check_namespace_key_validity(&primary_namespace, &secondary_namespace, Some(&key), "read")?;
373-
self.ffi_store.read(primary_namespace, secondary_namespace, key).map_err(|e| e.into())
374-
}
375-
376445
async fn write_internal_async(
377446
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
378447
primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>,
@@ -397,34 +466,6 @@ impl FfiDynStoreInner {
397466
.await
398467
}
399468

400-
fn write_internal(
401-
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
402-
primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>,
403-
) -> bitcoin::io::Result<()> {
404-
check_namespace_key_validity(
405-
&primary_namespace,
406-
&secondary_namespace,
407-
Some(&key),
408-
"write",
409-
)?;
410-
411-
let res = self.with_blocking_lock(&inner_lock_ref, |last_written_version| {
412-
if version <= *last_written_version {
413-
Ok(())
414-
} else {
415-
self.ffi_store
416-
.write(primary_namespace, secondary_namespace, key, buf)
417-
.map_err(|e| e.into())
418-
.map(|_| {
419-
*last_written_version = version;
420-
})
421-
}
422-
});
423-
424-
self.clean_locks(&inner_lock_ref, locking_key);
425-
res
426-
}
427-
428469
async fn remove_internal_async(
429470
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
430471
primary_namespace: String, secondary_namespace: String, key: String, lazy: bool,
@@ -449,34 +490,6 @@ impl FfiDynStoreInner {
449490
.await
450491
}
451492

452-
fn remove_internal(
453-
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
454-
primary_namespace: String, secondary_namespace: String, key: String, lazy: bool,
455-
) -> bitcoin::io::Result<()> {
456-
check_namespace_key_validity(
457-
&primary_namespace,
458-
&secondary_namespace,
459-
Some(&key),
460-
"remove",
461-
)?;
462-
463-
let res = self.with_blocking_lock(&inner_lock_ref, |last_written_version| {
464-
if version <= *last_written_version {
465-
Ok(())
466-
} else {
467-
self.ffi_store
468-
.remove(primary_namespace, secondary_namespace, key, lazy)
469-
.map_err(|e| <IOError as Into<bitcoin::io::Error>>::into(e))
470-
.map(|_| {
471-
*last_written_version = version;
472-
})
473-
}
474-
});
475-
476-
self.clean_locks(&inner_lock_ref, locking_key);
477-
res
478-
}
479-
480493
async fn list_internal_async(
481494
&self, primary_namespace: String, secondary_namespace: String,
482495
) -> bitcoin::io::Result<Vec<String>> {
@@ -487,13 +500,6 @@ impl FfiDynStoreInner {
487500
.map_err(|e| e.into())
488501
}
489502

490-
fn list_internal(
491-
&self, primary_namespace: String, secondary_namespace: String,
492-
) -> bitcoin::io::Result<Vec<String>> {
493-
check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list")?;
494-
self.ffi_store.list(primary_namespace, secondary_namespace).map_err(|e| e.into())
495-
}
496-
497503
async fn execute_locked_write<
498504
F: Future<Output = Result<(), bitcoin::io::Error>>,
499505
FN: FnOnce() -> F,
@@ -537,20 +543,6 @@ impl FfiDynStoreInner {
537543
outer_lock.remove(&locking_key);
538544
}
539545
}
540-
541-
fn with_blocking_lock<T, F: FnOnce(&mut u64) -> T>(
542-
&self, inner_lock_ref: &Arc<tokio::sync::Mutex<u64>>, f: F,
543-
) -> T {
544-
if tokio::runtime::Handle::try_current().is_ok() {
545-
tokio::task::block_in_place(|| {
546-
let mut last_written_version = inner_lock_ref.blocking_lock();
547-
f(&mut last_written_version)
548-
})
549-
} else {
550-
let mut last_written_version = inner_lock_ref.blocking_lock();
551-
f(&mut last_written_version)
552-
}
553-
}
554546
}
555547

556548
#[async_trait::async_trait]
@@ -620,3 +612,18 @@ impl<T: SyncAndAsyncKVStore + Send + Sync + 'static> From<T> for FfiDynStore {
620612
Self::from_store(Arc::new(DynStoreWrapper(store)))
621613
}
622614
}
615+
616+
/// Returns the process-wide runtime used to bridge Rust-side synchronous
617+
/// `FfiDynStore` calls through the canonical async store path.
618+
fn ffi_dynstore_runtime() -> &'static tokio::runtime::Runtime {
619+
static RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
620+
621+
RUNTIME.get_or_init(|| {
622+
tokio::runtime::Builder::new_multi_thread()
623+
.enable_all()
624+
.worker_threads(2)
625+
.max_blocking_threads(2)
626+
.build()
627+
.expect("Failed to build FfiDynStore runtime")
628+
})
629+
}

0 commit comments

Comments
 (0)