Skip to content

Commit 94cff3b

Browse files
committed
Add async migratable filesystem stores
Allow the filesystem stores to use the async migration helper and cover both store versions with async migration tests. Co-Authored-By: HAL 9000
1 parent 9b11fda commit 94cff3b

4 files changed

Lines changed: 216 additions & 93 deletions

File tree

lightning-persister/src/fs_store/common.rs

Lines changed: 105 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,94 @@ impl FilesystemStoreInner {
470470

471471
Ok(keys)
472472
}
473+
474+
fn list_all_keys(
475+
&self, use_empty_ns_dir: bool,
476+
) -> Result<Vec<(String, String, String)>, lightning::io::Error> {
477+
let prefixed_dest = &self.data_dir;
478+
if !prefixed_dest.exists() {
479+
return Ok(Vec::new());
480+
}
481+
482+
let mut keys = Vec::new();
483+
484+
'primary_loop: for primary_entry in fs::read_dir(prefixed_dest)? {
485+
let primary_entry = primary_entry?;
486+
let primary_path = primary_entry.path();
487+
if dir_entry_is_store_artifact(&primary_path) {
488+
continue 'primary_loop;
489+
}
490+
491+
if dir_entry_is_key(&primary_entry)? {
492+
let primary_namespace = String::new();
493+
let secondary_namespace = String::new();
494+
let key = get_key_from_dir_entry_path(&primary_path, prefixed_dest, false)?;
495+
keys.push((primary_namespace, secondary_namespace, key));
496+
continue 'primary_loop;
497+
}
498+
499+
// The primary_entry is actually also a directory.
500+
'secondary_loop: for secondary_entry in fs::read_dir(&primary_path)? {
501+
let secondary_entry = secondary_entry?;
502+
let secondary_path = secondary_entry.path();
503+
if dir_entry_is_store_artifact(&secondary_path) {
504+
continue 'secondary_loop;
505+
}
506+
507+
if dir_entry_is_key(&secondary_entry)? {
508+
let primary_namespace = get_key_from_dir_entry_path(
509+
&primary_path,
510+
prefixed_dest,
511+
use_empty_ns_dir,
512+
)?;
513+
let secondary_namespace = String::new();
514+
let key = get_key_from_dir_entry_path(&secondary_path, &primary_path, false)?;
515+
keys.push((primary_namespace, secondary_namespace, key));
516+
continue 'secondary_loop;
517+
}
518+
519+
// The secondary_entry is actually also a directory.
520+
for tertiary_entry in fs::read_dir(&secondary_path)? {
521+
let tertiary_entry = tertiary_entry?;
522+
let tertiary_path = tertiary_entry.path();
523+
if dir_entry_is_store_artifact(&tertiary_path) {
524+
continue;
525+
}
526+
527+
if dir_entry_is_key(&tertiary_entry)? {
528+
let primary_namespace = get_key_from_dir_entry_path(
529+
&primary_path,
530+
prefixed_dest,
531+
use_empty_ns_dir,
532+
)?;
533+
let secondary_namespace = get_key_from_dir_entry_path(
534+
&secondary_path,
535+
&primary_path,
536+
use_empty_ns_dir,
537+
)?;
538+
let key =
539+
get_key_from_dir_entry_path(&tertiary_path, &secondary_path, false)?;
540+
keys.push((primary_namespace, secondary_namespace, key));
541+
} else {
542+
debug_assert!(
543+
false,
544+
"Failed to list keys of path {}: only two levels of namespaces are supported",
545+
PrintableString(tertiary_path.to_str().unwrap_or_default())
546+
);
547+
let msg = format!(
548+
"Failed to list keys of path {}: only two levels of namespaces are supported",
549+
PrintableString(tertiary_path.to_str().unwrap_or_default())
550+
);
551+
return Err(lightning::io::Error::new(
552+
lightning::io::ErrorKind::Other,
553+
msg,
554+
));
555+
}
556+
}
557+
}
558+
}
559+
Ok(keys)
560+
}
473561
}
474562

475563
impl FilesystemStoreState {
@@ -640,92 +728,26 @@ impl FilesystemStoreState {
640728
}
641729
}
642730

