Skip to content

Commit 2f3d83b

Browse files
AIQnetLabclaude
andcommitted
fix: bound fork-recovery replay to the nearest snapshot, never from genesis
reconcile_state_after_rollback fell back to a full replay from height 0 under the global state lock on every rollback, freezing the node for minutes and turning a transient one-block fork into a permanent network halt.

Root cause: commit 337ca43 switched the boundary snapshot writer to full_snap_, but the recovery reader (load_state_snapshot_by_height) only read the legacy state_snap_ key, so no snapshot was ever found.

- Add Storage::decode_snapshot_accounts: a format-aware decoder that reads the canonical full_snap_ (Format A, raw accounts-CF dump) and the legacy state_snap_ (Format B, bincode Vec) into the account list.
- reconcile now decodes the snapshot bytes already fetched by find_snapshot_at_or_before (no second state_snap_-only read), restoring from the freshest snapshot <= target so replay is bounded to <= the snapshot interval instead of a genesis replay.
- Refuse a from-0 replay above the genesis window: when there is no usable snapshot and target > interval, return Err so the caller re-syncs from the canonical 2f+1 chain instead of freezing.

A transient fork above finality is now bounded and self-heals via re-sync.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 337ca43 commit 2f3d83b

2 files changed

Lines changed: 73 additions & 31 deletions

File tree

development/qnet-integration/src/node.rs

