Skip to content

Commit ba16c92

Browse files
authored
Merge pull request #658 from Camillarhi/payment-store-events-sync
Use BDK events in `update_payment_store`
2 parents e0fbff2 + 9f02ec0 commit ba16c92

File tree

9 files changed

+449
-75
lines changed

9 files changed

+449
-75
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning",
5454
bdk_chain = { version = "0.23.0", default-features = false, features = ["std"] }
5555
bdk_esplora = { version = "0.22.0", default-features = false, features = ["async-https-rustls", "tokio"]}
5656
bdk_electrum = { version = "0.23.0", default-features = false, features = ["use-rustls-ring"]}
57-
bdk_wallet = { version = "2.2.0", default-features = false, features = ["std", "keys-bip39"]}
57+
bdk_wallet = { version = "2.3.0", default-features = false, features = ["std", "keys-bip39"]}
5858

5959
bitreq = { version = "0.3", default-features = false, features = ["async-https"] }
6060
rustls = { version = "0.23", default-features = false }

src/builder.rs

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,14 @@ use crate::gossip::GossipSource;
5555
use crate::io::sqlite_store::SqliteStore;
5656
use crate::io::utils::{
5757
read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph,
58-
read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_scorer,
59-
write_node_metrics,
58+
read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments,
59+
read_scorer, write_node_metrics,
6060
};
6161
use crate::io::vss_store::VssStoreBuilder;
6262
use crate::io::{
6363
self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
64+
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
65+
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
6466
};
6567
use crate::liquidity::{
6668
LSPS1ClientConfig, LSPS2ClientConfig, LSPS2ServiceConfig, LiquiditySourceBuilder,
@@ -73,8 +75,8 @@ use crate::runtime::{Runtime, RuntimeSpawner};
7375
use crate::tx_broadcaster::TransactionBroadcaster;
7476
use crate::types::{
7577
AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph,
76-
KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister,
77-
SyncAndAsyncKVStore,
78+
KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore,
79+
Persister, SyncAndAsyncKVStore,
7880
};
7981
use crate::wallet::persist::KVStoreWalletPersister;
8082
use crate::wallet::Wallet;
@@ -1057,12 +1059,14 @@ fn build_with_store_internal(
10571059

10581060
let kv_store_ref = Arc::clone(&kv_store);
10591061
let logger_ref = Arc::clone(&logger);
1060-
let (payment_store_res, node_metris_res) = runtime.block_on(async move {
1061-
tokio::join!(
1062-
read_payments(&*kv_store_ref, Arc::clone(&logger_ref)),
1063-
read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)),
1064-
)
1065-
});
1062+
let (payment_store_res, node_metris_res, pending_payment_store_res) =
1063+
runtime.block_on(async move {
1064+
tokio::join!(
1065+
read_payments(&*kv_store_ref, Arc::clone(&logger_ref)),
1066+
read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)),
1067+
read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref))
1068+
)
1069+
});
10661070

10671071
// Initialize the status fields.
10681072
let node_metrics = match node_metris_res {
@@ -1243,6 +1247,20 @@ fn build_with_store_internal(
12431247
},
12441248
};
12451249

1250+
let pending_payment_store = match pending_payment_store_res {
1251+
Ok(pending_payments) => Arc::new(PendingPaymentStore::new(
1252+
pending_payments,
1253+
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1254+
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1255+
Arc::clone(&kv_store),
1256+
Arc::clone(&logger),
1257+
)),
1258+
Err(e) => {
1259+
log_error!(logger, "Failed to read pending payment data from store: {}", e);
1260+
return Err(BuildError::ReadFailed);
1261+
},
1262+
};
1263+
12461264
let wallet = Arc::new(Wallet::new(
12471265
bdk_wallet,
12481266
wallet_persister,
@@ -1251,6 +1269,7 @@ fn build_with_store_internal(
12511269
Arc::clone(&payment_store),
12521270
Arc::clone(&config),
12531271
Arc::clone(&logger),
1272+
Arc::clone(&pending_payment_store),
12541273
));
12551274

12561275
// Initialize the KeysManager

src/data_store.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,10 @@ where
167167
})?;
168168
Ok(())
169169
}
170+
171+
pub(crate) fn contains_key(&self, id: &SO::Id) -> bool {
172+
self.objects.lock().unwrap().contains_key(id)
173+
}
170174
}
171175

