Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,17 @@ use crate::peer_store::PeerStore;
use crate::runtime::{Runtime, RuntimeSpawner};
use crate::tx_broadcaster::TransactionBroadcaster;
use crate::types::{
AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreRef, DynStoreWrapper,
GossipSync, Graph, KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager,
PendingPaymentStore, SyncAndAsyncKVStore,
AsyncPersister, BatchingStore, ChainMonitor, ChannelManager, DynStore, DynStoreRef,
DynStoreWrapper, GossipSync, Graph, KeysManager, MessageRouter, OnionMessenger, PaymentStore,
PeerManager, PendingPaymentStore, SyncAndAsyncKVStore,
};
use crate::wallet::persist::KVStoreWalletPersister;
use crate::wallet::Wallet;
use crate::{Node, NodeMetrics};

const LSPS_HARDENED_CHILD_INDEX: u32 = 577;
const PERSISTER_MAX_PENDING_UPDATES: u64 = 100;
const STORE_READ_BATCH_SIZE: usize = 50;

#[derive(Debug, Clone)]
enum ChainDataSourceConfig {
Expand Down Expand Up @@ -1265,14 +1266,18 @@ fn build_with_store_internal(
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger)));
let fee_estimator = Arc::new(OnchainFeeEstimator::new());

let kv_store_ref = Arc::clone(&kv_store);
// Wrap the store with concurrency limiting for parallel initialization reads.
let batch_store: Arc<DynStore> =
Arc::new(DynStoreWrapper(BatchingStore::new(Arc::clone(&kv_store), STORE_READ_BATCH_SIZE)));

let batch_store_ref = Arc::clone(&batch_store);
let logger_ref = Arc::clone(&logger);
let (payment_store_res, node_metris_res, pending_payment_store_res) =
runtime.block_on(async move {
tokio::join!(
read_payments(&*kv_store_ref, Arc::clone(&logger_ref)),
read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)),
read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref))
read_payments(&*batch_store_ref, Arc::clone(&logger_ref)),
read_node_metrics(&*batch_store_ref, Arc::clone(&logger_ref)),
read_pending_payments(&*batch_store_ref, Arc::clone(&logger_ref))
)
});

Expand Down Expand Up @@ -1515,12 +1520,12 @@ fn build_with_store_internal(
));

// Read ChannelMonitors and the NetworkGraph
let kv_store_ref = Arc::clone(&kv_store);
let batch_store_ref = Arc::clone(&batch_store);
let logger_ref = Arc::clone(&logger);
let (monitor_read_res, network_graph_res) = runtime.block_on(async {
tokio::join!(
monitor_reader.read_all_channel_monitors_with_updates_parallel(),
read_network_graph(&*kv_store_ref, logger_ref),
read_network_graph(&*batch_store_ref, logger_ref),
)
});

Expand Down Expand Up @@ -1566,7 +1571,10 @@ fn build_with_store_internal(
},
};

