diff --git a/crates/espresso/node/src/api.rs b/crates/espresso/node/src/api.rs index a7cd26666f4..5f4978a13e7 100644 --- a/crates/espresso/node/src/api.rs +++ b/crates/espresso/node/src/api.rs @@ -3013,20 +3013,22 @@ mod api_tests { .ok() .unwrap(); - // Check that all data has been garbage collected for the decided views. + // Quorum proposals are GCed at decide; DA proposals and VID shares are + // retained for the consensus storage retention window so payloads remain + // recoverable by this node and its peers. assert!( persistence .load_da_proposal(leaf.view_number()) .await .unwrap() - .is_none() + .is_some() ); assert!( persistence .load_vid_share(leaf.view_number()) .await .unwrap() - .is_none() + .is_some() ); assert!( persistence @@ -3063,6 +3065,7 @@ mod api_tests { D: TestableSequencerDataSource + Debug + 'static, { use ark_serialize::CanonicalDeserialize; + use hotshot_types::traits::block_contents::BlockPayload; let storage = D::create_storage().await; let persistence = D::persistence_options(&storage).create().await.unwrap(); @@ -3107,12 +3110,22 @@ mod api_tests { // Create another leaf, with missing data. We have to use a different payload commitment, // otherwise the database will be able to combine the empty payload from the genesis block - // with this header, and the payload will not actually be missing. + // with this header, and the payload will not actually be missing. The namespace table must + // also be non-empty: decide processing fills empty-namespace-table blocks with the + // canonical empty payload, which would likewise make the payload not missing. + let (_, ns_table) = espresso_types::Payload::from_transactions( + [Transaction::new(1_u32.into(), vec![1, 2, 3])], + &ValidatedState::default(), + &NodeState::mock(), + ) + .await + .unwrap(); let mut block_header = leaf.block_header().clone(); *block_header.height_mut() += 1; *block_header.payload_commitment_mut() = VidCommitment::V1( CanonicalDeserialize::deserialize_uncompressed_unchecked([1u8; 32].as_slice()).unwrap(), ); + *block_header.ns_table_mut() = ns_table; let qp = QuorumProposalWrapper { proposal: QuorumProposal2 { block_header, diff --git a/crates/espresso/node/src/context.rs b/crates/espresso/node/src/context.rs index 43afa1fbd6b..01437bf431d 100644 --- a/crates/espresso/node/src/context.rs +++ b/crates/espresso/node/src/context.rs @@ -10,8 +10,11 @@ use anyhow::Context; use async_lock::RwLock; use derivative::Derivative; use espresso_types::{ - NodeState, PubKey, Transaction, ValidatedState, - v0::traits::{EventConsumer as PersistenceEventConsumer, SequencerPersistence}, + NodeState, Payload, PrivKey, PubKey, Transaction, ValidatedState, + v0::traits::{ + DecidePayloadRecovery, EventConsumer as PersistenceEventConsumer, PendingDecide, + SequencerPersistence, + }, }; use futures::{ future::join_all, @@ -25,17 +28,20 @@ use hotshot_types::{ PeerConfig, ValidatorConfig, consensus::ConsensusMetricsValue, constants::EXTERNAL_EVENT_CHANNEL_SIZE, - data::{Leaf2, ViewNumber}, + data::{DaProposal2, Leaf2, VidCommitment, ViewNumber}, epoch_membership::EpochMembershipCoordinator, - message::UpgradeLock, + message::{Proposal, UpgradeLock}, network::NetworkConfig, new_protocol::CoordinatorEvent, - simple_certificate::CertificatePair, storage_metrics::StorageMetricsValue, traits::{ + EncodeBytes, + block_contents::{BlockHeader, BlockPayload}, metrics::{Counter, Gauge, Histogram, Metrics}, network::ConnectedNetwork, + signature_key::SignatureKey, }, + utils::{EpochTransitionIndicator, option_epoch_from_block_number}, }; use parking_lot::Mutex; use request_response::RequestResponseConfig; @@ -57,6 +63,7 @@ use crate::{ RequestResponseProtocol, data_source::{DataSource, Storage as RequestResponseStorage}, network::Sender as RequestResponseSender, + payload_recovery::PayloadRecovery, recipient_source::RecipientSource, }, startup_catchup::bootstrap_epoch_window, @@ -258,7 +265,7 @@ where RequestResponseSender::new(outbound_message_sender), request_response_receiver, RecipientSource { - memberships: membership_coordinator, + memberships: membership_coordinator.clone(), consensus_handle: consensus_handle.clone(), public_key: validator_config.public_key, }, @@ -278,6 +285,15 @@ where // itself) state_catchup.add_provider(Arc::new(request_response_protocol.clone())); + // Payload recovery for the decide pipeline: fetches DA proposals from peers when a + // view is decided before its payload lands on disk, so decide events reach the + // query service complete. + let payload_recovery: Arc = Arc::new(PayloadRecovery::new( + request_response_protocol.clone(), + membership_coordinator.clone(), + epoch_height, + )); + // Create the external event handler let mut tasks = TaskList::default(); let external_event_handler = ExternalEventHandler::new( @@ -303,6 +319,7 @@ where event_consumer, anchor_view, proposal_fetcher_cfg, + Some(payload_recovery), metrics, ) .with_task_list(tasks)) @@ -323,6 +340,7 @@ where event_consumer: impl PersistenceEventConsumer + 'static, anchor_view: Option, proposal_fetcher_cfg: ProposalFetcherConfig, + payload_recovery: Option>, metrics: &dyn Metrics, ) -> Self { let events = consensus_handle.event_stream(); @@ -364,6 +382,7 @@ where event_consumer.clone(), decide_rx, anchor_view, + payload_recovery, DecideProcessorMetrics::new(metrics), ), ); @@ -376,6 +395,7 @@ where node_id, events, persistence, + ctx.validator_config.private_key.clone(), ctx.state_signer.clone(), external_event_handler, Some(event_streamer.clone()), @@ -545,9 +565,12 @@ impl, P: SequencerPersistence> Drop for SequencerCon } } -/// Latest decided view and its (optional) deciding QC, sent from the event loop to the background -/// decide processor. `None` is the initial/no-op value of the `watch` channel. -type DecideSignal = Option<(ViewNumber, Option>>)>; +/// Latest decide, sent from the event loop to the background decide processor along with the +/// in-memory event data (payloads, VID shares, cert2) used for live query-service ingestion. +/// `None` is the initial/no-op value of the `watch` channel. Under processor lag the channel +/// coalesces and intermediate values are dropped; their views are regenerated from storage, +/// which by then has had time to catch up. +type DecideSignal = Option; /// Metrics for the background decide processor. `backlog` (decided - processed) is the key signal: /// sustained growth means staging tables accumulate (no data lost, but disk grows). @@ -557,6 +580,10 @@ struct DecideProcessorMetrics { backlog: Arc, duration: Arc, failures: Arc, + /// Block payloads recovered from peers for views decided without one. + payloads_recovered: Arc, + /// Failed attempts to recover a block payload from peers. + payload_recovery_failures: Arc, } impl DecideProcessorMetrics { @@ -576,6 +603,12 @@ impl DecideProcessorMetrics { .create_histogram("process_duration".into(), Some("seconds".into())) .into(), failures: metrics.create_counter("failures".into(), None).into(), + payloads_recovered: metrics + .create_counter("payloads_recovered".into(), None) + .into(), + payload_recovery_failures: metrics + .create_counter("payload_recovery_failures".into(), None) + .into(), } } } @@ -587,6 +620,7 @@ async fn handle_events( node_id: u64, mut events: impl Stream> + Unpin, persistence: Arc

, + private_key: PrivKey, state_signer: Arc>>, external_event_handler: ExternalEventHandler, events_streamer: Option>>>, @@ -616,6 +650,72 @@ async fn handle_events( tracing::warn!("Failed to handle external message: {:?}", err); } }, + CoordinatorEvent::BlockPayloadReconstructed { + view, + header, + payload, + } => { + // A payload reconstructed after its view was decided. Make sure it lands + // in both stores: consensus storage, so restart replay and peer recovery + // can serve it (consensus' own write is asynchronous and may be lost on a + // crash), and the query service, which back-fills the block. Spawned so + // slow writes cannot stall the event loop; both writes are idempotent. + let persistence = persistence.clone(); + let consumer = event_consumer.clone(); + let consensus_handle = consensus_handle.clone(); + let private_key = private_key.clone(); + let event = event.clone(); + let view = *view; + let header = header.clone(); + let payload = payload.clone(); + spawn(async move { + // Placeholder signature, matching consensus' own asynchronous DA + // writes; readers verify payloads against the header's payload + // commitment, not this signature. + match PubKey::sign(&private_key, &[]) { + Ok(signature) => { + let epoch_height = consensus_handle.epoch_height().await; + let proposal = Proposal { + data: DaProposal2:: { + encoded_transactions: payload.encode(), + metadata: header.metadata().clone(), + view_number: view, + epoch: option_epoch_from_block_number( + true, + header.block_number(), + epoch_height, + ), + epoch_transition_indicator: + EpochTransitionIndicator::NotInTransition, + }, + signature, + _pd: PhantomData, + }; + if let Err(err) = persistence + .append_da2(&proposal, header.payload_commitment()) + .await + { + tracing::warn!( + ?view, + "failed to persist reconstructed payload: {err:#}" + ); + } + }, + Err(err) => { + tracing::warn!( + ?view, + "failed to sign reconstructed DA proposal: {err:#}" + ); + }, + } + if let Err(err) = consumer.handle_event(&event).await { + tracing::warn!( + ?view, + "failed to store reconstructed payload in query service: {err:#}" + ); + } + }); + }, _ => {}, } @@ -666,6 +766,7 @@ async fn process_decided_events_task( consumer: Arc, mut decide_rx: watch::Receiver, anchor_view: Option, + payload_recovery: Option>, metrics: DecideProcessorMetrics, ) where P: SequencerPersistence, @@ -675,16 +776,25 @@ async fn process_decided_events_task( // cursor reported below raises it. let mut last_processed = anchor_view.map(|v| v.u64()).unwrap_or(0); - // Process leaves persisted before a previous shutdown but not yet handled. + // Process leaves persisted before a previous shutdown but not yet handled. No in-memory + // decide data survives a restart, so this pass runs purely from storage. if let Some(view) = anchor_view { match persistence - .process_decided_events(view, None, consumer.as_ref()) + .process_decided_events(view, None, consumer.as_ref(), None) .await { - Ok(processed) => { - if let Some(v) = processed { + Ok(outcome) => { + if let Some(v) = outcome.processed { last_processed = last_processed.max(v.u64()); } + spawn_payload_recovery( + &payload_recovery, + &persistence, + &consumer, + view.u64(), + outcome.missing_payload, + &metrics, + ); }, Err(err) => tracing::warn!( "failed to process decided leaves on startup, chain may not be up to date: {err:#}" @@ -706,10 +816,10 @@ async fn process_decided_events_task( Err(_) => {}, // Timed out; fall through to retry `latest`. } - let Some((view, deciding_qc)) = latest.clone() else { + let Some(pending) = latest.clone() else { continue; }; - let decided = view.u64(); + let decided = pending.view.u64(); metrics.last_decided.set(decided as usize); metrics .backlog @@ -717,20 +827,39 @@ async fn process_decided_events_task( let start = Instant::now(); let result = persistence - .process_decided_events(view, deciding_qc, consumer.as_ref()) + .process_decided_events( + pending.view, + pending.deciding_qc.clone(), + consumer.as_ref(), + // The in-memory data from the decide event, so events for just-decided + // views don't depend on consensus' asynchronous storage writes having + // landed. Retries reuse it; views it doesn't cover fall back to storage. + Some(&pending.data), + ) .await; metrics.duration.add_point(start.elapsed().as_secs_f64()); match result { - Ok(processed) => { + Ok(outcome) => { // Advance from the real cursor, not `decided`: if ingestion/GC lagged, `processed` // stays behind and the backlog gauge reflects it. - if let Some(v) = processed { + if let Some(v) = outcome.processed { last_processed = last_processed.max(v.u64()); } + // Recover payloads for leaves whose decide events were emitted without one, + // in the background. Results are delivered straight to consensus storage and + // the query service, so the cursor never waits on the network. + spawn_payload_recovery( + &payload_recovery, + &persistence, + &consumer, + decided, + outcome.missing_payload, + &metrics, + ); // reset latest if we have processed all the decided leaves - if let Some((view, _)) = latest.clone() - && last_processed >= view.u64() + if let Some(pending) = &latest + && last_processed >= pending.view.u64() { latest = None; } @@ -742,12 +871,133 @@ async fn process_decided_events_task( Err(err) => { // Cursor not advanced, so this range is retried next iteration; no data is lost. metrics.failures.add(1); - tracing::warn!(?view, "deferred decide processing failed: {err:#}"); + tracing::warn!( + view = ?pending.view, + "deferred decide processing failed: {err:#}" + ); }, } } } +/// Only attempt peer recovery for views within this distance of the newest decided view. +/// Peers retain DA proposals for their consensus storage retention window (about this many +/// views by default); anything older is very unlikely to be recoverable over the consensus +/// network and is left to the query service's peer fetching instead. +const PAYLOAD_RECOVERY_HORIZON: u64 = 130000; + +/// Number of attempts to recover a view's payload from peers before giving up and leaving +/// the gap to the query service's own fetching. +const PAYLOAD_RECOVERY_ATTEMPTS: u32 = 3; + +/// Spawn a background task recovering the payloads of `missing` — leaves whose decide +/// events were emitted without one — from peers. Each leaf is reported by exactly one +/// successful processing pass (the cursor advances past it), so recovery is attempted once +/// per leaf, with a bounded number of request retries. +fn spawn_payload_recovery( + payload_recovery: &Option>, + persistence: &Arc

, + consumer: &Arc, + decided_view: u64, + missing: Vec>, + metrics: &DecideProcessorMetrics, +) where + P: SequencerPersistence, + C: PersistenceEventConsumer + 'static, +{ + let Some(recovery) = payload_recovery else { + return; + }; + let leaves = missing + .into_iter() + .filter(|leaf| { + // Recovery is only supported for new-protocol (V2) payload commitments, and + // only within the window peers retain DA proposals for. + matches!( + leaf.block_header().payload_commitment(), + VidCommitment::V2(_) + ) && decided_view.saturating_sub(leaf.view_number().u64()) <= PAYLOAD_RECOVERY_HORIZON + }) + .collect::>(); + if leaves.is_empty() { + return; + } + spawn(recover_missing_payloads( + recovery.clone(), + persistence.clone(), + consumer.clone(), + leaves, + metrics.payloads_recovered.clone(), + metrics.payload_recovery_failures.clone(), + )); +} + +/// Fetch missing block payloads from peers and deliver each one the same way a late +/// `BlockPayloadReconstructed` event is delivered: persist the DA proposal to consensus +/// storage (so restart replay and peers see it), then forward the payload to the query +/// service, which back-fills the block decided without it. +pub(crate) async fn recover_missing_payloads( + recovery: Arc, + persistence: Arc

