Skip to content

Commit 1eabd3f

Browse files
committed
Fetch blocks from source in parallel during initial sync
In `init::synchronize_listeners` we may end up spending a decent chunk of our time just fetching block data. Here we parallelize that step across up to 36 blocks at a time. On my node with bitcoind on localhost, the impact of this is somewhat muted by block deserialization being the bulk of the work, however a networked bitcoind would likely change that. Even still, fetching a batch of 36 blocks in parallel happens on my node in ~615 ms vs ~815ms in serial.
1 parent f36e1ff commit 1eabd3f

File tree

1 file changed

+46
-39
lines changed

1 file changed

+46
-39
lines changed

lightning-block-sync/src/init.rs

Lines changed: 46 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
//! Utilities to assist in the initial sync required to initialize or reload Rust-Lightning objects
22
//! from disk.
33
4-
use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader};
5-
use crate::{BlockSource, BlockSourceResult, ChainNotifier, HeaderCache};
4+
use crate::async_poll::{MultiResultFuturePoller, ResultFuture};
5+
use crate::poll::{ChainPoller, Poll, Validate, ValidatedBlockHeader};
6+
use crate::{BlockData, BlockSource, BlockSourceResult, ChainNotifier, HeaderCache};
67

78
use bitcoin::block::Header;
89
use bitcoin::network::Network;
@@ -146,7 +147,6 @@ where
146147
// Find differences and disconnect blocks for each listener individually.
147148
let mut chain_poller = ChainPoller::new(block_source, network);
148149
let mut chain_listeners_at_height = Vec::new();
149-
let mut most_common_ancestor = None;
150150
let mut most_connected_blocks = Vec::new();
151151
let mut header_cache = HeaderCache::new();
152152
for (old_best_block, chain_listener) in chain_listeners.drain(..) {
@@ -167,19 +167,53 @@ where
167167
// Keep track of the most common ancestor and all blocks connected across all listeners.
168168
chain_listeners_at_height.push((common_ancestor.height, chain_listener));
169169
if connected_blocks.len() > most_connected_blocks.len() {
170-
most_common_ancestor = Some(common_ancestor);
171170
most_connected_blocks = connected_blocks;
172171
}
173172
}
174173

175-
// Connect new blocks for all listeners at once to avoid re-fetching blocks.
176-
if let Some(common_ancestor) = most_common_ancestor {
177-
let chain_listener = &ChainListenerSet(chain_listeners_at_height);
178-
let mut chain_notifier = ChainNotifier { header_cache: &mut header_cache, chain_listener };
179-
chain_notifier
180-
.connect_blocks(common_ancestor, most_connected_blocks, &mut chain_poller)
181-
.await
182-
.map_err(|(e, _)| e)?;
174+
while !most_connected_blocks.is_empty() {
175+
#[cfg(not(test))]
176+
const MAX_BLOCKS_AT_ONCE: usize = 6 * 6; // Six hours of blocks, 144MiB encoded
177+
#[cfg(test)]
178+
const MAX_BLOCKS_AT_ONCE: usize = 2;
179+
180+
let mut fetch_block_futures =
181+
Vec::with_capacity(core::cmp::min(MAX_BLOCKS_AT_ONCE, most_connected_blocks.len()));
182+
for header in most_connected_blocks.iter().rev().take(MAX_BLOCKS_AT_ONCE) {
183+
let fetch_future = chain_poller.fetch_block(header);
184+
fetch_block_futures
185+
.push(ResultFuture::Pending(Box::pin(async move { (header, fetch_future.await) })));
186+
}
187+
let results = MultiResultFuturePoller::new(fetch_block_futures).await.into_iter();
188+
189+
let mut fetched_blocks = [const { None }; MAX_BLOCKS_AT_ONCE];
190+
for ((header, block_res), result) in results.into_iter().zip(fetched_blocks.iter_mut()) {
191+
*result = Some((header.height, block_res?));
192+
}
193+
debug_assert!(fetched_blocks.iter().take(most_connected_blocks.len()).all(|r| r.is_some()));
194+
debug_assert!(fetched_blocks
195+
.is_sorted_by_key(|r| r.as_ref().map(|(height, _)| *height).unwrap_or(u32::MAX)));
196+
197+
for (listener_height, listener) in chain_listeners_at_height.iter() {
198+
// Connect blocks for this listener.
199+
for result in fetched_blocks.iter() {
200+
if let Some((height, block_data)) = result {
201+
if *height > *listener_height {
202+
match &**block_data {
203+
BlockData::FullBlock(block) => {
204+
listener.block_connected(&block, *height);
205+
},
206+
BlockData::HeaderOnly(header_data) => {
207+
listener.filtered_block_connected(&header_data, &[], *height);
208+
},
209+
}
210+
}
211+
}
212+
}
213+
}
214+
215+
most_connected_blocks
216+
.truncate(most_connected_blocks.len().saturating_sub(MAX_BLOCKS_AT_ONCE));
183217
}
184218

185219
Ok((header_cache, best_header))
@@ -200,33 +234,6 @@ impl<'a, L: chain::Listen + ?Sized> chain::Listen for DynamicChainListener<'a, L
200234
}
201235
}
202236

203-
/// A set of dynamically sized chain listeners, each paired with a starting block height.
204-
struct ChainListenerSet<'a, L: chain::Listen + ?Sized>(Vec<(u32, &'a L)>);
205-
206-
impl<'a, L: chain::Listen + ?Sized> chain::Listen for ChainListenerSet<'a, L> {
207-
fn block_connected(&self, block: &bitcoin::Block, height: u32) {
208-
for (starting_height, chain_listener) in self.0.iter() {
209-
if height > *starting_height {
210-
chain_listener.block_connected(block, height);
211-
}
212-
}
213-
}
214-
215-
fn filtered_block_connected(
216-
&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
217-
) {
218-
for (starting_height, chain_listener) in self.0.iter() {
219-
if height > *starting_height {
220-
chain_listener.filtered_block_connected(header, txdata, height);
221-
}
222-
}
223-
}
224-
225-
fn blocks_disconnected(&self, _fork_point: BestBlock) {
226-
unreachable!()
227-
}
228-
}
229-
230237
#[cfg(test)]
231238
mod tests {
232239
use super::*;

0 commit comments

Comments
 (0)