Skip to content

Commit 5b8cebf

Browse files
committed
DRY up batched KVStore reads with semaphore-based BatchingStore
Introduce `BatchingStore`, a `KVStore` wrapper that limits concurrent async I/O via a `tokio::sync::Semaphore`. During initialization the builder wraps the store in `BatchingStore` so all parallel reads share a single concurrency cap, rather than each reader maintaining its own `JoinSet`-based batch queue. Replace the duplicated ~75-line batching loops in `read_payments` and `read_pending_payments` with a generic `read_all_objects<T: Readable>` helper that spawns all reads into a `JoinSet` (relying on the store wrapper for throttling) and collects deserialized results. Both functions become thin one-line delegations. Co-Authored-By: HAL 9000
1 parent 64e3154 commit 5b8cebf

File tree

3 files changed

+170
-135
lines changed

3 files changed

+170
-135
lines changed

src/builder.rs

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -76,16 +76,17 @@ use crate::peer_store::PeerStore;
7676
use crate::runtime::{Runtime, RuntimeSpawner};
7777
use crate::tx_broadcaster::TransactionBroadcaster;
7878
use crate::types::{
79-
AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreRef, DynStoreWrapper,
80-
GossipSync, Graph, KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager,
81-
PendingPaymentStore, SyncAndAsyncKVStore,
79+
AsyncPersister, BatchingStore, ChainMonitor, ChannelManager, DynStore, DynStoreRef,
80+
DynStoreWrapper, GossipSync, Graph, KeysManager, MessageRouter, OnionMessenger, PaymentStore,
81+
PeerManager, PendingPaymentStore, SyncAndAsyncKVStore,
8282
};
8383
use crate::wallet::persist::KVStoreWalletPersister;
8484
use crate::wallet::Wallet;
8585
use crate::{Node, NodeMetrics};
8686

8787
const LSPS_HARDENED_CHILD_INDEX: u32 = 577;
8888
const PERSISTER_MAX_PENDING_UPDATES: u64 = 100;
89+
const STORE_READ_BATCH_SIZE: usize = 50;
8990

9091
#[derive(Debug, Clone)]
9192
enum ChainDataSourceConfig {
@@ -1265,14 +1266,18 @@ fn build_with_store_internal(
12651266
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger)));
12661267
let fee_estimator = Arc::new(OnchainFeeEstimator::new());
12671268

1268-
let kv_store_ref = Arc::clone(&kv_store);
1269+
// Wrap the store with concurrency limiting for parallel initialization reads.
1270+
let batch_store: Arc<DynStore> =
1271+
Arc::new(DynStoreWrapper(BatchingStore::new(Arc::clone(&kv_store), STORE_READ_BATCH_SIZE)));
1272+
1273+
let batch_store_ref = Arc::clone(&batch_store);
12691274
let logger_ref = Arc::clone(&logger);
12701275
let (payment_store_res, node_metris_res, pending_payment_store_res) =
12711276
runtime.block_on(async move {
12721277
tokio::join!(
1273-
read_payments(&*kv_store_ref, Arc::clone(&logger_ref)),
1274-
read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)),
1275-
read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref))
1278+
read_payments(&*batch_store_ref, Arc::clone(&logger_ref)),
1279+
read_node_metrics(&*batch_store_ref, Arc::clone(&logger_ref)),
1280+
read_pending_payments(&*batch_store_ref, Arc::clone(&logger_ref))
12761281
)
12771282
});
12781283

@@ -1515,12 +1520,12 @@ fn build_with_store_internal(
15151520
));
15161521

15171522
// Read ChannelMonitors and the NetworkGraph
1518-
let kv_store_ref = Arc::clone(&kv_store);
1523+
let batch_store_ref = Arc::clone(&batch_store);
15191524
let logger_ref = Arc::clone(&logger);
15201525
let (monitor_read_res, network_graph_res) = runtime.block_on(async {
15211526
tokio::join!(
15221527
monitor_reader.read_all_channel_monitors_with_updates_parallel(),
1523-
read_network_graph(&*kv_store_ref, logger_ref),
1528+
read_network_graph(&*batch_store_ref, logger_ref),
15241529
)
15251530
});
15261531

@@ -1566,7 +1571,10 @@ fn build_with_store_internal(
15661571
},
15671572
};
15681573

