Skip to content

Commit fb6179a

Browse files
AIQnetLab and claude committed
fix: v28 — sync QoS isolation + self-healing finality
P0.1: split QUIC ingress into priority lanes. Consensus-critical msgs (commit/reveal/timeoutvote/TC/heartbeat/vrfclaim/tip-blocks) on the high-priority FIFO; bulk-serving (RequestBlocks/RequestMacroblocks/Blocks|MacroblocksBatch/StateSnapshot) on a bounded bulk lane drained by a dedicated worker with drop-on-full + 30s log governor. A cold-sync flood structurally cannot delay consensus.

P0.2: replace the per-exact-(from,to) cooldown in sync_blocks with a unified frontier+interval coordinator: clamp to [applied+1, applied+SYNC_WINDOW=2000] (backpressure to the apply pipeline), overlap-dedup vs in-flight intervals, bounded concurrency, hard timeout. Kills the duplicate/overlapping-range storm at the source.

P0.3: demote 5 per-request sync serving INFO logs to debug. Removes the log-DoS vector at thousands-of-nodes scale.

P1.5: self-healing committee + beacon under a transient finality gap. calculate_qualified_candidates / deterministic_eligible_ids / try_load_macroblock_beacon all walk back <=8 macroblocks under the IDENTICAL predicate (non-empty eligible_producers AND randomness_beacon present) when N-2 is absent, instead of returning empty / None / p2p_fallback. Committee and beacon land on the same macroblock so leader selection stays consistent network-wide; a single missed macroblock is no longer irreversible.

Fixes the proven root of the post-restart halt (cold-sync flood starving consensus FIFO + N-2-absent committee collapse) without crutches: removes the per-tuple cooldown and the empty-set collapse, replaces with canonical windowed sync + sticky-deterministic validator set.

cargo check --workspace clean; release build clean; 147/147 lib tests; no regressions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 760bc01 commit fb6179a

2 files changed

Lines changed: 293 additions & 130 deletions

File tree

development/qnet-integration/src/node.rs

