Skip to content

Commit 66992c2

Browse files
Matt CoralloTheBlueMatt
authored andcommitted
Pass a BestBlock to init::synchronize_listeners
On restart, LDK expects the chain to be replayed starting from where it was when objects were last serialized. This is fine in the normal case, but if there was a reorg and the node which we were syncing from either resynced or was changed, the last block that we were synced as of might no longer be available. As a result, it becomes impossible to figure out where the fork point is, and thus to replay the chain. Luckily, changing the block source during a reorg isn't exactly common, but we shouldn't end up with a bricked node. To address this, `lightning-block-sync` allows the user to pass in `Cache` which can be used to cache recent blocks and thus allow for reorg handling in this case. However, serialization for, and a reasonable default implementation of a `Cache` was never built. Instead, here, we start taking a different approach. To avoid developers having to persist yet another object, we move `BestBlock` to storing some number of recent block hashes. This allows us to find the fork point with just the serialized state. In a previous commit, we moved deserialization of various structs to return the `BestBlock` rather than a `BlockHash`. Here we move to actually using it, taking a `BestBlock` in place of `BlockHash` to `init::synchronize_listeners` and walking the `previous_blocks` list to find the fork point rather than relying on the `Cache`.
1 parent b617fbf commit 66992c2

File tree

4 files changed

+93
-33
lines changed

4 files changed

+93
-33
lines changed

lightning-block-sync/src/init.rs

