Commit ea01278

fix(dash-spv): preserve buffered block headers across disconnect (#702)
`BlockHeadersManager::clear_in_flight_state` rebuilt the entire `HeadersPipeline` on every disconnect, throwing away each segment's `current_tip_hash`, `current_height`, `buffered_headers`, and `complete` flag.

This PR splits the reset into `SegmentState::clear_in_flight` and `HeadersPipeline::clear_in_flight`, which wipe only each segment's `DownloadCoordinator`. `BlockHeadersManager::clear_in_flight_state` now calls the pipeline-level helper instead of constructing a new pipeline, and `start_sync` skips `pipeline.init` when the pipeline is already initialized, calling `reset_tip_segment` instead so a previously completed tip segment is re-armed and `send_pending` re-fires `GetHeaders`.

Adds unit coverage for the new clear path at the segment, pipeline, and manager levels (mid-sync and post-sync disconnect/reconnect cycles), plus a header-progress monotonicity assertion in `run_disconnect_loop` so the dashd integration tests catch any future regression that drops validated chain state on disconnect.
1 parent 95377d5 commit ea01278

5 files changed

Lines changed: 273 additions & 21 deletions
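
In outline, the fix swaps a destructive rebuild for a targeted reset: on disconnect the manager now clears only per-peer bookkeeping and leaves validated chain state alone. The sketch below is a simplified model of that pattern, not the real dash-spv types; the type and field names mirror the diff, but the actual `SegmentState`, `DownloadCoordinator`, and `HeadersPipeline` carry checkpoint logic, real block hashes, and richer coordinator state.

// Minimal model of the clear-in-flight pattern (simplified stand-in types,
// not dash-spv's real API). Hashes are reduced to u64 for brevity.
#[derive(Default)]
struct DownloadCoordinator {
    in_flight: Vec<u64>, // requests awaiting a peer response
}

impl DownloadCoordinator {
    fn clear(&mut self) {
        self.in_flight.clear();
    }
}

struct SegmentState {
    current_tip_hash: u64,            // preserved across disconnect
    current_height: u32,              // preserved
    buffered_headers: Vec<u64>,       // preserved
    complete: bool,                   // preserved
    coordinator: DownloadCoordinator, // the only thing reset
}

impl SegmentState {
    fn clear_in_flight(&mut self) {
        self.coordinator.clear();
    }
}

struct HeadersPipeline {
    segments: Vec<SegmentState>,
    initialized: bool, // stays true, so start_sync can skip pipeline.init
}

impl HeadersPipeline {
    // Old behavior: the manager replaced the whole pipeline on disconnect,
    // losing every preserved field above. New behavior: reset coordinators only.
    fn clear_in_flight(&mut self) {
        for segment in &mut self.segments {
            segment.clear_in_flight();
        }
    }
}

fn main() {
    let mut pipeline = HeadersPipeline {
        segments: vec![SegmentState {
            current_tip_hash: 7,
            current_height: 40_000,
            buffered_headers: vec![8],
            complete: false,
            coordinator: DownloadCoordinator { in_flight: vec![7] },
        }],
        initialized: true,
    };

    pipeline.clear_in_flight(); // what the manager now does on disconnect

    let seg = &pipeline.segments[0];
    assert!(pipeline.initialized); // reconnect resumes without re-init
    assert_eq!(seg.current_tip_hash, 7); // validated tip survives
    assert_eq!(seg.current_height, 40_000);
    assert_eq!(seg.buffered_headers.len(), 1); // buffer survives
    assert!(!seg.complete);
    assert!(seg.coordinator.in_flight.is_empty()); // only in-flight wiped
}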


dash-spv/src/sync/block_headers/manager.rs

Lines changed: 106 additions & 0 deletions
@@ -263,6 +263,7 @@ mod tests {
         DiskStorageManager, PersistentBlockHeaderStorage, PersistentMetadataStorage, StorageManager,
     };
     use crate::sync::{ManagerIdentifier, SyncManager, SyncManagerProgress};
+    use dashcore::network::message::NetworkMessage;
     use tokio::sync::mpsc::unbounded_channel;

     type TestBlockHeadersManager =
@@ -428,6 +429,111 @@ mod tests {
         assert!(rx.try_recv().is_err());
     }

+    #[tokio::test]
+    async fn test_disconnect_preserves_pipeline_and_resumes_from_advanced_tip() {
+        let mut manager = create_test_manager().await;
+        let (requests, mut rx) = create_test_request_sender();
+
+        // Use a target below the first testnet checkpoint (50000) so the
+        // pipeline produces a single open-ended tip segment.
+        let initial_event = NetworkEvent::PeersUpdated {
+            connected_count: 1,
+            best_height: Some(40_000),
+            addresses: vec![],
+        };
+        manager.handle_network_event(&initial_event, &requests).await.unwrap();
+        assert_eq!(manager.state(), SyncState::Syncing);
+        assert!(manager.pipeline.is_initialized());
+        assert_eq!(manager.pipeline.segment_count(), 1);
+
+        let initial_locator = match rx.try_recv().expect("initial GetHeaders not sent") {
+            NetworkRequest::SendMessage(NetworkMessage::GetHeaders(msg)) => msg.locator_hashes[0],
+            other => panic!("Expected GetHeaders, got {:?}", other),
+        };
+        assert!(rx.try_recv().is_err());
+
+        // Simulate a peer response. The single tip segment drains its buffer
+        // through take_ready_to_store, advancing the storage tip and the
+        // segment's current_tip_hash to advanced_hash.
+        let header = Header::dummy_chain(1, initial_locator).remove(0);
+        let advanced_hash = header.block_hash();
+        manager.handle_headers_pipeline(&[header], &requests).await.unwrap();
+
+        // Drain the follow-up GetHeaders that send_pending issued.
+        match rx.try_recv().expect("follow-up GetHeaders not sent") {
+            NetworkRequest::SendMessage(NetworkMessage::GetHeaders(msg)) => {
+                assert_eq!(msg.locator_hashes[0], advanced_hash);
+            }
+            other => panic!("Expected GetHeaders, got {:?}", other),
+        }
+        assert!(rx.try_recv().is_err());
+
+        let disconnect_event = NetworkEvent::PeersUpdated {
+            connected_count: 0,
+            best_height: Some(40_000),
+            addresses: vec![],
+        };
+        manager.handle_network_event(&disconnect_event, &requests).await.unwrap();
+        assert_eq!(manager.state(), SyncState::WaitingForConnections);
+        assert!(
+            manager.pipeline.is_initialized(),
+            "pipeline must survive disconnect so resume can reuse validated state"
+        );
+        assert_eq!(manager.pipeline.segment_count(), 1);
+
+        // Reconnect: start_sync must skip pipeline.init and resume by sending
+        // GetHeaders from each segment's preserved current_tip_hash.
+        manager.handle_network_event(&initial_event, &requests).await.unwrap();
+        assert_eq!(manager.state(), SyncState::Syncing);
+
+        let resumed_locator = match rx.try_recv().expect("resumed GetHeaders not sent") {
+            NetworkRequest::SendMessage(NetworkMessage::GetHeaders(msg)) => msg.locator_hashes[0],
+            other => panic!("Expected GetHeaders, got {:?}", other),
+        };
+        assert_eq!(
+            resumed_locator, advanced_hash,
+            "GetHeaders on reconnect must use the preserved current_tip_hash"
+        );
+        assert_ne!(resumed_locator, initial_locator);
+        assert!(rx.try_recv().is_err());
+    }
+
+    #[tokio::test]
+    async fn test_disconnect_after_sync_resumes_and_catches_up() {
+        let mut manager = create_synced_manager().await;
+        let tip = manager.tip().await.unwrap();
+        let synced_hash = *tip.hash();
+        manager.pipeline.mark_tip_complete();
+        assert!(manager.pipeline.is_tip_complete());
+
+        let (requests, mut rx) = create_test_request_sender();
+
+        let disconnect_event = NetworkEvent::PeersUpdated {
+            connected_count: 0,
+            best_height: Some(tip.height()),
+            addresses: vec![],
+        };
+        manager.handle_network_event(&disconnect_event, &requests).await.unwrap();
+        assert_eq!(manager.state(), SyncState::WaitingForConnections);
+        assert!(manager.pipeline.is_initialized());
+
+        // Reconnect with a higher peer best_height (a new block was mined).
+        let reconnect_event = NetworkEvent::PeersUpdated {
+            connected_count: 1,
+            best_height: Some(tip.height() + 1),
+            addresses: vec![],
+        };
+        manager.handle_network_event(&reconnect_event, &requests).await.unwrap();
+        assert_eq!(manager.state(), SyncState::Syncing);
+
+        let resumed_locator = match rx.try_recv().expect("resumed GetHeaders not sent") {
+            NetworkRequest::SendMessage(NetworkMessage::GetHeaders(msg)) => msg.locator_hashes[0],
+            other => panic!("Expected GetHeaders, got {:?}", other),
+        };
+        assert_eq!(resumed_locator, synced_hash);
+        assert!(rx.try_recv().is_err());
+    }
+
     #[tokio::test]
     async fn test_empty_headers_after_tip_announcement_is_harmless() {
         let mut manager = create_synced_manager().await;

dash-spv/src/sync/block_headers/pipeline.rs

Lines changed: 74 additions & 5 deletions
@@ -52,11 +52,6 @@ impl HeadersPipeline {
         }
     }

-    /// Get a reference to the checkpoint manager.
-    pub fn checkpoint_manager(&self) -> &Arc<CheckpointManager> {
-        &self.checkpoint_manager
-    }
-
     /// Initialize the pipeline for downloading from current_height to target_height.
     pub fn init(&mut self, current_height: u32, current_hash: BlockHash, target_height: u32) {
         self.segments.clear();
@@ -273,6 +268,18 @@
         }
     }

+    /// Drop only per-peer in-flight bookkeeping across every segment.
+    ///
+    /// Buffered headers, segment topology, and per-segment validated tip state
+    /// are preserved. `next_to_store` and `initialized` stay put so a reconnect
+    /// can resume sending `GetHeaders` from each segment's preserved
+    /// `current_tip_hash` without re-fetching what we already have.
+    pub fn clear_in_flight(&mut self) {
+        for segment in &mut self.segments {
+            segment.clear_in_flight();
+        }
+    }
+
     /// Check if pipeline is initialized.
     pub fn is_initialized(&self) -> bool {
         self.initialized
@@ -528,6 +535,68 @@ mod tests {
         assert!(pipeline.segments[0].buffered_headers.is_empty());
     }

+    #[test]
+    fn test_clear_in_flight_preserves_buffers_across_segments() {
+        let shared_hash = BlockHash::dummy(42);
+
+        let mut completed =
+            SegmentState::new(0, 0, BlockHash::dummy(0), Some(100), Some(shared_hash));
+        completed.complete = true;
+        completed.current_height = 100;
+        completed.current_tip_hash = shared_hash;
+        // Buffered headers on a complete-but-not-yet-drained segment must survive.
+        let mut completed_header = Header::dummy(1);
+        completed_header.prev_blockhash = BlockHash::dummy(0);
+        completed.buffered_headers.push(HashedBlockHeader::from(completed_header));
+
+        let mut mid = SegmentState::new(1, 100, shared_hash, Some(200), None);
+        mid.coordinator.mark_sent(&[shared_hash]);
+        let mut mid_header = Header::dummy(2);
+        mid_header.prev_blockhash = shared_hash;
+        mid.receive_headers(&[mid_header]).unwrap();
+        let mid_preserved_tip = mid.current_tip_hash;
+        let mid_preserved_height = mid.current_height;
+        let mid_preserved_buffered = mid.buffered_headers.len();
+        // Simulate a fresh in-flight follow-up request for this segment.
+        mid.coordinator.mark_sent(&[mid_preserved_tip]);
+
+        let tip_hash = BlockHash::dummy(99);
+        let mut tip = SegmentState::new(2, 500, tip_hash, None, None);
+        tip.coordinator.mark_sent(&[tip_hash]);
+
+        let cm = create_test_checkpoint_manager(true);
+        let mut pipeline = HeadersPipeline::new(cm);
+        pipeline.initialized = true;
+        pipeline.next_to_store = 0;
+        pipeline.segments = vec![completed, mid, tip];
+
+        pipeline.clear_in_flight();
+
+        // initialized and next_to_store stay put.
+        assert!(pipeline.is_initialized());
+        assert_eq!(pipeline.next_to_store, 0);
+
+        // Completed segment keeps its buffer and complete flag.
+        assert!(pipeline.segments[0].complete);
+        assert_eq!(pipeline.segments[0].buffered_headers.len(), 1);
+        assert_eq!(pipeline.segments[0].current_tip_hash, shared_hash);
+
+        // Mid-download segment: validated chain state preserved; coordinator wiped.
+        assert_eq!(pipeline.segments[1].current_tip_hash, mid_preserved_tip);
+        assert_eq!(pipeline.segments[1].current_height, mid_preserved_height);
+        assert_eq!(pipeline.segments[1].buffered_headers.len(), mid_preserved_buffered);
+        assert!(!pipeline.segments[1].complete);
+        assert_eq!(pipeline.segments[1].coordinator.active_count(), 0);
+        assert_eq!(pipeline.segments[1].coordinator.pending_count(), 0);
+        // can_send returns true so a fresh GetHeaders can resume from preserved tip.
+        assert!(pipeline.segments[1].can_send());
+
+        // Tip segment: in-flight cleared, preserved hash/height intact.
+        assert_eq!(pipeline.segments[2].current_tip_hash, tip_hash);
+        assert_eq!(pipeline.segments[2].coordinator.active_count(), 0);
+        assert!(pipeline.segments[2].can_send());
+    }
+
     #[test]
     fn test_tip_complete_lifecycle() {
         let cm = create_test_checkpoint_manager(true);

dash-spv/src/sync/block_headers/segment_state.rs

Lines changed: 46 additions & 0 deletions
@@ -194,6 +194,15 @@ impl SegmentState {
             self.coordinator.enqueue_retry(hash);
         }
     }
+
+    /// Drop only per-peer in-flight bookkeeping.
+    ///
+    /// Buffered headers and the validated `current_tip_hash` / `current_height`
+    /// are preserved so a reconnect can resume from where the last peer left off
+    /// without re-fetching headers we already have.
+    pub(super) fn clear_in_flight(&mut self) {
+        self.coordinator.clear();
+    }
 }

 #[cfg(test)]
@@ -366,4 +375,41 @@ mod tests {
         }
         assert!(segment.buffered_headers.is_empty());
     }
+
+    #[test]
+    fn test_clear_in_flight_preserves_chain_state() {
+        let start_hash = BlockHash::dummy(0);
+        let mut segment = SegmentState::new(0, 0, start_hash, None, None);
+        segment.coordinator.mark_sent(&[start_hash]);
+
+        let mut header = Header::dummy(1);
+        header.prev_blockhash = start_hash;
+        segment.receive_headers(&[header]).unwrap();
+
+        let preserved_tip_hash = segment.current_tip_hash;
+        let preserved_height = segment.current_height;
+        let preserved_buffered = segment.buffered_headers.len();
+        assert_ne!(preserved_tip_hash, start_hash);
+        assert_eq!(preserved_height, 1);
+        assert_eq!(preserved_buffered, 1);
+
+        // Simulate a fresh in-flight request, then clear it.
+        segment.coordinator.mark_sent(&[preserved_tip_hash]);
+        assert!(segment.coordinator.is_in_flight(&preserved_tip_hash));
+
+        segment.clear_in_flight();
+
+        assert!(!segment.coordinator.is_in_flight(&preserved_tip_hash));
+        assert_eq!(segment.coordinator.active_count(), 0);
+        assert_eq!(segment.coordinator.pending_count(), 0);
+
+        assert_eq!(segment.current_tip_hash, preserved_tip_hash);
+        assert_eq!(segment.current_height, preserved_height);
+        assert_eq!(segment.buffered_headers.len(), preserved_buffered);
+        assert!(!segment.complete);
+
+        // After clearing, can_send should be true again so a fresh GetHeaders
+        // can resume from the preserved tip hash without re-fetching what we have.
+        assert!(segment.can_send());
+    }
 }

dash-spv/src/sync/block_headers/sync_manager.rs

Lines changed: 31 additions & 15 deletions
@@ -1,7 +1,6 @@
 use crate::error::SyncResult;
 use crate::network::{Message, MessageType, NetworkEvent, RequestSender};
 use crate::storage::{BlockHeaderStorage, MetadataStorage};
-use crate::sync::block_headers::pipeline::HeadersPipeline;
 use crate::sync::sync_manager::ensure_not_started;
 use crate::sync::{
     BlockHeadersManager, ManagerIdentifier, ProgressPercentage, SyncEvent, SyncManager,
@@ -38,8 +37,12 @@ impl<H: BlockHeaderStorage, M: MetadataStorage> SyncManager for BlockHeadersMana
     }

     fn clear_in_flight_state(&mut self) {
-        let checkpoint_manager = self.pipeline.checkpoint_manager().clone();
-        self.pipeline = HeadersPipeline::new(checkpoint_manager);
+        // Drop only per-peer in-flight bookkeeping. Segment topology and
+        // validated chain state per segment (current_tip_hash, current_height,
+        // buffered_headers, complete) are preserved so a reconnect can resume
+        // from where the disconnected peer left off without re-fetching headers
+        // we already have.
+        self.pipeline.clear_in_flight();
         self.pending_announcements.clear();
         self.announced_peers.clear();
     }
@@ -48,18 +51,31 @@ impl<H: BlockHeaderStorage, M: MetadataStorage> SyncManager for BlockHeadersMana
         ensure_not_started(self.state(), self.identifier())?;
         self.progress.set_state(SyncState::Syncing);

-        let tip = self.tip().await?;
-        let target_height = self.progress.target_height();
-
-        // Initialize the pipeline with checkpoint-based segments
-        self.pipeline.init(tip.height(), *tip.hash(), target_height);
-
-        tracing::info!(
-            "Starting parallel header sync from {} to {} ({} segments)",
-            tip.height(),
-            target_height,
-            self.pipeline.segment_count()
-        );
+        if !self.pipeline.is_initialized() {
+            let tip = self.tip().await?;
+            let target_height = self.progress.target_height();
+
+            // Initialize the pipeline with checkpoint-based segments
+            self.pipeline.init(tip.height(), *tip.hash(), target_height);
+
+            tracing::info!(
+                "Starting parallel header sync from {} to {} ({} segments)",
+                tip.height(),
+                target_height,
+                self.pipeline.segment_count()
+            );
+        } else {
+            // Resume path: if we previously synced past the tip the open-ended
+            // segment is marked complete and `send_pending` would skip it.
+            // Reset it so a fresh GetHeaders is fired from the preserved
+            // `current_tip_hash`. No-op if the tip is still mid-sync.
+            self.pipeline.reset_tip_segment();
+            tracing::info!(
+                "Resuming parallel header sync ({} segments, {} buffered)",
+                self.pipeline.segment_count(),
+                self.pipeline.total_buffered()
+            );
+        }

        // Send initial batch of requests
        let sent = self.pipeline.send_pending(requests)?;
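
Seen as control flow, the new `start_sync` reduces to an init-or-resume branch. A minimal sketch under the same caveat as the earlier model (stand-in types, not the real dash-spv API; segment and tip bookkeeping are collapsed into two booleans):

// Init-or-resume branch of start_sync, reduced to stand-in types.
struct Pipeline {
    initialized: bool,
    tip_complete: bool, // set once we had fully synced past the tip
}

impl Pipeline {
    fn init(&mut self, _tip_height: u32, _target_height: u32) {
        // The real code builds checkpoint-based segments here.
        self.initialized = true;
        self.tip_complete = false;
    }

    // Re-arm a completed open-ended tip segment; no-op if still mid-sync.
    fn reset_tip_segment(&mut self) {
        self.tip_complete = false;
    }

    // Returns how many GetHeaders requests were (notionally) sent.
    fn send_pending(&mut self) -> usize {
        if self.tip_complete {
            0
        } else {
            1
        }
    }
}

fn start_sync(p: &mut Pipeline, tip_height: u32, target_height: u32) -> usize {
    if !p.initialized {
        p.init(tip_height, target_height); // fresh sync
    } else {
        // Resume: segments and buffers survived clear_in_flight, so only
        // the tip segment needs re-arming before send_pending.
        p.reset_tip_segment();
    }
    p.send_pending()
}

fn main() {
    let mut p = Pipeline { initialized: false, tip_complete: false };
    assert_eq!(start_sync(&mut p, 0, 40_000), 1); // initial sync fires GetHeaders

    p.tip_complete = true; // fully synced, then the peer disconnected
    // Without reset_tip_segment, send_pending would skip the completed tip
    // segment and the resumed sync would never re-fire GetHeaders.
    assert_eq!(start_sync(&mut p, 40_000, 40_001), 1);
}

The resume branch matters precisely because `clear_in_flight` preserves the `complete` flag: without `reset_tip_segment`, a post-sync reconnect would leave the tip segment silent.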

dash-spv/tests/dashd_sync/helpers.rs

Lines changed: 16 additions & 1 deletion
@@ -16,6 +16,11 @@ use dash_spv::test_utils::SYNC_TIMEOUT;

 use super::setup::{ClientHandle, TestContext};

+/// Read the headers manager's effective height (storage tip plus buffered).
+fn current_header_height(handle: &ClientHandle) -> u32 {
+    handle.progress_receiver.borrow().headers().ok().map(|h| h.current_height()).unwrap_or(0)
+}
+
 /// Wait for sync to reach target height.
 pub(super) async fn wait_for_sync(
     progress_receiver: &mut watch::Receiver<SyncProgress>,
@@ -198,7 +203,9 @@ pub(super) async fn assert_no_mempool_tx(
 ///
 /// Waits for progress events, disconnects all peers after every 5th event,
 /// validates disconnect/reconnect network events, and asserts wallet state
-/// after sync completes.
+/// after sync completes. Also asserts header progress (storage tip plus
+/// buffered) is monotonic across each disconnect cycle, so a regression that
+/// drops validated chain state on disconnect is caught.
 pub(super) async fn run_disconnect_loop(
     mut client_handle: ClientHandle,
     node: &DashCoreNode,
@@ -230,6 +237,7 @@ pub(super) async fn run_disconnect_loop(
                     disconnect_count + 1,
                     event.description()
                 );
+                let pre_disconnect_height = current_header_height(&client_handle);
                 node.disconnect_all_peers();
                 disconnect_count += 1;
                 events_since_disconnect = 0;
@@ -249,6 +257,13 @@
                 ).await;
                 assert!(saw_reconnect, "SPV should reconnect after disconnection");
                 tracing::info!("SPV reconnected (PeerConnected)");
+
+                let post_reconnect_height = current_header_height(&client_handle);
+                assert!(
+                    post_reconnect_height >= pre_disconnect_height,
+                    "Header progress regressed across disconnect {}: {} -> {}",
+                    disconnect_count, pre_disconnect_height, post_reconnect_height
+                );
             }
         }
         Ok(SyncEvent::SyncComplete { .. }) => {
