Skip to content

Commit fc211db

Browse files
committed
Wire PaymentMetadataStore into Builder, Node, Bolt11Payment, and EventHandler
Add `read_payment_metadata` to the startup `tokio::join!` call in the builder to load persisted metadata entries. Construct an `Arc<PaymentMetadataStore>` and thread it through to `Node`, `Bolt11Payment` (along with `KeysManager`), and `EventHandler`. Update `Node::remove_payment` to also call `payment_metadata_store.remove_payment_id` so the reverse index stays consistent when payment store entries are removed. Generated with the assistance of AI tools. Co-Authored-By: HAL 9000
1 parent 9cce048 commit fc211db

File tree

5 files changed

+123
-11
lines changed

5 files changed

+123
-11
lines changed

src/builder.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ use crate::gossip::GossipSource;
5656
use crate::io::sqlite_store::SqliteStore;
5757
use crate::io::utils::{
5858
read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph,
59-
read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments,
60-
read_scorer, write_node_metrics,
59+
read_node_metrics, read_output_sweeper, read_payment_metadata, read_payments, read_peer_info,
60+
read_pending_payments, read_scorer, write_node_metrics,
6161
};
6262
use crate::io::vss_store::VssStoreBuilder;
6363
use crate::io::{
@@ -72,6 +72,7 @@ use crate::lnurl_auth::LnurlAuth;
7272
use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger};
7373
use crate::message_handler::NodeCustomMessageHandler;
7474
use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox;
75+
use crate::payment::metadata_store::PaymentMetadataStore;
7576
use crate::peer_store::PeerStore;
7677
use crate::runtime::{Runtime, RuntimeSpawner};
7778
use crate::tx_broadcaster::TransactionBroadcaster;
@@ -1267,12 +1268,13 @@ fn build_with_store_internal(
12671268

12681269
let kv_store_ref = Arc::clone(&kv_store);
12691270
let logger_ref = Arc::clone(&logger);
1270-
let (payment_store_res, node_metris_res, pending_payment_store_res) =
1271+
let (payment_store_res, node_metris_res, pending_payment_store_res, payment_metadata_res) =
12711272
runtime.block_on(async move {
12721273
tokio::join!(
12731274
read_payments(&*kv_store_ref, Arc::clone(&logger_ref)),
12741275
read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)),
1275-
read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref))
1276+
read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref)),
1277+
read_payment_metadata(&*kv_store_ref, Arc::clone(&logger_ref))
12761278
)
12771279
});
12781280

@@ -1303,6 +1305,18 @@ fn build_with_store_internal(
13031305
},
13041306
};
13051307

