Skip to content

Commit 71fdb1d

Browse files
committed
Parallelize store reads in init
Since I was editing the init logic anyway I couldn't resist going ahead and parallelizing various read calls. Since we added support for an async `KVStore` in LDK 0.2/ldk-node 0.7, we can now practically do initialization reads in parallel. Thus, rather than making a long series of read calls in `build`, we use `tokio::join` to reduce the number of round-trips to our backing store, which should be a very large win for initialization cost on those using remote storage (e.g. VSS). Sadly we can't trivially do all our reads in one go: we need the payment history to initialize the BDK wallet, which is used in the `Wallet` object which is referenced in our `KeysManager`. Thus we first read the payment store and node metrics before moving on. Then, we need a reference to the `NetworkGraph` when we build the scorer. While we could/eventually should move to reading the *bytes* for the scorer while reading the graph and only building the scorer later, that's a larger refactor we leave for later. In the end, we end up with: * 1 round-trip to load the payment history and node metrics, * 2 round-trips to load ChannelMonitors and NetworkGraph (where there's an internal extra round-trip after listing the monitor updates for a monitor), * 1 round-trip to validate bitcoind RPC/REST access for those using bitcoind as a chain source, * 1 round-trip to load various smaller LDK and ldk-node objects, * and 1 additional round-trip to drop the rgs snapshot timestamp for nodes using P2P network gossip syncing, for a total of 4 round-trips in the common case and 6 for nodes using less common chain source and gossip sync sources. We then have additional round-trips to our storage and chain source during node start, but those are in many cases already async.
1 parent 53bcbb1 commit 71fdb1d

File tree

1 file changed

+80
-55
lines changed

1 file changed

+80
-55
lines changed

src/builder.rs

Lines changed: 80 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use lightning::routing::scoring::{
3333
};
3434
use lightning::sign::{EntropySource, NodeSigner};
3535
use lightning::util::persist::{
36-
KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
36+
KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
3737
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
3838
};
3939
use lightning::util::ser::ReadableArgs;
@@ -1052,10 +1052,20 @@ fn build_with_store_internal(
10521052
}
10531053
}
10541054

1055+
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger)));
1056+
let fee_estimator = Arc::new(OnchainFeeEstimator::new());
1057+
1058+
let kv_store_ref = Arc::clone(&kv_store);
1059+
let logger_ref = Arc::clone(&logger);
1060+
let (payment_store_res, node_metris_res) = runtime.block_on(async move {
1061+
tokio::join!(
1062+
read_payments(&*kv_store_ref, Arc::clone(&logger_ref)),
1063+
read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)),
1064+
)
1065+
});
1066+
10551067
// Initialize the status fields.
1056-
let node_metrics = match runtime
1057-
.block_on(async { read_node_metrics(&*kv_store, Arc::clone(&logger)).await })
1058-
{
1068+
let node_metrics = match node_metris_res {
10591069
Ok(metrics) => Arc::new(RwLock::new(metrics)),
10601070
Err(e) => {
10611071
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1066,23 +1076,20 @@ fn build_with_store_internal(
10661076
}
10671077
},
10681078
};
1069-
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger)));
1070-
let fee_estimator = Arc::new(OnchainFeeEstimator::new());
10711079

1072-
let payment_store =
1073-
match runtime.block_on(async { read_payments(&*kv_store, Arc::clone(&logger)).await }) {
1074-
Ok(payments) => Arc::new(PaymentStore::new(
1075-
payments,
1076-
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1077-
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1078-
Arc::clone(&kv_store),
1079-
Arc::clone(&logger),
1080-
)),
1081-
Err(e) => {
1082-
log_error!(logger, "Failed to read payment data from store: {}", e);
1083-
return Err(BuildError::ReadFailed);
1084-
},
1085-
};
1080+
let payment_store = match payment_store_res {
1081+
Ok(payments) => Arc::new(PaymentStore::new(
1082+
payments,
1083+
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1084+
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1085+
Arc::clone(&kv_store),
1086+
Arc::clone(&logger),
1087+
)),
1088+
Err(e) => {
1089+
log_error!(logger, "Failed to read payment data from store: {}", e);
1090+
return Err(BuildError::ReadFailed);
1091+
},
1092+
};
10861093

