Skip to content

Commit 0e31d35

Browse files
tnullamackillop
authored andcommitted
Parallelize read_payments
Previously, we would read entries of our payment store sequentially. This is more or less fine when we read from a local store, but when we read from a remote (e.g., VSS) store, all the latency could result in considerable slowdown during startup. Here, we opt to read store entries in batches.
1 parent ef0163d commit 0e31d35

5 files changed

Lines changed: 59 additions & 12 deletions

File tree

src/builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use bip39::Mnemonic;
1919
use bitcoin::bip32::{ChildNumber, Xpriv};
2020
use bitcoin::secp256k1::PublicKey;
2121
use bitcoin::{BlockHash, Network};
22-
use bdk_chain::{BlockId, TxUpdate};
22+
use bitcoin_payment_instructions::onion_message_resolver::LDKOnionMessageDNSSECHrnResolver;
2323
use lightning::chain::{chainmonitor, BestBlock, Watch};
2424
use lightning::io::Cursor;
2525
use lightning::ln::channelmanager::{self, ChainParameters, ChannelManagerReadArgs};

src/ffi/types.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use lightning::offers::invoice::Bolt12Invoice as LdkBolt12Invoice;
2929
pub use lightning::offers::offer::OfferId;
3030
use lightning::offers::offer::{Amount as LdkAmount, Offer as LdkOffer};
3131
use lightning::offers::refund::Refund as LdkRefund;
32+
use lightning::onion_message::dns_resolution::HumanReadableName as LdkHumanReadableName;
3233
pub use lightning::routing::gossip::{NodeAlias, NodeId, RoutingFees};
3334
pub use lightning::routing::router::RouteParametersConfig;
3435
use lightning::util::ser::Writeable;
@@ -54,7 +55,7 @@ pub use crate::logger::{LogLevel, LogRecord, LogWriter};
5455
pub use crate::payment::store::{
5556
ConfirmationStatus, LSPFeeLimits, PaymentDirection, PaymentKind, PaymentStatus,
5657
};
57-
pub use crate::payment::QrPaymentResult;
58+
pub use crate::payment::UnifiedPaymentResult;
5859
use crate::{hex_utils, SocketAddress, UniffiCustomTypeConverter, UserChannelId};
5960

6061
impl UniffiCustomTypeConverter for PublicKey {

src/io/utils.rs

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -281,22 +281,59 @@ where
281281
{
282282
let mut res = Vec::new();
283283

284-
for stored_key in KVStore::list(
284+
let mut stored_keys = KVStore::list(
285285
&*kv_store,
286286
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
287287
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
288288
)
289-
.await?
290-
{
291-
let mut reader = Cursor::new(
292-
KVStore::read(
289+
.await?;
290+
291+
const BATCH_SIZE: usize = 50;
292+
293+
let mut set = tokio::task::JoinSet::new();
294+
295+
// Fill JoinSet with tasks if possible
296+
while set.len() < BATCH_SIZE && !stored_keys.is_empty() {
297+
if let Some(next_key) = stored_keys.pop() {
298+
let fut = KVStore::read(
293299
&*kv_store,
294300
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
295301
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
296-
&stored_key,
297-
)
298-
.await?,
299-
);
302+
&next_key,
303+
);
304+
set.spawn(fut);
305+
debug_assert!(set.len() <= BATCH_SIZE);
306+
}
307+
}
308+
309+
while let Some(read_res) = set.join_next().await {
310+
// Exit early if we get an IO error.
311+
let read_res = read_res
312+
.map_err(|e| {
313+
log_error!(logger, "Failed to read PaymentDetails: {}", e);
314+
set.abort_all();
315+
e
316+
})?
317+
.map_err(|e| {
318+
log_error!(logger, "Failed to read PaymentDetails: {}", e);
319+
set.abort_all();
320+
e
321+
})?;
322+
323+
// Refill set for every finished future, if we still have something to do.
324+
if let Some(next_key) = stored_keys.pop() {
325+
let fut = KVStore::read(
326+
&*kv_store,
327+
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
328+
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
329+
&next_key,
330+
);
331+
set.spawn(fut);
332+
debug_assert!(set.len() <= BATCH_SIZE);
333+
}
334+
335+
// Handle result.
336+
let mut reader = Cursor::new(read_res);
300337
let payment = PaymentDetails::read(&mut reader).map_err(|e| {
301338
log_error!(logger, "Failed to deserialize PaymentDetails: {}", e);
302339
std::io::Error::new(
@@ -306,6 +343,10 @@ where
306343
})?;
307344
res.push(payment);
308345
}
346+
347+
debug_assert!(set.is_empty());
348+
debug_assert!(stored_keys.is_empty());
349+
309350
Ok(res)
310351
}
311352

src/payment/unified_qr.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ use bip21::de::ParamKind;
1818
use bip21::{DeserializationError, DeserializeParams, Param, SerializeParams};
1919
use bitcoin::address::{NetworkChecked, NetworkUnchecked};
2020
use bitcoin::{Amount, Txid};
21+
use bitcoin_payment_instructions::amount::Amount as BPIAmount;
22+
use bitcoin_payment_instructions::{PaymentInstructions, PaymentMethod};
2123
use lightning::ln::channelmanager::PaymentId;
2224
use lightning::offers::offer::Offer;
2325
use lightning::routing::router::RouteParametersConfig;
@@ -310,6 +312,7 @@ impl DeserializationError for Extras {
310312
mod tests {
311313
use std::str::FromStr;
312314

315+
use bitcoin::address::NetworkUnchecked;
313316
use bitcoin::{Address, Network};
314317

315318
use super::*;

src/types.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ use std::fmt;
99
use std::sync::{Arc, Mutex};
1010

1111
use bitcoin::secp256k1::PublicKey;
12-
use bitcoin::OutPoint;
12+
use bitcoin::{OutPoint, ScriptBuf};
13+
use bitcoin_payment_instructions::onion_message_resolver::LDKOnionMessageDNSSECHrnResolver;
1314
use lightning::chain::chainmonitor;
1415
use lightning::impl_writeable_tlv_based;
1516
use lightning::ln::channel_state::ChannelDetails as LdkChannelDetails;
@@ -27,6 +28,7 @@ use lightning_block_sync::gossip::{GossipVerifier, UtxoSource};
2728
use lightning_liquidity::utils::time::DefaultTimeProvider;
2829
use lightning_net_tokio::SocketDescriptor;
2930

31+
use crate::chain::bitcoind::UtxoSourceClient;
3032
use crate::chain::ChainSource;
3133
use crate::config::ChannelConfig;
3234
use crate::data_store::DataStore;

0 commit comments

Comments
 (0)