diff --git a/dash-spv/src/sync/masternodes/manager.rs b/dash-spv/src/sync/masternodes/manager.rs index 5f3c58833..006a314d4 100644 --- a/dash-spv/src/sync/masternodes/manager.rs +++ b/dash-spv/src/sync/masternodes/manager.rs @@ -421,12 +421,55 @@ impl MasternodesManager { /// Dispatch pipeline completion based on the current `PipelineMode`. Called when /// the mnlistdiff pipeline drains, from either the message handler or the tick /// handler's timeout-cleanup path. - pub(super) async fn complete_pipeline(&mut self) -> SyncResult> { + /// + /// After an `Incremental` pipeline finishes, re-evaluate `next_pipeline_mode` at + /// the now-advanced tip and fire a catch-up QRInfo if the cycle gate now picks + /// `QuorumValidation`. Without this, a batch of headers that lands while an + /// earlier `Incremental` is in flight can silently skip the cycle's mining + /// window: every intermediate `BlockHeadersStored` event is rejected by the + /// `has_pending_requests` guard, and the tick handler's + /// `current_height < block_header_tip_height` check is false once the + /// `Incremental` catches up to the latest tip, so the catch-up branch in + /// `next_pipeline_mode` never gets a chance to fire. + /// + /// The re-evaluation calls `next_pipeline_mode` here, which may advance + /// per-cycle bookkeeping (`current_cycle_height`, `last_window_qrinfo_tip`, + /// `rotation_cycles`) if the tip crossed a cycle boundary while the + /// `Incremental` was in flight. That matches what the per-event handler + /// would have done had the intermediate events not been dropped. + pub(super) async fn complete_pipeline( + &mut self, + requests: &RequestSender, + ) -> SyncResult> { match std::mem::take(&mut self.sync_state.pipeline_mode) { PipelineMode::QuorumValidation { qr_info_result, } => self.verify_and_complete(qr_info_result).await, - PipelineMode::Incremental => self.complete_incremental_pipeline().await, + PipelineMode::Incremental => { + let mut events = self.complete_incremental_pipeline().await?; + if self.state() == SyncState::Synced && self.sync_state.qrinfo_in_flight.is_none() { + let tip = self.progress.block_header_tip_height(); + if matches!(self.next_pipeline_mode(tip), PipelineMode::QuorumValidation { .. }) + { + tracing::debug!( + tip, + "Incremental complete, cycle gate now picks QRInfo, firing catch-up" + ); + self.sync_state.qrinfo_retry_count = 0; + self.sync_state.clear_pending(); + match self.send_qrinfo_for_tip(requests).await { + Ok(extra) => events.extend(extra), + Err(e) => tracing::warn!( + error = %e, + "Catch-up QRInfo dispatch failed; \ + `current_cycle_attempts` stays 0 so the next \ + `BlockHeadersStored` will re-fire if the gate still picks QRInfo" + ), + } + } + } + Ok(events) + } } } @@ -572,12 +615,15 @@ impl std::fmt::Debug for MasternodesManager { #[cfg(test)] mod tests { use super::*; - use crate::network::MessageType; + use crate::network::{MessageType, NetworkRequest}; use crate::storage::{DiskStorageManager, PersistentBlockHeaderStorage, StorageManager}; use crate::sync::sync_manager::SyncManager; use crate::sync::{ManagerIdentifier, SyncManagerProgress}; + use dashcore::block::Header; use dashcore::hashes::Hash; + use dashcore::network::message::NetworkMessage; use dashcore::sml::masternode_list::MasternodeList; + use tokio::sync::mpsc; type TestMasternodesManager = MasternodesManager; @@ -591,6 +637,35 @@ mod tests { create_test_manager_for(dashcore::Network::Testnet).await } + /// Build a regtest manager whose engine has a single list at `tip` and + /// whose block header storage is populated with dummy headers up to + /// `tip`, in `Synced` state with `pipeline_mode = Incremental` and + /// `block_header_tip_height = tip`. Storage must be populated so that + /// `send_qrinfo_for_tip` finds a tip and reaches the network dispatch; + /// otherwise it short-circuits at `storage.get_tip()` and the catch-up + /// path can't be observed at the network layer. Returns the manager, a + /// `RequestSender`, and the matching receiver so the caller binds it + /// (the channel closes when the receiver drops). + async fn make_synced_incremental_manager( + tip: u32, + ) -> (TestMasternodesManager, RequestSender, mpsc::UnboundedReceiver) { + let storage = DiskStorageManager::with_temp_dir().await.unwrap(); + let block_headers = storage.block_headers(); + block_headers.write().await.store_headers(&Header::dummy_batch(0..tip + 1)).await.unwrap(); + let engine = engine_with_lists(&[(tip, 1)]); + let mut manager = MasternodesManager::new( + block_headers, + Arc::new(RwLock::new(engine)), + dashcore::Network::Regtest, + ) + .await; + manager.set_state(SyncState::Synced); + manager.sync_state.pipeline_mode = PipelineMode::Incremental; + manager.progress.update_block_header_tip_height(tip); + let (tx, rx) = mpsc::unbounded_channel(); + (manager, RequestSender::new(tx), rx) + } + #[tokio::test] async fn test_masternode_manager_new() { let manager = create_test_manager().await; @@ -838,4 +913,70 @@ mod tests { assert_eq!(manager.sync_state.last_synced_block_hash, None); assert_eq!(manager.progress.current_height(), 0); } + + /// `complete_pipeline` after `Incremental` re-evaluates the cycle gate at + /// the latest tip and fires a catch-up QRInfo when the gate picks + /// `QuorumValidation`. When a batch of headers lands while a prior + /// `Incremental` is in flight, every intermediate `BlockHeadersStored` + /// event is rejected by the `has_pending_requests` guard, and the tick + /// handler can't re-fire because `current_height == block_header_tip_height` + /// once the `Incremental` catches up. For DKG_TEST_DIP0024 (regtest), + /// cycle 48 has mining window 60..=68; at tip 70 with no prior attempts, + /// the gate picks catch-up QRInfo. The post-completion call to + /// `next_pipeline_mode` is the first to enter cycle 48 and bumps + /// `rotation_cycles` from 0 to 1. + #[tokio::test] + async fn test_complete_incremental_fires_catch_up_when_window_missed() { + let (mut manager, requests, mut rx) = make_synced_incremental_manager(70).await; + + manager.complete_pipeline(&requests).await.expect("complete_pipeline succeeds"); + + assert_eq!( + manager.sync_state.current_cycle_height, + Some(48), + "post-completion re-eval must call `next_pipeline_mode` and enter cycle 48" + ); + assert_eq!( + manager.progress.rotation_cycles(), + 1, + "entering cycle 48 once via the catch-up branch bumps `rotation_cycles`" + ); + assert_eq!( + manager.progress.qr_infos_requested(), + 1, + "the catch-up branch must reach `send_qrinfo_for_tip` and bump `qr_infos_requested`" + ); + assert!( + manager.sync_state.qrinfo_in_flight.is_some(), + "the catch-up branch must mark a QRInfo as in flight" + ); + let queued = rx.try_recv().expect("a NetworkRequest must be queued by the catch-up"); + assert!( + matches!(queued, NetworkRequest::SendMessage(NetworkMessage::GetQRInfo(_))), + "the queued request must be a `GetQRInfo`, got {:?}", + queued + ); + } + + /// When the cycle gate picks `Incremental` after an `Incremental` + /// completes (e.g. the tip is still before the mining window), the + /// catch-up branch must be a no-op. Cycle 48 mining window is 60..=68 + /// for DKG_TEST_DIP0024; tip 50 is before the window so the gate falls + /// through to `Incremental` and no QRInfo fires. + #[tokio::test] + async fn test_complete_incremental_does_not_fire_when_gate_picks_incremental() { + let (mut manager, requests, _rx) = make_synced_incremental_manager(50).await; + + manager.complete_pipeline(&requests).await.expect("complete_pipeline succeeds"); + + assert!( + manager.sync_state.qrinfo_in_flight.is_none(), + "no QRInfo must fire when the gate picks Incremental" + ); + assert_eq!( + manager.progress.qr_infos_requested(), + 0, + "no QRInfo must be requested when the gate picks Incremental" + ); + } } diff --git a/dash-spv/src/sync/masternodes/sync_manager.rs b/dash-spv/src/sync/masternodes/sync_manager.rs index 79c8116c7..f6d82f2b9 100644 --- a/dash-spv/src/sync/masternodes/sync_manager.rs +++ b/dash-spv/src/sync/masternodes/sync_manager.rs @@ -321,7 +321,7 @@ impl SyncManager for MasternodesManager { // If no pending requests, complete if !self.sync_state.has_pending_requests() { - return self.complete_pipeline().await; + return self.complete_pipeline(requests).await; } } @@ -399,7 +399,7 @@ impl SyncManager for MasternodesManager { return Ok(vec![]); } tracing::info!("All MnListDiff responses received"); - return self.complete_pipeline().await; + return self.complete_pipeline(requests).await; } } @@ -615,7 +615,7 @@ impl SyncManager for MasternodesManager { MAX_RETRY_ATTEMPTS ); self.sync_state.clear_pending(); - return self.complete_pipeline().await; + return self.complete_pipeline(requests).await; } } return Ok(vec![]); @@ -631,7 +631,7 @@ impl SyncManager for MasternodesManager { // Check if complete after handling timeouts if self.sync_state.mnlistdiff_pipeline.is_complete() { tracing::info!("MnListDiff pipeline complete"); - return self.complete_pipeline().await; + return self.complete_pipeline(requests).await; } }