Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 144 additions & 3 deletions dash-spv/src/sync/masternodes/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,12 +421,55 @@ impl<H: BlockHeaderStorage> MasternodesManager<H> {
/// Dispatch pipeline completion based on the current `PipelineMode`. Called when
/// the mnlistdiff pipeline drains, from either the message handler or the tick
/// handler's timeout-cleanup path.
pub(super) async fn complete_pipeline(&mut self) -> SyncResult<Vec<SyncEvent>> {
///
/// After an `Incremental` pipeline finishes, re-evaluate `next_pipeline_mode` at
/// the now-advanced tip and fire a catch-up QRInfo if the cycle gate now picks
/// `QuorumValidation`. Without this, a batch of headers that lands while an
/// earlier `Incremental` is in flight can silently skip the cycle's mining
/// window: every intermediate `BlockHeadersStored` event is rejected by the
/// `has_pending_requests` guard, and the tick handler's
/// `current_height < block_header_tip_height` check is false once the
/// `Incremental` catches up to the latest tip, so the catch-up branch in
/// `next_pipeline_mode` never gets a chance to fire.
///
/// The re-evaluation calls `next_pipeline_mode` here, which may advance
/// per-cycle bookkeeping (`current_cycle_height`, `last_window_qrinfo_tip`,
/// `rotation_cycles`) if the tip crossed a cycle boundary while the
/// `Incremental` was in flight. That matches what the per-event handler
/// would have done had the intermediate events not been dropped.
pub(super) async fn complete_pipeline(
&mut self,
requests: &RequestSender,
) -> SyncResult<Vec<SyncEvent>> {
match std::mem::take(&mut self.sync_state.pipeline_mode) {
PipelineMode::QuorumValidation {
qr_info_result,
} => self.verify_and_complete(qr_info_result).await,
PipelineMode::Incremental => self.complete_incremental_pipeline().await,
PipelineMode::Incremental => {
let mut events = self.complete_incremental_pipeline().await?;
if self.state() == SyncState::Synced && self.sync_state.qrinfo_in_flight.is_none() {
let tip = self.progress.block_header_tip_height();
if matches!(self.next_pipeline_mode(tip), PipelineMode::QuorumValidation { .. })
{
tracing::debug!(
tip,
"Incremental complete, cycle gate now picks QRInfo, firing catch-up"
);
self.sync_state.qrinfo_retry_count = 0;
self.sync_state.clear_pending();
match self.send_qrinfo_for_tip(requests).await {
Ok(extra) => events.extend(extra),
Err(e) => tracing::warn!(
error = %e,
"Catch-up QRInfo dispatch failed; \
`current_cycle_attempts` stays 0 so the next \
`BlockHeadersStored` will re-fire if the gate still picks QRInfo"
),
}
}
}
Ok(events)
}
}
}

Expand Down Expand Up @@ -572,12 +615,15 @@ impl<H: BlockHeaderStorage> std::fmt::Debug for MasternodesManager<H> {
#[cfg(test)]
mod tests {
use super::*;
use crate::network::MessageType;
use crate::network::{MessageType, NetworkRequest};
use crate::storage::{DiskStorageManager, PersistentBlockHeaderStorage, StorageManager};
use crate::sync::sync_manager::SyncManager;
use crate::sync::{ManagerIdentifier, SyncManagerProgress};
use dashcore::block::Header;
use dashcore::hashes::Hash;
use dashcore::network::message::NetworkMessage;
use dashcore::sml::masternode_list::MasternodeList;
use tokio::sync::mpsc;

type TestMasternodesManager = MasternodesManager<PersistentBlockHeaderStorage>;

Expand All @@ -591,6 +637,35 @@ mod tests {
create_test_manager_for(dashcore::Network::Testnet).await
}

/// Build a regtest manager whose engine has a single list at `tip` and
/// whose block header storage is populated with dummy headers up to
/// `tip`, in `Synced` state with `pipeline_mode = Incremental` and
/// `block_header_tip_height = tip`. Storage must be populated so that
/// `send_qrinfo_for_tip` finds a tip and reaches the network dispatch;
/// otherwise it short-circuits at `storage.get_tip()` and the catch-up
/// path can't be observed at the network layer. Returns the manager, a
/// `RequestSender`, and the matching receiver so the caller binds it
/// (the channel closes when the receiver drops).
async fn make_synced_incremental_manager(
tip: u32,
) -> (TestMasternodesManager, RequestSender, mpsc::UnboundedReceiver<NetworkRequest>) {
let storage = DiskStorageManager::with_temp_dir().await.unwrap();
let block_headers = storage.block_headers();
block_headers.write().await.store_headers(&Header::dummy_batch(0..tip + 1)).await.unwrap();
let engine = engine_with_lists(&[(tip, 1)]);
let mut manager = MasternodesManager::new(
block_headers,
Arc::new(RwLock::new(engine)),
dashcore::Network::Regtest,
)
.await;
manager.set_state(SyncState::Synced);
manager.sync_state.pipeline_mode = PipelineMode::Incremental;
manager.progress.update_block_header_tip_height(tip);
let (tx, rx) = mpsc::unbounded_channel();
(manager, RequestSender::new(tx), rx)
}

#[tokio::test]
async fn test_masternode_manager_new() {
let manager = create_test_manager().await;
Expand Down Expand Up @@ -838,4 +913,70 @@ mod tests {
assert_eq!(manager.sync_state.last_synced_block_hash, None);
assert_eq!(manager.progress.current_height(), 0);
}

/// `complete_pipeline` after `Incremental` re-evaluates the cycle gate at
/// the latest tip and fires a catch-up QRInfo when the gate picks
/// `QuorumValidation`. When a batch of headers lands while a prior
/// `Incremental` is in flight, every intermediate `BlockHeadersStored`
/// event is rejected by the `has_pending_requests` guard, and the tick
/// handler can't re-fire because `current_height == block_header_tip_height`
/// once the `Incremental` catches up. For DKG_TEST_DIP0024 (regtest),
/// cycle 48 has mining window 60..=68; at tip 70 with no prior attempts,
/// the gate picks catch-up QRInfo. The post-completion call to
/// `next_pipeline_mode` is the first to enter cycle 48 and bumps
/// `rotation_cycles` from 0 to 1.
#[tokio::test]
async fn test_complete_incremental_fires_catch_up_when_window_missed() {
let (mut manager, requests, mut rx) = make_synced_incremental_manager(70).await;

manager.complete_pipeline(&requests).await.expect("complete_pipeline succeeds");

assert_eq!(
manager.sync_state.current_cycle_height,
Some(48),
"post-completion re-eval must call `next_pipeline_mode` and enter cycle 48"
);
assert_eq!(
manager.progress.rotation_cycles(),
1,
"entering cycle 48 once via the catch-up branch bumps `rotation_cycles`"
);
assert_eq!(
manager.progress.qr_infos_requested(),
1,
"the catch-up branch must reach `send_qrinfo_for_tip` and bump `qr_infos_requested`"
);
assert!(
manager.sync_state.qrinfo_in_flight.is_some(),
"the catch-up branch must mark a QRInfo as in flight"
);
let queued = rx.try_recv().expect("a NetworkRequest must be queued by the catch-up");
assert!(
matches!(queued, NetworkRequest::SendMessage(NetworkMessage::GetQRInfo(_))),
"the queued request must be a `GetQRInfo`, got {:?}",
queued
);
}

/// When the cycle gate picks `Incremental` after an `Incremental`
/// completes (e.g. the tip is still before the mining window), the
/// catch-up branch must be a no-op. Cycle 48 mining window is 60..=68
/// for DKG_TEST_DIP0024; tip 50 is before the window so the gate falls
/// through to `Incremental` and no QRInfo fires.
#[tokio::test]
async fn test_complete_incremental_does_not_fire_when_gate_picks_incremental() {
let (mut manager, requests, _rx) = make_synced_incremental_manager(50).await;

manager.complete_pipeline(&requests).await.expect("complete_pipeline succeeds");

assert!(
manager.sync_state.qrinfo_in_flight.is_none(),
"no QRInfo must fire when the gate picks Incremental"
);
assert_eq!(
manager.progress.qr_infos_requested(),
0,
"no QRInfo must be requested when the gate picks Incremental"
);
}
}
8 changes: 4 additions & 4 deletions dash-spv/src/sync/masternodes/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ impl<H: BlockHeaderStorage> SyncManager for MasternodesManager<H> {

// If no pending requests, complete
if !self.sync_state.has_pending_requests() {
return self.complete_pipeline().await;
return self.complete_pipeline(requests).await;
}
}

Expand Down Expand Up @@ -399,7 +399,7 @@ impl<H: BlockHeaderStorage> SyncManager for MasternodesManager<H> {
return Ok(vec![]);
}
tracing::info!("All MnListDiff responses received");
return self.complete_pipeline().await;
return self.complete_pipeline(requests).await;
}
}

Expand Down Expand Up @@ -615,7 +615,7 @@ impl<H: BlockHeaderStorage> SyncManager for MasternodesManager<H> {
MAX_RETRY_ATTEMPTS
);
self.sync_state.clear_pending();
return self.complete_pipeline().await;
return self.complete_pipeline(requests).await;
}
}
return Ok(vec![]);
Expand All @@ -631,7 +631,7 @@ impl<H: BlockHeaderStorage> SyncManager for MasternodesManager<H> {
// Check if complete after handling timeouts
if self.sync_state.mnlistdiff_pipeline.is_complete() {
tracing::info!("MnListDiff pipeline complete");
return self.complete_pipeline().await;
return self.complete_pipeline(requests).await;
}
}

Expand Down
Loading