Skip to content

Commit 8ee0d43

Browse files
committed
Wire PaymentMetadataStore into Builder, Node, Bolt11Payment, and EventHandler
Add `read_payment_metadata` to the startup `tokio::join!` call in the builder to load persisted metadata entries. Construct an `Arc<PaymentMetadataStore>` and thread it through to `Node`, `Bolt11Payment` (along with `KeysManager`), and `EventHandler`. Update `Node::remove_payment` to also call `payment_metadata_store.remove_payment_id` so the reverse index stays consistent when payment store entries are removed. Generated with the assistance of AI tools. Co-Authored-By: HAL 9000
1 parent 0478d42 commit 8ee0d43

5 files changed

Lines changed: 123 additions & 11 deletions

File tree

src/builder.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ use crate::gossip::GossipSource;
5656
use crate::io::sqlite_store::SqliteStore;
5757
use crate::io::utils::{
5858
read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph,
59-
read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments,
60-
read_scorer, write_node_metrics,
59+
read_node_metrics, read_output_sweeper, read_payment_metadata, read_payments, read_peer_info,
60+
read_pending_payments, read_scorer, write_node_metrics,
6161
};
6262
use crate::io::vss_store::VssStoreBuilder;
6363
use crate::io::{
@@ -71,6 +71,7 @@ use crate::liquidity::{
7171
use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger};
7272
use crate::message_handler::NodeCustomMessageHandler;
7373
use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox;
74+
use crate::payment::metadata_store::PaymentMetadataStore;
7475
use crate::peer_store::PeerStore;
7576
use crate::runtime::{Runtime, RuntimeSpawner};
7677
use crate::tx_broadcaster::TransactionBroadcaster;
@@ -1083,12 +1084,13 @@ fn build_with_store_internal(
10831084

10841085
let kv_store_ref = Arc::clone(&kv_store);
10851086
let logger_ref = Arc::clone(&logger);
1086-
let (payment_store_res, node_metris_res, pending_payment_store_res) =
1087+
let (payment_store_res, node_metris_res, pending_payment_store_res, payment_metadata_res) =
10871088
runtime.block_on(async move {
10881089
tokio::join!(
10891090
read_payments(&*kv_store_ref, Arc::clone(&logger_ref)),
10901091
read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)),
1091-
read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref))
1092+
read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref)),
1093+
read_payment_metadata(&*kv_store_ref, Arc::clone(&logger_ref))
10921094
)
10931095
});
10941096

@@ -1119,6 +1121,18 @@ fn build_with_store_internal(
11191121
},
11201122
};
11211123

