11//! Utilities to assist in the initial sync required to initialize or reload Rust-Lightning objects
22//! from disk.
33
4- use crate :: poll:: { ChainPoller , Validate , ValidatedBlockHeader } ;
5- use crate :: { BlockSource , BlockSourceResult , ChainNotifier , HeaderCache } ;
4+ use crate :: async_poll:: { MultiResultFuturePoller , ResultFuture } ;
5+ use crate :: poll:: { ChainPoller , Poll , Validate , ValidatedBlockHeader } ;
6+ use crate :: { BlockData , BlockSource , BlockSourceResult , ChainNotifier , HeaderCache } ;
67
78use bitcoin:: block:: Header ;
89use bitcoin:: network:: Network ;
@@ -146,7 +147,6 @@ where
146147 // Find differences and disconnect blocks for each listener individually.
147148 let mut chain_poller = ChainPoller :: new ( block_source, network) ;
148149 let mut chain_listeners_at_height = Vec :: new ( ) ;
149- let mut most_common_ancestor = None ;
150150 let mut most_connected_blocks = Vec :: new ( ) ;
151151 let mut header_cache = HeaderCache :: new ( ) ;
152152 for ( old_best_block, chain_listener) in chain_listeners. drain ( ..) {
@@ -167,19 +167,53 @@ where
167167 // Keep track of the most common ancestor and all blocks connected across all listeners.
168168 chain_listeners_at_height. push ( ( common_ancestor. height , chain_listener) ) ;
169169 if connected_blocks. len ( ) > most_connected_blocks. len ( ) {
170- most_common_ancestor = Some ( common_ancestor) ;
171170 most_connected_blocks = connected_blocks;
172171 }
173172 }
174173
175- // Connect new blocks for all listeners at once to avoid re-fetching blocks.
176- if let Some ( common_ancestor) = most_common_ancestor {
177- let chain_listener = & ChainListenerSet ( chain_listeners_at_height) ;
178- let mut chain_notifier = ChainNotifier { header_cache : & mut header_cache, chain_listener } ;
179- chain_notifier
180- . connect_blocks ( common_ancestor, most_connected_blocks, & mut chain_poller)
181- . await
182- . map_err ( |( e, _) | e) ?;
174+ while !most_connected_blocks. is_empty ( ) {
175+ #[ cfg( not( test) ) ]
176+ const MAX_BLOCKS_AT_ONCE : usize = 6 * 6 ; // Six hours of blocks, 144MiB encoded
177+ #[ cfg( test) ]
178+ const MAX_BLOCKS_AT_ONCE : usize = 2 ;
179+
180+ let mut fetch_block_futures =
181+ Vec :: with_capacity ( core:: cmp:: min ( MAX_BLOCKS_AT_ONCE , most_connected_blocks. len ( ) ) ) ;
182+ for header in most_connected_blocks. iter ( ) . rev ( ) . take ( MAX_BLOCKS_AT_ONCE ) {
183+ let fetch_future = chain_poller. fetch_block ( header) ;
184+ fetch_block_futures
185+ . push ( ResultFuture :: Pending ( Box :: pin ( async move { ( header, fetch_future. await ) } ) ) ) ;
186+ }
187+ let results = MultiResultFuturePoller :: new ( fetch_block_futures) . await . into_iter ( ) ;
188+
189+ let mut fetched_blocks = [ const { None } ; MAX_BLOCKS_AT_ONCE ] ;
190+ for ( ( header, block_res) , result) in results. into_iter ( ) . zip ( fetched_blocks. iter_mut ( ) ) {
191+ * result = Some ( ( header. height , block_res?) ) ;
192+ }
193+ debug_assert ! ( fetched_blocks. iter( ) . take( most_connected_blocks. len( ) ) . all( |r| r. is_some( ) ) ) ;
194+ debug_assert ! ( fetched_blocks
195+ . is_sorted_by_key( |r| r. as_ref( ) . map( |( height, _) | * height) . unwrap_or( u32 :: MAX ) ) ) ;
196+
197+ for ( listener_height, listener) in chain_listeners_at_height. iter ( ) {
198+ // Connect blocks for this listener.
199+ for result in fetched_blocks. iter ( ) {
200+ if let Some ( ( height, block_data) ) = result {
201+ if * height > * listener_height {
202+ match & * * block_data {
203+ BlockData :: FullBlock ( block) => {
204+ listener. block_connected ( & block, * height) ;
205+ } ,
206+ BlockData :: HeaderOnly ( header_data) => {
207+ listener. filtered_block_connected ( & header_data, & [ ] , * height) ;
208+ } ,
209+ }
210+ }
211+ }
212+ }
213+ }
214+
215+ most_connected_blocks
216+ . truncate ( most_connected_blocks. len ( ) . saturating_sub ( MAX_BLOCKS_AT_ONCE ) ) ;
183217 }
184218
185219 Ok ( ( header_cache, best_header) )
@@ -200,33 +234,6 @@ impl<'a, L: chain::Listen + ?Sized> chain::Listen for DynamicChainListener<'a, L
200234 }
201235}
202236
203- /// A set of dynamically sized chain listeners, each paired with a starting block height.
204- struct ChainListenerSet < ' a , L : chain:: Listen + ?Sized > ( Vec < ( u32 , & ' a L ) > ) ;
205-
206- impl < ' a , L : chain:: Listen + ?Sized > chain:: Listen for ChainListenerSet < ' a , L > {
207- fn block_connected ( & self , block : & bitcoin:: Block , height : u32 ) {
208- for ( starting_height, chain_listener) in self . 0 . iter ( ) {
209- if height > * starting_height {
210- chain_listener. block_connected ( block, height) ;
211- }
212- }
213- }
214-
215- fn filtered_block_connected (
216- & self , header : & Header , txdata : & chain:: transaction:: TransactionData , height : u32 ,
217- ) {
218- for ( starting_height, chain_listener) in self . 0 . iter ( ) {
219- if height > * starting_height {
220- chain_listener. filtered_block_connected ( header, txdata, height) ;
221- }
222- }
223- }
224-
225- fn blocks_disconnected ( & self , _fork_point : BestBlock ) {
226- unreachable ! ( )
227- }
228- }
229-
230237#[ cfg( test) ]
231238mod tests {
232239 use super :: * ;
0 commit comments