// Read various smaller LDK and ldk-node objects from the store
// Read various smaller LDK and ldk-node objects from the store.
// Functions that take &DynStore (borrow-only) use batch_store for throttled reads.
// Functions that take Arc<DynStore> (persist into runtime objects) use the original kv_store.
let batch_store_ref = Arc::clone(&batch_store);
let kv_store_ref = Arc::clone(&kv_store);
let logger_ref = Arc::clone(&logger);
let network_graph_ref = Arc::clone(&network_graph);
Expand All @@ -1587,10 +1595,10 @@ fn build_with_store_internal(
peer_info_res,
) = runtime.block_on(async move {
tokio::join!(
read_scorer(&*kv_store_ref, network_graph_ref, Arc::clone(&logger_ref)),
read_external_pathfinding_scores_from_cache(&*kv_store_ref, Arc::clone(&logger_ref)),
read_scorer(&*batch_store_ref, network_graph_ref, Arc::clone(&logger_ref)),
read_external_pathfinding_scores_from_cache(&*batch_store_ref, Arc::clone(&logger_ref)),
KVStore::read(
&*kv_store_ref,
&*batch_store_ref,
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_KEY,
Expand Down
164 changes: 42 additions & 122 deletions src/io/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,81 +221,62 @@ where
})
}

/// Read previously persisted payments information from the store.
pub(crate) async fn read_payments<L: Deref>(
kv_store: &DynStore, logger: L,
) -> Result<Vec<PaymentDetails>, std::io::Error>
/// Read all objects of type `T` from the given namespace, spawning reads in parallel.
///
/// Concurrency is expected to be limited externally (e.g., via [`BatchingStore`]).
///
/// [`BatchingStore`]: crate::types::BatchingStore
pub(crate) async fn read_all_objects<T, L>(
kv_store: &DynStore, primary_namespace: &str, secondary_namespace: &str, logger: L,
) -> Result<Vec<T>, std::io::Error>
where
T: Readable,
L: Deref,
L::Target: LdkLogger,
{
let mut res = Vec::new();

let mut stored_keys = KVStore::list(
&*kv_store,
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
)
.await?;

const BATCH_SIZE: usize = 50;
let keys = KVStore::list(&*kv_store, primary_namespace, secondary_namespace).await?;

let mut set = tokio::task::JoinSet::new();

// Fill JoinSet with tasks if possible
while set.len() < BATCH_SIZE && !stored_keys.is_empty() {
if let Some(next_key) = stored_keys.pop() {
let fut = KVStore::read(
&*kv_store,
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
&next_key,
);
set.spawn(fut);
debug_assert!(set.len() <= BATCH_SIZE);
}
for key in keys {
set.spawn(KVStore::read(kv_store, primary_namespace, secondary_namespace, &key));
}

while let Some(read_res) = set.join_next().await {
// Exit early if we get an IO error.
let reader = read_res
let mut results = Vec::with_capacity(set.len());
while let Some(res) = set.join_next().await {
let bytes = res
.map_err(|e| {
log_error!(logger, "Failed to read PaymentDetails: {}", e);
set.abort_all();
log_error!(logger, "Failed to join read task: {}", e);
e
})?
.map_err(|e| {
log_error!(logger, "Failed to read PaymentDetails: {}", e);
set.abort_all();
log_error!(logger, "Failed to read object: {}", e);
e
})?;

// Refill set for every finished future, if we still have something to do.
if let Some(next_key) = stored_keys.pop() {
let fut = KVStore::read(
&*kv_store,
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
&next_key,
);
set.spawn(fut);
debug_assert!(set.len() <= BATCH_SIZE);
}

// Handle result.
let payment = PaymentDetails::read(&mut &*reader).map_err(|e| {
log_error!(logger, "Failed to deserialize PaymentDetails: {}", e);
results.push(T::read(&mut &*bytes).map_err(|e| {
log_error!(logger, "Failed to deserialize object: {}", e);
std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Failed to deserialize PaymentDetails",
format!("Failed to deserialize: {}", e),
)
})?;
res.push(payment);
})?);
}
Ok(results)
}

debug_assert!(set.is_empty());
debug_assert!(stored_keys.is_empty());

Ok(res)
/// Reads back all previously persisted [`PaymentDetails`] entries from the store.
pub(crate) async fn read_payments<L: Deref>(
	kv_store: &DynStore, logger: L,
) -> Result<Vec<PaymentDetails>, std::io::Error>
where
	L::Target: LdkLogger,
{
	// Payment records live under a dedicated namespace; delegate the parallel
	// read of every stored key to the generic helper.
	let primary = PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE;
	let secondary = PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE;
	read_all_objects::<PaymentDetails, L>(kv_store, primary, secondary, logger).await
}

/// Read `OutputSweeper` state from the store.
Expand Down Expand Up @@ -632,74 +613,13 @@ pub(crate) async fn read_pending_payments<L: Deref>(
where
L::Target: LdkLogger,
{
let mut res = Vec::new();

let mut stored_keys = KVStore::list(
&*kv_store,
read_all_objects(
kv_store,
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
logger,
)
.await?;

const BATCH_SIZE: usize = 50;

let mut set = tokio::task::JoinSet::new();

// Fill JoinSet with tasks if possible
while set.len() < BATCH_SIZE && !stored_keys.is_empty() {
if let Some(next_key) = stored_keys.pop() {
let fut = KVStore::read(
&*kv_store,
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
&next_key,
);
set.spawn(fut);
debug_assert!(set.len() <= BATCH_SIZE);
}
}

while let Some(read_res) = set.join_next().await {
// Exit early if we get an IO error.
let reader = read_res
.map_err(|e| {
log_error!(logger, "Failed to read PendingPaymentDetails: {}", e);
set.abort_all();
e
})?
.map_err(|e| {
log_error!(logger, "Failed to read PendingPaymentDetails: {}", e);
set.abort_all();
e
})?;

// Refill set for every finished future, if we still have something to do.
if let Some(next_key) = stored_keys.pop() {
let fut = KVStore::read(
&*kv_store,
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
&next_key,
);
set.spawn(fut);
debug_assert!(set.len() <= BATCH_SIZE);
}

// Handle result.
let pending_payment = PendingPaymentDetails::read(&mut &*reader).map_err(|e| {
log_error!(logger, "Failed to deserialize PendingPaymentDetails: {}", e);
std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Failed to deserialize PendingPaymentDetails",
)
})?;
res.push(pending_payment);
}

debug_assert!(set.is_empty());
debug_assert!(stored_keys.is_empty());

Ok(res)
.await
}

#[cfg(test)]
Expand Down
107 changes: 107 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,113 @@ impl<T: SyncAndAsyncKVStore + Send + Sync> DynStoreTrait for DynStoreWrapper<T>
}
}

/// A [`KVStore`] wrapper that limits the number of concurrent async I/O operations using a
/// semaphore. Sync methods pass through without throttling.
///
/// This is used during node initialization to cap the number of inflight reads across all
/// parallel readers to a single configurable limit.
pub(crate) struct BatchingStore {
	// The wrapped store every operation is ultimately forwarded to.
	inner: Arc<DynStore>,
	// Bounds the number of concurrently inflight async operations: each async
	// call acquires an owned permit and holds it until the inner call finishes.
	semaphore: Arc<tokio::sync::Semaphore>,
}

impl BatchingStore {
	/// Wraps `inner`, allowing at most `max_concurrent` async operations to be
	/// inflight at any one time.
	pub(crate) fn new(inner: Arc<DynStore>, max_concurrent: usize) -> Self {
		let semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent));
		Self { inner, semaphore }
	}
}

