Skip to content

Commit 3c86f2f

Browse files
committed
Drop DelayedStore and associated backup test
These were created to test that our backup store does not impact the primary store writes but the boilerplate appears too much for the functionality being tested.
1 parent 6eea996 commit 3c86f2f

2 files changed

Lines changed: 4 additions & 215 deletions

File tree

src/io/test_utils.rs

Lines changed: 1 addition & 169 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@ use std::collections::{hash_map, HashMap};
99
use std::future::Future;
1010
use std::panic::RefUnwindSafe;
1111
use std::path::PathBuf;
12-
use std::sync::{Arc, Mutex};
13-
use std::time::Duration;
12+
use std::sync::Mutex;
1413

1514
use lightning::events::ClosureReason;
1615
use lightning::ln::functional_test_utils::{
@@ -26,8 +25,6 @@ use lightning::{check_closed_broadcast, io};
2625
use rand::distr::Alphanumeric;
2726
use rand::{rng, Rng};
2827

29-
use crate::runtime::Runtime;
30-
3128
type TestMonitorUpdatePersister<'a, K> = MonitorUpdatingPersister<
3229
&'a K,
3330
&'a test_utils::TestLogger,
@@ -355,168 +352,3 @@ pub(crate) fn do_test_store<K: KVStoreSync + Sync>(store_0: &K, store_1: &K) {
355352
// Make sure everything is persisted as expected after close.
356353
check_persisted_data!(persister_0_max_pending_updates * 2 * EXPECTED_UPDATES_PER_PAYMENT + 1);
357354
}
358-
359-
struct DelayedStoreInner {
360-
storage: Mutex<HashMap<String, Vec<u8>>>,
361-
delay: Duration,
362-
}
363-
364-
impl DelayedStoreInner {
365-
fn new(delay: Duration) -> Self {
366-
Self { storage: Mutex::new(HashMap::new()), delay }
367-
}
368-
369-
fn make_key(pn: &str, sn: &str, key: &str) -> String {
370-
format!("{}/{}/{}", pn, sn, key)
371-
}
372-
373-
async fn read_internal(
374-
&self, primary_namespace: String, secondary_namespace: String, key: String,
375-
) -> Result<Vec<u8>, io::Error> {
376-
tokio::time::sleep(self.delay).await;
377-
378-
let full_key = Self::make_key(&primary_namespace, &secondary_namespace, &key);
379-
let storage = self.storage.lock().unwrap();
380-
storage
381-
.get(&full_key)
382-
.cloned()
383-
.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "key not found"))
384-
}
385-
386-
async fn write_internal(
387-
&self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>,
388-
) -> Result<(), io::Error> {
389-
tokio::time::sleep(self.delay).await;
390-
391-
let full_key = Self::make_key(&primary_namespace, &secondary_namespace, &key);
392-
let mut storage = self.storage.lock().unwrap();
393-
storage.insert(full_key, buf);
394-
Ok(())
395-
}
396-
397-
async fn remove_internal(
398-
&self, primary_namespace: String, secondary_namespace: String, key: String,
399-
) -> Result<(), io::Error> {
400-
tokio::time::sleep(self.delay).await;
401-
402-
let full_key = Self::make_key(&primary_namespace, &secondary_namespace, &key);
403-
let mut storage = self.storage.lock().unwrap();
404-
storage.remove(&full_key);
405-
Ok(())
406-
}
407-
408-
async fn list_internal(
409-
&self, primary_namespace: String, secondary_namespace: String,
410-
) -> Result<Vec<String>, io::Error> {
411-
tokio::time::sleep(self.delay).await;
412-
413-
let prefix = format!("{}/{}/", primary_namespace, secondary_namespace);
414-
let storage = self.storage.lock().unwrap();
415-
Ok(storage
416-
.keys()
417-
.filter(|k| k.starts_with(&prefix))
418-
.map(|k| k.strip_prefix(&prefix).unwrap().to_string())
419-
.collect())
420-
}
421-
}
422-
423-
pub struct DelayedStore {
424-
inner: Arc<DelayedStoreInner>,
425-
runtime: Arc<Runtime>,
426-
}
427-
428-
impl DelayedStore {
429-
pub fn new(delay_ms: u64, runtime: Arc<Runtime>) -> Self {
430-
Self { inner: Arc::new(DelayedStoreInner::new(Duration::from_millis(delay_ms))), runtime }
431-
}
432-
}
433-
434-
impl KVStore for DelayedStore {
435-
fn read(
436-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
437-
) -> impl Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send {
438-
let inner = Arc::clone(&self.inner);
439-
let pn = primary_namespace.to_string();
440-
let sn = secondary_namespace.to_string();
441-
let key = key.to_string();
442-
443-
async move { inner.read_internal(pn, sn, key).await }
444-
}
445-
446-
fn write(
447-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
448-
) -> impl Future<Output = Result<(), io::Error>> + 'static + Send {
449-
let inner = Arc::clone(&self.inner);
450-
let pn = primary_namespace.to_string();
451-
let sn = secondary_namespace.to_string();
452-
let key = key.to_string();
453-
454-
async move { inner.write_internal(pn, sn, key, buf).await }
455-
}
456-
457-
fn remove(
458-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
459-
) -> impl Future<Output = Result<(), io::Error>> + 'static + Send {
460-
let inner = Arc::clone(&self.inner);
461-
let pn = primary_namespace.to_string();
462-
let sn = secondary_namespace.to_string();
463-
let key = key.to_string();
464-
465-
async move { inner.remove_internal(pn, sn, key).await }
466-
}
467-
468-
fn list(
469-
&self, primary_namespace: &str, secondary_namespace: &str,
470-
) -> impl Future<Output = Result<Vec<String>, io::Error>> + 'static + Send {
471-
let inner = Arc::clone(&self.inner);
472-
let pn = primary_namespace.to_string();
473-
let sn = secondary_namespace.to_string();
474-
475-
async move { inner.list_internal(pn, sn).await }
476-
}
477-
}
478-
479-
impl KVStoreSync for DelayedStore {
480-
fn read(
481-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
482-
) -> Result<Vec<u8>, io::Error> {
483-
let inner = Arc::clone(&self.inner);
484-
let pn = primary_namespace.to_string();
485-
let sn = secondary_namespace.to_string();
486-
let key = key.to_string();
487-
488-
self.runtime.block_on(async move { inner.read_internal(pn, sn, key).await })
489-
}
490-
491-
fn write(
492-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
493-
) -> Result<(), io::Error> {
494-
let inner = Arc::clone(&self.inner);
495-
let pn = primary_namespace.to_string();
496-
let sn = secondary_namespace.to_string();
497-
let key = key.to_string();
498-
499-
self.runtime.block_on(async move { inner.write_internal(pn, sn, key, buf).await })
500-
}
501-
502-
fn remove(
503-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
504-
) -> Result<(), io::Error> {
505-
let inner = Arc::clone(&self.inner);
506-
let pn = primary_namespace.to_string();
507-
let sn = secondary_namespace.to_string();
508-
let key = key.to_string();
509-
510-
self.runtime.block_on(async move { inner.remove_internal(pn, sn, key).await })
511-
}
512-
513-
fn list(
514-
&self, primary_namespace: &str, secondary_namespace: &str,
515-
) -> Result<Vec<String>, io::Error> {
516-
let inner = Arc::clone(&self.inner);
517-
let pn = primary_namespace.to_string();
518-
let sn = secondary_namespace.to_string();
519-
520-
self.runtime.block_on(async move { inner.list_internal(pn, sn).await })
521-
}
522-
}