1124+
let payment_metadata_store = match payment_metadata_res {
1125+
Ok(metadata_entries) => Arc::new(PaymentMetadataStore::new(
1126+
metadata_entries,
1127+
Arc::clone(&kv_store),
1128+
Arc::clone(&logger),
1129+
)),
1130+
Err(e) => {
1131+
log_error!(logger, "Failed to read payment metadata from store: {}", e);
1132+
return Err(BuildError::ReadFailed);
1133+
},
1134+
};
1135+
11221136
let (chain_source, chain_tip_opt) = match chain_data_source_config {
11231137
Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => {
11241138
let sync_config = sync_config.unwrap_or(EsploraSyncConfig::default());
@@ -1809,6 +1823,7 @@ fn build_with_store_internal(
18091823
scorer,
18101824
peer_store,
18111825
payment_store,
1826+
payment_metadata_store,
18121827
is_running,
18131828
node_metrics,
18141829
om_mailbox,

src/event.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use crate::liquidity::LiquiditySource;
4545
use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger};
4646
use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox;
4747
use crate::payment::asynchronous::static_invoice_store::StaticInvoiceStore;
48+
use crate::payment::metadata_store::PaymentMetadataStore;
4849
use crate::payment::store::{
4950
PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus,
5051
};
@@ -489,6 +490,7 @@ where
489490
network_graph: Arc<Graph>,
490491
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
491492
payment_store: Arc<PaymentStore>,
493+
payment_metadata_store: Arc<PaymentMetadataStore>,
492494
peer_store: Arc<PeerStore<L>>,
493495
keys_manager: Arc<KeysManager>,
494496
runtime: Arc<Runtime>,
@@ -509,10 +511,11 @@ where
509511
channel_manager: Arc<ChannelManager>, connection_manager: Arc<ConnectionManager<L>>,
510512
output_sweeper: Arc<Sweeper>, network_graph: Arc<Graph>,
511513
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
512-
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>,
513-
keys_manager: Arc<KeysManager>, static_invoice_store: Option<StaticInvoiceStore>,
514-
onion_messenger: Arc<OnionMessenger>, om_mailbox: Option<Arc<OnionMessageMailbox>>,
515-
runtime: Arc<Runtime>, logger: L, config: Arc<Config>,
514+
payment_store: Arc<PaymentStore>, payment_metadata_store: Arc<PaymentMetadataStore>,
515+
peer_store: Arc<PeerStore<L>>, keys_manager: Arc<KeysManager>,
516+
static_invoice_store: Option<StaticInvoiceStore>, onion_messenger: Arc<OnionMessenger>,
517+
om_mailbox: Option<Arc<OnionMessageMailbox>>, runtime: Arc<Runtime>, logger: L,
518+
config: Arc<Config>,
516519
) -> Self {
517520
Self {
518521
event_queue,
@@ -524,6 +527,7 @@ where
524527
network_graph,
525528
liquidity_source,
526529
payment_store,
530+
payment_metadata_store,
527531
peer_store,
528532
keys_manager,
529533
logger,

src/io/utils.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use crate::io::{
4444
NODE_METRICS_KEY, NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE,
4545
};
4646
use crate::logger::{log_error, LdkLogger, Logger};
47+
use crate::payment::metadata_store::PaymentMetadataEntry;
4748
use crate::payment::PendingPaymentDetails;
4849
use crate::peer_store::PeerStore;
4950
use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper};
@@ -298,6 +299,83 @@ where
298299
Ok(res)
299300
}
300301

302+
/// Read previously persisted payment metadata from the store.
303+
pub(crate) async fn read_payment_metadata<L: Deref>(
304+
kv_store: &DynStore, logger: L,
305+
) -> Result<Vec<PaymentMetadataEntry>, std::io::Error>
306+
where
307+
L::Target: LdkLogger,
308+
{
309+
let mut res = Vec::new();
310+
311+
let mut stored_keys = KVStore::list(
312+
&*kv_store,
313+
PAYMENT_METADATA_PERSISTENCE_PRIMARY_NAMESPACE,
314+
PAYMENT_METADATA_PERSISTENCE_SECONDARY_NAMESPACE,
315+
)
316+
.await?;
317+
318+
const BATCH_SIZE: usize = 50;
319+
320+
let mut set = tokio::task::JoinSet::new();
321+
322+
// Fill JoinSet with tasks if possible
323+
while set.len() < BATCH_SIZE && !stored_keys.is_empty() {
324+
if let Some(next_key) = stored_keys.pop() {
325+
let fut = KVStore::read(
326+
&*kv_store,
327+
PAYMENT_METADATA_PERSISTENCE_PRIMARY_NAMESPACE,
328+
PAYMENT_METADATA_PERSISTENCE_SECONDARY_NAMESPACE,
329+
&next_key,
330+
);
331+
set.spawn(fut);
332+
debug_assert!(set.len() <= BATCH_SIZE);
333+
}
334+
}
335+
336+
while let Some(read_res) = set.join_next().await {
337+
// Exit early if we get an IO error.
338+
let reader = read_res
339+
.map_err(|e| {
340+
log_error!(logger, "Failed to read PaymentMetadataEntry: {}", e);
341+
set.abort_all();
342+
e
343+
})?
344+
.map_err(|e| {
345+
log_error!(logger, "Failed to read PaymentMetadataEntry: {}", e);
346+
set.abort_all();
347+
e
348+
})?;
349+
350+
// Refill set for every finished future, if we still have something to do.
351+
if let Some(next_key) = stored_keys.pop() {
352+
let fut = KVStore::read(
353+
&*kv_store,
354+
PAYMENT_METADATA_PERSISTENCE_PRIMARY_NAMESPACE,
355+
PAYMENT_METADATA_PERSISTENCE_SECONDARY_NAMESPACE,
356+
&next_key,
357+
);
358+
set.spawn(fut);
359+
debug_assert!(set.len() <= BATCH_SIZE);
360+
}
361+
362+
// Handle result.
363+
let entry = PaymentMetadataEntry::read(&mut &*reader).map_err(|e| {
364+
log_error!(logger, "Failed to deserialize PaymentMetadataEntry: {}", e);
365+
std::io::Error::new(
366+
std::io::ErrorKind::InvalidData,
367+
"Failed to deserialize PaymentMetadataEntry",
368+
)
369+
})?;
370+
res.push(entry);
371+
}
372+
373+
debug_assert!(set.is_empty());
374+
debug_assert!(stored_keys.is_empty());
375+
376+
Ok(res)
377+
}
378+
301379
/// Read `OutputSweeper` state from the store.
302380
pub(crate) async fn read_output_sweeper(
303381
broadcaster: Arc<Broadcaster>, fee_estimator: Arc<OnchainFeeEstimator>,

src/lib.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ use liquidity::{LSPS1Liquidity, LiquiditySource};
151151
use logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger};
152152
use payment::asynchronous::om_mailbox::OnionMessageMailbox;
153153
use payment::asynchronous::static_invoice_store::StaticInvoiceStore;
154+
use payment::metadata_store::PaymentMetadataStore;
154155
use payment::{
155156
Bolt11Payment, Bolt12Payment, OnchainPayment, PaymentDetails, SpontaneousPayment,
156157
UnifiedPayment,
@@ -220,6 +221,7 @@ pub struct Node {
220221
scorer: Arc<Mutex<Scorer>>,
221222
peer_store: Arc<PeerStore<Arc<Logger>>>,
222223
payment_store: Arc<PaymentStore>,
224+
payment_metadata_store: Arc<PaymentMetadataStore>,
223225
is_running: Arc<RwLock<bool>>,
224226
node_metrics: Arc<RwLock<NodeMetrics>>,
225227
om_mailbox: Option<Arc<OnionMessageMailbox>>,
@@ -571,6 +573,7 @@ impl Node {
571573
Arc::clone(&self.network_graph),
572574
self.liquidity_source.clone(),
573575
Arc::clone(&self.payment_store),
576+
Arc::clone(&self.payment_metadata_store),
574577
Arc::clone(&self.peer_store),
575578
Arc::clone(&self.keys_manager),
576579
static_invoice_store,
@@ -856,6 +859,8 @@ impl Node {
856859
Arc::clone(&self.connection_manager),
857860
self.liquidity_source.clone(),
858861
Arc::clone(&self.payment_store),
862+
Arc::clone(&self.payment_metadata_store),
863+
Arc::clone(&self.keys_manager),
859864
Arc::clone(&self.peer_store),
860865
Arc::clone(&self.config),
861866
Arc::clone(&self.is_running),
@@ -874,6 +879,8 @@ impl Node {
874879
Arc::clone(&self.connection_manager),
875880
self.liquidity_source.clone(),
876881
Arc::clone(&self.payment_store),
882+
Arc::clone(&self.payment_metadata_store),
883+
Arc::clone(&self.keys_manager),
877884
Arc::clone(&self.peer_store),
878885
Arc::clone(&self.config),
879886
Arc::clone(&self.is_running),
@@ -1552,7 +1559,9 @@ impl Node {
15521559

15531560
/// Remove the payment with the given id from the store.
15541561
pub fn remove_payment(&self, payment_id: &PaymentId) -> Result<(), Error> {
1555-
self.payment_store.remove(&payment_id)
1562+
self.payment_store.remove(&payment_id)?;
1563+
self.payment_metadata_store.remove_payment_id(payment_id)?;
1564+
Ok(())
15561565
}
15571566

15581567
/// Retrieves an overview of all known balances.

src/payment/bolt11.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,14 @@ use crate::error::Error;
3030
use crate::ffi::{maybe_deref, maybe_try_convert_enum, maybe_wrap};
3131
use crate::liquidity::LiquiditySource;
3232
use crate::logger::{log_error, log_info, LdkLogger, Logger};
33+
use crate::payment::metadata_store::PaymentMetadataStore;
3334
use crate::payment::store::{
3435
LSPFeeLimits, PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind,
3536
PaymentStatus,
3637
};
3738
use crate::peer_store::{PeerInfo, PeerStore};
3839
use crate::runtime::Runtime;
39-
use crate::types::{ChannelManager, PaymentStore};
40+
use crate::types::{ChannelManager, KeysManager, PaymentStore};
4041

4142
#[cfg(not(feature = "uniffi"))]
4243
type Bolt11Invoice = LdkBolt11Invoice;
@@ -60,6 +61,8 @@ pub struct Bolt11Payment {
6061
connection_manager: Arc<ConnectionManager<Arc<Logger>>>,
6162
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
6263
payment_store: Arc<PaymentStore>,
64+
payment_metadata_store: Arc<PaymentMetadataStore>,
65+
keys_manager: Arc<KeysManager>,
6366
peer_store: Arc<PeerStore<Arc<Logger>>>,
6467
config: Arc<Config>,
6568
is_running: Arc<RwLock<bool>>,
@@ -71,7 +74,8 @@ impl Bolt11Payment {
7174
runtime: Arc<Runtime>, channel_manager: Arc<ChannelManager>,
7275
connection_manager: Arc<ConnectionManager<Arc<Logger>>>,
7376
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
74-
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<Arc<Logger>>>,
77+
payment_store: Arc<PaymentStore>, payment_metadata_store: Arc<PaymentMetadataStore>,
78+
keys_manager: Arc<KeysManager>, peer_store: Arc<PeerStore<Arc<Logger>>>,
7579
config: Arc<Config>, is_running: Arc<RwLock<bool>>, logger: Arc<Logger>,
7680
) -> Self {
7781
Self {
@@ -80,6 +84,8 @@ impl Bolt11Payment {
8084
connection_manager,
8185
liquidity_source,
8286
payment_store,
87+
payment_metadata_store,
88+
keys_manager,
8389
peer_store,
8490
config,
8591
is_running,

0 commit comments

Comments
 (0)