Skip to content

Commit 7c6a029

Browse files
AIQnetLabclaude
andcommitted
fix(onboarding): unify cold-join into single-owner FSM, scalable to millions
- single owner: SyncManager is the sole cold-join driver; legacy production-loop catch-up defers via coordinator phase + SYNC_IN_PROGRESS (no double-drive) - stage-then-promote: snapshot staged in *_stage CFs, verified vs the 2f+1 anchor, then atomically promoted; live state is never wiped before consensus binding passes - universal h=90 snapshot + sampled holders (O(N/5) at scale) so a checkpoint always exists and is findable; committee-dial pulls the tail from the round committee, not just the 5 genesis - eviction/desync keyed off the QC frontier; latch stops re-arm thrash; 90s lineage walk budget bounds verification on a mature chain - residual close: rewards/contract_storage derived from verified accounts on promote (no forge surface); crash-atomic promote with boot recovery Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent a194de3 commit 7c6a029

7 files changed

Lines changed: 444 additions & 148 deletions

File tree

development/qnet-integration/src/block_pipeline.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3080,13 +3080,18 @@ impl BlockPipeline {
30803080
}
30813081
}
30823082

3083-
// Canonical boundary snapshot at every SNAPSHOT_INCREMENTAL_INTERVAL, on EVERY node's apply
3084-
// path so a cold joiner can fast-sync from any peer. Pin a frozen DB view at `height`
3085-
// SYNCHRONOUSLY here — the serial apply loop has not started H+1, so the flush + snapshot
3086-
// capture exactly state_root@H. With persist-before-evict the pinned accounts CF is the
3087-
// COMPLETE committed tree leaf set (hot ∪ evicted), so a cold joiner's recompute reproduces
3088-
// the bound root past the LRU cap. The heavy serialization runs off-reactor on the frozen view.
3089-
if height > 0 && height % crate::node::SNAPSHOT_INCREMENTAL_INTERVAL == 0 {
3083+
// Canonical boundary snapshot on EVERY node's apply path (deterministic, role-independent)
3084+
// so a cold joiner can fast-sync from any peer — at the early anchor (h=90, first bindable
3085+
// boundary) AND every SNAPSHOT_INCREMENTAL_INTERVAL thereafter. Pin a frozen DB view at
3086+
// `height` SYNCHRONOUSLY here — the serial apply loop has not started H+1, so the snapshot
3087+
// captures exactly state_root@H. With persist-before-evict the pinned accounts CF is the
3088+
// COMPLETE committed leaf set, so a cold joiner's recompute reproduces the bound root. The
3089+
// heavy serialization runs off-reactor on the frozen view.
3090+
if height > 0
3091+
&& (height == crate::node::SNAPSHOT_EARLY_ANCHOR_HEIGHT
3092+
|| height % crate::node::SNAPSHOT_INCREMENTAL_INTERVAL == 0)
3093+
&& crate::node::should_materialize_snapshot(&ctx.node_id, height)
3094+
{
30903095
let snapshot_accounts = ctx.state.read().await.get_all_accounts();
30913096
match ctx.storage.prepare_snapshot_view(&snapshot_accounts) {
30923097
Ok(view) => {

development/qnet-integration/src/node.rs

Lines changed: 62 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,30 @@ const FAST_SYNC_TIMEOUT_SECS: u64 = 60; // Fast sync timeout
4040
const BACKGROUND_SYNC_TIMEOUT_SECS: u64 = 30; // Background sync timeout
4141
const SNAPSHOT_FULL_INTERVAL: u64 = 43200; // Full snapshot every 12 hours (43,200 microblocks = 480 macroblocks)
4242
pub const SNAPSHOT_INCREMENTAL_INTERVAL: u64 = 3600; // Incremental snapshot every 1 hour (3,600 microblocks = 40 macroblocks)
43+
pub const SNAPSHOT_EARLY_ANCHOR_HEIGHT: u64 = 90; // First consensus-bindable boundary (mb_idx=1): a young chain has a servable snapshot well before the 3600 interval
44+
45+
/// Active-node count mirrored from the production loop, read O(1) off the hot apply path by the
46+
/// snapshot-holder predicate. 0 = unknown ⇒ all-hold (a count-read gap can never make NOBODY hold).
47+
pub static SNAPSHOT_HOLDER_ACTIVE_COUNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
48+
49+
/// Which nodes MATERIALIZE the full snapshot at a boundary. Small networks (and the early h=90 anchor)
50+
/// = every node holds (current behavior, guaranteed cold-join coverage). At scale a deterministic
51+
/// ~1-in-SAMPLE_DENOM sample holds, rotating per snapshot interval, so storage/CPU is O(N/denom) not
52+
/// O(N); holders advertise via latest_full_snap and joiners discover them by peer fan-out (unchanged).
53+
pub fn should_materialize_snapshot(node_id: &str, height: u64) -> bool {
54+
const THRESHOLD: u64 = 50; // ≤ this many active nodes ⇒ every node holds
55+
const SAMPLE_DENOM: u64 = 5; // above THRESHOLD ⇒ ~1-in-5 hold
56+
if height == SNAPSHOT_EARLY_ANCHOR_HEIGHT { return true; } // first anchor always universal
57+
let n = SNAPSHOT_HOLDER_ACTIVE_COUNT.load(std::sync::atomic::Ordering::Relaxed);
58+
if n <= THRESHOLD { return true; }
59+
use sha3::{Digest, Sha3_256};
60+
let mut h = Sha3_256::new();
61+
h.update(b"QNET_SNAP_HOLDER_V1:");
62+
h.update(node_id.as_bytes());
63+
h.update(&(height / SNAPSHOT_INCREMENTAL_INTERVAL).to_le_bytes());
64+
let d = h.finalize();
65+
u64::from_le_bytes(d[0..8].try_into().unwrap_or([0u8; 8])) % SAMPLE_DENOM == 0
66+
}
4367
const API_HEALTH_CHECK_RETRIES: u32 = 5; // API health check attempts
4468
const API_HEALTH_CHECK_DELAY_SECS: u64 = 2; // Delay between health checks
4569

@@ -1032,6 +1056,12 @@ pub static SNAPSHOT_ANCHOR_MB: AtomicU64 = AtomicU64::new(0);
10321056
static SNAPSHOT_ANCHOR_HASH: [AtomicU64; 4] =
10331057
[AtomicU64::new(0), AtomicU64::new(0), AtomicU64::new(0), AtomicU64::new(0)];
10341058

1059+
/// Highest snapshot boundary a cold-join attempt has already FAILED on. A failed boundary (and any
1060+
/// ≤ it) is not re-attempted until a STRICTLY higher boundary is advertised — so a node degrades to
1061+
/// block replay instead of re-arming the same failing snapshot every desync tick (the non-destructive
1062+
/// thrash). Reset only by progress: a strictly higher boundary or a successful promote.
1063+
pub static LAST_SNAPSHOT_ATTEMPT_BOUNDARY: AtomicU64 = AtomicU64::new(0);
1064+
10351065
fn store_anchor_hash(h: &[u8; 32]) {
10361066
for i in 0..4 {
10371067
let mut b = [0u8; 8];
@@ -1127,6 +1157,10 @@ pub fn reload_snapshot_anchor() {
11271157
let anchor_h = anchor_mb.saturating_mul(90);
11281158
LAST_FINALIZED_HEIGHT.fetch_max(anchor_h, std::sync::atomic::Ordering::SeqCst);
11291159
LAST_FINALIZED_CONSENSUS_ROUND.fetch_max(anchor_h, std::sync::atomic::Ordering::SeqCst);
1160+
// Restore the WS security floor (= anchor height) so a crash right after promote is fail-LOW and
1161+
// healed here, never fail-high (a lower WS floor would let the binder accept a snapshot below the
1162+
// adopted finality).
1163+
WEAK_SUBJECTIVITY_CHECKPOINT.fetch_max(anchor_h, std::sync::atomic::Ordering::SeqCst);
11301164
// Heal the contiguous frontier up to the reloaded floor: a node whose chain_height was driven
11311165
// below the anchor by a pre-restart rollback would otherwise re-wedge (durable chain_height <
11321166
// reloaded anchor ⇒ sub-anchor re-request loop). Raise-only; runs once at boot before live blocks.
@@ -1136,26 +1170,6 @@ pub fn reload_snapshot_anchor() {
11361170
if is_info() { println!("[INFO][SYNC] snapshot_anchor_reloaded mb={} h={}", anchor_mb, anchor_h); }
11371171
}
11381172

1139-
/// Zero the runtime height + finality floors for a CLEAN re-bootstrap after discard_snapshot_state
1140-
/// wiped all state (a snapshot rejected AFTER a prior one was already adopted). Keeps the invariant
1141-
/// chain_height >= SNAPSHOT_ANCHOR_MB*90 consistent at 0: discard sets chain_height=0, the snapshot-bind
1142-
/// AnchorReset guard caps SNAPSHOT_ANCHOR_MB to the now-0 chain_height, and this drops the other floors
1143-
/// so no stale high floor strands the re-sync onto empty state. The genesis-rooted GALC capsule + binary
1144-
/// WS pin are INDEPENDENT and intentionally untouched, so the clean block-sync re-verifies safely.
1145-
pub fn reset_floors_for_rebootstrap() {
1146-
crate::unified_p2p::LOCAL_BLOCKCHAIN_HEIGHT.store(0, std::sync::atomic::Ordering::SeqCst);
1147-
QC_VERIFIED_FRONTIER.store(0, std::sync::atomic::Ordering::SeqCst);
1148-
WEAK_SUBJECTIVITY_CHECKPOINT.store(0, std::sync::atomic::Ordering::SeqCst);
1149-
// Drop the apply-dedup floor too: a rejected-snapshot rollback that leaves a high anchor over the
1150-
// now-wiped state makes block_sync treat h<=anchor*90 as already-applied and skip-forever.
1151-
SNAPSHOT_ANCHOR_MB.store(0, std::sync::atomic::Ordering::SeqCst);
1152-
{
1153-
let _g = crate::storage::lock_finality_state();
1154-
LAST_FINALIZED_HEIGHT.store(0, std::sync::atomic::Ordering::SeqCst);
1155-
LAST_FINALIZED_CONSENSUS_ROUND.store(0, std::sync::atomic::Ordering::SeqCst);
1156-
}
1157-
}
1158-
11591173
/// v9.0 BUG-30: Check if rollback to target_height is allowed by finality rules.
11601174
/// LEGACY v14.8: Non-atomic finality check. Exists only for diagnostic paths
11611175
/// that need to inspect the current finality boundary WITHOUT claiming the
@@ -7499,9 +7513,16 @@ impl BlockchainNode {
74997513
// Warm-restart cold-joiner: reload the persisted snapshot anchor on the main boot path,
75007514
// before the verify pipeline accepts blocks, so SNAPSHOT_ANCHOR_MB is set when anchor+1
75017515
// first arrives. No-op for fresh/genesis; consensus-listener boot reloads again as backstop.
7516+
// Complete any snapshot promote interrupted by a crash BEFORE reloading the anchor
7517+
// (idempotent: re-copies from the intact staging, then clears the marker).
7518+
if let Some(s) = try_get_storage() { s.recover_pending_snapshot_promote().await; }
75027519
reload_snapshot_anchor();
7520+
// A recovered promote may have advanced chain_height — re-read so the rest of boot
7521+
// (integrity checks, p2p height) uses the promoted height, not the pre-recovery value.
7522+
let height = try_get_storage().and_then(|s| s.get_chain_height().ok()).unwrap_or(height);
7523+
crate::unified_p2p::LOCAL_BLOCKCHAIN_HEIGHT.fetch_max(height, std::sync::atomic::Ordering::Release);
75037524
if is_debug() { println!("[DBG][NODE] p2p_height_init={}", height); }
7504-
7525+
75057526
height
75067527
}
75077528
Err(e) => {
@@ -14253,8 +14274,15 @@ impl BlockchainNode {
1425314274
}
1425414275
}
1425514276

14256-
// Start fast sync if not already running
14257-
if !FAST_SYNC_IN_PROGRESS.swap(true, Ordering::SeqCst) {
14277+
// Single cold-join owner: SyncManager (sync_manager.rs) fully drives
14278+
// snapshot fast-sync + genesis + block replay. This legacy production-loop
14279+
// catch-up defers to it both while SyncManager is in the Syncing phase
14280+
// (coordinator_is_syncing) AND during the pre-SyncStart init-sync window
14281+
// (SYNC_IN_PROGRESS, set before the init task spawns) — so the two never
14282+
// drive a cold-join concurrently.
14283+
if !crate::node::coordinator_is_syncing()
14284+
&& !SYNC_IN_PROGRESS.load(Ordering::SeqCst)
14285+
&& !FAST_SYNC_IN_PROGRESS.swap(true, Ordering::SeqCst) {
1425814286
FAST_SYNC_START_TIME.store(current_time, Ordering::Relaxed);
1425914287
LAST_SYNC_PROGRESS_TIME.store(current_time, Ordering::Relaxed);
1426014288
println!("[INFO][SYNC] fast_sync_start gap={}", height_difference);
@@ -14571,6 +14599,8 @@ impl BlockchainNode {
1457114599

1457214600
// Cache the result
1457314601
CACHED_NODE_COUNT.store(count, std::sync::atomic::Ordering::Relaxed);
14602+
// Mirror to the module-level atomic the snapshot-holder predicate reads (O(1), hot apply path).
14603+
SNAPSHOT_HOLDER_ACTIVE_COUNT.store(count, std::sync::atomic::Ordering::Relaxed);
1457414604
LAST_COUNT_UPDATE.store(current_time, std::sync::atomic::Ordering::Relaxed);
1457514605
count
1457614606
} else {
@@ -15875,8 +15905,13 @@ impl BlockchainNode {
1587515905
// CRITICAL: Strict synchronization check for consensus participation
1587615906
// New nodes MUST catch up before producing blocks
1587715907
let is_synchronized = if microblock_height > 10 {
15878-
// Normal operation: allow max 10 blocks behind
15908+
// Within 10 of the expected height AND at/above the QC-verified finalized
15909+
// frontier (the committed verified floor). Height-alone would let a node
15910+
// producing on an unverified replayed tip flip synced; the frontier floor
15911+
// binds "synced" to verified state. frontier==0 (fresh genesis) bypasses.
15912+
let frontier = qc_verified_frontier_cached();
1587915913
current_stored_height + 10 >= microblock_height
15914+
&& (frontier == 0 || current_stored_height >= frontier)
1588015915
} else {
1588115916
// Genesis phase: STRICT check to prevent attacks
1588215917
// Must have actual blocks, not just height 0
@@ -17981,7 +18016,7 @@ impl BlockchainNode {
1798118016
let early_anchor = microblock_height == 90;
1798218017
let baseline_due = microblock_height % SNAPSHOT_INCREMENTAL_INTERVAL == 0
1798318018
&& microblock_height > 0;
17984-
if early_anchor || baseline_due {
18019+
if (early_anchor || baseline_due) && should_materialize_snapshot(&node_id, microblock_height) {
1798518020
// Capture the hot in-memory account set at this exact height, then pin a frozen
1798618021
// DB view (sync flush + snapshot) HERE — before the next block mutates the CF.
1798718022
// With persist-before-evict the pinned accounts CF is the COMPLETE committed tree
@@ -24584,7 +24619,8 @@ if is_info() { println!("[INFO][SYNC] recovered node={} lag={}", node_id_for_syn
2458424619
if current_height < snapshot_height.saturating_sub(1000) {
2458524620
println!("[INFO][SYNC] snapshot_found h={} loading=true", snapshot_height);
2458624621

24587-
if let Err(e) = self.storage.load_state_snapshot(snapshot_height).await {
24622+
// Local own snapshot (trusted, self-created) → load directly into live state.
24623+
if let Err(e) = self.storage.load_state_snapshot(snapshot_height, false).await {
2458824624
println!("[WARN][SYNC] Failed to load snapshot: {}, falling back to normal sync", e);
2458924625
} else {
2459024626
// Update our height to snapshot height

development/qnet-integration/src/quic_transport.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1986,6 +1986,15 @@ impl QuicTransport {
19861986
// v9.7: Immediately update BEST_PEER_HEIGHT from handshake
19871987
if remote_block_height > 0 {
19881988
crate::unified_p2p::BEST_PEER_HEIGHT.fetch_max(remote_block_height, std::sync::atomic::Ordering::Relaxed);
1989+
// Best-effort per-peer height attestation (resolves only if the peer is already registered,
1990+
// e.g. a reconnect). On a first connect the peer is not yet in connected_peers, so this
1991+
// no-ops and the first signed HealthPing supplies the attested height. The cold-join "evict
1992+
// all sources" stall is fixed by the eviction self_synced guard + genesis exemption, not here.
1993+
if remote_node_id != self.node_id {
1994+
if let Some(p2p) = crate::node::try_get_p2p() {
1995+
p2p.update_peer_last_seen_with_height(&remote_node_id, Some(remote_block_height), true);
1996+
}
1997+
}
19891998
}
19901999

19912000
// CRITICAL: Prevent self-connect

development/qnet-integration/src/rpc.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,12 @@ const REWARD_NETWORK_STATS_CACHE_TTL_SECS: u64 = 30;
177177
/// Protects against DDoS attacks by limiting requests per IP address
178178
static API_RATE_LIMITER: Lazy<ApiRateLimiter> = Lazy::new(|| ApiRateLimiter::new());
179179

180+
/// Node-global concurrency bound on snapshot BYTE serving (full + chunk), independent of the per-IP
181+
/// limiter. Caps total in-flight snapshot serves so a flood of cold-joiners (or a spoofed-IP attacker)
182+
/// cannot exhaust a holder's memory/IO. Over the bound → immediate busy reply; the joiner retries
183+
/// another holder. Sized for thousands of nodes.
184+
static SNAPSHOT_SERVE_SEM: Lazy<tokio::sync::Semaphore> = Lazy::new(|| tokio::sync::Semaphore::new(16));
185+
180186
// ============================================================================
181187
// SECURITY: WebSocket Connection Rate Limiting
182188
// ============================================================================
@@ -4409,6 +4415,15 @@ async fn handle_snapshot_download(
44094415
"Content-Disposition", ""
44104416
));
44114417
}
4418+
let _serve_permit = match SNAPSHOT_SERVE_SEM.try_acquire() {
4419+
Ok(p) => p,
4420+
Err(_) => {
4421+
let body = serde_json::to_vec(&json!({"error": "snapshot serve busy"})).unwrap_or_default();
4422+
return Ok(warp::reply::with_header(
4423+
warp::reply::with_header(body, "Content-Type", "application/json"),
4424+
"Content-Disposition", ""));
4425+
}
4426+
};
44124427
match blockchain.get_snapshot_data(height) {
44134428
Ok(Some(data)) => {
44144429
// Return binary data with appropriate headers
@@ -4492,6 +4507,15 @@ async fn handle_snapshot_chunk(
44924507
"Content-Disposition", ""
44934508
));
44944509
}
4510+
let _serve_permit = match SNAPSHOT_SERVE_SEM.try_acquire() {
4511+
Ok(p) => p,
4512+
Err(_) => {
4513+
let body = serde_json::to_vec(&json!({"error": "snapshot serve busy"})).unwrap_or_default();
4514+
return Ok(warp::reply::with_header(
4515+
warp::reply::with_header(body, "Content-Type", "application/json"),
4516+
"Content-Disposition", ""));
4517+
}
4518+
};
44954519
match blockchain.get_storage().get_snapshot_chunk(height, chunk_index as u64) {
44964520
Ok(Some(data)) => {
44974521
Ok(warp::reply::with_header(

0 commit comments

Comments
 (0)