impl KVStore for BatchingStore {
	// Every async method below follows the same pattern: clone the `Arc`s and
	// convert the `&str` arguments to owned `String`s *before* entering the
	// `async move` block, so the returned future satisfies the
	// `+ Send + 'static` bound without borrowing from `self` or the arguments.
	// A semaphore permit is then acquired and held (via `_permit`) for the
	// duration of the block, throttling the forwarded call.

	fn read(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
	) -> impl Future<Output = Result<Vec<u8>, bitcoin::io::Error>> + Send + 'static {
		let inner = Arc::clone(&self.inner);
		let semaphore = Arc::clone(&self.semaphore);
		let primary_namespace = primary_namespace.to_string();
		let secondary_namespace = secondary_namespace.to_string();
		let key = key.to_string();
		async move {
			// `acquire_owned` only errors if the semaphore was closed, which we
			// never do; map the error defensively rather than unwrapping.
			let _permit = semaphore.acquire_owned().await.map_err(|e| {
				bitcoin::io::Error::new(bitcoin::io::ErrorKind::Other, format!("{}", e))
			})?;
			inner.read_async(&primary_namespace, &secondary_namespace, &key).await
		}
	}

	fn write(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
	) -> impl Future<Output = Result<(), bitcoin::io::Error>> + Send + 'static {
		let inner = Arc::clone(&self.inner);
		let semaphore = Arc::clone(&self.semaphore);
		let primary_namespace = primary_namespace.to_string();
		let secondary_namespace = secondary_namespace.to_string();
		let key = key.to_string();
		async move {
			let _permit = semaphore.acquire_owned().await.map_err(|e| {
				bitcoin::io::Error::new(bitcoin::io::ErrorKind::Other, format!("{}", e))
			})?;
			inner.write_async(&primary_namespace, &secondary_namespace, &key, buf).await
		}
	}

	fn remove(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
	) -> impl Future<Output = Result<(), bitcoin::io::Error>> + Send + 'static {
		let inner = Arc::clone(&self.inner);
		let semaphore = Arc::clone(&self.semaphore);
		let primary_namespace = primary_namespace.to_string();
		let secondary_namespace = secondary_namespace.to_string();
		let key = key.to_string();
		async move {
			let _permit = semaphore.acquire_owned().await.map_err(|e| {
				bitcoin::io::Error::new(bitcoin::io::ErrorKind::Other, format!("{}", e))
			})?;
			inner.remove_async(&primary_namespace, &secondary_namespace, &key, lazy).await
		}
	}

	// NOTE(review): `list` is throttled like the other async ops, so a single
	// `list` call also consumes one of the `max_concurrent` permits.
	fn list(
		&self, primary_namespace: &str, secondary_namespace: &str,
	) -> impl Future<Output = Result<Vec<String>, bitcoin::io::Error>> + Send + 'static {
		let inner = Arc::clone(&self.inner);
		let semaphore = Arc::clone(&self.semaphore);
		let primary_namespace = primary_namespace.to_string();
		let secondary_namespace = secondary_namespace.to_string();
		async move {
			let _permit = semaphore.acquire_owned().await.map_err(|e| {
				bitcoin::io::Error::new(bitcoin::io::ErrorKind::Other, format!("{}", e))
			})?;
			inner.list_async(&primary_namespace, &secondary_namespace).await
		}
	}
}

impl KVStoreSync for BatchingStore {
	// Sync methods deliberately bypass the semaphore and forward straight to
	// the inner store (see the struct docs: "Sync methods pass through without
	// throttling"). The fully-qualified `DynStoreTrait::…` form is presumably
	// needed to disambiguate from the identically-named `KVStore`/`KVStoreSync`
	// methods — confirm before simplifying to plain method-call syntax.

	fn read(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
	) -> Result<Vec<u8>, bitcoin::io::Error> {
		DynStoreTrait::read(&*self.inner, primary_namespace, secondary_namespace, key)
	}

	fn write(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
	) -> Result<(), bitcoin::io::Error> {
		DynStoreTrait::write(&*self.inner, primary_namespace, secondary_namespace, key, buf)
	}

	fn remove(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
	) -> Result<(), bitcoin::io::Error> {
		DynStoreTrait::remove(&*self.inner, primary_namespace, secondary_namespace, key, lazy)
	}

	fn list(
		&self, primary_namespace: &str, secondary_namespace: &str,
	) -> Result<Vec<String>, bitcoin::io::Error> {
		DynStoreTrait::list(&*self.inner, primary_namespace, secondary_namespace)
	}
}

pub(crate) type AsyncPersister = MonitorUpdatingPersisterAsync<
DynStoreRef,
RuntimeSpawner,
Expand Down
Loading