1569-
// Read various smaller LDK and ldk-node objects from the store
1574+
// Read various smaller LDK and ldk-node objects from the store.
1575+
// Functions that take &DynStore (borrow-only) use batch_store for throttled reads.
1576+
// Functions that take Arc<DynStore> (persist into runtime objects) use the original kv_store.
1577+
let batch_store_ref = Arc::clone(&batch_store);
15701578
let kv_store_ref = Arc::clone(&kv_store);
15711579
let logger_ref = Arc::clone(&logger);
15721580
let network_graph_ref = Arc::clone(&network_graph);
@@ -1587,10 +1595,10 @@ fn build_with_store_internal(
15871595
peer_info_res,
15881596
) = runtime.block_on(async move {
15891597
tokio::join!(
1590-
read_scorer(&*kv_store_ref, network_graph_ref, Arc::clone(&logger_ref)),
1591-
read_external_pathfinding_scores_from_cache(&*kv_store_ref, Arc::clone(&logger_ref)),
1598+
read_scorer(&*batch_store_ref, network_graph_ref, Arc::clone(&logger_ref)),
1599+
read_external_pathfinding_scores_from_cache(&*batch_store_ref, Arc::clone(&logger_ref)),
15921600
KVStore::read(
1593-
&*kv_store_ref,
1601+
&*batch_store_ref,
15941602
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
15951603
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
15961604
CHANNEL_MANAGER_PERSISTENCE_KEY,

src/io/utils.rs

Lines changed: 42 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -221,81 +221,62 @@ where
221221
})
222222
}
223223

224-
/// Read previously persisted payments information from the store.
225-
pub(crate) async fn read_payments<L: Deref>(
226-
kv_store: &DynStore, logger: L,
227-
) -> Result<Vec<PaymentDetails>, std::io::Error>
224+
/// Read all objects of type `T` from the given namespace, spawning reads in parallel.
225+
///
226+
/// Concurrency is expected to be limited externally (e.g., via [`BatchingStore`]).
227+
///
228+
/// [`BatchingStore`]: crate::types::BatchingStore
229+
pub(crate) async fn read_all_objects<T, L>(
230+
kv_store: &DynStore, primary_namespace: &str, secondary_namespace: &str, logger: L,
231+
) -> Result<Vec<T>, std::io::Error>
228232
where
233+
T: Readable,
234+
L: Deref,
229235
L::Target: LdkLogger,
230236
{
231-
let mut res = Vec::new();
232-
233-
let mut stored_keys = KVStore::list(
234-
&*kv_store,
235-
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
236-
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
237-
)
238-
.await?;
239-
240-
const BATCH_SIZE: usize = 50;
237+
let keys = KVStore::list(&*kv_store, primary_namespace, secondary_namespace).await?;
241238

242239
let mut set = tokio::task::JoinSet::new();
243-
244-
// Fill JoinSet with tasks if possible
245-
while set.len() < BATCH_SIZE && !stored_keys.is_empty() {
246-
if let Some(next_key) = stored_keys.pop() {
247-
let fut = KVStore::read(
248-
&*kv_store,
249-
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
250-
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
251-
&next_key,
252-
);
253-
set.spawn(fut);
254-
debug_assert!(set.len() <= BATCH_SIZE);
255-
}
240+
for key in keys {
241+
set.spawn(KVStore::read(kv_store, primary_namespace, secondary_namespace, &key));
256242
}
257243

258-
while let Some(read_res) = set.join_next().await {
259-
// Exit early if we get an IO error.
260-
let reader = read_res
244+
let mut results = Vec::with_capacity(set.len());
245+
while let Some(res) = set.join_next().await {
246+
let bytes = res
261247
.map_err(|e| {
262-
log_error!(logger, "Failed to read PaymentDetails: {}", e);
263-
set.abort_all();
248+
log_error!(logger, "Failed to join read task: {}", e);
264249
e
265250
})?
266251
.map_err(|e| {
267-
log_error!(logger, "Failed to read PaymentDetails: {}", e);
268-
set.abort_all();
252+
log_error!(logger, "Failed to read object: {}", e);
269253
e
270254
})?;
271-
272-
// Refill set for every finished future, if we still have something to do.
273-
if let Some(next_key) = stored_keys.pop() {
274-
let fut = KVStore::read(
275-
&*kv_store,
276-
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
277-
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
278-
&next_key,
279-
);
280-
set.spawn(fut);
281-
debug_assert!(set.len() <= BATCH_SIZE);
282-
}
283-
284-
// Handle result.
285-
let payment = PaymentDetails::read(&mut &*reader).map_err(|e| {
286-
log_error!(logger, "Failed to deserialize PaymentDetails: {}", e);
255+
results.push(T::read(&mut &*bytes).map_err(|e| {
256+
log_error!(logger, "Failed to deserialize object: {}", e);
287257
std::io::Error::new(
288258
std::io::ErrorKind::InvalidData,
289-
"Failed to deserialize PaymentDetails",
259+
format!("Failed to deserialize: {}", e),
290260
)
291-
})?;
292-
res.push(payment);
261+
})?);
293262
}
263+
Ok(results)
264+
}
294265

295-
debug_assert!(set.is_empty());
296-
debug_assert!(stored_keys.is_empty());
297-
298-
Ok(res)
266+
/// Read previously persisted payments information from the store.
267+
pub(crate) async fn read_payments<L: Deref>(
268+
kv_store: &DynStore, logger: L,
269+
) -> Result<Vec<PaymentDetails>, std::io::Error>
270+
where
271+
L::Target: LdkLogger,
272+
{
273+
read_all_objects(
274+
kv_store,
275+
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
276+
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
277+
logger,
278+
)
279+
.await
299280
}
300281

301282
/// Read `OutputSweeper` state from the store.
@@ -632,74 +613,13 @@ pub(crate) async fn read_pending_payments<L: Deref>(
632613
where
633614
L::Target: LdkLogger,
634615
{
635-
let mut res = Vec::new();
636-
637-
let mut stored_keys = KVStore::list(
638-
&*kv_store,
616+
read_all_objects(
617+
kv_store,
639618
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
640619
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
620+
logger,
641621
)
642-
.await?;
643-
644-
const BATCH_SIZE: usize = 50;
645-
646-
let mut set = tokio::task::JoinSet::new();
647-
648-
// Fill JoinSet with tasks if possible
649-
while set.len() < BATCH_SIZE && !stored_keys.is_empty() {
650-
if let Some(next_key) = stored_keys.pop() {
651-
let fut = KVStore::read(
652-
&*kv_store,
653-
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
654-
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
655-
&next_key,
656-
);
657-
set.spawn(fut);
658-
debug_assert!(set.len() <= BATCH_SIZE);
659-
}
660-
}
661-
662-
while let Some(read_res) = set.join_next().await {
663-
// Exit early if we get an IO error.
664-
let reader = read_res
665-
.map_err(|e| {
666-
log_error!(logger, "Failed to read PendingPaymentDetails: {}", e);
667-
set.abort_all();
668-
e
669-
})?
670-
.map_err(|e| {
671-
log_error!(logger, "Failed to read PendingPaymentDetails: {}", e);
672-
set.abort_all();
673-
e
674-
})?;
675-
676-
// Refill set for every finished future, if we still have something to do.
677-
if let Some(next_key) = stored_keys.pop() {
678-
let fut = KVStore::read(
679-
&*kv_store,
680-
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
681-
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
682-
&next_key,
683-
);
684-
set.spawn(fut);
685-
debug_assert!(set.len() <= BATCH_SIZE);
686-
}
687-
688-
// Handle result.
689-
let pending_payment = PendingPaymentDetails::read(&mut &*reader).map_err(|e| {
690-
log_error!(logger, "Failed to deserialize PendingPaymentDetails: {}", e);
691-
std::io::Error::new(
692-
std::io::ErrorKind::InvalidData,
693-
"Failed to deserialize PendingPaymentDetails",
694-
)
695-
})?;
696-
res.push(pending_payment);
697-
}
698-
699-
debug_assert!(set.is_empty());
700-
debug_assert!(stored_keys.is_empty());
701-
702-
Ok(res)
622+
.await
703623
}
704624

705625
#[cfg(test)]

src/types.rs

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,113 @@ impl<T: SyncAndAsyncKVStore + Send + Sync> DynStoreTrait for DynStoreWrapper<T>
218218
}
219219
}
220220

221+
/// A [`KVStore`] wrapper that limits the number of concurrent async I/O operations using a
222+
/// semaphore. Sync methods pass through without throttling.
223+
///
224+
/// This is used during node initialization to cap the number of inflight reads across all
225+
/// parallel readers to a single configurable limit.
226+
pub(crate) struct BatchingStore {
227+
inner: Arc<DynStore>,
228+
semaphore: Arc<tokio::sync::Semaphore>,
229+
}
230+
231+
impl BatchingStore {
232+
pub(crate) fn new(inner: Arc<DynStore>, max_concurrent: usize) -> Self {
233+
Self { inner, semaphore: Arc::new(tokio::sync::Semaphore::new(max_concurrent)) }
234+
}
235+
}
236+
237+
impl KVStore for BatchingStore {
238+
fn read(
239+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
240+
) -> impl Future<Output = Result<Vec<u8>, bitcoin::io::Error>> + Send + 'static {
241+
let inner = Arc::clone(&self.inner);
242+
let semaphore = Arc::clone(&self.semaphore);
243+
let primary_namespace = primary_namespace.to_string();
244+
let secondary_namespace = secondary_namespace.to_string();
245+
let key = key.to_string();
246+
async move {
247+
let _permit = semaphore.acquire_owned().await.map_err(|e| {
248+
bitcoin::io::Error::new(bitcoin::io::ErrorKind::Other, format!("{}", e))
249+
})?;
250+
inner.read_async(&primary_namespace, &secondary_namespace, &key).await
251+
}
252+
}
253+
254+
fn write(
255+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
256+
) -> impl Future<Output = Result<(), bitcoin::io::Error>> + Send + 'static {
257+
let inner = Arc::clone(&self.inner);
258+
let semaphore = Arc::clone(&self.semaphore);
259+
let primary_namespace = primary_namespace.to_string();
260+
let secondary_namespace = secondary_namespace.to_string();
261+
let key = key.to_string();
262+
async move {
263+
let _permit = semaphore.acquire_owned().await.map_err(|e| {
264+
bitcoin::io::Error::new(bitcoin::io::ErrorKind::Other, format!("{}", e))
265+
})?;
266+
inner.write_async(&primary_namespace, &secondary_namespace, &key, buf).await
267+
}
268+
}
269+
270+
fn remove(
271+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
272+
) -> impl Future<Output = Result<(), bitcoin::io::Error>> + Send + 'static {
273+
let inner = Arc::clone(&self.inner);
274+
let semaphore = Arc::clone(&self.semaphore);
275+
let primary_namespace = primary_namespace.to_string();
276+
let secondary_namespace = secondary_namespace.to_string();
277+
let key = key.to_string();
278+
async move {
279+
let _permit = semaphore.acquire_owned().await.map_err(|e| {
280+
bitcoin::io::Error::new(bitcoin::io::ErrorKind::Other, format!("{}", e))
281+
})?;
282+
inner.remove_async(&primary_namespace, &secondary_namespace, &key, lazy).await
283+
}
284+
}
285+
286+
fn list(
287+
&self, primary_namespace: &str, secondary_namespace: &str,
288+
) -> impl Future<Output = Result<Vec<String>, bitcoin::io::Error>> + Send + 'static {
289+
let inner = Arc::clone(&self.inner);
290+
let semaphore = Arc::clone(&self.semaphore);
291+
let primary_namespace = primary_namespace.to_string();
292+
let secondary_namespace = secondary_namespace.to_string();
293+
async move {
294+
let _permit = semaphore.acquire_owned().await.map_err(|e| {
295+
bitcoin::io::Error::new(bitcoin::io::ErrorKind::Other, format!("{}", e))
296+
})?;
297+
inner.list_async(&primary_namespace, &secondary_namespace).await
298+
}
299+
}
300+
}
301+
302+
impl KVStoreSync for BatchingStore {
303+
fn read(
304+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
305+
) -> Result<Vec<u8>, bitcoin::io::Error> {
306+
DynStoreTrait::read(&*self.inner, primary_namespace, secondary_namespace, key)
307+
}
308+
309+
fn write(
310+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
311+
) -> Result<(), bitcoin::io::Error> {
312+
DynStoreTrait::write(&*self.inner, primary_namespace, secondary_namespace, key, buf)
313+
}
314+
315+
fn remove(
316+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
317+
) -> Result<(), bitcoin::io::Error> {
318+
DynStoreTrait::remove(&*self.inner, primary_namespace, secondary_namespace, key, lazy)
319+
}
320+
321+
fn list(
322+
&self, primary_namespace: &str, secondary_namespace: &str,
323+
) -> Result<Vec<String>, bitcoin::io::Error> {
324+
DynStoreTrait::list(&*self.inner, primary_namespace, secondary_namespace)
325+
}
326+
}
327+
221328
pub(crate) type AsyncPersister = MonitorUpdatingPersisterAsync<
222329
DynStoreRef,
223330
RuntimeSpawner,

0 commit comments

Comments
 (0)