Skip to content

Commit bd4e79d

Browse files
committed
Drop DelayedStore and associated backup test
These were created to test that our backup store does not impact the primary store writes but the boilerplate appears too much for the functionality being tested.
1 parent 23815f3 commit bd4e79d

File tree

2 files changed

+4
-215
lines changed

2 files changed

+4
-215
lines changed

src/io/test_utils.rs

Lines changed: 1 addition & 169 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@ use std::collections::{hash_map, HashMap};
99
use std::future::Future;
1010
use std::panic::RefUnwindSafe;
1111
use std::path::PathBuf;
12-
use std::sync::{Arc, Mutex};
13-
use std::time::Duration;
12+
use std::sync::Mutex;
1413

1514
use lightning::events::ClosureReason;
1615
use lightning::io;
@@ -27,8 +26,6 @@ use lightning::util::test_utils;
2726
use rand::distr::Alphanumeric;
2827
use rand::{rng, Rng};
2928

30-
use crate::runtime::Runtime;
31-
3229
type TestMonitorUpdatePersister<'a, K> = MonitorUpdatingPersister<
3330
&'a K,
3431
&'a test_utils::TestLogger,
@@ -356,168 +353,3 @@ pub(crate) fn do_test_store<K: KVStoreSync + Sync>(store_0: &K, store_1: &K) {
356353
// Make sure everything is persisted as expected after close.
357354
check_persisted_data!(persister_0_max_pending_updates * 2 * EXPECTED_UPDATES_PER_PAYMENT + 1);
358355
}
359-
360-
struct DelayedStoreInner {
361-
storage: Mutex<HashMap<String, Vec<u8>>>,
362-
delay: Duration,
363-
}
364-
365-
impl DelayedStoreInner {
366-
fn new(delay: Duration) -> Self {
367-
Self { storage: Mutex::new(HashMap::new()), delay }
368-
}
369-
370-
fn make_key(pn: &str, sn: &str, key: &str) -> String {
371-
format!("{}/{}/{}", pn, sn, key)
372-
}
373-
374-
async fn read_internal(
375-
&self, primary_namespace: String, secondary_namespace: String, key: String,
376-
) -> Result<Vec<u8>, io::Error> {
377-
tokio::time::sleep(self.delay).await;
378-
379-
let full_key = Self::make_key(&primary_namespace, &secondary_namespace, &key);
380-
let storage = self.storage.lock().unwrap();
381-
storage
382-
.get(&full_key)
383-
.cloned()
384-
.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "key not found"))
385-
}
386-
387-
async fn write_internal(
388-
&self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>,
389-
) -> Result<(), io::Error> {
390-
tokio::time::sleep(self.delay).await;
391-
392-
let full_key = Self::make_key(&primary_namespace, &secondary_namespace, &key);
393-
let mut storage = self.storage.lock().unwrap();
394-
storage.insert(full_key, buf);
395-
Ok(())
396-
}
397-
398-
async fn remove_internal(
399-
&self, primary_namespace: String, secondary_namespace: String, key: String,
400-
) -> Result<(), io::Error> {
401-
tokio::time::sleep(self.delay).await;
402-
403-
let full_key = Self::make_key(&primary_namespace, &secondary_namespace, &key);
404-
let mut storage = self.storage.lock().unwrap();
405-
storage.remove(&full_key);
406-
Ok(())
407-
}
408-
409-
async fn list_internal(
410-
&self, primary_namespace: String, secondary_namespace: String,
411-
) -> Result<Vec<String>, io::Error> {
412-
tokio::time::sleep(self.delay).await;
413-
414-
let prefix = format!("{}/{}/", primary_namespace, secondary_namespace);
415-
let storage = self.storage.lock().unwrap();
416-
Ok(storage
417-
.keys()
418-
.filter(|k| k.starts_with(&prefix))
419-
.map(|k| k.strip_prefix(&prefix).unwrap().to_string())
420-
.collect())
421-
}
422-
}
423-
424-
pub struct DelayedStore {
425-
inner: Arc<DelayedStoreInner>,
426-
runtime: Arc<Runtime>,
427-
}
428-
429-
impl DelayedStore {
430-
pub fn new(delay_ms: u64, runtime: Arc<Runtime>) -> Self {
431-
Self { inner: Arc::new(DelayedStoreInner::new(Duration::from_millis(delay_ms))), runtime }
432-
}
433-
}
434-
435-
impl KVStore for DelayedStore {
436-
fn read(
437-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
438-
) -> impl Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send {
439-
let inner = Arc::clone(&self.inner);
440-
let pn = primary_namespace.to_string();
441-
let sn = secondary_namespace.to_string();
442-
let key = key.to_string();
443-
444-
async move { inner.read_internal(pn, sn, key).await }
445-
}
446-
447-
fn write(
448-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
449-
) -> impl Future<Output = Result<(), io::Error>> + 'static + Send {
450-
let inner = Arc::clone(&self.inner);
451-
let pn = primary_namespace.to_string();
452-
let sn = secondary_namespace.to_string();
453-
let key = key.to_string();
454-
455-
async move { inner.write_internal(pn, sn, key, buf).await }
456-
}
457-
458-
fn remove(
459-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
460-
) -> impl Future<Output = Result<(), io::Error>> + 'static + Send {
461-
let inner = Arc::clone(&self.inner);
462-
let pn = primary_namespace.to_string();
463-
let sn = secondary_namespace.to_string();
464-
let key = key.to_string();
465-
466-
async move { inner.remove_internal(pn, sn, key).await }
467-
}
468-
469-
fn list(
470-
&self, primary_namespace: &str, secondary_namespace: &str,
471-
) -> impl Future<Output = Result<Vec<String>, io::Error>> + 'static + Send {
472-
let inner = Arc::clone(&self.inner);
473-
let pn = primary_namespace.to_string();
474-
let sn = secondary_namespace.to_string();
475-
476-
async move { inner.list_internal(pn, sn).await }
477-
}
478-
}
479-
480-
impl KVStoreSync for DelayedStore {
481-
fn read(
482-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
483-
) -> Result<Vec<u8>, io::Error> {
484-
let inner = Arc::clone(&self.inner);
485-
let pn = primary_namespace.to_string();
486-
let sn = secondary_namespace.to_string();
487-
let key = key.to_string();
488-
489-
self.runtime.block_on(async move { inner.read_internal(pn, sn, key).await })
490-
}
491-
492-
fn write(
493-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
494-
) -> Result<(), io::Error> {
495-
let inner = Arc::clone(&self.inner);
496-
let pn = primary_namespace.to_string();
497-
let sn = secondary_namespace.to_string();
498-
let key = key.to_string();
499-
500-
self.runtime.block_on(async move { inner.write_internal(pn, sn, key, buf).await })
501-
}
502-
503-
fn remove(
504-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
505-
) -> Result<(), io::Error> {
506-
let inner = Arc::clone(&self.inner);
507-
let pn = primary_namespace.to_string();
508-
let sn = secondary_namespace.to_string();
509-
let key = key.to_string();
510-
511-
self.runtime.block_on(async move { inner.remove_internal(pn, sn, key).await })
512-
}
513-
514-
fn list(
515-
&self, primary_namespace: &str, secondary_namespace: &str,
516-
) -> Result<Vec<String>, io::Error> {
517-
let inner = Arc::clone(&self.inner);
518-
let pn = primary_namespace.to_string();
519-
let sn = secondary_namespace.to_string();
520-
521-
self.runtime.block_on(async move { inner.list_internal(pn, sn).await })
522-
}
523-
}

