Skip to content

Commit a6576fa

Browse files
author
Fernando Ledesma
committed
more parallelization of vss operations
1 parent b51e5de commit a6576fa

1 file changed

Lines changed: 50 additions & 50 deletions

File tree

src/builder.rs

Lines changed: 50 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1255,21 +1255,40 @@ fn build_with_store_internal(
12551255
}
12561256
}
12571257

1258-
// Initialize the status fields.
1259-
// PARALLEL GROUP 1: read_node_metrics + read_payments
1260-
let step_start = Instant::now();
1261-
let (node_metrics_result, payments_result) = runtime.block_on(async {
1262-
let kv_store_1 = Arc::clone(&kv_store);
1263-
let logger_1 = Arc::clone(&logger);
1264-
let kv_store_2 = Arc::clone(&kv_store);
1265-
let logger_2 = Arc::clone(&logger);
1266-
1267-
tokio::join!(
1268-
read_node_metrics(kv_store_1, logger_1),
1269-
read_payments(kv_store_2, logger_2)
1270-
)
1258+
// PHASE 1: Spawn independent reads (network_graph, event_queue, peer_info) on worker
1259+
// threads AND run required reads (metrics, payments) concurrently. The spawned reads
1260+
// continue running in the background during the sync wallet chain below.
1261+
let step_start_phase1 = Instant::now();
1262+
let (node_metrics_result, payments_result, independent_reads_handle) = runtime.block_on(async {
1263+
// Spawn independent reads on worker threads — they have NO dependency on wallet/chain_source
1264+
// and will keep running after this block_on returns.
1265+
let kv_store_ng = Arc::clone(&kv_store);
1266+
let logger_ng = Arc::clone(&logger);
1267+
let kv_store_eq = Arc::clone(&kv_store);
1268+
let logger_eq = Arc::clone(&logger);
1269+
let kv_store_pi = Arc::clone(&kv_store);
1270+
let logger_pi = Arc::clone(&logger);
1271+
let independent_handle = tokio::spawn(async move {
1272+
tokio::join!(
1273+
read_network_graph(kv_store_ng, logger_ng),
1274+
read_event_queue(kv_store_eq, logger_eq),
1275+
read_peer_info(kv_store_pi, logger_pi),
1276+
)
1277+
});
1278+
1279+
// Run metrics + payments on current thread (wallet chain depends on these)
1280+
let kv_store_nm = Arc::clone(&kv_store);
1281+
let logger_nm = Arc::clone(&logger);
1282+
let kv_store_py = Arc::clone(&kv_store);
1283+
let logger_py = Arc::clone(&logger);
1284+
let (nm, py) = tokio::join!(
1285+
read_node_metrics(kv_store_nm, logger_nm),
1286+
read_payments(kv_store_py, logger_py),
1287+
);
1288+
1289+
(nm, py, independent_handle)
12711290
});
1272-
eprintln!("TIMING: [ldk-node] PARALLEL read_node_metrics + read_payments took {}ms", step_start.elapsed().as_millis());
1291+
eprintln!("TIMING: [ldk-node] PHASE 1 (metrics + payments, spawned ng+eq+pi) took {}ms", step_start_phase1.elapsed().as_millis());
12731292

12741293
let node_metrics = match node_metrics_result {
12751294
Ok(metrics) => Arc::new(RwLock::new(metrics)),
@@ -1518,20 +1537,13 @@ fn build_with_store_internal(
15181537
peer_storage_key,
15191538
));
15201539

1521-
// PARALLEL GROUP 2: read_network_graph + read_event_queue
1522-
let step_start = Instant::now();
1523-
let (network_graph_result, event_queue_result) = runtime.block_on(async {
1524-
let kv_store_1 = Arc::clone(&kv_store);
1525-
let logger_1 = Arc::clone(&logger);
1526-
let kv_store_2 = Arc::clone(&kv_store);
1527-
let logger_2 = Arc::clone(&logger);
1528-
1529-
tokio::join!(
1530-
read_network_graph(kv_store_1, logger_1),
1531-
read_event_queue(kv_store_2, logger_2)
1532-
)
1540+
// PHASE 2: Await independent reads that were spawned in PHASE 1.
1541+
// These ran on worker threads during the sync wallet→keys→monitors chain above.
1542+
let step_start_phase2 = Instant::now();
1543+
let (network_graph_result, event_queue_result, peer_store_result) = runtime.block_on(async {
1544+
independent_reads_handle.await.expect("independent reads task panicked")
15331545
});
1534-
eprintln!("TIMING: [ldk-node] PARALLEL read_network_graph + read_event_queue took {}ms", step_start.elapsed().as_millis());
1546+
eprintln!("TIMING: [ldk-node] PHASE 2 await independent reads took {}ms (ran in parallel with wallet chain)", step_start_phase2.elapsed().as_millis());
15351547

15361548
let network_graph = match network_graph_result {
15371549
Ok(graph) => Arc::new(graph),
@@ -1900,31 +1912,19 @@ fn build_with_store_internal(
19001912
let connection_manager =
19011913
Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger)));
19021914

1903-
// PARALLEL GROUP 3: read_output_sweeper + read_peer_info
1915+
// read_output_sweeper (peer_info was already read in the spawned group above)
19041916
let step_start = Instant::now();
1905-
let (output_sweeper_result, peer_store_result) = runtime.block_on(async {
1906-
let tx_broadcaster_clone = Arc::clone(&tx_broadcaster);
1907-
let fee_estimator_clone = Arc::clone(&fee_estimator);
1908-
let chain_source_clone = Arc::clone(&chain_source);
1909-
let keys_manager_clone = Arc::clone(&keys_manager);
1910-
let kv_store_1 = Arc::clone(&kv_store);
1911-
let logger_1 = Arc::clone(&logger);
1912-
let kv_store_2 = Arc::clone(&kv_store);
1913-
let logger_2 = Arc::clone(&logger);
1914-
1915-
tokio::join!(
1916-
read_output_sweeper(
1917-
tx_broadcaster_clone,
1918-
fee_estimator_clone,
1919-
chain_source_clone,
1920-
keys_manager_clone,
1921-
kv_store_1,
1922-
logger_1,
1923-
),
1924-
read_peer_info(kv_store_2, logger_2)
1925-
)
1917+
let output_sweeper_result = runtime.block_on(async {
1918+
read_output_sweeper(
1919+
Arc::clone(&tx_broadcaster),
1920+
Arc::clone(&fee_estimator),
1921+
Arc::clone(&chain_source),
1922+
Arc::clone(&keys_manager),
1923+
Arc::clone(&kv_store),
1924+
Arc::clone(&logger),
1925+
).await
19261926
});
1927-
eprintln!("TIMING: [ldk-node] PARALLEL read_output_sweeper + read_peer_info took {}ms", step_start.elapsed().as_millis());
1927+
eprintln!("TIMING: [ldk-node] read_output_sweeper took {}ms", step_start.elapsed().as_millis());
19281928

19291929
let output_sweeper = match output_sweeper_result {
19301930
Ok(output_sweeper) => Arc::new(output_sweeper),

0 commit comments

Comments
 (0)