Skip to content

Commit bf7713d

Browse files
authored
Merge pull request #782 from joostjager/chain-mon-deferred-writes
Enable deferred writing
2 parents 4c6dfec + 5b39741 commit bf7713d

File tree

5 files changed

+97
-52
lines changed

5 files changed

+97
-52
lines changed

Cargo.toml

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,17 @@ default = []
3939
#lightning-liquidity = { version = "0.2.0", features = ["std"] }
4040
#lightning-macros = { version = "0.2.0" }
4141

42-
lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "49912057895ddfbd69d503de67c80d5576c09953", features = ["std"] }
43-
lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "49912057895ddfbd69d503de67c80d5576c09953" }
44-
lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "49912057895ddfbd69d503de67c80d5576c09953", features = ["std"] }
45-
lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "49912057895ddfbd69d503de67c80d5576c09953" }
46-
lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "49912057895ddfbd69d503de67c80d5576c09953", features = ["tokio"] }
47-
lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "49912057895ddfbd69d503de67c80d5576c09953" }
48-
lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "49912057895ddfbd69d503de67c80d5576c09953" }
49-
lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "49912057895ddfbd69d503de67c80d5576c09953", features = ["rest-client", "rpc-client", "tokio"] }
50-
lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "49912057895ddfbd69d503de67c80d5576c09953", features = ["esplora-async-https", "time", "electrum-rustls-ring"] }
51-
lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "49912057895ddfbd69d503de67c80d5576c09953", features = ["std"] }
52-
lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "49912057895ddfbd69d503de67c80d5576c09953" }
42+
lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7", features = ["std"] }
43+
lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7" }
44+
lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7", features = ["std"] }
45+
lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7" }
46+
lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7", features = ["tokio"] }
47+
lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7" }
48+
lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7" }
49+
lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7", features = ["rest-client", "rpc-client", "tokio"] }
50+
lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7", features = ["esplora-async-https", "time", "electrum-rustls-ring"] }
51+
lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7", features = ["std"] }
52+
lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7" }
5353

5454
bdk_chain = { version = "0.23.0", default-features = false, features = ["std"] }
5555
bdk_esplora = { version = "0.22.0", default-features = false, features = ["async-https-rustls", "tokio"]}
@@ -79,13 +79,13 @@ async-trait = { version = "0.1", default-features = false }
7979
vss-client = { package = "vss-client-ng", version = "0.5" }
8080
prost = { version = "0.11.6", default-features = false}
8181
#bitcoin-payment-instructions = { version = "0.6" }
82-
bitcoin-payment-instructions = { git = "https://github.com/tnull/bitcoin-payment-instructions", rev = "e9d7c07d7affc7714b023c853a65771e45277467" }
82+
bitcoin-payment-instructions = { git = "https://github.com/joostjager/bitcoin-payment-instructions", branch = "ldk-dcf0c203e166da2348bef12b2e5eff4a250cdec7" }
8383

8484
[target.'cfg(windows)'.dependencies]
8585
winapi = { version = "0.3", features = ["winbase"] }
8686

8787
[dev-dependencies]
88-
lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "49912057895ddfbd69d503de67c80d5576c09953", features = ["std", "_test_utils"] }
88+
lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7", features = ["std", "_test_utils"] }
8989
rand = { version = "0.9.2", default-features = false, features = ["std", "thread_rng", "os_rng"] }
9090
proptest = "1.0.0"
9191
regex = "1.5.6"