643-
pub(crate) fn list_all_keys_impl(
731+
#[cfg(feature = "tokio")]
732+
pub(crate) fn list_all_keys_async(
644733
&self, use_empty_ns_dir: bool,
645-
) -> Result<Vec<(String, String, String)>, lightning::io::Error> {
646-
let prefixed_dest = &self.inner.data_dir;
647-
if !prefixed_dest.exists() {
648-
return Ok(Vec::new());
649-
}
650-
651-
let mut keys = Vec::new();
652-
653-
'primary_loop: for primary_entry in fs::read_dir(prefixed_dest)? {
654-
let primary_entry = primary_entry?;
655-
let primary_path = primary_entry.path();
656-
if dir_entry_is_store_artifact(&primary_path) {
657-
continue 'primary_loop;
658-
}
659-
660-
if dir_entry_is_key(&primary_entry)? {
661-
let primary_namespace = String::new();
662-
let secondary_namespace = String::new();
663-
let key = get_key_from_dir_entry_path(&primary_path, prefixed_dest, false)?;
664-
keys.push((primary_namespace, secondary_namespace, key));
665-
continue 'primary_loop;
666-
}
667-
668-
// The primary_entry is actually also a directory.
669-
'secondary_loop: for secondary_entry in fs::read_dir(&primary_path)? {
670-
let secondary_entry = secondary_entry?;
671-
let secondary_path = secondary_entry.path();
672-
if dir_entry_is_store_artifact(&secondary_path) {
673-
continue 'secondary_loop;
674-
}
675-
676-
if dir_entry_is_key(&secondary_entry)? {
677-
let primary_namespace = get_key_from_dir_entry_path(
678-
&primary_path,
679-
prefixed_dest,
680-
use_empty_ns_dir,
681-
)?;
682-
let secondary_namespace = String::new();
683-
let key = get_key_from_dir_entry_path(&secondary_path, &primary_path, false)?;
684-
keys.push((primary_namespace, secondary_namespace, key));
685-
continue 'secondary_loop;
686-
}
687-
688-
// The secondary_entry is actually also a directory.
689-
for tertiary_entry in fs::read_dir(&secondary_path)? {
690-
let tertiary_entry = tertiary_entry?;
691-
let tertiary_path = tertiary_entry.path();
692-
if dir_entry_is_store_artifact(&tertiary_path) {
693-
continue;
694-
}
734+
) -> impl Future<Output = Result<Vec<(String, String, String)>, lightning::io::Error>> + 'static + Send
735+
{
736+
let this = Arc::clone(&self.inner);
695737

696-
if dir_entry_is_key(&tertiary_entry)? {
697-
let primary_namespace = get_key_from_dir_entry_path(
698-
&primary_path,
699-
prefixed_dest,
700-
use_empty_ns_dir,
701-
)?;
702-
let secondary_namespace = get_key_from_dir_entry_path(
703-
&secondary_path,
704-
&primary_path,
705-
use_empty_ns_dir,
706-
)?;
707-
let key =
708-
get_key_from_dir_entry_path(&tertiary_path, &secondary_path, false)?;
709-
keys.push((primary_namespace, secondary_namespace, key));
710-
} else {
711-
debug_assert!(
712-
false,
713-
"Failed to list keys of path {}: only two levels of namespaces are supported",
714-
PrintableString(tertiary_path.to_str().unwrap_or_default())
715-
);
716-
let msg = format!(
717-
"Failed to list keys of path {}: only two levels of namespaces are supported",
718-
PrintableString(tertiary_path.to_str().unwrap_or_default())
719-
);
720-
return Err(lightning::io::Error::new(
721-
lightning::io::ErrorKind::Other,
722-
msg,
723-
));
724-
}
725-
}
726-
}
738+
async move {
739+
tokio::task::spawn_blocking(move || this.list_all_keys(use_empty_ns_dir))
740+
.await
741+
.unwrap_or_else(|e| {
742+
Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
743+
})
727744
}
728-
Ok(keys)
745+
}
746+
747+
pub(crate) fn list_all_keys_impl(
748+
&self, use_empty_ns_dir: bool,
749+
) -> Result<Vec<(String, String, String)>, lightning::io::Error> {
750+
self.inner.list_all_keys(use_empty_ns_dir)
729751
}
730752
}
731753

