@@ -257,6 +257,35 @@ impl<H: BlockHeaderStorage, FH: FilterHeaderStorage, F: FilterStorage, W: Wallet
257257 if stored_filters_tip >= batch_end {
258258 batch. mark_verified ( ) ;
259259 }
260+
261+ // When replacing an existing batch (e.g. after disconnect/reconnect),
262+ // preserve collected addresses and account for blocks that are still
263+ // pending in blocks_remaining. Without this, the replacement batch
264+ // would commit prematurely when its own pending_blocks reaches 0,
265+ // even though old blocks referencing this batch_start are still being
266+ // processed and may generate gap-limit addresses.
267+ if let Some ( mut old_batch) = self . active_batches . remove ( & scan_start) {
268+ let old_addresses = old_batch. take_collected_addresses ( ) ;
269+ if !old_addresses. is_empty ( ) {
270+ batch. add_addresses ( old_addresses. iter ( ) . cloned ( ) ) ;
271+ tracing:: debug!(
272+ "Preserved {} collected addresses from replaced batch {}" ,
273+ old_addresses. len( ) ,
274+ scan_start,
275+ ) ;
276+ }
277+ }
278+ let existing_remaining =
279+ self . blocks_remaining . values ( ) . filter ( |( _, bs) | * bs == scan_start) . count ( ) as u32 ;
280+ if existing_remaining > 0 {
281+ batch. set_pending_blocks ( existing_remaining) ;
282+ tracing:: debug!(
283+ "Batch {} has {} blocks still pending from previous scan" ,
284+ scan_start,
285+ existing_remaining,
286+ ) ;
287+ }
288+
260289 self . active_batches . insert ( scan_start, batch) ;
261290 self . committed_height = scan_start. saturating_sub ( 1 ) ;
262291
@@ -776,12 +805,13 @@ impl<H: BlockHeaderStorage, FH: FilterHeaderStorage, F: FilterStorage, W: Wallet
776805#[ cfg( test) ]
777806mod tests {
778807 use super :: * ;
779- use crate :: network:: MessageType ;
808+ use crate :: network:: { MessageType , NetworkManager } ;
780809 use crate :: storage:: {
781810 DiskStorageManager , PersistentBlockHeaderStorage , PersistentFilterHeaderStorage ,
782811 PersistentFilterStorage , StorageManager ,
783812 } ;
784813 use crate :: sync:: { ManagerIdentifier , SyncManagerProgress } ;
814+ use crate :: test_utils:: MockNetworkManager ;
785815 use key_wallet_manager:: test_utils:: MockWallet ;
786816
787817 type TestFiltersManager = FiltersManager <
@@ -949,4 +979,118 @@ mod tests {
949979 // After take, should be empty
950980 assert ! ( batch. take_collected_addresses( ) . is_empty( ) ) ;
951981 }
982+
983+ #[ tokio:: test]
984+ async fn test_replacement_batch_accounts_for_existing_blocks_remaining ( ) {
985+ // Verifies that when a batch has pending_blocks correctly accounting for
986+ // existing blocks_remaining entries (as set up by start_download after
987+ // reconnect), the batch does not commit until all blocks are processed.
988+ use dashcore:: Network ;
989+
990+ let mut manager = create_test_manager ( ) . await ;
991+ manager. set_state ( SyncState :: Syncing ) ;
992+
993+ let hash1 = dashcore:: block:: Header :: dummy ( 0 ) . block_hash ( ) ;
994+ let hash2 = dashcore:: block:: Header :: dummy ( 1 ) . block_hash ( ) ;
995+
996+ // Simulate post-replacement state: batch with pending_blocks=2
997+ // matching the 2 entries in blocks_remaining
998+ let mut batch = FiltersBatch :: new ( 0 , 4999 , HashMap :: new ( ) ) ;
999+ batch. mark_scanned ( ) ;
1000+ batch. set_pending_blocks ( 2 ) ;
1001+ manager. active_batches . insert ( 0 , batch) ;
1002+ manager. blocks_remaining . insert ( hash1, ( 100 , 0 ) ) ;
1003+ manager. blocks_remaining . insert ( hash2, ( 200 , 0 ) ) ;
1004+
1005+ let network = MockNetworkManager :: new ( ) ;
1006+ let requests = network. request_sender ( ) ;
1007+
1008+ // Process first block with new addresses (gap limit discovery)
1009+ let addr = dashcore:: Address :: dummy ( Network :: Testnet , 1 ) ;
1010+ let event = SyncEvent :: BlockProcessed {
1011+ block_hash : hash1,
1012+ height : 100 ,
1013+ new_addresses : vec ! [ addr] ,
1014+ } ;
1015+ let manager_ref: & mut dyn SyncManager = & mut manager;
1016+ manager_ref. handle_sync_event ( & event, & requests) . await . unwrap ( ) ;
1017+
1018+ // Batch should still be active with 1 block remaining
1019+ assert ! (
1020+ manager. active_batches. contains_key( & 0 ) ,
1021+ "batch must not commit with 1 block still remaining"
1022+ ) ;
1023+ assert_eq ! ( manager. active_batches. get( & 0 ) . unwrap( ) . pending_blocks( ) , 1 ) ;
1024+
1025+ // Process second block
1026+ let event = SyncEvent :: BlockProcessed {
1027+ block_hash : hash2,
1028+ height : 200 ,
1029+ new_addresses : vec ! [ ] ,
1030+ } ;
1031+ let manager_ref: & mut dyn SyncManager = & mut manager;
1032+ manager_ref. handle_sync_event ( & event, & requests) . await . unwrap ( ) ;
1033+
1034+ // Batch should have committed (pending_blocks=0, rescan ran with empty filters)
1035+ assert ! (
1036+ !manager. active_batches. contains_key( & 0 ) ,
1037+ "batch should commit after all blocks processed"
1038+ ) ;
1039+ assert_eq ! ( manager. committed_height, 4999 ) ;
1040+ }
1041+
1042+ #[ tokio:: test]
1043+ async fn test_zero_pending_blocks_causes_premature_batch_commit ( ) {
1044+ // Documents the bug scenario: if pending_blocks doesn't account for
1045+ // existing blocks_remaining (the pre-fix behavior), the batch commits
1046+ // after the first BlockProcessed event, and addresses from subsequent
1047+ // blocks are silently lost.
1048+ use dashcore:: Network ;
1049+
1050+ let mut manager = create_test_manager ( ) . await ;
1051+ manager. set_state ( SyncState :: Syncing ) ;
1052+
1053+ let hash1 = dashcore:: block:: Header :: dummy ( 0 ) . block_hash ( ) ;
1054+ let hash2 = dashcore:: block:: Header :: dummy ( 1 ) . block_hash ( ) ;
1055+
1056+ // Bug scenario: batch has pending_blocks=0 (not accounting for existing blocks)
1057+ let mut batch = FiltersBatch :: new ( 0 , 4999 , HashMap :: new ( ) ) ;
1058+ batch. mark_scanned ( ) ;
1059+ batch. set_pending_blocks ( 0 ) ;
1060+ manager. active_batches . insert ( 0 , batch) ;
1061+ manager. blocks_remaining . insert ( hash1, ( 100 , 0 ) ) ;
1062+ manager. blocks_remaining . insert ( hash2, ( 200 , 0 ) ) ;
1063+
1064+ let network = MockNetworkManager :: new ( ) ;
1065+ let requests = network. request_sender ( ) ;
1066+
1067+ // Process first block with new addresses
1068+ let addr = dashcore:: Address :: dummy ( Network :: Testnet , 1 ) ;
1069+ let event = SyncEvent :: BlockProcessed {
1070+ block_hash : hash1,
1071+ height : 100 ,
1072+ new_addresses : vec ! [ addr] ,
1073+ } ;
1074+ let manager_ref: & mut dyn SyncManager = & mut manager;
1075+ manager_ref. handle_sync_event ( & event, & requests) . await . unwrap ( ) ;
1076+
1077+ // Bug: batch committed prematurely because pending_blocks was 0
1078+ assert ! (
1079+ !manager. active_batches. contains_key( & 0 ) ,
1080+ "with pending_blocks=0, batch commits after first block"
1081+ ) ;
1082+
1083+ // Process second block - batch is gone, addresses are silently lost
1084+ let addr2 = dashcore:: Address :: dummy ( Network :: Testnet , 2 ) ;
1085+ let event = SyncEvent :: BlockProcessed {
1086+ block_hash : hash2,
1087+ height : 200 ,
1088+ new_addresses : vec ! [ addr2] ,
1089+ } ;
1090+ let manager_ref: & mut dyn SyncManager = & mut manager;
1091+ let events = manager_ref. handle_sync_event ( & event, & requests) . await . unwrap ( ) ;
1092+
1093+ // No events emitted because batch was already committed and removed
1094+ assert ! ( events. is_empty( ) , "addresses from hash2 are lost" ) ;
1095+ }
9521096}
0 commit comments