src/builder.rs

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,9 @@ use crate::peer_store::PeerStore;
7676
use crate::runtime::{Runtime, RuntimeSpawner};
7777
use crate::tx_broadcaster::TransactionBroadcaster;
7878
use crate::types::{
79-
AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph,
80-
KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore,
81-
Persister, SyncAndAsyncKVStore,
79+
AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreRef, DynStoreWrapper,
80+
GossipSync, Graph, KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager,
81+
PendingPaymentStore, SyncAndAsyncKVStore,
8282
};
8383
use crate::wallet::persist::KVStoreWalletPersister;
8484
use crate::wallet::Wallet;
@@ -1495,7 +1495,7 @@ fn build_with_store_internal(
14951495

14961496
let peer_storage_key = keys_manager.get_peer_storage_key();
14971497
let monitor_reader = Arc::new(AsyncPersister::new(
1498-
Arc::clone(&kv_store),
1498+
DynStoreRef(Arc::clone(&kv_store)),
14991499
RuntimeSpawner::new(Arc::clone(&runtime)),
15001500
Arc::clone(&logger),
15011501
PERSISTER_MAX_PENDING_UPDATES,
@@ -1508,7 +1508,7 @@ fn build_with_store_internal(
15081508
// Read ChannelMonitors and the NetworkGraph
15091509
let kv_store_ref = Arc::clone(&kv_store);
15101510
let logger_ref = Arc::clone(&logger);
1511-
let (monitor_read_res, network_graph_res) = runtime.block_on(async move {
1511+
let (monitor_read_res, network_graph_res) = runtime.block_on(async {
15121512
tokio::join!(
15131513
monitor_reader.read_all_channel_monitors_with_updates_parallel(),
15141514
read_network_graph(&*kv_store_ref, logger_ref),
@@ -1528,26 +1528,21 @@ fn build_with_store_internal(
15281528
},
15291529
};
15301530

1531-
let persister = Arc::new(Persister::new(
1532-
Arc::clone(&kv_store),
1533-
Arc::clone(&logger),
1534-
PERSISTER_MAX_PENDING_UPDATES,
1535-
Arc::clone(&keys_manager),
1536-
Arc::clone(&keys_manager),
1537-
Arc::clone(&tx_broadcaster),
1538-
Arc::clone(&fee_estimator),
1539-
));
1540-
15411531
// Initialize the ChainMonitor
1542-
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
1543-
Some(Arc::clone(&chain_source)),
1544-
Arc::clone(&tx_broadcaster),
1545-
Arc::clone(&logger),
1546-
Arc::clone(&fee_estimator),
1547-
Arc::clone(&persister),
1548-
Arc::clone(&keys_manager),
1549-
peer_storage_key,
1550-
));
1532+
let chain_monitor: Arc<ChainMonitor> = {
1533+
let persister = Arc::try_unwrap(monitor_reader)
1534+
.unwrap_or_else(|_| panic!("Arc<AsyncPersister> should have no other references"));
1535+
Arc::new(chainmonitor::ChainMonitor::new_async_beta(
1536+
Some(Arc::clone(&chain_source)),
1537+
Arc::clone(&tx_broadcaster),
1538+
Arc::clone(&logger),
1539+
Arc::clone(&fee_estimator),
1540+
persister,
1541+
Arc::clone(&keys_manager),
1542+
peer_storage_key,
1543+
true,
1544+
))
1545+
};
15511546

15521547
// Initialize the network graph, scorer, and router
15531548
let network_graph = match network_graph_res {

src/event.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,26 @@ where
691691
if info.status == PaymentStatus::Succeeded
692692
|| matches!(info.kind, PaymentKind::Spontaneous { .. })
693693
{
694+
let stored_preimage = match info.kind {
695+
PaymentKind::Bolt11 { preimage, .. }
696+
| PaymentKind::Bolt11Jit { preimage, .. }
697+
| PaymentKind::Bolt12Offer { preimage, .. }
698+
| PaymentKind::Bolt12Refund { preimage, .. }
699+
| PaymentKind::Spontaneous { preimage, .. } => preimage,
700+
_ => None,
701+
};
702+
703+
if let Some(preimage) = stored_preimage {
704+
log_info!(
705+
self.logger,
706+
"Re-claiming previously succeeded payment with hash {} of {}msat",
707+
hex_utils::to_string(&payment_hash.0),
708+
amount_msat,
709+
);
710+
self.channel_manager.claim_funds(preimage);
711+
return Ok(());
712+
}
713+
694714
log_info!(
695715
self.logger,
696716
"Refused duplicate inbound payment from payment hash {} of {}msat",

src/types.rs

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@ use lightning::routing::gossip;
2323
use lightning::routing::router::DefaultRouter;
2424
use lightning::routing::scoring::{CombinedScorer, ProbabilisticScoringFeeParameters};
2525
use lightning::sign::InMemorySigner;
26-
use lightning::util::persist::{
27-
KVStore, KVStoreSync, MonitorUpdatingPersister, MonitorUpdatingPersisterAsync,
28-
};
26+
use lightning::util::persist::{KVStore, KVStoreSync, MonitorUpdatingPersisterAsync};
2927
use lightning::util::ser::{Readable, Writeable, Writer};
3028
use lightning::util::sweep::OutputSweeper;
3129
use lightning_block_sync::gossip::GossipVerifier;
@@ -135,6 +133,39 @@ impl<'a> KVStoreSync for dyn DynStoreTrait + 'a {
135133

136134
pub(crate) type DynStore = dyn DynStoreTrait;
137135

136+
// Newtype wrapper that implements `KVStore` for `Arc<DynStore>`. This is needed because `KVStore`
137+
// methods return `impl Future`, which is not object-safe. `DynStoreTrait` works around this by
138+
// returning `Pin<Box<dyn Future>>` instead, and this wrapper bridges the two by delegating
139+
// `KVStore` methods to the corresponding `DynStoreTrait::*_async` methods.
140+
#[derive(Clone)]
141+
pub(crate) struct DynStoreRef(pub(crate) Arc<DynStore>);
142+
143+
impl KVStore for DynStoreRef {
144+
fn read(
145+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
146+
) -> impl Future<Output = Result<Vec<u8>, bitcoin::io::Error>> + Send + 'static {
147+
DynStoreTrait::read_async(&*self.0, primary_namespace, secondary_namespace, key)
148+
}
149+
150+
fn write(
151+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
152+
) -> impl Future<Output = Result<(), bitcoin::io::Error>> + Send + 'static {
153+
DynStoreTrait::write_async(&*self.0, primary_namespace, secondary_namespace, key, buf)
154+
}
155+
156+
fn remove(
157+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
158+
) -> impl Future<Output = Result<(), bitcoin::io::Error>> + Send + 'static {
159+
DynStoreTrait::remove_async(&*self.0, primary_namespace, secondary_namespace, key, lazy)
160+
}
161+
162+
fn list(
163+
&self, primary_namespace: &str, secondary_namespace: &str,
164+
) -> impl Future<Output = Result<Vec<String>, bitcoin::io::Error>> + Send + 'static {
165+
DynStoreTrait::list_async(&*self.0, primary_namespace, secondary_namespace)
166+
}
167+
}
168+
138169
pub(crate) struct DynStoreWrapper<T: SyncAndAsyncKVStore + Send + Sync>(pub(crate) T);
139170

140171
impl<T: SyncAndAsyncKVStore + Send + Sync> DynStoreTrait for DynStoreWrapper<T> {
@@ -188,7 +219,7 @@ impl<T: SyncAndAsyncKVStore + Send + Sync> DynStoreTrait for DynStoreWrapper<T>
188219
}
189220

190221
pub(crate) type AsyncPersister = MonitorUpdatingPersisterAsync<
191-
Arc<DynStore>,
222+
DynStoreRef,
192223
RuntimeSpawner,
193224
Arc<Logger>,
194225
Arc<KeysManager>,
@@ -197,22 +228,21 @@ pub(crate) type AsyncPersister = MonitorUpdatingPersisterAsync<
197228
Arc<OnchainFeeEstimator>,
198229
>;
199230

200-
pub type Persister = MonitorUpdatingPersister<
201-
Arc<DynStore>,
202-
Arc<Logger>,
203-
Arc<KeysManager>,
204-
Arc<KeysManager>,
205-
Arc<Broadcaster>,
206-
Arc<OnchainFeeEstimator>,
207-
>;
208-
209231
pub(crate) type ChainMonitor = chainmonitor::ChainMonitor<
210232
InMemorySigner,
211233
Arc<ChainSource>,
212234
Arc<Broadcaster>,
213235
Arc<OnchainFeeEstimator>,
214236
Arc<Logger>,
215-
Arc<Persister>,
237+
chainmonitor::AsyncPersister<
238+
DynStoreRef,
239+
RuntimeSpawner,
240+
Arc<Logger>,
241+
Arc<KeysManager>,
242+
Arc<KeysManager>,
243+
Arc<Broadcaster>,
244+
Arc<OnchainFeeEstimator>,
245+
>,
216246
Arc<KeysManager>,
217247
>;
218248

tests/common/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1262,8 +1262,8 @@ pub(crate) async fn do_channel_full_cycle<E: ElectrumApi>(
12621262
);
12631263

12641264
println!("\nB close_channel (force: {})", force_close);
1265+
tokio::time::sleep(Duration::from_secs(1)).await;
12651266
if force_close {
1266-
tokio::time::sleep(Duration::from_secs(1)).await;
12671267
node_a.force_close_channel(&user_channel_id_a, node_b.node_id(), None).unwrap();
12681268
} else {
12691269
node_a.close_channel(&user_channel_id_a, node_b.node_id()).unwrap();

0 commit comments

Comments
 (0)