Skip to content

Commit e01ecce

Browse files
authored
Merge pull request #4658 from tnull/2026-06-migratable-kv-store-async
Add async variant of `MigratableKVStore`
2 parents 467cd0b + 94cff3b commit e01ecce

5 files changed

Lines changed: 367 additions & 108 deletions

File tree

lightning-persister/src/fs_store/common.rs

Lines changed: 105 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,94 @@ impl FilesystemStoreInner {
470470

471471
Ok(keys)
472472
}
473+
474+
fn list_all_keys(
475+
&self, use_empty_ns_dir: bool,
476+
) -> Result<Vec<(String, String, String)>, lightning::io::Error> {
477+
let prefixed_dest = &self.data_dir;
478+
if !prefixed_dest.exists() {
479+
return Ok(Vec::new());
480+
}
481+
482+
let mut keys = Vec::new();
483+
484+
'primary_loop: for primary_entry in fs::read_dir(prefixed_dest)? {
485+
let primary_entry = primary_entry?;
486+
let primary_path = primary_entry.path();
487+
if dir_entry_is_store_artifact(&primary_path) {
488+
continue 'primary_loop;
489+
}
490+
491+
if dir_entry_is_key(&primary_entry)? {
492+
let primary_namespace = String::new();
493+
let secondary_namespace = String::new();
494+
let key = get_key_from_dir_entry_path(&primary_path, prefixed_dest, false)?;
495+
keys.push((primary_namespace, secondary_namespace, key));
496+
continue 'primary_loop;
497+
}
498+
499+
// The primary_entry is actually also a directory.
500+
'secondary_loop: for secondary_entry in fs::read_dir(&primary_path)? {
501+
let secondary_entry = secondary_entry?;
502+
let secondary_path = secondary_entry.path();
503+
if dir_entry_is_store_artifact(&secondary_path) {
504+
continue 'secondary_loop;
505+
}
506+
507+
if dir_entry_is_key(&secondary_entry)? {
508+
let primary_namespace = get_key_from_dir_entry_path(
509+
&primary_path,
510+
prefixed_dest,
511+
use_empty_ns_dir,
512+
)?;
513+
let secondary_namespace = String::new();
514+
let key = get_key_from_dir_entry_path(&secondary_path, &primary_path, false)?;
515+
keys.push((primary_namespace, secondary_namespace, key));
516+
continue 'secondary_loop;
517+
}
518+
519+
// The secondary_entry is actually also a directory.
520+
for tertiary_entry in fs::read_dir(&secondary_path)? {
521+
let tertiary_entry = tertiary_entry?;
522+
let tertiary_path = tertiary_entry.path();
523+
if dir_entry_is_store_artifact(&tertiary_path) {
524+
continue;
525+
}
526+
527+
if dir_entry_is_key(&tertiary_entry)? {
528+
let primary_namespace = get_key_from_dir_entry_path(
529+
&primary_path,
530+
prefixed_dest,
531+
use_empty_ns_dir,
532+
)?;
533+
let secondary_namespace = get_key_from_dir_entry_path(
534+
&secondary_path,
535+
&primary_path,
536+
use_empty_ns_dir,
537+
)?;
538+
let key =
539+
get_key_from_dir_entry_path(&tertiary_path, &secondary_path, false)?;
540+
keys.push((primary_namespace, secondary_namespace, key));
541+
} else {
542+
debug_assert!(
543+
false,
544+
"Failed to list keys of path {}: only two levels of namespaces are supported",
545+
PrintableString(tertiary_path.to_str().unwrap_or_default())
546+
);
547+
let msg = format!(
548+
"Failed to list keys of path {}: only two levels of namespaces are supported",
549+
PrintableString(tertiary_path.to_str().unwrap_or_default())
550+
);
551+
return Err(lightning::io::Error::new(
552+
lightning::io::ErrorKind::Other,
553+
msg,
554+
));
555+
}
556+
}
557+
}
558+
}
559+
Ok(keys)
560+
}
473561
}
474562

475563
impl FilesystemStoreState {
@@ -640,92 +728,26 @@ impl FilesystemStoreState {
640728
}
641729
}
642730