10871094
let (chain_source, chain_tip_opt) = match chain_data_source_config {
10881095
Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => {
@@ -1273,10 +1280,18 @@ fn build_with_store_internal(
12731280
Arc::clone(&fee_estimator),
12741281
));
12751282

1283+
// Read ChannelMonitors and the NetworkGraph
1284+
let kv_store_ref = Arc::clone(&kv_store);
1285+
let logger_ref = Arc::clone(&logger);
1286+
let (monitor_read_res, network_graph_res) = runtime.block_on(async move {
1287+
tokio::join!(
1288+
monitor_reader.read_all_channel_monitors_with_updates_parallel(),
1289+
read_network_graph(&*kv_store_ref, logger_ref),
1290+
)
1291+
});
1292+
12761293
// Read ChannelMonitor state from store
1277-
let monitor_read_result =
1278-
runtime.block_on(monitor_reader.read_all_channel_monitors_with_updates_parallel());
1279-
let channel_monitors = match monitor_read_result {
1294+
let channel_monitors = match monitor_read_res {
12801295
Ok(monitors) => monitors,
12811296
Err(e) => {
12821297
if e.kind() == lightning::io::ErrorKind::NotFound {
@@ -1310,9 +1325,7 @@ fn build_with_store_internal(
13101325
));
13111326

13121327
// Initialize the network graph, scorer, and router
1313-
let network_graph = match runtime
1314-
.block_on(async { read_network_graph(&*kv_store, Arc::clone(&logger)).await })
1315-
{
1328+
let network_graph = match network_graph_res {
13161329
Ok(graph) => Arc::new(graph),
13171330
Err(e) => {
13181331
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1324,9 +1337,42 @@ fn build_with_store_internal(
13241337
},
13251338
};
13261339

1327-
let local_scorer = match runtime.block_on(async {
1328-
read_scorer(&*kv_store, Arc::clone(&network_graph), Arc::clone(&logger)).await
1329-
}) {
1340+
// Read various smaller LDK and ldk-node objects from the store
1341+
let kv_store_ref = Arc::clone(&kv_store);
1342+
let logger_ref = Arc::clone(&logger);
1343+
let network_graph_ref = Arc::clone(&network_graph);
1344+
let output_sweeper_future = read_output_sweeper(
1345+
Arc::clone(&tx_broadcaster),
1346+
Arc::clone(&fee_estimator),
1347+
Arc::clone(&chain_source),
1348+
Arc::clone(&keys_manager),
1349+
Arc::clone(&kv_store_ref),
1350+
Arc::clone(&logger_ref),
1351+
);
1352+
let (
1353+
scorer_res,
1354+
external_scores_res,
1355+
channel_manager_bytes_res,
1356+
sweeper_bytes_res,
1357+
event_queue_res,
1358+
peer_info_res,
1359+
) = runtime.block_on(async move {
1360+
tokio::join!(
1361+
read_scorer(&*kv_store_ref, network_graph_ref, Arc::clone(&logger_ref)),
1362+
read_external_pathfinding_scores_from_cache(&*kv_store_ref, Arc::clone(&logger_ref)),
1363+
KVStore::read(
1364+
&*kv_store_ref,
1365+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1366+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
1367+
CHANNEL_MANAGER_PERSISTENCE_KEY,
1368+
),
1369+
output_sweeper_future,
1370+
read_event_queue(Arc::clone(&kv_store_ref), Arc::clone(&logger_ref)),
1371+
read_peer_info(Arc::clone(&kv_store_ref), Arc::clone(&logger_ref)),
1372+
)
1373+
});
1374+
1375+
let local_scorer = match scorer_res {
13301376
Ok(scorer) => scorer,
13311377
Err(e) => {
13321378
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1342,9 +1388,7 @@ fn build_with_store_internal(
13421388
let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer)));
13431389

13441390
// Restore external pathfinding scores from cache if possible.
1345-
match runtime.block_on(async {
1346-
read_external_pathfinding_scores_from_cache(&*kv_store, Arc::clone(&logger)).await
1347-
}) {
1391+
match external_scores_res {
13481392
Ok(external_scores) => {
13491393
scorer.lock().unwrap().merge(external_scores, cur_time);
13501394
log_trace!(logger, "External scores from cache merged successfully");
@@ -1397,12 +1441,7 @@ fn build_with_store_internal(
13971441

13981442
// Initialize the ChannelManager
13991443
let channel_manager = {
1400-
if let Ok(reader) = KVStoreSync::read(
1401-
&*kv_store,
1402-
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1403-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
1404-
CHANNEL_MANAGER_PERSISTENCE_KEY,
1405-
) {
1444+
if let Ok(reader) = channel_manager_bytes_res {
14061445
let channel_monitor_references =
14071446
channel_monitors.iter().map(|(_, chanmon)| chanmon).collect();
14081447
let read_args = ChannelManagerReadArgs::new(
@@ -1627,17 +1666,7 @@ fn build_with_store_internal(
16271666
let connection_manager =
16281667
Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger)));
16291668

1630-
let output_sweeper = match runtime.block_on(async {
1631-
read_output_sweeper(
1632-
Arc::clone(&tx_broadcaster),
1633-
Arc::clone(&fee_estimator),
1634-
Arc::clone(&chain_source),
1635-
Arc::clone(&keys_manager),
1636-
Arc::clone(&kv_store),
1637-
Arc::clone(&logger),
1638-
)
1639-
.await
1640-
}) {
1669+
let output_sweeper = match sweeper_bytes_res {
16411670
Ok(output_sweeper) => Arc::new(output_sweeper),
16421671
Err(e) => {
16431672
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1658,9 +1687,7 @@ fn build_with_store_internal(
16581687
},
16591688
};
16601689

1661-
let event_queue = match runtime
1662-
.block_on(async { read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger)).await })
1663-
{
1690+
let event_queue = match event_queue_res {
16641691
Ok(event_queue) => Arc::new(event_queue),
16651692
Err(e) => {
16661693
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1672,9 +1699,7 @@ fn build_with_store_internal(
16721699
},
16731700
};
16741701

1675-
let peer_store = match runtime
1676-
.block_on(async { read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)).await })
1677-
{
1702+
let peer_store = match peer_info_res {
16781703
Ok(peer_store) => Arc::new(peer_store),
16791704
Err(e) => {
16801705
if e.kind() == std::io::ErrorKind::NotFound {

0 commit comments

Comments
 (0)