Skip to content

Commit c8181d0

Browse files
committed
synced fully
1 parent 1debfa3 commit c8181d0

2 files changed

Lines changed: 247 additions & 19 deletions

File tree

dash-spv/src/client/mod.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ pub use status_display::StatusDisplay;
4242
pub use wallet_utils::{WalletSummary, WalletUtils};
4343
pub use watch_manager::{WatchItemUpdateSender, WatchManager};
4444

45+
4546
/// Main Dash SPV client.
4647
pub struct DashSpvClient {
4748
config: ClientConfig,
@@ -3290,3 +3291,56 @@ mod tests {
32903291
assert_eq!(address_balance_change, 40000);
32913292
}
32923293
}
3294+
3295+
impl DashSpvClient {
3296+
/// Get diagnostic information about chain state vs storage synchronization
3297+
pub async fn get_sync_diagnostics(&self) -> Result<SyncDiagnostics> {
3298+
let storage_tip_height = self.storage
3299+
.get_tip_height()
3300+
.await
3301+
.map_err(|e| SpvError::Storage(e))?
3302+
.unwrap_or(0);
3303+
3304+
let chain_state = self.chain_state().await;
3305+
let chain_state_height = chain_state.get_height();
3306+
let chain_state_headers_count = chain_state.headers.len() as u32;
3307+
3308+
// Get sync manager's chain state - we need to access it differently
3309+
// The sync manager has its own internal chain state
3310+
let sync_progress = self.sync_manager.get_progress();
3311+
let sync_manager_height = sync_progress.header_height;
3312+
let sync_manager_headers_count = sync_progress.header_height + 1; // Approximate since we can't access internal state directly
3313+
3314+
let diagnostics = SyncDiagnostics {
3315+
storage_tip_height,
3316+
chain_state_height,
3317+
chain_state_headers_count,
3318+
sync_manager_height,
3319+
sync_manager_headers_count,
3320+
sync_base_height: chain_state.sync_base_height,
3321+
synced_from_checkpoint: chain_state.synced_from_checkpoint,
3322+
headers_mismatch: storage_tip_height != chain_state_height,
3323+
sync_manager_mismatch: sync_manager_height != chain_state_height,
3324+
};
3325+
3326+
if diagnostics.headers_mismatch || diagnostics.sync_manager_mismatch {
3327+
tracing::warn!("⚠️ Sync state mismatch detected: {:?}", diagnostics);
3328+
}
3329+
3330+
Ok(diagnostics)
3331+
}
3332+
}
3333+
3334+
/// Diagnostic information about sync state
3335+
#[derive(Debug, Clone)]
3336+
pub struct SyncDiagnostics {
3337+
pub storage_tip_height: u32,
3338+
pub chain_state_height: u32,
3339+
pub chain_state_headers_count: u32,
3340+
pub sync_manager_height: u32,
3341+
pub sync_manager_headers_count: u32,
3342+
pub sync_base_height: u32,
3343+
pub synced_from_checkpoint: bool,
3344+
pub headers_mismatch: bool,
3345+
pub sync_manager_mismatch: bool,
3346+
}

dash-spv/src/sync/headers_with_reorg.rs