src/io/tier_store.rs

Lines changed: 3 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -701,9 +701,7 @@ mod tests {
701701
use lightning_persister::fs_store::v1::FilesystemStore;
702702

703703
use super::*;
704-
use crate::io::test_utils::{
705-
do_read_write_remove_list_persist, random_storage_path, DelayedStore,
706-
};
704+
use crate::io::test_utils::{do_read_write_remove_list_persist, random_storage_path};
707705
use crate::io::tier_store::TierStore;
708706
use crate::logger::Logger;
709707
use crate::runtime::Runtime;
@@ -871,47 +869,6 @@ mod tests {
871869
assert_eq!(backup_read_cm.unwrap(), data);
872870
}
873871

874-
#[test]
875-
fn backup_overflow_doesnt_fail_writes() {
876-
let base_dir = random_storage_path();
877-
let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned();
878-
let logger = Arc::new(Logger::new_fs_writer(log_path.clone(), Level::Trace).unwrap());
879-
let runtime = Arc::new(Runtime::new(Arc::clone(&logger)).unwrap());
880-
881-
let _cleanup = CleanupDir(base_dir.clone());
882-
883-
let primary_store: Arc<DynStore> =
884-
Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary"))));
885-
let mut tier =
886-
setup_tier_store(Arc::clone(&primary_store), Arc::clone(&logger), Arc::clone(&runtime));
887-
888-
let backup_store: Arc<DynStore> =
889-
Arc::new(DynStoreWrapper(DelayedStore::new(100, runtime)));
890-
tier.set_backup_store(Arc::clone(&backup_store));
891-
892-
let data = vec![42u8; 32];
893-
894-
let key = CHANNEL_MANAGER_PERSISTENCE_KEY;
895-
for i in 0..=10 {
896-
let result = KVStoreSync::write(
897-
&tier,
898-
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
899-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
900-
&format!("{}_{}", key, i),
901-
data.clone(),
902-
);
903-
904-
assert!(result.is_ok(), "Write {} should succeed", i);
905-
}
906-
907-
// Check logs for backup queue overflow message
908-
let log_contents = std::fs::read_to_string(&log_path).unwrap();
909-
assert!(
910-
log_contents.contains("Backup queue is full"),
911-
"Logs should contain backup queue overflow message"
912-
);
913-
}
914-
915872
#[test]
916873
fn lazy_removal() {
917874
let base_dir = random_storage_path();
@@ -927,7 +884,7 @@ mod tests {
927884
setup_tier_store(Arc::clone(&primary_store), Arc::clone(&logger), Arc::clone(&runtime));
928885

929886
let backup_store: Arc<DynStore> =
930-
Arc::new(DynStoreWrapper(DelayedStore::new(100, runtime)));
887+
Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("backup"))));
931888
tier.set_backup_store(Arc::clone(&backup_store));
932889

933890
let data = vec![42u8; 32];
@@ -942,7 +899,7 @@ mod tests {
942899
);
943900
assert!(write_result.is_ok(), "Write should succeed");
944901

945-
thread::sleep(Duration::from_millis(10));
902+
thread::sleep(Duration::from_millis(100));
946903

947904
assert_eq!(
948905
KVStoreSync::read(

0 commit comments

Comments
 (0)