172176
#[cfg(test)]

src/io/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,3 +78,7 @@ pub(crate) const BDK_WALLET_INDEXER_KEY: &str = "indexer";
7878
///
7979
/// [`StaticInvoice`]: lightning::offers::static_invoice::StaticInvoice
8080
pub(crate) const STATIC_INVOICE_STORE_PRIMARY_NAMESPACE: &str = "static_invoices";
81+
82+
/// The pending payment information will be persisted under this prefix.
83+
pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "pending_payments";
84+
pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";

src/io/utils.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use crate::io::{
4646
NODE_METRICS_KEY, NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE,
4747
};
4848
use crate::logger::{log_error, LdkLogger, Logger};
49+
use crate::payment::PendingPaymentDetails;
4950
use crate::peer_store::PeerStore;
5051
use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper};
5152
use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper};
@@ -626,6 +627,83 @@ pub(crate) fn read_bdk_wallet_change_set(
626627
Ok(Some(change_set))
627628
}
628629

630+
/// Read previously persisted pending payments information from the store.
631+
pub(crate) async fn read_pending_payments<L: Deref>(
632+
kv_store: &DynStore, logger: L,
633+
) -> Result<Vec<PendingPaymentDetails>, std::io::Error>
634+
where
635+
L::Target: LdkLogger,
636+
{
637+
let mut res = Vec::new();
638+
639+
let mut stored_keys = KVStore::list(
640+
&*kv_store,
641+
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
642+
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
643+
)
644+
.await?;
645+
646+
const BATCH_SIZE: usize = 50;
647+
648+
let mut set = tokio::task::JoinSet::new();
649+
650+
// Fill JoinSet with tasks if possible
651+
while set.len() < BATCH_SIZE && !stored_keys.is_empty() {
652+
if let Some(next_key) = stored_keys.pop() {
653+
let fut = KVStore::read(
654+
&*kv_store,
655+
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
656+
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
657+
&next_key,
658+
);
659+
set.spawn(fut);
660+
debug_assert!(set.len() <= BATCH_SIZE);
661+
}
662+
}
663+
664+
while let Some(read_res) = set.join_next().await {
665+
// Exit early if we get an IO error.
666+
let reader = read_res
667+
.map_err(|e| {
668+
log_error!(logger, "Failed to read PendingPaymentDetails: {}", e);
669+
set.abort_all();
670+
e
671+
})?
672+
.map_err(|e| {
673+
log_error!(logger, "Failed to read PendingPaymentDetails: {}", e);
674+
set.abort_all();
675+
e
676+
})?;
677+
678+
// Refill set for every finished future, if we still have something to do.
679+
if let Some(next_key) = stored_keys.pop() {
680+
let fut = KVStore::read(
681+
&*kv_store,
682+
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
683+
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
684+
&next_key,
685+
);
686+
set.spawn(fut);
687+
debug_assert!(set.len() <= BATCH_SIZE);
688+
}
689+
690+
// Handle result.
691+
let pending_payment = PendingPaymentDetails::read(&mut &*reader).map_err(|e| {
692+
log_error!(logger, "Failed to deserialize PendingPaymentDetails: {}", e);
693+
std::io::Error::new(
694+
std::io::ErrorKind::InvalidData,
695+
"Failed to deserialize PendingPaymentDetails",
696+
)
697+
})?;
698+
res.push(pending_payment);
699+
}
700+
701+
debug_assert!(set.is_empty());
702+
debug_assert!(stored_keys.is_empty());
703+
704+
Ok(res)
705+
}
706+
629707
#[cfg(test)]
630708
mod tests {
631709
use super::read_or_generate_seed_file;

src/payment/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@ pub(crate) mod asynchronous;
1111
mod bolt11;
1212
mod bolt12;
1313
mod onchain;
14+
pub(crate) mod pending_payment_store;
1415
mod spontaneous;
1516
pub(crate) mod store;
1617
mod unified;
1718

1819
pub use bolt11::Bolt11Payment;
1920
pub use bolt12::Bolt12Payment;
2021
pub use onchain::OnchainPayment;
22+
pub use pending_payment_store::PendingPaymentDetails;
2123
pub use spontaneous::SpontaneousPayment;
2224
pub use store::{
2325
ConfirmationStatus, LSPFeeLimits, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus,
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// This file is Copyright its original authors, visible in version control history.
2+
//
3+
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5+
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
6+
// accordance with one or both of these licenses.
7+
8+
use bitcoin::Txid;
9+
use lightning::{impl_writeable_tlv_based, ln::channelmanager::PaymentId};
10+
11+
use crate::{
12+
data_store::{StorableObject, StorableObjectUpdate},
13+
payment::{store::PaymentDetailsUpdate, PaymentDetails},
14+
};
15+
16+
/// Represents a pending payment
17+
#[derive(Clone, Debug, PartialEq, Eq)]
18+
pub struct PendingPaymentDetails {
19+
/// The full payment details
20+
pub details: PaymentDetails,
21+
/// Transaction IDs that have replaced or conflict with this payment.
22+
pub conflicting_txids: Vec<Txid>,
23+
}
24+
25+
impl PendingPaymentDetails {
26+
pub(crate) fn new(details: PaymentDetails, conflicting_txids: Vec<Txid>) -> Self {
27+
Self { details, conflicting_txids }
28+
}
29+
30+
/// Convert to finalized payment for the main payment store
31+
pub fn into_payment_details(self) -> PaymentDetails {
32+
self.details
33+
}
34+
}
35+
36+
impl_writeable_tlv_based!(PendingPaymentDetails, {
37+
(0, details, required),
38+
(2, conflicting_txids, optional_vec),
39+
});
40+
41+
#[derive(Clone, Debug, PartialEq, Eq)]
42+
pub(crate) struct PendingPaymentDetailsUpdate {
43+
pub id: PaymentId,
44+
pub payment_update: Option<PaymentDetailsUpdate>,
45+
pub conflicting_txids: Option<Vec<Txid>>,
46+
}
47+
48+
impl StorableObject for PendingPaymentDetails {
49+
type Id = PaymentId;
50+
type Update = PendingPaymentDetailsUpdate;
51+
52+
fn id(&self) -> Self::Id {
53+
self.details.id
54+
}
55+
56+
fn update(&mut self, update: &Self::Update) -> bool {
57+
let mut updated = false;
58+
59+
// Update the underlying payment details if present
60+
if let Some(payment_update) = &update.payment_update {
61+
updated |= self.details.update(payment_update);
62+
}
63+
64+
if let Some(new_conflicting_txids) = &update.conflicting_txids {
65+
if &self.conflicting_txids != new_conflicting_txids {
66+
self.conflicting_txids = new_conflicting_txids.clone();
67+
updated = true;
68+
}
69+
}
70+
71+
updated
72+
}
73+
74+
fn to_update(&self) -> Self::Update {
75+
self.into()
76+
}
77+
}
78+
79+
impl StorableObjectUpdate<PendingPaymentDetails> for PendingPaymentDetailsUpdate {
80+
fn id(&self) -> <PendingPaymentDetails as StorableObject>::Id {
81+
self.id
82+
}
83+
}
84+
85+
impl From<&PendingPaymentDetails> for PendingPaymentDetailsUpdate {
86+
fn from(value: &PendingPaymentDetails) -> Self {
87+
Self {
88+
id: value.id(),
89+
payment_update: Some(value.details.to_update()),
90+
conflicting_txids: Some(value.conflicting_txids.clone()),
91+
}
92+
}
93+
}

src/types.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use crate::data_store::DataStore;
3939
use crate::fee_estimator::OnchainFeeEstimator;
4040
use crate::logger::Logger;
4141
use crate::message_handler::NodeCustomMessageHandler;
42-
use crate::payment::PaymentDetails;
42+
use crate::payment::{PaymentDetails, PendingPaymentDetails};
4343
use crate::runtime::RuntimeSpawner;
4444

4545
/// A supertrait that requires that a type implements both [`KVStore`] and [`KVStoreSync`] at the
@@ -621,3 +621,5 @@ impl From<&(u64, Vec<u8>)> for CustomTlvRecord {
621621
CustomTlvRecord { type_num: tlv.0, value: tlv.1.clone() }
622622
}
623623
}
624+
625+
pub(crate) type PendingPaymentStore = DataStore<PendingPaymentDetails, Arc<Logger>>;

0 commit comments

Comments
 (0)