@@ -76,19 +76,26 @@ impl BlocksPipeline {
7676 /// Queue blocks with their heights and per-block interested wallet sets.
7777 ///
7878 /// Each entry's wallet set is the union of wallets whose addresses matched
79- /// the filter for that block. If the block is already tracked (pending or
80- /// in flight) we only merge the new wallet ids into the existing set so a
81- /// late-discovered wallet still gets the block processed when it arrives.
82- /// Re-enqueueing a tracked hash would corrupt the coordinator's pending
83- /// count and cause a duplicate request to the peer.
79+ /// the filter for that block. If the block is already tracked (pending,
80+ /// in flight, or downloaded but not yet consumed) we only merge the new
81+ /// wallet ids into the existing set so a late-discovered wallet still gets
82+ /// the block processed when it arrives. Re-enqueueing a tracked hash would
83+ /// corrupt the coordinator's pending count and cause a duplicate request
84+ /// to the peer.
8485 pub ( super ) fn queue (
8586 & mut self ,
8687 blocks : impl IntoIterator < Item = ( FilterMatchKey , BTreeSet < WalletId > ) > ,
8788 ) {
8889 for ( key, wallets) in blocks {
8990 let hash = * key. hash ( ) ;
90- let is_new = !self . hash_to_height . contains_key ( & hash) ;
91- if is_new {
91+ // `hash_to_height` is removed in `receive_block` once the block
92+ // lands in `downloaded`, so it alone does not cover the
93+ // downloaded-but-not-yet-taken window. `hash_to_wallets` persists
94+ // across that window until `take_next_ordered_block` consumes the
95+ // block, which makes it the right sentinel to also check.
96+ let already_tracked =
97+ self . hash_to_height . contains_key ( & hash) || self . hash_to_wallets . contains_key ( & hash) ;
98+ if !already_tracked {
9299 self . coordinator . enqueue ( [ hash] ) ;
93100 self . pending_heights . insert ( key. height ( ) ) ;
94101 self . hash_to_height . insert ( hash, key. height ( ) ) ;
@@ -526,6 +533,41 @@ mod tests {
526533 assert_eq ! ( taken_wallets, expected) ;
527534 }
528535
536+ #[ test]
537+ fn test_queue_does_not_re_enqueue_downloaded_hash ( ) {
538+ // A late-arriving wallet match for a block already received and sitting
539+ // in `downloaded` (but not yet consumed by `take_next_ordered_block`)
540+ // must merge the wallet id without re-enqueueing the hash.
541+ // `receive_block` removes `hash_to_height`, so without also checking
542+ // `hash_to_wallets` the queue would push the hash back to the
543+ // coordinator and cause a duplicate request.
544+ let mut pipeline = BlocksPipeline :: new ( ) ;
545+ let block = make_test_block ( 1 ) ;
546+ let hash = block. block_hash ( ) ;
547+ let wallets_a: BTreeSet < WalletId > = BTreeSet :: from ( [ [ 1u8 ; 32 ] ] ) ;
548+ let wallets_b: BTreeSet < WalletId > = BTreeSet :: from ( [ [ 2u8 ; 32 ] ] ) ;
549+
550+ pipeline. queue ( [ ( FilterMatchKey :: new ( 100 , hash) , wallets_a. clone ( ) ) ] ) ;
551+ let hashes = pipeline. coordinator . take_pending ( 1 ) ;
552+ pipeline. coordinator . mark_sent ( & hashes) ;
553+ assert ! ( pipeline. receive_block( & block) ) ;
554+ assert_eq ! ( pipeline. downloaded. len( ) , 1 ) ;
555+ assert_eq ! ( pipeline. coordinator. pending_count( ) , 0 ) ;
556+ assert_eq ! ( pipeline. coordinator. active_count( ) , 0 ) ;
557+
558+ // Late-arriving match for the same hash must not re-enqueue.
559+ pipeline. queue ( [ ( FilterMatchKey :: new ( 100 , hash) , wallets_b. clone ( ) ) ] ) ;
560+ assert_eq ! ( pipeline. coordinator. pending_count( ) , 0 ) ;
561+ assert_eq ! ( pipeline. coordinator. active_count( ) , 0 ) ;
562+ assert_eq ! ( pipeline. downloaded. len( ) , 1 ) ;
563+
564+ // Late wallet ids are still merged for when the block is taken.
565+ let ( _, _, taken_wallets) = pipeline. take_next_ordered_block ( ) . unwrap ( ) ;
566+ let mut expected = wallets_a;
567+ expected. extend ( wallets_b) ;
568+ assert_eq ! ( taken_wallets, expected) ;
569+ }
570+
529571 #[ test]
530572 fn test_add_from_storage_merges_wallet_sets ( ) {
531573 // The `add_from_storage` path must merge wallet sets for repeat
0 commit comments