643-
pub(crate) fn list_all_keys_impl(
731+
#[cfg(feature = "tokio")]
732+
pub(crate) fn list_all_keys_async(
644733
&self, use_empty_ns_dir: bool,
645-
) -> Result<Vec<(String, String, String)>, lightning::io::Error> {
646-
let prefixed_dest = &self.inner.data_dir;
647-
if !prefixed_dest.exists() {
648-
return Ok(Vec::new());
649-
}
650-
651-
let mut keys = Vec::new();
652-
653-
'primary_loop: for primary_entry in fs::read_dir(prefixed_dest)? {
654-
let primary_entry = primary_entry?;
655-
let primary_path = primary_entry.path();
656-
if dir_entry_is_store_artifact(&primary_path) {
657-
continue 'primary_loop;
658-
}
659-
660-
if dir_entry_is_key(&primary_entry)? {
661-
let primary_namespace = String::new();
662-
let secondary_namespace = String::new();
663-
let key = get_key_from_dir_entry_path(&primary_path, prefixed_dest, false)?;
664-
keys.push((primary_namespace, secondary_namespace, key));
665-
continue 'primary_loop;
666-
}
667-
668-
// The primary_entry is actually also a directory.
669-
'secondary_loop: for secondary_entry in fs::read_dir(&primary_path)? {
670-
let secondary_entry = secondary_entry?;
671-
let secondary_path = secondary_entry.path();
672-
if dir_entry_is_store_artifact(&secondary_path) {
673-
continue 'secondary_loop;
674-
}
675-
676-
if dir_entry_is_key(&secondary_entry)? {
677-
let primary_namespace = get_key_from_dir_entry_path(
678-
&primary_path,
679-
prefixed_dest,
680-
use_empty_ns_dir,
681-
)?;
682-
let secondary_namespace = String::new();
683-
let key = get_key_from_dir_entry_path(&secondary_path, &primary_path, false)?;
684-
keys.push((primary_namespace, secondary_namespace, key));
685-
continue 'secondary_loop;
686-
}
687-
688-
// The secondary_entry is actually also a directory.
689-
for tertiary_entry in fs::read_dir(&secondary_path)? {
690-
let tertiary_entry = tertiary_entry?;
691-
let tertiary_path = tertiary_entry.path();
692-
if dir_entry_is_store_artifact(&tertiary_path) {
693-
continue;
694-
}
734+
) -> impl Future<Output = Result<Vec<(String, String, String)>, lightning::io::Error>> + 'static + Send
735+
{
736+
let this = Arc::clone(&self.inner);
695737

696-
if dir_entry_is_key(&tertiary_entry)? {
697-
let primary_namespace = get_key_from_dir_entry_path(
698-
&primary_path,
699-
prefixed_dest,
700-
use_empty_ns_dir,
701-
)?;
702-
let secondary_namespace = get_key_from_dir_entry_path(
703-
&secondary_path,
704-
&primary_path,
705-
use_empty_ns_dir,
706-
)?;
707-
let key =
708-
get_key_from_dir_entry_path(&tertiary_path, &secondary_path, false)?;
709-
keys.push((primary_namespace, secondary_namespace, key));
710-
} else {
711-
debug_assert!(
712-
false,
713-
"Failed to list keys of path {}: only two levels of namespaces are supported",
714-
PrintableString(tertiary_path.to_str().unwrap_or_default())
715-
);
716-
let msg = format!(
717-
"Failed to list keys of path {}: only two levels of namespaces are supported",
718-
PrintableString(tertiary_path.to_str().unwrap_or_default())
719-
);
720-
return Err(lightning::io::Error::new(
721-
lightning::io::ErrorKind::Other,
722-
msg,
723-
));
724-
}
725-
}
726-
}
738+
async move {
739+
tokio::task::spawn_blocking(move || this.list_all_keys(use_empty_ns_dir))
740+
.await
741+
.unwrap_or_else(|e| {
742+
Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
743+
})
727744
}
728-
Ok(keys)
745+
}
746+
747+
pub(crate) fn list_all_keys_impl(
748+
&self, use_empty_ns_dir: bool,
749+
) -> Result<Vec<(String, String, String)>, lightning::io::Error> {
750+
self.inner.list_all_keys(use_empty_ns_dir)
729751
}
730752
}
731753

lightning-persister/src/fs_store/v1.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Objects related to [`FilesystemStore`] live here.
22
use crate::fs_store::common::FilesystemStoreState;
33

4-
use lightning::util::persist::{KVStoreSync, MigratableKVStore};
4+
use lightning::util::persist::{KVStoreSync, MigratableKVStoreSync};
55

66
use std::path::PathBuf;
77

@@ -88,15 +88,27 @@ impl KVStore for FilesystemStore {
8888
}
8989
}
9090

