diff --git a/crates/core/src/tracker/analysis.rs b/crates/core/src/tracker/analysis.rs new file mode 100644 index 00000000..dc995699 --- /dev/null +++ b/crates/core/src/tracker/analysis.rs @@ -0,0 +1,1480 @@ +//! Pure analysis functions for tracker duty failure detection and peer +//! participation accounting. + +use std::{ + collections::{HashMap, HashSet}, + sync::OnceLock, +}; + +use pluto_eth2api::EthBeaconNodeApiClientError; +use pluto_featureset::{Feature, GLOBAL_STATE}; + +use crate::{ + tracker::{ + Event, StepError, + reason::{ + REASON_BROADCAST_BN_ERROR, REASON_BUG_AGGREGATION_ERROR, REASON_BUG_DUTY_DB_ERROR, + REASON_BUG_FETCH_ERROR, REASON_BUG_PAR_SIG_DB_EXTERNAL, + REASON_BUG_PAR_SIG_DB_INCONSISTENT, REASON_BUG_PAR_SIG_DB_INTERNAL, REASON_BUG_SIG_AGG, + REASON_FAILED_AGGREGATOR_SELECTION, REASON_FAILED_PROPOSER_RANDAO, + REASON_FETCH_BN_ERROR, REASON_INSUFFICIENT_AGGREGATOR_SELECTIONS, + REASON_INSUFFICIENT_PEER_SIGNATURES, REASON_MISSING_AGGREGATOR_ATTESTATION, + REASON_NO_AGGREGATOR_SELECTIONS, REASON_NO_CONSENSUS, REASON_NO_LOCAL_VC_SIGNATURE, + REASON_NO_PEER_SIGNATURES, REASON_NOT_INCLUDED_ON_CHAIN, + REASON_PAR_SIG_DB_INCONSISTENT_SYNC, REASON_PROPOSER_INSUFFICIENT_RANDAOS, + REASON_PROPOSER_NO_EXTERNAL_RANDAOS, REASON_PROPOSER_ZERO_RANDAOS, + REASON_SYNC_CONTRIBUTION_FAILED_PREPARE, REASON_SYNC_CONTRIBUTION_FEW_PREPARES, + REASON_SYNC_CONTRIBUTION_NO_EXTERNAL_PREPARES, REASON_SYNC_CONTRIBUTION_NO_SYNC_MSG, + REASON_SYNC_CONTRIBUTION_ZERO_PREPARES, REASON_UNKNOWN, + REASON_ZERO_AGGREGATOR_SELECTIONS, Reason, + }, + step::Step, + }, + types::{Duty, DutyType, ParSignedData, PubKey}, +}; + +/// Partial signatures grouped by message root, grouped by pubkey. +pub type ParSigsByMsg = HashMap>>; + +/// Returns true if every pubkey has at most one distinct message root. +pub(crate) fn msg_roots_consistent(parsigs: &ParSigsByMsg) -> bool { + parsigs.values().all(|roots| roots.len() <= 1) +} + +/// Set of duty types for which chain inclusion is supported. +/// +/// The result is cached for the lifetime of the process. This assumes +/// `GLOBAL_STATE` (and therefore `Feature::AttestationInclusion`) is +/// configured once at startup and never mutated afterward — matching Go, +/// which reads the flag on every call but relies on the same invariant. +pub(crate) fn incl_supported() -> &'static HashSet { + static CACHE: OnceLock> = OnceLock::new(); + CACHE.get_or_init(|| { + let mut set = HashSet::new(); + set.insert(DutyType::Proposer); + let state = GLOBAL_STATE.read().expect("featureset poisoned"); + if state.enabled(Feature::AttestationInclusion) { + set.insert(DutyType::Attester); + set.insert(DutyType::Aggregator); + } + set + }) +} + +/// Returns the terminal step for a duty type — either `Bcast` or +/// `ChainInclusion` depending on whether inclusion checks are supported. +pub(crate) fn last_step(duty_type: &DutyType) -> Step { + if incl_supported().contains(duty_type) { + Step::ChainInclusion + } else { + Step::Bcast + } +} + +/// Duty types that are expected to occasionally produce inconsistent partial +/// signatures (sync committee duties). +pub(crate) fn expect_inconsistent_par_sigs(duty_type: &DutyType) -> bool { + matches!( + duty_type, + DutyType::SyncMessage | DutyType::SyncContribution + ) +} + +/// Outcome of duty failure analysis. +#[derive(Debug, Clone)] +pub struct DutyFailure { + /// The step where the duty got stuck. + pub step: Step, + /// Human-friendly reason for the failure. + pub reason: Reason, + /// Underlying step error if any. + pub err: Option, +} + +/// The step at which a duty stopped progressing. +#[derive(Debug, Clone)] +pub(crate) struct DutyFailedStep { + /// Whether the duty failed, i.e. did not reach its terminal step. + pub failed: bool, + /// The step the duty got stuck at; `Zero` on success. + pub step: Step, + /// The error reported by that step, if any. + pub err: Option, +} + +/// Locates the step where a duty got stuck, the last error reported by that +/// step, and whether the duty failed. +/// +/// An empty event slice indicates a duty +/// that failed before any event was recorded (returns `step = Zero`). +pub(crate) fn duty_failed_step(events: &[Event]) -> DutyFailedStep { + if events.is_empty() { + return DutyFailedStep { + failed: true, + step: Step::Zero, + err: None, + }; + } + + let mut events_by_step: HashMap> = HashMap::new(); + for e in events { + events_by_step.entry(e.step).or_default().push(e); + } + + // Scan backwards from the step just before Sentinel down to Fetcher, + // returning the last event of the highest-numbered step that recorded any + // events. Matches Go's `for step := sentinel - 1; step > zero; step--`. + const STEPS: &[Step] = &[ + Step::ChainInclusion, + Step::Bcast, + Step::AggSigDB, + Step::SigAgg, + Step::ParSigDBExternal, + Step::ParSigEx, + Step::ParSigDBInternal, + Step::ValidatorAPI, + Step::DutyDB, + Step::Consensus, + Step::Fetcher, + ]; + + let last = STEPS + .iter() + .filter_map(|s| events_by_step.get(s).and_then(|es| es.last()).copied()) + .next(); + + let Some(last) = last else { + return DutyFailedStep { + failed: true, + step: Step::Zero, + err: None, + }; + }; + + // Determine if the final step was successful. Use the duty type from the + // first event (all events in the slice share the same duty). + let last_for_duty = last_step(&events[0].duty.duty_type); + if last.step == last_for_duty && last.step_err.is_none() { + return DutyFailedStep { + failed: false, + step: Step::Zero, + err: None, + }; + } + + DutyFailedStep { + failed: true, + step: last.step, + err: last.step_err.clone(), + } +} + +/// Analyses whether a duty failed and, if so, why. +pub(crate) fn analyse_duty_failed( + duty: &Duty, + all_events: &HashMap>, + failed_step: &DutyFailedStep, + msg_root_consistent: bool, +) -> Option { + if !failed_step.failed { + return None; + } + + let mut reason = REASON_UNKNOWN; + let mut step = failed_step.step; + let mut err = failed_step.err.clone(); + + match failed_step.step { + Step::Fetcher => return analyse_fetcher_failed(duty, all_events, err), + Step::Consensus => { + if err.is_some() { + reason = REASON_NO_CONSENSUS; + } + } + Step::DutyDB => { + if err.is_some() { + reason = REASON_BUG_DUTY_DB_ERROR; + } else { + step = Step::ValidatorAPI; + reason = REASON_NO_LOCAL_VC_SIGNATURE; + } + } + Step::ParSigDBInternal => { + reason = REASON_BUG_PAR_SIG_DB_INTERNAL; + } + Step::ParSigEx => { + if err.is_none() { + reason = REASON_NO_PEER_SIGNATURES; + } + } + Step::ParSigDBExternal => { + if err.is_some() { + return Some(DutyFailure { + step: Step::ParSigDBExternal, + reason: REASON_BUG_PAR_SIG_DB_EXTERNAL, + err, + }); + } + if msg_root_consistent { + reason = REASON_INSUFFICIENT_PEER_SIGNATURES; + } else if expect_inconsistent_par_sigs(&duty.duty_type) { + reason = REASON_PAR_SIG_DB_INCONSISTENT_SYNC; + } else { + reason = REASON_BUG_PAR_SIG_DB_INCONSISTENT; + } + } + Step::SigAgg => { + if err.is_some() { + reason = REASON_BUG_SIG_AGG; + } + } + Step::AggSigDB => { + reason = REASON_BUG_AGGREGATION_ERROR; + } + Step::Bcast => { + if err.is_none() { + err = Some(string_error("bug: missing chain inclusion event")); + } else { + reason = REASON_BROADCAST_BN_ERROR; + } + } + Step::ChainInclusion => { + if err.is_none() { + err = Some(string_error("bug: missing chain inclusion error")); + } else { + reason = REASON_NOT_INCLUDED_ON_CHAIN; + } + } + Step::Zero => { + err = Some(string_error("no events for duty")); + } + _ => { + err = Some(string_error(&format!( + "duty failed at step {}", + failed_step.step + ))); + } + } + + Some(DutyFailure { step, reason, err }) +} + +/// Analyses fetcher-step failures, checking pre-requisite duties for +/// proposer, aggregator, and sync-contribution duty types. +pub(crate) fn analyse_fetcher_failed( + duty: &Duty, + all_events: &HashMap>, + fetch_err: Option, +) -> Option { + match &duty.duty_type { + DutyType::Proposer => Some(analyse_fetcher_failed_proposer(duty, all_events, fetch_err)), + DutyType::Aggregator => analyse_fetcher_failed_aggregator(duty, all_events, fetch_err), + DutyType::SyncContribution => { + analyse_fetcher_failed_sync_contribution(duty, all_events, fetch_err) + } + _ => { + // TODO: when the fetcher is ported, add an `is_cancelled_error` check here + // (similar to `is_eth2_api_error`) so cancellation/timeout errors map to the + // default reason rather than `REASON_BUG_FETCH_ERROR`, matching Go's three-tier + // logic in `analyseFetcherFailed` (tracker.go:299–305). + let reason = if let Some(e) = &fetch_err + && is_eth2_api_error(e.as_ref()) + { + REASON_FETCH_BN_ERROR + } else { + REASON_BUG_FETCH_ERROR + }; + Some(DutyFailure { + step: Step::Fetcher, + reason, + err: fetch_err, + }) + } + } +} + +fn analyse_fetcher_failed_proposer( + duty: &Duty, + all_events: &HashMap>, + fetch_err: Option, +) -> DutyFailure { + let randao_duty = Duty::new_randao_duty(duty.slot); + let randao_events = all_events + .get(&randao_duty) + .map(Vec::as_slice) + .unwrap_or(&[]); + let randao = duty_failed_step(randao_events); + + let reason = if randao.failed { + match randao.step { + Step::ParSigEx => REASON_PROPOSER_NO_EXTERNAL_RANDAOS, + Step::ParSigDBExternal => REASON_PROPOSER_INSUFFICIENT_RANDAOS, + Step::Zero => REASON_PROPOSER_ZERO_RANDAOS, + _ => REASON_FAILED_PROPOSER_RANDAO, + } + } else { + REASON_BUG_FETCH_ERROR + }; + + DutyFailure { + step: Step::Fetcher, + reason, + err: fetch_err, + } +} + +fn analyse_fetcher_failed_aggregator( + duty: &Duty, + all_events: &HashMap>, + fetch_err: Option, +) -> Option { + fetch_err.as_ref()?; + + let prep_agg_duty = Duty::new_prepare_aggregator_duty(duty.slot); + let prep_events = all_events + .get(&prep_agg_duty) + .map(Vec::as_slice) + .unwrap_or(&[]); + let prep = duty_failed_step(prep_events); + + if prep.failed { + let reason = match prep.step { + Step::ParSigEx => REASON_NO_AGGREGATOR_SELECTIONS, + Step::ParSigDBExternal => REASON_INSUFFICIENT_AGGREGATOR_SELECTIONS, + Step::Zero => REASON_ZERO_AGGREGATOR_SELECTIONS, + _ => REASON_FAILED_AGGREGATOR_SELECTION, + }; + return Some(DutyFailure { + step: Step::Fetcher, + reason, + err: fetch_err, + }); + } + + let attester_duty = Duty::new_attester_duty(duty.slot); + let att_events = all_events + .get(&attester_duty) + .map(Vec::as_slice) + .unwrap_or(&[]); + let att = duty_failed_step(att_events); + + let reason = if att.failed && att.step <= Step::DutyDB { + REASON_MISSING_AGGREGATOR_ATTESTATION + } else { + REASON_BUG_FETCH_ERROR + }; + + Some(DutyFailure { + step: Step::Fetcher, + reason, + err: fetch_err, + }) +} + +fn analyse_fetcher_failed_sync_contribution( + duty: &Duty, + all_events: &HashMap>, + fetch_err: Option, +) -> Option { + fetch_err.as_ref()?; + + let prep_duty = Duty::new_prepare_sync_contribution_duty(duty.slot); + let prep_events = all_events.get(&prep_duty).map(Vec::as_slice).unwrap_or(&[]); + let prep = duty_failed_step(prep_events); + + if prep.failed { + let reason = match prep.step { + Step::ParSigEx => REASON_SYNC_CONTRIBUTION_NO_EXTERNAL_PREPARES, + Step::ParSigDBExternal => REASON_SYNC_CONTRIBUTION_FEW_PREPARES, + Step::Zero => REASON_SYNC_CONTRIBUTION_ZERO_PREPARES, + _ => REASON_SYNC_CONTRIBUTION_FAILED_PREPARE, + }; + return Some(DutyFailure { + step: Step::Fetcher, + reason, + err: fetch_err, + }); + } + + let sync_msg_duty = Duty::new_sync_message_duty(duty.slot); + let sync_events = all_events + .get(&sync_msg_duty) + .map(Vec::as_slice) + .unwrap_or(&[]); + let sync = duty_failed_step(sync_events); + + let reason = if sync.failed && sync.step <= Step::AggSigDB { + REASON_SYNC_CONTRIBUTION_NO_SYNC_MSG + } else { + REASON_BUG_FETCH_ERROR + }; + + Some(DutyFailure { + step: Step::Fetcher, + reason, + err: fetch_err, + }) +} + +/// Groups partial signatures by message root, per pubkey, deduplicating by +/// `(pubkey, share_idx)`. +pub(crate) fn extract_par_sigs(events: &[Event]) -> ParSigsByMsg { + let mut dedup: HashSet<(PubKey, u64)> = HashSet::new(); + let mut resp: ParSigsByMsg = HashMap::new(); + + for e in events { + let Some(par_sig) = &e.par_sig else { + continue; + }; + + let key = (e.pubkey, par_sig.share_idx); + if !dedup.insert(key) { + continue; + } + + let root = match par_sig.signed_data.message_root() { + Ok(r) => r, + Err(err) => { + tracing::warn!(error = %err, "Parsig message root"); + continue; + } + }; + + resp.entry(e.pubkey) + .or_default() + .entry(root) + .or_default() + .push(par_sig.clone()); + } + + resp +} + +/// Result of [`analyse_participation`]. +pub(crate) struct ParticipationResult { + /// Partial-signature count per peer share index for expected peers. + pub participated: HashMap, + /// Partial-signature count per peer share index for unexpected peers. + pub unexpected: HashMap, + /// Number of distinct validator pubkeys that had any event for this duty. + pub validators_per_duty: usize, +} + +/// Counts partial signatures per peer share index — both expected +/// participations and unexpected events — plus the total number of distinct +/// validator pubkeys that had this duty scheduled. +pub(crate) fn analyse_participation( + duty: &Duty, + all_events: &HashMap>, +) -> ParticipationResult { + let mut participated: HashMap = HashMap::new(); + let mut unexpected: HashMap = HashMap::new(); + let mut dedup: HashSet<(u64, PubKey)> = HashSet::new(); + let mut pubkeys: HashSet = HashSet::new(); + + let Some(events) = all_events.get(duty) else { + return ParticipationResult { + participated, + unexpected, + validators_per_duty: 0, + }; + }; + + for e in events { + pubkeys.insert(e.pubkey); + + if !matches!(e.step, Step::ParSigDBExternal | Step::ParSigDBInternal) { + continue; + } + + let Some(par_sig) = &e.par_sig else { + continue; + }; + let share_idx = par_sig.share_idx; + + if !is_par_sig_event_expected(duty, e.pubkey, all_events) { + let slot = unexpected.entry(share_idx).or_insert(0); + *slot = slot.saturating_add(1); + continue; + } + + if dedup.insert((share_idx, e.pubkey)) { + let slot = participated.entry(share_idx).or_insert(0); + *slot = slot.saturating_add(1); + } + } + + ParticipationResult { + participated, + unexpected, + validators_per_duty: pubkeys.len(), + } +} + +/// Returns true if a partial-signature event is expected for the given duty +/// and pubkey — i.e. that duty (or an associated prerequisite) was scheduled. +pub(crate) fn is_par_sig_event_expected( + duty: &Duty, + pubkey: PubKey, + all_events: &HashMap>, +) -> bool { + // VAPI-triggered duties cannot be cross-referenced to a scheduled duty. + if matches!( + duty.duty_type, + DutyType::Exit | DutyType::BuilderRegistration + ) { + return true; + } + + let scheduled = |typ: DutyType| -> bool { + let key = Duty::new(duty.slot, typ); + let events = match all_events.get(&key) { + Some(es) => es, + None => return false, + }; + events + .iter() + .any(|e| e.step == Step::Fetcher && e.pubkey == pubkey) + }; + + match &duty.duty_type { + DutyType::Randao => scheduled(DutyType::Proposer) || scheduled(DutyType::BuilderProposer), + DutyType::PrepareAggregator => scheduled(DutyType::Attester), + DutyType::PrepareSyncContribution | DutyType::SyncMessage => { + scheduled(DutyType::SyncContribution) + } + other => scheduled(other.clone()), + } +} + +fn is_eth2_api_error(err: &(dyn std::error::Error + 'static)) -> bool { + let mut current: Option<&(dyn std::error::Error + 'static)> = Some(err); + while let Some(e) = current { + if e.downcast_ref::().is_some() { + return true; + } + current = e.source(); + } + false +} + +fn string_error(s: &str) -> StepError { + #[derive(Debug)] + struct Msg(String); + impl std::fmt::Display for Msg { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.0) + } + } + impl std::error::Error for Msg {} + std::sync::Arc::new(Msg(s.to_string())) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use pluto_crypto::types::{SIGNATURE_LENGTH, Signature}; + + use super::*; + use crate::{ + signeddata::SignedDataError, + types::{ParSignedData, SignedData, SlotNumber}, + }; + + fn pubkey(byte: u8) -> PubKey { + PubKey::from([byte; 48]) + } + + /// Computes the failed step for `duty` and runs the failure analysis, + /// mirroring how `TrackerService::analyse` wires the two together. + fn analyse_failed( + duty: &Duty, + events: &HashMap>, + msg_root_consistent: bool, + ) -> Option { + let failed_step = duty_failed_step(events.get(duty).map(Vec::as_slice).unwrap_or(&[])); + analyse_duty_failed(duty, events, &failed_step, msg_root_consistent) + } + + fn evt(duty: Duty, step: Step) -> Event { + Event { + duty, + step, + pubkey: pubkey(0), + step_err: None, + par_sig: None, + } + } + + fn evt_with_err(duty: Duty, step: Step, msg: &str) -> Event { + Event { + duty, + step, + pubkey: pubkey(0), + step_err: Some(string_error(msg)), + par_sig: None, + } + } + + fn evt_pubkey(duty: Duty, step: Step, pk: PubKey) -> Event { + Event { + duty, + step, + pubkey: pk, + step_err: None, + par_sig: None, + } + } + + /// Wraps an `EthBeaconNodeApiClientError` so [`is_eth2_api_error`] picks it + /// up via the error chain (mirrors Go's `errors.Wrap(eth2api.Error{...})`). + #[derive(Debug)] + struct WrappedEth2(EthBeaconNodeApiClientError); + + impl std::fmt::Display for WrappedEth2 { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "wrapped: {}", self.0) + } + } + + impl std::error::Error for WrappedEth2 { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + Some(&self.0) + } + } + + fn eth2_err() -> StepError { + Arc::new(WrappedEth2(EthBeaconNodeApiClientError::UnexpectedResponse)) + } + + #[derive(Debug, Clone, PartialEq, Eq)] + struct TestSignedData { + id: [u8; 32], + sig: [u8; SIGNATURE_LENGTH], + } + + impl TestSignedData { + fn new(id_byte: u8) -> Self { + Self { + id: [id_byte; 32], + sig: [0u8; SIGNATURE_LENGTH], + } + } + } + + impl SignedData for TestSignedData { + fn signature(&self) -> Result { + Ok(self.sig) + } + + fn set_signature(&self, sig: Signature) -> Result + where + Self: Sized, + { + Ok(Self { id: self.id, sig }) + } + + fn set_signature_boxed( + &self, + sig: Signature, + ) -> Result, SignedDataError> { + Ok(Box::new(self.set_signature(sig)?)) + } + + fn message_root(&self) -> Result<[u8; 32], SignedDataError> { + Ok(self.id) + } + } + + #[test] + fn analyse_duty_failed_progressive() { + // Replicates Go's TestAnalyseDutyFailed which uses one shared events + // map; subtests append the next step in workflow order so the last + // step recorded is always the one we just added. + let att = Duty::new_attester_duty(SlotNumber::new(1)); + let mut events: HashMap> = HashMap::new(); + + // Failed at fetcher with a non-eth2 error → BugFetchError. + events.entry(att.clone()).or_default().push(evt_with_err( + att.clone(), + Step::Fetcher, + "fetcher failed", + )); + let r = analyse_failed(&att, &events, true).unwrap(); + assert_eq!(r.step, Step::Fetcher); + assert_eq!(r.reason, REASON_BUG_FETCH_ERROR); + assert!(r.err.is_some()); + + // Failed at consensus. + events.entry(att.clone()).or_default().push(evt_with_err( + att.clone(), + Step::Consensus, + "consensus failed", + )); + let r = analyse_failed(&att, &events, true).unwrap(); + assert_eq!(r.step, Step::Consensus); + assert_eq!(r.reason, REASON_NO_CONSENSUS); + + // dutyDB step with no error → reported as validatorAPI / NoLocalVCSignature. + events + .entry(att.clone()) + .or_default() + .push(evt(att.clone(), Step::DutyDB)); + let r = analyse_failed(&att, &events, true).unwrap(); + assert_eq!(r.step, Step::ValidatorAPI); + assert_eq!(r.reason, REASON_NO_LOCAL_VC_SIGNATURE); + assert!(r.err.is_none()); + + // Failed at parsigDBInternal with err. + events.entry(att.clone()).or_default().push(evt_with_err( + att.clone(), + Step::ParSigDBInternal, + "parsigdb_internal failed", + )); + let r = analyse_failed(&att, &events, true).unwrap(); + assert_eq!(r.step, Step::ParSigDBInternal); + assert_eq!(r.reason, REASON_BUG_PAR_SIG_DB_INTERNAL); + + // Failed at parsigEx with no error → NoPeerSignatures. + events + .entry(att.clone()) + .or_default() + .push(evt(att.clone(), Step::ParSigEx)); + let r = analyse_failed(&att, &events, true).unwrap(); + assert_eq!(r.step, Step::ParSigEx); + assert_eq!(r.reason, REASON_NO_PEER_SIGNATURES); + + // parsigDBExternal with err → BugParSigDBExternal. + events.entry(att.clone()).or_default().push(evt_with_err( + att.clone(), + Step::ParSigDBExternal, + "parsigdb_external failed", + )); + let r = analyse_failed(&att, &events, true).unwrap(); + assert_eq!(r.step, Step::ParSigDBExternal); + assert_eq!(r.reason, REASON_BUG_PAR_SIG_DB_EXTERNAL); + + // parsigDBExternal with no err: three msg_root variants. + events + .entry(att.clone()) + .or_default() + .push(evt(att.clone(), Step::ParSigDBExternal)); + let r = analyse_failed(&att, &events, true).unwrap(); + assert_eq!(r.step, Step::ParSigDBExternal); + assert_eq!(r.reason, REASON_INSUFFICIENT_PEER_SIGNATURES); + + let r = analyse_failed(&att, &events, false).unwrap(); + assert_eq!(r.step, Step::ParSigDBExternal); + assert_eq!(r.reason, REASON_BUG_PAR_SIG_DB_INCONSISTENT); + + // Sync-committee duty reuses the same events for the inconsistent case. + let sync_msg = Duty::new_sync_message_duty(SlotNumber::new(1)); + events.insert(sync_msg.clone(), events.get(&att).cloned().unwrap()); + let r = analyse_failed(&sync_msg, &events, false).unwrap(); + assert_eq!(r.step, Step::ParSigDBExternal); + assert_eq!(r.reason, REASON_PAR_SIG_DB_INCONSISTENT_SYNC); + + // Failed at bcast with err. + events.entry(att.clone()).or_default().push(evt_with_err( + att.clone(), + Step::Bcast, + "bcast failed", + )); + let r = analyse_failed(&att, &events, true).unwrap(); + assert_eq!(r.step, Step::Bcast); + assert_eq!(r.reason, REASON_BROADCAST_BN_ERROR); + + // Failed at chainInclusion with err. + events.entry(att.clone()).or_default().push(evt_with_err( + att.clone(), + Step::ChainInclusion, + "not included on chain", + )); + let r = analyse_failed(&att, &events, true).unwrap(); + assert_eq!(r.step, Step::ChainInclusion); + assert_eq!(r.reason, REASON_NOT_INCLUDED_ON_CHAIN); + } + + #[test] + fn analyse_duty_failed_proposer_via_randao() { + let proposer = Duty::new_proposer_duty(SlotNumber::new(1)); + let randao = Duty::new_randao_duty(SlotNumber::new(1)); + + let mut events: HashMap> = HashMap::new(); + events.insert( + proposer.clone(), + vec![evt_with_err( + proposer.clone(), + Step::Fetcher, + "context canceled", + )], + ); + events.insert( + randao.clone(), + vec![ + evt(randao.clone(), Step::ValidatorAPI), + evt(randao.clone(), Step::ParSigDBInternal), + evt(randao.clone(), Step::ParSigEx), + ], + ); + + // Randao reached ParSigEx → ProposerNoExternalRandaos. + let r = analyse_failed(&proposer, &events, true).unwrap(); + assert_eq!(r.step, Step::Fetcher); + assert_eq!(r.reason, REASON_PROPOSER_NO_EXTERNAL_RANDAOS); + + // Randao reached ParSigDBExternal → ProposerInsufficientRandaos. + events + .get_mut(&randao) + .unwrap() + .push(evt(randao.clone(), Step::ParSigDBExternal)); + let r = analyse_failed(&proposer, &events, true).unwrap(); + assert_eq!(r.reason, REASON_PROPOSER_INSUFFICIENT_RANDAOS); + + // No Randao events at all → ProposerZeroRandaos. + events.insert(randao, vec![]); + let r = analyse_failed(&proposer, &events, true).unwrap(); + assert_eq!(r.reason, REASON_PROPOSER_ZERO_RANDAOS); + } + + #[test] + fn analyse_duty_failed_attester_success() { + let att = Duty::new_attester_duty(SlotNumber::new(1)); + assert_eq!(last_step(&att.duty_type), Step::Bcast); + + // Events for every step up to (but not including) chainInclusion. + let steps = [ + Step::Fetcher, + Step::Consensus, + Step::DutyDB, + Step::ValidatorAPI, + Step::ParSigDBInternal, + Step::ParSigEx, + Step::ParSigDBExternal, + Step::SigAgg, + Step::AggSigDB, + Step::Bcast, + ]; + let events: HashMap> = std::iter::once(( + att.clone(), + steps.iter().map(|s| evt(att.clone(), *s)).collect(), + )) + .collect(); + + assert!(analyse_failed(&att, &events, true).is_none()); + } + + #[test] + fn duty_failed_step_success_and_empty() { + let att = Duty::new_attester_duty(SlotNumber::new(0)); + let steps = [ + Step::Fetcher, + Step::Consensus, + Step::DutyDB, + Step::ValidatorAPI, + Step::ParSigDBInternal, + Step::ParSigEx, + Step::ParSigDBExternal, + Step::SigAgg, + Step::AggSigDB, + Step::Bcast, + ]; + let events: Vec = steps.iter().map(|s| evt(att.clone(), *s)).collect(); + + let r = duty_failed_step(&events); + assert!(!r.failed); + assert_eq!(r.step, Step::Zero); + assert!(r.err.is_none()); + + let r = duty_failed_step(&[]); + assert!(r.failed); + assert_eq!(r.step, Step::Zero); + assert!(r.err.is_none()); + } + + #[test] + fn duty_failed_step_picks_last_step_with_multiple_events() { + // Many events per step, all carrying the same error → last step in + // workflow order (bcast) is the failure point. + let att = Duty::new_attester_duty(SlotNumber::new(123)); + let steps = [ + Step::Fetcher, + Step::Consensus, + Step::DutyDB, + Step::ValidatorAPI, + Step::ParSigDBInternal, + Step::ParSigEx, + Step::ParSigDBExternal, + Step::SigAgg, + Step::AggSigDB, + Step::Bcast, + ]; + let mut events: Vec = Vec::new(); + for s in steps { + for _ in 0..5 { + events.push(evt_with_err(att.clone(), s, "test error")); + } + } + + let r = duty_failed_step(&events); + assert!(r.failed); + assert_eq!(r.step, Step::Bcast); + assert!(r.err.is_some()); + + // Now also append success (no-error) events for every step. The + // newest event at the terminal step has no error → success. + for s in steps { + events.push(evt(att.clone(), s)); + } + let r = duty_failed_step(&events); + assert!(!r.failed); + assert_eq!(r.step, Step::Zero); + assert!(r.err.is_none()); + } + + #[test] + fn analyse_fetcher_failed_table() { + let slot = SlotNumber::new(123); + let agg = Duty::new_aggregator_duty(slot); + let prep_agg = Duty::new_prepare_aggregator_duty(slot); + let att = Duty::new_attester_duty(slot); + let sync_con = Duty::new_sync_contribution_duty(slot); + let sync_msg = Duty::new_sync_message_duty(slot); + let prep_sync_con = Duty::new_prepare_sync_contribution_duty(slot); + + struct Case { + name: &'static str, + duty: Duty, + events: HashMap>, + reason: Reason, + failed: bool, + has_err: bool, + } + + let cases = vec![ + Case { + name: "eth2 error", + duty: att.clone(), + events: { + let mut m = HashMap::new(); + m.insert( + att.clone(), + vec![Event { + duty: att.clone(), + step: Step::Fetcher, + pubkey: pubkey(0), + step_err: Some(eth2_err()), + par_sig: None, + }], + ); + m + }, + reason: REASON_FETCH_BN_ERROR, + failed: true, + has_err: true, + }, + Case { + name: "no aggregator selections endpoint support", + duty: agg.clone(), + events: { + let mut m = HashMap::new(); + m.insert( + agg.clone(), + vec![evt_with_err(agg.clone(), Step::Fetcher, "context canceled")], + ); + m + }, + reason: REASON_ZERO_AGGREGATOR_SELECTIONS, + failed: true, + has_err: true, + }, + Case { + name: "no external prepare-aggregator signatures", + duty: agg.clone(), + events: { + let mut m = HashMap::new(); + m.insert( + agg.clone(), + vec![evt_with_err(agg.clone(), Step::Fetcher, "context canceled")], + ); + m.insert( + prep_agg.clone(), + vec![evt(prep_agg.clone(), Step::ParSigEx)], + ); + m + }, + reason: REASON_NO_AGGREGATOR_SELECTIONS, + failed: true, + has_err: true, + }, + Case { + name: "insufficient prepare-aggregator signatures", + duty: agg.clone(), + events: { + let mut m = HashMap::new(); + m.insert( + agg.clone(), + vec![evt_with_err(agg.clone(), Step::Fetcher, "context canceled")], + ); + m.insert( + prep_agg.clone(), + vec![evt(prep_agg.clone(), Step::ParSigDBExternal)], + ); + m + }, + reason: REASON_INSUFFICIENT_AGGREGATOR_SELECTIONS, + failed: true, + has_err: true, + }, + Case { + name: "prepare-aggregator failed at sigAgg", + duty: agg.clone(), + events: { + let mut m = HashMap::new(); + m.insert( + agg.clone(), + vec![evt_with_err(agg.clone(), Step::Fetcher, "context canceled")], + ); + m.insert(prep_agg.clone(), vec![evt(prep_agg.clone(), Step::SigAgg)]); + m + }, + reason: REASON_FAILED_AGGREGATOR_SELECTION, + failed: true, + has_err: true, + }, + Case { + name: "attester failed for aggregator", + duty: agg.clone(), + events: { + let mut m = HashMap::new(); + m.insert( + agg.clone(), + vec![evt_with_err(agg.clone(), Step::Fetcher, "context canceled")], + ); + m.insert(prep_agg.clone(), vec![evt(prep_agg.clone(), Step::Bcast)]); + m.insert( + att.clone(), + vec![evt_with_err(att.clone(), Step::Fetcher, "some error")], + ); + m + }, + reason: REASON_MISSING_AGGREGATOR_ATTESTATION, + failed: true, + has_err: true, + }, + Case { + name: "no aggregator found (nil err)", + duty: agg.clone(), + events: { + let mut m = HashMap::new(); + m.insert(agg.clone(), vec![evt(agg.clone(), Step::Fetcher)]); + m.insert(prep_agg.clone(), vec![evt(prep_agg.clone(), Step::Bcast)]); + m.insert(att.clone(), vec![evt(att.clone(), Step::Bcast)]); + m + }, + reason: REASON_UNKNOWN, + failed: false, + has_err: false, + }, + Case { + name: "sync committee selections endpoint not supported", + duty: sync_con.clone(), + events: { + let mut m = HashMap::new(); + m.insert( + sync_con.clone(), + vec![evt_with_err( + sync_con.clone(), + Step::Fetcher, + "context canceled", + )], + ); + m + }, + reason: REASON_SYNC_CONTRIBUTION_ZERO_PREPARES, + failed: true, + has_err: true, + }, + Case { + name: "no external prepare-sync-contribution signatures", + duty: sync_con.clone(), + events: { + let mut m = HashMap::new(); + m.insert( + sync_con.clone(), + vec![evt_with_err( + sync_con.clone(), + Step::Fetcher, + "context canceled", + )], + ); + m.insert( + prep_sync_con.clone(), + vec![evt(prep_sync_con.clone(), Step::ParSigEx)], + ); + m + }, + reason: REASON_SYNC_CONTRIBUTION_NO_EXTERNAL_PREPARES, + failed: true, + has_err: true, + }, + Case { + name: "insufficient prepare-sync-contribution", + duty: sync_con.clone(), + events: { + let mut m = HashMap::new(); + m.insert( + sync_con.clone(), + vec![evt_with_err( + sync_con.clone(), + Step::Fetcher, + "context canceled", + )], + ); + m.insert( + prep_sync_con.clone(), + vec![evt(prep_sync_con.clone(), Step::ParSigDBExternal)], + ); + m + }, + reason: REASON_SYNC_CONTRIBUTION_FEW_PREPARES, + failed: true, + has_err: true, + }, + Case { + name: "prepare-sync-contribution failed", + duty: sync_con.clone(), + events: { + let mut m = HashMap::new(); + m.insert( + sync_con.clone(), + vec![evt_with_err( + sync_con.clone(), + Step::Fetcher, + "context canceled", + )], + ); + m.insert( + prep_sync_con.clone(), + vec![evt(prep_sync_con.clone(), Step::SigAgg)], + ); + m + }, + reason: REASON_SYNC_CONTRIBUTION_FAILED_PREPARE, + failed: true, + has_err: true, + }, + Case { + name: "sync-message failed for sync-contribution", + duty: sync_con.clone(), + events: { + let mut m = HashMap::new(); + m.insert( + sync_con.clone(), + vec![evt_with_err( + sync_con.clone(), + Step::Fetcher, + "context canceled", + )], + ); + m.insert( + prep_sync_con.clone(), + vec![evt(prep_sync_con.clone(), Step::Bcast)], + ); + m.insert( + sync_msg.clone(), + vec![evt(sync_msg.clone(), Step::ParSigEx)], + ); + m + }, + reason: REASON_SYNC_CONTRIBUTION_NO_SYNC_MSG, + failed: true, + has_err: true, + }, + Case { + name: "no sync-committee aggregators (nil err)", + duty: sync_con.clone(), + events: { + let mut m = HashMap::new(); + m.insert(sync_con.clone(), vec![evt(sync_con.clone(), Step::Fetcher)]); + m.insert( + prep_sync_con.clone(), + vec![evt(prep_sync_con.clone(), Step::Bcast)], + ); + m.insert(sync_msg.clone(), vec![evt(sync_msg.clone(), Step::Bcast)]); + m + }, + reason: REASON_UNKNOWN, + failed: false, + has_err: false, + }, + Case { + name: "unexpected error", + duty: att.clone(), + events: { + let mut m = HashMap::new(); + m.insert( + att.clone(), + vec![evt_with_err(att.clone(), Step::Fetcher, "unexpected error")], + ); + m + }, + reason: REASON_BUG_FETCH_ERROR, + failed: true, + has_err: true, + }, + ]; + + for c in cases { + let r = analyse_failed(&c.duty, &c.events, true); + assert_eq!(r.is_some(), c.failed, "{}: failed mismatch", c.name); + if let Some(f) = r { + assert_eq!(f.reason, c.reason, "{}: reason mismatch", c.name); + assert_eq!(f.step, Step::Fetcher, "{}: step mismatch", c.name); + assert_eq!(f.err.is_some(), c.has_err, "{}: err presence", c.name); + } else { + // Not-failed fetcher cases (no aggregator/sync selected this + // slot) must surface as `Step::Fetcher` so the metrics reporter + // skips them rather than counting a success. + assert_eq!( + duty_failed_step(&c.events[&c.duty]).step, + Step::Fetcher, + "{}: expected fetcher no-op step", + c.name + ); + } + } + } + + #[test] + fn is_par_sig_event_expected_table() { + let slot = SlotNumber::new(123); + let pk = pubkey(7); + + // DutyExit and DutyBuilderRegistration always expected. + assert!(is_par_sig_event_expected( + &Duty::new_voluntary_exit_duty(slot), + pk, + &HashMap::new() + )); + assert!(is_par_sig_event_expected( + &Duty::new_builder_registration_duty(slot), + pk, + &HashMap::new() + )); + + // Randao expected when proposer is scheduled with matching pubkey. + let mut events: HashMap> = HashMap::new(); + let proposer = Duty::new_proposer_duty(slot); + events.insert( + proposer.clone(), + vec![evt_pubkey(proposer, Step::Fetcher, pk)], + ); + assert!(is_par_sig_event_expected( + &Duty::new_randao_duty(slot), + pk, + &events + )); + + // Randao unexpected without proposer. + assert!(!is_par_sig_event_expected( + &Duty::new_randao_duty(slot), + pk, + &HashMap::new() + )); + + // PrepareAggregator expected when attester scheduled. + let mut events: HashMap> = HashMap::new(); + let attester = Duty::new_attester_duty(slot); + events.insert( + attester.clone(), + vec![evt_pubkey(attester, Step::Fetcher, pk)], + ); + assert!(is_par_sig_event_expected( + &Duty::new_prepare_aggregator_duty(slot), + pk, + &events + )); + + // PrepareAggregator unexpected without attester. + assert!(!is_par_sig_event_expected( + &Duty::new_prepare_aggregator_duty(slot), + pk, + &HashMap::new() + )); + + // PrepareSyncContribution / SyncMessage expected when SyncContribution + // scheduled. + let mut events: HashMap> = HashMap::new(); + let sc = Duty::new_sync_contribution_duty(slot); + events.insert(sc.clone(), vec![evt_pubkey(sc, Step::Fetcher, pk)]); + assert!(is_par_sig_event_expected( + &Duty::new_prepare_sync_contribution_duty(slot), + pk, + &events + )); + assert!(is_par_sig_event_expected( + &Duty::new_sync_message_duty(slot), + pk, + &events + )); + + // SyncMessage and PrepareSyncContribution unexpected without + // SyncContribution. + assert!(!is_par_sig_event_expected( + &Duty::new_sync_message_duty(slot), + pk, + &HashMap::new() + )); + assert!(!is_par_sig_event_expected( + &Duty::new_prepare_sync_contribution_duty(slot), + pk, + &HashMap::new() + )); + } + + #[test] + fn extract_par_sigs_empty() { + assert!(extract_par_sigs(&[]).is_empty()); + } + + #[test] + fn extract_par_sigs_groups_by_msg_root_per_pubkey() { + // Mirrors Go's TestAnalyseParSigs: pubkey "a" gets two batches with + // distinct message roots (4 sigs and 2 sigs), pubkey "b" gets one + // batch (6 sigs). Result is keyed by pubkey then by root. + let att = Duty::new_attester_duty(SlotNumber::new(0)); + let pk_a = pubkey(1); + let pk_b = pubkey(2); + + // Build events: each event has a unique share_idx (so dedup keeps + // all of them) and shares the message root within the batch. + let mut events: Vec = Vec::new(); + let mut next_idx: u64 = 0; + + // pk_a, root=A, 4 sigs. + let data_a = TestSignedData::new(0xAA); + for _ in 0..4 { + events.push(Event { + duty: att.clone(), + step: Step::ParSigDBExternal, + pubkey: pk_a, + step_err: None, + par_sig: Some(ParSignedData::new(data_a.clone(), next_idx)), + }); + next_idx = next_idx.checked_add(1).unwrap(); + } + + // pk_a, root=B, 2 sigs. + let data_b = TestSignedData::new(0xBB); + for _ in 0..2 { + events.push(Event { + duty: att.clone(), + step: Step::ParSigDBExternal, + pubkey: pk_a, + step_err: None, + par_sig: Some(ParSignedData::new(data_b.clone(), next_idx)), + }); + next_idx = next_idx.checked_add(1).unwrap(); + } + + // pk_b, root=C, 6 sigs. + let data_c = TestSignedData::new(0xCC); + for _ in 0..6 { + events.push(Event { + duty: att.clone(), + step: Step::ParSigDBExternal, + pubkey: pk_b, + step_err: None, + par_sig: Some(ParSignedData::new(data_c.clone(), next_idx)), + }); + next_idx = next_idx.checked_add(1).unwrap(); + } + + let result = extract_par_sigs(&events); + + // pk_a has two roots, pk_b has one. + assert_eq!(result.len(), 2); + let a_groups = result.get(&pk_a).expect("pk_a missing"); + let b_groups = result.get(&pk_b).expect("pk_b missing"); + assert_eq!(a_groups.len(), 2); + assert_eq!(b_groups.len(), 1); + + let mut a_sizes: Vec = a_groups.values().map(Vec::len).collect(); + a_sizes.sort_unstable(); + assert_eq!(a_sizes, vec![2, 4]); + + let b_sizes: Vec = b_groups.values().map(Vec::len).collect(); + assert_eq!(b_sizes, vec![6]); + + // Inconsistent: pk_a has more than one root, pk_b has just one. + assert!(!msg_roots_consistent(&result)); + } + + #[test] + fn extract_par_sigs_dedups_by_pubkey_and_share_idx() { + // Two events with the same (pubkey, share_idx) → deduped down to one + // entry, regardless of differing signature content. + let att = Duty::new_attester_duty(SlotNumber::new(0)); + let pk = pubkey(1); + let data = TestSignedData::new(0xAA); + + let events = vec![ + Event { + duty: att.clone(), + step: Step::ParSigDBExternal, + pubkey: pk, + step_err: None, + par_sig: Some(ParSignedData::new(data.clone(), 0)), + }, + Event { + duty: att, + step: Step::ParSigDBExternal, + pubkey: pk, + step_err: None, + par_sig: Some(ParSignedData::new(data, 0)), + }, + ]; + let result = extract_par_sigs(&events); + let groups = result.get(&pk).unwrap(); + let total: usize = groups.values().map(Vec::len).sum(); + assert_eq!(total, 1); + } + + #[test] + fn analyse_duty_failed_unexpected_failures() { + let att = Duty::new_attester_duty(SlotNumber::new(123)); + + // consensus with nil error → REASON_UNKNOWN (Go's reasonUnknown). + let mut events = HashMap::new(); + events.insert(att.clone(), vec![evt(att.clone(), Step::Consensus)]); + let r = analyse_failed(&att, &events, false).unwrap(); + assert_eq!(r.step, Step::Consensus); + assert_eq!(r.reason, REASON_UNKNOWN); + assert!(r.err.is_none()); + + // parsigex with error → REASON_UNKNOWN (err.is_none() branch missed). + let mut events = HashMap::new(); + events.insert( + att.clone(), + vec![evt_with_err( + att.clone(), + Step::ParSigEx, + "parsigex broadcast err", + )], + ); + let r = analyse_failed(&att, &events, false).unwrap(); + assert_eq!(r.step, Step::ParSigEx); + assert_eq!(r.reason, REASON_UNKNOWN); + assert!(r.err.is_some()); + + // sigAgg with nil error → REASON_UNKNOWN. + let mut events = HashMap::new(); + events.insert(att.clone(), vec![evt(att.clone(), Step::SigAgg)]); + let r = analyse_failed(&att, &events, false).unwrap(); + assert_eq!(r.step, Step::SigAgg); + assert_eq!(r.reason, REASON_UNKNOWN); + assert!(r.err.is_none()); + } +} diff --git a/crates/core/src/tracker/metrics.rs b/crates/core/src/tracker/metrics.rs new file mode 100644 index 00000000..17081c6c --- /dev/null +++ b/crates/core/src/tracker/metrics.rs @@ -0,0 +1,64 @@ +//! Prometheus metrics for the tracker. + +use vise::*; + +/// Metrics for the duty tracker. +#[derive(Debug, Metrics)] +#[metrics(prefix = "core_tracker")] +pub struct TrackerMetrics { + /// Set to 1 if peer participated successfully for the given duty or + /// else 0. + #[metrics(labels = ["duty", "peer"])] + pub participation: LabeledFamily<(String, String), Gauge, 2>, + + /// Total number of successful participations by peer and duty type. + #[metrics(labels = ["duty", "peer"])] + pub participation_success_total: LabeledFamily<(String, String), Counter, 2>, + + /// Total number of missed participations by peer and duty type. + #[metrics(labels = ["duty", "peer"])] + pub participation_missed_total: LabeledFamily<(String, String), Counter, 2>, + + /// Total number of expected participations (fail + success) by peer + /// and duty type. + #[metrics(labels = ["duty", "peer"])] + pub participation_expected_total: LabeledFamily<(String, String), Counter, 2>, + + /// Total number of failed duties by type. + #[metrics(labels = ["duty"])] + pub failed_duties_total: LabeledFamily, + + /// Total number of failed duties by type and reason code. + #[metrics(labels = ["duty", "reason"])] + pub failed_duty_reasons_total: LabeledFamily<(String, String), Counter, 2>, + + /// Total number of successful duties by type. + #[metrics(labels = ["duty"])] + pub success_duties_total: LabeledFamily, + + /// Total number of expected duties (failed + success) by type. + #[metrics(labels = ["duty"])] + pub expect_duties_total: LabeledFamily, + + /// Total number of unexpected events by peer. + #[metrics(labels = ["peer"])] + pub unexpected_events_total: LabeledFamily, + + /// Total number of duties that contained inconsistent partial signed + /// data by duty type. + #[metrics(labels = ["duty"])] + pub inconsistent_parsigs_total: LabeledFamily, + + /// Cluster's average attestation inclusion delay in slots. Available + /// only when the attestation_inclusion feature flag is enabled. + pub inclusion_delay: Gauge, + + /// Total number of broadcast duties never included in any block by + /// type. + #[metrics(labels = ["duty"])] + pub inclusion_missed_total: LabeledFamily, +} + +/// Global metrics for the duty tracker. +#[vise::register] +pub static TRACKER_METRICS: Global = Global::new(); diff --git a/crates/core/src/tracker/mod.rs b/crates/core/src/tracker/mod.rs index 3f9dd60b..755385e0 100644 --- a/crates/core/src/tracker/mod.rs +++ b/crates/core/src/tracker/mod.rs @@ -3,9 +3,11 @@ //! [`TrackerService::start`] spawns a background loop that accumulates //! per-duty [`Event`]s submitted by core workflow components via the //! [`Tracker`] trait. When the analyser deadline fires the accumulated events -//! will be used to determine failure reasons and report participation (not yet -//! implemented). When the deleter deadline fires the events for that duty are -//! discarded to bound memory usage. +//! are passed through [`analysis::analyse_duty_failed`] and +//! [`analysis::analyse_participation`], and the results are dispatched to the +//! reporters in [`reporters`] for metrics and structured logging. When the +//! deleter deadline fires the events for that duty are discarded to bound +//! memory usage. //! //! Both deadliners must share the same [`CancellationToken`] as the tracker so //! that the whole system shuts down together. @@ -16,6 +18,15 @@ pub mod reason; /// Step enum for the core workflow. pub mod step; +/// Pure analysis functions used by the tracker loop. +pub mod analysis; + +/// Prometheus metrics for the tracker. +pub mod metrics; + +/// Reporters that consume analysis results and emit metrics/logs. +pub mod reporters; + use std::{collections::HashMap, future::Future, sync::Arc}; use tokio::sync::mpsc; @@ -26,6 +37,15 @@ use crate::{ types::{Duty, ParSignedData, ParSignedDataSet, PubKey}, }; +use analysis::{ + DutyFailure, analyse_duty_failed, analyse_participation, duty_failed_step, extract_par_sigs, + msg_roots_consistent, +}; +use reason::REASON_UNKNOWN; +use reporters::{ + DutyResultReporter, MetricsDutyReporter, MetricsParticipationReporter, ParticipationReporter, + UnsupportedIgnorer, report_par_sigs, +}; use step::Step; /// Type-erased step error. @@ -151,6 +171,7 @@ const EVENT_BUFFER: usize = 1024; /// `par_sig` is only set by `ParSigDBInternal`, `ParSigEx`, and /// `ParSigDBExternal` events, matching Go's `event.parSig`. #[allow(dead_code)] +#[derive(Clone)] pub(crate) struct Event { pub duty: Duty, pub step: Step, @@ -185,6 +206,8 @@ pub struct TrackerHandle { impl TrackerHandle { async fn send_event(&self, event: Event) { + // Shutdown is signalled by the receiver being dropped, which causes + // send() to return Err immediately — no explicit cancellation select needed. if let Err(e) = self.input_tx.send(event).await { tracing::warn!( duty = %e.0.duty, @@ -312,8 +335,9 @@ pub struct TrackerService { deleter: DeadlinerHandle, deleter_rx: mpsc::Receiver, from_slot: u64, - #[allow(dead_code)] - peers: Vec, + failed_duty_reporter: Box, + participation_reporter: Box, + unsupported_ignorer: UnsupportedIgnorer, } impl TrackerService { @@ -336,29 +360,30 @@ impl TrackerService { peers: Vec, from_slot: u64, ) -> Arc { - Self::start_with_buffer( + Self::start_with_buffer_and_sinks( cancel, analyser, analyser_rx, deleter, deleter_rx, - peers, from_slot, EVENT_BUFFER, + Box::new(MetricsDutyReporter::new()), + Box::new(MetricsParticipationReporter::new(peers)), ) } - /// Like [`start`] but with a configurable channel buffer size, for tests. #[allow(clippy::too_many_arguments)] - fn start_with_buffer( + fn start_with_buffer_and_sinks( cancel: CancellationToken, analyser: DeadlinerHandle, AnalyserRx(analyser_rx): AnalyserRx, deleter: DeadlinerHandle, DeleterRx(deleter_rx): DeleterRx, - peers: Vec, from_slot: u64, buffer: usize, + failed_duty_reporter: Box, + participation_reporter: Box, ) -> Arc { let (input_tx, input_rx) = mpsc::channel(buffer); @@ -370,7 +395,9 @@ impl TrackerService { deleter, deleter_rx, from_slot, - peers, + failed_duty_reporter, + participation_reporter, + unsupported_ignorer: UnsupportedIgnorer::new(), }; let task = tokio::spawn(task.run()); @@ -378,6 +405,41 @@ impl TrackerService { Arc::new(TrackerHandle { input_tx, task }) } + fn analyse(&mut self, duty: &Duty, events: &std::collections::HashMap>) { + let duty_events = events.get(duty).map(Vec::as_slice).unwrap_or(&[]); + let parsigs = extract_par_sigs(duty_events); + report_par_sigs(duty, &parsigs); + + let failed_step = duty_failed_step(duty_events); + let outcome = + analyse_duty_failed(duty, events, &failed_step, msg_roots_consistent(&parsigs)); + + if self.unsupported_ignorer.check(duty, outcome.as_ref()) { + return; + } + + let failed = outcome.is_some(); + // On success the reporter only reads `step`: `Fetcher` for + // aggregator/sync-contribution slots with no selection (a no-op the + // reporter must skip, not count) versus `Zero` for a genuine success. + let result = outcome.unwrap_or(DutyFailure { + step: failed_step.step, + reason: REASON_UNKNOWN, + err: None, + }); + + self.failed_duty_reporter.report(duty, failed, &result); + + let part = analyse_participation(duty, events); + self.participation_reporter.report( + duty, + failed, + &part.participated, + &part.unexpected, + part.validators_per_duty, + ); + } + async fn run(mut self) { let mut events: HashMap> = HashMap::new(); @@ -395,8 +457,7 @@ impl TrackerService { duty = self.analyser_rx.recv() => { match duty { Some(duty) => { - // TODO: extract par sigs, analyse failed duty, report participation. - tracing::debug!(duty = %duty, "Duty analysis triggered (not yet implemented)"); + self.analyse(&duty, &events); } None => { tracing::error!("Analyser deadliner channel closed unexpectedly; stopping tracker"); @@ -438,7 +499,7 @@ impl TrackerService { #[cfg(test)] mod tests { - use std::time::Duration; + use std::{collections::HashMap, sync::Mutex, time::Duration}; use chrono::{DateTime, Utc}; use tokio_util::sync::CancellationToken; @@ -446,9 +507,177 @@ mod tests { use super::*; use crate::{ deadline::{DeadlineCalculator, DeadlinerTask, NeverExpiringCalculator}, - types::{Duty, DutyType, SlotNumber}, + signeddata::SignedDataError, + tracker::{ + reason::Reason, + reporters::{DutyResultReporter, ParticipationReporter}, + }, + types::{Duty, DutyType, ParSignedData, ParSignedDataSet, SlotNumber}, }; + // ── Integration test infrastructure ───────────────────────────────────── + + #[derive(Debug, Clone)] + struct FailRecord { + duty: Duty, + failed: bool, + step: Step, + reason: Reason, + } + + #[derive(Debug, Clone)] + struct ParticipationRecord { + duty: Duty, + failed: bool, + participated: HashMap, + unexpected: HashMap, + expected_per_peer: usize, + } + + struct RecordingFailureReporter { + records: std::sync::Arc>>, + cancel: CancellationToken, + trigger_on: usize, + } + + impl DutyResultReporter for RecordingFailureReporter { + fn report(&mut self, duty: &Duty, failed: bool, result: &DutyFailure) { + let mut recs = self.records.lock().unwrap(); + recs.push(FailRecord { + duty: duty.clone(), + failed, + step: result.step, + reason: result.reason, + }); + if recs.len() >= self.trigger_on { + self.cancel.cancel(); + } + } + } + + struct RecordingParticipationReporter { + records: std::sync::Arc>>, + cancel: CancellationToken, + trigger_on: usize, + } + + impl ParticipationReporter for RecordingParticipationReporter { + fn report( + &mut self, + duty: &Duty, + failed: bool, + participated: &HashMap, + unexpected: &HashMap, + expected_per_peer: usize, + ) { + let mut recs = self.records.lock().unwrap(); + recs.push(ParticipationRecord { + duty: duty.clone(), + failed, + participated: participated.clone(), + unexpected: unexpected.clone(), + expected_per_peer, + }); + if recs.len() >= self.trigger_on { + self.cancel.cancel(); + } + } + } + + struct NopFailureReporter; + + impl DutyResultReporter for NopFailureReporter { + fn report(&mut self, _: &Duty, _: bool, _: &DutyFailure) {} + } + + struct NopParticipationReporter; + + impl ParticipationReporter for NopParticipationReporter { + fn report( + &mut self, + _: &Duty, + _: bool, + _: &HashMap, + _: &HashMap, + _: usize, + ) { + } + } + + /// Starts a `TrackerService` with custom reporters and test-controlled + /// analyser/deleter trigger channels (bypassing the real deadliner). + fn start_test_tracker( + cancel: &CancellationToken, + from_slot: u64, + failure_sink: Box, + participation_sink: Box, + ) -> (Arc, mpsc::Sender, mpsc::Sender) { + let (analyser_handle, _) = + DeadlinerTask::start(cancel.clone(), "analyser", FutureCalculator); + let (deleter_handle, _) = DeadlinerTask::start(cancel.clone(), "deleter", FutureCalculator); + let (analyser_tx, analyser_rx) = mpsc::channel(16); + let (deleter_tx, deleter_rx) = mpsc::channel(16); + + let handle = TrackerService::start_with_buffer_and_sinks( + cancel.clone(), + analyser_handle, + AnalyserRx(analyser_rx), + deleter_handle, + DeleterRx(deleter_rx), + from_slot, + EVENT_BUFFER, + failure_sink, + participation_sink, + ); + + (handle, analyser_tx, deleter_tx) + } + + async fn wait_for_task(handle: Arc) { + let raw = Arc::try_unwrap(handle).unwrap_or_else(|_| panic!("single Arc owner in test")); + tokio::time::timeout(Duration::from_secs(1), raw.task) + .await + .expect("task did not exit within timeout") + .expect("task panicked"); + } + + /// Minimal [`crate::types::SignedData`] for constructing [`ParSignedData`] + /// in tests without needing real ETH2 attestation data. + #[derive(Debug, Clone, PartialEq, Eq)] + struct SimpleSignedData; + + impl crate::types::SignedData for SimpleSignedData { + fn signature(&self) -> Result { + Ok([0u8; 96]) + } + + fn set_signature( + &self, + _sig: pluto_crypto::types::Signature, + ) -> Result { + Ok(Self) + } + + fn set_signature_boxed( + &self, + sig: pluto_crypto::types::Signature, + ) -> Result, SignedDataError> { + Ok(Box::new(self.set_signature(sig)?)) + } + + fn message_root(&self) -> Result<[u8; 32], SignedDataError> { + Ok([0u8; 32]) + } + } + + fn par_sig_set(pubkeys: &[PubKey], share_idx: u64) -> ParSignedDataSet { + let mut set = ParSignedDataSet::new(); + for pk in pubkeys { + set.insert(*pk, ParSignedData::new(SimpleSignedData, share_idx)); + } + set + } + fn attester(slot: u64) -> Duty { Duty::new(SlotNumber::new(slot), DutyType::Attester) } @@ -499,25 +728,53 @@ mod tests { #[tokio::test] async fn from_slot_filters_old_events() { let cancel = CancellationToken::new(); - let handle = start_service(&cancel, 10); - // Slot 5 is below from_slot=10 and must be filtered before reaching - // the deadliner. Slot 15 is above and must be scheduled normally. + let fail_records: std::sync::Arc>> = Default::default(); + + // from_slot=10: slot-5 events must be discarded, slot-15 events kept. + let (handle, analyser_tx, deleter_tx) = start_test_tracker( + &cancel, + 10, + Box::new(RecordingFailureReporter { + records: fail_records.clone(), + cancel: cancel.clone(), + trigger_on: 2, + }), + Box::new(NopParticipationReporter), + ); + handle.fetcher_fetched(attester(5), &[pubkey()], None).await; handle .fetcher_fetched(attester(15), &[pubkey()], None) .await; + tokio::task::yield_now().await; - // Yield so the loop processes both events. + // Trigger analysis for both; only slot-15 had events stored. + analyser_tx.send(attester(5)).await.unwrap(); + analyser_tx.send(attester(15)).await.unwrap(); tokio::task::yield_now().await; + let _ = deleter_tx.send(attester(5)).await; + let _ = deleter_tx.send(attester(15)).await; - cancel.cancel(); + wait_for_task(handle).await; - let raw = Arc::try_unwrap(handle).unwrap_or_else(|_| panic!("single Arc owner in test")); - tokio::time::timeout(Duration::from_secs(1), raw.task) - .await - .expect("task did not exit within timeout") - .expect("task panicked"); + let recs = fail_records.lock().unwrap(); + assert_eq!(recs.len(), 2); + + let slot5 = recs.iter().find(|r| r.duty == attester(5)).unwrap(); + assert!(slot5.failed); + // No events stored for slot 5 (filtered): analysis sees an empty map. + assert_eq!( + slot5.step, + Step::Zero, + "slot-5 was filtered: no events in map" + ); + + let slot15 = recs.iter().find(|r| r.duty == attester(15)).unwrap(); + assert!(slot15.failed); + // Slot-15 fetcher event was stored and analysed (fails at fetcher, no + // completion). + assert_eq!(slot15.step, Step::Fetcher, "slot-15 events were accepted"); } #[tokio::test] @@ -556,35 +813,294 @@ mod tests { .expect("task panicked"); } + // ── Integration tests ──────────────────────────────────────────────────── + + /// Sends a fetcher event and a consensus event with an error, triggers the + /// analyser, and verifies the failure is reported at the consensus step. #[tokio::test] - async fn fan_out_sends_one_event_per_pubkey() { + async fn tracker_failed_duty_fail_at_consensus() { + use crate::tracker::reason::REASON_NO_CONSENSUS; + let cancel = CancellationToken::new(); - let (analyser, analyser_rx) = - DeadlinerTask::start(cancel.clone(), "analyser", FutureCalculator); - let (deleter, deleter_rx) = - DeadlinerTask::start(cancel.clone(), "deleter", FutureCalculator); - let handle = TrackerService::start_with_buffer( - cancel.clone(), - analyser, - AnalyserRx(analyser_rx), - deleter, - DeleterRx(deleter_rx), - vec![], + let duty = attester(1); + let keys = [pubkey(), PubKey::from([2u8; 48]), PubKey::from([3u8; 48])]; + + let fail_records: std::sync::Arc>> = Default::default(); + let part_records: std::sync::Arc>> = Default::default(); + + let (handle, analyser_tx, deleter_tx) = start_test_tracker( + &cancel, 0, - 1, + Box::new(RecordingFailureReporter { + records: fail_records.clone(), + cancel: cancel.clone(), + trigger_on: 1, + }), + Box::new(RecordingParticipationReporter { + records: part_records.clone(), + cancel: cancel.clone(), + trigger_on: usize::MAX, + }), ); + let consensus_err: StepError = + std::sync::Arc::new(std::io::Error::other("consensus error")); + handle.fetcher_fetched(duty.clone(), &keys, None).await; + handle + .consensus_proposed(duty.clone(), &keys, Some(consensus_err)) + .await; + tokio::task::yield_now().await; + + analyser_tx.send(duty.clone()).await.unwrap(); + tokio::task::yield_now().await; + // Cancel fires inside the sink; deleter send may race — ignore errors. + let _ = deleter_tx.send(duty.clone()).await; + + wait_for_task(handle).await; + + let recs = fail_records.lock().unwrap(); + assert_eq!(recs.len(), 1); + assert_eq!(recs[0].duty, duty); + assert!(recs[0].failed); + assert_eq!(recs[0].step, Step::Consensus); + assert_eq!(recs[0].reason, REASON_NO_CONSENSUS); + + let part = part_records.lock().unwrap(); + assert_eq!(part.len(), 1); + assert!(part[0].failed); + } + + /// Sends a broadcast (Bcast) event with no error — the terminal step for + /// an Attester duty — and verifies the duty is reported as successful. + #[tokio::test] + async fn tracker_failed_duty_success() { + let cancel = CancellationToken::new(); + let duty = attester(1); let keys = [pubkey(), PubKey::from([2u8; 48]), PubKey::from([3u8; 48])]; - handle.fetcher_fetched(attester(1), &keys, None).await; - handle.consensus_proposed(attester(1), &keys, None).await; + let fail_records: std::sync::Arc>> = Default::default(); + let part_records: std::sync::Arc>> = Default::default(); + + let (handle, analyser_tx, deleter_tx) = start_test_tracker( + &cancel, + 0, + Box::new(RecordingFailureReporter { + records: fail_records.clone(), + cancel: cancel.clone(), + trigger_on: 1, + }), + Box::new(RecordingParticipationReporter { + records: part_records.clone(), + cancel: cancel.clone(), + trigger_on: usize::MAX, + }), + ); + + handle + .broadcaster_broadcast(duty.clone(), &keys, None) + .await; tokio::task::yield_now().await; - cancel.cancel(); - let raw = Arc::try_unwrap(handle).unwrap_or_else(|_| panic!("single Arc owner in test")); - tokio::time::timeout(Duration::from_secs(1), raw.task) - .await - .expect("task did not exit within timeout") - .expect("task panicked"); + analyser_tx.send(duty.clone()).await.unwrap(); + tokio::task::yield_now().await; + let _ = deleter_tx.send(duty.clone()).await; + + wait_for_task(handle).await; + + let recs = fail_records.lock().unwrap(); + assert_eq!(recs.len(), 1); + assert_eq!(recs[0].duty, duty); + assert!(!recs[0].failed); + assert_eq!(recs[0].step, Step::Zero); + + let part = part_records.lock().unwrap(); + assert_eq!(part.len(), 1); + assert!(!part[0].failed); + } + + /// A partial-signature event arrives for a peer whose share index has no + /// corresponding fetcher event, so it is counted as unexpected rather than + /// participated. + #[tokio::test] + async fn unexpected_participation() { + const UNEXPECTED_PEER: u64 = 2; + let cancel = CancellationToken::new(); + let duty = attester(123); + let pk = pubkey(); + + let part_records: std::sync::Arc>> = Default::default(); + + let (handle, analyser_tx, deleter_tx) = start_test_tracker( + &cancel, + 0, + Box::new(NopFailureReporter), + Box::new(RecordingParticipationReporter { + records: part_records.clone(), + cancel: cancel.clone(), + trigger_on: 1, + }), + ); + + handle + .par_sig_db_stored_external(duty.clone(), &par_sig_set(&[pk], UNEXPECTED_PEER), None) + .await; + tokio::task::yield_now().await; + + analyser_tx.send(duty.clone()).await.unwrap(); + tokio::task::yield_now().await; + let _ = deleter_tx.send(duty.clone()).await; + + wait_for_task(handle).await; + + let recs = part_records.lock().unwrap(); + assert_eq!(recs.len(), 1); + assert_eq!(recs[0].duty, duty); + assert!(recs[0].failed); + assert_eq!(recs[0].participated, HashMap::new()); + assert_eq!(recs[0].unexpected, HashMap::from([(UNEXPECTED_PEER, 1)])); + } + + /// When Proposer events are deleted before Randao is analysed, the Randao + /// partial signature cannot be cross-referenced to a scheduled Proposer + /// duty and must be counted as unexpected. + #[tokio::test] + async fn duty_randao_unexpected() { + const VALID_PEER: u64 = 1; + let cancel = CancellationToken::new(); + let slot = SlotNumber::new(123); + let duty_proposer = Duty::new_proposer_duty(slot); + let duty_randao = Duty::new_randao_duty(slot); + let pk = pubkey(); + + let part_records: std::sync::Arc>> = Default::default(); + + let (handle, analyser_tx, deleter_tx) = start_test_tracker( + &cancel, + 0, + Box::new(NopFailureReporter), + Box::new(RecordingParticipationReporter { + records: part_records.clone(), + cancel: cancel.clone(), + trigger_on: 2, + }), + ); + + let fetch_err: StepError = + std::sync::Arc::new(std::io::Error::other("failed to query randao")); + handle + .fetcher_fetched(duty_proposer.clone(), &[pk], Some(fetch_err)) + .await; + handle + .par_sig_db_stored_external(duty_randao.clone(), &par_sig_set(&[pk], VALID_PEER), None) + .await; + tokio::task::yield_now().await; + + analyser_tx.send(duty_proposer.clone()).await.unwrap(); + tokio::task::yield_now().await; + deleter_tx.send(duty_proposer.clone()).await.unwrap(); + tokio::task::yield_now().await; + // Cancel fires after both records are received; send may race. + let _ = analyser_tx.send(duty_randao.clone()).await; + + wait_for_task(handle).await; + + let recs = part_records.lock().unwrap(); + let randao_rec = recs + .iter() + .find(|r| r.duty == duty_randao) + .expect("randao record"); + assert!(randao_rec.failed); + assert_eq!(randao_rec.participated, HashMap::new()); + assert_eq!(randao_rec.unexpected, HashMap::from([(VALID_PEER, 1)])); + } + + /// When Proposer events are still present when Randao is analysed, the + /// Randao partial signature is cross-referenced to the scheduled Proposer + /// duty and counted as normal participation (not unexpected). + #[tokio::test] + async fn duty_randao_expected() { + const VALID_PEER: u64 = 1; + let cancel = CancellationToken::new(); + let slot = SlotNumber::new(123); + let duty_proposer = Duty::new_proposer_duty(slot); + let duty_randao = Duty::new_randao_duty(slot); + let pk = pubkey(); + + let part_records: std::sync::Arc>> = Default::default(); + + let (handle, analyser_tx, deleter_tx) = start_test_tracker( + &cancel, + 0, + Box::new(NopFailureReporter), + Box::new(RecordingParticipationReporter { + records: part_records.clone(), + cancel: cancel.clone(), + trigger_on: 2, + }), + ); + + let fetch_err: StepError = + std::sync::Arc::new(std::io::Error::other("failed to query randao")); + handle + .fetcher_fetched(duty_proposer.clone(), &[pk], Some(fetch_err)) + .await; + handle + .par_sig_db_stored_external(duty_randao.clone(), &par_sig_set(&[pk], VALID_PEER), None) + .await; + tokio::task::yield_now().await; + + analyser_tx.send(duty_proposer.clone()).await.unwrap(); + tokio::task::yield_now().await; + analyser_tx.send(duty_randao.clone()).await.unwrap(); + tokio::task::yield_now().await; + // Cancel fires after the randao record; deleter send may race. + let _ = deleter_tx.send(duty_proposer.clone()).await; + + wait_for_task(handle).await; + + let recs = part_records.lock().unwrap(); + let randao_rec = recs + .iter() + .find(|r| r.duty == duty_randao) + .expect("randao record"); + assert!(randao_rec.failed); + assert_eq!(randao_rec.participated, HashMap::from([(VALID_PEER, 1)])); + assert_eq!(randao_rec.unexpected, HashMap::new()); + } + + #[tokio::test] + async fn fan_out_sends_one_event_per_pubkey() { + let cancel = CancellationToken::new(); + let duty = attester(1); + let keys = [pubkey(), PubKey::from([2u8; 48]), PubKey::from([3u8; 48])]; + + let part_records: std::sync::Arc>> = Default::default(); + + let (handle, analyser_tx, deleter_tx) = start_test_tracker( + &cancel, + 0, + Box::new(NopFailureReporter), + Box::new(RecordingParticipationReporter { + records: part_records.clone(), + cancel: cancel.clone(), + trigger_on: 1, + }), + ); + + handle.fetcher_fetched(duty.clone(), &keys, None).await; + handle.consensus_proposed(duty.clone(), &keys, None).await; + tokio::task::yield_now().await; + + analyser_tx.send(duty.clone()).await.unwrap(); + tokio::task::yield_now().await; + let _ = deleter_tx.send(duty.clone()).await; + + wait_for_task(handle).await; + + let recs = part_records.lock().unwrap(); + assert_eq!(recs.len(), 1); + // analyse_participation counts distinct pubkeys across all stored events; + // expected_per_peer==3 proves each key produced its own event entry. + assert_eq!(recs[0].expected_per_peer, 3); } } diff --git a/crates/core/src/tracker/reporters.rs b/crates/core/src/tracker/reporters.rs new file mode 100644 index 00000000..48b323ca --- /dev/null +++ b/crates/core/src/tracker/reporters.rs @@ -0,0 +1,427 @@ +//! Reporters that consume duty analysis results and emit metrics + logs. + +use std::collections::HashMap; + +use crate::{ + tracker::{ + PeerInfo, + analysis::{DutyFailure, ParSigsByMsg, expect_inconsistent_par_sigs, msg_roots_consistent}, + metrics::TRACKER_METRICS, + reason::{REASON_SYNC_CONTRIBUTION_ZERO_PREPARES, REASON_ZERO_AGGREGATOR_SELECTIONS}, + step::Step, + }, + types::{Duty, DutyType}, +}; + +pub(crate) trait DutyResultReporter: Send { + fn report(&mut self, duty: &Duty, failed: bool, result: &DutyFailure); +} + +pub(crate) trait ParticipationReporter: Send { + fn report( + &mut self, + duty: &Duty, + failed: bool, + participated: &HashMap, + unexpected: &HashMap, + expected_per_peer: usize, + ); +} + +/// Logs and reports failed/successful duties to Prometheus. +pub struct MetricsDutyReporter; + +impl MetricsDutyReporter { + /// Creates a reporter and zero-initialises per-duty-type counters so that + /// Prometheus exports them even before the first event fires. + pub fn new() -> Self { + for dt in DutyType::all() { + let dt_str = dt.to_string(); + TRACKER_METRICS.failed_duties_total[&dt_str].inc_by(0); + TRACKER_METRICS.success_duties_total[&dt_str].inc_by(0); + TRACKER_METRICS.expect_duties_total[&dt_str].inc_by(0); + } + Self + } + + /// Reports the outcome of a duty: logs a warning on failure and updates + /// per-duty counters. On success only `result.step` is read. + pub fn report(&self, duty: &Duty, failed: bool, result: &DutyFailure) { + if !failed { + // Skip fetcher-level success counts to avoid double-counting duties + // (matches Go's TODO around aggregator detection). + if result.step == Step::Fetcher { + return; + } + let dt = duty.duty_type.to_string(); + TRACKER_METRICS.expect_duties_total[&dt].inc(); + TRACKER_METRICS.success_duties_total[&dt].inc(); + return; + } + + match result.err.as_ref() { + Some(e) => tracing::warn!( + step = %result.step, + reason = %result.reason.short, + reason_code = %result.reason.code, + error = %e, + duty = %duty, + "Duty failed", + ), + None => tracing::warn!( + step = %result.step, + reason = %result.reason.short, + reason_code = %result.reason.code, + duty = %duty, + "Duty failed", + ), + } + + let dt = duty.duty_type.to_string(); + TRACKER_METRICS.expect_duties_total[&dt].inc(); + TRACKER_METRICS.failed_duties_total[&dt].inc(); + TRACKER_METRICS.failed_duty_reasons_total[&(dt, result.reason.code.to_string())].inc(); + } +} + +impl Default for MetricsDutyReporter { + fn default() -> Self { + Self::new() + } +} + +impl DutyResultReporter for MetricsDutyReporter { + fn report(&mut self, duty: &Duty, failed: bool, result: &DutyFailure) { + MetricsDutyReporter::report(self, duty, failed, result); + } +} + +/// Suppresses repeated noise from duty types unsupported by the cluster's VCs +/// (attestation aggregation, sync committee contribution). +/// +/// Mirrors Go's `newUnsupportedIgnorer` closure. +pub struct UnsupportedIgnorer { + logged_no_aggregator: bool, + logged_no_contribution: bool, + aggregation_supported: bool, + contribution_supported: bool, +} + +impl UnsupportedIgnorer { + /// Creates a fresh ignorer with no logged state. + pub fn new() -> Self { + Self { + logged_no_aggregator: false, + logged_no_contribution: false, + aggregation_supported: false, + contribution_supported: false, + } + } + + /// Returns true if this duty failure should be ignored — i.e. it's an + /// unsupported feature we've already warned about. Also tracks + /// successful aggregator/sync-contribution duties so future failures + /// aren't silenced. + pub fn check(&mut self, duty: &Duty, outcome: Option<&DutyFailure>) -> bool { + let Some(f) = outcome else { + if duty.duty_type == DutyType::Aggregator { + self.aggregation_supported = true; + } + if duty.duty_type == DutyType::SyncContribution { + self.contribution_supported = true; + } + return false; + }; + + if !self.aggregation_supported + && duty.duty_type == DutyType::Aggregator + && f.step == Step::Fetcher + && f.reason == REASON_ZERO_AGGREGATOR_SELECTIONS + { + if !self.logged_no_aggregator { + tracing::warn!( + "Ignoring attestation aggregation failures since VCs do not seem to support beacon committee selection aggregation", + ); + } + self.logged_no_aggregator = true; + return true; + } + + if !self.contribution_supported + && duty.duty_type == DutyType::SyncContribution + && f.step == Step::Fetcher + && f.reason == REASON_SYNC_CONTRIBUTION_ZERO_PREPARES + { + if !self.logged_no_contribution { + tracing::warn!( + "Ignoring sync contribution failures since VCs do not seem to support sync committee selection aggregation", + ); + } + self.logged_no_contribution = true; + return true; + } + + false + } +} + +impl Default for UnsupportedIgnorer { + fn default() -> Self { + Self::new() + } +} + +/// Reports per-peer duty participation to metrics and logs absence changes. +pub struct MetricsParticipationReporter { + peers: Vec, + prev_absent: HashMap>, +} + +impl MetricsParticipationReporter { + /// Creates a reporter and zero-initialises per-peer × per-duty counters + /// so that Prometheus exports them before the first event. + pub fn new(peers: Vec) -> Self { + for dt in DutyType::all() { + let dt_str = dt.to_string(); + for peer in &peers { + let labels = (dt_str.clone(), peer.name.clone()); + TRACKER_METRICS.participation_success_total[&labels].inc_by(0); + TRACKER_METRICS.participation_missed_total[&labels].inc_by(0); + TRACKER_METRICS.participation_expected_total[&labels].inc_by(0); + } + } + Self { + peers, + prev_absent: HashMap::new(), + } + } + + /// Reports per-peer participation for a duty: updates counters, sets the + /// participation gauge, and logs absence changes. + pub fn report( + &mut self, + duty: &Duty, + failed: bool, + participated: &HashMap, + unexpected: &HashMap, + // Distinct validator pubkeys that had any event for this duty (matches + // Go's pubkeyMapLen). For aggregator duties this may be fewer than the + // cluster's total validator count if only some validators were selected. + expected_per_peer: usize, + ) { + // Suppress no-op duties (e.g. aggregator slots with no selected peer) + // unless the duty actually failed. + if participated.is_empty() && !failed { + return; + } + + let mut absent: Vec = Vec::new(); + let dt_str = duty.duty_type.to_string(); + + for peer in &self.peers { + let share_idx = peer.share_idx as u64; + let part = participated.get(&share_idx).copied().unwrap_or(0); + let unexp = unexpected.get(&share_idx).copied().unwrap_or(0); + + let labels = (dt_str.clone(), peer.name.clone()); + TRACKER_METRICS.participation_success_total[&labels].inc_by(part as u64); + TRACKER_METRICS.participation_expected_total[&labels].inc_by(expected_per_peer as u64); + TRACKER_METRICS.participation_missed_total[&labels] + .inc_by(expected_per_peer.saturating_sub(part) as u64); + + if part > 0 { + TRACKER_METRICS.participation[&labels].set(1); + } else if unexp > 0 { + tracing::warn!( + peer = %peer.name, + duty = %duty, + "Unexpected event found", + ); + TRACKER_METRICS.unexpected_events_total[&peer.name].inc_by(unexp as u64); + } else { + absent.push(peer.name.clone()); + TRACKER_METRICS.participation[&labels].set(0); + } + } + + // Only log when the absent set changes from the previous duty of this + // type, to avoid log spam every slot. + if self.prev_absent.get(&duty.duty_type) != Some(&absent) { + if absent.is_empty() { + tracing::info!(duty = %duty, "All peers participated in duty"); + } else if absent.len() == self.peers.len() { + tracing::info!(duty = %duty, "No peers participated in duty"); + } else { + tracing::info!(duty = %duty, absent = ?absent, "Not all peers participated in duty"); + } + } + + self.prev_absent.insert(duty.duty_type.clone(), absent); + } +} + +impl ParticipationReporter for MetricsParticipationReporter { + fn report( + &mut self, + duty: &Duty, + failed: bool, + participated: &HashMap, + unexpected: &HashMap, + expected_per_peer: usize, + ) { + MetricsParticipationReporter::report( + self, + duty, + failed, + participated, + unexpected, + expected_per_peer, + ); + } +} + +/// Reports inconsistent partial signature data across peers. +pub fn report_par_sigs(duty: &Duty, parsigs: &ParSigsByMsg) { + if msg_roots_consistent(parsigs) { + return; + } + + TRACKER_METRICS.inconsistent_parsigs_total[&duty.duty_type.to_string()].inc(); + + for (pubkey, by_root) in parsigs { + // Intentional fix over Go: Go checks len(parsigMsgs) (the outer map, i.e. + // number of pubkeys) instead of the per-pubkey root count, so it + // silently skips logging when only one pubkey has inconsistent roots + // (tracker.go:851). + if by_root.len() <= 1 { + continue; + } + + let groups: Vec<(String, Vec)> = by_root + .iter() + .map(|(root, sigs)| { + let indexes: Vec = sigs.iter().map(|s| s.share_idx).collect(); + (hex::encode(root), indexes) + }) + .collect(); + + if expect_inconsistent_par_sigs(&duty.duty_type) { + tracing::debug!( + pubkey = %pubkey, + duty = %duty, + ?groups, + "Inconsistent sync committee partial signed data", + ); + } else { + tracing::warn!( + pubkey = %pubkey, + duty = %duty, + ?groups, + "Inconsistent partial signed data", + ); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + tracker::reason::{REASON_BUG_AGGREGATION_ERROR, REASON_UNKNOWN}, + types::SlotNumber, + }; + + /// The ignorer is stateful, so order + /// matters across assertions. + #[test] + fn unsupported_ignorer_state_machine() { + let mut ignorer = UnsupportedIgnorer::new(); + + // Attester with non-aggregator reason is never ignored. + assert!(!ignorer.check( + &Duty::new_attester_duty(SlotNumber::new(123)), + Some(&DutyFailure { + step: Step::SigAgg, + reason: REASON_BUG_AGGREGATION_ERROR, + err: None + }), + )); + + // First Aggregator / Fetcher / ZeroAggregatorSelections failure is ignored. + assert!(ignorer.check( + &Duty::new_aggregator_duty(SlotNumber::new(123)), + Some(&DutyFailure { + step: Step::Fetcher, + reason: REASON_ZERO_AGGREGATOR_SELECTIONS, + err: None + }), + )); + + // A successful Aggregator marks aggregation as supported. + assert!(!ignorer.check(&Duty::new_aggregator_duty(SlotNumber::new(123)), None,)); + + // After aggregation_supported is true, future Aggregator failures + // are no longer ignored. + assert!(!ignorer.check( + &Duty::new_aggregator_duty(SlotNumber::new(123)), + Some(&DutyFailure { + step: Step::Fetcher, + reason: REASON_ZERO_AGGREGATOR_SELECTIONS, + err: None + }), + )); + + // First SyncContribution / Fetcher / ZeroPrepares failure is ignored. + assert!(ignorer.check( + &Duty::new_sync_contribution_duty(SlotNumber::new(123)), + Some(&DutyFailure { + step: Step::Fetcher, + reason: REASON_SYNC_CONTRIBUTION_ZERO_PREPARES, + err: None + }), + )); + + // A successful SyncContribution marks contribution as supported. + assert!(!ignorer.check( + &Duty::new_sync_contribution_duty(SlotNumber::new(123)), + None, + )); + + // Subsequent SyncContribution failures are no longer ignored. + assert!(!ignorer.check( + &Duty::new_sync_contribution_duty(SlotNumber::new(123)), + Some(&DutyFailure { + step: Step::Fetcher, + reason: REASON_SYNC_CONTRIBUTION_ZERO_PREPARES, + err: None + }), + )); + } + + /// Unrelated reasons / steps are never ignored regardless of internal + /// state. + #[test] + fn unsupported_ignorer_passes_unrelated_failures() { + let mut ignorer = UnsupportedIgnorer::new(); + + // Aggregator failure with a different reason → not ignored. + assert!(!ignorer.check( + &Duty::new_aggregator_duty(SlotNumber::new(1)), + Some(&DutyFailure { + step: Step::Fetcher, + reason: REASON_UNKNOWN, + err: None + }), + )); + + // SyncContribution failure at a non-Fetcher step → not ignored. + assert!(!ignorer.check( + &Duty::new_sync_contribution_duty(SlotNumber::new(1)), + Some(&DutyFailure { + step: Step::Consensus, + reason: REASON_SYNC_CONTRIBUTION_ZERO_PREPARES, + err: None + }), + )); + } +} diff --git a/crates/core/src/tracker/step.rs b/crates/core/src/tracker/step.rs index 26bef0a5..1e32db4d 100644 --- a/crates/core/src/tracker/step.rs +++ b/crates/core/src/tracker/step.rs @@ -4,7 +4,7 @@ use std::fmt::Display; /// /// Variants are ordered by their position in the workflow; this ordering is /// used when scanning backwards to find the last reached step. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] #[repr(u8)] pub enum Step { /// No step reached (zero value).