Skip to content

Commit 5647237

Browse files
committed
fixup! Implement tiered storage
Respect inner sync store semantics in TierStore and FFI store bridging Here we rework TierStore's `KVStoreSync` implementation so that synchronous calls no longer route through TierStore's own runtime and async helpers. Instead, sync operations now delegate through dedicated sync internal helpers that call the underlying stores' `KVStoreSync` methods directly. This preserves backend-specific sync behavior for wrapped stores, including stores that intentionally use a different execution strategy for synchronous operations than for async ones. In particular, it avoids forcing inner stores through TierStore's runtime when the inner store already defines its own sync semantics. As part of this change: - remove the `Runtime` dependency from `TierStore` - update `TierStore::new` and its call sites accordingly - add sync internal helpers for read/write/remove/list routing - add sync primary/backup helpers mirroring the async dual-store behavior - share ephemeral routing and primary/backup result handling across sync and async paths to reduce duplication We also update `FfiDynStore`'s sync write/remove path to avoid directly calling `tokio::sync::Mutex::blocking_lock` on a Tokio runtime worker thread. The sync path now goes through a small helper that uses `block_in_place` when already inside a runtime, preventing panics in runtime-backed test scenarios such as `start_stop_reinit`, and many others. The net effect is: - TierStore sync operations now honor the inner store's sync contract - FFI-backed sync writes/removals no longer panic when invoked from within a Tokio runtime - TierStore construction and tests are simplified by dropping the now-unused runtime plumbing
1 parent 8c5b995 commit 5647237

File tree

3 files changed

+344
-96
lines changed

3 files changed

+344
-96
lines changed

src/builder.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -838,8 +838,7 @@ impl NodeBuilder {
838838
};
839839

840840
let ts_config = self.tier_store_config.as_ref();
841-
let mut tier_store =
842-
TierStore::new(primary_store, Arc::clone(&runtime), Arc::clone(&logger));
841+
let mut tier_store = TierStore::new(primary_store, Arc::clone(&logger));
843842
if let Some(config) = ts_config {
844843
config.ephemeral.as_ref().map(|s| tier_store.set_ephemeral_store(Arc::clone(s)));
845844
config.backup.as_ref().map(|s| tier_store.set_backup_store(Arc::clone(s)));

src/ffi/types.rs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -451,8 +451,7 @@ impl FfiDynStoreInner {
451451
"write",
452452
)?;
453453

454-
let res = {
455-
let mut last_written_version = inner_lock_ref.blocking_lock();
454+
let res = self.with_blocking_lock(&inner_lock_ref, |last_written_version| {
456455
if version <= *last_written_version {
457456
Ok(())
458457
} else {
@@ -463,7 +462,7 @@ impl FfiDynStoreInner {
463462
*last_written_version = version;
464463
})
465464
}
466-
};
465+
});
467466

468467
self.clean_locks(&inner_lock_ref, locking_key);
469468
res
@@ -504,8 +503,7 @@ impl FfiDynStoreInner {
504503
"remove",
505504
)?;
506505

507-
let res = {
508-
let mut last_written_version = inner_lock_ref.blocking_lock();
506+
let res = self.with_blocking_lock(&inner_lock_ref, |last_written_version| {
509507
if version <= *last_written_version {
510508
Ok(())
511509
} else {
@@ -516,7 +514,7 @@ impl FfiDynStoreInner {
516514
*last_written_version = version;
517515
})
518516
}
519-
};
517+
});
520518

521519
self.clean_locks(&inner_lock_ref, locking_key);
522520
res
@@ -582,6 +580,20 @@ impl FfiDynStoreInner {
582580
outer_lock.remove(&locking_key);
583581
}
584582
}
583+
584+
fn with_blocking_lock<T, F: FnOnce(&mut u64) -> T>(
585+
&self, inner_lock_ref: &Arc<tokio::sync::Mutex<u64>>, f: F,
586+
) -> T {
587+
if tokio::runtime::Handle::try_current().is_ok() {
588+
tokio::task::block_in_place(|| {
589+
let mut last_written_version = inner_lock_ref.blocking_lock();
590+
f(&mut last_written_version)
591+
})
592+
} else {
593+
let mut last_written_version = inner_lock_ref.blocking_lock();
594+
f(&mut last_written_version)
595+
}
596+
}
585597
}
586598

587599
#[async_trait::async_trait]

0 commit comments

Comments
 (0)