src/io/tier_store.rs

Lines changed: 3 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -701,9 +701,7 @@ mod tests {
701701
};
702702
use lightning_persister::fs_store::FilesystemStore;
703703

704-
use crate::io::test_utils::{
705-
do_read_write_remove_list_persist, random_storage_path, DelayedStore,
706-
};
704+
use crate::io::test_utils::{do_read_write_remove_list_persist, random_storage_path};
707705
use crate::io::tier_store::TierStore;
708706
use crate::logger::Logger;
709707
use crate::runtime::Runtime;
@@ -873,47 +871,6 @@ mod tests {
873871
assert_eq!(backup_read_cm.unwrap(), data);
874872
}
875873

876-
#[test]
877-
fn backup_overflow_doesnt_fail_writes() {
878-
let base_dir = random_storage_path();
879-
let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned();
880-
let logger = Arc::new(Logger::new_fs_writer(log_path.clone(), Level::Trace).unwrap());
881-
let runtime = Arc::new(Runtime::new(Arc::clone(&logger)).unwrap());
882-
883-
let _cleanup = CleanupDir(base_dir.clone());
884-
885-
let primary_store: Arc<DynStore> =
886-
Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary"))));
887-
let mut tier =
888-
setup_tier_store(Arc::clone(&primary_store), Arc::clone(&logger), Arc::clone(&runtime));
889-
890-
let backup_store: Arc<DynStore> =
891-
Arc::new(DynStoreWrapper(DelayedStore::new(100, runtime)));
892-
tier.set_backup_store(Arc::clone(&backup_store));
893-
894-
let data = vec![42u8; 32];
895-
896-
let key = CHANNEL_MANAGER_PERSISTENCE_KEY;
897-
for i in 0..=10 {
898-
let result = KVStoreSync::write(
899-
&tier,
900-
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
901-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
902-
&format!("{}_{}", key, i),
903-
data.clone(),
904-
);
905-
906-
assert!(result.is_ok(), "Write {} should succeed", i);
907-
}
908-
909-
// Check logs for backup queue overflow message
910-
let log_contents = std::fs::read_to_string(&log_path).unwrap();
911-
assert!(
912-
log_contents.contains("Backup queue is full"),
913-
"Logs should contain backup queue overflow message"
914-
);
915-
}
916-
917874
#[test]
918875
fn lazy_removal() {
919876
let base_dir = random_storage_path();
@@ -929,7 +886,7 @@ mod tests {
929886
setup_tier_store(Arc::clone(&primary_store), Arc::clone(&logger), Arc::clone(&runtime));
930887

931888
let backup_store: Arc<DynStore> =
932-
Arc::new(DynStoreWrapper(DelayedStore::new(100, runtime)));
889+
Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("backup"))));
933890
tier.set_backup_store(Arc::clone(&backup_store));
934891

935892
let data = vec![42u8; 32];
@@ -944,7 +901,7 @@ mod tests {
944901
);
945902
assert!(write_result.is_ok(), "Write should succeed");
946903

947-
thread::sleep(Duration::from_millis(10));
904+
thread::sleep(Duration::from_millis(100));
948905

949906
assert_eq!(
950907
KVStoreSync::read(

0 commit comments

Comments
 (0)