1308+
let payment_metadata_store = match payment_metadata_res {
1309+
Ok(metadata_entries) => Arc::new(PaymentMetadataStore::new(
1310+
metadata_entries,
1311+
Arc::clone(&kv_store),
1312+
Arc::clone(&logger),
1313+
)),
1314+
Err(e) => {
1315+
log_error!(logger, "Failed to read payment metadata from store: {}", e);
1316+
return Err(BuildError::ReadFailed);
1317+
},
1318+
};
1319+
13061320
let (chain_source, chain_tip_opt) = match chain_data_source_config {
13071321
Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => {
13081322
let sync_config = sync_config.unwrap_or(EsploraSyncConfig::default());
@@ -1996,6 +2010,7 @@ fn build_with_store_internal(
19962010
scorer,
19972011
peer_store,
19982012
payment_store,
2013+
payment_metadata_store,
19992014
lnurl_auth,
20002015
is_running,
20012016
node_metrics,

src/event.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ use crate::liquidity::LiquiditySource;
4949
use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger};
5050
use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox;
5151
use crate::payment::asynchronous::static_invoice_store::StaticInvoiceStore;
52+
use crate::payment::metadata_store::PaymentMetadataStore;
5253
use crate::payment::store::{
5354
PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus,
5455
};
@@ -507,6 +508,7 @@ where
507508
network_graph: Arc<Graph>,
508509
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
509510
payment_store: Arc<PaymentStore>,
511+
payment_metadata_store: Arc<PaymentMetadataStore>,
510512
peer_store: Arc<PeerStore<L>>,
511513
keys_manager: Arc<KeysManager>,
512514
runtime: Arc<Runtime>,
@@ -527,10 +529,11 @@ where
527529
channel_manager: Arc<ChannelManager>, connection_manager: Arc<ConnectionManager<L>>,
528530
output_sweeper: Arc<Sweeper>, network_graph: Arc<Graph>,
529531
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
530-
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>,
531-
keys_manager: Arc<KeysManager>, static_invoice_store: Option<StaticInvoiceStore>,
532-
onion_messenger: Arc<OnionMessenger>, om_mailbox: Option<Arc<OnionMessageMailbox>>,
533-
runtime: Arc<Runtime>, logger: L, config: Arc<Config>,
532+
payment_store: Arc<PaymentStore>, payment_metadata_store: Arc<PaymentMetadataStore>,
533+
peer_store: Arc<PeerStore<L>>, keys_manager: Arc<KeysManager>,
534+
static_invoice_store: Option<StaticInvoiceStore>, onion_messenger: Arc<OnionMessenger>,
535+
om_mailbox: Option<Arc<OnionMessageMailbox>>, runtime: Arc<Runtime>, logger: L,
536+
config: Arc<Config>,
534537
) -> Self {
535538
Self {
536539
event_queue,
@@ -542,6 +545,7 @@ where
542545
network_graph,
543546
liquidity_source,
544547
payment_store,
548+
payment_metadata_store,
545549
peer_store,
546550
keys_manager,
547551
logger,

src/io/utils.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use crate::io::{
4444
NODE_METRICS_KEY, NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE,
4545
};
4646
use crate::logger::{log_error, LdkLogger, Logger};
47+
use crate::payment::metadata_store::PaymentMetadataEntry;
4748
use crate::payment::PendingPaymentDetails;
4849
use crate::peer_store::PeerStore;
4950
use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper};
@@ -298,6 +299,83 @@ where
298299
Ok(res)
299300
}
300301

302+
/// Read previously persisted payment metadata from the store.
303+
pub(crate) async fn read_payment_metadata<L: Deref>(
304+
kv_store: &DynStore, logger: L,
305+
) -> Result<Vec<PaymentMetadataEntry>, std::io::Error>
306+
where
307+
L::Target: LdkLogger,
308+
{
309+
let mut res = Vec::new();
310+
311+
let mut stored_keys = KVStore::list(
312+
&*kv_store,
313+
PAYMENT_METADATA_PERSISTENCE_PRIMARY_NAMESPACE,
314+
PAYMENT_METADATA_PERSISTENCE_SECONDARY_NAMESPACE,
315+
)
316+
.await?;
317+
318+
const BATCH_SIZE: usize = 50;
319+
320+
let mut set = tokio::task::JoinSet::new();
321+
322+
// Fill JoinSet with tasks if possible
323+
while set.len() < BATCH_SIZE && !stored_keys.is_empty() {
324+
if let Some(next_key) = stored_keys.pop() {
325+
let fut = KVStore::read(
326+
&*kv_store,
327+
PAYMENT_METADATA_PERSISTENCE_PRIMARY_NAMESPACE,
328+
PAYMENT_METADATA_PERSISTENCE_SECONDARY_NAMESPACE,
329+
&next_key,
330+
);
331+
set.spawn(fut);
332+
debug_assert!(set.len() <= BATCH_SIZE);
333+
}
334+
}
335+
336+
while let Some(read_res) = set.join_next().await {
337+
// Exit early if we get an IO error.
338+
let reader = read_res
339+
.map_err(|e| {
340+
log_error!(logger, "Failed to read PaymentMetadataEntry: {}", e);
341+
set.abort_all();
342+
e
343+
})?
344+
.map_err(|e| {
345+
log_error!(logger, "Failed to read PaymentMetadataEntry: {}", e);
346+
set.abort_all();
347+
e
348+
})?;
349+
350+
// Refill set for every finished future, if we still have something to do.
351+
if let Some(next_key) = stored_keys.pop() {
352+
let fut = KVStore::read(
353+
&*kv_store,
354+
PAYMENT_METADATA_PERSISTENCE_PRIMARY_NAMESPACE,
355+
PAYMENT_METADATA_PERSISTENCE_SECONDARY_NAMESPACE,
356+
&next_key,
357+
);
358+
set.spawn(fut);
359+
debug_assert!(set.len() <= BATCH_SIZE);
360+
}
361+
362+
// Handle result.
363+
let entry = PaymentMetadataEntry::read(&mut &*reader).map_err(|e| {
364+
log_error!(logger, "Failed to deserialize PaymentMetadataEntry: {}", e);
365+
std::io::Error::new(
366+
std::io::ErrorKind::InvalidData,
367+
"Failed to deserialize PaymentMetadataEntry",
368+
)
369+
})?;
370+
res.push(entry);
371+
}
372+
373+
debug_assert!(set.is_empty());
374+
debug_assert!(stored_keys.is_empty());
375+
376+
Ok(res)
377+
}
378+
301379
/// Read `OutputSweeper` state from the store.
302380
pub(crate) async fn read_output_sweeper(
303381
broadcaster: Arc<Broadcaster>, fee_estimator: Arc<OnchainFeeEstimator>,

src/lib.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ use lnurl_auth::LnurlAuth;
165165
use logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger};
166166
use payment::asynchronous::om_mailbox::OnionMessageMailbox;
167167
use payment::asynchronous::static_invoice_store::StaticInvoiceStore;
168+
use payment::metadata_store::PaymentMetadataStore;
168169
use payment::{
169170
Bolt11Payment, Bolt12Payment, OnchainPayment, PaymentDetails, SpontaneousPayment,
170171
UnifiedPayment,
@@ -233,6 +234,7 @@ pub struct Node {
233234
scorer: Arc<Mutex<Scorer>>,
234235
peer_store: Arc<PeerStore<Arc<Logger>>>,
235236
payment_store: Arc<PaymentStore>,
237+
payment_metadata_store: Arc<PaymentMetadataStore>,
236238
lnurl_auth: Arc<LnurlAuth>,
237239
is_running: Arc<RwLock<bool>>,
238240
node_metrics: Arc<RwLock<NodeMetrics>>,
@@ -600,6 +602,7 @@ impl Node {
600602
Arc::clone(&self.network_graph),
601603
self.liquidity_source.clone(),
602604
Arc::clone(&self.payment_store),
605+
Arc::clone(&self.payment_metadata_store),
603606
Arc::clone(&self.peer_store),
604607
Arc::clone(&self.keys_manager),
605608
static_invoice_store,
@@ -891,6 +894,8 @@ impl Node {
891894
Arc::clone(&self.connection_manager),
892895
self.liquidity_source.clone(),
893896
Arc::clone(&self.payment_store),
897+
Arc::clone(&self.payment_metadata_store),
898+
Arc::clone(&self.keys_manager),
894899
Arc::clone(&self.peer_store),
895900
Arc::clone(&self.config),
896901
Arc::clone(&self.is_running),
@@ -909,6 +914,8 @@ impl Node {
909914
Arc::clone(&self.connection_manager),
910915
self.liquidity_source.clone(),
911916
Arc::clone(&self.payment_store),
917+
Arc::clone(&self.payment_metadata_store),
918+
Arc::clone(&self.keys_manager),
912919
Arc::clone(&self.peer_store),
913920
Arc::clone(&self.config),
914921
Arc::clone(&self.is_running),
@@ -1884,7 +1891,9 @@ impl Node {
18841891

18851892
/// Remove the payment with the given id from the store.
18861893
pub fn remove_payment(&self, payment_id: &PaymentId) -> Result<(), Error> {
1887-
self.payment_store.remove(&payment_id)
1894+
self.payment_store.remove(&payment_id)?;
1895+
self.payment_metadata_store.remove_payment_id(payment_id)?;
1896+
Ok(())
18881897
}
18891898

18901899
/// Retrieves an overview of all known balances.

src/payment/bolt11.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,14 @@ use crate::error::Error;
3030
use crate::ffi::{maybe_deref, maybe_try_convert_enum, maybe_wrap};
3131
use crate::liquidity::LiquiditySource;
3232
use crate::logger::{log_error, log_info, LdkLogger, Logger};
33+
use crate::payment::metadata_store::PaymentMetadataStore;
3334
use crate::payment::store::{
3435
LSPFeeLimits, PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind,
3536
PaymentStatus,
3637
};
3738
use crate::peer_store::{PeerInfo, PeerStore};
3839
use crate::runtime::Runtime;
39-
use crate::types::{ChannelManager, PaymentStore};
40+
use crate::types::{ChannelManager, KeysManager, PaymentStore};
4041

4142
#[cfg(not(feature = "uniffi"))]
4243
type Bolt11Invoice = LdkBolt11Invoice;
@@ -61,6 +62,8 @@ pub struct Bolt11Payment {
6162
connection_manager: Arc<ConnectionManager<Arc<Logger>>>,
6263
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
6364
payment_store: Arc<PaymentStore>,
65+
payment_metadata_store: Arc<PaymentMetadataStore>,
66+
keys_manager: Arc<KeysManager>,
6467
peer_store: Arc<PeerStore<Arc<Logger>>>,
6568
config: Arc<Config>,
6669
is_running: Arc<RwLock<bool>>,
@@ -72,7 +75,8 @@ impl Bolt11Payment {
7275
runtime: Arc<Runtime>, channel_manager: Arc<ChannelManager>,
7376
connection_manager: Arc<ConnectionManager<Arc<Logger>>>,
7477
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
75-
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<Arc<Logger>>>,
78+
payment_store: Arc<PaymentStore>, payment_metadata_store: Arc<PaymentMetadataStore>,
79+
keys_manager: Arc<KeysManager>, peer_store: Arc<PeerStore<Arc<Logger>>>,
7680
config: Arc<Config>, is_running: Arc<RwLock<bool>>, logger: Arc<Logger>,
7781
) -> Self {
7882
Self {
@@ -81,6 +85,8 @@ impl Bolt11Payment {
8185
connection_manager,
8286
liquidity_source,
8387
payment_store,
88+
payment_metadata_store,
89+
keys_manager,
8490
peer_store,
8591
config,
8692
is_running,

0 commit comments

Comments
 (0)