Skip to content

Commit 5a67ebb

Browse files
committed
Make the Cache trait priv, just use UnboundedCache publicly
In the previous commit, we moved to relying on `BestBlock::previous_blocks` to find the fork point in `lightning-block-sync`'s `init::synchronize_listeners`. Here we now drop the `Cache` parameter as we no longer rely on it. Because we now have no reason to want a persistent `Cache`, we remove the trait from the public interface. However, to keep disconnections reliable we return the `UnboundedCache` we built up during initial sync from `init::synchronize_listeners` which we expect developers to pass to `SpvClient::new`.
1 parent 66992c2 commit 5a67ebb

File tree

2 files changed

+56
-99
lines changed

2 files changed

+56
-99
lines changed

lightning-block-sync/src/init.rs

Lines changed: 21 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//! from disk.
33
44
use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader};
5-
use crate::{BlockSource, BlockSourceResult, Cache, ChainNotifier};
5+
use crate::{BlockSource, BlockSourceResult, Cache, ChainNotifier, UnboundedCache};
66

77
use bitcoin::block::Header;
88
use bitcoin::hash_types::BlockHash;
@@ -32,9 +32,9 @@ where
3232
/// Performs a one-time sync of chain listeners using a single *trusted* block source, bringing each
3333
/// listener's view of the chain from its paired block hash to `block_source`'s best chain tip.
3434
///
35-
/// Upon success, the returned header can be used to initialize [`SpvClient`]. In the case of
36-
/// failure, each listener may be left at a different block hash than the one it was originally
37-
/// paired with.
35+
/// Upon success, the returned header and header cache can be used to initialize [`SpvClient`]. In
36+
/// the case of failure, each listener may be left at a different block hash than the one it was
37+
/// originally paired with.
3838
///
3939
/// Useful during startup to bring the [`ChannelManager`] and each [`ChannelMonitor`] in sync before
4040
/// switching to [`SpvClient`]. For example:
@@ -114,14 +114,13 @@ where
114114
/// };
115115
///
116116
/// // Synchronize any channel monitors and the channel manager to be on the best block.
117-
/// let mut cache = UnboundedCache::new();
118117
/// let mut monitor_listener = (monitor, &*tx_broadcaster, &*fee_estimator, &*logger);
119118
/// let listeners = vec![
120119
/// (monitor_best_block, &monitor_listener as &dyn chain::Listen),
121120
/// (manager_best_block, &manager as &dyn chain::Listen),
122121
/// ];
123-
/// let chain_tip = init::synchronize_listeners(
124-
/// block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap();
122+
/// let (chain_cache, chain_tip) = init::synchronize_listeners(
123+
/// block_source, Network::Bitcoin, listeners).await.unwrap();
125124
///
126125
/// // Allow the chain monitor to watch any channels.
127126
/// let monitor = monitor_listener.0;
@@ -130,21 +129,16 @@ where
130129
/// // Create an SPV client to notify the chain monitor and channel manager of block events.
131130
/// let chain_poller = poll::ChainPoller::new(block_source, Network::Bitcoin);
132131
/// let mut chain_listener = (chain_monitor, &manager);
133-
/// let spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, &chain_listener);
132+
/// let spv_client = SpvClient::new(chain_tip, chain_poller, chain_cache, &chain_listener);
134133
/// }
135134
/// ```
136135
///
137136
/// [`SpvClient`]: crate::SpvClient
138137
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
139138
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
140-
pub async fn synchronize_listeners<
141-
B: Deref + Sized + Send + Sync,
142-
C: Cache,
143-
L: chain::Listen + ?Sized,
144-
>(
145-
block_source: B, network: Network, header_cache: &mut C,
146-
mut chain_listeners: Vec<(BestBlock, &L)>,
147-
) -> BlockSourceResult<ValidatedBlockHeader>
139+
pub async fn synchronize_listeners<B: Deref + Sized + Send + Sync, L: chain::Listen + ?Sized>(
140+
block_source: B, network: Network, mut chain_listeners: Vec<(BestBlock, &L)>,
141+
) -> BlockSourceResult<(UnboundedCache, ValidatedBlockHeader)>
148142
where
149143
B::Target: BlockSource,
150144
{
@@ -155,12 +149,13 @@ where
155149
let mut chain_listeners_at_height = Vec::new();
156150
let mut most_common_ancestor = None;
157151
let mut most_connected_blocks = Vec::new();
152+
let mut header_cache = UnboundedCache::new();
158153
for (old_best_block, chain_listener) in chain_listeners.drain(..) {
159154
// Disconnect any stale blocks, but keep them in the cache for the next iteration.
160-
let header_cache = &mut ReadOnlyCache(header_cache);
161155
let (common_ancestor, connected_blocks) = {
162156
let chain_listener = &DynamicChainListener(chain_listener);
163-
let mut chain_notifier = ChainNotifier { header_cache, chain_listener };
157+
let mut chain_notifier =
158+
ChainNotifier { header_cache: &mut header_cache, chain_listener };
164159
let difference = chain_notifier
165160
.find_difference_from_best_block(best_header, old_best_block, &mut chain_poller)
166161
.await?;
@@ -181,32 +176,14 @@ where
181176
// Connect new blocks for all listeners at once to avoid re-fetching blocks.
182177
if let Some(common_ancestor) = most_common_ancestor {
183178
let chain_listener = &ChainListenerSet(chain_listeners_at_height);
184-
let mut chain_notifier = ChainNotifier { header_cache, chain_listener };
179+
let mut chain_notifier = ChainNotifier { header_cache: &mut header_cache, chain_listener };
185180
chain_notifier
186181
.connect_blocks(common_ancestor, most_connected_blocks, &mut chain_poller)
187182
.await
188183
.map_err(|(e, _)| e)?;
189184
}
190185

191-
Ok(best_header)
192-
}
193-
194-
/// A wrapper to make a cache read-only.
195-
///
196-
/// Used to prevent losing headers that may be needed to disconnect blocks common to more than one
197-
/// listener.
198-
struct ReadOnlyCache<'a, C: Cache>(&'a mut C);
199-
200-
impl<'a, C: Cache> Cache for ReadOnlyCache<'a, C> {
201-
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
202-
self.0.look_up(block_hash)
203-
}
204-
205-
fn block_connected(&mut self, _block_hash: BlockHash, _block_header: ValidatedBlockHeader) {
206-
unreachable!()
207-
}
208-
209-
fn blocks_disconnected(&mut self, _fork_point: &ValidatedBlockHeader) {}
186+
Ok((header_cache, best_header))
210187
}
211188