Lines changed: 18 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3500,46 +3500,33 @@ impl BlockchainNode {
35003500
// Decoding is purely a CPU transformation of bytes the caller
35013501
// already owns; doing it BEFORE we acquire the write lock keeps
35023502
// the apply pipeline blocked for the minimum possible window.
3503+
// Decode the snapshot bytes already fetched by find_snapshot_at_or_before (canonical
3504+
// full_snap_ or legacy state_snap_). Restoring from the freshest snapshot ≤ target
3505+
// bounds replay to ≤ SNAPSHOT_INCREMENTAL_INTERVAL instead of a full genesis replay.
35033506
let restored_baseline: Option<(u64, Vec<(String, qnet_state::Account)>)> =
35043507
match snap_choice {
3505-
Some((snap_height, _snap_data)) => {
3506-
match storage.load_state_snapshot_by_height(snap_height).await {
3507-
Ok(Some((_state_root, accounts_bytes))) => {
3508-
let deser = bincode::DefaultOptions::new()
3509-
.with_fixint_encoding()
3510-
.allow_trailing_bytes()
3511-
.deserialize::<Vec<(String, qnet_state::Account)>>(&accounts_bytes)
3512-
.or_else(|_| bincode::deserialize::<Vec<(String, qnet_state::Account)>>(&accounts_bytes));
3513-
match deser {
3514-
Ok(accounts) => Some((snap_height, accounts)),
3515-
Err(e) => {
3516-
println!(
3517-
"[WARN][STATE] reconcile_deserialize_failed snap_h={} err={} action=full_replay",
3518-
snap_height, e,
3519-
);
3520-
None
3521-
}
3522-
}
3523-
}
3524-
Ok(None) => None,
3508+
Some((snap_height, snap_data)) => {
3509+
match storage.decode_snapshot_accounts(&snap_data) {
3510+
Ok(accounts) => Some((snap_height, accounts)),
35253511
Err(e) => {
3526-
println!(
3527-
"[WARN][STATE] reconcile_load_snapshot_failed snap_h={} err={:?} action=full_replay",
3528-
snap_height, e,
3529-
);
3512+
println!("[WARN][STATE] reconcile_decode_failed snap_h={} err={:?}", snap_height, e);
35303513
None
35313514
}
35323515
}
35333516
}
3534-
None => {
3535-
println!(
3536-
"[INFO][STATE] reconcile_no_snapshot_available target={} action=full_replay",
3537-
target_height,
3538-
);
3539-
None
3540-
}
3517+
None => None,
35413518
};
35423519

3520+
// No usable snapshot above the genesis window ⇒ refuse a from-0 replay (it freezes the
3521+
// node under the state lock and resets the per-mb baseline). Return Err so the caller
3522+
// re-syncs from the canonical 2f+1 chain instead. A snapshot exists at every interval.
3523+
const SNAPSHOT_INCREMENTAL_INTERVAL: u64 = 3_600;
3524+
if restored_baseline.is_none() && target_height > SNAPSHOT_INCREMENTAL_INTERVAL {
3525+
return Err(format!(
3526+
"reconcile_no_usable_snapshot target={} action=resync_required", target_height
3527+
));
3528+
}
3529+
35433530
// Step 3: pre-load every microblock that needs to be replayed.
35443531
// Doing this BEFORE we acquire the state write lock means the
35453532
// apply pipeline only blocks during pure in-memory CPU work,

development/qnet-integration/src/storage.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7852,6 +7852,61 @@ impl Storage {
78527852
}
78537853
}
78547854

7855+
/// Decode a snapshot blob into its account list for in-memory state rebuild during
7856+
/// fork-recovery. Reads BOTH the canonical full_snap_ (Format A: raw accounts-CF dump)
7857+
/// and the legacy state_snap_ (Format B: bincode Vec). Accounts only — other CF sections
7858+
/// ignored. Pure (no DB). Inverse of create_state_snapshot/save_state_snapshot writers.
7859+
pub fn decode_snapshot_accounts(&self, snap_data: &[u8]) -> IntegrationResult<Vec<(String, qnet_state::Account)>> {
7860+
if snap_data.len() < 41 {
7861+
return Err(IntegrationError::StorageError(format!("snapshot too short: {} bytes", snap_data.len())));
7862+
}
7863+
let stored_hash = &snap_data[..32];
7864+
let compressed = &snap_data[40..];
7865+
use sha3::{Sha3_256, Digest};
7866+
let mut hasher = Sha3_256::new();
7867+
hasher.update(compressed);
7868+
if stored_hash != hasher.finalize().as_slice() {
7869+
return Err(IntegrationError::StorageError("snapshot integrity check failed".to_string()));
7870+
}
7871+
let buf = zstd::decode_all(compressed)
7872+
.map_err(|e| IntegrationError::StorageError(format!("snapshot decompress failed: {}", e)))?;
7873+
if buf.first().copied() != Some(0x02) || buf.len() < 5 {
7874+
return Err(IntegrationError::StorageError("snapshot wrong/short type".to_string()));
7875+
}
7876+
// probe u32 after type byte: >=10_000 ⇒ Format B (state_root bytes); else Format A version
7877+
let probe = u32::from_le_bytes(buf[1..5].try_into().unwrap());
7878+
let mut out: Vec<(String, qnet_state::Account)> = Vec::new();
7879+
if probe >= 10_000 {
7880+
// Format B: [0x02 | state_root(32) | total_supply(8) | height(8) | bincode(Vec<(addr,Account)>)]
7881+
let body = 1 + 32 + 8 + 8;
7882+
if buf.len() < body { return Err(IntegrationError::StorageError("format_b truncated".to_string())); }
7883+
out = bincode::deserialize(&buf[body..])
7884+
.map_err(|e| IntegrationError::SerializationError(format!("format_b decode: {}", e)))?;
7885+
} else {
7886+
// Format A: [0x02 | version(4) | height(8) | ts(8) | (klen|k|vlen|v)* | REWARDS_V1 ...]
7887+
let mut cursor = 1 + 4 + 8 + 8;
7888+
while cursor < buf.len() {
7889+
if cursor + 10 <= buf.len() && &buf[cursor..cursor + 10] == b"REWARDS_V1" { break; }
7890+
if cursor + 4 > buf.len() { break; }
7891+
let klen = u32::from_le_bytes(buf[cursor..cursor + 4].try_into().unwrap()) as usize;
7892+
cursor += 4;
7893+
if cursor + klen > buf.len() { break; }
7894+
let key = &buf[cursor..cursor + klen]; cursor += klen;
7895+
if cursor + 4 > buf.len() { break; }
7896+
let vlen = u32::from_le_bytes(buf[cursor..cursor + 4].try_into().unwrap()) as usize;
7897+
cursor += 4;
7898+
if cursor + vlen > buf.len() { break; }
7899+
let val = &buf[cursor..cursor + vlen]; cursor += vlen;
7900+
let addr = String::from_utf8(key.to_vec())
7901+
.map_err(|e| IntegrationError::StorageError(format!("addr utf8: {}", e)))?;
7902+
let account = bincode::deserialize::<qnet_state::Account>(val)
7903+
.map_err(|e| IntegrationError::SerializationError(format!("account decode: {}", e)))?;
7904+
out.push((addr, account));
7905+
}
7906+
}
7907+
Ok(out)
7908+
}
7909+
78557910
pub async fn load_state_snapshot_by_height(&self, height: u64) -> IntegrationResult<Option<([u8; 32], Vec<u8>)>> {
78567911
let snapshots_cf = self.persistent.db.cf_handle("snapshots")
78577912
.ok_or_else(|| IntegrationError::StorageError("snapshots column family not found".to_string()))?;

0 commit comments

Comments
 (0)