Lines changed: 193 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -306,14 +306,24 @@ impl HeaderSyncManagerWithReorg {
306306
if !headers.is_empty() {
307307
let first = headers.first().unwrap();
308308
let last = headers.last().unwrap();
309-
tracing::debug!(
309+
tracing::info!(
310310
"Received headers batch: first.prev_hash={}, first.hash={}, last.hash={}, count={}",
311311
first.prev_blockhash,
312312
first.block_hash(),
313313
last.block_hash(),
314314
headers.len()
315315
);
316316

317+
// Check if the first header connects to our tip
318+
if let Some(tip) = self.chain_state.get_tip_header() {
319+
if first.prev_blockhash == tip.block_hash() {
320+
tracing::info!("✅ First header correctly extends our tip");
321+
} else {
322+
tracing::warn!("⚠️ First header does NOT extend our tip. Expected prev_hash: {}, got: {}",
323+
tip.block_hash(), first.prev_blockhash);
324+
}
325+
}
326+
317327
// If we're syncing from checkpoint, log if headers appear to be from wrong height
318328
if self.chain_state.synced_from_checkpoint {
319329
// Check if this looks like early blocks (low difficulty, early timestamps)
@@ -328,41 +338,132 @@ impl HeaderSyncManagerWithReorg {
328338
}
329339
}
330340

341+
// Track how many headers we actually process (not skip)
342+
let mut headers_processed = 0u32;
343+
331344
// Process each header with fork detection
332345
for header in &headers {
333-
// Skip headers we've already processed to avoid duplicate processing
346+
// Check if this header is already in our chain state
334347
let header_hash = header.block_hash();
335-
if let Some(existing_height) = storage
336-
.get_header_height_by_hash(&header_hash)
337-
.await
338-
.map_err(|e| SyncError::Storage(format!("Failed to check header existence: {}", e)))?
339-
{
340-
tracing::debug!("⏭️ Skipping already processed header {} at height {}", header_hash, existing_height);
341-
continue;
348+
349+
// First check if it's already in chain state by checking if we can find it at any height
350+
let mut header_in_chain_state = false;
351+
352+
// Check if this header extends our current tip
353+
if let Some(tip) = self.chain_state.get_tip_header() {
354+
if header.prev_blockhash == tip.block_hash() {
355+
// This header extends our tip, so it's not in chain state yet
356+
header_in_chain_state = false;
357+
} else if header_hash == tip.block_hash() {
358+
// This IS our current tip
359+
header_in_chain_state = true;
360+
}
361+
}
362+
363+
// If not extending tip, check if it's already in storage AND chain state
364+
if !header_in_chain_state {
365+
if let Some(existing_height) = storage
366+
.get_header_height_by_hash(&header_hash)
367+
.await
368+
.map_err(|e| SyncError::Storage(format!("Failed to check header existence: {}", e)))?
369+
{
370+
// Header exists in storage - check if it's also in chain state
371+
let chain_state_height = if self.chain_state.synced_from_checkpoint && existing_height >= self.chain_state.sync_base_height {
372+
// Adjust for checkpoint sync
373+
existing_height - self.chain_state.sync_base_height
374+
} else if !self.chain_state.synced_from_checkpoint {
375+
existing_height
376+
} else {
377+
// Height is before our checkpoint, can't be in chain state
378+
tracing::debug!("Header {} at height {} is before our checkpoint base {}",
379+
header_hash, existing_height, self.chain_state.sync_base_height);
380+
continue;
381+
};
382+
383+
// Check if chain state has a header at this height
384+
if let Some(chain_header) = self.chain_state.header_at_height(chain_state_height) {
385+
if chain_header.block_hash() == header_hash {
386+
// Header is already in both storage and chain state
387+
tracing::debug!("⏭️ Skipping header {} already in chain state at height {}",
388+
header_hash, existing_height);
389+
continue;
390+
}
391+
}
392+
393+
// Header is in storage but NOT in chain state - we need to process it
394+
tracing::info!("📥 Header {} exists in storage at height {} but not in chain state, adding it",
395+
header_hash, existing_height);
396+
}
342397
}
343398

344399
match self.process_header_with_fork_detection(header, storage).await? {
345400
HeaderProcessResult::ExtendedMainChain => {
346401
// Normal case - header extends the main chain
402+
headers_processed += 1;
347403
}
348404
HeaderProcessResult::CreatedFork => {
349405
tracing::warn!("⚠️ Fork detected at height {}", self.chain_state.get_height());
406+
headers_processed += 1;
350407
}
351408
HeaderProcessResult::ExtendedFork => {
352409
tracing::debug!("Fork extended");
410+
headers_processed += 1;
353411
}
354412
HeaderProcessResult::Orphan => {
355413
tracing::debug!("Orphan header received: {}", header.block_hash());
414+
// Don't count orphans as processed
356415
}
357416
HeaderProcessResult::TriggeredReorg(depth) => {
358417
tracing::warn!("🔄 Chain reorganization triggered - depth: {}", depth);
418+
headers_processed += 1;
359419
}
360420
}
361421
}
362422

363423
// Check if any fork is now stronger than the main chain
364424
self.check_for_reorg(storage).await?;
365425

426+
// Log summary of what was processed
427+
tracing::info!(
428+
"📊 Header batch processing complete: {} processed, {} skipped out of {} total",
429+
headers_processed,
430+
headers.len() - headers_processed as usize,
431+
headers.len()
432+
);
433+
434+
// Check if we made progress
435+
if headers_processed == 0 && !headers.is_empty() {
436+
tracing::warn!(
437+
"⚠️ All {} headers were skipped (already in chain state). This may happen during sync recovery.",
438+
headers.len()
439+
);
440+
441+
// Check if the last header in the batch matches our tip
442+
if let Some(last_header) = headers.last() {
443+
if let Some(tip) = self.chain_state.get_tip_header() {
444+
if last_header.block_hash() == tip.block_hash() {
445+
tracing::info!(
446+
"📊 Last header in batch matches our tip at height {}. Sync appears to be complete.",
447+
self.chain_state.get_height()
448+
);
449+
// If we received headers up to our tip and processed none, we're synced
450+
self.syncing_headers = false;
451+
return Ok(false);
452+
}
453+
}
454+
}
455+
}
456+
457+
// Additional check: if we received fewer headers than expected, we might be at the tip
458+
if headers.len() < 2000 && headers_processed == 0 {
459+
tracing::info!(
460+
"📊 Received partial batch ({} headers) with no new headers. Likely at chain tip.",
461+
headers.len()
462+
);
463+
self.syncing_headers = false;
464+
return Ok(false);
465+
}
466+
366467
if self.syncing_headers {
367468
// During sync mode - request next batch
368469
if let Some(tip) = self.chain_state.get_tip_header() {
@@ -533,6 +634,78 @@ impl HeaderSyncManagerWithReorg {
533634
Ok(())
534635
}
535636

637+
/// Build a proper block locator following the Bitcoin protocol
638+
/// Returns a vector of block hashes with exponentially increasing steps
639+
fn build_block_locator_from_hash(&self, tip_hash: BlockHash, include_genesis: bool) -> Vec<BlockHash> {
640+
let mut locator = Vec::new();
641+
642+
// Always include the tip
643+
locator.push(tip_hash);
644+
645+
// Get the current height
646+
let tip_height = self.chain_state.tip_height();
647+
if tip_height == 0 {
648+
return locator; // Only genesis, nothing more to add
649+
}
650+
651+
// Build exponentially spaced block locator
652+
// Steps: 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, ...
653+
let mut step = 1u32;
654+
let mut current_height = tip_height;
655+
656+
while current_height > self.chain_state.sync_base_height {
657+
// Calculate the next height to include
658+
let next_height = current_height.saturating_sub(step);
659+
660+
// Don't go below sync base height
661+
if next_height < self.chain_state.sync_base_height {
662+
break;
663+
}
664+
665+
// Get header at this height
666+
if let Some(header) = self.chain_state.header_at_height(next_height) {
667+
locator.push(header.block_hash());
668+
current_height = next_height;
669+
670+
// Double the step for exponential spacing
671+
step = step.saturating_mul(2);
672+
673+
// Limit the locator size to prevent it from getting too large
674+
if locator.len() >= 10 {
675+
break;
676+
}
677+
} else {
678+
// If we can't find the header, try the next step
679+
break;
680+
}
681+
}
682+
683+
// Add checkpoint/base hash if we haven't reached it yet
684+
if current_height > self.chain_state.sync_base_height && self.chain_state.sync_base_height > 0 {
685+
if let Some(base_header) = self.chain_state.header_at_height(self.chain_state.sync_base_height) {
686+
locator.push(base_header.block_hash());
687+
}
688+
}
689+
690+
// Optionally add genesis
691+
if include_genesis && self.chain_state.sync_base_height == 0 {
692+
if let Some(genesis_hash) = self.config.network.known_genesis_block_hash() {
693+
// Only add genesis if it's not already in the locator
694+
if !locator.contains(&genesis_hash) {
695+
locator.push(genesis_hash);
696+
}
697+
}
698+
}
699+
700+
tracing::debug!(
701+
"Built block locator with {} hashes: {:?}",
702+
locator.len(),
703+
locator.iter().take(5).collect::<Vec<_>>() // Show first 5 for debugging
704+
);
705+
706+
locator
707+
}
708+
536709
/// Request headers from the network
537710
pub async fn request_headers(
538711
&mut self,
@@ -544,26 +717,26 @@ impl HeaderSyncManagerWithReorg {
544717
// When syncing from a checkpoint, we need to create a proper locator
545718
// that helps the peer understand we want headers AFTER this point
546719
if self.chain_state.synced_from_checkpoint && self.chain_state.sync_base_height > 0 {
547-
// For checkpoint sync, only include the checkpoint hash
548-
// Including genesis would allow peers to fall back to sending headers from genesis
549-
// if they don't recognize the checkpoint, which is exactly what we want to avoid
720+
// For checkpoint sync, build a proper locator but don't include genesis
721+
// to avoid peers falling back to sending headers from genesis
550722
tracing::info!(
551-
"📍 Using checkpoint-only locator for height {}: [{}]",
552-
self.chain_state.sync_base_height,
553-
hash
723+
"📍 Building checkpoint-based locator starting from height {}",
724+
self.chain_state.sync_base_height
554725
);
555-
vec![hash]
726+
self.build_block_locator_from_hash(hash, false)
556727
} else if network.has_headers2_peer().await && !self.headers2_failed {
557728
// Check if this is genesis and we're using headers2
558729
let genesis_hash = self.config.network.known_genesis_block_hash();
559730
if genesis_hash == Some(hash) {
560731
tracing::info!("📍 Using empty locator for headers2 genesis sync");
561732
vec![]
562733
} else {
563-
vec![hash]
734+
// Build a proper locator for non-genesis headers2 requests
735+
self.build_block_locator_from_hash(hash, true)
564736
}
565737
} else {
566-
vec![hash]
738+
// Build a proper locator for regular requests
739+
self.build_block_locator_from_hash(hash, true)
567740
}
568741
},
569742
None => {
@@ -916,7 +1089,8 @@ impl HeaderSyncManagerWithReorg {
9161089
// More aggressive timeout when no peers
9171090
std::time::Duration::from_secs(5)
9181091
} else {
919-
std::time::Duration::from_millis(500)
1092+
// Give peers reasonable time to respond (10 seconds)
1093+
std::time::Duration::from_secs(10)
9201094
};
9211095

9221096
if self.last_sync_progress.elapsed() > timeout_duration {

0 commit comments

Comments
 (0)