Skip to content

Commit 313b10d

Browse files
committed
Fetch blocks from source in parallel during initial sync
In `init::synchronize_listeners` we may end up spending a decent chunk of our time just fetching block data. Here we parallelize that step across up to 36 blocks at a time. On my node with bitcoind on localhost, the impact of this is somewhat muted by block deserialization being the bulk of the work, however a networked bitcoind would likely change that. Even still, fetching a batch of 36 blocks in parallel happens on my node in ~615 ms vs ~815ms in serial.
1 parent 3ccf563 commit 313b10d

File tree

1 file changed

+46
-39
lines changed

1 file changed

+46
-39
lines changed

lightning-block-sync/src/init.rs

Lines changed: 46 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
//! Utilities to assist in the initial sync required to initialize or reload Rust-Lightning objects
22
//! from disk.
33
4-
use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader};
5-
use crate::{BlockSource, BlockSourceResult, ChainNotifier, HeaderCache};
4+
use crate::async_poll::{MultiResultFuturePoller, ResultFuture};
5+
use crate::poll::{ChainPoller, Poll, Validate, ValidatedBlockHeader};
6+
use crate::{BlockData, BlockSource, BlockSourceResult, ChainNotifier, HeaderCache};
67

78
use bitcoin::block::Header;
89
use bitcoin::network::Network;
@@ -150,7 +151,6 @@ where
150151
// Find differences and disconnect blocks for each listener individually.
151152
let mut chain_poller = ChainPoller::new(block_source, network);
152153
let mut chain_listeners_at_height = Vec::new();
153-
let mut most_common_ancestor = None;
154154
let mut most_connected_blocks = Vec::new();
155155
let mut header_cache = HeaderCache::new();
156156
for (old_best_block, chain_listener) in chain_listeners.drain(..) {
@@ -169,19 +169,53 @@ where
169169
// Keep track of the most common ancestor and all blocks connected across all listeners.
170170
chain_listeners_at_height.push((common_ancestor.height, chain_listener));
171171
if connected_blocks.len() > most_connected_blocks.len() {
172-
most_common_ancestor = Some(common_ancestor);
173172
most_connected_blocks = connected_blocks;
174173
}
175174
}
176175

177-
// Connect new blocks for all listeners at once to avoid re-fetching blocks.
178-
if let Some(common_ancestor) = most_common_ancestor {
179-
let chain_listener = &ChainListenerSet(chain_listeners_at_height);
180-
let mut chain_notifier = ChainNotifier { header_cache: &mut header_cache, chain_listener };
181-
chain_notifier
182-
.connect_blocks(common_ancestor, most_connected_blocks, &mut chain_poller)
183-
.await
184-
.map_err(|(e, _)| e)?;
176+
while !most_connected_blocks.is_empty() {
177+
#[cfg(not(test))]
178+
const MAX_BLOCKS_AT_ONCE: usize = 6 * 6; // Six hours of blocks, 144MiB encoded
179+
#[cfg(test)]
180+
const MAX_BLOCKS_AT_ONCE: usize = 2;
181+
182+
let mut fetch_block_futures =
183+
Vec::with_capacity(core::cmp::min(MAX_BLOCKS_AT_ONCE, most_connected_blocks.len()));
184+
for header in most_connected_blocks.iter().rev().take(MAX_BLOCKS_AT_ONCE) {
185+
let fetch_future = chain_poller.fetch_block(header);
186+
fetch_block_futures.push(ResultFuture::Pending(Box::pin(async move {
187+
(header, fetch_future.await)
188+
})));
189+
}
190+
let results = MultiResultFuturePoller::new(fetch_block_futures).await.into_iter();
191+
192+
let mut fetched_blocks = [const { None }; MAX_BLOCKS_AT_ONCE];
193+
for ((header, block_res), result) in results.into_iter().zip(fetched_blocks.iter_mut()) {
194+
*result = Some((header.height, block_res?));
195+
}
196+
debug_assert!(fetched_blocks.iter().take(most_connected_blocks.len()).all(|r| r.is_some()));
197+
debug_assert!(fetched_blocks.is_sorted_by_key(|r| r.as_ref().map(|(height, _)| *height).unwrap_or(u32::MAX)));
198+
199+
for (listener_height, listener) in chain_listeners_at_height.iter() {
200+
// Connect blocks for this listener.
201+
for result in fetched_blocks.iter() {
202+
if let Some((height, block_data)) = result {
203+
if *height > *listener_height {
204+
match &**block_data {
205+
BlockData::FullBlock(block) => {
206+
listener.block_connected(&block, *height);
207+
},
208+
BlockData::HeaderOnly(header_data) => {
209+
listener.filtered_block_connected(&header_data, &[], *height);
210+
},
211+
}
212+
}
213+
}
214+
}
215+
}
216+
217+
most_connected_blocks
218+
.truncate(most_connected_blocks.len().saturating_sub(MAX_BLOCKS_AT_ONCE));
185219
}
186220

187221
Ok((header_cache, best_header))
@@ -202,33 +236,6 @@ impl<'a, L: chain::Listen + ?Sized> chain::Listen for DynamicChainListener<'a, L
202236
}
203237
}
204238

205-
/// A set of dynamically sized chain listeners, each paired with a starting block height.
206-
struct ChainListenerSet<'a, L: chain::Listen + ?Sized>(Vec<(u32, &'a L)>);
207-
208-
impl<'a, L: chain::Listen + ?Sized> chain::Listen for ChainListenerSet<'a, L> {
209-
fn block_connected(&self, block: &bitcoin::Block, height: u32) {
210-
for (starting_height, chain_listener) in self.0.iter() {
211-
if height > *starting_height {
212-
chain_listener.block_connected(block, height);
213-
}
214-
}
215-
}
216-
217-
fn filtered_block_connected(
218-
&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
219-
) {
220-
for (starting_height, chain_listener) in self.0.iter() {
221-
if height > *starting_height {
222-
chain_listener.filtered_block_connected(header, txdata, height);
223-
}
224-
}
225-
}
226-
227-
fn blocks_disconnected(&self, _fork_point: BestBlock) {
228-
unreachable!()
229-
}
230-
}
231-
232239
#[cfg(test)]
233240
mod tests {
234241
use super::*;

0 commit comments

Comments
 (0)