@@ -421,12 +421,55 @@ impl<H: BlockHeaderStorage> MasternodesManager<H> {
421421 /// Dispatch pipeline completion based on the current `PipelineMode`. Called when
422422 /// the mnlistdiff pipeline drains, from either the message handler or the tick
423423 /// handler's timeout-cleanup path.
424- pub ( super ) async fn complete_pipeline ( & mut self ) -> SyncResult < Vec < SyncEvent > > {
424+ ///
425+ /// After an `Incremental` pipeline finishes, re-evaluate `next_pipeline_mode` at
426+ /// the now-advanced tip and fire a catch-up QRInfo if the cycle gate now picks
427+ /// `QuorumValidation`. Without this, a batch of headers that lands while an
428+ /// earlier `Incremental` is in flight can silently skip the cycle's mining
429+ /// window: every intermediate `BlockHeadersStored` event is rejected by the
430+ /// `has_pending_requests` guard, and the tick handler's
431+ /// `current_height < block_header_tip_height` check is false once the
432+ /// `Incremental` catches up to the latest tip, so the catch-up branch in
433+ /// `next_pipeline_mode` never gets a chance to fire.
434+ ///
435+ /// The re-evaluation calls `next_pipeline_mode` here, which may advance
436+ /// per-cycle bookkeeping (`current_cycle_height`, `last_window_qrinfo_tip`,
437+ /// `rotation_cycles`) if the tip crossed a cycle boundary while the
438+ /// `Incremental` was in flight. That matches what the per-event handler
439+ /// would have done had the intermediate events not been dropped.
440+ pub ( super ) async fn complete_pipeline (
441+ & mut self ,
442+ requests : & RequestSender ,
443+ ) -> SyncResult < Vec < SyncEvent > > {
425444 match std:: mem:: take ( & mut self . sync_state . pipeline_mode ) {
426445 PipelineMode :: QuorumValidation {
427446 qr_info_result,
428447 } => self . verify_and_complete ( qr_info_result) . await ,
429- PipelineMode :: Incremental => self . complete_incremental_pipeline ( ) . await ,
448+ PipelineMode :: Incremental => {
449+ let mut events = self . complete_incremental_pipeline ( ) . await ?;
450+ if self . state ( ) == SyncState :: Synced && self . sync_state . qrinfo_in_flight . is_none ( ) {
451+ let tip = self . progress . block_header_tip_height ( ) ;
452+ if matches ! ( self . next_pipeline_mode( tip) , PipelineMode :: QuorumValidation { .. } )
453+ {
454+ tracing:: debug!(
455+ tip,
456+ "Incremental complete, cycle gate now picks QRInfo, firing catch-up"
457+ ) ;
458+ self . sync_state . qrinfo_retry_count = 0 ;
459+ self . sync_state . clear_pending ( ) ;
460+ match self . send_qrinfo_for_tip ( requests) . await {
461+ Ok ( extra) => events. extend ( extra) ,
462+ Err ( e) => tracing:: warn!(
463+ error = %e,
464+ "Catch-up QRInfo dispatch failed; \
465+ `current_cycle_attempts` stays 0 so the next \
466+ `BlockHeadersStored` will re-fire if the gate still picks QRInfo"
467+ ) ,
468+ }
469+ }
470+ }
471+ Ok ( events)
472+ }
430473 }
431474 }
432475
@@ -572,12 +615,15 @@ impl<H: BlockHeaderStorage> std::fmt::Debug for MasternodesManager<H> {
572615#[ cfg( test) ]
573616mod tests {
574617 use super :: * ;
575- use crate :: network:: MessageType ;
618+ use crate :: network:: { MessageType , NetworkRequest } ;
576619 use crate :: storage:: { DiskStorageManager , PersistentBlockHeaderStorage , StorageManager } ;
577620 use crate :: sync:: sync_manager:: SyncManager ;
578621 use crate :: sync:: { ManagerIdentifier , SyncManagerProgress } ;
622+ use dashcore:: block:: Header ;
579623 use dashcore:: hashes:: Hash ;
624+ use dashcore:: network:: message:: NetworkMessage ;
580625 use dashcore:: sml:: masternode_list:: MasternodeList ;
626+ use tokio:: sync:: mpsc;
581627
582628 type TestMasternodesManager = MasternodesManager < PersistentBlockHeaderStorage > ;
583629
@@ -591,6 +637,35 @@ mod tests {
591637 create_test_manager_for ( dashcore:: Network :: Testnet ) . await
592638 }
593639
640+ /// Build a regtest manager whose engine has a single list at `tip` and
641+ /// whose block header storage is populated with dummy headers up to
642+ /// `tip`, in `Synced` state with `pipeline_mode = Incremental` and
643+ /// `block_header_tip_height = tip`. Storage must be populated so that
644+ /// `send_qrinfo_for_tip` finds a tip and reaches the network dispatch;
645+ /// otherwise it short-circuits at `storage.get_tip()` and the catch-up
646+ /// path can't be observed at the network layer. Returns the manager, a
647+ /// `RequestSender`, and the matching receiver so the caller binds it
648+ /// (the channel closes when the receiver drops).
649+ async fn make_synced_incremental_manager (
650+ tip : u32 ,
651+ ) -> ( TestMasternodesManager , RequestSender , mpsc:: UnboundedReceiver < NetworkRequest > ) {
652+ let storage = DiskStorageManager :: with_temp_dir ( ) . await . unwrap ( ) ;
653+ let block_headers = storage. block_headers ( ) ;
654+ block_headers. write ( ) . await . store_headers ( & Header :: dummy_batch ( 0 ..tip + 1 ) ) . await . unwrap ( ) ;
655+ let engine = engine_with_lists ( & [ ( tip, 1 ) ] ) ;
656+ let mut manager = MasternodesManager :: new (
657+ block_headers,
658+ Arc :: new ( RwLock :: new ( engine) ) ,
659+ dashcore:: Network :: Regtest ,
660+ )
661+ . await ;
662+ manager. set_state ( SyncState :: Synced ) ;
663+ manager. sync_state . pipeline_mode = PipelineMode :: Incremental ;
664+ manager. progress . update_block_header_tip_height ( tip) ;
665+ let ( tx, rx) = mpsc:: unbounded_channel ( ) ;
666+ ( manager, RequestSender :: new ( tx) , rx)
667+ }
668+
594669 #[ tokio:: test]
595670 async fn test_masternode_manager_new ( ) {
596671 let manager = create_test_manager ( ) . await ;
@@ -838,4 +913,70 @@ mod tests {
838913 assert_eq ! ( manager. sync_state. last_synced_block_hash, None ) ;
839914 assert_eq ! ( manager. progress. current_height( ) , 0 ) ;
840915 }
916+
917+ /// `complete_pipeline` after `Incremental` re-evaluates the cycle gate at
918+ /// the latest tip and fires a catch-up QRInfo when the gate picks
919+ /// `QuorumValidation`. When a batch of headers lands while a prior
920+ /// `Incremental` is in flight, every intermediate `BlockHeadersStored`
921+ /// event is rejected by the `has_pending_requests` guard, and the tick
922+ /// handler can't re-fire because `current_height == block_header_tip_height`
923+ /// once the `Incremental` catches up. For DKG_TEST_DIP0024 (regtest),
924+ /// cycle 48 has mining window 60..=68; at tip 70 with no prior attempts,
925+ /// the gate picks catch-up QRInfo. The post-completion call to
926+ /// `next_pipeline_mode` is the first to enter cycle 48 and bumps
927+ /// `rotation_cycles` from 0 to 1.
928+ #[ tokio:: test]
929+ async fn test_complete_incremental_fires_catch_up_when_window_missed ( ) {
930+ let ( mut manager, requests, mut rx) = make_synced_incremental_manager ( 70 ) . await ;
931+
932+ manager. complete_pipeline ( & requests) . await . expect ( "complete_pipeline succeeds" ) ;
933+
934+ assert_eq ! (
935+ manager. sync_state. current_cycle_height,
936+ Some ( 48 ) ,
937+ "post-completion re-eval must call `next_pipeline_mode` and enter cycle 48"
938+ ) ;
939+ assert_eq ! (
940+ manager. progress. rotation_cycles( ) ,
941+ 1 ,
942+ "entering cycle 48 once via the catch-up branch bumps `rotation_cycles`"
943+ ) ;
944+ assert_eq ! (
945+ manager. progress. qr_infos_requested( ) ,
946+ 1 ,
947+ "the catch-up branch must reach `send_qrinfo_for_tip` and bump `qr_infos_requested`"
948+ ) ;
949+ assert ! (
950+ manager. sync_state. qrinfo_in_flight. is_some( ) ,
951+ "the catch-up branch must mark a QRInfo as in flight"
952+ ) ;
953+ let queued = rx. try_recv ( ) . expect ( "a NetworkRequest must be queued by the catch-up" ) ;
954+ assert ! (
955+ matches!( queued, NetworkRequest :: SendMessage ( NetworkMessage :: GetQRInfo ( _) ) ) ,
956+ "the queued request must be a `GetQRInfo`, got {:?}" ,
957+ queued
958+ ) ;
959+ }
960+
961+ /// When the cycle gate picks `Incremental` after an `Incremental`
962+ /// completes (e.g. the tip is still before the mining window), the
963+ /// catch-up branch must be a no-op. Cycle 48 mining window is 60..=68
964+ /// for DKG_TEST_DIP0024; tip 50 is before the window so the gate falls
965+ /// through to `Incremental` and no QRInfo fires.
966+ #[ tokio:: test]
967+ async fn test_complete_incremental_does_not_fire_when_gate_picks_incremental ( ) {
968+ let ( mut manager, requests, _rx) = make_synced_incremental_manager ( 50 ) . await ;
969+
970+ manager. complete_pipeline ( & requests) . await . expect ( "complete_pipeline succeeds" ) ;
971+
972+ assert ! (
973+ manager. sync_state. qrinfo_in_flight. is_none( ) ,
974+ "no QRInfo must fire when the gate picks Incremental"
975+ ) ;
976+ assert_eq ! (
977+ manager. progress. qr_infos_requested( ) ,
978+ 0 ,
979+ "no QRInfo must be requested when the gate picks Incremental"
980+ ) ;
981+ }
841982}
0 commit comments