Lines changed: 136 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -4088,51 +4088,49 @@ impl BlockchainNode {
40884088
return Some([0u8; 32]);
40894089
}
40904090
let n_minus_2 = macroblock_index - 2;
4091-
match storage.get_macroblock_by_height(n_minus_2) {
4092-
Ok(Some(data)) => match bincode::deserialize::<qnet_state::MacroBlock>(&data) {
4093-
Ok(mb) => {
4094-
match mb.consensus_data.randomness_beacon {
4095-
Some(b) => Some(b),
4096-
None => {
4097-
if crate::node::is_warn() {
4098-
println!(
4099-
"[WARN][VRF] seed_beacon_absent mb={} — macroblock present but randomness_beacon=None",
4100-
n_minus_2,
4101-
);
4091+
// P1.5 self-healing: strict N-2 froze finality irreversibly when N-2
4092+
// was absent (and every later epoch's N-2 in turn). Walk back to the
4093+
// most recent REAL finalized macroblock ≤ N-2 — IDENTICAL predicate
4094+
// and depth as the committee fallback (non-empty eligible_producers
4095+
// AND randomness_beacon present), so beacon + committee always come
4096+
// from the SAME macroblock → leader selection stays consistent
4097+
// network-wide. Bounded depth; deeper gap → None (abstain, needs sync).
4098+
const MAX_FALLBACK_DEPTH: u64 = 8;
4099+
let floor = n_minus_2.saturating_sub(MAX_FALLBACK_DEPTH).max(1);
4100+
let mut idx = n_minus_2;
4101+
loop {
4102+
match storage.get_macroblock_by_height(idx) {
4103+
Ok(Some(data)) => {
4104+
if let Ok(mb) = bincode::deserialize::<qnet_state::MacroBlock>(&data) {
4105+
let has_set = mb.consensus_data.eligible_producers.as_ref()
4106+
.and_then(|s| bincode::deserialize::<Vec<qnet_state::EligibleProducer>>(s).ok())
4107+
.map(|v| v.iter().any(|p| !p.node_id.is_empty()))
4108+
.unwrap_or(false);
4109+
if has_set {
4110+
if let Some(b) = mb.consensus_data.randomness_beacon {
4111+
if idx != n_minus_2 && crate::node::is_warn() {
4112+
println!("[WARN][VRF] beacon_fallback used_mb={} wanted_mb={} depth={} reason=n2_absent_self_heal",
4113+
idx, n_minus_2, n_minus_2 - idx);
4114+
}
4115+
return Some(b);
41024116
}
4103-
None
41044117
}
41054118
}
41064119
}
4120+
Ok(None) => {}
41074121
Err(e) => {
41084122
if crate::node::is_warn() {
4109-
println!(
4110-
"[WARN][VRF] seed_deserialize_failed mb={} err={}",
4111-
n_minus_2, e,
4112-
);
4123+
println!("[WARN][VRF] seed_storage_error mb={} err={}", idx, e);
41134124
}
4114-
None
4115-
}
4116-
},
4117-
Ok(None) => {
4118-
if crate::node::is_warn() {
4119-
println!(
4120-
"[WARN][VRF] seed_missing_local mb={} — node is behind the chain, trigger sync",
4121-
n_minus_2,
4122-
);
41234125
}
4124-
None
4125-
}
4126-
Err(e) => {
4127-
if crate::node::is_warn() {
4128-
println!(
4129-
"[WARN][VRF] seed_storage_error mb={} err={}",
4130-
n_minus_2, e,
4131-
);
4132-
}
4133-
None
41344126
}
4127+
if idx <= floor { break; }
4128+
idx -= 1;
4129+
}
4130+
if crate::node::is_warn() {
4131+
println!("[WARN][VRF] seed_missing_local n2={} depth={} — node behind, trigger sync", n_minus_2, MAX_FALLBACK_DEPTH);
41354132
}
4133+
None
41364134
}
41374135

41384136
fn select_consensus_committee(
@@ -8156,7 +8154,11 @@ impl BlockchainNode {
81568154
let (macroblock_sync_tx, mut macroblock_sync_rx) = tokio::sync::mpsc::channel::<(u64, u64, String)>(1_000);
81578155

81588156
// PRODUCTION v2.19.22: Create QUIC message channel for full message processing
8157+
// QoS: consensus/high-priority lane.
81598158
let (quic_message_tx, mut quic_message_rx) = tokio::sync::mpsc::channel::<(String, crate::unified_p2p::NetworkMessage)>(10_000);
8159+
// QoS bulk lane: bounded smaller (droppable) so a cold-sync flood is
8160+
// shed at ingress instead of starving consensus. Drained by its own task.
8161+
let (quic_bulk_tx, mut quic_bulk_rx) = tokio::sync::mpsc::channel::<(String, crate::unified_p2p::NetworkMessage)>(2_000);
81608162

81618163
// PRODUCTION v2.19.25: Create transaction processing channel
81628164
let (transaction_tx, mut transaction_rx) = tokio::sync::mpsc::channel::<crate::unified_p2p::ReceivedTransaction>(50_000);
@@ -8185,6 +8187,7 @@ impl BlockchainNode {
81858187

81868188
// PRODUCTION v2.19.22: Set QUIC message channel for full message processing
81878189
unified_p2p_instance.set_quic_message_channel(quic_message_tx);
8190+
unified_p2p_instance.set_quic_bulk_channel(quic_bulk_tx);
81888191

81898192
// PRODUCTION v2.19.25: Set transaction channel for mempool integration
81908193
unified_p2p_instance.set_transaction_channel(transaction_tx);
@@ -9473,7 +9476,51 @@ impl BlockchainNode {
94739476
}
94749477
}
94759478
});
9476-
9479+
9480+
// QoS bulk-lane worker — fully isolated from the consensus consumer.
9481+
// Drains the bounded bulk channel; bounded-concurrency spawn_blocking
9482+
// per message so heavy serve/decode never contends for consensus
9483+
// cores. Lane drop counter is log-governed here (one summary / 30s)
9484+
// so a flood does not spam logs. This task carries the entire
9485+
// cold-sync serving cost; a flooding peer can saturate ONLY this
9486+
// lane, never the chain.
9487+
let blockchain_for_bulk = blockchain.clone();
9488+
tokio::spawn(async move {
9489+
const BULK_SERVE_CONCURRENCY: usize = 8;
9490+
let permits = std::sync::Arc::new(tokio::sync::Semaphore::new(BULK_SERVE_CONCURRENCY));
9491+
let mut last_drop_log = std::time::Instant::now();
9492+
let mut last_dropped: u64 = 0;
9493+
while let Some((from_peer, message)) = quic_bulk_rx.recv().await {
9494+
if last_drop_log.elapsed().as_secs() >= 30 {
9495+
let d = crate::unified_p2p::BULK_LANE_DROPPED
9496+
.load(std::sync::atomic::Ordering::Relaxed);
9497+
if d > last_dropped && is_warn() {
9498+
println!("[WARN][QUIC] bulk_lane_shed total={} delta={} window=30s reason=lane_full_dos_bound",
9499+
d, d - last_dropped);
9500+
}
9501+
last_dropped = d;
9502+
last_drop_log = std::time::Instant::now();
9503+
}
9504+
if let Some(ref p2p) = blockchain_for_bulk.unified_p2p {
9505+
let permit = match permits.clone().try_acquire_owned() {
9506+
Ok(p) => p,
9507+
Err(_) => {
9508+
// All serve slots busy → shed (bounded serving is
9509+
// the fairness guarantee; client re-requests).
9510+
crate::unified_p2p::BULK_LANE_DROPPED
9511+
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
9512+
continue;
9513+
}
9514+
};
9515+
let p2p_clone = p2p.clone();
9516+
tokio::task::spawn_blocking(move || {
9517+
let _permit = permit;
9518+
p2p_clone.handle_message(&from_peer, message);
9519+
});
9520+
}
9521+
}
9522+
});
9523+
94779524
// PRODUCTION v2.19.25: Start transaction receiver handler
94789525
// Processes transactions received from P2P network and adds to mempool
94799526
let blockchain_for_transactions = blockchain.clone();
@@ -24340,11 +24387,57 @@ if is_info() { println!("[INFO][SYNC] recovered node={} lag={}", node_id_for_syn
2434024387
});
2434124388
}
2434224389

24390+
// P1.5 self-healing: N-2 absent must NOT collapse the
24391+
// participant set to empty (that froze finality
24392+
// irreversibly — a missing macroblock made every later
24393+
// epoch's N-2 missing too). Deterministically walk back to
24394+
// the most recent AVAILABLE finalized eligible_producers
24395+
// snapshot ≤ N-2 (same rule/depth as
24396+
// deterministic_eligible_ids → numerator==denominator
24397+
// preserved). Macroblocks below the gap are 2f+1-finalized
24398+
// and universally present; the set is sticky → honest
24399+
// nodes converge and the gap is recoverable. Background
24400+
// N-2 fetch (above) still runs to restore the exact set.
24401+
const MAX_FALLBACK_DEPTH: u64 = 8;
24402+
let fb_floor = required_macroblock.saturating_sub(MAX_FALLBACK_DEPTH).max(1);
24403+
let mut fb_idx = required_macroblock;
24404+
loop {
24405+
if let Ok(Some(fb_data)) = storage.get_macroblock_by_height(fb_idx) {
24406+
if let Ok(fb_mb) = bincode::deserialize::<qnet_state::MacroBlock>(&fb_data) {
24407+
// Identical predicate as try_load_macroblock_beacon /
24408+
// deterministic_eligible_ids: real finalized macroblock
24409+
// (beacon present) → committee+beacon same macroblock.
24410+
if fb_mb.consensus_data.randomness_beacon.is_none() {
24411+
if fb_idx <= fb_floor { break; }
24412+
fb_idx -= 1;
24413+
continue;
24414+
}
24415+
if let Some(ref snap) = fb_mb.consensus_data.eligible_producers {
24416+
if let Ok(prods) = bincode::deserialize::<Vec<qnet_state::EligibleProducer>>(snap) {
24417+
let mut fb: Vec<(String, f64)> = prods.iter()
24418+
.filter(|p| !crate::unified_p2p::is_validator_ejected(&p.node_id))
24419+
.map(|p| (p.node_id.clone(), p.reputation))
24420+
.collect();
24421+
if !fb.is_empty() {
24422+
fb.sort_by(|a, b| a.0.cmp(&b.0));
24423+
if fb_idx != required_macroblock {
24424+
println!("[WARN][CAND] committee_fallback used_mb={} wanted_mb={} depth={} reason=n2_absent_self_heal participants={}",
24425+
fb_idx, required_macroblock, required_macroblock - fb_idx, fb.len());
24426+
}
24427+
return fb;
24428+
}
24429+
}
24430+
}
24431+
}
24432+
}
24433+
if fb_idx <= fb_floor { break; }
24434+
fb_idx -= 1;
24435+
}
2434324436
return Vec::new();
2434424437
}
2434524438
Err(e) => {
2434624439
// v2.47: Storage error = node is broken, cannot participate
24347-
eprintln!("[ERR][CAND] mb={} storage_err={} - node MUST SYNC!",
24440+
eprintln!("[ERR][CAND] mb={} storage_err={} - node MUST SYNC!",
2434824441
required_macroblock, e);
2434924442

2435024443
// Return EMPTY - node with storage errors must not participate!
@@ -31123,8 +31216,8 @@ if is_info() { println!("[INFO][SYNC] recovered node={} lag={}", node_id_for_syn
3112331216
return Ok(());
3112431217
}
3112531218

31126-
if is_info() {
31127-
println!("[INFO][SYNC] request from={} addr={} heights={}-{}", requester_id, from_peer_addr, from_height, to_height);
31219+
if is_debug() {
31220+
println!("[DBG][SYNC] serve_request from={} addr={} heights={}-{}", requester_id, from_peer_addr, from_height, to_height);
3112831221
}
3112931222

3113031223
// Get microblocks from storage (already in network format)
@@ -31135,7 +31228,7 @@ if is_info() { println!("[INFO][SYNC] recovered node={} lag={}", node_id_for_syn
3113531228
}
3113631229

3113731230
if blocks_data.is_empty() {
31138-
if is_info() { println!("[INFO][SYNC] empty_range heights={}-{} sending_empty_batch", from_height, to_height); }
31231+
if is_debug() { println!("[DBG][SYNC] empty_range heights={}-{} sending_empty_batch", from_height, to_height); }
3113931232
// v6.5 FIX: ALWAYS send a response, even for empty ranges
3114031233
// PROBLEM: Silent return Ok(()) caused requesting node to timeout after 2s
3114131234
// with "3 peers did not respond for h=0-0" — infinite retry loop
@@ -31281,9 +31374,9 @@ if is_info() { println!("[INFO][SYNC] recovered node={} lag={}", node_id_for_syn
3128131374
let small_blocks_count = small_blocks.len();
3128231375
let large_blocks_count = total_blocks - small_blocks_count;
3128331376

31284-
if is_info() {
31285-
println!("[INFO][SYNC] sending blocks={} (small={} large={}) size={}KB batches={} to={}",
31286-
total_blocks, small_blocks_count, large_blocks_count,
31377+
if is_debug() {
31378+
println!("[DBG][SYNC] sending blocks={} (small={} large={}) size={}KB batches={} to={}",
31379+
total_blocks, small_blocks_count, large_blocks_count,
3128731380
total_size / 1024, num_batches, requester_id);
3128831381
}
3128931382

0 commit comments

Comments
 (0)