lightning-persister/src/fs_store/v1.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,21 @@ impl MigratableKVStoreSync for FilesystemStore {
9494
}
9595
}
9696

97+
#[cfg(feature = "tokio")]
98+
impl lightning::util::persist::MigratableKVStore for FilesystemStore {
99+
fn list_all_keys(
100+
&self,
101+
) -> impl Future<Output = Result<Vec<(String, String, String)>, lightning::io::Error>> + 'static + Send
102+
{
103+
self.state.list_all_keys_async(false)
104+
}
105+
}
106+
97107
#[cfg(test)]
98108
mod tests {
99109
use super::*;
110+
#[cfg(feature = "tokio")]
111+
use crate::test_utils::do_test_data_migration_async;
100112
use crate::test_utils::{
101113
do_read_write_remove_list_persist, do_test_data_migration, do_test_store,
102114
};
@@ -221,6 +233,20 @@ mod tests {
221233
do_test_data_migration(&mut source_store, &mut target_store);
222234
}
223235

236+
#[cfg(feature = "tokio")]
237+
#[tokio::test]
238+
async fn test_data_migration_async() {
239+
let mut source_temp_path = std::env::temp_dir();
240+
source_temp_path.push("test_data_migration_source_async");
241+
let source_store = FilesystemStore::new(source_temp_path);
242+
243+
let mut target_temp_path = std::env::temp_dir();
244+
target_temp_path.push("test_data_migration_target_async");
245+
let target_store = FilesystemStore::new(target_temp_path);
246+
247+
do_test_data_migration_async(&source_store, &target_store).await;
248+
}
249+
224250
#[test]
225251
fn test_if_monitors_is_not_dir() {
226252
let store = FilesystemStore::new("test_monitors_is_not_dir".into());

lightning-persister/src/fs_store/v2.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,16 @@ impl MigratableKVStoreSync for FilesystemStoreV2 {
321321
}
322322
}
323323

324+
#[cfg(feature = "tokio")]
325+
impl lightning::util::persist::MigratableKVStore for FilesystemStoreV2 {
326+
fn list_all_keys(
327+
&self,
328+
) -> impl Future<Output = Result<Vec<(String, String, String)>, lightning::io::Error>> + 'static + Send
329+
{
330+
self.inner.list_all_keys_async(true)
331+
}
332+
}
333+
324334
/// Formats a page token from mtime (millis since epoch) and key.
325335
pub(crate) fn format_page_token(mtime_millis: u64, key: &str) -> String {
326336
format!("{mtime_millis:016}:{key}")
@@ -351,6 +361,8 @@ pub(crate) fn parse_page_token(token: &str) -> lightning::io::Result<(u64, Strin
351361
mod tests {
352362
use super::*;
353363
use crate::fs_store::common::EMPTY_NAMESPACE_DIR;
364+
#[cfg(feature = "tokio")]
365+
use crate::test_utils::do_test_data_migration_async;
354366
use crate::test_utils::{
355367
do_read_write_remove_list_persist, do_test_data_migration, do_test_store,
356368
};
@@ -445,6 +457,20 @@ mod tests {
445457
do_test_data_migration(&mut source_store, &mut target_store);
446458
}
447459

460+
#[cfg(feature = "tokio")]
461+
#[tokio::test]
462+
async fn test_data_migration_async() {
463+
let mut source_temp_path = std::env::temp_dir();
464+
source_temp_path.push("test_data_migration_source_async_v2");
465+
let source_store = FilesystemStoreV2::new(source_temp_path).unwrap();
466+
467+
let mut target_temp_path = std::env::temp_dir();
468+
target_temp_path.push("test_data_migration_target_async_v2");
469+
let target_store = FilesystemStoreV2::new(target_temp_path).unwrap();
470+
471+
do_test_data_migration_async(&source_store, &target_store).await;
472+
}
473+
448474
#[test]
449475
fn test_filesystem_store_v2() {
450476
// Create the nodes, giving them FilesystemStoreV2s for data stores.

lightning-persister/src/test_utils.rs

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,11 @@ pub(crate) fn do_read_write_remove_list_persist<K: KVStoreSync + RefUnwindSafe>(
5959
assert_eq!(listed_keys.len(), 0);
6060
}
6161

62-
pub(crate) fn do_test_data_migration<S: MigratableKVStoreSync, T: MigratableKVStoreSync>(
63-
source_store: &mut S, target_store: &mut T,
64-
) {
65-
// We fill the source with some bogus keys.
66-
let dummy_data = vec![42u8; 32];
62+
fn data_migration_test_keys() -> Vec<(String, String, String)> {
6763
let num_primary_namespaces = 3;
6864
let num_secondary_namespaces = 3;
6965
let num_keys = 3;
70-
let mut expected_keys = Vec::new();
66+
let mut keys = Vec::new();
7167
for i in 0..num_primary_namespaces {
7268
let primary_namespace = if i == 0 {
7369
String::new()
@@ -83,13 +79,25 @@ pub(crate) fn do_test_data_migration<S: MigratableKVStoreSync, T: MigratableKVSt
8379
for k in 0..num_keys {
8480
let key =
8581
format!("testkey{}", KVSTORE_NAMESPACE_KEY_ALPHABET.chars().nth(k).unwrap());
86-
source_store
87-
.write(&primary_namespace, &secondary_namespace, &key, dummy_data.clone())
88-
.unwrap();
89-
expected_keys.push((primary_namespace.clone(), secondary_namespace.clone(), key));
82+
keys.push((primary_namespace.clone(), secondary_namespace.clone(), key));
9083
}
9184
}
9285
}
86+
87+
keys
88+
}
89+
90+
pub(crate) fn do_test_data_migration<S: MigratableKVStoreSync, T: MigratableKVStoreSync>(
91+
source_store: &mut S, target_store: &mut T,
92+
) {
93+
// We fill the source with some bogus keys.
94+
let dummy_data = vec![42u8; 32];
95+
let mut expected_keys = data_migration_test_keys();
96+
for (primary_namespace, secondary_namespace, key) in &expected_keys {
97+
source_store
98+
.write(primary_namespace, secondary_namespace, key, dummy_data.clone())
99+
.unwrap();
100+
}
93101
expected_keys.sort();
94102
expected_keys.dedup();
95103

@@ -108,6 +116,47 @@ pub(crate) fn do_test_data_migration<S: MigratableKVStoreSync, T: MigratableKVSt
108116
}
109117
}
110118

119+
#[cfg(feature = "tokio")]
120+
pub(crate) async fn do_test_data_migration_async<
121+
S: lightning::util::persist::MigratableKVStore,
122+
T: lightning::util::persist::MigratableKVStore,
123+
>(
124+
source_store: &S, target_store: &T,
125+
) {
126+
use lightning::util::persist::{migrate_kv_store_data_async, KVStore, MigratableKVStore};
127+
128+
// We fill the source with some bogus keys.
129+
let dummy_data = vec![42u8; 32];
130+
let mut expected_keys = data_migration_test_keys();
131+
for (primary_namespace, secondary_namespace, key) in &expected_keys {
132+
KVStore::write(
133+
source_store,
134+
primary_namespace,
135+
secondary_namespace,
136+
key,
137+
dummy_data.clone(),
138+
)
139+
.await
140+
.unwrap();
141+
}
142+
expected_keys.sort();
143+
expected_keys.dedup();
144+
145+
let mut source_list = MigratableKVStore::list_all_keys(source_store).await.unwrap();
146+
source_list.sort();
147+
assert_eq!(source_list, expected_keys);
148+
149+
migrate_kv_store_data_async(source_store, target_store).await.unwrap();
150+
151+
let mut target_list = MigratableKVStore::list_all_keys(target_store).await.unwrap();
152+
target_list.sort();
153+
assert_eq!(target_list, expected_keys);
154+
155+
for (p, s, k) in expected_keys.iter() {
156+
assert_eq!(KVStore::read(target_store, p, s, k).await.unwrap(), dummy_data.clone());
157+
}
158+
}
159+
111160
// Integration-test the given KVStore implementation. Test relaying a few payments and check that
112161
// the persisted data is updated the appropriate number of times.
113162
pub(crate) fn do_test_store<K: KVStoreSync + Sync>(store_0: &K, store_1: &K) {

0 commit comments

Comments
 (0)