Skip to content

Commit 3188e33

Browse files
AIQnetLab and claude committed
fix(consensus,p2p,storage): autonomous cold-join (GALC) + live churn/apply-stall fixes
Cold-join / weak-subjectivity (genesis-anchored, no manual pin rotation):
- galc.rs: genesis-multisigned live-checkpoint capsule (mb_index, mb_hash, per-macroblock committee digests) as a self-authenticating weak-subjectivity trust root; verified against embedded genesis PKs, 2f+1, monotonic adoption.
- snapshot completeness: flush in-RAM accounts to the accounts CF before the dump; PinnedDbSnapshot exposes one consistent point-in-time view across all CFs.
- persist-before-evict: account-cache eviction persists to CF first, drops only on success.
- bind the per-macroblock committee digest in the by-hash pin branch.
- raise MAX_WS_WALK_MB + MB_FETCH_MAX_ATTEMPTS for mature-chain cold-join.

Live liveness (root-caused from the running genesis net):
- eviction + desync reference the convergent QC-verified finalized frontier, not the produced-tip peer median: stops false eviction of healthy peers and the one-rotation (gap=30) redundant syncs; cold-join path unchanged (frontier==0).
- height-eviction applies only to directly-connected peers; gossip-only nodes are reaped by the last_seen TTL (no false eviction at scale).
- apply dedup is O(1) on the applied-frontier atomic; storage read only on a re-delivery (no hot-path RocksDB lookup).
- periodic WAL flush runs off the consensus runtime via spawn_blocking with set_wait(false): no longer stalls block apply behind compaction.
- producer publishes LOCAL_BLOCKCHAIN_HEIGHT immediately after save (dedup invariant holds across both writers).

Validated: cargo check + 184 lib tests green (qnet-integration).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent e0e29bb commit 3188e33

10 files changed

Lines changed: 929 additions & 113 deletions

File tree

core/qnet-state/src/state.rs

Lines changed: 106 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -786,6 +786,14 @@ pub trait AccountStore: Send + Sync {
786786
/// the caller treats both identically (cache miss falls through to
787787
/// the canonical "account not found" path).
788788
fn load_account(&self, address: &str) -> Option<Account>;
789+
790+
/// Durable batch write of accounts. Called by the eviction sweep BEFORE dropping entries from the
791+
/// cache; returns true IFF the write durably succeeded. The evictor removes ONLY a successfully-
792+
/// persisted batch, so a failed persist (I/O error, or no store) keeps the accounts resident —
793+
/// never silently dropping a cold mutation not yet write-through-persisted (genesis /
794+
/// producer-inline), which would diverge the persistent mirror from the committed tree. Default
795+
/// false (a read-only store never persists ⇒ its accounts are never evicted).
796+
fn persist_accounts(&self, _accounts: &[(String, Account)]) -> bool { false }
789797
}
790798