91-
impl MigratableKVStore for FilesystemStore {
91+
impl MigratableKVStoreSync for FilesystemStore {
9292
fn list_all_keys(&self) -> Result<Vec<(String, String, String)>, lightning::io::Error> {
9393
self.state.list_all_keys_impl(false)
9494
}
9595
}
9696

97+
#[cfg(feature = "tokio")]
98+
impl lightning::util::persist::MigratableKVStore for FilesystemStore {
99+
fn list_all_keys(
100+
&self,
101+
) -> impl Future<Output = Result<Vec<(String, String, String)>, lightning::io::Error>> + 'static + Send
102+
{
103+
self.state.list_all_keys_async(false)
104+
}
105+
}
106+
97107
#[cfg(test)]
98108
mod tests {
99109
use super::*;
110+
#[cfg(feature = "tokio")]
111+
use crate::test_utils::do_test_data_migration_async;
100112
use crate::test_utils::{
101113
do_read_write_remove_list_persist, do_test_data_migration, do_test_store,
102114
};
@@ -221,6 +233,20 @@ mod tests {
221233
do_test_data_migration(&mut source_store, &mut target_store);
222234
}
223235

236+
#[cfg(feature = "tokio")]
237+
#[tokio::test]
238+
async fn test_data_migration_async() {
239+
let mut source_temp_path = std::env::temp_dir();
240+
source_temp_path.push("test_data_migration_source_async");
241+
let source_store = FilesystemStore::new(source_temp_path);
242+
243+
let mut target_temp_path = std::env::temp_dir();
244+
target_temp_path.push("test_data_migration_target_async");
245+
let target_store = FilesystemStore::new(target_temp_path);
246+
247+
do_test_data_migration_async(&source_store, &target_store).await;
248+
}
249+
224250
#[test]
225251
fn test_if_monitors_is_not_dir() {
226252
let store = FilesystemStore::new("test_monitors_is_not_dir".into());

lightning-persister/src/fs_store/v2.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::fs_store::common::{
44
};
55

66
use lightning::util::persist::{
7-
KVStoreSync, MigratableKVStore, PageToken, PaginatedKVStoreSync, PaginatedListResponse,
7+
KVStoreSync, MigratableKVStoreSync, PageToken, PaginatedKVStoreSync, PaginatedListResponse,
88
};
99

1010
use std::fs;
@@ -315,12 +315,22 @@ impl PaginatedKVStore for FilesystemStoreV2 {
315315
}
316316
}
317317

318-
impl MigratableKVStore for FilesystemStoreV2 {
318+
impl MigratableKVStoreSync for FilesystemStoreV2 {
319319
fn list_all_keys(&self) -> Result<Vec<(String, String, String)>, lightning::io::Error> {
320320
self.inner.list_all_keys_impl(true)
321321
}
322322
}
323323

324+
#[cfg(feature = "tokio")]
325+
impl lightning::util::persist::MigratableKVStore for FilesystemStoreV2 {
326+
fn list_all_keys(
327+
&self,
328+
) -> impl Future<Output = Result<Vec<(String, String, String)>, lightning::io::Error>> + 'static + Send
329+
{
330+
self.inner.list_all_keys_async(true)
331+
}
332+
}
333+
324334
/// Formats a page token from mtime (millis since epoch) and key.
325335
pub(crate) fn format_page_token(mtime_millis: u64, key: &str) -> String {
326336
format!("{mtime_millis:016}:{key}")
@@ -351,6 +361,8 @@ pub(crate) fn parse_page_token(token: &str) -> lightning::io::Result<(u64, Strin
351361
mod tests {
352362
use super::*;
353363
use crate::fs_store::common::EMPTY_NAMESPACE_DIR;
364+
#[cfg(feature = "tokio")]
365+
use crate::test_utils::do_test_data_migration_async;
354366
use crate::test_utils::{
355367
do_read_write_remove_list_persist, do_test_data_migration, do_test_store,
356368
};
@@ -445,6 +457,20 @@ mod tests {
445457
do_test_data_migration(&mut source_store, &mut target_store);
446458
}
447459

460+
#[cfg(feature = "tokio")]
461+
#[tokio::test]
462+
async fn test_data_migration_async() {
463+
let mut source_temp_path = std::env::temp_dir();
464+
source_temp_path.push("test_data_migration_source_async_v2");
465+
let source_store = FilesystemStoreV2::new(source_temp_path).unwrap();
466+
467+
let mut target_temp_path = std::env::temp_dir();
468+
target_temp_path.push("test_data_migration_target_async_v2");
469+
let target_store = FilesystemStoreV2::new(target_temp_path).unwrap();
470+
471+
do_test_data_migration_async(&source_store, &target_store).await;
472+
}
473+
448474
#[test]
449475
fn test_filesystem_store_v2() {
450476
// Create the nodes, giving them FilesystemStoreV2s for data stores.

0 commit comments

Comments
 (0)