Lines changed: 20 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,8 @@ where
117117
/// let mut cache = UnboundedCache::new();
118118
/// let mut monitor_listener = (monitor, &*tx_broadcaster, &*fee_estimator, &*logger);
119119
/// let listeners = vec![
120-
/// (monitor_best_block.block_hash, &monitor_listener as &dyn chain::Listen),
121-
/// (manager_best_block.block_hash, &manager as &dyn chain::Listen),
120+
/// (monitor_best_block, &monitor_listener as &dyn chain::Listen),
121+
/// (manager_best_block, &manager as &dyn chain::Listen),
122122
/// ];
123123
/// let chain_tip = init::synchronize_listeners(
124124
/// block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap();
@@ -143,39 +143,28 @@ pub async fn synchronize_listeners<
143143
L: chain::Listen + ?Sized,
144144
>(
145145
block_source: B, network: Network, header_cache: &mut C,
146-
mut chain_listeners: Vec<(BlockHash, &L)>,
146+
mut chain_listeners: Vec<(BestBlock, &L)>,
147147
) -> BlockSourceResult<ValidatedBlockHeader>
148148
where
149149
B::Target: BlockSource,
150150
{
151151
let best_header = validate_best_block_header(&*block_source).await?;
152152

153-
// Fetch the header for the block hash paired with each listener.
154-
let mut chain_listeners_with_old_headers = Vec::new();
155-
for (old_block_hash, chain_listener) in chain_listeners.drain(..) {
156-
let old_header = match header_cache.look_up(&old_block_hash) {
157-
Some(header) => *header,
158-
None => {
159-
block_source.get_header(&old_block_hash, None).await?.validate(old_block_hash)?
160-
},
161-
};
162-
chain_listeners_with_old_headers.push((old_header, chain_listener))
163-
}
164-
165153
// Find differences and disconnect blocks for each listener individually.
166154
let mut chain_poller = ChainPoller::new(block_source, network);
167155
let mut chain_listeners_at_height = Vec::new();
168156
let mut most_common_ancestor = None;
169157
let mut most_connected_blocks = Vec::new();
170-
for (old_header, chain_listener) in chain_listeners_with_old_headers.drain(..) {
158+
for (old_best_block, chain_listener) in chain_listeners.drain(..) {
171159
// Disconnect any stale blocks, but keep them in the cache for the next iteration.
172160
let header_cache = &mut ReadOnlyCache(header_cache);
173161
let (common_ancestor, connected_blocks) = {
174162
let chain_listener = &DynamicChainListener(chain_listener);
175163
let mut chain_notifier = ChainNotifier { header_cache, chain_listener };
176-
let difference =
177-
chain_notifier.find_difference(best_header, &old_header, &mut chain_poller).await?;
178-
if difference.common_ancestor != old_header {
164+
let difference = chain_notifier
165+
.find_difference_from_best_block(best_header, old_best_block, &mut chain_poller)
166+
.await?;
167+
if difference.common_ancestor.block_hash != old_best_block.block_hash {
179168
chain_notifier.disconnect_blocks(difference.common_ancestor);
180169
}
181170
(difference.common_ancestor, difference.connected_blocks)
@@ -281,9 +270,9 @@ mod tests {
281270
let listener_3 = MockChainListener::new().expect_block_connected(*chain.at_height(4));
282271

283272
let listeners = vec![
284-
(chain.at_height(1).block_hash, &listener_1 as &dyn chain::Listen),
285-
(chain.at_height(2).block_hash, &listener_2 as &dyn chain::Listen),
286-
(chain.at_height(3).block_hash, &listener_3 as &dyn chain::Listen),
273+
(chain.best_block_at_height(1), &listener_1 as &dyn chain::Listen),
274+
(chain.best_block_at_height(2), &listener_2 as &dyn chain::Listen),
275+
(chain.best_block_at_height(3), &listener_3 as &dyn chain::Listen),
287276
];
288277
let mut cache = chain.header_cache(0..=4);
289278
match synchronize_listeners(&chain, Network::Bitcoin, &mut cache, listeners).await {
@@ -313,9 +302,9 @@ mod tests {
313302
.expect_block_connected(*main_chain.at_height(4));
314303

315304
let listeners = vec![
316-
(fork_chain_1.tip().block_hash, &listener_1 as &dyn chain::Listen),
317-
(fork_chain_2.tip().block_hash, &listener_2 as &dyn chain::Listen),
318-
(fork_chain_3.tip().block_hash, &listener_3 as &dyn chain::Listen),
305+
(fork_chain_1.best_block(), &listener_1 as &dyn chain::Listen),
306+
(fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen),
307+
(fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen),
319308
];
320309
let mut cache = fork_chain_1.header_cache(2..=4);
321310
cache.extend(fork_chain_2.header_cache(3..=4));
@@ -350,9 +339,9 @@ mod tests {
350339
.expect_block_connected(*main_chain.at_height(4));
351340

352341
let listeners = vec![
353-
(fork_chain_1.tip().block_hash, &listener_1 as &dyn chain::Listen),
354-
(fork_chain_2.tip().block_hash, &listener_2 as &dyn chain::Listen),
355-
(fork_chain_3.tip().block_hash, &listener_3 as &dyn chain::Listen),
342+
(fork_chain_1.best_block(), &listener_1 as &dyn chain::Listen),
343+
(fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen),
344+
(fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen),
356345
];
357346
let mut cache = fork_chain_1.header_cache(2..=4);
358347
cache.extend(fork_chain_2.header_cache(3..=4));
@@ -368,18 +357,18 @@ mod tests {
368357
let main_chain = Blockchain::default().with_height(2);
369358
let fork_chain = main_chain.fork_at_height(1);
370359
let new_tip = main_chain.tip();
371-
let old_tip = fork_chain.tip();
360+
let old_best_block = fork_chain.best_block();
372361

373362
let listener = MockChainListener::new()
374363
.expect_blocks_disconnected(*fork_chain.at_height(1))
375364
.expect_block_connected(*new_tip);
376365

377-
let listeners = vec![(old_tip.block_hash, &listener as &dyn chain::Listen)];
366+
let listeners = vec![(old_best_block, &listener as &dyn chain::Listen)];
378367
let mut cache = fork_chain.header_cache(2..=2);
379368
match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await {
380369
Ok(_) => {
381370
assert!(cache.contains_key(&new_tip.block_hash));
382-
assert!(cache.contains_key(&old_tip.block_hash));
371+
assert!(cache.contains_key(&old_best_block.block_hash));
383372
},
384373
Err(e) => panic!("Unexpected error: {:?}", e),
385374
}

lightning-block-sync/src/lib.rs

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ where
337337
chain_poller: &mut P,
338338
) -> Result<(), (BlockSourceError, Option<ValidatedBlockHeader>)> {
339339
let difference = self
340-
.find_difference(new_header, old_header, chain_poller)
340+
.find_difference_from_header(new_header, old_header, chain_poller)
341341
.await
342342
.map_err(|e| (e, None))?;
343343
if difference.common_ancestor != *old_header {
@@ -347,11 +347,52 @@ where
347347
.await
348348
}
349349

350+
/// Returns the changes needed to produce the chain with `current_header` as its tip from the
351+
/// chain with `prev_best_block` as its tip.
352+
///
353+
/// First resolves `prev_best_block` to a `ValidatedBlockHeader` using the `previous_blocks`
354+
/// field as fallback if needed, then finds the common ancestor.
355+
async fn find_difference_from_best_block<P: Poll>(
356+
&self, current_header: ValidatedBlockHeader, prev_best_block: BestBlock,
357+
chain_poller: &mut P,
358+
) -> BlockSourceResult<ChainDifference> {
359+
// Try to resolve the header for the previous best block. First try the block_hash,
360+
// then fall back to previous_blocks if that fails.
361+
let cur_tip = core::iter::once((0, &prev_best_block.block_hash));
362+
let prev_tips =
363+
prev_best_block.previous_blocks.iter().enumerate().filter_map(|(idx, hash_opt)| {
364+
if let Some(block_hash) = hash_opt {
365+
Some((idx as u32 + 1, block_hash))
366+
} else {
367+
None
368+
}
369+
});
370+
let mut found_header = None;
371+
for (height_diff, block_hash) in cur_tip.chain(prev_tips) {
372+
if let Some(header) = self.header_cache.look_up(block_hash) {
373+
found_header = Some(*header);
374+
break;
375+
}
376+
let height = prev_best_block.height.checked_sub(height_diff).ok_or(
377+
BlockSourceError::persistent("BestBlock had more previous_blocks than its height"),
378+
)?;
379+
if let Ok(header) = chain_poller.get_header(block_hash, Some(height)).await {
380+
found_header = Some(header);
381+
break;
382+
}
383+
}
384+
let found_header = found_header.ok_or_else(|| {
385+
BlockSourceError::persistent("could not resolve any block from BestBlock")
386+
})?;
387+
388+
self.find_difference_from_header(current_header, &found_header, chain_poller).await
389+
}
390+
350391
/// Returns the changes needed to produce the chain with `current_header` as its tip from the
351392
/// chain with `prev_header` as its tip.
352393
///
353394
/// Walks backwards from `current_header` and `prev_header`, finding the common ancestor.
354-
async fn find_difference<P: Poll>(
395+
async fn find_difference_from_header<P: Poll>(
355396
&self, current_header: ValidatedBlockHeader, prev_header: &ValidatedBlockHeader,
356397
chain_poller: &mut P,
357398
) -> BlockSourceResult<ChainDifference> {

lightning-block-sync/src/poll.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ pub trait Poll {
3131
fn fetch_block<'a>(
3232
&'a self, header: &'a ValidatedBlockHeader,
3333
) -> impl Future<Output = BlockSourceResult<ValidatedBlock>> + Send + 'a;
34+
35+
/// Returns the header for a given hash and optional height hint.
36+
fn get_header<'a>(
37+
&'a self, block_hash: &'a BlockHash, height_hint: Option<u32>,
38+
) -> impl Future<Output = BlockSourceResult<ValidatedBlockHeader>> + Send + 'a;
3439
}
3540

3641
/// A chain tip relative to another chain tip in terms of block hash and chainwork.
@@ -258,6 +263,14 @@ impl<B: Deref<Target = T> + Sized + Send + Sync, T: BlockSource + ?Sized> Poll
258263
) -> impl Future<Output = BlockSourceResult<ValidatedBlock>> + Send + 'a {
259264
async move { self.block_source.get_block(&header.block_hash).await?.validate(header.block_hash) }
260265
}
266+
267+
fn get_header<'a>(
268+
&'a self, block_hash: &'a BlockHash, height_hint: Option<u32>,
269+
) -> impl Future<Output = BlockSourceResult<ValidatedBlockHeader>> + Send + 'a {
270+
Box::pin(async move {
271+
self.block_source.get_header(block_hash, height_hint).await?.validate(*block_hash)
272+
})
273+
}
261274
}
262275

263276
#[cfg(test)]

lightning-block-sync/src/test_utils.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,18 @@ impl Blockchain {
104104
block_header.validate(block_hash).unwrap()
105105
}
106106

107+
pub fn best_block_at_height(&self, height: usize) -> BestBlock {
108+
let mut previous_blocks = [None; 12];
109+
for (i, height) in (0..height).rev().take(12).enumerate() {
110+
previous_blocks[i] = Some(self.blocks[height].block_hash());
111+
}
112+
BestBlock {
113+
height: height as u32,
114+
block_hash: self.blocks[height].block_hash(),
115+
previous_blocks,
116+
}
117+
}
118+
107119
fn at_height_unvalidated(&self, height: usize) -> BlockHeaderData {
108120
assert!(!self.blocks.is_empty());
109121
assert!(height < self.blocks.len());
@@ -123,6 +135,11 @@ impl Blockchain {
123135
self.at_height(self.blocks.len() - 1)
124136
}
125137

138+
pub fn best_block(&self) -> BestBlock {
139+
assert!(!self.blocks.is_empty());
140+
self.best_block_at_height(self.blocks.len() - 1)
141+
}
142+
126143
pub fn disconnect_tip(&mut self) -> Option<Block> {
127144
self.blocks.pop()
128145
}

0 commit comments

Comments
 (0)