diff --git a/Cargo.lock b/Cargo.lock index ecab19d1..87ef2c29 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5625,6 +5625,7 @@ dependencies = [ "thiserror 2.0.18", "tokio", "tokio-util", + "tower", "tracing", "tree_hash", "vise", diff --git a/Cargo.toml b/Cargo.toml index ac569187..bd7beb79 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -106,6 +106,7 @@ tree_hash_derive = "0.12" tar = "0.4" flate2 = "1.1" wiremock = "0.6" +tower = "0.5" sysinfo = "0.33" quick-xml = { version = "0.39", features = ["serialize"] } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 60a2130c..fb816537 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -56,6 +56,7 @@ pluto-testutil.workspace = true pluto-tracing.workspace = true tokio = { workspace = true, features = ["test-util"] } wiremock.workspace = true +tower = { workspace = true, features = ["util"] } [build-dependencies] pluto-build-proto.workspace = true diff --git a/crates/core/src/dutydb/memory.rs b/crates/core/src/dutydb/memory.rs index 01a68b86..abcc6c16 100644 --- a/crates/core/src/dutydb/memory.rs +++ b/crates/core/src/dutydb/memory.rs @@ -49,6 +49,12 @@ pub enum Error { #[error("dutydb shutdown: query could not be answered")] Shutdown, + /// The awaited duty was evicted before its unsigned data became + /// available. Distinct from `Shutdown` so callers can map this to a + /// timeout-style error rather than a service-down error. + #[error("dutydb: awaited duty expired before data was stored")] + AwaitDutyExpired, + /// Two validators share the same `(slot, committee_index, valIdx)` with /// different public keys. #[error( @@ -177,6 +183,17 @@ struct ContribKey { root: phase0::Root, } +/// Per-poll outcome handed back by an `await_data` lookup closure. +enum Lookup { + /// The awaited value is now present — return it to the caller. + Found(V), + /// The awaited duty has been evicted; the lookup will never succeed. + /// `await_data` returns [`Error::AwaitDutyExpired`]. + Evicted, + /// Neither stored nor evicted yet — park on the notify and retry. + Pending, +} + struct State { attestation_duties: HashMap, attestation_pub_keys: HashMap, @@ -190,6 +207,27 @@ struct State { contrib_duties: HashMap, contrib_keys_by_slot: HashMap>, + /// Highest slot whose attester duty has been evicted by the deadliner. + /// Because the deadliner expires duties in non-decreasing slot order and + /// `store()` refuses already-expired duties (`AddOutcome::AlreadyExpired`), + /// any awaited slot `<=` this mark that is not currently stored will never + /// be stored. Tracking only the high-water mark — rather than the set of + /// every evicted slot, which would grow without bound for the lifetime of + /// the node — lets `await_attestation` return `AwaitDutyExpired` + /// immediately for a gone duty while keeping the bookkeeping O(1) in + /// memory. + max_evicted_attestation_slot: Option, + /// Highest slot whose proposer duty has been evicted. See + /// [`max_evicted_attestation_slot`](Self::max_evicted_attestation_slot). + max_evicted_proposer_slot: Option, + /// Highest slot whose sync-contribution duty has been evicted. See + /// [`max_evicted_attestation_slot`](Self::max_evicted_attestation_slot). + max_evicted_contrib_slot: Option, + // NB: there is no eviction mark for aggregated attestations. They are + // awaited by root only (`await_agg_attestation` has no slot), so there is + // no slot to compare against a high-water mark; an evicted root relies on + // the caller's request timeout instead, matching Charon's Go dutydb which + // keeps no eviction record at all. deadliner_rx: tokio::sync::mpsc::Receiver, } @@ -225,6 +263,9 @@ impl MemDB { aggregation_keys_by_slot: HashMap::new(), contrib_duties: HashMap::new(), contrib_keys_by_slot: HashMap::new(), + max_evicted_attestation_slot: None, + max_evicted_proposer_slot: None, + max_evicted_contrib_slot: None, deadliner_rx, }), attestation_notify: Notify::new(), @@ -272,7 +313,6 @@ impl MemDB { Some(UnsignedDutyData::Proposal(p)) => state.store_proposal(p)?, Some(_) => return Err(Error::InvalidVersionedProposal), } - self.proposer_notify.notify_waiters(); } DutyType::Attester => { for (pubkey, data) in &unsigned_set { @@ -282,7 +322,6 @@ impl MemDB { }; state.store_attestation(*pubkey, att)?; } - self.attestation_notify.notify_waiters(); } DutyType::Aggregator => { for data in unsigned_set.values() { @@ -292,7 +331,6 @@ impl MemDB { }; state.store_agg_attestation(agg)?; } - self.aggregation_notify.notify_waiters(); } DutyType::SyncContribution => { for data in unsigned_set.values() { @@ -302,24 +340,54 @@ impl MemDB { }; state.store_sync_contribution(contrib)?; } - self.contrib_notify.notify_waiters(); } _ => return Err(Error::UnsupportedDutyType), } - - // Drain all expired duties that the deadliner has sent. + // Wake the matching notify for the duty we just stored, plus + // anything we drain below. `notify_waiters` is cheap if no one is + // parked and just bumps a counter, so calling it under the write + // lock is harmless — woken tasks block on `state.read()` until we + // drop. + self.wake(duty.duty_type); + + // Drain all expired duties that the deadliner has sent. Waiters + // whose duty just expired need to see `Lookup::Evicted` and exit, + // not re-park — so we wake the matching notify after each eviction. while let Ok(expired) = state.deadliner_rx.try_recv() { + let duty_type = expired.duty_type.clone(); state.delete_duty(expired)?; + self.wake(duty_type); } Ok(()) } + /// Wakes the [`Notify`] paired with `duty_type`. No-op for duty types + /// the DB doesn't track (e.g. `Exit`, `BuilderRegistration`). + fn wake(&self, duty_type: DutyType) { + let notify = match duty_type { + DutyType::Proposer => &self.proposer_notify, + DutyType::Attester => &self.attestation_notify, + DutyType::Aggregator => &self.aggregation_notify, + DutyType::SyncContribution => &self.contrib_notify, + _ => return, + }; + notify.notify_waiters(); + } + /// Blocks until a proposal for the given slot is available, then returns /// it. pub async fn await_proposal(&self, slot: u64) -> Result { - self.await_data(&self.proposer_notify, |s| s.proposer_duties.get(&slot)) - .await + self.await_data(&self.proposer_notify, |s| { + if let Some(v) = s.proposer_duties.get(&slot) { + Lookup::Found(v.clone()) + } else if s.max_evicted_proposer_slot.is_some_and(|hw| slot <= hw) { + Lookup::Evicted + } else { + Lookup::Pending + } + }) + .await } /// Blocks until attestation data for the given slot and committee index is @@ -333,8 +401,19 @@ impl MemDB { slot, committee_index, }; - self.await_data(&self.attestation_notify, |s| s.attestation_duties.get(&key)) - .await + self.await_data(&self.attestation_notify, |s| { + if let Some(v) = s.attestation_duties.get(&key) { + Lookup::Found(v.clone()) + } else if s + .max_evicted_attestation_slot + .is_some_and(|hw| key.slot <= hw) + { + Lookup::Evicted + } else { + Lookup::Pending + } + }) + .await } /// Blocks until an aggregated attestation for the given slot and @@ -347,7 +426,14 @@ impl MemDB { root: attestation_root, }; self.await_data(&self.aggregation_notify, |s| { - s.aggregation_duties.get(&key).map(|a| &a.0) + // Awaited by root only, so there is no slot to test against an + // eviction high-water mark: an evicted root relies on the caller's + // request timeout to terminate (matching Charon's Go dutydb). + if let Some(v) = s.aggregation_duties.get(&key) { + Lookup::Found(v.0.clone()) + } else { + Lookup::Pending + } }) .await } @@ -365,8 +451,16 @@ impl MemDB { subcommittee_index, root: beacon_block_root, }; - self.await_data(&self.contrib_notify, |s| s.contrib_duties.get(&key)) - .await + self.await_data(&self.contrib_notify, |s| { + if let Some(v) = s.contrib_duties.get(&key) { + Lookup::Found(v.clone()) + } else if s.max_evicted_contrib_slot.is_some_and(|hw| slot <= hw) { + Lookup::Evicted + } else { + Lookup::Pending + } + }) + .await } // A single Notify per duty type wakes all waiters on every store, not only @@ -374,22 +468,26 @@ impl MemDB { // is small (one per validator), so the extra wakeups are cheap. A keyed // notify (HashMap) would avoid them but adds complexity that // isn't worth it here. + // + // `delete_duty` also wakes the notify so waiters whose duty just expired + // exit immediately via the `Lookup::Evicted` branch, instead of parking + // for another `notify_waiters` call or for the per-request timeout in + // the caller. async fn await_data( &self, notify: &Notify, - lookup: impl for<'s> Fn(&'s State) -> Option<&'s V>, - ) -> Result - where - V: Clone, - { + lookup: impl Fn(&State) -> Lookup, + ) -> Result { loop { let notified = notify.notified(); tokio::pin!(notified); { let state = self.state.read().await; - if let Some(v) = lookup(&state) { - return Ok(v.clone()); + match lookup(&state) { + Lookup::Found(v) => return Ok(v), + Lookup::Evicted => return Err(Error::AwaitDutyExpired), + Lookup::Pending => {} } } @@ -571,12 +669,20 @@ impl State { Ok(()) } + /// Raises an eviction high-water mark to `slot` if `slot` is newer (or the + /// mark is unset). The deadliner expires duties in non-decreasing slot + /// order, so in practice this only ever moves the mark forward. + fn bump_high_water(mark: &mut Option, slot: u64) { + *mark = Some(mark.map_or(slot, |current| current.max(slot))); + } + fn delete_duty(&mut self, duty: Duty) -> Result<()> { let slot = duty.slot.inner(); info!(slot, duty_type = %duty.duty_type, "dutydb: deleting expired duty"); match duty.duty_type { DutyType::Proposer => { self.proposer_duties.remove(&slot); + Self::bump_high_water(&mut self.max_evicted_proposer_slot, slot); } DutyType::BuilderProposer => return Err(Error::DeprecatedDutyBuilderProposer), DutyType::Attester => { @@ -589,8 +695,11 @@ impl State { }); } } + Self::bump_high_water(&mut self.max_evicted_attestation_slot, slot); } DutyType::Aggregator => { + // No eviction mark: aggregated attestations are awaited by root + // only, so there is nothing for a slot high-water mark to gate. if let Some(keys) = self.aggregation_keys_by_slot.remove(&slot) { for key in keys { self.aggregation_duties.remove(&key); @@ -603,6 +712,7 @@ impl State { self.contrib_duties.remove(&key); } } + Self::bump_high_water(&mut self.max_evicted_contrib_slot, slot); } _ => return Err(Error::UnknownDutyType), } @@ -1166,6 +1276,67 @@ mod tests { assert!(db.pub_key_by_attestation(SLOT, 0, 0).await.is_err()); } + /// After a slot is evicted, `await_attestation` must return + /// `AwaitDutyExpired` immediately (not park until the request timeout) for + /// that slot AND for any older slot — the eviction state is a single + /// high-water mark, so it stays O(1) in memory rather than accumulating one + /// entry per evicted slot for the lifetime of the node. + #[tokio::test] + async fn await_attestation_expired_after_eviction_high_water() { + let deadliner = far_future_handle(); + let (trim_tx, trim_rx) = channel::(64); + let db = make_db_with_deadliner(deadliner, trim_rx); + + const SLOT: u64 = 123; + + let mut set = UnsignedDataSet::new(); + set.insert( + random_core_pub_key(), + UnsignedDutyData::Attestation(att_data(SLOT, 0, 0)), + ); + db.store(Duty::new(SlotNumber::new(SLOT), DutyType::Attester), set) + .await + .unwrap(); + + // Evict SLOT, then trigger expiry processing with an unrelated store. + trim_tx + .send(Duty::new(SlotNumber::new(SLOT), DutyType::Attester)) + .await + .expect("trim_tx should be open"); + let mut set2 = UnsignedDataSet::new(); + set2.insert( + random_core_pub_key(), + UnsignedDutyData::Proposal(Box::new(phase0_proposal(SLOT.saturating_add(1), 0))), + ); + db.store( + Duty::new(SlotNumber::new(SLOT.saturating_add(1)), DutyType::Proposer), + set2, + ) + .await + .unwrap(); + + // The evicted slot resolves to AwaitDutyExpired without parking. + let timeout = std::time::Duration::from_secs(5); + let evicted = tokio::time::timeout(timeout, db.await_attestation(SLOT, 0)) + .await + .expect("await must not park for an evicted slot"); + assert!( + matches!(evicted, Err(Error::AwaitDutyExpired)), + "evicted slot: expected AwaitDutyExpired, got {evicted:?}" + ); + + // An older, never-stored slot is also below the high-water mark: its + // deadline has necessarily passed too, so it must fail fast rather than + // park — and we keep no per-slot record to answer this. + let older = tokio::time::timeout(timeout, db.await_attestation(SLOT.saturating_sub(1), 0)) + .await + .expect("await must not park for a slot below the eviction high-water"); + assert!( + matches!(older, Err(Error::AwaitDutyExpired)), + "older slot: expected AwaitDutyExpired, got {older:?}" + ); + } + #[tokio::test] async fn agg_attestation_two_roots_same_slot() { const SLOT: u64 = 300; diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs index e0e609b1..e61df23b 100644 --- a/crates/core/src/validatorapi/component.rs +++ b/crates/core/src/validatorapi/component.rs @@ -4,30 +4,47 @@ //! and public-share mappings needed to translate between distributed-validator //! root keys and this node's threshold-BLS share. -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use async_trait::async_trait; use axum::http::StatusCode; use pluto_eth2api::{ - EthBeaconNodeApiClient, GetProposerDutiesRequest, GetProposerDutiesResponse, - spec::phase0::BLSPubKey, + EthBeaconNodeApiClient, GetAttesterDutiesRequest, GetAttesterDutiesResponse, + GetProposerDutiesRequest, GetProposerDutiesResponse, GetSyncCommitteeDutiesRequest, + GetSyncCommitteeDutiesResponse, spec::phase0::BLSPubKey, }; +use tokio::time::error::Elapsed; use super::{ error::ApiError, handler::Handler, types::{ - AggregateAttestationOpts, AttestationData, AttestationDataOpts, AttesterDutiesOpts, - AttesterDuty, BeaconCommitteeSelection, EthResponse, NodeVersionData, NodeVersionResponse, - ProposalOpts, ProposerDutiesOpts, ProposerDutiesResponse, ProposerDuty, - SignedContributionAndProof, SignedValidatorRegistration, SignedVoluntaryExit, - SyncCommitteeContribution, SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, + AggregateAttestationOpts, AttestationDataOpts, AttestationDataResponse, AttesterDutiesOpts, + AttesterDutiesResponse, AttesterDuty, BeaconCommitteeSelection, EthResponse, + NodeVersionData, NodeVersionResponse, ProposalOpts, ProposerDutiesOpts, + ProposerDutiesResponse, ProposerDuty, SignedContributionAndProof, + SignedValidatorRegistration, SignedVoluntaryExit, SyncCommitteeContribution, + SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, SyncCommitteeDutiesResponse, SyncCommitteeDuty, SyncCommitteeMessage, SyncCommitteeSelection, Validator, ValidatorsOpts, VersionedAttestation, VersionedProposal, VersionedSignedAggregateAndProof, VersionedSignedBlindedProposal, VersionedSignedProposal, }, }; -use crate::version; +use crate::{ + dutydb::{Error as DutyDbError, MemDB}, + version, +}; + +/// Hard deadline for upstream beacon-node calls. Bounds the worst-case +/// handler latency when the upstream hangs or stalls. Mirrors Charon's +/// `defaultRequestTimeout` (`core/validatorapi/router.go:61`). +const UPSTREAM_REQUEST_TIMEOUT: Duration = Duration::from_secs(10); + +/// Hard deadline for the `attestation_data` await on the local DutyDB. +/// Bounded so a request whose slot never produces consensus output cannot +/// hold a handler task indefinitely. Sized at roughly two slots so a real +/// attestation duty has time to flow through the pipeline. +const ATTESTATION_DATA_TIMEOUT: Duration = Duration::from_secs(24); /// Validator API [`Handler`] implementation. /// @@ -39,6 +56,9 @@ use crate::version; pub struct Component { /// Upstream beacon-node API client. eth2_cl: Arc, + /// In-memory DutyDB used to await consensus output (e.g. attestation + /// data) produced by the rest of the pipeline. + dutydb: Arc, /// Threshold BLS share index assigned to this node (1-indexed). #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] share_idx: u64, @@ -62,12 +82,14 @@ impl Component { /// Builds a new component. pub fn new( eth2_cl: Arc, + dutydb: Arc, share_idx: u64, pub_share_by_pubkey: HashMap, builder_enabled: bool, ) -> Self { Self { eth2_cl, + dutydb, share_idx, pub_share_by_pubkey, builder_enabled, @@ -76,10 +98,18 @@ impl Component { } /// Builds a component that skips partial-signature verification on - /// submit endpoints. Test use only. - pub fn new_insecure(eth2_cl: Arc, share_idx: u64) -> Self { + /// submit endpoints. Gated to test builds — `insecure_test: true` must + /// never reach production, since later submit handlers consult this flag + /// to bypass signature checks. + #[cfg(test)] + pub fn new_insecure( + eth2_cl: Arc, + dutydb: Arc, + share_idx: u64, + ) -> Self { Self { eth2_cl, + dutydb, share_idx, pub_share_by_pubkey: HashMap::new(), builder_enabled: false, @@ -113,28 +143,38 @@ impl Handler for Component { .epoch(opts.epoch.to_string()) .build() .map_err(|err| { - ApiError::new(StatusCode::BAD_REQUEST, "invalid epoch").with_source( - std::io::Error::new(std::io::ErrorKind::InvalidInput, err.to_string()), - ) + ApiError::new(StatusCode::BAD_REQUEST, "invalid epoch") + .with_boxed_source(err.into()) })?; - let response = self - .eth2_cl - .get_proposer_duties(request) - .await - .map_err(|err| { - ApiError::new(StatusCode::BAD_GATEWAY, "upstream proposer duties failed") - .with_source(std::io::Error::other(err.to_string())) - })?; + let response = tokio::time::timeout( + UPSTREAM_REQUEST_TIMEOUT, + self.eth2_cl.get_proposer_duties(request), + ) + .await + .map_err(|_| upstream_timeout("proposer duties"))? + .map_err(|err| upstream_call_failed("proposer duties", err.into()))?; let mut payload = match response { GetProposerDutiesResponse::Ok(payload) => payload, - other => { - return Err(ApiError::new( - StatusCode::BAD_GATEWAY, - format!("unexpected upstream proposer duties response: {other:?}"), + GetProposerDutiesResponse::BadRequest(body) => { + return Err(upstream_status_error( + StatusCode::BAD_REQUEST, + "proposer duties", + body, )); } + GetProposerDutiesResponse::ServiceUnavailable(body) => { + return Err(upstream_status_error( + StatusCode::SERVICE_UNAVAILABLE, + "proposer duties", + body, + )); + } + other @ (GetProposerDutiesResponse::InternalServerError(_) + | GetProposerDutiesResponse::Unknown) => { + return Err(upstream_unexpected("proposer duties", other)); + } }; swap_proposer_pubshares(&mut payload.data, &self.pub_share_by_pubkey)?; @@ -144,23 +184,122 @@ impl Handler for Component { async fn attester_duties( &self, - _opts: AttesterDutiesOpts, - ) -> Result>, ApiError> { - unimplemented!("attester_duties not yet ported") + opts: AttesterDutiesOpts, + ) -> Result { + let request = GetAttesterDutiesRequest::builder() + .epoch(opts.epoch.to_string()) + .body(opts.indices) + .build() + .map_err(|err| { + ApiError::new(StatusCode::BAD_REQUEST, "invalid attester duties request") + .with_boxed_source(err.into()) + })?; + + let response = tokio::time::timeout( + UPSTREAM_REQUEST_TIMEOUT, + self.eth2_cl.get_attester_duties(request), + ) + .await + .map_err(|_| upstream_timeout("attester duties"))? + .map_err(|err| upstream_call_failed("attester duties", err.into()))?; + + let mut payload = match response { + GetAttesterDutiesResponse::Ok(payload) => payload, + GetAttesterDutiesResponse::BadRequest(body) => { + return Err(upstream_status_error( + StatusCode::BAD_REQUEST, + "attester duties", + body, + )); + } + GetAttesterDutiesResponse::ServiceUnavailable(body) => { + return Err(upstream_status_error( + StatusCode::SERVICE_UNAVAILABLE, + "attester duties", + body, + )); + } + other @ (GetAttesterDutiesResponse::InternalServerError(_) + | GetAttesterDutiesResponse::Unknown) => { + return Err(upstream_unexpected("attester duties", other)); + } + }; + + swap_attester_pubshares(&mut payload.data, &self.pub_share_by_pubkey)?; + + Ok(payload) } async fn sync_committee_duties( &self, - _opts: SyncCommitteeDutiesOpts, - ) -> Result>, ApiError> { - unimplemented!("sync_committee_duties not yet ported") + opts: SyncCommitteeDutiesOpts, + ) -> Result { + let request = GetSyncCommitteeDutiesRequest::builder() + .epoch(opts.epoch.to_string()) + .body(opts.indices) + .build() + .map_err(|err| { + ApiError::new( + StatusCode::BAD_REQUEST, + "invalid sync committee duties request", + ) + .with_boxed_source(err.into()) + })?; + + let response = tokio::time::timeout( + UPSTREAM_REQUEST_TIMEOUT, + self.eth2_cl.get_sync_committee_duties(request), + ) + .await + .map_err(|_| upstream_timeout("sync committee duties"))? + .map_err(|err| upstream_call_failed("sync committee duties", err.into()))?; + + let mut payload = match response { + GetSyncCommitteeDutiesResponse::Ok(payload) => payload, + GetSyncCommitteeDutiesResponse::BadRequest(body) => { + return Err(upstream_status_error( + StatusCode::BAD_REQUEST, + "sync committee duties", + body, + )); + } + GetSyncCommitteeDutiesResponse::ServiceUnavailable(body) => { + return Err(upstream_status_error( + StatusCode::SERVICE_UNAVAILABLE, + "sync committee duties", + body, + )); + } + other @ (GetSyncCommitteeDutiesResponse::InternalServerError(_) + | GetSyncCommitteeDutiesResponse::Unknown) => { + return Err(upstream_unexpected("sync committee duties", other)); + } + }; + + swap_sync_committee_pubshares(&mut payload.data, &self.pub_share_by_pubkey)?; + + Ok(payload) } async fn attestation_data( &self, - _opts: AttestationDataOpts, - ) -> Result, ApiError> { - unimplemented!("attestation_data not yet ported") + opts: AttestationDataOpts, + ) -> Result { + let data = tokio::time::timeout( + ATTESTATION_DATA_TIMEOUT, + self.dutydb + .await_attestation(opts.slot, opts.committee_index), + ) + .await + .map_err(|_: Elapsed| { + ApiError::new( + StatusCode::REQUEST_TIMEOUT, + "attestation data not available before deadline", + ) + })? + .map_err(map_dutydb_error)?; + + Ok(AttestationDataResponse { data }) } async fn submit_attestations( @@ -256,6 +395,81 @@ impl Handler for Component { } } +/// Builds the `ApiError` returned when an upstream beacon-node call elapses +/// past [`UPSTREAM_REQUEST_TIMEOUT`]. +fn upstream_timeout(endpoint: &'static str) -> ApiError { + ApiError::new( + StatusCode::GATEWAY_TIMEOUT, + format!("upstream {endpoint} timed out"), + ) +} + +/// Builds the `ApiError` returned when an upstream beacon-node call returns a +/// transport-level error. Boxed so `anyhow::Error` (which doesn't itself +/// implement `std::error::Error`) can be attached via `.into()`. +fn upstream_call_failed( + endpoint: &'static str, + err: Box, +) -> ApiError { + ApiError::new( + StatusCode::BAD_GATEWAY, + format!("upstream {endpoint} failed"), + ) + .with_boxed_source(err) +} + +/// Builds the `ApiError` returned when the upstream responds with a faithful +/// HTTP status that we propagate (e.g. 400, 503). The upstream body is +/// attached as a `source` for debug logging — never serialized into the +/// client-visible message. +fn upstream_status_error( + status: StatusCode, + endpoint: &'static str, + body: B, +) -> ApiError { + ApiError::new( + status, + format!("upstream {endpoint} returned {}", status.as_u16()), + ) + .with_source(std::io::Error::other(format!( + "upstream {endpoint} body: {body:?}" + ))) +} + +/// Builds the `ApiError` returned when the upstream responds with an +/// unexpected variant (e.g. `Unknown`, or `InternalServerError`). The variant +/// is attached as a `source` so the debug log retains it but the client +/// message stays generic. +fn upstream_unexpected(endpoint: &'static str, response: R) -> ApiError { + ApiError::new( + StatusCode::BAD_GATEWAY, + format!("unexpected upstream {endpoint} response"), + ) + .with_source(std::io::Error::other(format!( + "upstream {endpoint} variant: {response:?}" + ))) +} + +/// Maps a [`crate::dutydb::Error`] into the `ApiError` returned to the client +/// when an `attestation_data` await fails. `Shutdown` propagates as 503 so the +/// VC can retry; `AwaitDutyExpired` propagates as 408 — same as a timeout — +/// since the duty is gone and the data will never arrive. Anything else is a +/// programming error here and becomes 500. +fn map_dutydb_error(err: DutyDbError) -> ApiError { + let (status, message) = match err { + DutyDbError::Shutdown => (StatusCode::SERVICE_UNAVAILABLE, "dutydb is shutting down"), + DutyDbError::AwaitDutyExpired => ( + StatusCode::REQUEST_TIMEOUT, + "attestation duty expired before data was stored", + ), + _ => ( + StatusCode::INTERNAL_SERVER_ERROR, + "await attestation failed", + ), + }; + ApiError::new(status, message).with_source(err) +} + /// Rewrites each duty's root public key to this node's public share. Duties /// whose pubkey is not in `pub_share_by_pubkey` are passed through unchanged /// (the upstream returns all proposers for the epoch, not just ours). @@ -272,6 +486,48 @@ fn swap_proposer_pubshares( Ok(()) } +/// Like [`swap_proposer_pubshares`] but for attester duties. Attester duties +/// only ever come back for validators owned by this cluster, so an unknown +/// pubkey indicates a misconfiguration and is rejected. +fn swap_attester_pubshares( + duties: &mut [AttesterDuty], + pub_share_by_pubkey: &HashMap, +) -> Result<(), ApiError> { + for duty in duties { + let pubkey = parse_bls_pubkey(&duty.pubkey)?; + let share = pub_share_by_pubkey.get(&pubkey).ok_or_else(|| { + // Cluster/lock-file misconfiguration — the upstream returned a + // well-formed duty, but this node has no share for that validator. + // 500 (not 502): the failure is local, not gateway-level. + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "pubshare not found for attester duty", + ) + })?; + duty.pubkey = format_bls_pubkey(share); + } + Ok(()) +} + +/// Sync-committee duties variant of [`swap_attester_pubshares`]. +fn swap_sync_committee_pubshares( + duties: &mut [SyncCommitteeDuty], + pub_share_by_pubkey: &HashMap, +) -> Result<(), ApiError> { + for duty in duties { + let pubkey = parse_bls_pubkey(&duty.pubkey)?; + let share = pub_share_by_pubkey.get(&pubkey).ok_or_else(|| { + // See `swap_attester_pubshares` — same 500-not-502 reasoning. + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "pubshare not found for sync committee duty", + ) + })?; + duty.pubkey = format_bls_pubkey(share); + } + Ok(()) +} + fn parse_bls_pubkey(s: &str) -> Result { let trimmed = s.strip_prefix("0x").unwrap_or(s); let bytes = hex::decode(trimmed).map_err(|err| { @@ -294,7 +550,48 @@ fn format_bls_pubkey(pubkey: &BLSPubKey) -> String { #[cfg(test)] mod tests { + use chrono::{DateTime, Utc}; + use tokio::sync::mpsc; + use tokio_util::sync::CancellationToken; + use super::*; + use crate::{ + deadline::{DeadlineCalculator, DeadlinerTask, Result as DeadlineResult}, + dutydb::{UnsignedDataSet, UnsignedDutyData}, + signeddata::{ + AttestationData as SignedAttestationData, AttesterDuty as SignedAttesterDuty, + }, + testutils::random_core_pub_key, + types::{Duty, DutyType, SlotNumber}, + validatorapi::types::AttestationDataOpts, + }; + + /// Schedules every duty with a deadline at `MAX_UTC`, so duties are + /// `Scheduled` but never naturally expire. + struct FarFutureCalculator; + + impl DeadlineCalculator for FarFutureCalculator { + fn deadline(&self, _: &Duty) -> DeadlineResult>> { + Ok(Some(DateTime::::MAX_UTC)) + } + } + + /// Build a Component backed by a real (but never-expiring) DutyDB plus a + /// dummy upstream client. Useful for tests that only exercise endpoints + /// served from the DB. + fn make_test_component() -> (Component, Arc) { + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = + DeadlinerTask::start(cancel.clone(), "validatorapi-tests", FarFutureCalculator); + // Held to keep the eviction channel's sender alive so the dutydb's + // `evict_rx` doesn't observe a closed channel. + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = + Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); + let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1); + (component, dutydb) + } #[test] fn swap_replaces_known_pubkeys_and_keeps_unknown() { @@ -323,6 +620,65 @@ mod tests { assert_eq!(duties[1].pubkey, format_bls_pubkey(&stranger)); } + #[test] + fn swap_attester_replaces_pubkeys_and_rejects_unknown() { + let root = [0x11_u8; 48]; + let share = [0x22_u8; 48]; + let unknown = [0x33_u8; 48]; + + let map = HashMap::from([(root, share)]); + + let mut duties = vec![AttesterDuty { + pubkey: format_bls_pubkey(&root), + slot: "1".to_owned(), + committee_index: "0".to_owned(), + committee_length: "16".to_owned(), + committees_at_slot: "4".to_owned(), + validator_committee_index: "0".to_owned(), + validator_index: "5".to_owned(), + }]; + + swap_attester_pubshares(&mut duties, &map).unwrap(); + assert_eq!(duties[0].pubkey, format_bls_pubkey(&share)); + + let mut stranger_duties = vec![AttesterDuty { + pubkey: format_bls_pubkey(&unknown), + slot: "2".to_owned(), + committee_index: "0".to_owned(), + committee_length: "16".to_owned(), + committees_at_slot: "4".to_owned(), + validator_committee_index: "0".to_owned(), + validator_index: "6".to_owned(), + }]; + let err = swap_attester_pubshares(&mut stranger_duties, &map).unwrap_err(); + assert_eq!(err.status_code, StatusCode::INTERNAL_SERVER_ERROR); + } + + #[test] + fn swap_sync_committee_replaces_pubkeys_and_rejects_unknown() { + let root = [0x44_u8; 48]; + let share = [0x55_u8; 48]; + let unknown = [0x66_u8; 48]; + + let map = HashMap::from([(root, share)]); + + let mut duties = vec![SyncCommitteeDuty { + pubkey: format_bls_pubkey(&root), + validator_index: "12".to_owned(), + validator_sync_committee_indices: vec!["0".to_owned()], + }]; + swap_sync_committee_pubshares(&mut duties, &map).unwrap(); + assert_eq!(duties[0].pubkey, format_bls_pubkey(&share)); + + let mut stranger = vec![SyncCommitteeDuty { + pubkey: format_bls_pubkey(&unknown), + validator_index: "13".to_owned(), + validator_sync_committee_indices: vec![], + }]; + let err = swap_sync_committee_pubshares(&mut stranger, &map).unwrap_err(); + assert_eq!(err.status_code, StatusCode::INTERNAL_SERVER_ERROR); + } + #[test] fn swap_rejects_malformed_pubkey() { let mut duties = vec![ProposerDuty { @@ -336,10 +692,7 @@ mod tests { #[tokio::test] async fn node_version_formats_pluto_string() { - // Use an unreachable upstream — node_version doesn't call it. - let eth2_cl = - Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); - let component = Component::new_insecure(eth2_cl, 1); + let (component, _db) = make_test_component(); let response = component.node_version().await.unwrap(); @@ -347,4 +700,295 @@ mod tests { assert!(response.data.version.contains(std::env::consts::ARCH)); assert!(response.data.version.contains(std::env::consts::OS)); } + + #[tokio::test] + async fn attestation_data_returns_data_stored_in_dutydb() { + const SLOT: u64 = 100; + const COMM_IDX: u64 = 4; + const V_IDX: u64 = 1; + + let (component, db) = make_test_component(); + + let unsigned = SignedAttestationData { + data: pluto_eth2api::spec::phase0::AttestationData { + slot: SLOT, + index: COMM_IDX, + beacon_block_root: [0x11; 32], + source: pluto_eth2api::spec::phase0::Checkpoint::default(), + target: pluto_eth2api::spec::phase0::Checkpoint::default(), + }, + duty: SignedAttesterDuty { + slot: SLOT, + validator_index: V_IDX, + committee_index: COMM_IDX, + committee_length: 8, + committees_at_slot: 1, + validator_committee_index: 0, + }, + }; + let mut set = UnsignedDataSet::new(); + set.insert( + random_core_pub_key(), + UnsignedDutyData::Attestation(unsigned.clone()), + ); + db.store(Duty::new(SlotNumber::new(SLOT), DutyType::Attester), set) + .await + .unwrap(); + + let response = component + .attestation_data(AttestationDataOpts { + slot: SLOT, + committee_index: COMM_IDX, + }) + .await + .unwrap(); + assert_eq!(response.data.slot, SLOT); + assert_eq!(response.data.index, COMM_IDX); + assert_eq!(response.data.beacon_block_root, [0x11; 32]); + } + + /// Storing `(SLOT, COMM_IDX)` must NOT satisfy an `attestation_data` + /// request for `(SLOT, COMM_IDX + 1)`. Verifies the dutydb is keyed on + /// the full `(slot, committee_index)` tuple, not just the slot. + #[tokio::test(start_paused = true)] + async fn attestation_data_does_not_resolve_for_wrong_committee_index() { + const SLOT: u64 = 200; + const COMM_IDX: u64 = 7; + + let (component, db) = make_test_component(); + + let unsigned = SignedAttestationData { + data: pluto_eth2api::spec::phase0::AttestationData { + slot: SLOT, + index: COMM_IDX, + beacon_block_root: [0x22; 32], + source: pluto_eth2api::spec::phase0::Checkpoint::default(), + target: pluto_eth2api::spec::phase0::Checkpoint::default(), + }, + duty: SignedAttesterDuty { + slot: SLOT, + validator_index: 9, + committee_index: COMM_IDX, + committee_length: 8, + committees_at_slot: 1, + validator_committee_index: 0, + }, + }; + let mut set = UnsignedDataSet::new(); + set.insert( + random_core_pub_key(), + UnsignedDutyData::Attestation(unsigned), + ); + db.store(Duty::new(SlotNumber::new(SLOT), DutyType::Attester), set) + .await + .unwrap(); + + // Auto-advance past the handler timeout so the await trips on the + // wrong committee_index, not on the existing one. + let err = component + .attestation_data(AttestationDataOpts { + slot: SLOT, + committee_index: COMM_IDX + 1, + }) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::REQUEST_TIMEOUT); + } + + /// Verifies the handler enforces `ATTESTATION_DATA_TIMEOUT` — an + /// `await_attestation` for a slot that is never stored returns 408 + /// instead of hanging. + #[tokio::test(start_paused = true)] + async fn attestation_data_times_out_when_data_never_arrives() { + let (component, _db) = make_test_component(); + + let err = component + .attestation_data(AttestationDataOpts { + slot: 999, + committee_index: 0, + }) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::REQUEST_TIMEOUT); + } + + /// Verifies that when the dutydb evicts the awaited duty (via the + /// deadliner), the in-flight handler exits promptly with + /// `REQUEST_TIMEOUT` instead of parking on the notify forever. + #[tokio::test] + async fn attestation_data_returns_408_when_duty_is_evicted() { + use tokio::sync::mpsc::channel; + + const SLOT: u64 = 333; + const COMM_IDX: u64 = 1; + + // Hand-build a Component whose dutydb shares its eviction channel + // with the test, so we can drive eviction deterministically. + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = + DeadlinerTask::start(cancel.clone(), "validatorapi-tests", FarFutureCalculator); + let (trim_tx, trim_rx) = channel::(8); + let dutydb = Arc::new(MemDB::new(deadliner, trim_rx, &cancel)); + let eth2_cl = + Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); + let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1); + + // Start an await before any data is stored. + let waiter = { + let component = Arc::new(component); + let c = Arc::clone(&component); + tokio::spawn(async move { + c.attestation_data(AttestationDataOpts { + slot: SLOT, + committee_index: COMM_IDX, + }) + .await + }) + }; + + // Yield so the waiter parks. + tokio::task::yield_now().await; + + // Simulate the deadliner emitting an eviction for this slot… + trim_tx + .send(Duty::new(SlotNumber::new(SLOT), DutyType::Attester)) + .await + .unwrap(); + + // …then trigger eviction processing by storing an unrelated duty. + let unsigned = SignedAttestationData { + data: pluto_eth2api::spec::phase0::AttestationData { + slot: SLOT.saturating_add(1), + index: 0, + beacon_block_root: [0x33; 32], + source: pluto_eth2api::spec::phase0::Checkpoint::default(), + target: pluto_eth2api::spec::phase0::Checkpoint::default(), + }, + duty: SignedAttesterDuty { + slot: SLOT.saturating_add(1), + validator_index: 0, + committee_index: 0, + committee_length: 8, + committees_at_slot: 1, + validator_committee_index: 0, + }, + }; + let mut set = UnsignedDataSet::new(); + set.insert( + random_core_pub_key(), + UnsignedDutyData::Attestation(unsigned), + ); + dutydb + .store( + Duty::new(SlotNumber::new(SLOT.saturating_add(1)), DutyType::Attester), + set, + ) + .await + .unwrap(); + + let err = waiter.await.unwrap().unwrap_err(); + assert_eq!(err.status_code, StatusCode::REQUEST_TIMEOUT); + } + + /// Verifies that dropping the handler future releases the dutydb + /// waiter — the next store() should not see a hanging reader on the + /// state lock. + #[tokio::test] + async fn attestation_data_drops_waiter_when_future_dropped() { + let (component, db) = make_test_component(); + let component = Arc::new(component); + + let waiter = { + let component = Arc::clone(&component); + tokio::spawn(async move { + component + .attestation_data(AttestationDataOpts { + slot: 4242, + committee_index: 0, + }) + .await + }) + }; + + tokio::task::yield_now().await; + waiter.abort(); + let _ = waiter.await; + + // Confirm db is still usable — store should not deadlock. + let unsigned = SignedAttestationData { + data: pluto_eth2api::spec::phase0::AttestationData { + slot: 1, + index: 0, + beacon_block_root: [0x44; 32], + source: pluto_eth2api::spec::phase0::Checkpoint::default(), + target: pluto_eth2api::spec::phase0::Checkpoint::default(), + }, + duty: SignedAttesterDuty { + slot: 1, + validator_index: 0, + committee_index: 0, + committee_length: 8, + committees_at_slot: 1, + validator_committee_index: 0, + }, + }; + let mut set = UnsignedDataSet::new(); + set.insert( + random_core_pub_key(), + UnsignedDutyData::Attestation(unsigned), + ); + db.store(Duty::new(SlotNumber::new(1), DutyType::Attester), set) + .await + .unwrap(); + } + + /// `map_dutydb_error` covers the three distinguishable variants from + /// `crate::dutydb::Error`. + #[test] + fn map_dutydb_error_status_codes() { + assert_eq!( + map_dutydb_error(DutyDbError::Shutdown).status_code, + StatusCode::SERVICE_UNAVAILABLE + ); + assert_eq!( + map_dutydb_error(DutyDbError::AwaitDutyExpired).status_code, + StatusCode::REQUEST_TIMEOUT + ); + assert_eq!( + map_dutydb_error(DutyDbError::UnsupportedDutyType).status_code, + StatusCode::INTERNAL_SERVER_ERROR + ); + } + + /// `upstream_status_error` keeps the upstream response body out of the + /// client-visible message but preserves it on `source()` so it lands in + /// the debug log. + #[test] + fn upstream_status_error_does_not_leak_body_into_message() { + use pluto_eth2api::BlindedBlock400Response; + + let body = BlindedBlock400Response { + code: 503.0, + message: "secret upstream stacktrace path=/etc/secret".to_owned(), + stacktraces: Some(vec!["at /etc/secret/lighthouse:42".to_owned()]), + }; + let err = upstream_status_error(StatusCode::SERVICE_UNAVAILABLE, "attester duties", body); + + assert_eq!(err.status_code, StatusCode::SERVICE_UNAVAILABLE); + assert!(!err.message.contains("secret")); + assert!(!err.message.contains("stacktrace")); + // But the source carries it for debug logging. + let src = err.source.as_ref().unwrap().to_string(); + assert!(src.contains("secret")); + } + + /// `upstream_unexpected` mirrors `upstream_status_error`'s no-leak shape + /// for the `Unknown` / `InternalServerError` arms. + #[test] + fn upstream_unexpected_does_not_leak_variant_into_message() { + let err = upstream_unexpected("attester duties", GetAttesterDutiesResponse::Unknown); + assert_eq!(err.status_code, StatusCode::BAD_GATEWAY); + assert!(!err.message.contains("Unknown")); + assert!(err.source.as_ref().unwrap().to_string().contains("Unknown")); + } } diff --git a/crates/core/src/validatorapi/error.rs b/crates/core/src/validatorapi/error.rs index 12064c51..e13c440d 100644 --- a/crates/core/src/validatorapi/error.rs +++ b/crates/core/src/validatorapi/error.rs @@ -47,6 +47,18 @@ impl ApiError { self.source = Some(Box::new(source)); self } + + /// Attaches a boxed source error for debug logging. Use this when the + /// upstream error is not `std::error::Error` itself (e.g. `anyhow::Error`, + /// which only implements `AsRef` and converts via `.into()`). + #[must_use] + pub fn with_boxed_source( + mut self, + source: Box, + ) -> Self { + self.source = Some(source); + self + } } impl fmt::Display for ApiError { diff --git a/crates/core/src/validatorapi/handler.rs b/crates/core/src/validatorapi/handler.rs index 06604aff..b1a3a050 100644 --- a/crates/core/src/validatorapi/handler.rs +++ b/crates/core/src/validatorapi/handler.rs @@ -5,11 +5,11 @@ use async_trait::async_trait; use super::{ error::ApiError, types::{ - AggregateAttestationOpts, AttestationData, AttestationDataOpts, AttesterDutiesOpts, - AttesterDuty, BeaconCommitteeSelection, EthResponse, NodeVersionResponse, ProposalOpts, - ProposerDutiesOpts, ProposerDutiesResponse, SignedContributionAndProof, + AggregateAttestationOpts, AttestationDataOpts, AttestationDataResponse, AttesterDutiesOpts, + AttesterDutiesResponse, BeaconCommitteeSelection, EthResponse, NodeVersionResponse, + ProposalOpts, ProposerDutiesOpts, ProposerDutiesResponse, SignedContributionAndProof, SignedValidatorRegistration, SignedVoluntaryExit, SyncCommitteeContribution, - SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, SyncCommitteeDuty, + SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, SyncCommitteeDutiesResponse, SyncCommitteeMessage, SyncCommitteeSelection, Validator, ValidatorsOpts, VersionedAttestation, VersionedProposal, VersionedSignedAggregateAndProof, VersionedSignedBlindedProposal, VersionedSignedProposal, @@ -28,7 +28,7 @@ pub trait Handler: Send + Sync + 'static { async fn attester_duties( &self, opts: AttesterDutiesOpts, - ) -> Result>, ApiError>; + ) -> Result; /// `GET /eth/v1/validator/duties/proposer/{epoch}`. async fn proposer_duties( @@ -40,13 +40,13 @@ pub trait Handler: Send + Sync + 'static { async fn sync_committee_duties( &self, opts: SyncCommitteeDutiesOpts, - ) -> Result>, ApiError>; + ) -> Result; /// `GET /eth/v1/validator/attestation_data`. async fn attestation_data( &self, opts: AttestationDataOpts, - ) -> Result, ApiError>; + ) -> Result; /// `POST /eth/v2/beacon/pool/attestations`. async fn submit_attestations( diff --git a/crates/core/src/validatorapi/router.rs b/crates/core/src/validatorapi/router.rs index e619f37d..a32ab9b9 100644 --- a/crates/core/src/validatorapi/router.rs +++ b/crates/core/src/validatorapi/router.rs @@ -7,17 +7,40 @@ use std::sync::Arc; use axum::{ Json, Router, - extract::{Path, State}, - response::IntoResponse, - routing::{get, post}, + extract::{ + DefaultBodyLimit, Path, Query, Request, State, + rejection::{JsonRejection, QueryRejection}, + }, + http::{HeaderValue, StatusCode, header}, + middleware::{self, Next}, + response::{IntoResponse, Response}, + routing::{MethodRouter, get, post}, }; +use serde::Deserialize; + +/// Cap on the `POST /eth/v1/validator/duties/{attester,sync}/{epoch}` request +/// bodies. A realistic cluster ships at most a few thousand validator indices; +/// 64 KiB still allows ~10k indices in either numeric or string encoding, +/// well above any plausible workload. +const DUTIES_BODY_LIMIT: usize = 64 * 1024; use super::{ error::ApiError, handler::Handler, - types::{NodeVersionResponse, ProposerDutiesOpts, ProposerDutiesResponse}, + types::{ + AttestationDataOpts, AttestationDataResponse, AttesterDutiesOpts, AttesterDutiesResponse, + CommitteeIndex, NodeVersionResponse, ProposerDutiesOpts, ProposerDutiesResponse, + SyncCommitteeDutiesOpts, SyncCommitteeDutiesResponse, ValIndexes, + }, }; +/// Query parameters for `GET /eth/v1/validator/attestation_data`. +#[derive(Debug, Clone, Deserialize)] +struct AttestationDataQuery { + slot: u64, + committee_index: CommitteeIndex, +} + /// Shared router state. Cloned per request via [`Arc`]. pub(super) struct AppState { /// Request handler invoked by each route. @@ -43,7 +66,7 @@ pub fn new_router(handler: Arc, builder_enabled: bool) -> Router { Router::new() .route( "/eth/v1/validator/duties/attester/{epoch}", - post(attester_duties), + duties_post(attester_duties), ) .route( "/eth/v1/validator/duties/proposer/{epoch}", @@ -51,7 +74,7 @@ pub fn new_router(handler: Arc, builder_enabled: bool) -> Router { ) .route( "/eth/v1/validator/duties/sync/{epoch}", - post(sync_committee_duties), + duties_post(sync_committee_duties), ) .route("/eth/v1/validator/attestation_data", get(attestation_data)) .route("/eth/v1/beacon/pool/attestations", post(respond_404)) @@ -120,8 +143,21 @@ pub fn new_router(handler: Arc, builder_enabled: bool) -> Router { .with_state(state) } -async fn attester_duties() { - todo!("vapi: attester_duties"); +async fn attester_duties( + State(state): State>, + Path(epoch): Path, + indices: Result, JsonRejection>, +) -> Result, ApiError> { + let Json(indices) = indices.map_err(json_rejection_to_api_error)?; + let response = state + .handler + .attester_duties(AttesterDutiesOpts { + epoch, + indices: indices.0, + }) + .await?; + + Ok(Json(response)) } async fn proposer_duties( @@ -136,12 +172,110 @@ async fn proposer_duties( Ok(Json(response)) } -async fn sync_committee_duties() { - todo!("vapi: sync_committee_duties"); +async fn sync_committee_duties( + State(state): State>, + Path(epoch): Path, + indices: Result, JsonRejection>, +) -> Result, ApiError> { + let Json(indices) = indices.map_err(json_rejection_to_api_error)?; + let response = state + .handler + .sync_committee_duties(SyncCommitteeDutiesOpts { + epoch, + indices: indices.0, + }) + .await?; + + Ok(Json(response)) +} + +async fn attestation_data( + State(state): State>, + query: Result, QueryRejection>, +) -> Result, ApiError> { + let Query(query) = query.map_err(query_rejection_to_api_error)?; + let response = state + .handler + .attestation_data(AttestationDataOpts { + slot: query.slot, + committee_index: query.committee_index, + }) + .await?; + + Ok(Json(response)) } -async fn attestation_data() { - todo!("vapi: attestation_data"); +/// Wraps a `POST /eth/v1/validator/duties/*` handler with a body-size cap +/// and the Charon-parity content-type policy. The cap is local to these +/// two routes so unrelated POST handlers (e.g. `submit_attestations`) keep +/// axum's default 2 MiB. +fn duties_post(handler: H) -> MethodRouter +where + H: axum::handler::Handler, + T: 'static, + S: Clone + Send + Sync + 'static, +{ + post(handler) + .route_layer(DefaultBodyLimit::max(DUTIES_BODY_LIMIT)) + .route_layer(middleware::from_fn(enforce_json_content_type)) +} + +/// Matches Charon's content-type handling at `core/validatorapi/router.go:365`: +/// a missing `Content-Type` is treated as `application/json`; an unrecognized +/// content type is rejected with `415 Unsupported Media Type`. SSZ is not +/// supported yet — when it lands, this is the right seam to extend. +/// +/// Without this layer, axum's `Json` extractor would reject a missing header +/// with `MissingJsonContentType`, which our envelope normalises to `400` — +/// diverging from Charon, which lets VCs that don't set the header through. +async fn enforce_json_content_type(mut req: Request, next: Next) -> Result { + match req.headers().get(header::CONTENT_TYPE) { + None => { + req.headers_mut().insert( + header::CONTENT_TYPE, + HeaderValue::from_static("application/json"), + ); + } + Some(value) => { + let s = value.to_str().unwrap_or(""); + if !s.contains("application/json") { + return Err(ApiError::new( + StatusCode::UNSUPPORTED_MEDIA_TYPE, + format!("unsupported media type {s}"), + )); + } + } + } + Ok(next.run(req).await) +} + +/// Renders an axum query-extractor rejection as Pluto's standard +/// [`ApiError`] body shape, so all 4xx responses from this router share the +/// same `{ "code", "message" }` schema. +fn query_rejection_to_api_error(rejection: QueryRejection) -> ApiError { + ApiError::new(StatusCode::BAD_REQUEST, "invalid query parameters") + .with_source(std::io::Error::other(rejection.body_text())) +} + +/// Renders an axum JSON body-extractor rejection as Pluto's standard +/// [`ApiError`] body shape, so it shares the `{ "code", "message" }` schema +/// instead of axum's default plain-text response. +/// +/// Genuine parse failures — malformed JSON (`400`) and wrong element type +/// (`422`) — are normalised to a uniform `400`, matching Charon's `unmarshal`, +/// which returns `400` for all body unmarshal failures. Content-Type rejections +/// no longer reach this function: [`enforce_json_content_type`] intercepts +/// them upstream so missing/JSON requests pass through and non-JSON requests +/// return `415`. The body-size-limit rejection from [`DefaultBodyLimit`] +/// surfaces here (the limit is enforced as the `Json` extractor reads the +/// body); its `413 Payload Too Large` is preserved, since that is Pluto's +/// DoS defense rather than a parse error. +fn json_rejection_to_api_error(rejection: JsonRejection) -> ApiError { + let (status, message) = match rejection.status() { + StatusCode::PAYLOAD_TOO_LARGE => (StatusCode::PAYLOAD_TOO_LARGE, "request body too large"), + _ => (StatusCode::BAD_REQUEST, "invalid request body"), + }; + ApiError::new(status, message).with_source(std::io::Error::other(rejection.body_text())) } async fn submit_attestations() { @@ -227,9 +361,14 @@ async fn proxy_handler() { #[cfg(test)] mod tests { use super::*; + use pluto_eth2api::spec::phase0; + use crate::validatorapi::{ testutils::TestHandler, - types::{ProposerDutiesResponse, ProposerDuty}, + types::{ + AttestationDataResponse, AttesterDutiesResponse, AttesterDuty, ProposerDutiesResponse, + ProposerDuty, SyncCommitteeDutiesResponse, SyncCommitteeDuty, ValIndexes, + }, }; #[tokio::test] @@ -244,6 +383,124 @@ mod tests { assert_eq!(body.data.version, "pluto/test/v1.0"); } + #[tokio::test] + async fn attester_duties_wraps_handler_value() { + let duty = AttesterDuty { + pubkey: "0xaabbccddeeff".to_owned(), + slot: "12".to_owned(), + committee_index: "3".to_owned(), + committee_length: "16".to_owned(), + committees_at_slot: "4".to_owned(), + validator_committee_index: "2".to_owned(), + validator_index: "7".to_owned(), + }; + let handler = TestHandler::default().with_attester_duties(AttesterDutiesResponse { + data: vec![duty], + dependent_root: "0xab".to_owned(), + execution_optimistic: false, + }); + let state = Arc::new(AppState { + handler: Arc::new(handler), + builder_enabled: false, + }); + + let Json(body) = attester_duties( + State(state), + Path(42u64), + Ok(Json(ValIndexes(vec!["7".to_owned()]))), + ) + .await + .unwrap(); + + let json = serde_json::to_value(&body).unwrap(); + assert_eq!(json["dependent_root"], "0xab"); + assert_eq!(json["execution_optimistic"], false); + assert_eq!(json["data"][0]["slot"], "12"); + assert_eq!(json["data"][0]["committee_index"], "3"); + assert_eq!(json["data"][0]["validator_index"], "7"); + } + + #[tokio::test] + async fn sync_committee_duties_wraps_handler_value() { + let duty = SyncCommitteeDuty { + pubkey: "0x112233".to_owned(), + validator_index: "9".to_owned(), + validator_sync_committee_indices: vec!["0".to_owned(), "5".to_owned()], + }; + let handler = + TestHandler::default().with_sync_committee_duties(SyncCommitteeDutiesResponse { + data: vec![duty], + execution_optimistic: true, + }); + let state = Arc::new(AppState { + handler: Arc::new(handler), + builder_enabled: false, + }); + + let Json(body) = sync_committee_duties( + State(state), + Path(7u64), + Ok(Json(ValIndexes(vec!["9".to_owned()]))), + ) + .await + .unwrap(); + + let json = serde_json::to_value(&body).unwrap(); + assert_eq!(json["execution_optimistic"], true); + assert_eq!(json["data"][0]["validator_index"], "9"); + assert_eq!(json["data"][0]["validator_sync_committee_indices"][1], "5"); + } + + #[tokio::test] + async fn attestation_data_wraps_handler_value() { + let data = phase0::AttestationData { + slot: 99, + index: 3, + beacon_block_root: [0xaa; 32], + source: phase0::Checkpoint { + epoch: 7, + root: [0xbb; 32], + }, + target: phase0::Checkpoint { + epoch: 8, + root: [0xcc; 32], + }, + }; + let handler = + TestHandler::default().with_attestation_data(AttestationDataResponse { data }); + let state = Arc::new(AppState { + handler: Arc::new(handler), + builder_enabled: false, + }); + + let Json(body) = attestation_data( + State(state), + Ok(Query(AttestationDataQuery { + slot: 99, + committee_index: 3, + })), + ) + .await + .unwrap(); + + let json = serde_json::to_value(&body).unwrap(); + assert_eq!(json["data"]["slot"], "99"); + assert_eq!(json["data"]["index"], "3"); + assert_eq!(json["data"]["source"]["epoch"], "7"); + } + + #[test] + fn val_indexes_accepts_numbers_and_strings() { + let nums: ValIndexes = serde_json::from_str("[1, 2, 3]").unwrap(); + assert_eq!(nums.0, vec!["1", "2", "3"]); + + let strs: ValIndexes = serde_json::from_str(r#"["4", "5"]"#).unwrap(); + assert_eq!(strs.0, vec!["4", "5"]); + + let bad = serde_json::from_str::(r#"["not-a-number"]"#); + assert!(bad.is_err()); + } + #[tokio::test] async fn proposer_duties_wraps_handler_value() { let duty = ProposerDuty { @@ -270,4 +527,207 @@ mod tests { assert_eq!(json["data"][0]["validator_index"], "7"); assert_eq!(json["data"][0]["pubkey"], "0xaabbccddeeff"); } + + /// Verifies the manual `Query` rejection path emits the same + /// `{ code, message }` envelope as the rest of the router, instead of + /// axum's default plain-text 400. + #[tokio::test] + async fn attestation_data_returns_api_error_shape_on_bad_query() { + use axum::{ + body::{Body, to_bytes}, + http::Request, + }; + use tower::ServiceExt; + + let handler = TestHandler::default().with_attestation_data(AttestationDataResponse { + data: phase0::AttestationData { + slot: 0, + index: 0, + beacon_block_root: [0; 32], + source: phase0::Checkpoint::default(), + target: phase0::Checkpoint::default(), + }, + }); + let app = new_router(Arc::new(handler), false); + + // Missing `committee_index`. + let req = Request::builder() + .uri("/eth/v1/validator/attestation_data?slot=10") + .body(Body::empty()) + .unwrap(); + let resp = app.clone().oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + let body = to_bytes(resp.into_body(), 64 * 1024).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["code"], 400); + assert!(json["message"].is_string()); + + // Non-numeric `slot`. + let req = Request::builder() + .uri("/eth/v1/validator/attestation_data?slot=foo&committee_index=1") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + let body = to_bytes(resp.into_body(), 64 * 1024).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["code"], 400); + } + + /// Verifies the body-limit layer on `POST /eth/v1/validator/duties/*` + /// rejects oversized bodies — defense against the `Vec` parse + /// amplification on the duties endpoints. + #[tokio::test] + async fn attester_duties_rejects_oversized_body() { + use axum::{ + body::Body, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let handler = TestHandler::default(); + let app = new_router(Arc::new(handler), false); + + // 128 KiB of zeros — well past the 64 KiB cap, valid JSON or not. + let big = vec![b'0'; 128 * 1024]; + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/duties/attester/42") + .header("content-type", "application/json") + .header("content-length", big.len()) + .body(Body::from(big)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::PAYLOAD_TOO_LARGE); + } + + /// A malformed duties body emits the same `{ code, message }` envelope and + /// uniform 400 as the rest of the router, rather than axum's default + /// plain-text rejection (which would be 400 for a syntax error but 422 for + /// a type error). Mirrors Charon's `unmarshal`, which returns 400 for every + /// body parse failure. + #[tokio::test] + async fn attester_duties_returns_api_error_shape_on_bad_body() { + use axum::{ + body::{Body, to_bytes}, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let app = new_router(Arc::new(TestHandler::default()), false); + + // Valid JSON, wrong shape (object, not an array) — axum's default + // would surface this as a 422 type error. + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/duties/attester/42") + .header("content-type", "application/json") + .body(Body::from(r#"{"not":"an array"}"#)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + let body = to_bytes(resp.into_body(), 64 * 1024).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["code"], 400); + assert!(json["message"].is_string()); + } + + /// Charon-parity: a duties request that omits `Content-Type` is + /// treated as `application/json` rather than rejected — the + /// `enforce_json_content_type` middleware injects the header before + /// the `Json` extractor sees the request. See `core/validatorapi/ + /// router.go:365` (`if contentHeader == "" || ...`). + #[tokio::test] + async fn attester_duties_accepts_missing_content_type() { + use axum::{ + body::Body, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let handler = TestHandler::default().with_attester_duties(AttesterDutiesResponse { + data: vec![], + dependent_root: "0x00".to_owned(), + execution_optimistic: false, + }); + let app = new_router(Arc::new(handler), false); + + // No Content-Type header at all. + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/duties/attester/42") + .body(Body::from("[]")) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + } + + /// Charon-parity: a duties request with a non-JSON `Content-Type` + /// returns `415 Unsupported Media Type`, not the `400` that the + /// generic body-parse normaliser would produce. + #[tokio::test] + async fn attester_duties_rejects_non_json_content_type() { + use axum::{ + body::{Body, to_bytes}, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let app = new_router(Arc::new(TestHandler::default()), false); + + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/duties/attester/42") + .header("content-type", "text/plain") + .body(Body::from("[]")) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE); + let body = to_bytes(resp.into_body(), 64 * 1024).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["code"], 415); + assert!( + json["message"] + .as_str() + .is_some_and(|m| m.contains("text/plain")) + ); + } + + /// `[]` is a valid request body — the upstream returns an empty duty + /// list — and `ValIndexes` should accept it. + #[test] + fn val_indexes_accepts_empty_array() { + let v: ValIndexes = serde_json::from_str("[]").unwrap(); + assert!(v.0.is_empty()); + } + + /// Mixed numeric + string elements are accepted; each element is + /// validated independently. The previous untagged-enum implementation + /// rejected this entirely. + #[test] + fn val_indexes_accepts_mixed_elements() { + let v: ValIndexes = serde_json::from_str(r#"[1, "2", 3, "4"]"#).unwrap(); + assert_eq!(v.0, vec!["1", "2", "3", "4"]); + } + + /// Caps the request to `VAL_INDEXES_MAX_LEN` elements. + #[test] + fn val_indexes_rejects_oversized_array() { + use crate::validatorapi::types::VAL_INDEXES_MAX_LEN; + + let too_many = (0..=VAL_INDEXES_MAX_LEN) + .map(|n| n.to_string()) + .collect::>() + .join(","); + let json = format!("[{too_many}]"); + let err = serde_json::from_str::(&json).unwrap_err(); + assert!(err.to_string().contains("too many validator indices")); + } + + /// Negative integers are rejected (validator indices are u64). + #[test] + fn val_indexes_rejects_negative_numbers() { + let bad = serde_json::from_str::("[-1]"); + assert!(bad.is_err()); + } } diff --git a/crates/core/src/validatorapi/testutils.rs b/crates/core/src/validatorapi/testutils.rs index 90a480d2..45980fe7 100644 --- a/crates/core/src/validatorapi/testutils.rs +++ b/crates/core/src/validatorapi/testutils.rs @@ -10,13 +10,13 @@ use super::{ error::ApiError, handler::Handler, types::{ - AggregateAttestationOpts, AttestationData, AttestationDataOpts, AttesterDutiesOpts, - AttesterDuty, BeaconCommitteeSelection, EthResponse, NodeVersionData, NodeVersionResponse, - ProposalOpts, ProposerDutiesOpts, ProposerDutiesResponse, SignedContributionAndProof, - SignedValidatorRegistration, SignedVoluntaryExit, SyncCommitteeContribution, - SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, SyncCommitteeDuty, - SyncCommitteeMessage, SyncCommitteeSelection, Validator, ValidatorsOpts, - VersionedAttestation, VersionedProposal, VersionedSignedAggregateAndProof, + AggregateAttestationOpts, AttestationDataOpts, AttestationDataResponse, AttesterDutiesOpts, + AttesterDutiesResponse, BeaconCommitteeSelection, EthResponse, NodeVersionData, + NodeVersionResponse, ProposalOpts, ProposerDutiesOpts, ProposerDutiesResponse, + SignedContributionAndProof, SignedValidatorRegistration, SignedVoluntaryExit, + SyncCommitteeContribution, SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, + SyncCommitteeDutiesResponse, SyncCommitteeMessage, SyncCommitteeSelection, Validator, + ValidatorsOpts, VersionedAttestation, VersionedProposal, VersionedSignedAggregateAndProof, VersionedSignedBlindedProposal, VersionedSignedProposal, }, }; @@ -28,6 +28,12 @@ pub struct TestHandler { pub version: String, /// Value returned by [`Handler::proposer_duties`]. pub proposer_duties_response: Option, + /// Value returned by [`Handler::attester_duties`]. + pub attester_duties_response: Option, + /// Value returned by [`Handler::sync_committee_duties`]. + pub sync_committee_duties_response: Option, + /// Value returned by [`Handler::attestation_data`]. + pub attestation_data_response: Option, } impl TestHandler { @@ -44,6 +50,24 @@ impl TestHandler { self.proposer_duties_response = Some(response); self } + + /// Sets the response returned by [`Handler::attester_duties`]. + pub fn with_attester_duties(mut self, response: AttesterDutiesResponse) -> Self { + self.attester_duties_response = Some(response); + self + } + + /// Sets the response returned by [`Handler::sync_committee_duties`]. + pub fn with_sync_committee_duties(mut self, response: SyncCommitteeDutiesResponse) -> Self { + self.sync_committee_duties_response = Some(response); + self + } + + /// Sets the response returned by [`Handler::attestation_data`]. + pub fn with_attestation_data(mut self, response: AttestationDataResponse) -> Self { + self.attestation_data_response = Some(response); + self + } } #[async_trait] @@ -59,8 +83,11 @@ impl Handler for TestHandler { async fn attester_duties( &self, _opts: AttesterDutiesOpts, - ) -> Result>, ApiError> { - unimplemented!("attester_duties not stubbed in TestHandler") + ) -> Result { + Ok(self + .attester_duties_response + .clone() + .expect("attester_duties not stubbed in TestHandler")) } async fn proposer_duties( @@ -76,15 +103,21 @@ impl Handler for TestHandler { async fn sync_committee_duties( &self, _opts: SyncCommitteeDutiesOpts, - ) -> Result>, ApiError> { - unimplemented!("sync_committee_duties not stubbed in TestHandler") + ) -> Result { + Ok(self + .sync_committee_duties_response + .clone() + .expect("sync_committee_duties not stubbed in TestHandler")) } async fn attestation_data( &self, _opts: AttestationDataOpts, - ) -> Result, ApiError> { - unimplemented!("attestation_data not stubbed in TestHandler") + ) -> Result { + Ok(self + .attestation_data_response + .clone() + .expect("attestation_data not stubbed in TestHandler")) } async fn submit_attestations( diff --git a/crates/core/src/validatorapi/types.rs b/crates/core/src/validatorapi/types.rs index 01f3405a..8e18456a 100644 --- a/crates/core/src/validatorapi/types.rs +++ b/crates/core/src/validatorapi/types.rs @@ -4,15 +4,29 @@ //! Most data payloads are empty placeholders for now and will be swapped //! for the proper consensus-spec types in a later phase. +use std::fmt; + +use serde::{ + Deserialize, Deserializer, Serialize, + de::{self, SeqAccess, Visitor}, +}; + pub use pluto_crypto::types::{PublicKey as BlsPubKey, Signature as BlsSignature}; pub use pluto_eth2api::{ + GetAttesterDutiesResponseResponse as AttesterDutiesResponse, + GetAttesterDutiesResponseResponseDatum as AttesterDuty, GetProposerDutiesResponseResponse as ProposerDutiesResponse, GetProposerDutiesResponseResponseDatum as ProposerDuty, + GetSyncCommitteeDutiesResponseResponse as SyncCommitteeDutiesResponse, + GetSyncCommitteeDutiesResponseResponseDatum as SyncCommitteeDuty, GetVersionResponseResponse as NodeVersionResponse, GetVersionResponseResponseData as NodeVersionData, - spec::phase0::{Epoch, Root, Slot, ValidatorIndex}, + spec::phase0::{self, Epoch, Root, Slot, ValidatorIndex}, }; +/// Attestation data alias for the consensus-spec phase0 type. +pub type AttestationData = phase0::AttestationData; + /// Index of a beacon committee within a slot. pub type CommitteeIndex = u64; @@ -35,8 +49,9 @@ pub struct EthResponse { pub struct AttesterDutiesOpts { /// Epoch to fetch duties for. pub epoch: Epoch, - /// Validator indices to fetch duties for. - pub indices: Vec, + /// Validator indices to fetch duties for. Carried as strings since the + /// upstream auto-generated client takes string-typed indices. + pub indices: Vec, } /// Options for @@ -53,8 +68,9 @@ pub struct ProposerDutiesOpts { pub struct SyncCommitteeDutiesOpts { /// Epoch to fetch duties for. pub epoch: Epoch, - /// Validator indices to fetch duties for. - pub indices: Vec, + /// Validator indices to fetch duties for. Carried as strings since the + /// upstream auto-generated client takes string-typed indices. + pub indices: Vec, } /// Options for @@ -116,17 +132,12 @@ pub struct SyncCommitteeContributionOpts { pub beacon_block_root: Root, } -/// Attester duty payload. Placeholder. -#[derive(Debug, Clone)] -pub struct AttesterDuty {} - -/// Sync-committee duty payload. Placeholder. -#[derive(Debug, Clone)] -pub struct SyncCommitteeDuty {} - -/// Attestation data payload. Placeholder. -#[derive(Debug, Clone)] -pub struct AttestationData {} +/// Response envelope for the `attestation_data` endpoint. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AttestationDataResponse { + /// Unsigned attestation data produced by the consensus pipeline. + pub data: AttestationData, +} /// Validator payload. Placeholder. #[derive(Debug, Clone)] @@ -179,3 +190,100 @@ pub struct BeaconCommitteeSelection {} /// Sync-committee selection payload. Placeholder. #[derive(Debug, Clone)] pub struct SyncCommitteeSelection {} + +/// Validator-index request body for the `attester_duties` and +/// `sync_committee_duties` endpoints. +/// +/// Accepts both numeric (`[1, 2]`) and string-encoded (`["1", "2"]`) JSON +/// arrays. Indices are stored as decimal strings so they pass straight through +/// to the auto-generated request builders. +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize)] +pub struct ValIndexes(pub Vec); + +/// Hard cap on the number of validator indices accepted per request. A real +/// cluster has at most a few hundred validators; the cap is set generously +/// above that to leave room for future growth while still bounding the work +/// per request so a single misbehaving caller cannot drive unbounded +/// allocation. Pairs with the route-level [`DUTIES_BODY_LIMIT`] +/// (`router.rs`) which limits the *bytes* the deserializer ever sees; +/// this limits the *count* even within those bytes. +pub const VAL_INDEXES_MAX_LEN: usize = 8192; + +impl<'de> Deserialize<'de> for ValIndexes { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + // Custom visitor: streams elements via `SeqAccess::next_element`, + // validates each on read, and aborts as soon as the cap is exceeded. + // Avoids the `#[serde(untagged)]` two-pass behavior (which buffers the + // input via serde's `Content` cache before retrying) and the + // single-allocation `Vec` materialization. + struct ValIndexesVisitor; + + impl<'de> Visitor<'de> for ValIndexesVisitor { + type Value = ValIndexes; + + fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("an array of validator indices (numeric or decimal string)") + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: SeqAccess<'de>, + { + let mut out = Vec::with_capacity(seq.size_hint().unwrap_or(0).min(64)); + while let Some(elem) = seq.next_element::()? { + if out.len() >= VAL_INDEXES_MAX_LEN { + return Err(de::Error::custom(format!( + "too many validator indices (max {VAL_INDEXES_MAX_LEN})" + ))); + } + out.push(elem.0); + } + Ok(ValIndexes(out)) + } + } + + deserializer.deserialize_seq(ValIndexesVisitor) + } +} + +/// One validator-index element. Accepts either a JSON number (formatted into +/// a decimal string) or a JSON string (validated as a `u64` then kept +/// verbatim). Single-pass; no untagged-enum buffering. +struct Element(String); + +impl<'de> Deserialize<'de> for Element { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct ElemVisitor; + + impl Visitor<'_> for ElemVisitor { + type Value = Element; + + fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("a validator index (u64 or decimal string)") + } + + fn visit_u64(self, v: u64) -> Result { + Ok(Element(v.to_string())) + } + + fn visit_i64(self, v: i64) -> Result { + u64::try_from(v) + .map(|n| Element(n.to_string())) + .map_err(|_| de::Error::custom("validator index must be non-negative")) + } + + fn visit_str(self, v: &str) -> Result { + v.parse::().map_err(de::Error::custom)?; + Ok(Element(v.to_owned())) + } + } + + deserializer.deserialize_any(ElemVisitor) + } +}