212189
/// Wrapper for supporting dynamically sized chain listeners.
@@ -274,9 +251,8 @@ mod tests {
274251
(chain.best_block_at_height(2), &listener_2 as &dyn chain::Listen),
275252
(chain.best_block_at_height(3), &listener_3 as &dyn chain::Listen),
276253
];
277-
let mut cache = chain.header_cache(0..=4);
278-
match synchronize_listeners(&chain, Network::Bitcoin, &mut cache, listeners).await {
279-
Ok(header) => assert_eq!(header, chain.tip()),
254+
match synchronize_listeners(&chain, Network::Bitcoin, listeners).await {
255+
Ok((_, header)) => assert_eq!(header, chain.tip()),
280256
Err(e) => panic!("Unexpected error: {:?}", e),
281257
}
282258
}
@@ -306,11 +282,8 @@ mod tests {
306282
(fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen),
307283
(fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen),
308284
];
309-
let mut cache = fork_chain_1.header_cache(2..=4);
310-
cache.extend(fork_chain_2.header_cache(3..=4));
311-
cache.extend(fork_chain_3.header_cache(4..=4));
312-
match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await {
313-
Ok(header) => assert_eq!(header, main_chain.tip()),
285+
match synchronize_listeners(&main_chain, Network::Bitcoin, listeners).await {
286+
Ok((_, header)) => assert_eq!(header, main_chain.tip()),
314287
Err(e) => panic!("Unexpected error: {:?}", e),
315288
}
316289
}
@@ -343,33 +316,8 @@ mod tests {
343316
(fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen),
344317
(fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen),
345318
];
346-
let mut cache = fork_chain_1.header_cache(2..=4);
347-
cache.extend(fork_chain_2.header_cache(3..=4));
348-
cache.extend(fork_chain_3.header_cache(4..=4));
349-
match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await {
350-
Ok(header) => assert_eq!(header, main_chain.tip()),
351-
Err(e) => panic!("Unexpected error: {:?}", e),
352-
}
353-
}
354-
355-
#[tokio::test]
356-
async fn cache_connected_and_keep_disconnected_blocks() {
357-
let main_chain = Blockchain::default().with_height(2);
358-
let fork_chain = main_chain.fork_at_height(1);
359-
let new_tip = main_chain.tip();
360-
let old_best_block = fork_chain.best_block();
361-
362-
let listener = MockChainListener::new()
363-
.expect_blocks_disconnected(*fork_chain.at_height(1))
364-
.expect_block_connected(*new_tip);
365-
366-
let listeners = vec![(old_best_block, &listener as &dyn chain::Listen)];
367-
let mut cache = fork_chain.header_cache(2..=2);
368-
match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await {
369-
Ok(_) => {
370-
assert!(cache.contains_key(&new_tip.block_hash));
371-
assert!(cache.contains_key(&old_best_block.block_hash));
372-
},
319+
match synchronize_listeners(&main_chain, Network::Bitcoin, listeners).await {
320+
Ok((_, header)) => assert_eq!(header, main_chain.tip()),
373321
Err(e) => panic!("Unexpected error: {:?}", e),
374322
}
375323
}

lightning-block-sync/src/lib.rs

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -170,18 +170,13 @@ pub enum BlockData {
170170
/// sources for the best chain tip. During this process it detects any chain forks, determines which
171171
/// constitutes the best chain, and updates the listener accordingly with any blocks that were
172172
/// connected or disconnected since the last poll.
173-
///
174-
/// Block headers for the best chain are maintained in the parameterized cache, allowing for a
175-
/// custom cache eviction policy. This offers flexibility to those sensitive to resource usage.
176-
/// Hence, there is a trade-off between a lower memory footprint and potentially increased network
177-
/// I/O as headers are re-fetched during fork detection.
178-
pub struct SpvClient<'a, P: Poll, C: Cache, L: Deref>
173+
pub struct SpvClient<P: Poll, L: Deref>
179174
where
180175
L::Target: chain::Listen,
181176
{
182177
chain_tip: ValidatedBlockHeader,
183178
chain_poller: P,
184-
chain_notifier: ChainNotifier<'a, C, L>,
179+
chain_notifier: ChainNotifier<UnboundedCache, L>,
185180
}
186181

187182
/// The `Cache` trait defines behavior for managing a block header cache, where block headers are
@@ -194,7 +189,7 @@ where
194189
/// Implementations may define how long to retain headers such that it's unlikely they will ever be
195190
/// needed to disconnect a block. In cases where block sources provide access to headers on stale
196191
/// forks reliably, caches may be entirely unnecessary.
197-
pub trait Cache {
192+
pub(crate) trait Cache {
198193
/// Retrieves the block header keyed by the given block hash.
199194
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader>;
200195

@@ -226,7 +221,21 @@ impl Cache for UnboundedCache {
226221
}
227222
}
228223

229-
impl<'a, P: Poll, C: Cache, L: Deref> SpvClient<'a, P, C, L>
224+
impl Cache for &mut UnboundedCache {
225+
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
226+
self.get(block_hash)
227+
}
228+
229+
fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) {
230+
self.insert(block_hash, block_header);
231+
}
232+
233+
fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) {
234+
self.retain(|_, block_info| block_info.height < fork_point.height);
235+
}
236+
}
237+
238+
impl<P: Poll, L: Deref> SpvClient<P, L>
230239
where
231240
L::Target: chain::Listen,
232241
{
@@ -241,7 +250,7 @@ where
241250
///
242251
/// [`poll_best_tip`]: SpvClient::poll_best_tip
243252
pub fn new(
244-
chain_tip: ValidatedBlockHeader, chain_poller: P, header_cache: &'a mut C,
253+
chain_tip: ValidatedBlockHeader, chain_poller: P, header_cache: UnboundedCache,
245254
chain_listener: L,
246255
) -> Self {
247256
let chain_notifier = ChainNotifier { header_cache, chain_listener };
@@ -295,15 +304,15 @@ where
295304
/// Notifies [listeners] of blocks that have been connected or disconnected from the chain.
296305
///
297306
/// [listeners]: lightning::chain::Listen
298-
pub struct ChainNotifier<'a, C: Cache, L: Deref>
307+
pub(crate) struct ChainNotifier<C: Cache, L: Deref>
299308
where
300309
L::Target: chain::Listen,
301310
{
302311
/// Cache for looking up headers before fetching from a block source.
303-
header_cache: &'a mut C,
312+
pub(crate) header_cache: C,
304313

305314
/// Listener that will be notified of connected or disconnected blocks.
306-
chain_listener: L,
315+
pub(crate) chain_listener: L,
307316
}
308317

309318
/// Changes made to the chain between subsequent polls that transformed it from having one chain tip
@@ -321,7 +330,7 @@ struct ChainDifference {
321330
connected_blocks: Vec<ValidatedBlockHeader>,
322331
}
323332

324-
impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L>
333+
impl<C: Cache, L: Deref> ChainNotifier<C, L>
325334
where
326335
L::Target: chain::Listen,
327336
{
@@ -481,9 +490,9 @@ mod spv_client_tests {
481490
let best_tip = chain.at_height(1);
482491

483492
let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
484-
let mut cache = UnboundedCache::new();
493+
let cache = UnboundedCache::new();
485494
let mut listener = NullChainListener {};
486-
let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener);
495+
let mut client = SpvClient::new(best_tip, poller, cache, &mut listener);
487496
match client.poll_best_tip().await {
488497
Err(e) => {
489498
assert_eq!(e.kind(), BlockSourceErrorKind::Persistent);
@@ -500,9 +509,9 @@ mod spv_client_tests {
500509
let common_tip = chain.tip();
501510

502511
let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
503-
let mut cache = UnboundedCache::new();
512+
let cache = UnboundedCache::new();
504513
let mut listener = NullChainListener {};
505-
let mut client = SpvClient::new(common_tip, poller, &mut cache, &mut listener);
514+
let mut client = SpvClient::new(common_tip, poller, cache, &mut listener);
506515
match client.poll_best_tip().await {
507516
Err(e) => panic!("Unexpected error: {:?}", e),
508517
Ok((chain_tip, blocks_connected)) => {
@@ -520,9 +529,9 @@ mod spv_client_tests {
520529
let old_tip = chain.at_height(1);
521530

522531
let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
523-
let mut cache = UnboundedCache::new();
532+
let cache = UnboundedCache::new();
524533
let mut listener = NullChainListener {};
525-
let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener);
534+
let mut client = SpvClient::new(old_tip, poller, cache, &mut listener);
526535
match client.poll_best_tip().await {
527536
Err(e) => panic!("Unexpected error: {:?}", e),
528537
Ok((chain_tip, blocks_connected)) => {
@@ -540,9 +549,9 @@ mod spv_client_tests {
540549
let old_tip = chain.at_height(1);
541550

542551
let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
543-
let mut cache = UnboundedCache::new();
552+
let cache = UnboundedCache::new();
544553
let mut listener = NullChainListener {};
545-
let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener);
554+
let mut client = SpvClient::new(old_tip, poller, cache, &mut listener);
546555
match client.poll_best_tip().await {
547556
Err(e) => panic!("Unexpected error: {:?}", e),
548557
Ok((chain_tip, blocks_connected)) => {
@@ -560,9 +569,9 @@ mod spv_client_tests {
560569
let old_tip = chain.at_height(1);
561570

562571
let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
563-
let mut cache = UnboundedCache::new();
572+
let cache = UnboundedCache::new();
564573
let mut listener = NullChainListener {};
565-
let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener);
574+
let mut client = SpvClient::new(old_tip, poller, cache, &mut listener);
566575
match client.poll_best_tip().await {
567576
Err(e) => panic!("Unexpected error: {:?}", e),
568577
Ok((chain_tip, blocks_connected)) => {
@@ -581,9 +590,9 @@ mod spv_client_tests {
581590
let worse_tip = chain.tip();
582591

583592
let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
584-
let mut cache = UnboundedCache::new();
593+
let cache = UnboundedCache::new();
585594
let mut listener = NullChainListener {};
586-
let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener);
595+
let mut client = SpvClient::new(best_tip, poller, cache, &mut listener);
587596
match client.poll_best_tip().await {
588597
Err(e) => panic!("Unexpected error: {:?}", e),
589598
Ok((chain_tip, blocks_connected)) => {

0 commit comments

Comments
 (0)