Skip to content

Commit cc7fc10

Browse files
committed
Migrate to KVStore/FilesystemStore
Firstly, we switch our BP over to use `FilesystemStore`, which also gives us test coverage and ensures the compatibility. Then, we remove the superseded `KVStorePersister` trait and the `FilesystemPersister` code.
1 parent bf51037 commit cc7fc10

6 files changed

Lines changed: 68 additions & 589 deletions

File tree

bench/benches/bench.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ criterion_group!(benches,
1515
lightning::routing::router::benches::generate_large_mpp_routes_with_probabilistic_scorer,
1616
lightning::sign::benches::bench_get_secure_random_bytes,
1717
lightning::ln::channelmanager::bench::bench_sends,
18-
lightning_persister::bench::bench_sends,
1918
lightning_rapid_gossip_sync::bench::bench_reading_full_graph_from_file,
2019
lightning::routing::gossip::benches::read_network_graph,
2120
lightning::routing::gossip::benches::write_network_graph);

lightning-background-processor/src/lib.rs

Lines changed: 55 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -500,9 +500,16 @@ use core::task;
500500
/// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
501501
/// could setup `process_events_async` like this:
502502
/// ```
503-
/// # struct MyPersister {}
504-
/// # impl lightning::util::persist::KVStorePersister for MyPersister {
505-
/// # fn persist<W: lightning::util::ser::Writeable>(&self, key: &str, object: &W) -> lightning::io::Result<()> { Ok(()) }
503+
/// # use lightning::io;
504+
/// # use std::sync::{Arc, Mutex};
505+
/// # use std::sync::atomic::{AtomicBool, Ordering};
506+
/// # use lightning_background_processor::{process_events_async, GossipSync};
507+
/// # struct MyStore {}
508+
/// # impl lightning::util::persist::KVStore for MyStore {
509+
/// # fn read(&self, namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
510+
/// # fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
511+
/// # fn remove(&self, namespace: &str, key: &str) -> io::Result<()> { Ok(()) }
512+
/// # fn list(&self, namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
506513
/// # }
507514
/// # struct MyEventHandler {}
508515
/// # impl MyEventHandler {
@@ -514,23 +521,20 @@ use core::task;
514521
/// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
515522
/// # fn disconnect_socket(&mut self) {}
516523
/// # }
517-
/// # use std::sync::{Arc, Mutex};
518-
/// # use std::sync::atomic::{AtomicBool, Ordering};
519-
/// # use lightning_background_processor::{process_events_async, GossipSync};
520524
/// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
521525
/// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
522526
/// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync;
523527
/// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
524528
/// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
525529
/// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
526-
/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyPersister>>;
530+
/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyStore>>;
527531
/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyUtxoLookup, MyLogger>;
528532
/// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
529533
/// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
530534
/// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
531535
/// # type MyScorer = Mutex<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
532536
///
533-
/// # async fn setup_background_processing(my_persister: Arc<MyPersister>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_gossip_sync: Arc<MyGossipSync>, my_logger: Arc<MyLogger>, my_scorer: Arc<MyScorer>, my_peer_manager: Arc<MyPeerManager>) {
537+
/// # async fn setup_background_processing(my_persister: Arc<MyStore>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_gossip_sync: Arc<MyGossipSync>, my_logger: Arc<MyLogger>, my_scorer: Arc<MyScorer>, my_peer_manager: Arc<MyPeerManager>) {
534538
/// let background_persister = Arc::clone(&my_persister);
535539
/// let background_event_handler = Arc::clone(&my_event_handler);
536540
/// let background_chain_mon = Arc::clone(&my_chain_monitor);
@@ -866,8 +870,8 @@ mod tests {
866870
use lightning::util::config::UserConfig;
867871
use lightning::util::ser::Writeable;
868872
use lightning::util::test_utils;
869-
use lightning::util::persist::KVStorePersister;
870-
use lightning_persister::FilesystemPersister;
873+
use lightning::util::persist::{KVStore, CHANNEL_MANAGER_PERSISTENCE_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, SCORER_PERSISTENCE_NAMESPACE, SCORER_PERSISTENCE_KEY};
874+
use lightning_persister::fs_store::FilesystemStore;
871875
use std::collections::VecDeque;
872876
use std::{fs, env};
873877
use std::path::PathBuf;
@@ -906,7 +910,7 @@ mod tests {
906910
>,
907911
Arc<test_utils::TestLogger>>;
908912

909-
type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
913+
type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemStore>>;
910914

911915
type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
912916
type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
@@ -917,7 +921,7 @@ mod tests {
917921
rapid_gossip_sync: RGS,
918922
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
919923
chain_monitor: Arc<ChainMonitor>,
920-
persister: Arc<FilesystemPersister>,
924+
kv_store: Arc<FilesystemStore>,
921925
tx_broadcaster: Arc<test_utils::TestBroadcaster>,
922926
network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
923927
logger: Arc<test_utils::TestLogger>,
@@ -941,9 +945,9 @@ mod tests {
941945

942946
impl Drop for Node {
943947
fn drop(&mut self) {
944-
let data_dir = self.persister.get_data_dir();
948+
let data_dir = self.kv_store.get_data_dir();
945949
match fs::remove_dir_all(data_dir.clone()) {
946-
Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
950+
Err(e) => println!("Failed to remove test store directory {}: {}", data_dir.display(), e),
947951
_ => {}
948952
}
949953
}
@@ -954,13 +958,13 @@ mod tests {
954958
graph_persistence_notifier: Option<SyncSender<()>>,
955959
manager_error: Option<(std::io::ErrorKind, &'static str)>,
956960
scorer_error: Option<(std::io::ErrorKind, &'static str)>,
957-
filesystem_persister: FilesystemPersister,
961+
kv_store: FilesystemStore,
958962
}
959963

960964
impl Persister {
961-
fn new(data_dir: String) -> Self {
962-
let filesystem_persister = FilesystemPersister::new(data_dir);
963-
Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
965+
fn new(data_dir: PathBuf) -> Self {
966+
let kv_store = FilesystemStore::new(data_dir);
967+
Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, kv_store }
964968
}
965969

966970
fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
@@ -980,15 +984,19 @@ mod tests {
980984
}
981985
}
982986

983-
impl KVStorePersister for Persister {
984-
fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
985-
if key == "manager" {
987+
impl KVStore for Persister {
988+
fn read(&self, namespace: &str, key: &str) -> lightning::io::Result<Vec<u8>> {
989+
self.kv_store.read(namespace, key)
990+
}
991+
992+
fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> lightning::io::Result<()> {
993+
if namespace == CHANNEL_MANAGER_PERSISTENCE_NAMESPACE && key == CHANNEL_MANAGER_PERSISTENCE_KEY {
986994
if let Some((error, message)) = self.manager_error {
987995
return Err(std::io::Error::new(error, message))
988996
}
989997
}
990998

991-
if key == "network_graph" {
999+
if namespace == NETWORK_GRAPH_PERSISTENCE_NAMESPACE && key == NETWORK_GRAPH_PERSISTENCE_KEY {
9921000
if let Some(sender) = &self.graph_persistence_notifier {
9931001
match sender.send(()) {
9941002
Ok(()) => {},
@@ -1001,13 +1009,21 @@ mod tests {
10011009
}
10021010
}
10031011

1004-
if key == "scorer" {
1012+
if namespace == SCORER_PERSISTENCE_NAMESPACE && key == SCORER_PERSISTENCE_KEY {
10051013
if let Some((error, message)) = self.scorer_error {
10061014
return Err(std::io::Error::new(error, message))
10071015
}
10081016
}
10091017

1010-
self.filesystem_persister.persist(key, object)
1018+
self.kv_store.write(namespace, key, buf)
1019+
}
1020+
1021+
fn remove(&self, namespace: &str, key: &str) -> lightning::io::Result<()> {
1022+
self.kv_store.remove(namespace, key)
1023+
}
1024+
1025+
fn list(&self, namespace: &str) -> lightning::io::Result<Vec<String>> {
1026+
self.kv_store.list(namespace)
10111027
}
10121028
}
10131029

@@ -1157,10 +1173,10 @@ mod tests {
11571173
let seed = [i as u8; 32];
11581174
let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), ()));
11591175
let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
1160-
let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", &persist_dir, i)));
1176+
let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into()));
11611177
let now = Duration::from_secs(genesis_block.header.time as u64);
11621178
let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
1163-
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
1179+
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), kv_store.clone()));
11641180
let best_block = BestBlock::from_network(network);
11651181
let params = ChainParameters { network, best_block };
11661182
let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params, genesis_block.header.time));
@@ -1172,7 +1188,7 @@ mod tests {
11721188
onion_message_handler: IgnoringMessageHandler{}, custom_message_handler: IgnoringMessageHandler{}
11731189
};
11741190
let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), keys_manager.clone()));
1175-
let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
1191+
let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, kv_store, tx_broadcaster, network_graph, logger, best_block, scorer };
11761192
nodes.push(node);
11771193
}
11781194

