Skip to content

Commit 337ca43

Browse files
AIQnetLab and claude committed
fix: cold-join fast-sync binding + reliable NodeRegistration delivery
D2 — snapshot fast-sync (Pattern C) now actually binds (was never wired):
- boundary snapshot is the canonical full_snap_ accounts-CF dump on every node's apply path (was a broken in-memory bincode state_snap_ that restored to garbage, so binding always failed and joiners fell to O(N) block replay)
- verifier binds restored state to the QC-certified macroblock state_root: rebuild finalize_merkle from the restored accounts and compare to mb.state_root, instead of the never-assigned consensus_data.snapshot_root
- fetch the macroblock prefix [latest+1..=mb_idx] so the binding macroblock persists (N-2 anchor) and is QC-validated by the normal store path

D3 — NodeRegistration lands like NodeActivation:
- boot send + periodic rebroadcast fan out the binding TX to all genesis nodes (was a single best-effort gossip with no fan-out)
- rebroadcast runs sync-independently; producer eligibility stays gated at selection, not at TX broadcast
- on boot, re-send unless the registration is already on-chain (chain is the source of truth, not the local activation code)
- mobile/server-side registration path gets the same genesis fan-out

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 1ff366a commit 337ca43

5 files changed

Lines changed: 128 additions & 156 deletions

File tree

development/qnet-integration/src/bin/qnet-node.rs

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -2880,8 +2880,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
28802880
let already_persisted = node.get_storage().load_activation_code()
28812881
.map(|opt| opt.is_some())
28822882
.unwrap_or(false);
2883-
if already_persisted {
2884-
if is_info() { println!("[INFO][NODE] activation_already_persisted skip=fallback_call"); }
2883+
// Chain is the source of truth: skip the (re)send only if the code is persisted AND this
2884+
// node's NodeRegistration is already on-chain. If persisted on an earlier boot but the
2885+
// registration never landed (dropped join-time broadcast), re-send so the binding TX reaches
2886+
// a producer. One send per boot; the early-activation path is sync-gated (OFF on a cold
2887+
// joiner) so there is no same-boot double-submit.
2888+
let reg_onchain = node.get_storage().is_node_registration_onchain(&node.get_node_id());
2889+
if already_persisted && reg_onchain {
2890+
if is_info() { println!("[INFO][NODE] activation_persisted reg_onchain=true skip=fallback_call"); }
28852891
} else if let Err(e) = node.save_activation_code(&activation_code, node_type).await {
28862892
if is_warn() { println!("[WARN][NODE] activation_code_save_failed err={}", e); }
28872893
}

development/qnet-integration/src/block_pipeline.rs

Lines changed: 8 additions & 94 deletions
Original file line number | Diff line number | Diff line change
@@ -2904,105 +2904,19 @@ impl BlockPipeline {
29042904
}
29052905
}
29062906