, + consumer: Arc, + leaves: Vec>, + recovered: Arc, + failures: Arc, +) where + P: SequencerPersistence, + C: PersistenceEventConsumer + 'static, +{ + for leaf in leaves { + let view = leaf.view_number(); + let mut proposal = None; + for attempt in 1..=PAYLOAD_RECOVERY_ATTEMPTS { + match recovery.recover_payload(&leaf).await { + Ok(Some(found)) => { + proposal = Some(found); + break; + }, + Ok(None) => { + tracing::warn!(?view, attempt, "could not recover block payload from peers"); + }, + Err(err) => { + tracing::warn!(?view, attempt, "payload recovery failed: {err:#}"); + }, + } + } + let Some(proposal) = proposal else { + failures.add(1); + continue; + }; + tracing::info!(?view, "recovered block payload from peers"); + recovered.add(1); + + // Consensus storage first, so the payload survives a restart and can be served to + // peers; the write is idempotent. + if let Err(err) = persistence + .append_da2(&proposal, leaf.block_header().payload_commitment()) + .await + { + tracing::warn!(?view, "failed to store recovered payload: {err:#}"); + } + + // Then the query service, through the same event the coordinator emits for late + // local reconstructions. + let payload = + Payload::from_bytes(&proposal.data.encoded_transactions, &proposal.data.metadata); + let event = CoordinatorEvent::BlockPayloadReconstructed { + view, + header: leaf.block_header().clone(), + payload, + }; + if let Err(err) = consumer.handle_event(&event).await { + tracing::warn!( + ?view, + "failed to store recovered payload in query service: {err:#}" + ); + } + } +} + #[derive(Debug, Default, Clone)] #[allow(clippy::type_complexity)] pub(crate) struct TaskList(Arc)>>>); diff --git a/crates/espresso/node/src/persistence.rs b/crates/espresso/node/src/persistence.rs index 9a06bd516ab..8ccb3adf9c2 100644 --- a/crates/espresso/node/src/persistence.rs +++ b/crates/espresso/node/src/persistence.rs @@ -7,6 +7,41 @@ //! This is distinct from the query service persistent storage found in the `api` module, which is //! an extension that node operators can opt into. This module defines the minimum level of //! persistence which is _required_ to run a node. +//! +//! # Payload delivery to the query service +//! +//! The query service is fed exclusively by the decide pipeline implemented here: the consensus +//! event loop persists decided leaves (`persist_event`), and a background task +//! (`process_decided_events`) builds decide events from the persisted leaf spine and hands them +//! to the event consumer, advancing a cursor only on success. The payload, VID share, and cert2 +//! attached to each event come first from the in-memory decide data captured by `persist_event` +//! ([`DecideEventData`](espresso_types::v0::traits::DecideEventData)), falling back to the +//! consensus staging tables (DA proposals, VID shares) for views not covered — restart replay, +//! signals coalesced under processor lag, or decides that never had the data. +//! +//! Under the new protocol, a node usually obtains a block payload by reconstructing it from VID +//! shares carried in Vote1 broadcasts, and the result is written to storage *asynchronously* — so +//! the payload can land on disk shortly after its view is decided, or (if the node's vote was not +//! needed for quorum and it missed the share broadcasts) never. Decide events are never delayed +//! waiting for that data; instead, payload delivery is guaranteed in event-driven layers: +//! +//! 1. **In-memory decide data**: the decided leaves arrive with their payloads filled in and +//! VID shares attached; the decide event is built directly from them, with no dependence on +//! the asynchronous storage writes having landed. This is the normal path. +//! 2. **Late back-fill**: when a payload is reconstructed *after* its view was already decided, +//! the coordinator emits `BlockPayloadReconstructed`; the event loop persists the payload to +//! consensus storage (so restart replay and peers see it) and forwards it to the query +//! service, which back-fills the block. +//! 3. **Peer recovery** ([`DecidePayloadRecovery`](espresso_types::v0::traits::DecidePayloadRecovery)): +//! when a decide event is emitted with the payload still missing (the node never received +//! enough shares to reconstruct it), the reported leaves are handed to a background task that +//! fetches the DA proposal from peers over the request-response protocol, verifies it against +//! the header's payload commitment, and delivers it through the same path as layer 2. To make +//! this possible, DA proposals and VID shares are *retained* after processing for the +//! consensus storage retention window (instead of being deleted at decide), so every node can +//! serve recently decided payloads. +//! 4. **Query service fetching**: as a final backstop, blocks stored without a payload are healed +//! by the query service's own peer fetching. use std::collections::HashMap; @@ -149,10 +184,11 @@ mod tests { network_config::light_client_genesis_from_stake_table, }; use espresso_types::{ - Event, L1Client, L1ClientOptions, Leaf, Leaf2, NodeState, PubKey, SeqTypes, ValidatedState, + Event, Header, L1Client, L1ClientOptions, Leaf, Leaf2, NodeState, Payload, PubKey, + SeqTypes, Transaction, ValidatedState, traits::{ - EventConsumer, EventsPersistenceRead, MembershipPersistence, NullEventConsumer, - PersistenceOptions, SequencerPersistence, + DecideEventData, DecidePayloadRecovery, EventConsumer, EventsPersistenceRead, + MembershipPersistence, NullEventConsumer, PersistenceOptions, SequencerPersistence, }, v0_3::{AuthenticatedValidator, EventKey, Fetcher, RegisteredValidator, StakeTableEvent}, }; @@ -169,7 +205,8 @@ mod tests { use hotshot_types::{ data::{ DaProposal2, EpochNumber, QuorumProposal2, QuorumProposalWrapper, VidCommitment, - ViewNumber, ns_table::parse_ns_table, vid_commitment, vid_disperse::AvidMDisperseShare, + VidDisperseShare, ViewNumber, ns_table::parse_ns_table, vid_commitment, + vid_disperse::AvidMDisperseShare, }, event::{EventType, HotShotAction, LeafInfo}, light_client::StateKeyPair, @@ -180,7 +217,11 @@ mod tests { UpgradeCertificate, }, simple_vote::{NextEpochQuorumData2, QuorumData2, UpgradeProposalData, VersionedVoteData}, - traits::{EncodeBytes, block_contents::BlockHeader}, + traits::{ + EncodeBytes, + block_contents::{BlockHeader, BlockPayload}, + metrics::NoMetrics, + }, utils::EpochTransitionIndicator, vid::avidm::{AvidMScheme, init_avidm_param}, vote::HasViewNumber, @@ -201,6 +242,7 @@ mod tests { test_helpers::{STAKE_TABLE_CAPACITY_FOR_TEST, TestNetwork, TestNetworkConfigBuilder}, }, catchup::NullStateCatchup, + context::recover_missing_payloads, testing::{TestConfigBuilder, staking_priv_keys}, }; @@ -821,15 +863,26 @@ mod tests { ViewNumber::new(2) ); + // DA proposals and VID shares are retained after processing (for the consensus + // storage retention window) so payloads remain recoverable by this node and its + // peers; only the retention-based pruner removes them. for i in 0..=2 { - assert_eq!( - storage.load_da_proposal(ViewNumber::new(i)).await.unwrap(), - None + assert!( + storage + .load_da_proposal(ViewNumber::new(i)) + .await + .unwrap() + .is_some(), + "DA proposals should be retained after processing" ); - assert_eq!( - storage.load_vid_share(ViewNumber::new(i)).await.unwrap(), - None + assert!( + storage + .load_vid_share(ViewNumber::new(i)) + .await + .unwrap() + .is_some(), + "VID shares should be retained after processing" ); } @@ -903,15 +956,24 @@ mod tests { let info = &leaf_chain[0]; assert_eq!(info.leaf, leaves[3]); - // The remaining data should have been GCed. - assert_eq!( - storage.load_da_proposal(ViewNumber::new(3)).await.unwrap(), - None + // Quorum proposals are GCed at decide; DA proposals and VID shares are retained + // for the retention window so payloads remain recoverable. + assert!( + storage + .load_da_proposal(ViewNumber::new(3)) + .await + .unwrap() + .is_some(), + "DA proposals should be retained after processing" ); - assert_eq!( - storage.load_vid_share(ViewNumber::new(3)).await.unwrap(), - None + assert!( + storage + .load_vid_share(ViewNumber::new(3)) + .await + .unwrap() + .is_some(), + "VID shares should be retained after processing" ); assert_eq!( storage.load_quorum_proposals().await.unwrap(), @@ -1211,22 +1273,26 @@ mod tests { ) .await .unwrap(); - // Garbage collection should have run. + // DA proposals and VID shares are retained after processing (for the consensus + // storage retention window) so payloads remain recoverable by this node and its + // peers; only the retention-based pruner removes them. for i in 0..4 { - tracing::info!(i, "check proposal garbage collected"); + tracing::info!(i, "check proposal retained"); assert!( storage .load_vid_share(ViewNumber::new(i)) .await .unwrap() - .is_none() + .is_some(), + "VID shares should be retained after processing" ); assert!( storage .load_da_proposal(ViewNumber::new(i)) .await .unwrap() - .is_none() + .is_some(), + "DA proposals should be retained after processing" ); } tracing::info!("check anchor leaf updated"); @@ -1421,7 +1487,7 @@ mod tests { // A failing consumer propagates the error and leaves the cursor un-advanced: nothing is // GC'd and the range is retried below. storage - .process_decided_events(ViewNumber::new(3), None, &FailConsumer) + .process_decided_events(ViewNumber::new(3), None, &FailConsumer, None) .await .unwrap_err(); for i in 0..4 { @@ -1437,15 +1503,20 @@ mod tests { // One process pass at the latest view drains the whole backlog, runs GC, and reports the // cursor it advanced to. - let processed = storage - .process_decided_events(ViewNumber::new(3), None, &consumer) + let outcome = storage + .process_decided_events(ViewNumber::new(3), None, &consumer, None) .await .unwrap(); assert_eq!( - processed, + outcome.processed, Some(ViewNumber::new(3)), "process_decided_events should report the highest processed view" ); + assert!( + outcome.missing_payload.is_empty(), + "no leaf should be reported missing its payload: {:?}", + outcome.missing_payload + ); // All four leaves delivered, with payloads and VID shares reconstructed from storage. let leaf_chain = consumer.leaf_chain().await; @@ -1457,30 +1528,32 @@ mod tests { assert!(info.leaf.block_payload().is_some()); } - // GC ran for the processed range. + // DA proposals and VID shares are retained after processing (for the consensus + // storage retention window) so payloads remain recoverable by this node and its + // peers; they are only removed by the retention-based pruner. for i in 0..4 { assert!( storage .load_vid_share(ViewNumber::new(i)) .await .unwrap() - .is_none(), - "process_decided_events should have garbage collected VID shares" + .is_some(), + "process_decided_events should retain VID shares for the retention window" ); assert!( storage .load_da_proposal(ViewNumber::new(i)) .await .unwrap() - .is_none(), - "process_decided_events should have garbage collected DA proposals" + .is_some(), + "process_decided_events should retain DA proposals for the retention window" ); } // Re-processing with nothing new is a no-op. let consumer2 = EventCollector::default(); storage - .process_decided_events(ViewNumber::new(3), None, &consumer2) + .process_decided_events(ViewNumber::new(3), None, &consumer2, None) .await .unwrap(); assert!( @@ -1493,6 +1566,714 @@ mod tests { ); } + /// Build a mock chain of `len` consecutive decided leaves (all sharing the genesis + /// header/payload) along with their VID share and DA proposal artifacts, plus the + /// payload's VID commitment. + #[allow(clippy::type_complexity)] + async fn mock_chain( + len: u64, + ) -> ( + Vec<( + Leaf2, + QuorumCertificate2, + Proposal>, + Proposal>, + )>, + VidCommitment, + ) { + let leaf: Leaf2 = Leaf::genesis( + &ValidatedState::default(), + &NodeState::mock(), + MOCK_UPGRADE.base, + ) + .await + .into(); + let leaf_payload = leaf.block_payload().unwrap(); + let leaf_payload_bytes_arc = leaf_payload.encode(); + let avidm_param = init_avidm_param(2).unwrap(); + let weights = vec![1u32; 2]; + let ns_table = parse_ns_table( + leaf_payload.byte_len().as_usize(), + &leaf_payload.ns_table().encode(), + ); + let (payload_commitment, shares) = + AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table) + .unwrap(); + + let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1); + let mut vid = AvidMDisperseShare:: { + view_number: ViewNumber::new(0), + payload_commitment, + share: shares[0].clone(), + recipient_key: pubkey, + epoch: Some(EpochNumber::new(0)), + target_epoch: Some(EpochNumber::new(0)), + common: avidm_param, + } + .to_proposal(&privkey) + .unwrap() + .clone(); + let mut quorum_proposal = QuorumProposalWrapper:: { + proposal: QuorumProposal2:: { + block_header: leaf.block_header().clone(), + view_number: ViewNumber::genesis(), + justify_qc: QuorumCertificate::genesis( + &ValidatedState::default(), + &NodeState::mock(), + TEST_VERSIONS.test, + ) + .await + .to_qc2(), + upgrade_certificate: None, + view_change_evidence: None, + next_drb_result: None, + next_epoch_justify_qc: None, + epoch: None, + state_cert: None, + }, + }; + let mut qc = QuorumCertificate2::genesis( + &ValidatedState::default(), + &NodeState::mock(), + TEST_VERSIONS.test, + ) + .await; + + let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc) + .expect("Failed to sign block payload"); + let mut da_proposal = Proposal { + data: DaProposal2:: { + encoded_transactions: leaf_payload_bytes_arc.clone(), + metadata: leaf_payload.ns_table().clone(), + view_number: ViewNumber::new(0), + epoch: Some(EpochNumber::new(0)), + epoch_transition_indicator: EpochTransitionIndicator::NotInTransition, + }, + signature: block_payload_signature, + _pd: Default::default(), + }; + + let commit = vid_commitment( + &leaf_payload_bytes_arc, + &leaf.block_header().metadata().encode(), + 2, + TEST_VERSIONS.test.base, + ); + + let mut chain = vec![]; + for i in 0..len { + quorum_proposal.proposal.view_number = ViewNumber::new(i); + let leaf = Leaf2::from_quorum_proposal(&quorum_proposal); + qc.view_number = leaf.view_number(); + qc.data.leaf_commit = Committable::commit(&leaf); + vid.data.view_number = leaf.view_number(); + da_proposal.data.view_number = leaf.view_number(); + chain.push((leaf.clone(), qc.clone(), vid.clone(), da_proposal.clone())); + } + (chain, commit) + } + + type MockChain = Vec<( + Leaf2, + QuorumCertificate2, + Proposal>, + Proposal>, + )>; + + /// Build a mock chain like [`mock_chain`], but whose blocks carry a real (non-empty) + /// payload, so the decide pipeline genuinely needs a payload source — the in-memory + /// decide data or a persisted DA proposal; the empty-namespace-table fast path does + /// not apply. + async fn mock_chain_with_txns(len: u64) -> (MockChain, Payload, VidCommitment) { + let (payload, ns_table) = Payload::from_transactions( + [Transaction::new(1_u32.into(), vec![1, 2, 3])], + &ValidatedState::default(), + &NodeState::mock(), + ) + .await + .unwrap(); + assert!( + ns_table.iter().next().is_some(), + "test payload must have a non-empty namespace table" + ); + let header = Header::genesis( + &NodeState::mock(), + payload.clone(), + &ns_table, + MOCK_UPGRADE.base, + ); + let payload_bytes = payload.encode(); + + let avidm_param = init_avidm_param(2).unwrap(); + let weights = vec![1u32; 2]; + let avidm_ns_table = parse_ns_table(payload.byte_len().as_usize(), &ns_table.encode()); + let (payload_commitment, shares) = + AvidMScheme::ns_disperse(&avidm_param, &weights, &payload_bytes, avidm_ns_table) + .unwrap(); + + let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1); + let mut vid = AvidMDisperseShare:: { + view_number: ViewNumber::new(0), + payload_commitment, + share: shares[0].clone(), + recipient_key: pubkey, + epoch: Some(EpochNumber::new(0)), + target_epoch: Some(EpochNumber::new(0)), + common: avidm_param, + } + .to_proposal(&privkey) + .unwrap() + .clone(); + let mut quorum_proposal = QuorumProposalWrapper:: { + proposal: QuorumProposal2:: { + block_header: header, + view_number: ViewNumber::genesis(), + justify_qc: QuorumCertificate::genesis( + &ValidatedState::default(), + &NodeState::mock(), + TEST_VERSIONS.test, + ) + .await + .to_qc2(), + upgrade_certificate: None, + view_change_evidence: None, + next_drb_result: None, + next_epoch_justify_qc: None, + epoch: None, + state_cert: None, + }, + }; + let mut qc = QuorumCertificate2::genesis( + &ValidatedState::default(), + &NodeState::mock(), + TEST_VERSIONS.test, + ) + .await; + + let block_payload_signature = + BLSPubKey::sign(&privkey, &payload_bytes).expect("Failed to sign block payload"); + let mut da_proposal = Proposal { + data: DaProposal2:: { + encoded_transactions: payload_bytes.clone(), + metadata: ns_table.clone(), + view_number: ViewNumber::new(0), + epoch: Some(EpochNumber::new(0)), + epoch_transition_indicator: EpochTransitionIndicator::NotInTransition, + }, + signature: block_payload_signature, + _pd: Default::default(), + }; + + let commit = vid_commitment( + &payload_bytes, + &ns_table.encode(), + 2, + TEST_VERSIONS.test.base, + ); + + let mut chain = vec![]; + for i in 0..len { + quorum_proposal.proposal.view_number = ViewNumber::new(i); + let leaf = Leaf2::from_quorum_proposal(&quorum_proposal); + qc.view_number = leaf.view_number(); + qc.data.leaf_commit = Committable::commit(&leaf); + vid.data.view_number = leaf.view_number(); + da_proposal.data.view_number = leaf.view_number(); + chain.push((leaf.clone(), qc.clone(), vid.clone(), da_proposal.clone())); + } + (chain, payload, commit) + } + + /// Capture the in-memory decide data for `views` of the chain, the way `persist_event` + /// does in production: the decided leaves come with their payloads filled in and their + /// VID shares attached. + fn live_decide_data( + chain: &MockChain, + payload: &Payload, + views: impl IntoIterator, + ) -> DecideEventData { + let views = views.into_iter().collect::>(); + let infos = chain + .iter() + .filter(|(leaf, ..)| views.contains(&leaf.view_number().u64())) + .map(|(leaf, _, vid, _)| { + let mut leaf = leaf.clone(); + leaf.fill_block_payload_unchecked(payload.clone()); + let share: Proposal> = + convert_proposal(vid.clone()); + LeafInfo { + leaf, + vid_share: Some(share.data), + state: Default::default(), + delta: None, + state_cert: None, + } + }) + .collect::>(); + DecideEventData::new(infos.iter(), None) + } + + /// The in-memory data from the decide event alone is enough to emit complete decide + /// events: with the consensus staging tables completely empty (as when a view is + /// decided before consensus' asynchronous storage writes land), processing with the + /// live data attached emits every leaf with its payload and VID share — without ever + /// reading or writing the staging tables, and with nothing reported missing. + #[rstest_reuse::apply(persistence_types)] + pub async fn test_decide_from_memory(_p: PhantomData

) { + let tmp = P::tmp_storage().await; + let storage = P::options(&tmp).create().await.unwrap(); + + let (chain, payload, _) = mock_chain_with_txns(4).await; + + // Persist all four decided leaves. Nothing is written to the staging tables: the + // background DA/VID writes have not landed yet. + let consumer = EventCollector::default(); + let leaf_chain = chain + .iter() + .map(|(leaf, qc, ..)| (leaf_info(leaf.clone()), qc.clone())) + .collect::>(); + storage + .persist_decided_leaves( + ViewNumber::new(3), + leaf_chain + .iter() + .map(|(leaf, qc)| (leaf, CertificatePair::non_epoch_change(qc.clone()))), + None, + &consumer, + ) + .await + .unwrap(); + + // One pass with the live data completes immediately, with nothing missing. + let live = live_decide_data(&chain, &payload, 0..4); + let outcome = storage + .process_decided_events(ViewNumber::new(3), None, &consumer, Some(&live)) + .await + .unwrap(); + assert_eq!( + outcome.processed, + Some(ViewNumber::new(3)), + "live data must allow processing without the staging tables" + ); + assert!( + outcome.missing_payload.is_empty(), + "no payload should be reported missing: {:?}", + outcome.missing_payload + ); + + // Every post-genesis leaf was delivered exactly once, complete with the payload + // and VID share from memory. (Genesis is special-cased — the canonical empty + // payload — and the fs backend may re-emit it as its anchor, which consumers are + // required to tolerate idempotently, so it is checked separately.) + let leaf_chain = consumer.leaf_chain().await; + for (leaf, _, vid, _) in chain.iter().skip(1) { + let infos = leaf_chain + .iter() + .filter(|info| info.leaf.view_number() == leaf.view_number()) + .collect::>(); + assert_eq!( + infos.len(), + 1, + "each post-genesis view must be delivered exactly once: {leaf_chain:#?}" + ); + let info = infos[0]; + assert_eq!(info.leaf, *leaf); + assert_eq!( + info.leaf.block_payload().unwrap().encode(), + payload.encode(), + "the payload must be the one carried by the decide event" + ); + let expected: Proposal> = + convert_proposal(vid.clone()); + assert_eq!( + info.vid_share.as_ref().unwrap(), + &expected.data, + "the VID share must be the one carried by the decide event" + ); + } + + // The staging tables were never involved: nothing read them and nothing wrote + // them, proving the data came from memory. + for i in 0..4 { + assert!( + storage + .load_da_proposal(ViewNumber::new(i)) + .await + .unwrap() + .is_none(), + "the live path must not populate the DA staging table" + ); + assert!( + storage + .load_vid_share(ViewNumber::new(i)) + .await + .unwrap() + .is_none(), + "the live path must not populate the VID staging table" + ); + } + } + + /// Views not covered by the in-memory decide data fall back to the consensus staging + /// tables: a single pass emits storage-sourced and memory-sourced leaves side by side. + #[rstest_reuse::apply(persistence_types)] + pub async fn test_decide_from_memory_partial(_p: PhantomData

) { + let tmp = P::tmp_storage().await; + let storage = P::options(&tmp).create().await.unwrap(); + + let (chain, payload, commit) = mock_chain_with_txns(4).await; + + // Views 0 and 1 have their artifacts on disk (the background writes landed); + // views 2 and 3 do not. + for (_, _, vid, da) in chain.iter().take(2) { + storage.append_da2(da, commit).await.unwrap(); + storage + .append_vid(&convert_proposal(vid.clone())) + .await + .unwrap(); + } + + let consumer = EventCollector::default(); + let leaf_chain = chain + .iter() + .map(|(leaf, qc, ..)| (leaf_info(leaf.clone()), qc.clone())) + .collect::>(); + storage + .persist_decided_leaves( + ViewNumber::new(3), + leaf_chain + .iter() + .map(|(leaf, qc)| (leaf, CertificatePair::non_epoch_change(qc.clone()))), + None, + &consumer, + ) + .await + .unwrap(); + + // The live data covers only views 2 and 3 (e.g. an older signal was coalesced + // away under processor lag); one pass still completes, mixing sources per view. + let live = live_decide_data(&chain, &payload, 2..4); + let outcome = storage + .process_decided_events(ViewNumber::new(3), None, &consumer, Some(&live)) + .await + .unwrap(); + assert_eq!(outcome.processed, Some(ViewNumber::new(3))); + assert!( + outcome.missing_payload.is_empty(), + "no payload should be reported missing: {:?}", + outcome.missing_payload + ); + + let leaf_chain = consumer.leaf_chain().await; + assert_eq!(leaf_chain.len(), 4, "{leaf_chain:#?}"); + for info in &leaf_chain { + assert_eq!( + info.leaf.block_payload().unwrap().encode(), + payload.encode() + ); + assert!(info.vid_share.is_some()); + } + + // Views 2 and 3 were only ever in memory; the staging tables still don't know + // them. + for i in 2..4 { + assert!( + storage + .load_da_proposal(ViewNumber::new(i)) + .await + .unwrap() + .is_none() + ); + assert!( + storage + .load_vid_share(ViewNumber::new(i)) + .await + .unwrap() + .is_none() + ); + } + } + + /// Missing data never delays the decide pipeline: a single pass emits every leaf + /// immediately, attaching whatever data is available. Leaves missing their payload + /// are reported in the outcome for background peer recovery; leaves missing only VID + /// data are emitted without it (the query service heals VID via peer fetching) and + /// NOT reported. + #[rstest_reuse::apply(persistence_types)] + pub async fn test_decide_missing_data_emitted_and_reported( + _p: PhantomData

, + ) { + let tmp = P::tmp_storage().await; + let storage = P::options(&tmp).create().await.unwrap(); + + let (chain, _payload, commit) = mock_chain_with_txns(4).await; + + // DA proposals land only for views 0 and 1; VID shares only for views 0-2. View 3 + // is missing both, and views 2 and 3 are missing their payloads. + for (_, _, _, da) in chain.iter().take(2) { + storage.append_da2(da, commit).await.unwrap(); + } + for (_, _, vid, _) in chain.iter().take(3) { + storage + .append_vid(&convert_proposal(vid.clone())) + .await + .unwrap(); + } + + // Persist all four decided leaves up front. + let consumer = EventCollector::default(); + let leaf_chain = chain + .iter() + .map(|(leaf, qc, ..)| (leaf_info(leaf.clone()), qc.clone())) + .collect::>(); + storage + .persist_decided_leaves( + ViewNumber::new(3), + leaf_chain + .iter() + .map(|(leaf, qc)| (leaf, CertificatePair::non_epoch_change(qc.clone()))), + None, + &consumer, + ) + .await + .unwrap(); + + // One pass processes everything: nothing defers, the cursor reaches the newest + // decided view. + let outcome = storage + .process_decided_events(ViewNumber::new(3), None, &consumer, None) + .await + .unwrap(); + assert_eq!( + outcome.processed, + Some(ViewNumber::new(3)), + "missing data must not hold the cursor back" + ); + + // The leaves missing their payloads (views 2 and 3) are reported for recovery, + // oldest first; the leaf missing only its VID share is not. + assert_eq!( + outcome + .missing_payload + .iter() + .map(|leaf| leaf.view_number().u64()) + .collect::>(), + vec![2, 3], + "exactly the payload-less leaves must be reported, in view order" + ); + + // All four leaves were emitted, each with whatever data was available. + let leaf_chain = consumer.leaf_chain().await; + assert_eq!(leaf_chain.len(), 4, "{leaf_chain:#?}"); + for ((leaf, ..), info) in chain.iter().zip(leaf_chain.iter()) { + assert_eq!(info.leaf, *leaf); + let view = leaf.view_number().u64(); + assert_eq!( + info.leaf.block_payload().is_some(), + view < 2, + "only views with a stored DA proposal have payloads (view {view})" + ); + assert_eq!( + info.vid_share.is_some(), + view < 3, + "only views with a stored share have VID data (view {view})" + ); + } + + // Re-processing with nothing new emits nothing and reports nothing: each leaf is + // reported missing its payload by exactly one successful pass, so background + // recovery is triggered exactly once per leaf. + let consumer2 = EventCollector::default(); + let outcome = storage + .process_decided_events(ViewNumber::new(3), None, &consumer2, None) + .await + .unwrap(); + assert!( + outcome.missing_payload.is_empty(), + "an already-processed leaf must not be reported again: {:?}", + outcome.missing_payload + ); + // (The fs backend may re-emit its anchor leaf, which consumers are required to + // tolerate idempotently; nothing else is emitted.) + assert!( + consumer2 + .leaf_chain() + .await + .iter() + .all(|info| info.leaf.view_number() == ViewNumber::new(3)), + "re-processing must not re-emit already-processed leaves" + ); + } + + /// Blocks with an empty namespace table don't need a DA proposal: their payload is + /// the canonical empty payload and is filled in directly, so they are not reported + /// as missing it. + #[rstest_reuse::apply(persistence_types)] + pub async fn test_decide_empty_payload_fast_path(_p: PhantomData

) { + let tmp = P::tmp_storage().await; + let storage = P::options(&tmp).create().await.unwrap(); + + let (chain, _) = mock_chain(2).await; + + // VID shares land, but no DA proposals at all. The mock headers have an empty + // namespace table, so the payload is known regardless. + for (_, _, vid, _) in &chain { + storage + .append_vid(&convert_proposal(vid.clone())) + .await + .unwrap(); + } + + let consumer = EventCollector::default(); + let leaf_chain = chain + .iter() + .map(|(leaf, qc, ..)| (leaf_info(leaf.clone()), qc.clone())) + .collect::>(); + storage + .persist_decided_leaves( + ViewNumber::new(1), + leaf_chain + .iter() + .map(|(leaf, qc)| (leaf, CertificatePair::non_epoch_change(qc.clone()))), + None, + &consumer, + ) + .await + .unwrap(); + + // The empty payload is filled in, both leaves process, and nothing is reported + // missing. + let outcome = storage + .process_decided_events(ViewNumber::new(1), None, &consumer, None) + .await + .unwrap(); + assert_eq!(outcome.processed, Some(ViewNumber::new(1))); + assert!( + outcome.missing_payload.is_empty(), + "empty-namespace-table blocks must not be reported missing their payload: {:?}", + outcome.missing_payload + ); + let leaf_chain = consumer.leaf_chain().await; + assert_eq!(leaf_chain.len(), 2); + for info in &leaf_chain { + assert!( + info.leaf.block_payload().is_some(), + "empty-namespace-table blocks get the canonical empty payload" + ); + } + } + + /// Serves DA proposals for recovery from a fixed map, simulating peers that retained + /// them in their consensus storage. + #[derive(Debug, Default)] + struct MockPayloadRecovery { + proposals: BTreeMap>>, + } + + #[async_trait] + impl DecidePayloadRecovery for MockPayloadRecovery { + async fn recover_payload( + &self, + leaf: &Leaf2, + ) -> anyhow::Result>>> { + Ok(self.proposals.get(&leaf.view_number().u64()).cloned()) + } + } + + /// Records `BlockPayloadReconstructed` events, the delivery path shared by late local + /// reconstructions and peer-recovered payloads. + #[derive(Clone, Debug, Default)] + struct PayloadCollector { + payloads: Arc>>, + } + + #[async_trait] + impl EventConsumer for PayloadCollector { + async fn handle_event(&self, event: &CoordinatorEvent) -> anyhow::Result<()> { + if let CoordinatorEvent::BlockPayloadReconstructed { view, payload, .. } = event { + self.payloads.write().await.push((*view, payload.clone())); + } + Ok(()) + } + } + + /// A payload recovered from peers is delivered to *both* stores: the DA proposal is + /// persisted to consensus storage (where restart replay and other peers can see it) + /// and a `BlockPayloadReconstructed` event back-fills the query service. + #[rstest_reuse::apply(persistence_types)] + pub async fn test_recovered_payload_delivered_to_both_stores( + _p: PhantomData

, + ) { + let tmp = P::tmp_storage().await; + let storage = Arc::new(P::options(&tmp).create().await.unwrap()); + + let (chain, payload, _) = mock_chain_with_txns(2).await; + + // Decide both views with no payload data anywhere. View 1 is emitted without its + // payload and reported for recovery (view 0 is the genesis view, which is + // special-cased to the canonical empty payload). + let consumer = EventCollector::default(); + let leaf_chain = chain + .iter() + .map(|(leaf, qc, ..)| (leaf_info(leaf.clone()), qc.clone())) + .collect::>(); + storage + .persist_decided_leaves( + ViewNumber::new(1), + leaf_chain + .iter() + .map(|(leaf, qc)| (leaf, CertificatePair::non_epoch_change(qc.clone()))), + None, + &consumer, + ) + .await + .unwrap(); + let outcome = storage + .process_decided_events(ViewNumber::new(1), None, &consumer, None) + .await + .unwrap(); + assert_eq!(outcome.processed, Some(ViewNumber::new(1))); + assert_eq!( + outcome + .missing_payload + .iter() + .map(|leaf| leaf.view_number().u64()) + .collect::>(), + vec![1] + ); + + // Run recovery against a mock peer serving the DA proposal. + let recovery: Arc = Arc::new(MockPayloadRecovery { + proposals: [(1, chain[1].3.clone())].into_iter().collect(), + }); + let collector = Arc::new(PayloadCollector::default()); + let metrics = NoMetrics::boxed(); + recover_missing_payloads( + recovery, + storage.clone(), + collector.clone(), + outcome.missing_payload, + metrics.create_counter("recovered".into(), None).into(), + metrics.create_counter("failures".into(), None).into(), + ) + .await; + + // The recovered DA proposal landed in consensus storage... + let stored = storage + .load_da_proposal(ViewNumber::new(1)) + .await + .unwrap() + .expect("recovered DA proposal must be persisted to consensus storage"); + assert_eq!(stored.data.encoded_transactions, payload.encode()); + + // ...and the payload reached the query-service consumer through the same event a + // late local reconstruction uses. + let delivered = collector.payloads.read().await.clone(); + assert_eq!(delivered.len(), 1, "{delivered:?}"); + assert_eq!(delivered[0].0, ViewNumber::new(1)); + assert_eq!(delivered[0].1.encode(), payload.encode()); + } + #[rstest_reuse::apply(persistence_types)] pub async fn test_pruning(_p: PhantomData

) { let tmp = P::tmp_storage().await; diff --git a/crates/espresso/node/src/persistence/fs.rs b/crates/espresso/node/src/persistence/fs.rs index 68fb8111d0e..feaf13d100a 100644 --- a/crates/espresso/node/src/persistence/fs.rs +++ b/crates/espresso/node/src/persistence/fs.rs @@ -17,7 +17,10 @@ use espresso_types::{ AuthenticatedValidatorMap, Leaf, Leaf2, NetworkConfig, Payload, PubKey, RegisteredValidatorMap, SeqTypes, StakeTableHash, traits::{EventsPersistenceRead, MembershipPersistence, StakeTuple}, - v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence}, + v0::traits::{ + DecideEventData, DecideProcessingOutcome, EventConsumer, PersistenceOptions, + SequencerPersistence, + }, v0_3::{ AuthenticatedValidator, EventKey, IndexedStake, RegisteredValidator, RewardAmount, StakeTableEvent, @@ -366,8 +369,12 @@ impl Inner { ) -> anyhow::Result<()> { let prune_view = ViewNumber::new(decided_view.saturating_sub(self.view_retention)); - self.prune_files(self.da2_dir_path(), prune_view, None, prune_intervals)?; - self.prune_files(self.vid2_dir_path(), prune_view, None, prune_intervals)?; + // DA proposals and VID shares are deliberately retained for the full retention + // window (not deleted as soon as their views are processed) so that this node — + // and its peers, via the request-response protocol — can still recover payloads + // for views that were decided before their data landed on disk. + self.prune_files(self.da2_dir_path(), prune_view, None, &[])?; + self.prune_files(self.vid2_dir_path(), prune_view, None, &[])?; self.prune_files( self.quorum_proposals2_dir_path(), prune_view, @@ -381,11 +388,18 @@ impl Inner { prune_intervals, )?; - // Save the most recent leaf as it will be our anchor point if the node restarts. + // Save the most recent *processed* leaf: it is our anchor point if the node + // restarts, and the next processing pass relies on the oldest remaining leaf + // having already been included in a previous decide event. + let keep_leaf = prune_intervals + .iter() + .map(|interval| *interval.end()) + .max() + .unwrap_or(decided_view); self.prune_files( self.decided_leaf2_path(), prune_view, - Some(decided_view), + Some(keep_leaf), prune_intervals, )?; @@ -446,13 +460,17 @@ impl Inner { /// Generate events based on persisted decided leaves. /// /// Returns a list of closed intervals of views which can be safely deleted, as all leaves - /// within these view ranges have been processed by the event consumer. + /// within these view ranges have been processed by the event consumer, along with the + /// leaves whose decide events were emitted without a block payload (so the caller can + /// recover them from peers in the background, after releasing the inner lock). async fn generate_decide_events( &mut self, view: ViewNumber, deciding_qc: Option>>, consumer: &impl EventConsumer, - ) -> anyhow::Result>> { + live: Option<&DecideEventData>, + metrics: &PersistenceMetricsValue, + ) -> anyhow::Result<(Vec>, Vec)> { // Generate a decide event for each leaf, to be processed by the event consumer. We make a // separate event for each leaf because it is possible we have non-consecutive leaves in our // storage, which would not be valid as a single decide with a single leaf chain. @@ -466,23 +484,42 @@ impl Inner { fs::read(&path).context(format!("reading decided leaf {}", path.display()))?; let (mut leaf, cert) = self.parse_decided_leaf(&bytes)?; - // Include the VID share if available. - let vid_proposal = self.load_vid_share(v)?; - if vid_proposal.is_none() { + // Include the VID share if available, preferring the in-memory copy from the + // decide event: under the new protocol the share file is written + // asynchronously, so it may not have landed on disk yet, while the decide + // event already carries the share. + let vid_share = match live.and_then(|data| data.vid_share(v)) { + Some(share) => { + metrics.decide_vid_from_memory.add(1); + Some(share.clone()) + }, + None => self.load_vid_share(v)?.map(|proposal| proposal.data), + }; + if vid_share.is_none() { tracing::debug!(?v, "VID share not available at decide"); } - let vid_share = vid_proposal.as_ref().map(|proposal| proposal.data.clone()); // Move the state cert to the finalized dir if it exists. let state_cert = self.store_finalized_state_cert(v)?; - // Fill in the full block payload using the DA proposals we had persisted. - if let Some(proposal) = self.load_da_proposal(v)? { + // Fill in the full block payload, preferring the in-memory copy from the + // decide event; fall back to the DA proposal file. + if let Some(payload) = live.and_then(|data| data.payload(v)) { + leaf.fill_block_payload_unchecked(payload.clone()); + metrics.decide_payload_from_memory.add(1); + } else if let Some(proposal) = self.load_da_proposal(v)? { let payload = Payload::from_bytes( &proposal.data.encoded_transactions, &proposal.data.metadata, ); leaf.fill_block_payload_unchecked(payload); + } else if v == ViewNumber::genesis() + || leaf.block_header().ns_table().iter().next().is_none() + { + // We don't get a DA proposal for the genesis view, but we know what the + // payload always is; the same goes for any block with an empty namespace + // table. + leaf.fill_block_payload_unchecked(Payload::empty().0); } else { tracing::debug!(?v, "DA proposal not available at decide"); } @@ -511,13 +548,34 @@ impl Inner { } } + let mut missing_payload = Vec::new(); let mut intervals = vec![]; let mut current_interval = None; for (view, (leaf, cert)) in leaves { let height = leaf.leaf.block_header().block_number(); + // Missing data is not waited for: the event is emitted as-is. A missing + // payload is reported to the caller so it can be recovered from peers in the + // background and delivered to consensus storage and the query service late, + // the same way `BlockPayloadReconstructed` events are; missing VID data is + // left to the query service's own peer fetching. + if leaf.leaf.block_payload().is_none() { + tracing::warn!(?view, "DA proposal not available at decide"); + metrics.decide_missing_payload.add(1); + missing_payload.push(leaf.leaf.clone()); + } + if leaf.vid_share.is_none() && view != ViewNumber::genesis() { + tracing::warn!(?view, "VID share not available at decide"); + metrics.decide_missing_vid.add(1); + } + let event = if leaf.leaf.block_header().version() >= versions::NEW_PROTOCOL_VERSION { - let cert2 = self.load_cert2(view)?; + // Prefer the in-memory cert2 from the decide event over the + // asynchronously-written file. + let cert2 = match live.and_then(|data| data.cert2(view)) { + Some(cert2) => Some(cert2.clone()), + None => self.load_cert2(view)?, + }; // One event per view. cert2 is only stored for the // directly finalized view // ancestors get `cert2: None`, @@ -564,7 +622,7 @@ impl Inner { intervals.push(start..=end); } - Ok(intervals) + Ok((intervals, missing_payload)) } fn load_da_proposal( @@ -825,14 +883,15 @@ impl SequencerPersistence for Persistence { view: ViewNumber, deciding_qc: Option>>, consumer: &(impl EventConsumer + 'static), - ) -> anyhow::Result> { + live: Option<&DecideEventData>, + ) -> anyhow::Result { // On error, GC does not run over the failed range, so the leaves stay on disk and are // retried; no data is lost. - let intervals = self + let (intervals, missing_payload) = self .inner .write() .await - .generate_decide_events(view, deciding_qc, consumer) + .generate_decide_events(view, deciding_qc, consumer, live, &self.metrics) .await?; // Highest view we generated an event for; unprocessed leaves stay on disk (the cursor). @@ -844,7 +903,10 @@ impl SequencerPersistence for Persistence { tracing::warn!(?view, "GC failed: {err:#}"); } - Ok(processed) + Ok(DecideProcessingOutcome { + processed, + missing_payload, + }) } async fn load_anchor_leaf( diff --git a/crates/espresso/node/src/persistence/persistence_metrics.rs b/crates/espresso/node/src/persistence/persistence_metrics.rs index 3d7f61880d4..619b9814a4c 100644 --- a/crates/espresso/node/src/persistence/persistence_metrics.rs +++ b/crates/espresso/node/src/persistence/persistence_metrics.rs @@ -1,4 +1,4 @@ -use hotshot_types::traits::metrics::{Histogram, Metrics, NoMetrics}; +use hotshot_types::traits::metrics::{Counter, Histogram, Metrics, NoMetrics}; /// Metrics for the persistence layer #[derive(Clone, Debug)] @@ -11,6 +11,20 @@ pub struct PersistenceMetricsValue { pub internal_append_da2_duration: Box, /// Time taken by the underlying storage to execute the command that appends Quorum Proposal 2 pub internal_append_quorum2_duration: Box, + /// Decide events emitted without a block payload; the leaf is reported for background + /// peer recovery, which back-fills the query service when it succeeds + pub decide_missing_payload: Box, + /// Decide events emitted without VID data; healed by the query service's peer fetching + pub decide_missing_vid: Box, + /// Block payloads filled into decide events from the in-memory decide data, without + /// touching consensus storage (may count a view more than once across retry passes) + pub decide_payload_from_memory: Box, + /// VID shares filled into decide events from the in-memory decide data, without + /// touching consensus storage (may count a view more than once across retry passes) + pub decide_vid_from_memory: Box, + /// Times decide event generation stopped at a non-consecutive leaf (a height gap in + /// consensus storage; if it persists, the decide pipeline is stalled) + pub decide_height_gaps: Box, } impl PersistenceMetricsValue { @@ -34,6 +48,14 @@ impl PersistenceMetricsValue { String::from("internal_append_quorum2_duration"), Some("seconds".to_string()), ), + decide_missing_payload: metrics + .create_counter(String::from("decide_missing_payload"), None), + decide_missing_vid: metrics.create_counter(String::from("decide_missing_vid"), None), + decide_payload_from_memory: metrics + .create_counter(String::from("decide_payload_from_memory"), None), + decide_vid_from_memory: metrics + .create_counter(String::from("decide_vid_from_memory"), None), + decide_height_gaps: metrics.create_counter(String::from("decide_height_gaps"), None), } } } diff --git a/crates/espresso/node/src/persistence/sql.rs b/crates/espresso/node/src/persistence/sql.rs index 77874f81513..6dc34cf4bf2 100644 --- a/crates/espresso/node/src/persistence/sql.rs +++ b/crates/espresso/node/src/persistence/sql.rs @@ -19,7 +19,10 @@ use espresso_types::{ NetworkConfig, Payload, PubKey, Ratio, RegisteredValidatorMap, StakeTableHash, parse_duration, parse_size, traits::{EventsPersistenceRead, MembershipPersistence, StakeTuple}, - v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence, StateCatchup}, + v0::traits::{ + DecideEventData, DecideProcessingOutcome, EventConsumer, PersistenceOptions, + SequencerPersistence, StateCatchup, + }, v0_3::{ AuthenticatedValidator, EventKey, IndexedStake, RegisteredValidator, RewardAmount, StakeTableEvent, @@ -888,10 +891,15 @@ impl Persistence { .map(|row| ViewNumber::new(row.get::("last_processed_view") as u64))) } + /// Generate decide events for all unprocessed decided leaves, recording leaves whose + /// events were emitted without a block payload in `missing_payload` (so the caller can + /// recover them from peers in the background). async fn generate_decide_events( &self, deciding_qc: Option>>, consumer: &impl EventConsumer, + live: Option<&DecideEventData>, + missing_payload: &mut Vec, ) -> anyhow::Result<()> { let mut last_processed_view: Option = self .db @@ -926,7 +934,6 @@ impl Persistence { .bind(from_view) .fetch(tx.as_mut()); let mut leaves: Vec<(Leaf2, CertificatePair)> = vec![]; - let mut final_qc = None; while let Some(row) = rows.next().await { let row = match row { Ok(row) => row, @@ -956,34 +963,51 @@ impl Persistence { if let Some(parent) = parent && height != parent + 1 { - tracing::debug!( + // A height gap means a decide event was never persisted for the + // intervening leaves (e.g. it was dropped before reaching the event + // loop). The decide pipeline cannot advance past the gap, so if this + // persists, query-service ingestion is stalled. + tracing::error!( height, parent, "ending decide event at non-consecutive leaf" ); + self.internal_metrics.decide_height_gaps.add(1); break; } parent = Some(height); let cert = CertificatePair::new(qc, next_epoch_qc); - final_qc = Some(cert.clone()); leaves.push((leaf, cert)); } drop(rows); - let Some(final_qc) = final_qc else { + if leaves.is_empty() { // End event processing when there are no more decided views. tracing::debug!(from_view, "no new leaves at decide"); return Ok(()); - }; + } - // Find the range of views encompassed by this leaf chain. All data in this range can be - // processed by the consumer and then deleted. + // Find the range of views encompassed by this leaf chain. All data in this range can + // be processed by the consumer and then garbage collected. let from_view = leaves[0].0.view_number(); let to_view = leaves[leaves.len() - 1].0.view_number(); - // Collect VID shares for the decide event. - let mut vid_shares = tx - .fetch_all( + // Data carried in memory on the decide event itself. This is the preferred source: + // under the new protocol, the staging tables below are written asynchronously, so a + // just-decided view's data may not have landed on disk yet, while the decide event + // already carries it. Storage is only the fallback for views not covered here. + let live_payload = |view: ViewNumber| live.and_then(|data| data.payload(view)); + let live_vid = |view: ViewNumber| live.and_then(|data| data.vid_share(view)); + + // Collect VID shares for the decide event, skipping the read when the in-memory + // event data already covers every view. (Any view not covered may still use the + // stored share, even where one is not required for completeness, so the gate + // must mirror the fill below exactly.) + let need_vid_query = leaves + .iter() + .any(|(leaf, _)| live_vid(leaf.view_number()).is_none()); + let mut vid_shares = if need_vid_query { + tx.fetch_all( query("SELECT view, data FROM vid_share2 where view >= $1 AND view <= $2") .bind(from_view.u64() as i64) .bind(to_view.u64() as i64), @@ -998,11 +1022,20 @@ impl Persistence { >(&data)?; Ok((view as u64, vid_proposal)) }) - .collect::>>()?; + .collect::>>()? + } else { + BTreeMap::new() + }; - // Collect DA proposals for the decide event. - let mut da_proposals = tx - .fetch_all( + // Collect DA proposals for the decide event, skipping the read when the in-memory + // event data already covers every view. (Same as above: a view not covered may + // still use the stored proposal, even where the canonical empty payload would do, + // so the gate must mirror the fill below exactly.) + let need_da_query = leaves + .iter() + .any(|(leaf, _)| live_payload(leaf.view_number()).is_none()); + let mut da_proposals = if need_da_query { + tx.fetch_all( query("SELECT view, data FROM da_proposal2 where view >= $1 AND view <= $2") .bind(from_view.u64() as i64) .bind(to_view.u64() as i64), @@ -1016,7 +1049,12 @@ impl Persistence { bincode::deserialize::>>(&data)?; Ok((view as u64, da_proposal.data)) }) - .collect::>>()?; + .collect::>>()? + } else { + BTreeMap::new() + }; + + let final_qc = leaves[leaves.len() - 1].1.clone(); // Collect state certs for the decide event. let state_certs = Self::load_state_certs(&mut tx, from_view, to_view) @@ -1029,62 +1067,88 @@ impl Persistence { ); })?; - let cert2 = tx - .fetch_optional( - query("SELECT data FROM decided_cert2 WHERE view = $1") - .bind(to_view.u64() as i64), - ) - .await? - .map(|row| { - let bytes: Vec = row.get("data"); - bincode::deserialize::>(&bytes) - .context("deserializing decided cert2") - }) - .transpose()?; + // The cert2 certifying the newest leaf, preferring the in-memory copy from the + // decide event over the asynchronously-written `decided_cert2` table. + let cert2 = match live.and_then(|data| data.cert2(to_view)) { + Some(cert2) => Some(cert2.clone()), + None => tx + .fetch_optional( + query("SELECT data FROM decided_cert2 WHERE view = $1") + .bind(to_view.u64() as i64), + ) + .await? + .map(|row| { + let bytes: Vec = row.get("data"); + bincode::deserialize::>(&bytes) + .context("deserializing decided cert2") + }) + .transpose()?, + }; drop(tx); // Collate all the information by view number and construct a chain of leaves. - let chain = leaves - .into_iter() - // Go in reverse chronological order, as expected by Decide events. - .rev() - .map(|(mut leaf, cert)| { - let view = leaf.view_number(); - - // Include the VID share if available. - let vid_proposal = vid_shares.remove(&view); - if vid_proposal.is_none() { - tracing::debug!(?view, "VID share not available at decide"); - } - let vid_share = vid_proposal.as_ref().map(|proposal| proposal.data.clone()); - - // Fill in the full block payload using the DA proposals we had persisted. - if let Some(proposal) = da_proposals.remove(&view) { - let payload = - Payload::from_bytes(&proposal.encoded_transactions, &proposal.metadata); - leaf.fill_block_payload_unchecked(payload); - } else if view == ViewNumber::genesis() { - // We don't get a DA proposal for the genesis view, but we know what the - // payload always is. - leaf.fill_block_payload_unchecked(Payload::empty().0); - } else { - tracing::debug!(?view, "DA proposal not available at decide"); - } + // Go in reverse chronological order, as expected by Decide events. + let mut chain = Vec::with_capacity(leaves.len()); + for (mut leaf, cert) in leaves.into_iter().rev() { + let view = leaf.view_number(); + + // Include the VID share if available, preferring the in-memory copy from + // the decide event over the asynchronously-written staging table. + let vid_share = match live_vid(view) { + Some(share) => { + self.internal_metrics.decide_vid_from_memory.add(1); + Some(share.clone()) + }, + None => vid_shares.remove(&view).map(|proposal| proposal.data), + }; + if vid_share.is_none() && view != ViewNumber::genesis() { + // The share never reached this node and is not recoverable here; the + // query service has to fetch the VID data from peers. + tracing::warn!(?view, "VID share not available at decide"); + self.internal_metrics.decide_missing_vid.add(1); + } - let state_cert = state_certs.get(&view).cloned(); - - let info = LeafInfo { - leaf, - vid_share, - state_cert, - // Note: the following fields are not used in Decide event processing, - // and should be removed. For now, we just default them. - state: Default::default(), - delta: Default::default(), - }; - DecidedLeaf { info, cert } - }) - .collect(); + // Fill in the full block payload, preferring the in-memory copy from the + // decide event; fall back to the DA proposal persisted in the staging + // table. + if let Some(payload) = live_payload(view) { + leaf.fill_block_payload_unchecked(payload.clone()); + self.internal_metrics.decide_payload_from_memory.add(1); + } else if let Some(proposal) = da_proposals.remove(&view) { + let payload = + Payload::from_bytes(&proposal.encoded_transactions, &proposal.metadata); + leaf.fill_block_payload_unchecked(payload); + } else if view == ViewNumber::genesis() + || leaf.block_header().ns_table().iter().next().is_none() + { + // We don't get a DA proposal for the genesis view, but we know what the + // payload always is; the same goes for any block with an empty namespace + // table. + leaf.fill_block_payload_unchecked(Payload::empty().0); + } else { + // The payload was not reconstructed before this view was decided. The + // event is emitted without it, and the leaf is reported to the caller + // so the payload can be recovered from peers in the background and + // delivered to consensus storage and the query service late, the same + // way `BlockPayloadReconstructed` events are. + tracing::warn!(?view, "DA proposal not available at decide"); + self.internal_metrics.decide_missing_payload.add(1); + missing_payload.push(leaf.clone()); + } + + let state_cert = state_certs.get(&view).cloned(); + + let info = LeafInfo { + leaf, + vid_share, + state_cert, + // Note: the following fields are not used in Decide event processing, + // and should be removed. For now, we just default them. + state: Default::default(), + delta: Default::default(), + }; + chain.push(DecidedLeaf { info, cert }); + } tracing::debug!( ?from_view, @@ -1133,18 +1197,13 @@ impl Persistence { } // Delete the data that has been fully processed. - tx.execute( - query("DELETE FROM vid_share2 where view >= $1 AND view <= $2") - .bind(from_view_i64) - .bind(to_view_i64), - ) - .await?; - tx.execute( - query("DELETE FROM da_proposal2 where view >= $1 AND view <= $2") - .bind(from_view_i64) - .bind(to_view_i64), - ) - .await?; + // + // DA proposals and VID shares are deliberately NOT deleted here: they + // are retained for the consensus storage retention window (see + // [`ConsensusPruningOptions`]) so that this node — and its peers, via + // the request-response protocol — can still recover payloads for views + // that were decided before their data landed on disk. They are cleaned + // up by [`Persistence::prune`] after each decide. tx.execute( query("DELETE FROM quorum_proposals2 where view >= $1 AND view <= $2") .bind(from_view_i64) @@ -1625,17 +1684,25 @@ impl SequencerPersistence for Persistence { view: ViewNumber, deciding_qc: Option>>, consumer: &(impl EventConsumer + 'static), - ) -> anyhow::Result> { + live: Option<&DecideEventData>, + ) -> anyhow::Result { // Generate events for the new leaves, then GC. On error `last_processed_view` is not // advanced past the failure point, so no data is lost and the range is retried. - self.generate_decide_events(deciding_qc, consumer).await?; + let mut missing_payload = Vec::new(); + self.generate_decide_events(deciding_qc, consumer, live, &mut missing_payload) + .await?; + // Events are emitted newest-first within each batch; report missing leaves oldest-first. + missing_payload.sort_by_key(|leaf| leaf.view_number()); // Best-effort GC of data not included in any decide event; runs again at the next decide. if let Err(err) = self.prune(view).await { tracing::warn!(?view, "pruning failed: {err:#}"); } - self.load_processed_view().await + Ok(DecideProcessingOutcome { + processed: self.load_processed_view().await?, + missing_payload, + }) } async fn load_latest_acted_view(&self) -> anyhow::Result> { diff --git a/crates/espresso/node/src/request_response/data_source.rs b/crates/espresso/node/src/request_response/data_source.rs index a3bd0480585..d235e1edcce 100644 --- a/crates/espresso/node/src/request_response/data_source.rs +++ b/crates/espresso/node/src/request_response/data_source.rs @@ -350,6 +350,18 @@ impl, N: ConnectedNetwork, P: SequencerP Ok(Response::RewardMerkleTreeV2(merkle_tree_bytes)) }, + Request::DaProposal(view) => { + // DA proposals are retained in consensus storage for the retention window + // so that peers which decided a view before obtaining its payload can + // recover it from us (see `generate_decide_events`). + let proposal = self + .persistence + .load_da_proposal(ViewNumber::new(*view)) + .await + .with_context(|| "failed to load DA proposal from persistence")? + .with_context(|| format!("no DA proposal available for view {view}"))?; + Ok(Response::DaProposal(Box::new(proposal))) + }, } } } diff --git a/crates/espresso/node/src/request_response/mod.rs b/crates/espresso/node/src/request_response/mod.rs index 374ede2e673..d6dca0378fd 100644 --- a/crates/espresso/node/src/request_response/mod.rs +++ b/crates/espresso/node/src/request_response/mod.rs @@ -16,6 +16,7 @@ use tokio::sync::mpsc::Receiver; pub mod catchup; pub mod data_source; pub mod network; +pub mod payload_recovery; pub mod recipient_source; pub mod request; diff --git a/crates/espresso/node/src/request_response/payload_recovery.rs b/crates/espresso/node/src/request_response/payload_recovery.rs new file mode 100644 index 00000000000..21fd4a309ca --- /dev/null +++ b/crates/espresso/node/src/request_response/payload_recovery.rs @@ -0,0 +1,167 @@ +//! Peer-based recovery of block payloads for the decide pipeline. +//! +//! Under the new protocol a node can decide a view without ever obtaining its payload: +//! payloads are reconstructed from VID shares carried by Vote1 broadcasts, and a node +//! whose vote is not needed for quorum (or that was restarted mid-view) may miss them +//! entirely. When the decide processor emits an event with the payload still missing, a +//! background task uses [`PayloadRecovery`] to fetch the DA proposal from peers — who +//! retain DA proposals for their consensus storage retention window — verifies the +//! payload against the block header's payload commitment, and delivers it to consensus +//! storage and the query service. + +use std::time::Duration; + +use anyhow::{Context, bail, ensure}; +use async_trait::async_trait; +use espresso_types::{ + Leaf2, PubKey, SeqTypes, + v0::traits::{DecidePayloadRecovery, SequencerPersistence}, +}; +use hotshot::traits::NodeImplementation; +use hotshot_types::{ + data::{DaProposal2, VidCommitment, vid_commitment, vid_disperse::vid_total_weight}, + epoch_membership::EpochMembershipCoordinator, + message::Proposal, + traits::{EncodeBytes, network::ConnectedNetwork}, +}; +use request_response::RequestType; +use tokio::time::timeout; + +use super::{ + RequestResponseProtocol, + request::{Request, Response}, +}; + +/// How long to wait for a single payload-recovery request before giving up. The caller +/// retries a bounded number of times (see `PAYLOAD_RECOVERY_ATTEMPTS` in the decide +/// processor) before leaving the gap to the query service's own fetching. +const RECOVERY_TIMEOUT: Duration = Duration::from_secs(15); + +/// Fetches DA proposals (block payloads) from peers over the request-response protocol +/// for views that were decided before this node obtained their payload. Responses are +/// verified against the block header's payload commitment, recomputing the VID commitment +/// with the same parameters the disperser used. +pub struct PayloadRecovery +where + I: NodeImplementation, + N: ConnectedNetwork, + P: SequencerPersistence, +{ + protocol: RequestResponseProtocol, + membership: EpochMembershipCoordinator, + epoch_height: u64, +} + +impl PayloadRecovery +where + I: NodeImplementation, + N: ConnectedNetwork, + P: SequencerPersistence, +{ + pub fn new( + protocol: RequestResponseProtocol, + membership: EpochMembershipCoordinator, + epoch_height: u64, + ) -> Self { + Self { + protocol, + membership, + epoch_height, + } + } +} + +impl std::fmt::Debug for PayloadRecovery +where + I: NodeImplementation, + N: ConnectedNetwork, + P: SequencerPersistence, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PayloadRecovery") + .field("epoch_height", &self.epoch_height) + .finish_non_exhaustive() + } +} + +#[async_trait] +impl DecidePayloadRecovery for PayloadRecovery +where + I: NodeImplementation, + N: ConnectedNetwork, + P: SequencerPersistence, +{ + async fn recover_payload( + &self, + leaf: &Leaf2, + ) -> anyhow::Result>>> { + let header = leaf.block_header(); + let expected = header.payload_commitment(); + // Recovery is only supported for new-protocol (V2) commitments; older versions + // received payloads via DA proposal broadcast before voting, so they don't hit + // the missing-payload path in practice. + if !matches!(expected, VidCommitment::V2(_)) { + return Ok(None); + } + let view = leaf.view_number(); + + // Derive the VID parameters exactly as the disperser did — from the leaf epoch's + // stake table — so the recomputed commitment matches. + let epoch = leaf.epoch(self.epoch_height); + let total_weight = vid_total_weight::( + self.membership + .stake_table_for_epoch(epoch) + .map_err(|err| { + anyhow::anyhow!("failed to get stake table for epoch {epoch:?}: {err:#}") + })? + .stake_table(), + epoch, + ); + + let version = header.version(); + let ns_table = header.ns_table().clone(); + + let result = timeout( + RECOVERY_TIMEOUT, + self.protocol.request_indefinitely( + Request::DaProposal(view.u64()), + RequestType::Batched, + move |_req, response| { + let ns_table = ns_table.clone(); + async move { + let Response::DaProposal(proposal) = response else { + bail!("unexpected response type"); + }; + ensure!( + proposal.data.view_number == view, + "DA proposal response for wrong view" + ); + ensure!( + proposal.data.metadata == ns_table, + "namespace table mismatch in DA proposal response" + ); + let computed = vid_commitment( + &proposal.data.encoded_transactions, + &proposal.data.metadata.encode(), + total_weight, + version, + ); + ensure!( + computed == expected, + "payload commitment mismatch in DA proposal response" + ); + Ok(*proposal) + } + }, + ), + ) + .await; + + match result { + Ok(Ok(proposal)) => Ok(Some(proposal)), + Ok(Err(err)) => Err(err).context("payload recovery request failed"), + // Timed out waiting for a valid response; the caller may retry later. + Err(_) => Ok(None), + } + } +} diff --git a/crates/espresso/node/src/request_response/request.rs b/crates/espresso/node/src/request_response/request.rs index 348258dad27..0df025cfd79 100644 --- a/crates/espresso/node/src/request_response/request.rs +++ b/crates/espresso/node/src/request_response/request.rs @@ -5,7 +5,11 @@ use espresso_types::{ v0_3::{ChainConfig, RewardAccountV1, RewardMerkleTreeV1}, v0_4::{RewardAccountV2, RewardMerkleTreeV2}, }; -use hotshot_types::{data::VidShare, simple_certificate::LightClientStateUpdateCertificateV2}; +use hotshot_types::{ + data::{DaProposal2, VidShare}, + message::Proposal, + simple_certificate::LightClientStateUpdateCertificateV2, +}; use request_response::{Serializable, request::Request as RequestTrait}; use serde::{Deserialize, Serialize}; @@ -40,6 +44,11 @@ pub enum Request { RewardMerkleTreeV2(u64, ViewNumber), /// A request for the cert2 at or above the given height Cert2(Height), + /// A request for the DA proposal (block payload) at the given view, used to recover + /// payloads for views that were decided before this node obtained their payload. DA + /// proposals are retained in consensus storage for the retention window precisely so + /// they can be served here. + DaProposal(ViewNumber), } /// The outermost response type. This an enum that contains all the possible responses that the @@ -66,6 +75,10 @@ pub enum Response { RewardMerkleTreeV2(Vec), /// A response with the earliest cert2 (fast finality protocol) Cert2(Certificate2), + /// A response with the DA proposal for a view. The signature is not meaningful to the + /// requester; the payload must be verified against the block header's payload + /// commitment instead. + DaProposal(Box>>), } /// Implement the `RequestTrait` trait for the `Request` type. This tells the request response diff --git a/crates/espresso/types/src/v0/traits.rs b/crates/espresso/types/src/v0/traits.rs index 79ad4019c8b..8ce85de941b 100644 --- a/crates/espresso/types/src/v0/traits.rs +++ b/crates/espresso/types/src/v0/traits.rs @@ -42,7 +42,7 @@ use super::{ }; use crate::{ AuthenticatedValidatorMap, BlockMerkleTree, FeeAccount, FeeAccountProof, FeeMerkleCommitment, - Leaf2, NetworkConfig, PubKey, SeqTypes, + Leaf2, NetworkConfig, Payload, PubKey, SeqTypes, v0::impls::{StakeTableHash, ValidatedState}, v0_3::{ ChainConfig, RegisteredValidator, RewardAccountProofV1, RewardAccountV1, RewardAmount, @@ -789,17 +789,20 @@ pub trait SequencerPersistence: } /// Decode a consensus decide event and persist its leaves, for the consensus event loop. - /// Returns `Some((decided_view, deciding_qc))` on a decide so the caller can wake a background - /// task to run [`process_decided_events`](Self::process_decided_events); `None` otherwise. + /// Returns a [`PendingDecide`] on a decide so the caller can wake a background task to run + /// [`process_decided_events`](Self::process_decided_events); `None` otherwise. /// /// This is the persist-only half of a decide: query-service ingestion and GC are deferred to - /// [`process_decided_events`](Self::process_decided_events). Tests that want the synchronous - /// persist-then-process behavior use [`append_decided_leaves`](Self::append_decided_leaves). + /// [`process_decided_events`](Self::process_decided_events). The returned [`PendingDecide`] + /// carries the in-memory payload/VID/cert2 data from the event, so the processor can emit + /// complete decide events without waiting for consensus' asynchronous storage writes to land. + /// Tests that want the synchronous persist-then-process behavior use + /// [`append_decided_leaves`](Self::append_decided_leaves). async fn persist_event( &self, event: &CoordinatorEvent, consumer: &(impl EventConsumer + 'static), - ) -> Option<(ViewNumber, Option>>)> { + ) -> Option { match event { CoordinatorEvent::LegacyEvent(hotshot_event) => { let EventType::Decide { @@ -831,10 +834,16 @@ pub trait SequencerPersistence: ); return None; } - Some((decided_view, deciding_qc.clone())) + Some(PendingDecide { + view: decided_view, + deciding_qc: deciding_qc.clone(), + data: Arc::new(DecideEventData::new(leaf_chain.iter(), None)), + }) }, CoordinatorEvent::NewDecide { - leaf_infos, cert1, .. + leaf_infos, + cert1, + cert2, } => { let first = leaf_infos.first()?; let decided_view = first.leaf.view_number(); @@ -861,7 +870,15 @@ pub trait SequencerPersistence: ); return None; } - Some((decided_view, None)) + Some(PendingDecide { + view: decided_view, + deciding_qc: None, + data: Arc::new(DecideEventData::new( + leaf_infos.iter(), + // `cert2` certifies the newest decided leaf. + cert2.clone().map(|cert2| (decided_view, cert2)), + )), + }) }, _ => None, } @@ -906,8 +923,9 @@ pub trait SequencerPersistence: self.persist_decided_leaves(decided_view, leaf_chain, deciding_qc.clone(), consumer) .await?; // Leaves are persisted; processing failures are non-fatal here and retried in production. + // No in-memory event data is passed, so this form always exercises the storage path. if let Err(err) = self - .process_decided_events(decided_view, deciding_qc, consumer) + .process_decided_events(decided_view, deciding_qc, consumer, None) .await { tracing::warn!(?decided_view, "decide event processing failed: {err:#}"); @@ -931,19 +949,37 @@ pub trait SequencerPersistence: /// Cursor-driven (e.g. `last_processed_view`): advances only on success, so it may lag /// consensus without losing data. /// - /// Returns the highest view confirmed processed (the cursor), or `None` if nothing was - /// processed, so the caller can track real progress. Errors are propagated; the failed range - /// is retried on the next call. + /// `live` carries the payload/VID/cert2 data from the in-memory decide event. It is the + /// preferred source when building the events: under the new protocol, consensus writes this + /// data to storage asynchronously, so a just-decided view's data may not have landed on disk + /// yet, while the decide event already carries it. Storage is the fallback for views not + /// covered (restart replay, signals coalesced under processor lag, decides that never had + /// the data). /// - /// Default returns `Some(decided_view)`: backends with no replayable storage (e.g. `NoStorage`) - /// forward events synchronously in `persist_decided_leaves` and are always caught up here. + /// Events are never deferred waiting for missing data: a leaf whose payload is in neither + /// `live` nor storage is emitted without it, and reported in the returned outcome so the + /// caller can heal the gap asynchronously — by recovering the payload from peers and + /// delivering it to consensus storage and the query service the same way late + /// `BlockPayloadReconstructed` events are. + /// + /// Returns a [`DecideProcessingOutcome`] carrying the highest view confirmed processed (the + /// cursor; `None` if nothing was processed) and the leaves emitted without payloads. Errors + /// are propagated; the failed range is retried on the next call. + /// + /// Default returns `Some(decided_view)` with no missing payloads: backends with no replayable + /// storage (e.g. `NoStorage`) forward events synchronously in `persist_decided_leaves` and are + /// always caught up here. async fn process_decided_events( &self, decided_view: ViewNumber, _deciding_qc: Option>>, _consumer: &(impl EventConsumer + 'static), - ) -> anyhow::Result> { - Ok(Some(decided_view)) + _live: Option<&DecideEventData>, + ) -> anyhow::Result { + Ok(DecideProcessingOutcome { + processed: Some(decided_view), + missing_payload: vec![], + }) } async fn load_anchor_leaf( @@ -1086,6 +1122,111 @@ pub trait EventConsumer: Debug + Send + Sync { async fn handle_event(&self, event: &CoordinatorEvent) -> anyhow::Result<()>; } +/// Outcome of a decide processing pass +/// ([`process_decided_events`](SequencerPersistence::process_decided_events)). +#[derive(Debug, Default)] +pub struct DecideProcessingOutcome { + /// Highest view confirmed processed (the cursor), or `None` if nothing was processed. + pub processed: Option, + /// Leaves whose decide events were emitted without a block payload, in view order. + /// Candidates for background payload recovery from peers. + pub missing_payload: Vec, +} + +/// Recover a missing block payload for a decided leaf from an external source. +/// +/// Under the new protocol a node can decide a view without ever obtaining its payload +/// (e.g. it was not needed for quorum and missed the share-carrying Vote1 broadcasts). +/// When [`process_decided_events`](SequencerPersistence::process_decided_events) reports +/// leaves emitted without payloads, a background task uses this hook to fetch them from +/// peers — who retain DA proposals for the consensus storage retention window — and then +/// delivers them to consensus storage and the query service. +#[async_trait] +pub trait DecidePayloadRecovery: Debug + Send + Sync { + /// Try to fetch the DA proposal (block payload) for `leaf`. Implementations MUST + /// verify the returned payload against the leaf's payload commitment; a `Some` result + /// is trusted by the caller. Returns `Ok(None)` if the payload could not be recovered + /// (the attempt may be retried later). + async fn recover_payload( + &self, + leaf: &Leaf2, + ) -> anyhow::Result>>>; +} + +/// Payload, VID, and cert2 data captured in memory from a decide event, keyed by view. +/// +/// Under the new protocol, consensus writes DA proposals, VID shares, and cert2s to storage +/// asynchronously, off the critical path, so a view can be decided before its data lands on +/// disk. The decide event itself already carries this data, though: the decided leaves come +/// with their payloads filled in and their VID shares attached. Capturing it here lets +/// [`process_decided_events`](SequencerPersistence::process_decided_events) build complete +/// query-service decide events without reading — and racing — the consensus staging tables. +/// Storage remains the fallback for views not covered (restart replay, signals coalesced +/// under processor lag, decides that never had the data in the first place). +#[derive(Clone, Debug, Default)] +pub struct DecideEventData { + /// Block payloads from the decided leaves. + payloads: BTreeMap, + /// VID shares attached to the decide event. + vid_shares: BTreeMap>, + /// cert2s certifying decided leaves, keyed by the view they certify. + cert2s: BTreeMap>, +} + +impl DecideEventData { + /// Capture the in-memory data from a decide event's leaf chain. `cert2`, when present, + /// is keyed by the view it certifies (the newest decided view). + pub fn new<'a>( + leaf_infos: impl IntoIterator>, + cert2: Option<(ViewNumber, Certificate2)>, + ) -> Self { + let mut payloads = BTreeMap::new(); + let mut vid_shares = BTreeMap::new(); + for info in leaf_infos { + let view = info.leaf.view_number(); + if let Some(payload) = info.leaf.block_payload() { + payloads.insert(view, payload); + } + if let Some(share) = &info.vid_share { + vid_shares.insert(view, share.clone()); + } + } + Self { + payloads, + vid_shares, + cert2s: cert2.into_iter().collect(), + } + } + + /// The block payload of the leaf decided at `view`, if the decide event carried it. + pub fn payload(&self, view: ViewNumber) -> Option<&Payload> { + self.payloads.get(&view) + } + + /// This node's VID share for `view`, if the decide event carried it. + pub fn vid_share(&self, view: ViewNumber) -> Option<&VidDisperseShare> { + self.vid_shares.get(&view) + } + + /// The cert2 certifying the leaf decided at `view`, if the decide event carried it. + pub fn cert2(&self, view: ViewNumber) -> Option<&Certificate2> { + self.cert2s.get(&view) + } +} + +/// A decide persisted by [`persist_event`](SequencerPersistence::persist_event) and pending +/// background processing. +#[derive(Clone, Debug)] +pub struct PendingDecide { + /// The newest decided view. + pub view: ViewNumber, + /// The QC deciding `view` (legacy epoch decides only). + pub deciding_qc: Option>>, + /// In-memory data from the decide event, for live query-service ingestion. Shared via + /// `Arc` so cloning the signal (e.g. out of a `watch` channel) stays cheap. + pub data: Arc, +} + #[async_trait] impl EventConsumer for Box where diff --git a/crates/hotshot/new-protocol/src/coordinator.rs b/crates/hotshot/new-protocol/src/coordinator.rs index f41f92617dc..03db9662134 100644 --- a/crates/hotshot/new-protocol/src/coordinator.rs +++ b/crates/hotshot/new-protocol/src/coordinator.rs @@ -467,16 +467,18 @@ where out.view, out.epoch, out.payload.clone(), - out.metadata.clone(), + out.metadata, VidCommitment::V2(out.payload_commitment), ); - if let Some(proposal) = self.consensus.proposal_at(out.view) { - self.outbox.push_back(ConsensusOutput::BlockPayloadReconstructed { - view: out.view, - header: proposal.block_header.clone(), - payload: out.payload, - }); - } + // Notify downstream consumers (e.g. the query service) of the + // reconstructed payload. The header is carried through the + // reconstructor, so this works even if the proposal has already + // been garbage collected from consensus state. + self.outbox.push_back(ConsensusOutput::BlockPayloadReconstructed { + view: out.view, + header: out.header, + payload: out.payload, + }); return Ok(ConsensusInput::BlockReconstructed(out.view, out.payload_commitment)) } Err(()) => { @@ -1053,15 +1055,9 @@ where self.storage .append_proposal(validated.message.proposal.data.clone()); - let m = validated - .message - .proposal - .data - .block_header - .metadata() - .clone(); + let header = validated.message.proposal.data.block_header.clone(); self.vid_reconstructor - .handle_vid_share(vid_share.clone(), m); + .handle_vid_share(vid_share.clone(), header); // GC for the cache let view = validated.message.proposal.data.view_number(); diff --git a/crates/hotshot/new-protocol/src/storage.rs b/crates/hotshot/new-protocol/src/storage.rs index 56bc351a3ab..57c9d87f52a 100644 --- a/crates/hotshot/new-protocol/src/storage.rs +++ b/crates/hotshot/new-protocol/src/storage.rs @@ -20,6 +20,21 @@ use crate::message::{Certificate2, Proposal}; const RETRY_DELAY: Duration = Duration::from_millis(300); +/// Maximum number of attempts for a storage write before giving up. Together with +/// [`RETRY_DELAY`] this bounds the lifetime of a persistently failing write task to ~30s. +const MAX_APPEND_ATTEMPTS: usize = 100; + +/// How many views below the GC view in-flight storage writes are allowed to keep running. +/// +/// Writes for just-decided views must be allowed to complete: the decide pipeline normally +/// builds query-service decide events from the in-memory decide data, but falls back to +/// reading this data from disk (restart replay, coalesced signals), and peers fetch it for +/// their own recovery — so aborting writes right at the decide would lose data that was +/// still in flight (e.g. a VID reconstruction that finished just before its view was +/// decided). Aborting below the horizon is only a backstop against leaking stuck tasks; +/// bounded retries terminate them anyway. +const GC_ABORT_HORIZON: u64 = 100; + /// New protocol storage extension for data that is not part of the legacy HotShot storage trait. #[async_trait] pub trait NewProtocolStorage: StorageTrait { @@ -51,9 +66,12 @@ impl> Storage { error!("failed to sign VID share for storage"); return; }; - loop { + for attempt in 1..=MAX_APPEND_ATTEMPTS { match storage.append_vid(&proposal).await { Ok(()) => return, + Err(err) if attempt == MAX_APPEND_ATTEMPTS => { + error!(%err, "failed to append VID share after {MAX_APPEND_ATTEMPTS} attempts, giving up"); + }, Err(err) => { warn!(%err, "failed to append VID share, retrying"); sleep(RETRY_DELAY).await; @@ -91,9 +109,12 @@ impl> Storage { signature, _pd: PhantomData, }; - loop { + for attempt in 1..=MAX_APPEND_ATTEMPTS { match storage.append_da2(&proposal, vid_commit).await { Ok(()) => return, + Err(err) if attempt == MAX_APPEND_ATTEMPTS => { + error!(%err, "failed to append DA proposal after {MAX_APPEND_ATTEMPTS} attempts, giving up"); + }, Err(err) => { warn!(%err, "failed to append DA proposal, retrying"); sleep(RETRY_DELAY).await; @@ -107,9 +128,12 @@ impl> Storage { pub fn append_cert2(&mut self, view: ViewNumber, cert2: Certificate2) { let storage = self.storage.clone(); let handle = spawn(async move { - loop { + for attempt in 1..=MAX_APPEND_ATTEMPTS { match storage.append_cert2(view, cert2.clone()).await { Ok(()) => return, + Err(err) if attempt == MAX_APPEND_ATTEMPTS => { + error!(%err, %view, "failed to append cert2 after {MAX_APPEND_ATTEMPTS} attempts, giving up"); + }, Err(err) => { warn!(%err, %view, "failed to append cert2, retrying"); sleep(RETRY_DELAY).await; @@ -127,9 +151,12 @@ impl> Storage { ) { let storage = self.storage.clone(); let handle = spawn(async move { - loop { + for attempt in 1..=MAX_APPEND_ATTEMPTS { match storage.update_state_cert(state_cert.clone()).await { Ok(()) => return, + Err(err) if attempt == MAX_APPEND_ATTEMPTS => { + error!(%err, epoch = %state_cert.epoch, "failed to append state cert after {MAX_APPEND_ATTEMPTS} attempts, giving up"); + }, Err(err) => { warn!(%err, epoch = %state_cert.epoch, "failed to append state cert, retrying"); sleep(RETRY_DELAY).await; @@ -169,9 +196,12 @@ impl> Storage { signature, _pd: PhantomData, }; - loop { + for attempt in 1..=MAX_APPEND_ATTEMPTS { match storage.append_proposal_wrapper(&signed).await { Ok(()) => return, + Err(err) if attempt == MAX_APPEND_ATTEMPTS => { + error!(%err, "failed to append proposal after {MAX_APPEND_ATTEMPTS} attempts, giving up"); + }, Err(err) => { warn!(%err, "failed to append proposal, retrying"); sleep(RETRY_DELAY).await; @@ -183,7 +213,17 @@ impl> Storage { } pub fn gc(&mut self, view_number: ViewNumber) { - let keep = self.handles.split_off(&view_number); + // Reap tasks that have already completed. + self.handles.retain(|_, handles| { + handles.retain(|handle| !handle.is_finished()); + !handles.is_empty() + }); + + // Abort only tasks far below the GC view, as a backstop against leaks. Writes for + // recently decided views are left running: the decide pipeline still needs to read + // that data back from disk to build query-service decide events. + let horizon = ViewNumber::new(view_number.saturating_sub(GC_ABORT_HORIZON)); + let keep = self.handles.split_off(&horizon); for handles in self.handles.values() { for handle in handles { handle.abort(); diff --git a/crates/hotshot/new-protocol/src/tests/vid.rs b/crates/hotshot/new-protocol/src/tests/vid.rs index b04a068f80d..feac009bc4d 100644 --- a/crates/hotshot/new-protocol/src/tests/vid.rs +++ b/crates/hotshot/new-protocol/src/tests/vid.rs @@ -3,7 +3,7 @@ use hotshot_example_types::node_types::TestTypes; use hotshot_types::traits::signature_key::SignatureKey; use super::common::utils::TestData; -use crate::vid::VidReconstructor; +use crate::vid::{RECONSTRUCT_KEEP_HORIZON, VidReconstructor}; /// Threshold for SuccessThreshold with 10 nodes of stake 1: (10*2)/3 + 1 = 7. const THRESHOLD: u64 = 7; @@ -17,7 +17,7 @@ async fn test_no_duplicate_reconstruction_after_threshold() { let view = &test_data.views[0]; let mut reconstructor = VidReconstructor::::new(); - // Feed the proposal share first (carries metadata required for reconstruction). + // Feed the proposal share first (carries the header required for reconstruction). let proposal_key = BLSPubKey::generated_from_seed_indexed([0u8; 32], 0).0; let proposal_share = view .vid_shares @@ -25,7 +25,7 @@ async fn test_no_duplicate_reconstruction_after_threshold() { .find(|s| s.recipient_key == proposal_key) .unwrap() .clone(); - reconstructor.handle_vid_share(proposal_share, view.proposal.data.block_header.metadata); + reconstructor.handle_vid_share(proposal_share, view.proposal.data.block_header.clone()); // Feed remaining shares from other nodes — enough to exceed the threshold. for i in 1..view.vid_shares.len() as u64 { @@ -85,7 +85,7 @@ async fn test_mark_reconstructed_skips_reconstruction() { .find(|s| s.recipient_key == proposal_key) .unwrap() .clone(); - reconstructor.handle_vid_share(proposal_share, view.proposal.data.block_header.metadata); + reconstructor.handle_vid_share(proposal_share, view.proposal.data.block_header.clone()); for i in 1..view.vid_shares.len() as u64 { let key = BLSPubKey::generated_from_seed_indexed([0u8; 32], i).0; let share = view @@ -114,6 +114,45 @@ async fn test_mark_reconstructed_skips_reconstruction() { } } +/// GC within the keep horizon must not abort an in-flight reconstruction: GC runs when +/// views are decided, and the decided views' payloads are exactly what the decide +/// pipeline still needs (e.g. a multi-leaf decide after a timeout). +#[tokio::test] +async fn test_gc_keeps_recent_reconstructions() { + let test_data = TestData::new(1).await; + let view = &test_data.views[0]; + let mut reconstructor = VidReconstructor::::new(); + + // Feed threshold shares so a reconstruction task is in flight. + let proposal_key = BLSPubKey::generated_from_seed_indexed([0u8; 32], 0).0; + let proposal_share = view + .vid_shares + .iter() + .find(|s| s.recipient_key == proposal_key) + .unwrap() + .clone(); + reconstructor.handle_vid_share(proposal_share, view.proposal.data.block_header.clone()); + for i in 1..THRESHOLD { + let key = BLSPubKey::generated_from_seed_indexed([0u8; 32], i).0; + let share = view + .vid_shares + .iter() + .find(|s| s.recipient_key == key) + .unwrap() + .clone(); + reconstructor.handle_vid_share(share, None); + } + + // GC at the edge of the keep horizon: the in-flight reconstruction for this view must + // survive and still produce a result. + reconstructor.gc(view.view_number + RECONSTRUCT_KEEP_HORIZON); + let result = tokio::time::timeout(std::time::Duration::from_secs(5), reconstructor.next()) + .await + .expect("reconstruction should complete despite GC within the keep horizon") + .expect("should produce a reconstruction result"); + assert!(result.is_ok(), "reconstruction should succeed"); +} + /// Shares arriving after reconstruction has already completed for a view /// should be silently dropped (the `reconstructed` set guards this path). #[tokio::test] @@ -122,7 +161,7 @@ async fn test_shares_after_reconstruction_are_ignored() { let view = &test_data.views[0]; let mut reconstructor = VidReconstructor::::new(); - // Feed exactly threshold shares (with metadata on the first). + // Feed exactly threshold shares (with the header on the first). let first_key = BLSPubKey::generated_from_seed_indexed([0u8; 32], 0).0; let first_share = view .vid_shares @@ -130,7 +169,7 @@ async fn test_shares_after_reconstruction_are_ignored() { .find(|s| s.recipient_key == first_key) .unwrap() .clone(); - reconstructor.handle_vid_share(first_share, view.proposal.data.block_header.metadata); + reconstructor.handle_vid_share(first_share, view.proposal.data.block_header.clone()); for i in 1..THRESHOLD { let key = BLSPubKey::generated_from_seed_indexed([0u8; 32], i).0; diff --git a/crates/hotshot/new-protocol/src/vid.rs b/crates/hotshot/new-protocol/src/vid.rs index 6e81d670583..a72cab1cda7 100644 --- a/crates/hotshot/new-protocol/src/vid.rs +++ b/crates/hotshot/new-protocol/src/vid.rs @@ -5,7 +5,7 @@ use hotshot::traits::BlockPayload; use hotshot_types::{ data::{EpochNumber, VidCommitment2, VidDisperse2, VidDisperseShare2, ViewNumber}, epoch_membership::EpochMembershipCoordinator, - traits::node_implementation::NodeType, + traits::{block_contents::BlockHeader, node_implementation::NodeType}, vid::avidm_gf2::{AvidmGf2Common, AvidmGf2Scheme, AvidmGf2Share}, }; use tokio::task::{AbortHandle, JoinSet}; @@ -22,6 +22,10 @@ pub struct VidReconstructOutput { pub payload_commitment: VidCommitment2, pub payload: T::BlockPayload, pub metadata: >::Metadata, + /// Header of the block this payload belongs to, captured from the proposal. Carried + /// through reconstruction so consumers don't depend on the proposal still being in + /// consensus state (it may have been garbage collected by the time we finish). + pub header: T::BlockHeader, pub tx_commitments: Vec>, } @@ -107,7 +111,9 @@ pub(crate) struct VidShareAccumulator { accumulated_weight: usize, seen_keys: HashSet, common: AvidmGf2Common, - metadata: Option<>::Metadata>, + /// Block header from the proposal for this view. Required for reconstruction (it + /// provides the payload metadata) and carried into the output for consumers. + header: Option, epoch: Option, } @@ -117,6 +123,11 @@ impl VidShareAccumulator { } } +/// Number of views below the GC view for which in-flight reconstructions and share +/// accumulators are kept alive, so that payloads for just-decided views can still be +/// reconstructed and delivered to the decide pipeline / query service. +pub(crate) const RECONSTRUCT_KEEP_HORIZON: u64 = 5; + #[derive(Default)] pub struct VidReconstructor { accumulators: BTreeMap>, @@ -135,9 +146,9 @@ impl VidReconstructor { } } - pub(crate) fn handle_vid_share(&mut self, share: VidDisperseShare2, metadata: M) + pub(crate) fn handle_vid_share(&mut self, share: VidDisperseShare2, header: H) where - M: Into>::Metadata>>, + H: Into>, { let view = share.view_number; if self.reconstructed.contains(&view) { @@ -146,7 +157,7 @@ impl VidReconstructor { let payload_commitment = share.payload_commitment; let recipient_key = share.recipient_key.clone(); let weight = share.share.weight(); - let metadata = metadata.into(); + let header = header.into(); let share_epoch = share.epoch; let accumulator = self .accumulators @@ -156,13 +167,13 @@ impl VidReconstructor { accumulated_weight: 0, seen_keys: HashSet::new(), common: share.common.clone(), - metadata: None, + header: None, epoch: share_epoch, }); - if accumulator.metadata.is_none() - && let Some(m) = metadata + if accumulator.header.is_none() + && let Some(h) = header { - accumulator.metadata = Some(m) + accumulator.header = Some(h) } if accumulator.seen_keys.insert(recipient_key) { accumulator.accumulated_weight += weight; @@ -201,8 +212,9 @@ impl VidReconstructor { }; let shares = accumulator.shares.clone(); let common = accumulator.common.clone(); - // Metadata comes from when we get the proposal, otherwise we can't reconstruct the payload - let Some(metadata) = accumulator.metadata.clone() else { + // The header comes from the proposal; without it we have no payload metadata and + // can't reconstruct the payload. + let Some(header) = accumulator.header.clone() else { return; }; let epoch = accumulator.epoch.unwrap_or(EpochNumber::genesis()); @@ -211,6 +223,7 @@ impl VidReconstructor { // TODO: Handle error return Err(()); }; + let metadata = header.metadata().clone(); let payload = T::BlockPayload::from_bytes(&result, &metadata); let tx_commitments = payload.transaction_commitments(&metadata); Ok(VidReconstructOutput { @@ -219,6 +232,7 @@ impl VidReconstructor { payload_commitment, payload, metadata, + header, tx_commitments, }) }); @@ -226,12 +240,22 @@ impl VidReconstructor { } pub fn gc(&mut self, view_number: ViewNumber) { - let keep = self.calculations.split_off(&view_number); + // GC runs when views are decided, but the decided views' payloads are exactly what + // the decide pipeline still needs: a multi-leaf decide (e.g. after a timeout) + // would otherwise abort the reconstructions for the older leaves in the batch and + // lose their payloads. Keep a small horizon of views alive below the GC view; far + // below it, accumulators can no longer make progress anyway (Vote1 messages + // carrying shares stop arriving once the network moves on). + let horizon = ViewNumber::new(view_number.saturating_sub(RECONSTRUCT_KEEP_HORIZON)); + let keep = self.calculations.split_off(&horizon); for handle in self.calculations.values_mut() { handle.abort(); } self.calculations = keep; - self.accumulators = self.accumulators.split_off(&view_number); + self.accumulators = self.accumulators.split_off(&horizon); + // Forget completed views below the horizon; their accumulators are gone, so late + // shares can no longer trigger duplicate reconstructions. + self.reconstructed = self.reconstructed.split_off(&horizon); } /// Mark `view` as already-reconstructed: drop accumulated shares, abort any