Skip to content

Commit 5438877

Browse files
committed
Fetch blocks from source in parallel during initial sync
In `init::synchronize_listeners` we may end up spending a decent chunk of our time just fetching block data. Here we parallelize that step across up to 36 blocks at a time. On my node with bitcoind on localhost, the impact of this is somewhat muted by block deserialization being the bulk of the work, however a networked bitcoind would likely change that. Even still, fetching a batch of 36 blocks in parallel happens on my node in ~615 ms vs ~815ms in serial.
1 parent 4e47b89 commit 5438877

1 file changed

Lines changed: 52 additions & 39 deletions

File tree

lightning-block-sync/src/init.rs

Lines changed: 52 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
//! Utilities to assist in the initial sync required to initialize or reload Rust-Lightning objects
22
//! from disk.
33
4-
use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader};
5-
use crate::{BlockSource, BlockSourceResult, ChainNotifier, HeaderCache};
4+
use crate::async_poll::{MultiResultFuturePoller, ResultFuture};
5+
use crate::poll::{ChainPoller, Poll, Validate, ValidatedBlockHeader};
6+
use crate::{BlockData, BlockSource, BlockSourceResult, ChainNotifier, HeaderCache};
67

78
use bitcoin::block::Header;
89
use bitcoin::network::Network;
@@ -149,7 +150,6 @@ where
149150
// Find differences and disconnect blocks for each listener individually.
150151
let mut chain_poller = ChainPoller::new(block_source, network);
151152
let mut chain_listeners_at_height = Vec::new();
152-
let mut most_common_ancestor = None;
153153
let mut most_connected_blocks = Vec::new();
154154
let mut header_cache = HeaderCache::new();
155155
for (old_best_block, chain_listener) in chain_listeners.drain(..) {
@@ -170,19 +170,59 @@ where
170170
// Keep track of the most common ancestor and all blocks connected across all listeners.
171171
chain_listeners_at_height.push((common_ancestor.height, chain_listener));
172172
if connected_blocks.len() > most_connected_blocks.len() {
173-
most_common_ancestor = Some(common_ancestor);
174173
most_connected_blocks = connected_blocks;
175174
}
176175
}
177176

178-
// Connect new blocks for all listeners at once to avoid re-fetching blocks.
179-
if let Some(common_ancestor) = most_common_ancestor {
180-
let chain_listener = &ChainListenerSet(chain_listeners_at_height);
181-
let mut chain_notifier = ChainNotifier { header_cache: &mut header_cache, chain_listener };
182-
chain_notifier
183-
.connect_blocks(common_ancestor, most_connected_blocks, &mut chain_poller)
184-
.await
185-
.map_err(|(e, _)| e)?;
177+
while !most_connected_blocks.is_empty() {
178+
#[cfg(not(test))]
179+
const MAX_BLOCKS_AT_ONCE: usize = 6 * 6; // Six hours of blocks, 144MiB encoded
180+
#[cfg(test)]
181+
const MAX_BLOCKS_AT_ONCE: usize = 2;
182+
183+
let mut fetch_block_futures =
184+
Vec::with_capacity(core::cmp::min(MAX_BLOCKS_AT_ONCE, most_connected_blocks.len()));
185+
for header in most_connected_blocks.iter().rev().take(MAX_BLOCKS_AT_ONCE) {
186+
let fetch_future = chain_poller.fetch_block(header);
187+
fetch_block_futures
188+
.push(ResultFuture::Pending(Box::pin(async move { (header, fetch_future.await) })));
189+
}
190+
let results = MultiResultFuturePoller::new(fetch_block_futures).await.into_iter();
191+
192+
const NO_BLOCK: Option<(u32, crate::poll::ValidatedBlock)> = None;
193+
let mut fetched_blocks = [NO_BLOCK; MAX_BLOCKS_AT_ONCE];
194+
for ((header, block_res), result) in results.into_iter().zip(fetched_blocks.iter_mut()) {
195+
*result = Some((header.height, block_res?));
196+
}
197+
debug_assert!(fetched_blocks.iter().take(most_connected_blocks.len()).all(|r| r.is_some()));
198+
// TODO: When our MSRV is 1.82, use is_sorted_by_key
199+
debug_assert!(fetched_blocks.windows(2).all(|blocks| {
200+
if let (Some(a), Some(b)) = (&blocks[0], &blocks[1]) {
201+
a.0 < b.0
202+
} else {
203+
// Any non-None blocks have to come before any None entries
204+
blocks[1].is_none()
205+
}
206+
}));
207+
208+
for (listener_height, listener) in chain_listeners_at_height.iter() {
209+
// Connect blocks for this listener.
210+
for (height, block_data) in fetched_blocks.iter().flatten() {
211+
if *height > *listener_height {
212+
match &**block_data {
213+
BlockData::FullBlock(block) => {
214+
listener.block_connected(&block, *height);
215+
},
216+
BlockData::HeaderOnly(header_data) => {
217+
listener.filtered_block_connected(&header_data, &[], *height);
218+
},
219+
}
220+
}
221+
}
222+
}
223+
224+
most_connected_blocks
225+
.truncate(most_connected_blocks.len().saturating_sub(MAX_BLOCKS_AT_ONCE));
186226
}
187227

188228
Ok((header_cache, best_header))
@@ -203,33 +243,6 @@ impl<'a, L: chain::Listen + ?Sized> chain::Listen for DynamicChainListener<'a, L
203243
}
204244
}
205245

206-
/// A set of dynamically sized chain listeners, each paired with a starting block height.
207-
struct ChainListenerSet<'a, L: chain::Listen + ?Sized>(Vec<(u32, &'a L)>);
208-
209-
impl<'a, L: chain::Listen + ?Sized> chain::Listen for ChainListenerSet<'a, L> {
210-
fn block_connected(&self, block: &bitcoin::Block, height: u32) {
211-
for (starting_height, chain_listener) in self.0.iter() {
212-
if height > *starting_height {
213-
chain_listener.block_connected(block, height);
214-
}
215-
}
216-
}
217-
218-
fn filtered_block_connected(
219-
&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
220-
) {
221-
for (starting_height, chain_listener) in self.0.iter() {
222-
if height > *starting_height {
223-
chain_listener.filtered_block_connected(header, txdata, height);
224-
}
225-
}
226-
}
227-
228-
fn blocks_disconnected(&self, _fork_point: BestBlock) {
229-
unreachable!()
230-
}
231-
}
232-
233246
#[cfg(test)]
234247
mod tests {
235248
use super::*;

0 commit comments

Comments
 (0)