2907-
// v15.9: committee-wide canonical snapshot at each
2908-
// SNAPSHOT_INCREMENTAL_INTERVAL — fresh nodes parallel-download from any
2909-
// committee member; macroblock snapshot_root has a byte-identical
2910-
// artefact on every honest node; rollback finds a snapshot <= target.
2911-
// SOURCE = in-memory state.accounts (the Arc<DashMap> every apply
2912-
// mutates), NOT the RocksDB accounts CF (only written on restore →
2913-
// stale/empty → bad snapshot_root). Sort by address before bincode
2914-
// (DashMap iter order is shard/node-dependent) → identical bytes → root
2915-
// converges. Off-reactor (blocking pool, Arc to accounts); pipeline
2916-
// returns immediately; failure WARN-only, never blocks liveness.
2907+
// Canonical boundary snapshot (raw accounts-CF dump via create_state_snapshot) at every
2908+
// SNAPSHOT_INCREMENTAL_INTERVAL, on EVERY node's apply path (not just the producer) so a
2909+
// cold joiner can fast-sync from any peer. This is the SAME representation the macroblock
2910+
// snapshot_root binds (compute_canonical_state_root over the accounts CF) → restore
2911+
// reproduces the bound root. Off-reactor; WARN-only, never blocks liveness.
29172912
const SNAPSHOT_INCREMENTAL_INTERVAL: u64 = 3_600;
29182913
if height > 0 && height % SNAPSHOT_INCREMENTAL_INTERVAL == 0 {
29192914
let storage_for_snapshot = ctx.storage.clone();
2920-
let state_for_snapshot = ctx.state.clone();
29212915
let snapshot_height = height;
29222916
tokio::spawn(async move {
2923-
let start = std::time::Instant::now();
2924-
2925-
// Read the in-memory state under a brief read lock to
2926-
// capture: (a) a strong handle to the accounts map,
2927-
// (b) the current state_root, (c) the current
2928-
// total_supply. We drop the lock before the heavy
2929-
// serialise step so block apply is not blocked.
2930-
let (accounts_arc, state_root, total_supply) = {
2931-
let sg = state_for_snapshot.read().await;
2932-
let accounts_arc = sg.accounts.clone();
2933-
let state_root = sg.calculate_state_root().unwrap_or([0u8; 32]);
2934-
let total_supply = sg.chain_state.read().total_supply;
2935-
(accounts_arc, state_root, total_supply)
2936-
};
2937-
2938-
// Heavy work: iterate DashMap, clone, sort, bincode.
2939-
// Lives on the blocking thread pool so the reactor
2940-
// stays free; the closure consumes `accounts_arc` so
2941-
// no shared-state hazards remain after spawn.
2942-
let serialise_result = tokio::task::spawn_blocking(move || {
2943-
let mut accounts: Vec<(String, qnet_state::Account)> = accounts_arc
2944-
.iter()
2945-
.map(|e| (e.key().clone(), e.value().clone()))
2946-
.collect();
2947-
accounts.sort_by(|a, b| a.0.cmp(&b.0));
2948-
bincode::serialize(&accounts)
2949-
}).await;
2950-
2951-
let state_data = match serialise_result {
2952-
Ok(Ok(data)) => data,
2953-
Ok(Err(e)) => {
2954-
if is_warn() {
2955-
println!(
2956-
"[WARN][PIPELINE] snapshot_serialize_fail h={} err={}",
2957-
snapshot_height, e,
2958-
);
2959-
}
2960-
return;
2961-
}
2962-
Err(e) => {
2963-
if is_warn() {
2964-
println!(
2965-
"[WARN][PIPELINE] snapshot_join_fail h={} err={:?}",
2966-
snapshot_height, e,
2967-
);
2968-
}
2969-
return;
2970-
}
2971-
};
2972-
2973-
if state_data.is_empty() {
2974-
// Genesis-window or pre-state node — nothing to bind.
2975-
return;
2976-
}
2977-
2978-
// Write the canonical snapshot artefact. `save_state_snapshot`
2979-
// wraps zstd-15 + integrity hash + atomic batch write —
2980-
// already off-reactor (Fix #2 spawn_blocking).
2981-
match storage_for_snapshot
2982-
.save_state_snapshot(
2983-
snapshot_height,
2984-
state_root,
2985-
total_supply,
2986-
state_data,
2987-
)
2988-
.await
2989-
{
2990-
Ok(_) => {
2991-
if is_info() {
2992-
println!(
2993-
"[INFO][PIPELINE] snapshot_created h={} elapsed_ms={} source=apply_stage",
2994-
snapshot_height,
2995-
start.elapsed().as_millis(),
2996-
);
2997-
}
2998-
}
2999-
Err(e) => {
3000-
if is_warn() {
3001-
println!(
3002-
"[WARN][PIPELINE] snapshot_save_failed h={} err={:?}",
3003-
snapshot_height, e,
3004-
);
3005-
}
2917+
if let Err(e) = storage_for_snapshot.create_state_snapshot(snapshot_height).await {
2918+
if is_warn() {
2919+
println!("[WARN][PIPELINE] snapshot_create_failed h={} err={:?}", snapshot_height, e);
30062920
}
30072921
}
30082922
});

development/qnet-integration/src/node.rs

Lines changed: 34 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -15164,31 +15164,34 @@ impl BlockchainNode {
1516415164
let reg_our_h = crate::unified_p2p::LOCAL_BLOCKCHAIN_HEIGHT.load(std::sync::atomic::Ordering::Relaxed);
1516515165
let reg_best_h = crate::unified_p2p::BEST_PEER_HEIGHT.load(std::sync::atomic::Ordering::Relaxed);
1516615166

15167+
// Sync-INDEPENDENT binding-TX rebroadcast: a node's own NodeRegistration must land
15168+
// on-chain even while it is still syncing (identity binding is signature-validated,
15169+
// not chain-state-validated). This does NOT make an unsynced node VRF-eligible —
15170+
// that is gated separately by register_as_active_node_async below. Producer-direct
15171+
// gossip + genesis fan-out (same delivery as NodeActivation); re-applying an included
15172+
// registration is a nonce/dedup no-op; the attempt budget bounds it.
15173+
if let Some(ref p2p) = unified_p2p {
15174+
let resend = if let Ok(mut guard) = PENDING_NODE_REGISTRATION.lock() {
15175+
let out = if let Some((id, bytes, attempts)) = guard.as_mut() {
15176+
if *attempts > 0 { *attempts -= 1; Some((id.clone(), bytes.clone(), *attempts)) } else { None }
15177+
} else { None };
15178+
if matches!(guard.as_ref(), Some((_, _, 0))) { *guard = None; }
15179+
out
15180+
} else { None };
15181+
if let Some((reg_id, reg_bytes, attempts_left)) = resend {
15182+
let _ = p2p.broadcast_transaction(reg_bytes.clone());
15183+
let tx_msg = crate::unified_p2p::NetworkMessage::Transaction { data: reg_bytes };
15184+
for ip in &crate::unified_p2p::get_genesis_bootstrap_ips() {
15185+
p2p.send_network_message(&format!("{}:8001", ip), tx_msg.clone());
15186+
}
15187+
if is_info() { println!("[INFO][REG] registration_rebroadcast id={} attempts_left={}", reg_id, attempts_left); }
15188+
}
15189+
}
15190+
1516715191
if reg_synced {
1516815192
if let Some(ref p2p) = unified_p2p {
1516915193
if is_info() { println!("[INFO][ACTIVE] periodic_registration h={} best={}", reg_our_h, reg_best_h); }
1517015194
p2p.register_as_active_node_async().await;
15171-
15172-
// Bounded rebroadcast of our own NodeRegistration so a dropped join-time
15173-
// broadcast still reaches a producer. Re-applying an already-included
15174-
// registration is a no-op (nonce/dedup), so this is safe; the budget stops it.
15175-
let resend = if let Ok(mut guard) = PENDING_NODE_REGISTRATION.lock() {
15176-
let out = if let Some((id, bytes, attempts)) = guard.as_mut() {
15177-
if *attempts > 0 {
15178-
*attempts -= 1;
15179-
Some((id.clone(), bytes.clone(), *attempts))
15180-
} else { None }
15181-
} else { None };
15182-
if matches!(guard.as_ref(), Some((_, _, 0))) { *guard = None; } // budget spent
15183-
out
15184-
} else { None };
15185-
if let Some((reg_id, reg_bytes, attempts_left)) = resend {
15186-
if let Err(e) = p2p.broadcast_transaction(reg_bytes) {
15187-
if is_warn() { println!("[WARN][REG] rebroadcast_fail id={} err={}", reg_id, e); }
15188-
} else if is_info() {
15189-
println!("[INFO][REG] registration_rebroadcast id={} attempts_left={}", reg_id, attempts_left);
15190-
}
15191-
}
1519215195
}
1519315196
} else {
1519415197
if is_info() {
@@ -27914,14 +27917,18 @@ if is_info() { println!("[INFO][SYNC] recovered node={} lag={}", node_id_for_syn
2791427917
if let Ok(mut pend) = PENDING_NODE_REGISTRATION.lock() {
2791527918
*pend = Some((self.node_id.clone(), tx_bytes.clone(), PENDING_REGISTRATION_MAX_REBROADCASTS));
2791627919
}
27917-
// Broadcast NodeRegistration TX (producer-direct + gossip backup) so a non-producer
27918-
// node's TX still reaches a producer for inclusion.
27920+
// Deliver with the same guarantee as NodeActivation: producer-direct gossip + direct
27921+
// fan-out to every genesis node. A fresh joiner usually has no producer info yet, so a
27922+
// single gossip is fragile; the genesis fan-out ensures the binding TX reaches the
27923+
// network even before sync completes.
2791927924
if let Some(ref p2p) = self.unified_p2p {
27920-
if let Err(e) = p2p.broadcast_transaction(tx_bytes) {
27921-
if is_warn() { println!("[WARN][REG] broadcast_fail hash={}... err={}", &tx_hash[..16.min(tx_hash.len())], e); }
27922-
} else {
27923-
if is_info() { println!("[INFO][REG] registration_tx_broadcast hash={}", &tx_hash[..16.min(tx_hash.len())]); }
27925+
let _ = p2p.broadcast_transaction(tx_bytes.clone());
27926+
let tx_msg = crate::unified_p2p::NetworkMessage::Transaction { data: tx_bytes };
27927+
let genesis_ips = crate::unified_p2p::get_genesis_bootstrap_ips();
27928+
for ip in &genesis_ips {
27929+
p2p.send_network_message(&format!("{}:8001", ip), tx_msg.clone());
2792427930
}
27931+
if is_info() { println!("[INFO][REG] registration_tx_broadcast hash={} genesis={}", &tx_hash[..16.min(tx_hash.len())], genesis_ips.len()); }
2792527932
}
2792627933
} else {
2792727934
eprintln!("[WARN][REG] onchain_tx_failed node={}", self.node_id);

development/qnet-integration/src/rpc.rs

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -11352,7 +11352,12 @@ async fn handle_register_node(
1135211352
&wallet_address[..16.min(wallet_address.len())],
1135311353
&tx_hash[..16.min(tx_hash.len())]);
1135411354
if let Some(p2p) = blockchain.get_unified_p2p() {
11355-
let _ = p2p.broadcast_transaction(tx_bytes);
11355+
let _ = p2p.broadcast_transaction(tx_bytes.clone());
11356+
// Same guaranteed delivery as NodeActivation: direct fan-out to all genesis.
11357+
let tx_msg = crate::unified_p2p::NetworkMessage::Transaction { data: tx_bytes };
11358+
for ip in &crate::unified_p2p::get_genesis_bootstrap_ips() {
11359+
p2p.send_network_message(&format!("{}:8001", ip), tx_msg.clone());
11360+
}
1135611361
}
1135711362
} else {
1135811363
eprintln!("[WARN][REG] super_onchain_tx_failed node={}", node_id);

0 commit comments

Comments
 (0)