791799
pub struct StateManager {
@@ -1052,14 +1060,38 @@ impl StateManager {
10521060
.collect();
10531061
sorted.sort_by_key(|(_, ts)| *ts);
10541062

1063+
// Persist-before-evict: snapshot the victims' current values and flush
1064+
// them to the durable store BEFORE dropping from the cache, so a cold
1065+
// mutation not yet write-through-persisted (genesis / producer-inline)
1066+
// is never lost — the persistent store stays a complete mirror of the
1067+
// committed tree. Persist-then-remove (not the reverse) leaves no window
1068+
// where a concurrent read sees the address as absent.
1069+
let victims: Vec<(String, Account)> = sorted.iter().take(target_evict)
1070+
.filter_map(|(addr, _)| self.accounts.get(addr).map(|e| (addr.clone(), e.value().clone())))
1071+
.collect();
1072+
// Evict a batch ONLY when it is safe to drop from RAM: if a durable store is wired, evict iff the
1073+
// persist succeeded (a failed write keeps the victims resident so a cold mutation is never lost
1074+
// and the persistent mirror never diverges from the committed tree — the next sweep retries). If
1075+
// NO store is wired (cache-only mode: tests/tooling) there is no durable mirror to diverge from,
1076+
// so eviction is the intended cache bound. Production always wires the store before raising cap.
1077+
let persisted = if victims.is_empty() {
1078+
true
1079+
} else {
1080+
match *self.disk_store.read() {
1081+
Some(ref store) => store.persist_accounts(&victims),
1082+
None => true,
1083+
}
1084+
};
10551085
let mut evicted = 0usize;
1056-
for (addr, _) in sorted.iter().take(target_evict) {
1057-
self.accounts.remove(addr);
1058-
self.last_access.remove(addr);
1059-
evicted = evicted.saturating_add(1);
1060-
}
1061-
if evicted > 0 {
1062-
self.evictions_total.fetch_add(evicted as u64, std::sync::atomic::Ordering::Relaxed);
1086+
if persisted {
1087+
for (addr, _) in &victims {
1088+
self.accounts.remove(addr);
1089+
self.last_access.remove(addr);
1090+
evicted = evicted.saturating_add(1);
1091+
}
1092+
if evicted > 0 {
1093+
self.evictions_total.fetch_add(evicted as u64, std::sync::atomic::Ordering::Relaxed);
1094+
}
10631095
}
10641096
evicted
10651097
}
@@ -2244,6 +2276,13 @@ mod cache_tests {
22442276
self.load_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
22452277
self.data.read().get(address).cloned()
22462278
}
2279+
fn persist_accounts(&self, accounts: &[(String, Account)]) -> bool {
2280+
let mut g = self.data.write();
2281+
for (addr, acct) in accounts {
2282+
g.insert(addr.clone(), acct.clone());
2283+
}
2284+
true
2285+
}
22472286
}
22482287

22492288
fn make_account(balance: u64) -> Account {
@@ -2306,6 +2345,66 @@ mod cache_tests {
23062345
assert_eq!(store.loads(), 1);
23072346
}
23082347

2348+
/// Persist-before-evict: a cold mutation that never went through the
2349+
/// write-through path (genesis / producer-inline) must be flushed to the
2350+
/// durable store at eviction, so durable ∪ cache loses nothing — the
2351+
/// snapshot-completeness invariant at scale (past the LRU cap).
2352+
#[test]
2353+
fn test_persist_before_evict_no_data_loss() {
2354+
let sm = StateManager::new();
2355+
let store = MockStore::new();
2356+
sm.set_disk_store(store.clone() as Arc<dyn AccountStore>);
2357+
sm.set_cache_capacity(10);
2358+
2359+
for i in 0..25u64 {
2360+
let addr = format!("acct_{:03}", i);
2361+
sm.accounts.insert(addr.clone(), make_account(i));
2362+
sm.last_access.insert(addr, i); // ascending ⇒ lowest i evicted first
2363+
}
2364+
assert_eq!(sm.accounts.len(), 25);
2365+
2366+
let evicted = sm.evict_cold_accounts();
2367+
assert_eq!(evicted, 15, "must evict down to the cap");
2368+
assert_eq!(sm.accounts.len(), 10);
2369+
2370+
// No account lost: each is in the cache OR persisted on disk.
2371+
for i in 0..25u64 {
2372+
let addr = format!("acct_{:03}", i);
2373+
let in_cache = sm.accounts.contains_key(&addr);
2374+
let on_disk = store.data.read().contains_key(&addr);
2375+
assert!(in_cache || on_disk, "account {} lost on eviction", addr);
2376+
}
2377+
// The 15 oldest were evicted ⇒ must have been persisted before removal.
2378+
for i in 0..15u64 {
2379+
let addr = format!("acct_{:03}", i);
2380+
assert!(store.data.read().contains_key(&addr), "evicted {} must be persisted", addr);
2381+
}
2382+
}
2383+
2384+
/// A wired durable store whose write FAILS must NOT drop accounts from RAM — else the persistent
2385+
/// mirror diverges from the committed tree (snapshot state_root mismatch). Keep them resident.
2386+
struct FailingStore;
2387+
impl AccountStore for FailingStore {
2388+
fn load_account(&self, _address: &str) -> Option<Account> { None }
2389+
fn persist_accounts(&self, _accounts: &[(String, Account)]) -> bool { false } // always fails
2390+
}
2391+
2392+
#[test]
2393+
fn test_evict_keeps_accounts_when_persist_fails() {
2394+
let sm = StateManager::new();
2395+
sm.set_disk_store(Arc::new(FailingStore) as Arc<dyn AccountStore>);
2396+
sm.set_cache_capacity(10);
2397+
for i in 0..25u64 {
2398+
let addr = format!("acct_{:03}", i);
2399+
sm.accounts.insert(addr.clone(), make_account(i));
2400+
sm.last_access.insert(addr, i);
2401+
}
2402+
assert_eq!(sm.accounts.len(), 25);
2403+
let evicted = sm.evict_cold_accounts();
2404+
assert_eq!(evicted, 0, "must not evict when the durable persist fails");
2405+
assert_eq!(sm.accounts.len(), 25, "all accounts stay resident on persist failure");
2406+
}
2407+
23092408
#[test]
23102409
fn test_warm_account_genuine_miss_returns_false() {
23112410
let sm = StateManager::new();

development/qnet-integration/src/block_pipeline.rs

Lines changed: 51 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2391,27 +2391,34 @@ impl BlockPipeline {
23912391
// path; if hung, watchdog will surface it.
23922392
metrics.mark_apply_op(height, PIPELINE_OP_APPLY_DEDUP);
23932393
let dedup_start = std::time::Instant::now();
2394-
// v15.6: Dedup check runs on the blocking pool. The RocksDB lookup
2395-
// on a hot row competes with the same column family the apply
2396-
// stage writes to a few microseconds later; running it on the
2397-
// async path made one slow read freeze the entire stage. The
2398-
// tokio::task::spawn_blocking handoff is cheap (single channel
2399-
// hop) and isolates this I/O from runtime liveness.
2400-
let storage_for_dedup = ctx.storage.clone();
2401-
let already_applied = match tokio::task::spawn_blocking(move || {
2402-
storage_for_dedup.load_microblock(height)
2403-
.map(|opt| opt.is_some())
2404-
.unwrap_or(false)
2405-
}).await {
2406-
Ok(v) => v,
2407-
Err(join_err) => {
2408-
if is_warn() {
2409-
println!(
2410-
"[WARN][PIPELINE] apply_dedup_join_err h={} err={}",
2411-
height, join_err
2412-
);
2394+
// Apply is strictly sequential by height and publishes the applied frontier in
2395+
// LOCAL_BLOCKCHAIN_HEIGHT at commit. A block strictly ABOVE that frontier cannot be a
2396+
// duplicate, so the common path is answered with an O(1) atomic read — NO hot-path
2397+
// RocksDB lookup (a storage read here contends with the same CF the apply stage writes
2398+
// microseconds later, and one slow read under a maintenance-flush/compaction storm
2399+
// froze the whole stage). Only a re-delivery (height <= frontier) consults storage, off
2400+
// the hot path on the blocking pool.
2401+
let applied_tip = crate::unified_p2p::LOCAL_BLOCKCHAIN_HEIGHT
2402+
.load(std::sync::atomic::Ordering::Acquire);
2403+
let already_applied = if height > applied_tip {
2404+
false
2405+
} else {
2406+
let storage_for_dedup = ctx.storage.clone();
2407+
match tokio::task::spawn_blocking(move || {
2408+
storage_for_dedup.load_microblock(height)
2409+
.map(|opt| opt.is_some())
2410+
.unwrap_or(false)
2411+
}).await {
2412+
Ok(v) => v,
2413+
Err(join_err) => {
2414+
if is_warn() {
2415+
println!(
2416+
"[WARN][PIPELINE] apply_dedup_join_err h={} err={}",
2417+
height, join_err
2418+
);
2419+
}
2420+
false
24132421
}
2414-
false
24152422
}
24162423
};
24172424
let dedup_elapsed = dedup_start.elapsed();
@@ -3063,22 +3070,35 @@ impl BlockPipeline {
30633070
}
30643071
}
30653072

3066-
// Canonical boundary snapshot (raw accounts-CF dump via create_state_snapshot) at every
3067-
// SNAPSHOT_INCREMENTAL_INTERVAL, on EVERY node's apply path (not just the producer) so a
3068-
// cold joiner can fast-sync from any peer. This is the SAME representation the macroblock
3069-
// snapshot_root binds (compute_canonical_state_root over the accounts CF) → restore
3070-
// reproduces the bound root. Off-reactor; WARN-only, never blocks liveness.
3073+
// Canonical boundary snapshot at every SNAPSHOT_INCREMENTAL_INTERVAL, on EVERY node's apply
3074+
// path so a cold joiner can fast-sync from any peer. Pin a frozen DB view at `height`
3075+
// SYNCHRONOUSLY here — the serial apply loop has not started H+1, so the flush + snapshot
3076+
// capture exactly state_root@H. With persist-before-evict the pinned accounts CF is the
3077+
// COMPLETE committed tree leaf set (hot ∪ evicted), so a cold joiner's recompute reproduces
3078+
// the bound root past the LRU cap. The heavy serialization runs off-reactor on the frozen view.
30713079
const SNAPSHOT_INCREMENTAL_INTERVAL: u64 = 3_600;
30723080
if height > 0 && height % SNAPSHOT_INCREMENTAL_INTERVAL == 0 {
3073-
let storage_for_snapshot = ctx.storage.clone();
3074-
let snapshot_height = height;
3075-
tokio::spawn(async move {
3076-
if let Err(e) = storage_for_snapshot.create_state_snapshot(snapshot_height).await {
3081+
let snapshot_accounts = ctx.state.read().await.get_all_accounts();
3082+
match ctx.storage.prepare_snapshot_view(&snapshot_accounts) {
3083+
Ok(view) => {
3084+
let storage_for_snapshot = ctx.storage.clone();
3085+
let snapshot_height = height;
3086+
tokio::spawn(async move {
3087+
if let Err(e) = storage_for_snapshot
3088+
.create_state_snapshot(snapshot_height, view).await
3089+
{
3090+
if is_warn() {
3091+
println!("[WARN][PIPELINE] snapshot_create_failed h={} err={:?}", snapshot_height, e);
3092+
}
3093+
}
3094+
});
3095+
}
3096+
Err(e) => {
30773097
if is_warn() {
3078-
println!("[WARN][PIPELINE] snapshot_create_failed h={} err={:?}", snapshot_height, e);
3098+
println!("[WARN][PIPELINE] snapshot_prepare_failed h={} err={:?}", height, e);
30793099
}
30803100
}
3081-
});
3101+
}
30823102
}
30833103

30843104
// ────────────────────────────────────────────────────────────────

0 commit comments

Comments
 (0)