@@ -1267,7 +1283,7 @@ mod tests {
12671283
let tx = open_channel!(nodes[0], nodes[1], 100000);
12681284

12691285
// Initiate the background processors to watch each node.
1270-
let data_dir = nodes[0].persister.get_data_dir();
1286+
let data_dir = nodes[0].kv_store.get_data_dir();
12711287
let persister = Arc::new(Persister::new(data_dir));
12721288
let event_handler = |_: _| {};
12731289
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1332,7 +1348,7 @@ mod tests {
13321348
// `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, and
13331349
// `PeerManager::timer_tick_occurred` every `PING_TIMER`.
13341350
let (_, nodes) = create_nodes(1, "test_timer_tick_called");
1335-
let data_dir = nodes[0].persister.get_data_dir();
1351+
let data_dir = nodes[0].kv_store.get_data_dir();
13361352
let persister = Arc::new(Persister::new(data_dir));
13371353
let event_handler = |_: _| {};
13381354
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1359,7 +1375,7 @@ mod tests {
13591375
let (_, nodes) = create_nodes(2, "test_persist_error");
13601376
open_channel!(nodes[0], nodes[1], 100000);
13611377

1362-
let data_dir = nodes[0].persister.get_data_dir();
1378+
let data_dir = nodes[0].kv_store.get_data_dir();
13631379
let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
13641380
let event_handler = |_: _| {};
13651381
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1379,7 +1395,7 @@ mod tests {
13791395
let (_, nodes) = create_nodes(2, "test_persist_error_sync");
13801396
open_channel!(nodes[0], nodes[1], 100000);
13811397

1382-
let data_dir = nodes[0].persister.get_data_dir();
1398+
let data_dir = nodes[0].kv_store.get_data_dir();
13831399
let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
13841400

13851401
let bp_future = super::process_events_async(
@@ -1405,7 +1421,7 @@ mod tests {
14051421
fn test_network_graph_persist_error() {
14061422
// Test that if we encounter an error during network graph persistence, an error gets returned.
14071423
let (_, nodes) = create_nodes(2, "test_persist_network_graph_error");
1408-
let data_dir = nodes[0].persister.get_data_dir();
1424+
let data_dir = nodes[0].kv_store.get_data_dir();
14091425
let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
14101426
let event_handler = |_: _| {};
14111427
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1423,7 +1439,7 @@ mod tests {
14231439
fn test_scorer_persist_error() {
14241440
// Test that if we encounter an error during scorer persistence, an error gets returned.
14251441
let (_, nodes) = create_nodes(2, "test_persist_scorer_error");
1426-
let data_dir = nodes[0].persister.get_data_dir();
1442+
let data_dir = nodes[0].kv_store.get_data_dir();
14271443
let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
14281444
let event_handler = |_: _| {};
14291445
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1441,7 +1457,7 @@ mod tests {
14411457
fn test_background_event_handling() {
14421458
let (_, mut nodes) = create_nodes(2, "test_background_event_handling");
14431459
let channel_value = 100000;
1444-
let data_dir = nodes[0].persister.get_data_dir();
1460+
let data_dir = nodes[0].kv_store.get_data_dir();
14451461
let persister = Arc::new(Persister::new(data_dir.clone()));
14461462

14471463
// Set up a background event handler for FundingGenerationReady events.
@@ -1514,7 +1530,7 @@ mod tests {
15141530
#[test]
15151531
fn test_scorer_persistence() {
15161532
let (_, nodes) = create_nodes(2, "test_scorer_persistence");
1517-
let data_dir = nodes[0].persister.get_data_dir();
1533+
let data_dir = nodes[0].kv_store.get_data_dir();
15181534
let persister = Arc::new(Persister::new(data_dir));
15191535
let event_handler = |_: _| {};
15201536
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1586,7 +1602,7 @@ mod tests {
15861602
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
15871603

15881604
let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion");
1589-
let data_dir = nodes[0].persister.get_data_dir();
1605+
let data_dir = nodes[0].kv_store.get_data_dir();
15901606
let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
15911607

15921608
let event_handler = |_: _| {};
@@ -1605,7 +1621,7 @@ mod tests {
16051621
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
16061622

16071623
let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
1608-
let data_dir = nodes[0].persister.get_data_dir();
1624+
let data_dir = nodes[0].kv_store.get_data_dir();
16091625
let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
16101626

16111627
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
@@ -1745,7 +1761,7 @@ mod tests {
17451761
};
17461762

17471763
let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
1748-
let data_dir = nodes[0].persister.get_data_dir();
1764+
let data_dir = nodes[0].kv_store.get_data_dir();
17491765
let persister = Arc::new(Persister::new(data_dir));
17501766
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
17511767

@@ -1778,7 +1794,7 @@ mod tests {
17781794
};
17791795

17801796
let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
1781-
let data_dir = nodes[0].persister.get_data_dir();
1797+
let data_dir = nodes[0].kv_store.get_data_dir();
17821798
let persister = Arc::new(Persister::new(data_dir));
17831799

17841800
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());

lightning-persister/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@ rustdoc-args = ["--cfg", "docsrs"]
1616
[dependencies]
1717
bitcoin = "0.29.0"
1818
lightning = { version = "0.0.116", path = "../lightning" }
19-
libc = "0.2"
2019

2120
[target.'cfg(windows)'.dependencies]
22-
winapi = { version = "0.3", features = ["winbase"] }
2321
windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }
2422

2523
[target.'cfg(ldk_bench)'.dependencies]

0 commit comments

Comments
 (0)