Skip to content

Commit 5b39741

Browse files
joostjagerclaude
andcommitted
Use async chain monitor persister
Use `ChainMonitor::new_async_beta` with `MonitorUpdatingPersisterAsync` for chain monitor persistence. Add `DynStoreRef`, a newtype wrapper that bridges the object-safe `DynStoreTrait` (boxed futures) to LDK's generic `KVStore` trait (`impl Future`), as required by `MonitorUpdatingPersisterAsync`. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 8f10f90 commit 5b39741

File tree

2 files changed

+63
-39
lines changed

2 files changed

+63
-39
lines changed

src/builder.rs

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,9 @@ use crate::peer_store::PeerStore;
7676
use crate::runtime::{Runtime, RuntimeSpawner};
7777
use crate::tx_broadcaster::TransactionBroadcaster;
7878
use crate::types::{
79-
AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph,
80-
KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore,
81-
Persister, SyncAndAsyncKVStore,
79+
AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreRef, DynStoreWrapper,
80+
GossipSync, Graph, KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager,
81+
PendingPaymentStore, SyncAndAsyncKVStore,
8282
};
8383
use crate::wallet::persist::KVStoreWalletPersister;
8484
use crate::wallet::Wallet;
@@ -1495,7 +1495,7 @@ fn build_with_store_internal(
14951495

14961496
let peer_storage_key = keys_manager.get_peer_storage_key();
14971497
let monitor_reader = Arc::new(AsyncPersister::new(
1498-
Arc::clone(&kv_store),
1498+
DynStoreRef(Arc::clone(&kv_store)),
14991499
RuntimeSpawner::new(Arc::clone(&runtime)),
15001500
Arc::clone(&logger),
15011501
PERSISTER_MAX_PENDING_UPDATES,
@@ -1508,7 +1508,7 @@ fn build_with_store_internal(
15081508
// Read ChannelMonitors and the NetworkGraph
15091509
let kv_store_ref = Arc::clone(&kv_store);
15101510
let logger_ref = Arc::clone(&logger);
1511-
let (monitor_read_res, network_graph_res) = runtime.block_on(async move {
1511+
let (monitor_read_res, network_graph_res) = runtime.block_on(async {
15121512
tokio::join!(
15131513
monitor_reader.read_all_channel_monitors_with_updates_parallel(),
15141514
read_network_graph(&*kv_store_ref, logger_ref),
@@ -1528,27 +1528,21 @@ fn build_with_store_internal(
15281528
},
15291529
};
15301530

1531-
let persister = Arc::new(Persister::new(
1532-
Arc::clone(&kv_store),
1533-
Arc::clone(&logger),
1534-
PERSISTER_MAX_PENDING_UPDATES,
1535-
Arc::clone(&keys_manager),
1536-
Arc::clone(&keys_manager),
1537-
Arc::clone(&tx_broadcaster),
1538-
Arc::clone(&fee_estimator),
1539-
));
1540-
15411531
// Initialize the ChainMonitor
1542-
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
1543-
Some(Arc::clone(&chain_source)),
1544-
Arc::clone(&tx_broadcaster),
1545-
Arc::clone(&logger),
1546-
Arc::clone(&fee_estimator),
1547-
Arc::clone(&persister),
1548-
Arc::clone(&keys_manager),
1549-
peer_storage_key,
1550-
true,
1551-
));
1532+
let chain_monitor: Arc<ChainMonitor> = {
1533+
let persister = Arc::try_unwrap(monitor_reader)
1534+
.unwrap_or_else(|_| panic!("Arc<AsyncPersister> should have no other references"));
1535+
Arc::new(chainmonitor::ChainMonitor::new_async_beta(
1536+
Some(Arc::clone(&chain_source)),
1537+
Arc::clone(&tx_broadcaster),
1538+
Arc::clone(&logger),
1539+
Arc::clone(&fee_estimator),
1540+
persister,
1541+
Arc::clone(&keys_manager),
1542+
peer_storage_key,
1543+
true,
1544+
))
1545+
};
15521546

15531547
// Initialize the network graph, scorer, and router
15541548
let network_graph = match network_graph_res {

src/types.rs

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@ use lightning::routing::gossip;
2323
use lightning::routing::router::DefaultRouter;
2424
use lightning::routing::scoring::{CombinedScorer, ProbabilisticScoringFeeParameters};
2525
use lightning::sign::InMemorySigner;
26-
use lightning::util::persist::{
27-
KVStore, KVStoreSync, MonitorUpdatingPersister, MonitorUpdatingPersisterAsync,
28-
};
26+
use lightning::util::persist::{KVStore, KVStoreSync, MonitorUpdatingPersisterAsync};
2927
use lightning::util::ser::{Readable, Writeable, Writer};
3028
use lightning::util::sweep::OutputSweeper;
3129
use lightning_block_sync::gossip::GossipVerifier;
@@ -135,6 +133,39 @@ impl<'a> KVStoreSync for dyn DynStoreTrait + 'a {
135133

136134
pub(crate) type DynStore = dyn DynStoreTrait;
137135

136+
// Newtype wrapper that implements `KVStore` for `Arc<DynStore>`. This is needed because `KVStore`
137+
// methods return `impl Future`, which is not object-safe. `DynStoreTrait` works around this by
138+
// returning `Pin<Box<dyn Future>>` instead, and this wrapper bridges the two by delegating
139+
// `KVStore` methods to the corresponding `DynStoreTrait::*_async` methods.
140+
#[derive(Clone)]
141+
pub(crate) struct DynStoreRef(pub(crate) Arc<DynStore>);
142+
143+
impl KVStore for DynStoreRef {
144+
fn read(
145+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
146+
) -> impl Future<Output = Result<Vec<u8>, bitcoin::io::Error>> + Send + 'static {
147+
DynStoreTrait::read_async(&*self.0, primary_namespace, secondary_namespace, key)
148+
}
149+
150+
fn write(
151+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
152+
) -> impl Future<Output = Result<(), bitcoin::io::Error>> + Send + 'static {
153+
DynStoreTrait::write_async(&*self.0, primary_namespace, secondary_namespace, key, buf)
154+
}
155+
156+
fn remove(
157+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
158+
) -> impl Future<Output = Result<(), bitcoin::io::Error>> + Send + 'static {
159+
DynStoreTrait::remove_async(&*self.0, primary_namespace, secondary_namespace, key, lazy)
160+
}
161+
162+
fn list(
163+
&self, primary_namespace: &str, secondary_namespace: &str,
164+
) -> impl Future<Output = Result<Vec<String>, bitcoin::io::Error>> + Send + 'static {
165+
DynStoreTrait::list_async(&*self.0, primary_namespace, secondary_namespace)
166+
}
167+
}
168+
138169
pub(crate) struct DynStoreWrapper<T: SyncAndAsyncKVStore + Send + Sync>(pub(crate) T);
139170

140171
impl<T: SyncAndAsyncKVStore + Send + Sync> DynStoreTrait for DynStoreWrapper<T> {
@@ -188,7 +219,7 @@ impl<T: SyncAndAsyncKVStore + Send + Sync> DynStoreTrait for DynStoreWrapper<T>
188219
}
189220

190221
pub(crate) type AsyncPersister = MonitorUpdatingPersisterAsync<
191-
Arc<DynStore>,
222+
DynStoreRef,
192223
RuntimeSpawner,
193224
Arc<Logger>,
194225
Arc<KeysManager>,
@@ -197,22 +228,21 @@ pub(crate) type AsyncPersister = MonitorUpdatingPersisterAsync<
197228
Arc<OnchainFeeEstimator>,
198229
>;
199230

200-
pub type Persister = MonitorUpdatingPersister<
201-
Arc<DynStore>,
202-
Arc<Logger>,
203-
Arc<KeysManager>,
204-
Arc<KeysManager>,
205-
Arc<Broadcaster>,
206-
Arc<OnchainFeeEstimator>,
207-
>;
208-
209231
pub(crate) type ChainMonitor = chainmonitor::ChainMonitor<
210232
InMemorySigner,
211233
Arc<ChainSource>,
212234
Arc<Broadcaster>,
213235
Arc<OnchainFeeEstimator>,
214236
Arc<Logger>,
215-
Arc<Persister>,
237+
chainmonitor::AsyncPersister<
238+
DynStoreRef,
239+
RuntimeSpawner,
240+
Arc<Logger>,
241+
Arc<KeysManager>,
242+
Arc<KeysManager>,
243+
Arc<Broadcaster>,
244+
Arc<OnchainFeeEstimator>,
245+
>,
216246
Arc<KeysManager>,
217247
>;
218248

0 commit comments

Comments
 (0)