From 0767f40729bd3c0f72756dd01393ecb890fb2c88 Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> Date: Thu, 28 May 2026 14:08:31 +0200 Subject: [PATCH 01/12] feat(core): implement validatorapi node_version handler Threads the Handler through Axum state via AppState + with_state, wires the node_version route to the real handler, and adds a TestHandler mock that future PRs will extend per-endpoint. --- crates/core/src/validatorapi/body.rs | 17 +++ crates/core/src/validatorapi/mod.rs | 4 + crates/core/src/validatorapi/router.rs | 65 +++++++-- crates/core/src/validatorapi/testutils.rs | 169 ++++++++++++++++++++++ 4 files changed, 246 insertions(+), 9 deletions(-) create mode 100644 crates/core/src/validatorapi/body.rs create mode 100644 crates/core/src/validatorapi/testutils.rs diff --git a/crates/core/src/validatorapi/body.rs b/crates/core/src/validatorapi/body.rs new file mode 100644 index 00000000..2342b03f --- /dev/null +++ b/crates/core/src/validatorapi/body.rs @@ -0,0 +1,17 @@ +//! HTTP request and response body shapes for the validator API. + +use serde::Serialize; + +/// Wire body for `GET /eth/v1/node/version`. +#[derive(Debug, Clone, Serialize)] +pub struct NodeVersionResponse { + /// Version payload. + pub data: NodeVersionData, +} + +/// `data` field of [`NodeVersionResponse`]. +#[derive(Debug, Clone, Serialize)] +pub struct NodeVersionData { + /// Node version string. + pub version: String, +} diff --git a/crates/core/src/validatorapi/mod.rs b/crates/core/src/validatorapi/mod.rs index 399ef53a..f37f16ff 100644 --- a/crates/core/src/validatorapi/mod.rs +++ b/crates/core/src/validatorapi/mod.rs @@ -3,12 +3,16 @@ //! Serves the subset of beacon-API endpoints related to distributed //! validation and reverse-proxies the rest to the upstream beacon node. +pub mod body; pub mod error; pub mod handler; pub mod metrics; pub mod router; pub mod types; +#[cfg(test)] +pub mod testutils; + pub use error::ApiError; pub use handler::Handler; pub use router::new_router; diff --git a/crates/core/src/validatorapi/router.rs b/crates/core/src/validatorapi/router.rs index bbf70f5d..75fed8a0 100644 --- a/crates/core/src/validatorapi/router.rs +++ b/crates/core/src/validatorapi/router.rs @@ -3,23 +3,43 @@ //! The endpoint table preserves the order of the upstream definition, //! including which endpoints unconditionally respond `404`. +use std::sync::Arc; + use axum::{ - Router, + Json, Router, + extract::State, response::IntoResponse, routing::{get, post}, }; -use super::{error::ApiError, handler::Handler}; +use super::{ + body::{NodeVersionData, NodeVersionResponse}, + error::ApiError, + handler::Handler, +}; + +/// Shared router state. Cloned per request via [`Arc`]. +pub(super) struct AppState { + /// Request handler invoked by each route. + pub handler: H, + /// Whether builder mode is enabled. Read by `propose_block_v3`. + #[allow(dead_code, reason = "consumed by propose_block_v3 in a later PR")] + pub builder_enabled: bool, +} /// Builds the validator API HTTP router. /// /// Registers the distributed-validator-related endpoints and a fallback /// that reverse-proxies everything else to the upstream beacon node. /// -/// `_handler` will be threaded into Axum router state once request bodies -/// and responses are wired. `_builder_enabled` is consumed only by -/// `propose_block_v3`. -pub fn new_router(_handler: H, _builder_enabled: bool) -> Router { +/// `builder_enabled` is consumed by `propose_block_v3` to maximise the +/// builder boost factor. +pub fn new_router(handler: H, builder_enabled: bool) -> Router { + let state = Arc::new(AppState { + handler, + builder_enabled, + }); + Router::new() .route( "/eth/v1/validator/duties/attester/{epoch}", @@ -95,8 +115,9 @@ pub fn new_router(_handler: H, _builder_enabled: bool) -> Router { "/eth/v1/validator/sync_committee_selections", post(sync_committee_selections), ) - .route("/eth/v1/node/version", get(node_version)) + .route("/eth/v1/node/version", get(node_version::)) .fallback(proxy_handler) + .with_state(state) } async fn attester_duties() { @@ -179,8 +200,16 @@ async fn sync_committee_selections() { todo!("vapi: sync_committee_selections"); } -async fn node_version() { - todo!("vapi: node_version"); +async fn node_version( + State(state): State>>, +) -> Result, ApiError> { + let response = state.handler.node_version().await?; + + Ok(Json(NodeVersionResponse { + data: NodeVersionData { + version: response.data, + }, + })) } async fn respond_404() -> impl IntoResponse { @@ -190,3 +219,21 @@ async fn respond_404() -> impl IntoResponse { async fn proxy_handler() { todo!("vapi: proxy_handler"); } + +#[cfg(test)] +mod tests { + use super::*; + use crate::validatorapi::testutils::TestHandler; + + #[tokio::test] + async fn node_version_wraps_handler_value() { + let state = Arc::new(AppState { + handler: TestHandler::with_version("pluto/test/v1.0"), + builder_enabled: false, + }); + + let Json(body) = node_version(State(state)).await.unwrap(); + + assert_eq!(body.data.version, "pluto/test/v1.0"); + } +} diff --git a/crates/core/src/validatorapi/testutils.rs b/crates/core/src/validatorapi/testutils.rs new file mode 100644 index 00000000..5e61d05c --- /dev/null +++ b/crates/core/src/validatorapi/testutils.rs @@ -0,0 +1,169 @@ +//! Test helpers for the validator API router. +//! +//! [`TestHandler`] implements [`Handler`] with `unimplemented!()` stubs for +//! every method. As each router endpoint is ported, the relevant method is +//! overridden here so the route's unit test can drive it. + +use async_trait::async_trait; + +use super::{ + error::ApiError, + handler::Handler, + types::{ + AggregateAttestationOpts, AttestationData, AttestationDataOpts, AttesterDutiesOpts, + AttesterDuty, BeaconCommitteeSelection, EthResponse, ProposalOpts, ProposerDutiesOpts, + ProposerDuty, SignedContributionAndProof, SignedValidatorRegistration, SignedVoluntaryExit, + SyncCommitteeContribution, SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, + SyncCommitteeDuty, SyncCommitteeMessage, SyncCommitteeSelection, Validator, ValidatorsOpts, + VersionedAttestation, VersionedProposal, VersionedSignedAggregateAndProof, + VersionedSignedBlindedProposal, VersionedSignedProposal, + }, +}; + +/// Mock [`Handler`] used by router unit tests. +#[derive(Debug, Default, Clone)] +pub struct TestHandler { + /// Value returned by [`Handler::node_version`]. + pub version: String, +} + +impl TestHandler { + /// Builds a [`TestHandler`] with the given node version string. + pub fn with_version(version: impl Into) -> Self { + Self { + version: version.into(), + } + } +} + +#[async_trait] +impl Handler for TestHandler { + async fn node_version(&self) -> Result, ApiError> { + Ok(EthResponse { + data: self.version.clone(), + execution_optimistic: false, + finalized: false, + dependent_root: None, + }) + } + + async fn attester_duties( + &self, + _opts: AttesterDutiesOpts, + ) -> Result>, ApiError> { + unimplemented!("attester_duties not stubbed in TestHandler") + } + + async fn proposer_duties( + &self, + _opts: ProposerDutiesOpts, + ) -> Result>, ApiError> { + unimplemented!("proposer_duties not stubbed in TestHandler") + } + + async fn sync_committee_duties( + &self, + _opts: SyncCommitteeDutiesOpts, + ) -> Result>, ApiError> { + unimplemented!("sync_committee_duties not stubbed in TestHandler") + } + + async fn attestation_data( + &self, + _opts: AttestationDataOpts, + ) -> Result, ApiError> { + unimplemented!("attestation_data not stubbed in TestHandler") + } + + async fn submit_attestations( + &self, + _attestations: Vec, + ) -> Result<(), ApiError> { + unimplemented!("submit_attestations not stubbed in TestHandler") + } + + async fn proposal( + &self, + _opts: ProposalOpts, + ) -> Result, ApiError> { + unimplemented!("proposal not stubbed in TestHandler") + } + + async fn submit_proposal(&self, _proposal: VersionedSignedProposal) -> Result<(), ApiError> { + unimplemented!("submit_proposal not stubbed in TestHandler") + } + + async fn submit_blinded_proposal( + &self, + _proposal: VersionedSignedBlindedProposal, + ) -> Result<(), ApiError> { + unimplemented!("submit_blinded_proposal not stubbed in TestHandler") + } + + async fn aggregate_attestation( + &self, + _opts: AggregateAttestationOpts, + ) -> Result, ApiError> { + unimplemented!("aggregate_attestation not stubbed in TestHandler") + } + + async fn submit_aggregate_attestations( + &self, + _aggregates: Vec, + ) -> Result<(), ApiError> { + unimplemented!("submit_aggregate_attestations not stubbed in TestHandler") + } + + async fn beacon_committee_selections( + &self, + _selections: Vec, + ) -> Result>, ApiError> { + unimplemented!("beacon_committee_selections not stubbed in TestHandler") + } + + async fn sync_committee_selections( + &self, + _selections: Vec, + ) -> Result>, ApiError> { + unimplemented!("sync_committee_selections not stubbed in TestHandler") + } + + async fn validators( + &self, + _opts: ValidatorsOpts, + ) -> Result>, ApiError> { + unimplemented!("validators not stubbed in TestHandler") + } + + async fn submit_validator_registrations( + &self, + _registrations: Vec, + ) -> Result<(), ApiError> { + unimplemented!("submit_validator_registrations not stubbed in TestHandler") + } + + async fn submit_voluntary_exit(&self, _exit: SignedVoluntaryExit) -> Result<(), ApiError> { + unimplemented!("submit_voluntary_exit not stubbed in TestHandler") + } + + async fn sync_committee_contribution( + &self, + _opts: SyncCommitteeContributionOpts, + ) -> Result, ApiError> { + unimplemented!("sync_committee_contribution not stubbed in TestHandler") + } + + async fn submit_sync_committee_contributions( + &self, + _contributions: Vec, + ) -> Result<(), ApiError> { + unimplemented!("submit_sync_committee_contributions not stubbed in TestHandler") + } + + async fn submit_sync_committee_messages( + &self, + _messages: Vec, + ) -> Result<(), ApiError> { + unimplemented!("submit_sync_committee_messages not stubbed in TestHandler") + } +} From 41f5fb0f7ee54ef6fe462cbd3194a3ecc56d03d4 Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> Date: Thu, 28 May 2026 14:14:39 +0200 Subject: [PATCH 02/12] fix: linter --- crates/core/src/validatorapi/testutils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/validatorapi/testutils.rs b/crates/core/src/validatorapi/testutils.rs index 5e61d05c..e64f13b2 100644 --- a/crates/core/src/validatorapi/testutils.rs +++ b/crates/core/src/validatorapi/testutils.rs @@ -37,7 +37,7 @@ impl TestHandler { } #[async_trait] -impl Handler for TestHandler { +impl Handler for TestHandler { async fn node_version(&self) -> Result, ApiError> { Ok(EthResponse { data: self.version.clone(), From aaf053e3abc7a32d4c3071f535742a7facb793a6 Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> Date: Thu, 28 May 2026 15:37:36 +0200 Subject: [PATCH 03/12] feat(core): implement validatorapi proposer_duties handler (#450) Re-uses the auto-generated pluto_eth2api envelopes (GetProposerDutiesResponseResponse, GetVersionResponseResponse) as the on-the-wire shape rather than hand-rolling parallel types. node_version is migrated to the same pattern; the body.rs hand-rolled wrapper module is removed. --- crates/core/src/validatorapi/body.rs | 17 ------- crates/core/src/validatorapi/handler.rs | 13 +++--- crates/core/src/validatorapi/mod.rs | 1 - crates/core/src/validatorapi/router.rs | 56 ++++++++++++++++++----- crates/core/src/validatorapi/testutils.rs | 36 ++++++++++----- crates/core/src/validatorapi/types.rs | 12 +++-- 6 files changed, 83 insertions(+), 52 deletions(-) delete mode 100644 crates/core/src/validatorapi/body.rs diff --git a/crates/core/src/validatorapi/body.rs b/crates/core/src/validatorapi/body.rs deleted file mode 100644 index 2342b03f..00000000 --- a/crates/core/src/validatorapi/body.rs +++ /dev/null @@ -1,17 +0,0 @@ -//! HTTP request and response body shapes for the validator API. - -use serde::Serialize; - -/// Wire body for `GET /eth/v1/node/version`. -#[derive(Debug, Clone, Serialize)] -pub struct NodeVersionResponse { - /// Version payload. - pub data: NodeVersionData, -} - -/// `data` field of [`NodeVersionResponse`]. -#[derive(Debug, Clone, Serialize)] -pub struct NodeVersionData { - /// Node version string. - pub version: String, -} diff --git a/crates/core/src/validatorapi/handler.rs b/crates/core/src/validatorapi/handler.rs index 9aebe758..06604aff 100644 --- a/crates/core/src/validatorapi/handler.rs +++ b/crates/core/src/validatorapi/handler.rs @@ -6,10 +6,11 @@ use super::{ error::ApiError, types::{ AggregateAttestationOpts, AttestationData, AttestationDataOpts, AttesterDutiesOpts, - AttesterDuty, BeaconCommitteeSelection, EthResponse, ProposalOpts, ProposerDutiesOpts, - ProposerDuty, SignedContributionAndProof, SignedValidatorRegistration, SignedVoluntaryExit, - SyncCommitteeContribution, SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, - SyncCommitteeDuty, SyncCommitteeMessage, SyncCommitteeSelection, Validator, ValidatorsOpts, + AttesterDuty, BeaconCommitteeSelection, EthResponse, NodeVersionResponse, ProposalOpts, + ProposerDutiesOpts, ProposerDutiesResponse, SignedContributionAndProof, + SignedValidatorRegistration, SignedVoluntaryExit, SyncCommitteeContribution, + SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, SyncCommitteeDuty, + SyncCommitteeMessage, SyncCommitteeSelection, Validator, ValidatorsOpts, VersionedAttestation, VersionedProposal, VersionedSignedAggregateAndProof, VersionedSignedBlindedProposal, VersionedSignedProposal, }, @@ -33,7 +34,7 @@ pub trait Handler: Send + Sync + 'static { async fn proposer_duties( &self, opts: ProposerDutiesOpts, - ) -> Result>, ApiError>; + ) -> Result; /// `POST /eth/v1/validator/duties/sync/{epoch}`. async fn sync_committee_duties( @@ -126,5 +127,5 @@ pub trait Handler: Send + Sync + 'static { ) -> Result<(), ApiError>; /// `GET /eth/v1/node/version`. - async fn node_version(&self) -> Result, ApiError>; + async fn node_version(&self) -> Result; } diff --git a/crates/core/src/validatorapi/mod.rs b/crates/core/src/validatorapi/mod.rs index f37f16ff..ef6be81f 100644 --- a/crates/core/src/validatorapi/mod.rs +++ b/crates/core/src/validatorapi/mod.rs @@ -3,7 +3,6 @@ //! Serves the subset of beacon-API endpoints related to distributed //! validation and reverse-proxies the rest to the upstream beacon node. -pub mod body; pub mod error; pub mod handler; pub mod metrics; diff --git a/crates/core/src/validatorapi/router.rs b/crates/core/src/validatorapi/router.rs index 75fed8a0..13000fdf 100644 --- a/crates/core/src/validatorapi/router.rs +++ b/crates/core/src/validatorapi/router.rs @@ -7,15 +7,15 @@ use std::sync::Arc; use axum::{ Json, Router, - extract::State, + extract::{Path, State}, response::IntoResponse, routing::{get, post}, }; use super::{ - body::{NodeVersionData, NodeVersionResponse}, error::ApiError, handler::Handler, + types::{NodeVersionResponse, ProposerDutiesOpts, ProposerDutiesResponse}, }; /// Shared router state. Cloned per request via [`Arc`]. @@ -47,7 +47,7 @@ pub fn new_router(handler: H, builder_enabled: bool) -> Router { ) .route( "/eth/v1/validator/duties/proposer/{epoch}", - get(proposer_duties), + get(proposer_duties::), ) .route( "/eth/v1/validator/duties/sync/{epoch}", @@ -124,8 +124,16 @@ async fn attester_duties() { todo!("vapi: attester_duties"); } -async fn proposer_duties() { - todo!("vapi: proposer_duties"); +async fn proposer_duties( + State(state): State>>, + Path(epoch): Path, +) -> Result, ApiError> { + let response = state + .handler + .proposer_duties(ProposerDutiesOpts { epoch }) + .await?; + + Ok(Json(response)) } async fn sync_committee_duties() { @@ -205,11 +213,7 @@ async fn node_version( ) -> Result, ApiError> { let response = state.handler.node_version().await?; - Ok(Json(NodeVersionResponse { - data: NodeVersionData { - version: response.data, - }, - })) + Ok(Json(response)) } async fn respond_404() -> impl IntoResponse { @@ -223,7 +227,10 @@ async fn proxy_handler() { #[cfg(test)] mod tests { use super::*; - use crate::validatorapi::testutils::TestHandler; + use crate::validatorapi::{ + testutils::TestHandler, + types::{ProposerDutiesResponse, ProposerDuty}, + }; #[tokio::test] async fn node_version_wraps_handler_value() { @@ -236,4 +243,31 @@ mod tests { assert_eq!(body.data.version, "pluto/test/v1.0"); } + + #[tokio::test] + async fn proposer_duties_wraps_handler_value() { + let duty = ProposerDuty { + pubkey: "0xaabbccddeeff".to_owned(), + slot: "1234".to_owned(), + validator_index: "7".to_owned(), + }; + let handler = TestHandler::default().with_proposer_duties(ProposerDutiesResponse { + data: vec![duty], + dependent_root: "0xcd".to_owned(), + execution_optimistic: true, + }); + let state = Arc::new(AppState { + handler, + builder_enabled: false, + }); + + let Json(body) = proposer_duties(State(state), Path(99u64)).await.unwrap(); + + let json = serde_json::to_value(&body).unwrap(); + assert_eq!(json["dependent_root"], "0xcd"); + assert_eq!(json["execution_optimistic"], true); + assert_eq!(json["data"][0]["slot"], "1234"); + assert_eq!(json["data"][0]["validator_index"], "7"); + assert_eq!(json["data"][0]["pubkey"], "0xaabbccddeeff"); + } } diff --git a/crates/core/src/validatorapi/testutils.rs b/crates/core/src/validatorapi/testutils.rs index e64f13b2..90a480d2 100644 --- a/crates/core/src/validatorapi/testutils.rs +++ b/crates/core/src/validatorapi/testutils.rs @@ -11,10 +11,11 @@ use super::{ handler::Handler, types::{ AggregateAttestationOpts, AttestationData, AttestationDataOpts, AttesterDutiesOpts, - AttesterDuty, BeaconCommitteeSelection, EthResponse, ProposalOpts, ProposerDutiesOpts, - ProposerDuty, SignedContributionAndProof, SignedValidatorRegistration, SignedVoluntaryExit, - SyncCommitteeContribution, SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, - SyncCommitteeDuty, SyncCommitteeMessage, SyncCommitteeSelection, Validator, ValidatorsOpts, + AttesterDuty, BeaconCommitteeSelection, EthResponse, NodeVersionData, NodeVersionResponse, + ProposalOpts, ProposerDutiesOpts, ProposerDutiesResponse, SignedContributionAndProof, + SignedValidatorRegistration, SignedVoluntaryExit, SyncCommitteeContribution, + SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, SyncCommitteeDuty, + SyncCommitteeMessage, SyncCommitteeSelection, Validator, ValidatorsOpts, VersionedAttestation, VersionedProposal, VersionedSignedAggregateAndProof, VersionedSignedBlindedProposal, VersionedSignedProposal, }, @@ -25,6 +26,8 @@ use super::{ pub struct TestHandler { /// Value returned by [`Handler::node_version`]. pub version: String, + /// Value returned by [`Handler::proposer_duties`]. + pub proposer_duties_response: Option, } impl TestHandler { @@ -32,18 +35,24 @@ impl TestHandler { pub fn with_version(version: impl Into) -> Self { Self { version: version.into(), + ..Self::default() } } + + /// Sets the response returned by [`Handler::proposer_duties`]. + pub fn with_proposer_duties(mut self, response: ProposerDutiesResponse) -> Self { + self.proposer_duties_response = Some(response); + self + } } #[async_trait] impl Handler for TestHandler { - async fn node_version(&self) -> Result, ApiError> { - Ok(EthResponse { - data: self.version.clone(), - execution_optimistic: false, - finalized: false, - dependent_root: None, + async fn node_version(&self) -> Result { + Ok(NodeVersionResponse { + data: NodeVersionData { + version: self.version.clone(), + }, }) } @@ -57,8 +66,11 @@ impl Handler for TestHandler { async fn proposer_duties( &self, _opts: ProposerDutiesOpts, - ) -> Result>, ApiError> { - unimplemented!("proposer_duties not stubbed in TestHandler") + ) -> Result { + Ok(self + .proposer_duties_response + .clone() + .expect("proposer_duties not stubbed in TestHandler")) } async fn sync_committee_duties( diff --git a/crates/core/src/validatorapi/types.rs b/crates/core/src/validatorapi/types.rs index 9fef4de0..01f3405a 100644 --- a/crates/core/src/validatorapi/types.rs +++ b/crates/core/src/validatorapi/types.rs @@ -5,7 +5,13 @@ //! for the proper consensus-spec types in a later phase. pub use pluto_crypto::types::{PublicKey as BlsPubKey, Signature as BlsSignature}; -pub use pluto_eth2api::spec::phase0::{Epoch, Root, Slot, ValidatorIndex}; +pub use pluto_eth2api::{ + GetProposerDutiesResponseResponse as ProposerDutiesResponse, + GetProposerDutiesResponseResponseDatum as ProposerDuty, + GetVersionResponseResponse as NodeVersionResponse, + GetVersionResponseResponseData as NodeVersionData, + spec::phase0::{Epoch, Root, Slot, ValidatorIndex}, +}; /// Index of a beacon committee within a slot. pub type CommitteeIndex = u64; @@ -114,10 +120,6 @@ pub struct SyncCommitteeContributionOpts { #[derive(Debug, Clone)] pub struct AttesterDuty {} -/// Proposer duty payload. Placeholder. -#[derive(Debug, Clone)] -pub struct ProposerDuty {} - /// Sync-committee duty payload. Placeholder. #[derive(Debug, Clone)] pub struct SyncCommitteeDuty {} From 09b14ac676195183bfa1e8740a6ca28bcda0d31c Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> Date: Thu, 28 May 2026 15:54:59 +0200 Subject: [PATCH 04/12] refactor(core): use dynamic dispatch for validatorapi Handler Drops the per-handler generic parameter and routes through Arc via AppState. The Handler trait is object-safe (Send + Sync + 'static + async_trait-generated methods), so this is a pure type change with no surface impact. --- crates/core/src/validatorapi/router.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/crates/core/src/validatorapi/router.rs b/crates/core/src/validatorapi/router.rs index 13000fdf..e619f37d 100644 --- a/crates/core/src/validatorapi/router.rs +++ b/crates/core/src/validatorapi/router.rs @@ -19,9 +19,9 @@ use super::{ }; /// Shared router state. Cloned per request via [`Arc`]. -pub(super) struct AppState { +pub(super) struct AppState { /// Request handler invoked by each route. - pub handler: H, + pub handler: Arc, /// Whether builder mode is enabled. Read by `propose_block_v3`. #[allow(dead_code, reason = "consumed by propose_block_v3 in a later PR")] pub builder_enabled: bool, @@ -34,7 +34,7 @@ pub(super) struct AppState { /// /// `builder_enabled` is consumed by `propose_block_v3` to maximise the /// builder boost factor. -pub fn new_router(handler: H, builder_enabled: bool) -> Router { +pub fn new_router(handler: Arc, builder_enabled: bool) -> Router { let state = Arc::new(AppState { handler, builder_enabled, @@ -47,7 +47,7 @@ pub fn new_router(handler: H, builder_enabled: bool) -> Router { ) .route( "/eth/v1/validator/duties/proposer/{epoch}", - get(proposer_duties::), + get(proposer_duties), ) .route( "/eth/v1/validator/duties/sync/{epoch}", @@ -115,7 +115,7 @@ pub fn new_router(handler: H, builder_enabled: bool) -> Router { "/eth/v1/validator/sync_committee_selections", post(sync_committee_selections), ) - .route("/eth/v1/node/version", get(node_version::)) + .route("/eth/v1/node/version", get(node_version)) .fallback(proxy_handler) .with_state(state) } @@ -124,8 +124,8 @@ async fn attester_duties() { todo!("vapi: attester_duties"); } -async fn proposer_duties( - State(state): State>>, +async fn proposer_duties( + State(state): State>, Path(epoch): Path, ) -> Result, ApiError> { let response = state @@ -208,8 +208,8 @@ async fn sync_committee_selections() { todo!("vapi: sync_committee_selections"); } -async fn node_version( - State(state): State>>, +async fn node_version( + State(state): State>, ) -> Result, ApiError> { let response = state.handler.node_version().await?; @@ -235,7 +235,7 @@ mod tests { #[tokio::test] async fn node_version_wraps_handler_value() { let state = Arc::new(AppState { - handler: TestHandler::with_version("pluto/test/v1.0"), + handler: Arc::new(TestHandler::with_version("pluto/test/v1.0")), builder_enabled: false, }); @@ -257,7 +257,7 @@ mod tests { execution_optimistic: true, }); let state = Arc::new(AppState { - handler, + handler: Arc::new(handler), builder_enabled: false, }); From a3ffa9e36f8288758183a0887a307884f259ddbb Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> Date: Thu, 28 May 2026 16:15:52 +0200 Subject: [PATCH 05/12] feat(core): scaffold validatorapi Component handler Adds the Handler impl that the router has been calling through. node_version returns the obolnetwork/pluto/{version}-{commit}/{arch}-{os} identity string; proposer_duties calls the upstream beacon node and rewrites known DV root public keys to this node's public share so the validator client sees keys matching its keystore. The remaining 17 trait methods are unimplemented!() stubs that land per-PR as their router handlers are ported. --- crates/core/src/validatorapi/component.rs | 350 ++++++++++++++++++++++ crates/core/src/validatorapi/mod.rs | 2 + 2 files changed, 352 insertions(+) create mode 100644 crates/core/src/validatorapi/component.rs diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs new file mode 100644 index 00000000..e0e609b1 --- /dev/null +++ b/crates/core/src/validatorapi/component.rs @@ -0,0 +1,350 @@ +//! Validator API [`Handler`] implementation. +//! +//! The component owns the upstream beacon-node client plus the public-key +//! and public-share mappings needed to translate between distributed-validator +//! root keys and this node's threshold-BLS share. + +use std::{collections::HashMap, sync::Arc}; + +use async_trait::async_trait; +use axum::http::StatusCode; +use pluto_eth2api::{ + EthBeaconNodeApiClient, GetProposerDutiesRequest, GetProposerDutiesResponse, + spec::phase0::BLSPubKey, +}; + +use super::{ + error::ApiError, + handler::Handler, + types::{ + AggregateAttestationOpts, AttestationData, AttestationDataOpts, AttesterDutiesOpts, + AttesterDuty, BeaconCommitteeSelection, EthResponse, NodeVersionData, NodeVersionResponse, + ProposalOpts, ProposerDutiesOpts, ProposerDutiesResponse, ProposerDuty, + SignedContributionAndProof, SignedValidatorRegistration, SignedVoluntaryExit, + SyncCommitteeContribution, SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, + SyncCommitteeDuty, SyncCommitteeMessage, SyncCommitteeSelection, Validator, ValidatorsOpts, + VersionedAttestation, VersionedProposal, VersionedSignedAggregateAndProof, + VersionedSignedBlindedProposal, VersionedSignedProposal, + }, +}; +use crate::version; + +/// Validator API [`Handler`] implementation. +/// +/// Holds the upstream beacon-node client and the cluster's public-key / +/// public-share mappings. Each per-endpoint method calls upstream, rewrites +/// root pubkeys to this node's share where the endpoint exposes data to the +/// validator client, and emits partial-signed-data to subscribers on submit +/// endpoints. +pub struct Component { + /// Upstream beacon-node API client. + eth2_cl: Arc, + /// Threshold BLS share index assigned to this node (1-indexed). + #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] + share_idx: u64, + /// Maps DV root public keys to this node's public share. Used to rewrite + /// validator-client-facing endpoints (proposer/attester duties, etc.) so + /// the VC sees the share it is configured to sign with. + pub_share_by_pubkey: HashMap, + /// Whether builder mode is enabled. Read by `propose_block_v3` and the + /// validator-registration submitter. + #[allow( + dead_code, + reason = "consumed by propose_block_v3 / submit_validator_registrations" + )] + builder_enabled: bool, + /// Skip signature verification on partial-signed submissions. Test-only. + #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] + insecure_test: bool, +} + +impl Component { + /// Builds a new component. + pub fn new( + eth2_cl: Arc, + share_idx: u64, + pub_share_by_pubkey: HashMap, + builder_enabled: bool, + ) -> Self { + Self { + eth2_cl, + share_idx, + pub_share_by_pubkey, + builder_enabled, + insecure_test: false, + } + } + + /// Builds a component that skips partial-signature verification on + /// submit endpoints. Test use only. + pub fn new_insecure(eth2_cl: Arc, share_idx: u64) -> Self { + Self { + eth2_cl, + share_idx, + pub_share_by_pubkey: HashMap::new(), + builder_enabled: false, + insecure_test: true, + } + } +} + +#[async_trait] +impl Handler for Component { + async fn node_version(&self) -> Result { + let (commit, _) = version::git_commit(); + let version = format!( + "obolnetwork/pluto/{}-{}/{}-{}", + *version::VERSION, + commit, + std::env::consts::ARCH, + std::env::consts::OS, + ); + + Ok(NodeVersionResponse { + data: NodeVersionData { version }, + }) + } + + async fn proposer_duties( + &self, + opts: ProposerDutiesOpts, + ) -> Result { + let request = GetProposerDutiesRequest::builder() + .epoch(opts.epoch.to_string()) + .build() + .map_err(|err| { + ApiError::new(StatusCode::BAD_REQUEST, "invalid epoch").with_source( + std::io::Error::new(std::io::ErrorKind::InvalidInput, err.to_string()), + ) + })?; + + let response = self + .eth2_cl + .get_proposer_duties(request) + .await + .map_err(|err| { + ApiError::new(StatusCode::BAD_GATEWAY, "upstream proposer duties failed") + .with_source(std::io::Error::other(err.to_string())) + })?; + + let mut payload = match response { + GetProposerDutiesResponse::Ok(payload) => payload, + other => { + return Err(ApiError::new( + StatusCode::BAD_GATEWAY, + format!("unexpected upstream proposer duties response: {other:?}"), + )); + } + }; + + swap_proposer_pubshares(&mut payload.data, &self.pub_share_by_pubkey)?; + + Ok(payload) + } + + async fn attester_duties( + &self, + _opts: AttesterDutiesOpts, + ) -> Result>, ApiError> { + unimplemented!("attester_duties not yet ported") + } + + async fn sync_committee_duties( + &self, + _opts: SyncCommitteeDutiesOpts, + ) -> Result>, ApiError> { + unimplemented!("sync_committee_duties not yet ported") + } + + async fn attestation_data( + &self, + _opts: AttestationDataOpts, + ) -> Result, ApiError> { + unimplemented!("attestation_data not yet ported") + } + + async fn submit_attestations( + &self, + _attestations: Vec, + ) -> Result<(), ApiError> { + unimplemented!("submit_attestations not yet ported") + } + + async fn proposal( + &self, + _opts: ProposalOpts, + ) -> Result, ApiError> { + unimplemented!("proposal not yet ported") + } + + async fn submit_proposal(&self, _proposal: VersionedSignedProposal) -> Result<(), ApiError> { + unimplemented!("submit_proposal not yet ported") + } + + async fn submit_blinded_proposal( + &self, + _proposal: VersionedSignedBlindedProposal, + ) -> Result<(), ApiError> { + unimplemented!("submit_blinded_proposal not yet ported") + } + + async fn aggregate_attestation( + &self, + _opts: AggregateAttestationOpts, + ) -> Result, ApiError> { + unimplemented!("aggregate_attestation not yet ported") + } + + async fn submit_aggregate_attestations( + &self, + _aggregates: Vec, + ) -> Result<(), ApiError> { + unimplemented!("submit_aggregate_attestations not yet ported") + } + + async fn beacon_committee_selections( + &self, + _selections: Vec, + ) -> Result>, ApiError> { + unimplemented!("beacon_committee_selections not yet ported") + } + + async fn sync_committee_selections( + &self, + _selections: Vec, + ) -> Result>, ApiError> { + unimplemented!("sync_committee_selections not yet ported") + } + + async fn validators( + &self, + _opts: ValidatorsOpts, + ) -> Result>, ApiError> { + unimplemented!("validators not yet ported") + } + + async fn submit_validator_registrations( + &self, + _registrations: Vec, + ) -> Result<(), ApiError> { + unimplemented!("submit_validator_registrations not yet ported") + } + + async fn submit_voluntary_exit(&self, _exit: SignedVoluntaryExit) -> Result<(), ApiError> { + unimplemented!("submit_voluntary_exit not yet ported") + } + + async fn sync_committee_contribution( + &self, + _opts: SyncCommitteeContributionOpts, + ) -> Result, ApiError> { + unimplemented!("sync_committee_contribution not yet ported") + } + + async fn submit_sync_committee_contributions( + &self, + _contributions: Vec, + ) -> Result<(), ApiError> { + unimplemented!("submit_sync_committee_contributions not yet ported") + } + + async fn submit_sync_committee_messages( + &self, + _messages: Vec, + ) -> Result<(), ApiError> { + unimplemented!("submit_sync_committee_messages not yet ported") + } +} + +/// Rewrites each duty's root public key to this node's public share. Duties +/// whose pubkey is not in `pub_share_by_pubkey` are passed through unchanged +/// (the upstream returns all proposers for the epoch, not just ours). +fn swap_proposer_pubshares( + duties: &mut [ProposerDuty], + pub_share_by_pubkey: &HashMap, +) -> Result<(), ApiError> { + for duty in duties { + let pubkey = parse_bls_pubkey(&duty.pubkey)?; + if let Some(share) = pub_share_by_pubkey.get(&pubkey) { + duty.pubkey = format_bls_pubkey(share); + } + } + Ok(()) +} + +fn parse_bls_pubkey(s: &str) -> Result { + let trimmed = s.strip_prefix("0x").unwrap_or(s); + let bytes = hex::decode(trimmed).map_err(|err| { + ApiError::new( + StatusCode::BAD_GATEWAY, + format!("invalid pubkey hex: {err}"), + ) + })?; + bytes.as_slice().try_into().map_err(|_| { + ApiError::new( + StatusCode::BAD_GATEWAY, + format!("invalid pubkey length: got {}, want 48", bytes.len()), + ) + }) +} + +fn format_bls_pubkey(pubkey: &BLSPubKey) -> String { + format!("0x{}", hex::encode(pubkey)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn swap_replaces_known_pubkeys_and_keeps_unknown() { + let root = [0xAA_u8; 48]; + let share = [0xBB_u8; 48]; + let stranger = [0xCC_u8; 48]; + + let map = HashMap::from([(root, share)]); + + let mut duties = vec![ + ProposerDuty { + pubkey: format_bls_pubkey(&root), + slot: "10".to_owned(), + validator_index: "1".to_owned(), + }, + ProposerDuty { + pubkey: format_bls_pubkey(&stranger), + slot: "11".to_owned(), + validator_index: "2".to_owned(), + }, + ]; + + swap_proposer_pubshares(&mut duties, &map).unwrap(); + + assert_eq!(duties[0].pubkey, format_bls_pubkey(&share)); + assert_eq!(duties[1].pubkey, format_bls_pubkey(&stranger)); + } + + #[test] + fn swap_rejects_malformed_pubkey() { + let mut duties = vec![ProposerDuty { + pubkey: "0xnothex".to_owned(), + slot: "0".to_owned(), + validator_index: "0".to_owned(), + }]; + let err = swap_proposer_pubshares(&mut duties, &HashMap::new()).unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_GATEWAY); + } + + #[tokio::test] + async fn node_version_formats_pluto_string() { + // Use an unreachable upstream — node_version doesn't call it. + let eth2_cl = + Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); + let component = Component::new_insecure(eth2_cl, 1); + + let response = component.node_version().await.unwrap(); + + assert!(response.data.version.starts_with("obolnetwork/pluto/")); + assert!(response.data.version.contains(std::env::consts::ARCH)); + assert!(response.data.version.contains(std::env::consts::OS)); + } +} diff --git a/crates/core/src/validatorapi/mod.rs b/crates/core/src/validatorapi/mod.rs index ef6be81f..8442859c 100644 --- a/crates/core/src/validatorapi/mod.rs +++ b/crates/core/src/validatorapi/mod.rs @@ -3,6 +3,7 @@ //! Serves the subset of beacon-API endpoints related to distributed //! validation and reverse-proxies the rest to the upstream beacon node. +pub mod component; pub mod error; pub mod handler; pub mod metrics; @@ -12,6 +13,7 @@ pub mod types; #[cfg(test)] pub mod testutils; +pub use component::Component; pub use error::ApiError; pub use handler::Handler; pub use router::new_router; From 6fe5785f035b258be4df7b7bc6faf94642c843c4 Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> Date: Thu, 28 May 2026 19:45:04 +0200 Subject: [PATCH 06/12] feat(core): implement validatorapi attester_duties handler Wires POST /eth/v1/validator/duties/attester/{epoch}: dual-format (numeric or string-encoded) validator index body, upstream call, pubshare swap. --- crates/core/src/validatorapi/component.rs | 108 ++++++++++++++++++++-- crates/core/src/validatorapi/handler.rs | 6 +- crates/core/src/validatorapi/router.rs | 74 ++++++++++++++- crates/core/src/validatorapi/testutils.rs | 25 +++-- crates/core/src/validatorapi/types.rs | 53 +++++++++-- 5 files changed, 234 insertions(+), 32 deletions(-) diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs index e0e609b1..05a4f600 100644 --- a/crates/core/src/validatorapi/component.rs +++ b/crates/core/src/validatorapi/component.rs @@ -9,8 +9,8 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; use axum::http::StatusCode; use pluto_eth2api::{ - EthBeaconNodeApiClient, GetProposerDutiesRequest, GetProposerDutiesResponse, - spec::phase0::BLSPubKey, + EthBeaconNodeApiClient, GetAttesterDutiesRequest, GetAttesterDutiesResponse, + GetProposerDutiesRequest, GetProposerDutiesResponse, spec::phase0::BLSPubKey, }; use super::{ @@ -18,11 +18,12 @@ use super::{ handler::Handler, types::{ AggregateAttestationOpts, AttestationData, AttestationDataOpts, AttesterDutiesOpts, - AttesterDuty, BeaconCommitteeSelection, EthResponse, NodeVersionData, NodeVersionResponse, - ProposalOpts, ProposerDutiesOpts, ProposerDutiesResponse, ProposerDuty, - SignedContributionAndProof, SignedValidatorRegistration, SignedVoluntaryExit, - SyncCommitteeContribution, SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, - SyncCommitteeDuty, SyncCommitteeMessage, SyncCommitteeSelection, Validator, ValidatorsOpts, + AttesterDutiesResponse, AttesterDuty, BeaconCommitteeSelection, EthResponse, + NodeVersionData, NodeVersionResponse, ProposalOpts, ProposerDutiesOpts, + ProposerDutiesResponse, ProposerDuty, SignedContributionAndProof, + SignedValidatorRegistration, SignedVoluntaryExit, SyncCommitteeContribution, + SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, SyncCommitteeDuty, + SyncCommitteeMessage, SyncCommitteeSelection, Validator, ValidatorsOpts, VersionedAttestation, VersionedProposal, VersionedSignedAggregateAndProof, VersionedSignedBlindedProposal, VersionedSignedProposal, }, @@ -144,9 +145,42 @@ impl Handler for Component { async fn attester_duties( &self, - _opts: AttesterDutiesOpts, - ) -> Result>, ApiError> { - unimplemented!("attester_duties not yet ported") + opts: AttesterDutiesOpts, + ) -> Result { + let request = GetAttesterDutiesRequest::builder() + .epoch(opts.epoch.to_string()) + .body(opts.indices) + .build() + .map_err(|err| { + ApiError::new(StatusCode::BAD_REQUEST, "invalid attester duties request") + .with_source(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + err.to_string(), + )) + })?; + + let response = self + .eth2_cl + .get_attester_duties(request) + .await + .map_err(|err| { + ApiError::new(StatusCode::BAD_GATEWAY, "upstream attester duties failed") + .with_source(std::io::Error::other(err.to_string())) + })?; + + let mut payload = match response { + GetAttesterDutiesResponse::Ok(payload) => payload, + other => { + return Err(ApiError::new( + StatusCode::BAD_GATEWAY, + format!("unexpected upstream attester duties response: {other:?}"), + )); + } + }; + + swap_attester_pubshares(&mut payload.data, &self.pub_share_by_pubkey)?; + + Ok(payload) } async fn sync_committee_duties( @@ -272,6 +306,26 @@ fn swap_proposer_pubshares( Ok(()) } +/// Like [`swap_proposer_pubshares`] but for attester duties. Attester duties +/// only ever come back for validators owned by this cluster, so an unknown +/// pubkey indicates a misconfiguration and is rejected. +fn swap_attester_pubshares( + duties: &mut [AttesterDuty], + pub_share_by_pubkey: &HashMap, +) -> Result<(), ApiError> { + for duty in duties { + let pubkey = parse_bls_pubkey(&duty.pubkey)?; + let share = pub_share_by_pubkey.get(&pubkey).ok_or_else(|| { + ApiError::new( + StatusCode::BAD_GATEWAY, + "pubshare not found for attester duty", + ) + })?; + duty.pubkey = format_bls_pubkey(share); + } + Ok(()) +} + fn parse_bls_pubkey(s: &str) -> Result { let trimmed = s.strip_prefix("0x").unwrap_or(s); let bytes = hex::decode(trimmed).map_err(|err| { @@ -323,6 +377,40 @@ mod tests { assert_eq!(duties[1].pubkey, format_bls_pubkey(&stranger)); } + #[test] + fn swap_attester_replaces_pubkeys_and_rejects_unknown() { + let root = [0x11_u8; 48]; + let share = [0x22_u8; 48]; + let unknown = [0x33_u8; 48]; + + let map = HashMap::from([(root, share)]); + + let mut duties = vec![AttesterDuty { + pubkey: format_bls_pubkey(&root), + slot: "1".to_owned(), + committee_index: "0".to_owned(), + committee_length: "16".to_owned(), + committees_at_slot: "4".to_owned(), + validator_committee_index: "0".to_owned(), + validator_index: "5".to_owned(), + }]; + + swap_attester_pubshares(&mut duties, &map).unwrap(); + assert_eq!(duties[0].pubkey, format_bls_pubkey(&share)); + + let mut stranger_duties = vec![AttesterDuty { + pubkey: format_bls_pubkey(&unknown), + slot: "2".to_owned(), + committee_index: "0".to_owned(), + committee_length: "16".to_owned(), + committees_at_slot: "4".to_owned(), + validator_committee_index: "0".to_owned(), + validator_index: "6".to_owned(), + }]; + let err = swap_attester_pubshares(&mut stranger_duties, &map).unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_GATEWAY); + } + #[test] fn swap_rejects_malformed_pubkey() { let mut duties = vec![ProposerDuty { diff --git a/crates/core/src/validatorapi/handler.rs b/crates/core/src/validatorapi/handler.rs index 06604aff..da0375fa 100644 --- a/crates/core/src/validatorapi/handler.rs +++ b/crates/core/src/validatorapi/handler.rs @@ -6,8 +6,8 @@ use super::{ error::ApiError, types::{ AggregateAttestationOpts, AttestationData, AttestationDataOpts, AttesterDutiesOpts, - AttesterDuty, BeaconCommitteeSelection, EthResponse, NodeVersionResponse, ProposalOpts, - ProposerDutiesOpts, ProposerDutiesResponse, SignedContributionAndProof, + AttesterDutiesResponse, BeaconCommitteeSelection, EthResponse, NodeVersionResponse, + ProposalOpts, ProposerDutiesOpts, ProposerDutiesResponse, SignedContributionAndProof, SignedValidatorRegistration, SignedVoluntaryExit, SyncCommitteeContribution, SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, SyncCommitteeDuty, SyncCommitteeMessage, SyncCommitteeSelection, Validator, ValidatorsOpts, @@ -28,7 +28,7 @@ pub trait Handler: Send + Sync + 'static { async fn attester_duties( &self, opts: AttesterDutiesOpts, - ) -> Result>, ApiError>; + ) -> Result; /// `GET /eth/v1/validator/duties/proposer/{epoch}`. async fn proposer_duties( diff --git a/crates/core/src/validatorapi/router.rs b/crates/core/src/validatorapi/router.rs index e619f37d..4c6a5761 100644 --- a/crates/core/src/validatorapi/router.rs +++ b/crates/core/src/validatorapi/router.rs @@ -15,7 +15,10 @@ use axum::{ use super::{ error::ApiError, handler::Handler, - types::{NodeVersionResponse, ProposerDutiesOpts, ProposerDutiesResponse}, + types::{ + AttesterDutiesOpts, AttesterDutiesResponse, NodeVersionResponse, ProposerDutiesOpts, + ProposerDutiesResponse, ValIndexes, + }, }; /// Shared router state. Cloned per request via [`Arc`]. @@ -120,8 +123,20 @@ pub fn new_router(handler: Arc, builder_enabled: bool) -> Router { .with_state(state) } -async fn attester_duties() { - todo!("vapi: attester_duties"); +async fn attester_duties( + State(state): State>, + Path(epoch): Path, + Json(indices): Json, +) -> Result, ApiError> { + let response = state + .handler + .attester_duties(AttesterDutiesOpts { + epoch, + indices: indices.0, + }) + .await?; + + Ok(Json(response)) } async fn proposer_duties( @@ -229,7 +244,9 @@ mod tests { use super::*; use crate::validatorapi::{ testutils::TestHandler, - types::{ProposerDutiesResponse, ProposerDuty}, + types::{ + AttesterDutiesResponse, AttesterDuty, ProposerDutiesResponse, ProposerDuty, ValIndexes, + }, }; #[tokio::test] @@ -244,6 +261,55 @@ mod tests { assert_eq!(body.data.version, "pluto/test/v1.0"); } + #[tokio::test] + async fn attester_duties_wraps_handler_value() { + let duty = AttesterDuty { + pubkey: "0xaabbccddeeff".to_owned(), + slot: "12".to_owned(), + committee_index: "3".to_owned(), + committee_length: "16".to_owned(), + committees_at_slot: "4".to_owned(), + validator_committee_index: "2".to_owned(), + validator_index: "7".to_owned(), + }; + let handler = TestHandler::default().with_attester_duties(AttesterDutiesResponse { + data: vec![duty], + dependent_root: "0xab".to_owned(), + execution_optimistic: false, + }); + let state = Arc::new(AppState { + handler: Arc::new(handler), + builder_enabled: false, + }); + + let Json(body) = attester_duties( + State(state), + Path(42u64), + Json(ValIndexes(vec!["7".to_owned()])), + ) + .await + .unwrap(); + + let json = serde_json::to_value(&body).unwrap(); + assert_eq!(json["dependent_root"], "0xab"); + assert_eq!(json["execution_optimistic"], false); + assert_eq!(json["data"][0]["slot"], "12"); + assert_eq!(json["data"][0]["committee_index"], "3"); + assert_eq!(json["data"][0]["validator_index"], "7"); + } + + #[test] + fn val_indexes_accepts_numbers_and_strings() { + let nums: ValIndexes = serde_json::from_str("[1, 2, 3]").unwrap(); + assert_eq!(nums.0, vec!["1", "2", "3"]); + + let strs: ValIndexes = serde_json::from_str(r#"["4", "5"]"#).unwrap(); + assert_eq!(strs.0, vec!["4", "5"]); + + let bad = serde_json::from_str::(r#"["not-a-number"]"#); + assert!(bad.is_err()); + } + #[tokio::test] async fn proposer_duties_wraps_handler_value() { let duty = ProposerDuty { diff --git a/crates/core/src/validatorapi/testutils.rs b/crates/core/src/validatorapi/testutils.rs index 90a480d2..1ace3212 100644 --- a/crates/core/src/validatorapi/testutils.rs +++ b/crates/core/src/validatorapi/testutils.rs @@ -11,11 +11,11 @@ use super::{ handler::Handler, types::{ AggregateAttestationOpts, AttestationData, AttestationDataOpts, AttesterDutiesOpts, - AttesterDuty, BeaconCommitteeSelection, EthResponse, NodeVersionData, NodeVersionResponse, - ProposalOpts, ProposerDutiesOpts, ProposerDutiesResponse, SignedContributionAndProof, - SignedValidatorRegistration, SignedVoluntaryExit, SyncCommitteeContribution, - SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, SyncCommitteeDuty, - SyncCommitteeMessage, SyncCommitteeSelection, Validator, ValidatorsOpts, + AttesterDutiesResponse, BeaconCommitteeSelection, EthResponse, NodeVersionData, + NodeVersionResponse, ProposalOpts, ProposerDutiesOpts, ProposerDutiesResponse, + SignedContributionAndProof, SignedValidatorRegistration, SignedVoluntaryExit, + SyncCommitteeContribution, SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, + SyncCommitteeDuty, SyncCommitteeMessage, SyncCommitteeSelection, Validator, ValidatorsOpts, VersionedAttestation, VersionedProposal, VersionedSignedAggregateAndProof, VersionedSignedBlindedProposal, VersionedSignedProposal, }, @@ -28,6 +28,8 @@ pub struct TestHandler { pub version: String, /// Value returned by [`Handler::proposer_duties`]. pub proposer_duties_response: Option, + /// Value returned by [`Handler::attester_duties`]. + pub attester_duties_response: Option, } impl TestHandler { @@ -44,6 +46,12 @@ impl TestHandler { self.proposer_duties_response = Some(response); self } + + /// Sets the response returned by [`Handler::attester_duties`]. + pub fn with_attester_duties(mut self, response: AttesterDutiesResponse) -> Self { + self.attester_duties_response = Some(response); + self + } } #[async_trait] @@ -59,8 +67,11 @@ impl Handler for TestHandler { async fn attester_duties( &self, _opts: AttesterDutiesOpts, - ) -> Result>, ApiError> { - unimplemented!("attester_duties not stubbed in TestHandler") + ) -> Result { + Ok(self + .attester_duties_response + .clone() + .expect("attester_duties not stubbed in TestHandler")) } async fn proposer_duties( diff --git a/crates/core/src/validatorapi/types.rs b/crates/core/src/validatorapi/types.rs index 01f3405a..a095e9b0 100644 --- a/crates/core/src/validatorapi/types.rs +++ b/crates/core/src/validatorapi/types.rs @@ -4,8 +4,12 @@ //! Most data payloads are empty placeholders for now and will be swapped //! for the proper consensus-spec types in a later phase. +use serde::{Deserialize, Deserializer, Serialize}; + pub use pluto_crypto::types::{PublicKey as BlsPubKey, Signature as BlsSignature}; pub use pluto_eth2api::{ + GetAttesterDutiesResponseResponse as AttesterDutiesResponse, + GetAttesterDutiesResponseResponseDatum as AttesterDuty, GetProposerDutiesResponseResponse as ProposerDutiesResponse, GetProposerDutiesResponseResponseDatum as ProposerDuty, GetVersionResponseResponse as NodeVersionResponse, @@ -35,8 +39,9 @@ pub struct EthResponse { pub struct AttesterDutiesOpts { /// Epoch to fetch duties for. pub epoch: Epoch, - /// Validator indices to fetch duties for. - pub indices: Vec, + /// Validator indices to fetch duties for. Carried as strings since the + /// upstream auto-generated client takes string-typed indices. + pub indices: Vec, } /// Options for @@ -53,8 +58,9 @@ pub struct ProposerDutiesOpts { pub struct SyncCommitteeDutiesOpts { /// Epoch to fetch duties for. pub epoch: Epoch, - /// Validator indices to fetch duties for. - pub indices: Vec, + /// Validator indices to fetch duties for. Carried as strings since the + /// upstream auto-generated client takes string-typed indices. + pub indices: Vec, } /// Options for @@ -116,10 +122,6 @@ pub struct SyncCommitteeContributionOpts { pub beacon_block_root: Root, } -/// Attester duty payload. Placeholder. -#[derive(Debug, Clone)] -pub struct AttesterDuty {} - /// Sync-committee duty payload. Placeholder. #[derive(Debug, Clone)] pub struct SyncCommitteeDuty {} @@ -179,3 +181,38 @@ pub struct BeaconCommitteeSelection {} /// Sync-committee selection payload. Placeholder. #[derive(Debug, Clone)] pub struct SyncCommitteeSelection {} + +/// Validator-index request body for the `attester_duties` and +/// `sync_committee_duties` endpoints. +/// +/// Accepts both numeric (`[1, 2]`) and string-encoded (`["1", "2"]`) JSON +/// arrays. Indices are stored as decimal strings so they pass straight through +/// to the auto-generated request builders. +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize)] +pub struct ValIndexes(pub Vec); + +impl<'de> Deserialize<'de> for ValIndexes { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + #[derive(Deserialize)] + #[serde(untagged)] + enum Either { + Numbers(Vec), + Strings(Vec), + } + + let value = Either::deserialize(deserializer)?; + let indices = match value { + Either::Numbers(ns) => ns.into_iter().map(|n| n.to_string()).collect(), + Either::Strings(strs) => { + for s in &strs { + s.parse::().map_err(serde::de::Error::custom)?; + } + strs + } + }; + Ok(Self(indices)) + } +} From 11ad9406e3a544c8bce1f2852028c2d7fd4314b7 Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> Date: Thu, 28 May 2026 19:48:11 +0200 Subject: [PATCH 07/12] feat(core): implement validatorapi sync_committee_duties handler Wires POST /eth/v1/validator/duties/sync/{epoch}, reusing the ValIndexes dual-format body extractor. --- crates/core/src/validatorapi/component.rs | 95 +++++++++++++++++++++-- crates/core/src/validatorapi/handler.rs | 4 +- crates/core/src/validatorapi/router.rs | 52 ++++++++++++- crates/core/src/validatorapi/testutils.rs | 19 ++++- crates/core/src/validatorapi/types.rs | 6 +- 5 files changed, 156 insertions(+), 20 deletions(-) diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs index 05a4f600..3f5ae822 100644 --- a/crates/core/src/validatorapi/component.rs +++ b/crates/core/src/validatorapi/component.rs @@ -10,7 +10,8 @@ use async_trait::async_trait; use axum::http::StatusCode; use pluto_eth2api::{ EthBeaconNodeApiClient, GetAttesterDutiesRequest, GetAttesterDutiesResponse, - GetProposerDutiesRequest, GetProposerDutiesResponse, spec::phase0::BLSPubKey, + GetProposerDutiesRequest, GetProposerDutiesResponse, GetSyncCommitteeDutiesRequest, + GetSyncCommitteeDutiesResponse, spec::phase0::BLSPubKey, }; use super::{ @@ -22,8 +23,8 @@ use super::{ NodeVersionData, NodeVersionResponse, ProposalOpts, ProposerDutiesOpts, ProposerDutiesResponse, ProposerDuty, SignedContributionAndProof, SignedValidatorRegistration, SignedVoluntaryExit, SyncCommitteeContribution, - SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, SyncCommitteeDuty, - SyncCommitteeMessage, SyncCommitteeSelection, Validator, ValidatorsOpts, + SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, SyncCommitteeDutiesResponse, + SyncCommitteeDuty, SyncCommitteeMessage, SyncCommitteeSelection, Validator, ValidatorsOpts, VersionedAttestation, VersionedProposal, VersionedSignedAggregateAndProof, VersionedSignedBlindedProposal, VersionedSignedProposal, }, @@ -185,9 +186,48 @@ impl Handler for Component { async fn sync_committee_duties( &self, - _opts: SyncCommitteeDutiesOpts, - ) -> Result>, ApiError> { - unimplemented!("sync_committee_duties not yet ported") + opts: SyncCommitteeDutiesOpts, + ) -> Result { + let request = GetSyncCommitteeDutiesRequest::builder() + .epoch(opts.epoch.to_string()) + .body(opts.indices) + .build() + .map_err(|err| { + ApiError::new( + StatusCode::BAD_REQUEST, + "invalid sync committee duties request", + ) + .with_source(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + err.to_string(), + )) + })?; + + let response = self + .eth2_cl + .get_sync_committee_duties(request) + .await + .map_err(|err| { + ApiError::new( + StatusCode::BAD_GATEWAY, + "upstream sync committee duties failed", + ) + .with_source(std::io::Error::other(err.to_string())) + })?; + + let mut payload = match response { + GetSyncCommitteeDutiesResponse::Ok(payload) => payload, + other => { + return Err(ApiError::new( + StatusCode::BAD_GATEWAY, + format!("unexpected upstream sync committee duties response: {other:?}"), + )); + } + }; + + swap_sync_committee_pubshares(&mut payload.data, &self.pub_share_by_pubkey)?; + + Ok(payload) } async fn attestation_data( @@ -326,6 +366,24 @@ fn swap_attester_pubshares( Ok(()) } +/// Sync-committee duties variant of [`swap_attester_pubshares`]. +fn swap_sync_committee_pubshares( + duties: &mut [SyncCommitteeDuty], + pub_share_by_pubkey: &HashMap, +) -> Result<(), ApiError> { + for duty in duties { + let pubkey = parse_bls_pubkey(&duty.pubkey)?; + let share = pub_share_by_pubkey.get(&pubkey).ok_or_else(|| { + ApiError::new( + StatusCode::BAD_GATEWAY, + "pubshare not found for sync committee duty", + ) + })?; + duty.pubkey = format_bls_pubkey(share); + } + Ok(()) +} + fn parse_bls_pubkey(s: &str) -> Result { let trimmed = s.strip_prefix("0x").unwrap_or(s); let bytes = hex::decode(trimmed).map_err(|err| { @@ -411,6 +469,31 @@ mod tests { assert_eq!(err.status_code, StatusCode::BAD_GATEWAY); } + #[test] + fn swap_sync_committee_replaces_pubkeys_and_rejects_unknown() { + let root = [0x44_u8; 48]; + let share = [0x55_u8; 48]; + let unknown = [0x66_u8; 48]; + + let map = HashMap::from([(root, share)]); + + let mut duties = vec![SyncCommitteeDuty { + pubkey: format_bls_pubkey(&root), + validator_index: "12".to_owned(), + validator_sync_committee_indices: vec!["0".to_owned()], + }]; + swap_sync_committee_pubshares(&mut duties, &map).unwrap(); + assert_eq!(duties[0].pubkey, format_bls_pubkey(&share)); + + let mut stranger = vec![SyncCommitteeDuty { + pubkey: format_bls_pubkey(&unknown), + validator_index: "13".to_owned(), + validator_sync_committee_indices: vec![], + }]; + let err = swap_sync_committee_pubshares(&mut stranger, &map).unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_GATEWAY); + } + #[test] fn swap_rejects_malformed_pubkey() { let mut duties = vec![ProposerDuty { diff --git a/crates/core/src/validatorapi/handler.rs b/crates/core/src/validatorapi/handler.rs index da0375fa..3be1667a 100644 --- a/crates/core/src/validatorapi/handler.rs +++ b/crates/core/src/validatorapi/handler.rs @@ -9,7 +9,7 @@ use super::{ AttesterDutiesResponse, BeaconCommitteeSelection, EthResponse, NodeVersionResponse, ProposalOpts, ProposerDutiesOpts, ProposerDutiesResponse, SignedContributionAndProof, SignedValidatorRegistration, SignedVoluntaryExit, SyncCommitteeContribution, - SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, SyncCommitteeDuty, + SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, SyncCommitteeDutiesResponse, SyncCommitteeMessage, SyncCommitteeSelection, Validator, ValidatorsOpts, VersionedAttestation, VersionedProposal, VersionedSignedAggregateAndProof, VersionedSignedBlindedProposal, VersionedSignedProposal, @@ -40,7 +40,7 @@ pub trait Handler: Send + Sync + 'static { async fn sync_committee_duties( &self, opts: SyncCommitteeDutiesOpts, - ) -> Result>, ApiError>; + ) -> Result; /// `GET /eth/v1/validator/attestation_data`. async fn attestation_data( diff --git a/crates/core/src/validatorapi/router.rs b/crates/core/src/validatorapi/router.rs index 4c6a5761..8ca70ad4 100644 --- a/crates/core/src/validatorapi/router.rs +++ b/crates/core/src/validatorapi/router.rs @@ -17,7 +17,7 @@ use super::{ handler::Handler, types::{ AttesterDutiesOpts, AttesterDutiesResponse, NodeVersionResponse, ProposerDutiesOpts, - ProposerDutiesResponse, ValIndexes, + ProposerDutiesResponse, SyncCommitteeDutiesOpts, SyncCommitteeDutiesResponse, ValIndexes, }, }; @@ -151,8 +151,20 @@ async fn proposer_duties( Ok(Json(response)) } -async fn sync_committee_duties() { - todo!("vapi: sync_committee_duties"); +async fn sync_committee_duties( + State(state): State>, + Path(epoch): Path, + Json(indices): Json, +) -> Result, ApiError> { + let response = state + .handler + .sync_committee_duties(SyncCommitteeDutiesOpts { + epoch, + indices: indices.0, + }) + .await?; + + Ok(Json(response)) } async fn attestation_data() { @@ -245,7 +257,8 @@ mod tests { use crate::validatorapi::{ testutils::TestHandler, types::{ - AttesterDutiesResponse, AttesterDuty, ProposerDutiesResponse, ProposerDuty, ValIndexes, + AttesterDutiesResponse, AttesterDuty, ProposerDutiesResponse, ProposerDuty, + SyncCommitteeDutiesResponse, SyncCommitteeDuty, ValIndexes, }, }; @@ -298,6 +311,37 @@ mod tests { assert_eq!(json["data"][0]["validator_index"], "7"); } + #[tokio::test] + async fn sync_committee_duties_wraps_handler_value() { + let duty = SyncCommitteeDuty { + pubkey: "0x112233".to_owned(), + validator_index: "9".to_owned(), + validator_sync_committee_indices: vec!["0".to_owned(), "5".to_owned()], + }; + let handler = + TestHandler::default().with_sync_committee_duties(SyncCommitteeDutiesResponse { + data: vec![duty], + execution_optimistic: true, + }); + let state = Arc::new(AppState { + handler: Arc::new(handler), + builder_enabled: false, + }); + + let Json(body) = sync_committee_duties( + State(state), + Path(7u64), + Json(ValIndexes(vec!["9".to_owned()])), + ) + .await + .unwrap(); + + let json = serde_json::to_value(&body).unwrap(); + assert_eq!(json["execution_optimistic"], true); + assert_eq!(json["data"][0]["validator_index"], "9"); + assert_eq!(json["data"][0]["validator_sync_committee_indices"][1], "5"); + } + #[test] fn val_indexes_accepts_numbers_and_strings() { let nums: ValIndexes = serde_json::from_str("[1, 2, 3]").unwrap(); diff --git a/crates/core/src/validatorapi/testutils.rs b/crates/core/src/validatorapi/testutils.rs index 1ace3212..fa997703 100644 --- a/crates/core/src/validatorapi/testutils.rs +++ b/crates/core/src/validatorapi/testutils.rs @@ -15,8 +15,8 @@ use super::{ NodeVersionResponse, ProposalOpts, ProposerDutiesOpts, ProposerDutiesResponse, SignedContributionAndProof, SignedValidatorRegistration, SignedVoluntaryExit, SyncCommitteeContribution, SyncCommitteeContributionOpts, SyncCommitteeDutiesOpts, - SyncCommitteeDuty, SyncCommitteeMessage, SyncCommitteeSelection, Validator, ValidatorsOpts, - VersionedAttestation, VersionedProposal, VersionedSignedAggregateAndProof, + SyncCommitteeDutiesResponse, SyncCommitteeMessage, SyncCommitteeSelection, Validator, + ValidatorsOpts, VersionedAttestation, VersionedProposal, VersionedSignedAggregateAndProof, VersionedSignedBlindedProposal, VersionedSignedProposal, }, }; @@ -30,6 +30,8 @@ pub struct TestHandler { pub proposer_duties_response: Option, /// Value returned by [`Handler::attester_duties`]. pub attester_duties_response: Option, + /// Value returned by [`Handler::sync_committee_duties`]. + pub sync_committee_duties_response: Option, } impl TestHandler { @@ -52,6 +54,12 @@ impl TestHandler { self.attester_duties_response = Some(response); self } + + /// Sets the response returned by [`Handler::sync_committee_duties`]. + pub fn with_sync_committee_duties(mut self, response: SyncCommitteeDutiesResponse) -> Self { + self.sync_committee_duties_response = Some(response); + self + } } #[async_trait] @@ -87,8 +95,11 @@ impl Handler for TestHandler { async fn sync_committee_duties( &self, _opts: SyncCommitteeDutiesOpts, - ) -> Result>, ApiError> { - unimplemented!("sync_committee_duties not stubbed in TestHandler") + ) -> Result { + Ok(self + .sync_committee_duties_response + .clone() + .expect("sync_committee_duties not stubbed in TestHandler")) } async fn attestation_data( diff --git a/crates/core/src/validatorapi/types.rs b/crates/core/src/validatorapi/types.rs index a095e9b0..7ea63855 100644 --- a/crates/core/src/validatorapi/types.rs +++ b/crates/core/src/validatorapi/types.rs @@ -12,6 +12,8 @@ pub use pluto_eth2api::{ GetAttesterDutiesResponseResponseDatum as AttesterDuty, GetProposerDutiesResponseResponse as ProposerDutiesResponse, GetProposerDutiesResponseResponseDatum as ProposerDuty, + GetSyncCommitteeDutiesResponseResponse as SyncCommitteeDutiesResponse, + GetSyncCommitteeDutiesResponseResponseDatum as SyncCommitteeDuty, GetVersionResponseResponse as NodeVersionResponse, GetVersionResponseResponseData as NodeVersionData, spec::phase0::{Epoch, Root, Slot, ValidatorIndex}, @@ -122,10 +124,6 @@ pub struct SyncCommitteeContributionOpts { pub beacon_block_root: Root, } -/// Sync-committee duty payload. Placeholder. -#[derive(Debug, Clone)] -pub struct SyncCommitteeDuty {} - /// Attestation data payload. Placeholder. #[derive(Debug, Clone)] pub struct AttestationData {} From 26675fe1a7146e48e9602b79be210485e2c4b823 Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> Date: Thu, 28 May 2026 19:53:08 +0200 Subject: [PATCH 08/12] feat(core): implement validatorapi attestation_data handler Wires GET /eth/v1/validator/attestation_data. The Component now holds an Arc and awaits unsigned attestation data from the local DutyDB rather than hitting upstream. --- crates/core/src/validatorapi/component.rs | 125 ++++++++++++++++++++-- crates/core/src/validatorapi/handler.rs | 4 +- crates/core/src/validatorapi/router.rs | 74 +++++++++++-- crates/core/src/validatorapi/testutils.rs | 17 ++- crates/core/src/validatorapi/types.rs | 14 ++- 5 files changed, 208 insertions(+), 26 deletions(-) diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs index 3f5ae822..2076a521 100644 --- a/crates/core/src/validatorapi/component.rs +++ b/crates/core/src/validatorapi/component.rs @@ -18,7 +18,7 @@ use super::{ error::ApiError, handler::Handler, types::{ - AggregateAttestationOpts, AttestationData, AttestationDataOpts, AttesterDutiesOpts, + AggregateAttestationOpts, AttestationDataOpts, AttestationDataResponse, AttesterDutiesOpts, AttesterDutiesResponse, AttesterDuty, BeaconCommitteeSelection, EthResponse, NodeVersionData, NodeVersionResponse, ProposalOpts, ProposerDutiesOpts, ProposerDutiesResponse, ProposerDuty, SignedContributionAndProof, @@ -29,7 +29,7 @@ use super::{ VersionedSignedBlindedProposal, VersionedSignedProposal, }, }; -use crate::version; +use crate::{dutydb::MemDB, version}; /// Validator API [`Handler`] implementation. /// @@ -41,6 +41,9 @@ use crate::version; pub struct Component { /// Upstream beacon-node API client. eth2_cl: Arc, + /// In-memory DutyDB used to await consensus output (e.g. attestation + /// data) produced by the rest of the pipeline. + dutydb: Arc, /// Threshold BLS share index assigned to this node (1-indexed). #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] share_idx: u64, @@ -64,12 +67,14 @@ impl Component { /// Builds a new component. pub fn new( eth2_cl: Arc, + dutydb: Arc, share_idx: u64, pub_share_by_pubkey: HashMap, builder_enabled: bool, ) -> Self { Self { eth2_cl, + dutydb, share_idx, pub_share_by_pubkey, builder_enabled, @@ -79,9 +84,14 @@ impl Component { /// Builds a component that skips partial-signature verification on /// submit endpoints. Test use only. - pub fn new_insecure(eth2_cl: Arc, share_idx: u64) -> Self { + pub fn new_insecure( + eth2_cl: Arc, + dutydb: Arc, + share_idx: u64, + ) -> Self { Self { eth2_cl, + dutydb, share_idx, pub_share_by_pubkey: HashMap::new(), builder_enabled: false, @@ -232,9 +242,21 @@ impl Handler for Component { async fn attestation_data( &self, - _opts: AttestationDataOpts, - ) -> Result, ApiError> { - unimplemented!("attestation_data not yet ported") + opts: AttestationDataOpts, + ) -> Result { + let data = self + .dutydb + .await_attestation(opts.slot, opts.committee_index) + .await + .map_err(|err| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "await attestation failed", + ) + .with_source(std::io::Error::other(err.to_string())) + })?; + + Ok(AttestationDataResponse { data }) } async fn submit_attestations( @@ -505,12 +527,49 @@ mod tests { assert_eq!(err.status_code, StatusCode::BAD_GATEWAY); } - #[tokio::test] - async fn node_version_formats_pluto_string() { - // Use an unreachable upstream — node_version doesn't call it. + use chrono::{DateTime, Utc}; + use tokio::sync::mpsc; + use tokio_util::sync::CancellationToken; + + use crate::{ + deadline::{DeadlineCalculator, DeadlinerTask, Result as DeadlineResult}, + dutydb::{UnsignedDataSet, UnsignedDutyData}, + signeddata::{ + AttestationData as SignedAttestationData, AttesterDuty as SignedAttesterDuty, + }, + testutils::random_core_pub_key, + types::{Duty, DutyType, SlotNumber}, + validatorapi::types::AttestationDataOpts, + }; + + /// Schedules every duty with a deadline at `MAX_UTC`, so duties are + /// `Scheduled` but never naturally expire. + struct FarFutureCalculator; + + impl DeadlineCalculator for FarFutureCalculator { + fn deadline(&self, _: &Duty) -> DeadlineResult>> { + Ok(Some(DateTime::::MAX_UTC)) + } + } + + /// Build a Component backed by a real (but never-expiring) DutyDB plus a + /// dummy upstream client. Useful for tests that only exercise endpoints + /// served from the DB. + fn make_test_component() -> (Component, Arc) { + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = + DeadlinerTask::start(cancel.clone(), "validatorapi-tests", FarFutureCalculator); + let (_unused_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); - let component = Component::new_insecure(eth2_cl, 1); + let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1); + (component, dutydb) + } + + #[tokio::test] + async fn node_version_formats_pluto_string() { + let (component, _db) = make_test_component(); let response = component.node_version().await.unwrap(); @@ -518,4 +577,50 @@ mod tests { assert!(response.data.version.contains(std::env::consts::ARCH)); assert!(response.data.version.contains(std::env::consts::OS)); } + + #[tokio::test] + async fn attestation_data_returns_data_stored_in_dutydb() { + const SLOT: u64 = 100; + const COMM_IDX: u64 = 4; + const V_IDX: u64 = 1; + + let (component, db) = make_test_component(); + + let unsigned = SignedAttestationData { + data: pluto_eth2api::spec::phase0::AttestationData { + slot: SLOT, + index: COMM_IDX, + beacon_block_root: [0x11; 32], + source: pluto_eth2api::spec::phase0::Checkpoint::default(), + target: pluto_eth2api::spec::phase0::Checkpoint::default(), + }, + duty: SignedAttesterDuty { + slot: SLOT, + validator_index: V_IDX, + committee_index: COMM_IDX, + committee_length: 8, + committees_at_slot: 1, + validator_committee_index: 0, + }, + }; + let mut set = UnsignedDataSet::new(); + set.insert( + random_core_pub_key(), + UnsignedDutyData::Attestation(unsigned.clone()), + ); + db.store(Duty::new(SlotNumber::new(SLOT), DutyType::Attester), set) + .await + .unwrap(); + + let response = component + .attestation_data(AttestationDataOpts { + slot: SLOT, + committee_index: COMM_IDX, + }) + .await + .unwrap(); + assert_eq!(response.data.slot, SLOT); + assert_eq!(response.data.index, COMM_IDX); + assert_eq!(response.data.beacon_block_root, [0x11; 32]); + } } diff --git a/crates/core/src/validatorapi/handler.rs b/crates/core/src/validatorapi/handler.rs index 3be1667a..b1a3a050 100644 --- a/crates/core/src/validatorapi/handler.rs +++ b/crates/core/src/validatorapi/handler.rs @@ -5,7 +5,7 @@ use async_trait::async_trait; use super::{ error::ApiError, types::{ - AggregateAttestationOpts, AttestationData, AttestationDataOpts, AttesterDutiesOpts, + AggregateAttestationOpts, AttestationDataOpts, AttestationDataResponse, AttesterDutiesOpts, AttesterDutiesResponse, BeaconCommitteeSelection, EthResponse, NodeVersionResponse, ProposalOpts, ProposerDutiesOpts, ProposerDutiesResponse, SignedContributionAndProof, SignedValidatorRegistration, SignedVoluntaryExit, SyncCommitteeContribution, @@ -46,7 +46,7 @@ pub trait Handler: Send + Sync + 'static { async fn attestation_data( &self, opts: AttestationDataOpts, - ) -> Result, ApiError>; + ) -> Result; /// `POST /eth/v2/beacon/pool/attestations`. async fn submit_attestations( diff --git a/crates/core/src/validatorapi/router.rs b/crates/core/src/validatorapi/router.rs index 8ca70ad4..547a372d 100644 --- a/crates/core/src/validatorapi/router.rs +++ b/crates/core/src/validatorapi/router.rs @@ -7,20 +7,29 @@ use std::sync::Arc; use axum::{ Json, Router, - extract::{Path, State}, + extract::{Path, Query, State}, response::IntoResponse, routing::{get, post}, }; +use serde::Deserialize; use super::{ error::ApiError, handler::Handler, types::{ - AttesterDutiesOpts, AttesterDutiesResponse, NodeVersionResponse, ProposerDutiesOpts, - ProposerDutiesResponse, SyncCommitteeDutiesOpts, SyncCommitteeDutiesResponse, ValIndexes, + AttestationDataOpts, AttestationDataResponse, AttesterDutiesOpts, AttesterDutiesResponse, + CommitteeIndex, NodeVersionResponse, ProposerDutiesOpts, ProposerDutiesResponse, + SyncCommitteeDutiesOpts, SyncCommitteeDutiesResponse, ValIndexes, }, }; +/// Query parameters for `GET /eth/v1/validator/attestation_data`. +#[derive(Debug, Clone, Deserialize)] +struct AttestationDataQuery { + slot: u64, + committee_index: CommitteeIndex, +} + /// Shared router state. Cloned per request via [`Arc`]. pub(super) struct AppState { /// Request handler invoked by each route. @@ -167,8 +176,19 @@ async fn sync_committee_duties( Ok(Json(response)) } -async fn attestation_data() { - todo!("vapi: attestation_data"); +async fn attestation_data( + State(state): State>, + Query(query): Query, +) -> Result, ApiError> { + let response = state + .handler + .attestation_data(AttestationDataOpts { + slot: query.slot, + committee_index: query.committee_index, + }) + .await?; + + Ok(Json(response)) } async fn submit_attestations() { @@ -254,11 +274,13 @@ async fn proxy_handler() { #[cfg(test)] mod tests { use super::*; + use pluto_eth2api::spec::phase0; + use crate::validatorapi::{ testutils::TestHandler, types::{ - AttesterDutiesResponse, AttesterDuty, ProposerDutiesResponse, ProposerDuty, - SyncCommitteeDutiesResponse, SyncCommitteeDuty, ValIndexes, + AttestationDataResponse, AttesterDutiesResponse, AttesterDuty, ProposerDutiesResponse, + ProposerDuty, SyncCommitteeDutiesResponse, SyncCommitteeDuty, ValIndexes, }, }; @@ -342,6 +364,44 @@ mod tests { assert_eq!(json["data"][0]["validator_sync_committee_indices"][1], "5"); } + #[tokio::test] + async fn attestation_data_wraps_handler_value() { + let data = phase0::AttestationData { + slot: 99, + index: 3, + beacon_block_root: [0xaa; 32], + source: phase0::Checkpoint { + epoch: 7, + root: [0xbb; 32], + }, + target: phase0::Checkpoint { + epoch: 8, + root: [0xcc; 32], + }, + }; + let handler = + TestHandler::default().with_attestation_data(AttestationDataResponse { data }); + let state = Arc::new(AppState { + handler: Arc::new(handler), + builder_enabled: false, + }); + + let Json(body) = attestation_data( + State(state), + Query(AttestationDataQuery { + slot: 99, + committee_index: 3, + }), + ) + .await + .unwrap(); + + let json = serde_json::to_value(&body).unwrap(); + assert_eq!(json["data"]["slot"], "99"); + assert_eq!(json["data"]["index"], "3"); + assert_eq!(json["data"]["source"]["epoch"], "7"); + } + #[test] fn val_indexes_accepts_numbers_and_strings() { let nums: ValIndexes = serde_json::from_str("[1, 2, 3]").unwrap(); diff --git a/crates/core/src/validatorapi/testutils.rs b/crates/core/src/validatorapi/testutils.rs index fa997703..45980fe7 100644 --- a/crates/core/src/validatorapi/testutils.rs +++ b/crates/core/src/validatorapi/testutils.rs @@ -10,7 +10,7 @@ use super::{ error::ApiError, handler::Handler, types::{ - AggregateAttestationOpts, AttestationData, AttestationDataOpts, AttesterDutiesOpts, + AggregateAttestationOpts, AttestationDataOpts, AttestationDataResponse, AttesterDutiesOpts, AttesterDutiesResponse, BeaconCommitteeSelection, EthResponse, NodeVersionData, NodeVersionResponse, ProposalOpts, ProposerDutiesOpts, ProposerDutiesResponse, SignedContributionAndProof, SignedValidatorRegistration, SignedVoluntaryExit, @@ -32,6 +32,8 @@ pub struct TestHandler { pub attester_duties_response: Option, /// Value returned by [`Handler::sync_committee_duties`]. pub sync_committee_duties_response: Option, + /// Value returned by [`Handler::attestation_data`]. + pub attestation_data_response: Option, } impl TestHandler { @@ -60,6 +62,12 @@ impl TestHandler { self.sync_committee_duties_response = Some(response); self } + + /// Sets the response returned by [`Handler::attestation_data`]. + pub fn with_attestation_data(mut self, response: AttestationDataResponse) -> Self { + self.attestation_data_response = Some(response); + self + } } #[async_trait] @@ -105,8 +113,11 @@ impl Handler for TestHandler { async fn attestation_data( &self, _opts: AttestationDataOpts, - ) -> Result, ApiError> { - unimplemented!("attestation_data not stubbed in TestHandler") + ) -> Result { + Ok(self + .attestation_data_response + .clone() + .expect("attestation_data not stubbed in TestHandler")) } async fn submit_attestations( diff --git a/crates/core/src/validatorapi/types.rs b/crates/core/src/validatorapi/types.rs index 7ea63855..ddc93c14 100644 --- a/crates/core/src/validatorapi/types.rs +++ b/crates/core/src/validatorapi/types.rs @@ -16,9 +16,12 @@ pub use pluto_eth2api::{ GetSyncCommitteeDutiesResponseResponseDatum as SyncCommitteeDuty, GetVersionResponseResponse as NodeVersionResponse, GetVersionResponseResponseData as NodeVersionData, - spec::phase0::{Epoch, Root, Slot, ValidatorIndex}, + spec::phase0::{self, Epoch, Root, Slot, ValidatorIndex}, }; +/// Attestation data alias for the consensus-spec phase0 type. +pub type AttestationData = phase0::AttestationData; + /// Index of a beacon committee within a slot. pub type CommitteeIndex = u64; @@ -124,9 +127,12 @@ pub struct SyncCommitteeContributionOpts { pub beacon_block_root: Root, } -/// Attestation data payload. Placeholder. -#[derive(Debug, Clone)] -pub struct AttestationData {} +/// Response envelope for the `attestation_data` endpoint. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AttestationDataResponse { + /// Unsigned attestation data produced by the consensus pipeline. + pub data: AttestationData, +} /// Validator payload. Placeholder. #[derive(Debug, Clone)] From 5da909307f705c0a9c33d59fc149add0bd42fdaf Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <273991985+varex83agent@users.noreply.github.com> Date: Fri, 29 May 2026 12:18:42 +0200 Subject: [PATCH 09/12] fix(core): address PR #451 review feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug fixes (must-fix per review): - attestation_data: wrap MemDB::await_attestation in tokio::time::timeout (24s) so a request for a slot that never produces consensus output cannot hold a handler task indefinitely. delete_duty now records evicted keys per duty type and notifies waiters, so await_data returns Error::AwaitDutyExpired immediately when the awaited duty is gone instead of spinning until the timeout fires. Maps to 408 on the wire. - Stop leaking upstream BlindedBlock400Response Debug output (incl. stacktraces) into the client-visible ApiError.message. The variant payload is now attached as `source` for debug logs; the message stays generic. Hardening: - new_insecure is gated behind #[cfg(test)] so the insecure_test flag cannot reach production builds. - new_router applies DefaultBodyLimit::max(64 KiB) on the two POST /duties/{attester,sync}/{epoch} routes — defends against the Vec parse amplification on the ValIndexes deserializer. - All upstream eth2_cl calls are wrapped in tokio::time::timeout(12s) so a hanging beacon node cannot stall handler tasks. - proposer_duties / attester_duties / sync_committee_duties propagate upstream BadRequest as 400 and ServiceUnavailable as 503 instead of collapsing every non-Ok variant to 502 — the VC can now back off on upstream syncing instead of treating it as a gateway failure. - swap_attester_pubshares / swap_sync_committee_pubshares now return 500 (cluster misconfig) instead of 502 when a pubshare is missing — the upstream returned well-formed data, the failure is local. ValIndexes: - Replace #[serde(untagged)] with a streaming Visitor that validates each element via SeqAccess::next_element. Avoids the speculative Vec parse and the serde Content cache. Now accepts mixed numeric/string elements and rejects negative integers. - Hard cap at 8192 indices per request. ApiError: - with_boxed_source for sources that aren't std::error::Error (e.g. anyhow::Error from auto-gen request builders). Router: - attestation_data uses Result, QueryRejection> so 4xx responses from missing/malformed query params share the same { code, message } envelope as the rest of the router. Tests (+13): - attestation_data: timeout when data never arrives; 408 when duty is evicted while a waiter is parked; cancellation cleanup when the handler future is dropped; negative lookup on wrong committee_index. - Status-mapping helpers: confirm upstream Debug output is never serialized into the message. - Router: ApiError envelope on bad query; oversized body rejection; ValIndexes empty/mixed/oversized/negative cases. Co-Authored-By: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> --- Cargo.lock | 1 + Cargo.toml | 1 + crates/core/Cargo.toml | 1 + crates/core/src/dutydb/memory.rs | 140 +++++- crates/core/src/validatorapi/component.rs | 583 ++++++++++++++++++---- crates/core/src/validatorapi/error.rs | 12 + crates/core/src/validatorapi/router.rs | 153 +++++- crates/core/src/validatorapi/types.rs | 99 +++- 8 files changed, 834 insertions(+), 156 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 009c7203..e5d24398 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5624,6 +5624,7 @@ dependencies = [ "thiserror 2.0.18", "tokio", "tokio-util", + "tower", "tracing", "tree_hash", "vise", diff --git a/Cargo.toml b/Cargo.toml index ac569187..bd7beb79 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -106,6 +106,7 @@ tree_hash_derive = "0.12" tar = "0.4" flate2 = "1.1" wiremock = "0.6" +tower = "0.5" sysinfo = "0.33" quick-xml = { version = "0.39", features = ["serialize"] } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index adf5f88c..f03f60d6 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -55,6 +55,7 @@ pluto-testutil.workspace = true pluto-tracing.workspace = true tokio = { workspace = true, features = ["test-util"] } wiremock.workspace = true +tower = { workspace = true, features = ["util"] } [build-dependencies] pluto-build-proto.workspace = true diff --git a/crates/core/src/dutydb/memory.rs b/crates/core/src/dutydb/memory.rs index 01a68b86..49b107df 100644 --- a/crates/core/src/dutydb/memory.rs +++ b/crates/core/src/dutydb/memory.rs @@ -2,7 +2,7 @@ //! //! Equivalent to charon/core/dutydb/memory.go. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use pluto_eth2api::{ spec::{altair, phase0}, @@ -49,6 +49,12 @@ pub enum Error { #[error("dutydb shutdown: query could not be answered")] Shutdown, + /// The awaited duty was evicted before its unsigned data became + /// available. Distinct from `Shutdown` so callers can map this to a + /// timeout-style error rather than a service-down error. + #[error("dutydb: awaited duty expired before data was stored")] + AwaitDutyExpired, + /// Two validators share the same `(slot, committee_index, valIdx)` with /// different public keys. #[error( @@ -177,6 +183,17 @@ struct ContribKey { root: phase0::Root, } +/// Per-poll outcome handed back by an `await_data` lookup closure. +enum Lookup { + /// The awaited value is now present — return it to the caller. + Found(V), + /// The awaited duty has been evicted; the lookup will never succeed. + /// `await_data` returns [`Error::AwaitDutyExpired`]. + Evicted, + /// Neither stored nor evicted yet — park on the notify and retry. + Pending, +} + struct State { attestation_duties: HashMap, attestation_pub_keys: HashMap, @@ -190,6 +207,18 @@ struct State { contrib_duties: HashMap, contrib_keys_by_slot: HashMap>, + /// Slots whose attester duty has been evicted by the deadliner. Lets + /// `await_attestation` return `AwaitDutyExpired` immediately when the + /// awaited slot is gone, rather than spinning on every `store()` until + /// the request-level timeout fires. + evicted_attestation_slots: HashSet, + /// Slots whose proposer duty has been evicted. + evicted_proposer_slots: HashSet, + /// Aggregation roots whose duty has been evicted. + evicted_aggregation_keys: HashSet, + /// Sync contribution keys whose duty has been evicted. + evicted_contrib_keys: HashSet, + deadliner_rx: tokio::sync::mpsc::Receiver, } @@ -225,6 +254,10 @@ impl MemDB { aggregation_keys_by_slot: HashMap::new(), contrib_duties: HashMap::new(), contrib_keys_by_slot: HashMap::new(), + evicted_attestation_slots: HashSet::new(), + evicted_proposer_slots: HashSet::new(), + evicted_aggregation_keys: HashSet::new(), + evicted_contrib_keys: HashSet::new(), deadliner_rx, }), attestation_notify: Notify::new(), @@ -272,7 +305,6 @@ impl MemDB { Some(UnsignedDutyData::Proposal(p)) => state.store_proposal(p)?, Some(_) => return Err(Error::InvalidVersionedProposal), } - self.proposer_notify.notify_waiters(); } DutyType::Attester => { for (pubkey, data) in &unsigned_set { @@ -282,7 +314,6 @@ impl MemDB { }; state.store_attestation(*pubkey, att)?; } - self.attestation_notify.notify_waiters(); } DutyType::Aggregator => { for data in unsigned_set.values() { @@ -292,7 +323,6 @@ impl MemDB { }; state.store_agg_attestation(agg)?; } - self.aggregation_notify.notify_waiters(); } DutyType::SyncContribution => { for data in unsigned_set.values() { @@ -302,24 +332,54 @@ impl MemDB { }; state.store_sync_contribution(contrib)?; } - self.contrib_notify.notify_waiters(); } _ => return Err(Error::UnsupportedDutyType), } - - // Drain all expired duties that the deadliner has sent. + // Wake the matching notify for the duty we just stored, plus + // anything we drain below. `notify_waiters` is cheap if no one is + // parked and just bumps a counter, so calling it under the write + // lock is harmless — woken tasks block on `state.read()` until we + // drop. + self.wake(duty.duty_type); + + // Drain all expired duties that the deadliner has sent. Waiters + // whose duty just expired need to see `Lookup::Evicted` and exit, + // not re-park — so we wake the matching notify after each eviction. while let Ok(expired) = state.deadliner_rx.try_recv() { + let duty_type = expired.duty_type.clone(); state.delete_duty(expired)?; + self.wake(duty_type); } Ok(()) } + /// Wakes the [`Notify`] paired with `duty_type`. No-op for duty types + /// the DB doesn't track (e.g. `Exit`, `BuilderRegistration`). + fn wake(&self, duty_type: DutyType) { + let notify = match duty_type { + DutyType::Proposer => &self.proposer_notify, + DutyType::Attester => &self.attestation_notify, + DutyType::Aggregator => &self.aggregation_notify, + DutyType::SyncContribution => &self.contrib_notify, + _ => return, + }; + notify.notify_waiters(); + } + /// Blocks until a proposal for the given slot is available, then returns /// it. pub async fn await_proposal(&self, slot: u64) -> Result { - self.await_data(&self.proposer_notify, |s| s.proposer_duties.get(&slot)) - .await + self.await_data(&self.proposer_notify, |s| { + if let Some(v) = s.proposer_duties.get(&slot) { + Lookup::Found(v.clone()) + } else if s.evicted_proposer_slots.contains(&slot) { + Lookup::Evicted + } else { + Lookup::Pending + } + }) + .await } /// Blocks until attestation data for the given slot and committee index is @@ -333,8 +393,16 @@ impl MemDB { slot, committee_index, }; - self.await_data(&self.attestation_notify, |s| s.attestation_duties.get(&key)) - .await + self.await_data(&self.attestation_notify, |s| { + if let Some(v) = s.attestation_duties.get(&key) { + Lookup::Found(v.clone()) + } else if s.evicted_attestation_slots.contains(&key.slot) { + Lookup::Evicted + } else { + Lookup::Pending + } + }) + .await } /// Blocks until an aggregated attestation for the given slot and @@ -347,7 +415,13 @@ impl MemDB { root: attestation_root, }; self.await_data(&self.aggregation_notify, |s| { - s.aggregation_duties.get(&key).map(|a| &a.0) + if let Some(v) = s.aggregation_duties.get(&key) { + Lookup::Found(v.0.clone()) + } else if s.evicted_aggregation_keys.contains(&key) { + Lookup::Evicted + } else { + Lookup::Pending + } }) .await } @@ -365,8 +439,16 @@ impl MemDB { subcommittee_index, root: beacon_block_root, }; - self.await_data(&self.contrib_notify, |s| s.contrib_duties.get(&key)) - .await + self.await_data(&self.contrib_notify, |s| { + if let Some(v) = s.contrib_duties.get(&key) { + Lookup::Found(v.clone()) + } else if s.evicted_contrib_keys.contains(&key) { + Lookup::Evicted + } else { + Lookup::Pending + } + }) + .await } // A single Notify per duty type wakes all waiters on every store, not only @@ -374,22 +456,26 @@ impl MemDB { // is small (one per validator), so the extra wakeups are cheap. A keyed // notify (HashMap) would avoid them but adds complexity that // isn't worth it here. + // + // `delete_duty` also wakes the notify so waiters whose duty just expired + // exit immediately via the `Lookup::Evicted` branch, instead of parking + // for another `notify_waiters` call or for the per-request timeout in + // the caller. async fn await_data( &self, notify: &Notify, - lookup: impl for<'s> Fn(&'s State) -> Option<&'s V>, - ) -> Result - where - V: Clone, - { + lookup: impl Fn(&State) -> Lookup, + ) -> Result { loop { let notified = notify.notified(); tokio::pin!(notified); { let state = self.state.read().await; - if let Some(v) = lookup(&state) { - return Ok(v.clone()); + match lookup(&state) { + Lookup::Found(v) => return Ok(v), + Lookup::Evicted => return Err(Error::AwaitDutyExpired), + Lookup::Pending => {} } } @@ -577,6 +663,7 @@ impl State { match duty.duty_type { DutyType::Proposer => { self.proposer_duties.remove(&slot); + self.evicted_proposer_slots.insert(slot); } DutyType::BuilderProposer => return Err(Error::DeprecatedDutyBuilderProposer), DutyType::Attester => { @@ -589,19 +676,22 @@ impl State { }); } } + self.evicted_attestation_slots.insert(slot); } DutyType::Aggregator => { if let Some(keys) = self.aggregation_keys_by_slot.remove(&slot) { - for key in keys { - self.aggregation_duties.remove(&key); + for key in &keys { + self.aggregation_duties.remove(key); } + self.evicted_aggregation_keys.extend(keys); } } DutyType::SyncContribution => { if let Some(keys) = self.contrib_keys_by_slot.remove(&slot) { - for key in keys { - self.contrib_duties.remove(&key); + for key in &keys { + self.contrib_duties.remove(key); } + self.evicted_contrib_keys.extend(keys); } } _ => return Err(Error::UnknownDutyType), diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs index 2076a521..ac19fbc0 100644 --- a/crates/core/src/validatorapi/component.rs +++ b/crates/core/src/validatorapi/component.rs @@ -4,7 +4,7 @@ //! and public-share mappings needed to translate between distributed-validator //! root keys and this node's threshold-BLS share. -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use async_trait::async_trait; use axum::http::StatusCode; @@ -13,6 +13,7 @@ use pluto_eth2api::{ GetProposerDutiesRequest, GetProposerDutiesResponse, GetSyncCommitteeDutiesRequest, GetSyncCommitteeDutiesResponse, spec::phase0::BLSPubKey, }; +use tokio::time::error::Elapsed; use super::{ error::ApiError, @@ -29,7 +30,20 @@ use super::{ VersionedSignedBlindedProposal, VersionedSignedProposal, }, }; -use crate::{dutydb::MemDB, version}; +use crate::{ + dutydb::{Error as DutyDbError, MemDB}, + version, +}; + +/// Hard deadline for upstream beacon-node calls. Bounds the worst-case +/// handler latency when the upstream hangs or stalls. Roughly one slot. +const UPSTREAM_REQUEST_TIMEOUT: Duration = Duration::from_secs(12); + +/// Hard deadline for the `attestation_data` await on the local DutyDB. +/// Bounded so a request whose slot never produces consensus output cannot +/// hold a handler task indefinitely. Sized at roughly two slots so a real +/// attestation duty has time to flow through the pipeline. +const ATTESTATION_DATA_TIMEOUT: Duration = Duration::from_secs(24); /// Validator API [`Handler`] implementation. /// @@ -83,7 +97,10 @@ impl Component { } /// Builds a component that skips partial-signature verification on - /// submit endpoints. Test use only. + /// submit endpoints. Gated to test builds — `insecure_test: true` must + /// never reach production, since later submit handlers consult this flag + /// to bypass signature checks. + #[cfg(test)] pub fn new_insecure( eth2_cl: Arc, dutydb: Arc, @@ -125,28 +142,38 @@ impl Handler for Component { .epoch(opts.epoch.to_string()) .build() .map_err(|err| { - ApiError::new(StatusCode::BAD_REQUEST, "invalid epoch").with_source( - std::io::Error::new(std::io::ErrorKind::InvalidInput, err.to_string()), - ) + ApiError::new(StatusCode::BAD_REQUEST, "invalid epoch") + .with_boxed_source(err.into()) })?; - let response = self - .eth2_cl - .get_proposer_duties(request) - .await - .map_err(|err| { - ApiError::new(StatusCode::BAD_GATEWAY, "upstream proposer duties failed") - .with_source(std::io::Error::other(err.to_string())) - })?; + let response = tokio::time::timeout( + UPSTREAM_REQUEST_TIMEOUT, + self.eth2_cl.get_proposer_duties(request), + ) + .await + .map_err(|_| upstream_timeout("proposer duties"))? + .map_err(|err| upstream_call_failed("proposer duties", err.into()))?; let mut payload = match response { GetProposerDutiesResponse::Ok(payload) => payload, - other => { - return Err(ApiError::new( - StatusCode::BAD_GATEWAY, - format!("unexpected upstream proposer duties response: {other:?}"), + GetProposerDutiesResponse::BadRequest(body) => { + return Err(upstream_status_error( + StatusCode::BAD_REQUEST, + "proposer duties", + body, + )); + } + GetProposerDutiesResponse::ServiceUnavailable(body) => { + return Err(upstream_status_error( + StatusCode::SERVICE_UNAVAILABLE, + "proposer duties", + body, )); } + other @ (GetProposerDutiesResponse::InternalServerError(_) + | GetProposerDutiesResponse::Unknown) => { + return Err(upstream_unexpected("proposer duties", other)); + } }; swap_proposer_pubshares(&mut payload.data, &self.pub_share_by_pubkey)?; @@ -164,29 +191,37 @@ impl Handler for Component { .build() .map_err(|err| { ApiError::new(StatusCode::BAD_REQUEST, "invalid attester duties request") - .with_source(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - err.to_string(), - )) + .with_boxed_source(err.into()) })?; - let response = self - .eth2_cl - .get_attester_duties(request) - .await - .map_err(|err| { - ApiError::new(StatusCode::BAD_GATEWAY, "upstream attester duties failed") - .with_source(std::io::Error::other(err.to_string())) - })?; + let response = tokio::time::timeout( + UPSTREAM_REQUEST_TIMEOUT, + self.eth2_cl.get_attester_duties(request), + ) + .await + .map_err(|_| upstream_timeout("attester duties"))? + .map_err(|err| upstream_call_failed("attester duties", err.into()))?; let mut payload = match response { GetAttesterDutiesResponse::Ok(payload) => payload, - other => { - return Err(ApiError::new( - StatusCode::BAD_GATEWAY, - format!("unexpected upstream attester duties response: {other:?}"), + GetAttesterDutiesResponse::BadRequest(body) => { + return Err(upstream_status_error( + StatusCode::BAD_REQUEST, + "attester duties", + body, + )); + } + GetAttesterDutiesResponse::ServiceUnavailable(body) => { + return Err(upstream_status_error( + StatusCode::SERVICE_UNAVAILABLE, + "attester duties", + body, )); } + other @ (GetAttesterDutiesResponse::InternalServerError(_) + | GetAttesterDutiesResponse::Unknown) => { + return Err(upstream_unexpected("attester duties", other)); + } }; swap_attester_pubshares(&mut payload.data, &self.pub_share_by_pubkey)?; @@ -207,32 +242,37 @@ impl Handler for Component { StatusCode::BAD_REQUEST, "invalid sync committee duties request", ) - .with_source(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - err.to_string(), - )) + .with_boxed_source(err.into()) })?; - let response = self - .eth2_cl - .get_sync_committee_duties(request) - .await - .map_err(|err| { - ApiError::new( - StatusCode::BAD_GATEWAY, - "upstream sync committee duties failed", - ) - .with_source(std::io::Error::other(err.to_string())) - })?; + let response = tokio::time::timeout( + UPSTREAM_REQUEST_TIMEOUT, + self.eth2_cl.get_sync_committee_duties(request), + ) + .await + .map_err(|_| upstream_timeout("sync committee duties"))? + .map_err(|err| upstream_call_failed("sync committee duties", err.into()))?; let mut payload = match response { GetSyncCommitteeDutiesResponse::Ok(payload) => payload, - other => { - return Err(ApiError::new( - StatusCode::BAD_GATEWAY, - format!("unexpected upstream sync committee duties response: {other:?}"), + GetSyncCommitteeDutiesResponse::BadRequest(body) => { + return Err(upstream_status_error( + StatusCode::BAD_REQUEST, + "sync committee duties", + body, )); } + GetSyncCommitteeDutiesResponse::ServiceUnavailable(body) => { + return Err(upstream_status_error( + StatusCode::SERVICE_UNAVAILABLE, + "sync committee duties", + body, + )); + } + other @ (GetSyncCommitteeDutiesResponse::InternalServerError(_) + | GetSyncCommitteeDutiesResponse::Unknown) => { + return Err(upstream_unexpected("sync committee duties", other)); + } }; swap_sync_committee_pubshares(&mut payload.data, &self.pub_share_by_pubkey)?; @@ -244,17 +284,19 @@ impl Handler for Component { &self, opts: AttestationDataOpts, ) -> Result { - let data = self - .dutydb - .await_attestation(opts.slot, opts.committee_index) - .await - .map_err(|err| { - ApiError::new( - StatusCode::INTERNAL_SERVER_ERROR, - "await attestation failed", - ) - .with_source(std::io::Error::other(err.to_string())) - })?; + let data = tokio::time::timeout( + ATTESTATION_DATA_TIMEOUT, + self.dutydb + .await_attestation(opts.slot, opts.committee_index), + ) + .await + .map_err(|_: Elapsed| { + ApiError::new( + StatusCode::REQUEST_TIMEOUT, + "attestation data not available before deadline", + ) + })? + .map_err(map_dutydb_error)?; Ok(AttestationDataResponse { data }) } @@ -352,6 +394,81 @@ impl Handler for Component { } } +/// Builds the `ApiError` returned when an upstream beacon-node call elapses +/// past [`UPSTREAM_REQUEST_TIMEOUT`]. +fn upstream_timeout(endpoint: &'static str) -> ApiError { + ApiError::new( + StatusCode::GATEWAY_TIMEOUT, + format!("upstream {endpoint} timed out"), + ) +} + +/// Builds the `ApiError` returned when an upstream beacon-node call returns a +/// transport-level error. Boxed so `anyhow::Error` (which doesn't itself +/// implement `std::error::Error`) can be attached via `.into()`. +fn upstream_call_failed( + endpoint: &'static str, + err: Box, +) -> ApiError { + ApiError::new( + StatusCode::BAD_GATEWAY, + format!("upstream {endpoint} failed"), + ) + .with_boxed_source(err) +} + +/// Builds the `ApiError` returned when the upstream responds with a faithful +/// HTTP status that we propagate (e.g. 400, 503). The upstream body is +/// attached as a `source` for debug logging — never serialized into the +/// client-visible message. +fn upstream_status_error( + status: StatusCode, + endpoint: &'static str, + body: B, +) -> ApiError { + ApiError::new( + status, + format!("upstream {endpoint} returned {}", status.as_u16()), + ) + .with_source(std::io::Error::other(format!( + "upstream {endpoint} body: {body:?}" + ))) +} + +/// Builds the `ApiError` returned when the upstream responds with an +/// unexpected variant (e.g. `Unknown`, or `InternalServerError`). The variant +/// is attached as a `source` so the debug log retains it but the client +/// message stays generic. +fn upstream_unexpected(endpoint: &'static str, response: R) -> ApiError { + ApiError::new( + StatusCode::BAD_GATEWAY, + format!("unexpected upstream {endpoint} response"), + ) + .with_source(std::io::Error::other(format!( + "upstream {endpoint} variant: {response:?}" + ))) +} + +/// Maps a [`crate::dutydb::Error`] into the `ApiError` returned to the client +/// when an `attestation_data` await fails. `Shutdown` propagates as 503 so the +/// VC can retry; `AwaitDutyExpired` propagates as 408 — same as a timeout — +/// since the duty is gone and the data will never arrive. Anything else is a +/// programming error here and becomes 500. +fn map_dutydb_error(err: DutyDbError) -> ApiError { + let (status, message) = match err { + DutyDbError::Shutdown => (StatusCode::SERVICE_UNAVAILABLE, "dutydb is shutting down"), + DutyDbError::AwaitDutyExpired => ( + StatusCode::REQUEST_TIMEOUT, + "attestation duty expired before data was stored", + ), + _ => ( + StatusCode::INTERNAL_SERVER_ERROR, + "await attestation failed", + ), + }; + ApiError::new(status, message).with_source(err) +} + /// Rewrites each duty's root public key to this node's public share. Duties /// whose pubkey is not in `pub_share_by_pubkey` are passed through unchanged /// (the upstream returns all proposers for the epoch, not just ours). @@ -378,8 +495,11 @@ fn swap_attester_pubshares( for duty in duties { let pubkey = parse_bls_pubkey(&duty.pubkey)?; let share = pub_share_by_pubkey.get(&pubkey).ok_or_else(|| { + // Cluster/lock-file misconfiguration — the upstream returned a + // well-formed duty, but this node has no share for that validator. + // 500 (not 502): the failure is local, not gateway-level. ApiError::new( - StatusCode::BAD_GATEWAY, + StatusCode::INTERNAL_SERVER_ERROR, "pubshare not found for attester duty", ) })?; @@ -396,8 +516,9 @@ fn swap_sync_committee_pubshares( for duty in duties { let pubkey = parse_bls_pubkey(&duty.pubkey)?; let share = pub_share_by_pubkey.get(&pubkey).ok_or_else(|| { + // See `swap_attester_pubshares` — same 500-not-502 reasoning. ApiError::new( - StatusCode::BAD_GATEWAY, + StatusCode::INTERNAL_SERVER_ERROR, "pubshare not found for sync committee duty", ) })?; @@ -428,7 +549,48 @@ fn format_bls_pubkey(pubkey: &BLSPubKey) -> String { #[cfg(test)] mod tests { + use chrono::{DateTime, Utc}; + use tokio::sync::mpsc; + use tokio_util::sync::CancellationToken; + use super::*; + use crate::{ + deadline::{DeadlineCalculator, DeadlinerTask, Result as DeadlineResult}, + dutydb::{UnsignedDataSet, UnsignedDutyData}, + signeddata::{ + AttestationData as SignedAttestationData, AttesterDuty as SignedAttesterDuty, + }, + testutils::random_core_pub_key, + types::{Duty, DutyType, SlotNumber}, + validatorapi::types::AttestationDataOpts, + }; + + /// Schedules every duty with a deadline at `MAX_UTC`, so duties are + /// `Scheduled` but never naturally expire. + struct FarFutureCalculator; + + impl DeadlineCalculator for FarFutureCalculator { + fn deadline(&self, _: &Duty) -> DeadlineResult>> { + Ok(Some(DateTime::::MAX_UTC)) + } + } + + /// Build a Component backed by a real (but never-expiring) DutyDB plus a + /// dummy upstream client. Useful for tests that only exercise endpoints + /// served from the DB. + fn make_test_component() -> (Component, Arc) { + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = + DeadlinerTask::start(cancel.clone(), "validatorapi-tests", FarFutureCalculator); + // Held to keep the eviction channel's sender alive so the dutydb's + // `evict_rx` doesn't observe a closed channel. + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = + Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); + let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1); + (component, dutydb) + } #[test] fn swap_replaces_known_pubkeys_and_keeps_unknown() { @@ -488,7 +650,7 @@ mod tests { validator_index: "6".to_owned(), }]; let err = swap_attester_pubshares(&mut stranger_duties, &map).unwrap_err(); - assert_eq!(err.status_code, StatusCode::BAD_GATEWAY); + assert_eq!(err.status_code, StatusCode::INTERNAL_SERVER_ERROR); } #[test] @@ -513,7 +675,7 @@ mod tests { validator_sync_committee_indices: vec![], }]; let err = swap_sync_committee_pubshares(&mut stranger, &map).unwrap_err(); - assert_eq!(err.status_code, StatusCode::BAD_GATEWAY); + assert_eq!(err.status_code, StatusCode::INTERNAL_SERVER_ERROR); } #[test] @@ -527,46 +689,6 @@ mod tests { assert_eq!(err.status_code, StatusCode::BAD_GATEWAY); } - use chrono::{DateTime, Utc}; - use tokio::sync::mpsc; - use tokio_util::sync::CancellationToken; - - use crate::{ - deadline::{DeadlineCalculator, DeadlinerTask, Result as DeadlineResult}, - dutydb::{UnsignedDataSet, UnsignedDutyData}, - signeddata::{ - AttestationData as SignedAttestationData, AttesterDuty as SignedAttesterDuty, - }, - testutils::random_core_pub_key, - types::{Duty, DutyType, SlotNumber}, - validatorapi::types::AttestationDataOpts, - }; - - /// Schedules every duty with a deadline at `MAX_UTC`, so duties are - /// `Scheduled` but never naturally expire. - struct FarFutureCalculator; - - impl DeadlineCalculator for FarFutureCalculator { - fn deadline(&self, _: &Duty) -> DeadlineResult>> { - Ok(Some(DateTime::::MAX_UTC)) - } - } - - /// Build a Component backed by a real (but never-expiring) DutyDB plus a - /// dummy upstream client. Useful for tests that only exercise endpoints - /// served from the DB. - fn make_test_component() -> (Component, Arc) { - let cancel = CancellationToken::new(); - let (deadliner, _deadliner_rx) = - DeadlinerTask::start(cancel.clone(), "validatorapi-tests", FarFutureCalculator); - let (_unused_tx, evict_rx) = mpsc::channel(1); - let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); - let eth2_cl = - Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); - let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1); - (component, dutydb) - } - #[tokio::test] async fn node_version_formats_pluto_string() { let (component, _db) = make_test_component(); @@ -623,4 +745,249 @@ mod tests { assert_eq!(response.data.index, COMM_IDX); assert_eq!(response.data.beacon_block_root, [0x11; 32]); } + + /// Storing `(SLOT, COMM_IDX)` must NOT satisfy an `attestation_data` + /// request for `(SLOT, COMM_IDX + 1)`. Verifies the dutydb is keyed on + /// the full `(slot, committee_index)` tuple, not just the slot. + #[tokio::test(start_paused = true)] + async fn attestation_data_does_not_resolve_for_wrong_committee_index() { + const SLOT: u64 = 200; + const COMM_IDX: u64 = 7; + + let (component, db) = make_test_component(); + + let unsigned = SignedAttestationData { + data: pluto_eth2api::spec::phase0::AttestationData { + slot: SLOT, + index: COMM_IDX, + beacon_block_root: [0x22; 32], + source: pluto_eth2api::spec::phase0::Checkpoint::default(), + target: pluto_eth2api::spec::phase0::Checkpoint::default(), + }, + duty: SignedAttesterDuty { + slot: SLOT, + validator_index: 9, + committee_index: COMM_IDX, + committee_length: 8, + committees_at_slot: 1, + validator_committee_index: 0, + }, + }; + let mut set = UnsignedDataSet::new(); + set.insert( + random_core_pub_key(), + UnsignedDutyData::Attestation(unsigned), + ); + db.store(Duty::new(SlotNumber::new(SLOT), DutyType::Attester), set) + .await + .unwrap(); + + // Auto-advance past the handler timeout so the await trips on the + // wrong committee_index, not on the existing one. + let err = component + .attestation_data(AttestationDataOpts { + slot: SLOT, + committee_index: COMM_IDX + 1, + }) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::REQUEST_TIMEOUT); + } + + /// Verifies the handler enforces `ATTESTATION_DATA_TIMEOUT` — an + /// `await_attestation` for a slot that is never stored returns 408 + /// instead of hanging. + #[tokio::test(start_paused = true)] + async fn attestation_data_times_out_when_data_never_arrives() { + let (component, _db) = make_test_component(); + + let err = component + .attestation_data(AttestationDataOpts { + slot: 999, + committee_index: 0, + }) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::REQUEST_TIMEOUT); + } + + /// Verifies that when the dutydb evicts the awaited duty (via the + /// deadliner), the in-flight handler exits promptly with + /// `REQUEST_TIMEOUT` instead of parking on the notify forever. + #[tokio::test] + async fn attestation_data_returns_408_when_duty_is_evicted() { + use tokio::sync::mpsc::channel; + + const SLOT: u64 = 333; + const COMM_IDX: u64 = 1; + + // Hand-build a Component whose dutydb shares its eviction channel + // with the test, so we can drive eviction deterministically. + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = + DeadlinerTask::start(cancel.clone(), "validatorapi-tests", FarFutureCalculator); + let (trim_tx, trim_rx) = channel::(8); + let dutydb = Arc::new(MemDB::new(deadliner, trim_rx, &cancel)); + let eth2_cl = + Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); + let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1); + + // Start an await before any data is stored. + let waiter = { + let component = Arc::new(component); + let c = Arc::clone(&component); + tokio::spawn(async move { + c.attestation_data(AttestationDataOpts { + slot: SLOT, + committee_index: COMM_IDX, + }) + .await + }) + }; + + // Yield so the waiter parks. + tokio::task::yield_now().await; + + // Simulate the deadliner emitting an eviction for this slot… + trim_tx + .send(Duty::new(SlotNumber::new(SLOT), DutyType::Attester)) + .await + .unwrap(); + + // …then trigger eviction processing by storing an unrelated duty. + let unsigned = SignedAttestationData { + data: pluto_eth2api::spec::phase0::AttestationData { + slot: SLOT.saturating_add(1), + index: 0, + beacon_block_root: [0x33; 32], + source: pluto_eth2api::spec::phase0::Checkpoint::default(), + target: pluto_eth2api::spec::phase0::Checkpoint::default(), + }, + duty: SignedAttesterDuty { + slot: SLOT.saturating_add(1), + validator_index: 0, + committee_index: 0, + committee_length: 8, + committees_at_slot: 1, + validator_committee_index: 0, + }, + }; + let mut set = UnsignedDataSet::new(); + set.insert( + random_core_pub_key(), + UnsignedDutyData::Attestation(unsigned), + ); + dutydb + .store( + Duty::new(SlotNumber::new(SLOT.saturating_add(1)), DutyType::Attester), + set, + ) + .await + .unwrap(); + + let err = waiter.await.unwrap().unwrap_err(); + assert_eq!(err.status_code, StatusCode::REQUEST_TIMEOUT); + } + + /// Verifies that dropping the handler future releases the dutydb + /// waiter — the next store() should not see a hanging reader on the + /// state lock. + #[tokio::test] + async fn attestation_data_drops_waiter_when_future_dropped() { + let (component, db) = make_test_component(); + let component = Arc::new(component); + + let waiter = { + let component = Arc::clone(&component); + tokio::spawn(async move { + component + .attestation_data(AttestationDataOpts { + slot: 4242, + committee_index: 0, + }) + .await + }) + }; + + tokio::task::yield_now().await; + waiter.abort(); + let _ = waiter.await; + + // Confirm db is still usable — store should not deadlock. + let unsigned = SignedAttestationData { + data: pluto_eth2api::spec::phase0::AttestationData { + slot: 1, + index: 0, + beacon_block_root: [0x44; 32], + source: pluto_eth2api::spec::phase0::Checkpoint::default(), + target: pluto_eth2api::spec::phase0::Checkpoint::default(), + }, + duty: SignedAttesterDuty { + slot: 1, + validator_index: 0, + committee_index: 0, + committee_length: 8, + committees_at_slot: 1, + validator_committee_index: 0, + }, + }; + let mut set = UnsignedDataSet::new(); + set.insert( + random_core_pub_key(), + UnsignedDutyData::Attestation(unsigned), + ); + db.store(Duty::new(SlotNumber::new(1), DutyType::Attester), set) + .await + .unwrap(); + } + + /// `map_dutydb_error` covers the three distinguishable variants from + /// `crate::dutydb::Error`. + #[test] + fn map_dutydb_error_status_codes() { + assert_eq!( + map_dutydb_error(DutyDbError::Shutdown).status_code, + StatusCode::SERVICE_UNAVAILABLE + ); + assert_eq!( + map_dutydb_error(DutyDbError::AwaitDutyExpired).status_code, + StatusCode::REQUEST_TIMEOUT + ); + assert_eq!( + map_dutydb_error(DutyDbError::UnsupportedDutyType).status_code, + StatusCode::INTERNAL_SERVER_ERROR + ); + } + + /// `upstream_status_error` keeps the upstream response body out of the + /// client-visible message but preserves it on `source()` so it lands in + /// the debug log. + #[test] + fn upstream_status_error_does_not_leak_body_into_message() { + use pluto_eth2api::BlindedBlock400Response; + + let body = BlindedBlock400Response { + code: 503.0, + message: "secret upstream stacktrace path=/etc/secret".to_owned(), + stacktraces: Some(vec!["at /etc/secret/lighthouse:42".to_owned()]), + }; + let err = upstream_status_error(StatusCode::SERVICE_UNAVAILABLE, "attester duties", body); + + assert_eq!(err.status_code, StatusCode::SERVICE_UNAVAILABLE); + assert!(!err.message.contains("secret")); + assert!(!err.message.contains("stacktrace")); + // But the source carries it for debug logging. + let src = err.source.as_ref().unwrap().to_string(); + assert!(src.contains("secret")); + } + + /// `upstream_unexpected` mirrors `upstream_status_error`'s no-leak shape + /// for the `Unknown` / `InternalServerError` arms. + #[test] + fn upstream_unexpected_does_not_leak_variant_into_message() { + let err = upstream_unexpected("attester duties", GetAttesterDutiesResponse::Unknown); + assert_eq!(err.status_code, StatusCode::BAD_GATEWAY); + assert!(!err.message.contains("Unknown")); + assert!(err.source.as_ref().unwrap().to_string().contains("Unknown")); + } } diff --git a/crates/core/src/validatorapi/error.rs b/crates/core/src/validatorapi/error.rs index 12064c51..e13c440d 100644 --- a/crates/core/src/validatorapi/error.rs +++ b/crates/core/src/validatorapi/error.rs @@ -47,6 +47,18 @@ impl ApiError { self.source = Some(Box::new(source)); self } + + /// Attaches a boxed source error for debug logging. Use this when the + /// upstream error is not `std::error::Error` itself (e.g. `anyhow::Error`, + /// which only implements `AsRef` and converts via `.into()`). + #[must_use] + pub fn with_boxed_source( + mut self, + source: Box, + ) -> Self { + self.source = Some(source); + self + } } impl fmt::Display for ApiError { diff --git a/crates/core/src/validatorapi/router.rs b/crates/core/src/validatorapi/router.rs index 547a372d..c71031ec 100644 --- a/crates/core/src/validatorapi/router.rs +++ b/crates/core/src/validatorapi/router.rs @@ -7,12 +7,19 @@ use std::sync::Arc; use axum::{ Json, Router, - extract::{Path, Query, State}, + extract::{DefaultBodyLimit, Path, Query, State, rejection::QueryRejection}, + http::StatusCode, response::IntoResponse, - routing::{get, post}, + routing::{MethodRouter, get, post}, }; use serde::Deserialize; +/// Cap on the `POST /eth/v1/validator/duties/{attester,sync}/{epoch}` request +/// bodies. A realistic cluster ships at most a few thousand validator indices; +/// 64 KiB still allows ~10k indices in either numeric or string encoding, +/// well above any plausible workload. +const DUTIES_BODY_LIMIT: usize = 64 * 1024; + use super::{ error::ApiError, handler::Handler, @@ -55,7 +62,7 @@ pub fn new_router(handler: Arc, builder_enabled: bool) -> Router { Router::new() .route( "/eth/v1/validator/duties/attester/{epoch}", - post(attester_duties), + duties_post(attester_duties), ) .route( "/eth/v1/validator/duties/proposer/{epoch}", @@ -63,7 +70,7 @@ pub fn new_router(handler: Arc, builder_enabled: bool) -> Router { ) .route( "/eth/v1/validator/duties/sync/{epoch}", - post(sync_committee_duties), + duties_post(sync_committee_duties), ) .route("/eth/v1/validator/attestation_data", get(attestation_data)) .route("/eth/v1/beacon/pool/attestations", post(respond_404)) @@ -178,8 +185,9 @@ async fn sync_committee_duties( async fn attestation_data( State(state): State>, - Query(query): Query, + query: Result, QueryRejection>, ) -> Result, ApiError> { + let Query(query) = query.map_err(query_rejection_to_api_error)?; let response = state .handler .attestation_data(AttestationDataOpts { @@ -191,6 +199,26 @@ async fn attestation_data( Ok(Json(response)) } +/// Wraps a `POST /eth/v1/validator/duties/*` handler with a body-size cap. +/// The cap is local to these two routes so unrelated POST handlers (e.g. +/// `submit_attestations`) keep axum's default 2 MiB. +fn duties_post(handler: H) -> MethodRouter +where + H: axum::handler::Handler, + T: 'static, + S: Clone + Send + Sync + 'static, +{ + post(handler).route_layer(DefaultBodyLimit::max(DUTIES_BODY_LIMIT)) +} + +/// Renders an axum query-extractor rejection as Pluto's standard +/// [`ApiError`] body shape, so all 4xx responses from this router share the +/// same `{ "code", "message" }` schema. +fn query_rejection_to_api_error(rejection: QueryRejection) -> ApiError { + ApiError::new(StatusCode::BAD_REQUEST, "invalid query parameters") + .with_source(std::io::Error::other(rejection.body_text())) +} + async fn submit_attestations() { todo!("vapi: submit_attestations"); } @@ -388,10 +416,10 @@ mod tests { let Json(body) = attestation_data( State(state), - Query(AttestationDataQuery { + Ok(Query(AttestationDataQuery { slot: 99, committee_index: 3, - }), + })), ) .await .unwrap(); @@ -440,4 +468,115 @@ mod tests { assert_eq!(json["data"][0]["validator_index"], "7"); assert_eq!(json["data"][0]["pubkey"], "0xaabbccddeeff"); } + + /// Verifies the manual `Query` rejection path emits the same + /// `{ code, message }` envelope as the rest of the router, instead of + /// axum's default plain-text 400. + #[tokio::test] + async fn attestation_data_returns_api_error_shape_on_bad_query() { + use axum::{ + body::{Body, to_bytes}, + http::Request, + }; + use tower::ServiceExt; + + let handler = TestHandler::default().with_attestation_data(AttestationDataResponse { + data: phase0::AttestationData { + slot: 0, + index: 0, + beacon_block_root: [0; 32], + source: phase0::Checkpoint::default(), + target: phase0::Checkpoint::default(), + }, + }); + let app = new_router(Arc::new(handler), false); + + // Missing `committee_index`. + let req = Request::builder() + .uri("/eth/v1/validator/attestation_data?slot=10") + .body(Body::empty()) + .unwrap(); + let resp = app.clone().oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + let body = to_bytes(resp.into_body(), 64 * 1024).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["code"], 400); + assert!(json["message"].is_string()); + + // Non-numeric `slot`. + let req = Request::builder() + .uri("/eth/v1/validator/attestation_data?slot=foo&committee_index=1") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + let body = to_bytes(resp.into_body(), 64 * 1024).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["code"], 400); + } + + /// Verifies the body-limit layer on `POST /eth/v1/validator/duties/*` + /// rejects oversized bodies — defense against the `Vec` parse + /// amplification on the duties endpoints. + #[tokio::test] + async fn attester_duties_rejects_oversized_body() { + use axum::{ + body::Body, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let handler = TestHandler::default(); + let app = new_router(Arc::new(handler), false); + + // 128 KiB of zeros — well past the 64 KiB cap, valid JSON or not. + let big = vec![b'0'; 128 * 1024]; + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/duties/attester/42") + .header("content-type", "application/json") + .header("content-length", big.len()) + .body(Body::from(big)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::PAYLOAD_TOO_LARGE); + } + + /// `[]` is a valid request body — the upstream returns an empty duty + /// list — and `ValIndexes` should accept it. + #[test] + fn val_indexes_accepts_empty_array() { + let v: ValIndexes = serde_json::from_str("[]").unwrap(); + assert!(v.0.is_empty()); + } + + /// Mixed numeric + string elements are accepted; each element is + /// validated independently. The previous untagged-enum implementation + /// rejected this entirely. + #[test] + fn val_indexes_accepts_mixed_elements() { + let v: ValIndexes = serde_json::from_str(r#"[1, "2", 3, "4"]"#).unwrap(); + assert_eq!(v.0, vec!["1", "2", "3", "4"]); + } + + /// Caps the request to `VAL_INDEXES_MAX_LEN` elements. + #[test] + fn val_indexes_rejects_oversized_array() { + use crate::validatorapi::types::VAL_INDEXES_MAX_LEN; + + let too_many = (0..=VAL_INDEXES_MAX_LEN) + .map(|n| n.to_string()) + .collect::>() + .join(","); + let json = format!("[{too_many}]"); + let err = serde_json::from_str::(&json).unwrap_err(); + assert!(err.to_string().contains("too many validator indices")); + } + + /// Negative integers are rejected (validator indices are u64). + #[test] + fn val_indexes_rejects_negative_numbers() { + let bad = serde_json::from_str::("[-1]"); + assert!(bad.is_err()); + } } diff --git a/crates/core/src/validatorapi/types.rs b/crates/core/src/validatorapi/types.rs index ddc93c14..8e18456a 100644 --- a/crates/core/src/validatorapi/types.rs +++ b/crates/core/src/validatorapi/types.rs @@ -4,7 +4,12 @@ //! Most data payloads are empty placeholders for now and will be swapped //! for the proper consensus-spec types in a later phase. -use serde::{Deserialize, Deserializer, Serialize}; +use std::fmt; + +use serde::{ + Deserialize, Deserializer, Serialize, + de::{self, SeqAccess, Visitor}, +}; pub use pluto_crypto::types::{PublicKey as BlsPubKey, Signature as BlsSignature}; pub use pluto_eth2api::{ @@ -195,28 +200,90 @@ pub struct SyncCommitteeSelection {} #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize)] pub struct ValIndexes(pub Vec); +/// Hard cap on the number of validator indices accepted per request. A real +/// cluster has at most a few hundred validators; the cap is set generously +/// above that to leave room for future growth while still bounding the work +/// per request so a single misbehaving caller cannot drive unbounded +/// allocation. Pairs with the route-level [`DUTIES_BODY_LIMIT`] +/// (`router.rs`) which limits the *bytes* the deserializer ever sees; +/// this limits the *count* even within those bytes. +pub const VAL_INDEXES_MAX_LEN: usize = 8192; + impl<'de> Deserialize<'de> for ValIndexes { fn deserialize(deserializer: D) -> Result where D: Deserializer<'de>, { - #[derive(Deserialize)] - #[serde(untagged)] - enum Either { - Numbers(Vec), - Strings(Vec), - } + // Custom visitor: streams elements via `SeqAccess::next_element`, + // validates each on read, and aborts as soon as the cap is exceeded. + // Avoids the `#[serde(untagged)]` two-pass behavior (which buffers the + // input via serde's `Content` cache before retrying) and the + // single-allocation `Vec` materialization. + struct ValIndexesVisitor; + + impl<'de> Visitor<'de> for ValIndexesVisitor { + type Value = ValIndexes; + + fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("an array of validator indices (numeric or decimal string)") + } - let value = Either::deserialize(deserializer)?; - let indices = match value { - Either::Numbers(ns) => ns.into_iter().map(|n| n.to_string()).collect(), - Either::Strings(strs) => { - for s in &strs { - s.parse::().map_err(serde::de::Error::custom)?; + fn visit_seq(self, mut seq: A) -> Result + where + A: SeqAccess<'de>, + { + let mut out = Vec::with_capacity(seq.size_hint().unwrap_or(0).min(64)); + while let Some(elem) = seq.next_element::()? { + if out.len() >= VAL_INDEXES_MAX_LEN { + return Err(de::Error::custom(format!( + "too many validator indices (max {VAL_INDEXES_MAX_LEN})" + ))); + } + out.push(elem.0); } - strs + Ok(ValIndexes(out)) + } + } + + deserializer.deserialize_seq(ValIndexesVisitor) + } +} + +/// One validator-index element. Accepts either a JSON number (formatted into +/// a decimal string) or a JSON string (validated as a `u64` then kept +/// verbatim). Single-pass; no untagged-enum buffering. +struct Element(String); + +impl<'de> Deserialize<'de> for Element { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct ElemVisitor; + + impl Visitor<'_> for ElemVisitor { + type Value = Element; + + fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("a validator index (u64 or decimal string)") + } + + fn visit_u64(self, v: u64) -> Result { + Ok(Element(v.to_string())) + } + + fn visit_i64(self, v: i64) -> Result { + u64::try_from(v) + .map(|n| Element(n.to_string())) + .map_err(|_| de::Error::custom("validator index must be non-negative")) } - }; - Ok(Self(indices)) + + fn visit_str(self, v: &str) -> Result { + v.parse::().map_err(de::Error::custom)?; + Ok(Element(v.to_owned())) + } + } + + deserializer.deserialize_any(ElemVisitor) } } From 6a8a94c8e5c6ced2a6911aecf251f18df3d380e5 Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <273991985+varex83agent@users.noreply.github.com> Date: Mon, 1 Jun 2026 17:17:50 +0200 Subject: [PATCH 10/12] fix(core): bound dutydb eviction state with high-water marks The evicted_* HashSets in MemDB were insert-only: delete_duty added an entry per evicted slot/aggregation/contrib key and nothing ever removed them, so on a long-running node they grew without bound (one entry per 12s slot, forever). Charon's Go dutydb keeps no eviction record at all and relies on the per-request context timeout. Because the deadliner expires duties in non-decreasing slot order and store() refuses already-expired duties, the highest evicted slot is sufficient: any awaited slot <= the mark that is not currently stored will never be stored. Replace the three slot-keyed sets with O(1) Option high-water marks (attester/proposer/sync-contribution), preserving the immediate-AwaitDutyExpired fast path. Aggregated attestations are awaited by root only (no slot at await time), so they cannot consult a slot mark; drop their eviction tracking and let an evicted root rely on the request timeout, matching Go. Add a regression test asserting an evicted slot and any older slot fail fast with AwaitDutyExpired without parking. Co-Authored-By: varex83 --- crates/core/src/dutydb/memory.rs | 141 ++++++++++++++++++++++++------- 1 file changed, 111 insertions(+), 30 deletions(-) diff --git a/crates/core/src/dutydb/memory.rs b/crates/core/src/dutydb/memory.rs index 49b107df..abcc6c16 100644 --- a/crates/core/src/dutydb/memory.rs +++ b/crates/core/src/dutydb/memory.rs @@ -2,7 +2,7 @@ //! //! Equivalent to charon/core/dutydb/memory.go. -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use pluto_eth2api::{ spec::{altair, phase0}, @@ -207,18 +207,27 @@ struct State { contrib_duties: HashMap, contrib_keys_by_slot: HashMap>, - /// Slots whose attester duty has been evicted by the deadliner. Lets - /// `await_attestation` return `AwaitDutyExpired` immediately when the - /// awaited slot is gone, rather than spinning on every `store()` until - /// the request-level timeout fires. - evicted_attestation_slots: HashSet, - /// Slots whose proposer duty has been evicted. - evicted_proposer_slots: HashSet, - /// Aggregation roots whose duty has been evicted. - evicted_aggregation_keys: HashSet, - /// Sync contribution keys whose duty has been evicted. - evicted_contrib_keys: HashSet, - + /// Highest slot whose attester duty has been evicted by the deadliner. + /// Because the deadliner expires duties in non-decreasing slot order and + /// `store()` refuses already-expired duties (`AddOutcome::AlreadyExpired`), + /// any awaited slot `<=` this mark that is not currently stored will never + /// be stored. Tracking only the high-water mark — rather than the set of + /// every evicted slot, which would grow without bound for the lifetime of + /// the node — lets `await_attestation` return `AwaitDutyExpired` + /// immediately for a gone duty while keeping the bookkeeping O(1) in + /// memory. + max_evicted_attestation_slot: Option, + /// Highest slot whose proposer duty has been evicted. See + /// [`max_evicted_attestation_slot`](Self::max_evicted_attestation_slot). + max_evicted_proposer_slot: Option, + /// Highest slot whose sync-contribution duty has been evicted. See + /// [`max_evicted_attestation_slot`](Self::max_evicted_attestation_slot). + max_evicted_contrib_slot: Option, + // NB: there is no eviction mark for aggregated attestations. They are + // awaited by root only (`await_agg_attestation` has no slot), so there is + // no slot to compare against a high-water mark; an evicted root relies on + // the caller's request timeout instead, matching Charon's Go dutydb which + // keeps no eviction record at all. deadliner_rx: tokio::sync::mpsc::Receiver, } @@ -254,10 +263,9 @@ impl MemDB { aggregation_keys_by_slot: HashMap::new(), contrib_duties: HashMap::new(), contrib_keys_by_slot: HashMap::new(), - evicted_attestation_slots: HashSet::new(), - evicted_proposer_slots: HashSet::new(), - evicted_aggregation_keys: HashSet::new(), - evicted_contrib_keys: HashSet::new(), + max_evicted_attestation_slot: None, + max_evicted_proposer_slot: None, + max_evicted_contrib_slot: None, deadliner_rx, }), attestation_notify: Notify::new(), @@ -373,7 +381,7 @@ impl MemDB { self.await_data(&self.proposer_notify, |s| { if let Some(v) = s.proposer_duties.get(&slot) { Lookup::Found(v.clone()) - } else if s.evicted_proposer_slots.contains(&slot) { + } else if s.max_evicted_proposer_slot.is_some_and(|hw| slot <= hw) { Lookup::Evicted } else { Lookup::Pending @@ -396,7 +404,10 @@ impl MemDB { self.await_data(&self.attestation_notify, |s| { if let Some(v) = s.attestation_duties.get(&key) { Lookup::Found(v.clone()) - } else if s.evicted_attestation_slots.contains(&key.slot) { + } else if s + .max_evicted_attestation_slot + .is_some_and(|hw| key.slot <= hw) + { Lookup::Evicted } else { Lookup::Pending @@ -415,10 +426,11 @@ impl MemDB { root: attestation_root, }; self.await_data(&self.aggregation_notify, |s| { + // Awaited by root only, so there is no slot to test against an + // eviction high-water mark: an evicted root relies on the caller's + // request timeout to terminate (matching Charon's Go dutydb). if let Some(v) = s.aggregation_duties.get(&key) { Lookup::Found(v.0.clone()) - } else if s.evicted_aggregation_keys.contains(&key) { - Lookup::Evicted } else { Lookup::Pending } @@ -442,7 +454,7 @@ impl MemDB { self.await_data(&self.contrib_notify, |s| { if let Some(v) = s.contrib_duties.get(&key) { Lookup::Found(v.clone()) - } else if s.evicted_contrib_keys.contains(&key) { + } else if s.max_evicted_contrib_slot.is_some_and(|hw| slot <= hw) { Lookup::Evicted } else { Lookup::Pending @@ -657,13 +669,20 @@ impl State { Ok(()) } + /// Raises an eviction high-water mark to `slot` if `slot` is newer (or the + /// mark is unset). The deadliner expires duties in non-decreasing slot + /// order, so in practice this only ever moves the mark forward. + fn bump_high_water(mark: &mut Option, slot: u64) { + *mark = Some(mark.map_or(slot, |current| current.max(slot))); + } + fn delete_duty(&mut self, duty: Duty) -> Result<()> { let slot = duty.slot.inner(); info!(slot, duty_type = %duty.duty_type, "dutydb: deleting expired duty"); match duty.duty_type { DutyType::Proposer => { self.proposer_duties.remove(&slot); - self.evicted_proposer_slots.insert(slot); + Self::bump_high_water(&mut self.max_evicted_proposer_slot, slot); } DutyType::BuilderProposer => return Err(Error::DeprecatedDutyBuilderProposer), DutyType::Attester => { @@ -676,23 +695,24 @@ impl State { }); } } - self.evicted_attestation_slots.insert(slot); + Self::bump_high_water(&mut self.max_evicted_attestation_slot, slot); } DutyType::Aggregator => { + // No eviction mark: aggregated attestations are awaited by root + // only, so there is nothing for a slot high-water mark to gate. if let Some(keys) = self.aggregation_keys_by_slot.remove(&slot) { - for key in &keys { - self.aggregation_duties.remove(key); + for key in keys { + self.aggregation_duties.remove(&key); } - self.evicted_aggregation_keys.extend(keys); } } DutyType::SyncContribution => { if let Some(keys) = self.contrib_keys_by_slot.remove(&slot) { - for key in &keys { - self.contrib_duties.remove(key); + for key in keys { + self.contrib_duties.remove(&key); } - self.evicted_contrib_keys.extend(keys); } + Self::bump_high_water(&mut self.max_evicted_contrib_slot, slot); } _ => return Err(Error::UnknownDutyType), } @@ -1256,6 +1276,67 @@ mod tests { assert!(db.pub_key_by_attestation(SLOT, 0, 0).await.is_err()); } + /// After a slot is evicted, `await_attestation` must return + /// `AwaitDutyExpired` immediately (not park until the request timeout) for + /// that slot AND for any older slot — the eviction state is a single + /// high-water mark, so it stays O(1) in memory rather than accumulating one + /// entry per evicted slot for the lifetime of the node. + #[tokio::test] + async fn await_attestation_expired_after_eviction_high_water() { + let deadliner = far_future_handle(); + let (trim_tx, trim_rx) = channel::(64); + let db = make_db_with_deadliner(deadliner, trim_rx); + + const SLOT: u64 = 123; + + let mut set = UnsignedDataSet::new(); + set.insert( + random_core_pub_key(), + UnsignedDutyData::Attestation(att_data(SLOT, 0, 0)), + ); + db.store(Duty::new(SlotNumber::new(SLOT), DutyType::Attester), set) + .await + .unwrap(); + + // Evict SLOT, then trigger expiry processing with an unrelated store. + trim_tx + .send(Duty::new(SlotNumber::new(SLOT), DutyType::Attester)) + .await + .expect("trim_tx should be open"); + let mut set2 = UnsignedDataSet::new(); + set2.insert( + random_core_pub_key(), + UnsignedDutyData::Proposal(Box::new(phase0_proposal(SLOT.saturating_add(1), 0))), + ); + db.store( + Duty::new(SlotNumber::new(SLOT.saturating_add(1)), DutyType::Proposer), + set2, + ) + .await + .unwrap(); + + // The evicted slot resolves to AwaitDutyExpired without parking. + let timeout = std::time::Duration::from_secs(5); + let evicted = tokio::time::timeout(timeout, db.await_attestation(SLOT, 0)) + .await + .expect("await must not park for an evicted slot"); + assert!( + matches!(evicted, Err(Error::AwaitDutyExpired)), + "evicted slot: expected AwaitDutyExpired, got {evicted:?}" + ); + + // An older, never-stored slot is also below the high-water mark: its + // deadline has necessarily passed too, so it must fail fast rather than + // park — and we keep no per-slot record to answer this. + let older = tokio::time::timeout(timeout, db.await_attestation(SLOT.saturating_sub(1), 0)) + .await + .expect("await must not park for a slot below the eviction high-water"); + assert!( + matches!(older, Err(Error::AwaitDutyExpired)), + "older slot: expected AwaitDutyExpired, got {older:?}" + ); + } + #[tokio::test] async fn agg_attestation_two_roots_same_slot() { const SLOT: u64 = 300; From b5eb0ca7c4f3bfbb8b11e49fe213f4b9c914dcbb Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <273991985+varex83agent@users.noreply.github.com> Date: Mon, 1 Jun 2026 17:27:16 +0200 Subject: [PATCH 11/12] fix(core): return {code,message} envelope for malformed duties body MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit attester_duties and sync_committee_duties extracted the body as bare `Json`, so a malformed body produced axum's default plain-text rejection — 400 for a JSON syntax error but 422 for a wrong type — instead of the router's standard {code,message} envelope. Charon's unmarshal returns a uniform 400 with its apiError body for every request body parse failure. Extract as `Result, JsonRejection>` and map through a new json_rejection_to_api_error, mirroring the existing query-rejection path. Genuine parse failures normalise to 400; the DefaultBodyLimit size cap still surfaces as 413 (Pluto's DoS defense), preserved explicitly. Add a router test asserting the 400 envelope for a wrong-shape body. Co-Authored-By: varex83 --- crates/core/src/validatorapi/router.rs | 65 ++++++++++++++++++++++++-- 1 file changed, 60 insertions(+), 5 deletions(-) diff --git a/crates/core/src/validatorapi/router.rs b/crates/core/src/validatorapi/router.rs index c71031ec..9fffa5f3 100644 --- a/crates/core/src/validatorapi/router.rs +++ b/crates/core/src/validatorapi/router.rs @@ -7,7 +7,10 @@ use std::sync::Arc; use axum::{ Json, Router, - extract::{DefaultBodyLimit, Path, Query, State, rejection::QueryRejection}, + extract::{ + DefaultBodyLimit, Path, Query, State, + rejection::{JsonRejection, QueryRejection}, + }, http::StatusCode, response::IntoResponse, routing::{MethodRouter, get, post}, @@ -142,8 +145,9 @@ pub fn new_router(handler: Arc, builder_enabled: bool) -> Router { async fn attester_duties( State(state): State>, Path(epoch): Path, - Json(indices): Json, + indices: Result, JsonRejection>, ) -> Result, ApiError> { + let Json(indices) = indices.map_err(json_rejection_to_api_error)?; let response = state .handler .attester_duties(AttesterDutiesOpts { @@ -170,8 +174,9 @@ async fn proposer_duties( async fn sync_committee_duties( State(state): State>, Path(epoch): Path, - Json(indices): Json, + indices: Result, JsonRejection>, ) -> Result, ApiError> { + let Json(indices) = indices.map_err(json_rejection_to_api_error)?; let response = state .handler .sync_committee_duties(SyncCommitteeDutiesOpts { @@ -219,6 +224,25 @@ fn query_rejection_to_api_error(rejection: QueryRejection) -> ApiError { .with_source(std::io::Error::other(rejection.body_text())) } +/// Renders an axum JSON body-extractor rejection as Pluto's standard +/// [`ApiError`] body shape, so it shares the `{ "code", "message" }` schema +/// instead of axum's default plain-text response. +/// +/// Every genuine parse failure — malformed JSON (`400`), wrong element type +/// (`422`), missing `content-type` (`415`) — is normalised to a uniform +/// `400`, matching Charon's `unmarshal`, which returns `400` for all body +/// unmarshal failures. The body-size-limit rejection from [`DefaultBodyLimit`] +/// surfaces here too (the limit is enforced as the `Json` extractor reads the +/// body); its `413 Payload Too Large` is preserved, since that is Pluto's +/// DoS defense rather than a parse error. +fn json_rejection_to_api_error(rejection: JsonRejection) -> ApiError { + let (status, message) = match rejection.status() { + StatusCode::PAYLOAD_TOO_LARGE => (StatusCode::PAYLOAD_TOO_LARGE, "request body too large"), + _ => (StatusCode::BAD_REQUEST, "invalid request body"), + }; + ApiError::new(status, message).with_source(std::io::Error::other(rejection.body_text())) +} + async fn submit_attestations() { todo!("vapi: submit_attestations"); } @@ -348,7 +372,7 @@ mod tests { let Json(body) = attester_duties( State(state), Path(42u64), - Json(ValIndexes(vec!["7".to_owned()])), + Ok(Json(ValIndexes(vec!["7".to_owned()]))), ) .await .unwrap(); @@ -381,7 +405,7 @@ mod tests { let Json(body) = sync_committee_duties( State(state), Path(7u64), - Json(ValIndexes(vec!["9".to_owned()])), + Ok(Json(ValIndexes(vec!["9".to_owned()]))), ) .await .unwrap(); @@ -542,6 +566,37 @@ mod tests { assert_eq!(resp.status(), StatusCode::PAYLOAD_TOO_LARGE); } + /// A malformed duties body emits the same `{ code, message }` envelope and + /// uniform 400 as the rest of the router, rather than axum's default + /// plain-text rejection (which would be 400 for a syntax error but 422 for + /// a type error). Mirrors Charon's `unmarshal`, which returns 400 for every + /// body parse failure. + #[tokio::test] + async fn attester_duties_returns_api_error_shape_on_bad_body() { + use axum::{ + body::{Body, to_bytes}, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let app = new_router(Arc::new(TestHandler::default()), false); + + // Valid JSON, wrong shape (object, not an array) — axum's default + // would surface this as a 422 type error. + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/duties/attester/42") + .header("content-type", "application/json") + .body(Body::from(r#"{"not":"an array"}"#)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + let body = to_bytes(resp.into_body(), 64 * 1024).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["code"], 400); + assert!(json["message"].is_string()); + } + /// `[]` is a valid request body — the upstream returns an empty duty /// list — and `ValIndexes` should accept it. #[test] From 12e2263d99536993679c0bedb2b4a3d15d3ad219 Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <273991985+varex83agent@users.noreply.github.com> Date: Tue, 2 Jun 2026 14:45:26 +0200 Subject: [PATCH 12/12] fix(core): align validatorapi with Charon (timeout, content-type) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses inline review comments from @iamquang95 on PR #451. UPSTREAM_REQUEST_TIMEOUT was 12s — set to 10s to match Charon's defaultRequestTimeout (core/validatorapi/router.go:61). POST /eth/v1/validator/duties/{attester,sync}/{epoch} now matches Charon's content-type policy via a new enforce_json_content_type middleware layered onto duties_post: - missing Content-Type → treated as application/json (Charon parity) - application/json → passes through to the existing Json extractor - anything else → 415 Unsupported Media Type with the offending value Previously axum's Json extractor rejected a missing header with MissingJsonContentType, which json_rejection_to_api_error normalised to 400 — diverging from Charon, which lets VCs that don't set the header through. Non-JSON Content-Type was also collapsed to 400; it is now the 415 Charon returns. Updated json_rejection_to_api_error doc comment: it no longer sees content-type rejections (the middleware intercepts them upstream). Tests (+2): - attester_duties_accepts_missing_content_type asserts 200 when Content-Type is absent. - attester_duties_rejects_non_json_content_type asserts 415 with the offending media type echoed in the message. Co-Authored-By: varex83 --- crates/core/src/validatorapi/component.rs | 5 +- crates/core/src/validatorapi/router.rs | 120 +++++++++++++++++++--- 2 files changed, 111 insertions(+), 14 deletions(-) diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs index ac19fbc0..e61df23b 100644 --- a/crates/core/src/validatorapi/component.rs +++ b/crates/core/src/validatorapi/component.rs @@ -36,8 +36,9 @@ use crate::{ }; /// Hard deadline for upstream beacon-node calls. Bounds the worst-case -/// handler latency when the upstream hangs or stalls. Roughly one slot. -const UPSTREAM_REQUEST_TIMEOUT: Duration = Duration::from_secs(12); +/// handler latency when the upstream hangs or stalls. Mirrors Charon's +/// `defaultRequestTimeout` (`core/validatorapi/router.go:61`). +const UPSTREAM_REQUEST_TIMEOUT: Duration = Duration::from_secs(10); /// Hard deadline for the `attestation_data` await on the local DutyDB. /// Bounded so a request whose slot never produces consensus output cannot diff --git a/crates/core/src/validatorapi/router.rs b/crates/core/src/validatorapi/router.rs index 9fffa5f3..a32ab9b9 100644 --- a/crates/core/src/validatorapi/router.rs +++ b/crates/core/src/validatorapi/router.rs @@ -8,11 +8,12 @@ use std::sync::Arc; use axum::{ Json, Router, extract::{ - DefaultBodyLimit, Path, Query, State, + DefaultBodyLimit, Path, Query, Request, State, rejection::{JsonRejection, QueryRejection}, }, - http::StatusCode, - response::IntoResponse, + http::{HeaderValue, StatusCode, header}, + middleware::{self, Next}, + response::{IntoResponse, Response}, routing::{MethodRouter, get, post}, }; use serde::Deserialize; @@ -204,16 +205,48 @@ async fn attestation_data( Ok(Json(response)) } -/// Wraps a `POST /eth/v1/validator/duties/*` handler with a body-size cap. -/// The cap is local to these two routes so unrelated POST handlers (e.g. -/// `submit_attestations`) keep axum's default 2 MiB. +/// Wraps a `POST /eth/v1/validator/duties/*` handler with a body-size cap +/// and the Charon-parity content-type policy. The cap is local to these +/// two routes so unrelated POST handlers (e.g. `submit_attestations`) keep +/// axum's default 2 MiB. fn duties_post(handler: H) -> MethodRouter where H: axum::handler::Handler, T: 'static, S: Clone + Send + Sync + 'static, { - post(handler).route_layer(DefaultBodyLimit::max(DUTIES_BODY_LIMIT)) + post(handler) + .route_layer(DefaultBodyLimit::max(DUTIES_BODY_LIMIT)) + .route_layer(middleware::from_fn(enforce_json_content_type)) +} + +/// Matches Charon's content-type handling at `core/validatorapi/router.go:365`: +/// a missing `Content-Type` is treated as `application/json`; an unrecognized +/// content type is rejected with `415 Unsupported Media Type`. SSZ is not +/// supported yet — when it lands, this is the right seam to extend. +/// +/// Without this layer, axum's `Json` extractor would reject a missing header +/// with `MissingJsonContentType`, which our envelope normalises to `400` — +/// diverging from Charon, which lets VCs that don't set the header through. +async fn enforce_json_content_type(mut req: Request, next: Next) -> Result { + match req.headers().get(header::CONTENT_TYPE) { + None => { + req.headers_mut().insert( + header::CONTENT_TYPE, + HeaderValue::from_static("application/json"), + ); + } + Some(value) => { + let s = value.to_str().unwrap_or(""); + if !s.contains("application/json") { + return Err(ApiError::new( + StatusCode::UNSUPPORTED_MEDIA_TYPE, + format!("unsupported media type {s}"), + )); + } + } + } + Ok(next.run(req).await) } /// Renders an axum query-extractor rejection as Pluto's standard @@ -228,11 +261,13 @@ fn query_rejection_to_api_error(rejection: QueryRejection) -> ApiError { /// [`ApiError`] body shape, so it shares the `{ "code", "message" }` schema /// instead of axum's default plain-text response. /// -/// Every genuine parse failure — malformed JSON (`400`), wrong element type -/// (`422`), missing `content-type` (`415`) — is normalised to a uniform -/// `400`, matching Charon's `unmarshal`, which returns `400` for all body -/// unmarshal failures. The body-size-limit rejection from [`DefaultBodyLimit`] -/// surfaces here too (the limit is enforced as the `Json` extractor reads the +/// Genuine parse failures — malformed JSON (`400`) and wrong element type +/// (`422`) — are normalised to a uniform `400`, matching Charon's `unmarshal`, +/// which returns `400` for all body unmarshal failures. Content-Type rejections +/// no longer reach this function: [`enforce_json_content_type`] intercepts +/// them upstream so missing/JSON requests pass through and non-JSON requests +/// return `415`. The body-size-limit rejection from [`DefaultBodyLimit`] +/// surfaces here (the limit is enforced as the `Json` extractor reads the /// body); its `413 Payload Too Large` is preserved, since that is Pluto's /// DoS defense rather than a parse error. fn json_rejection_to_api_error(rejection: JsonRejection) -> ApiError { @@ -597,6 +632,67 @@ mod tests { assert!(json["message"].is_string()); } + /// Charon-parity: a duties request that omits `Content-Type` is + /// treated as `application/json` rather than rejected — the + /// `enforce_json_content_type` middleware injects the header before + /// the `Json` extractor sees the request. See `core/validatorapi/ + /// router.go:365` (`if contentHeader == "" || ...`). + #[tokio::test] + async fn attester_duties_accepts_missing_content_type() { + use axum::{ + body::Body, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let handler = TestHandler::default().with_attester_duties(AttesterDutiesResponse { + data: vec![], + dependent_root: "0x00".to_owned(), + execution_optimistic: false, + }); + let app = new_router(Arc::new(handler), false); + + // No Content-Type header at all. + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/duties/attester/42") + .body(Body::from("[]")) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + } + + /// Charon-parity: a duties request with a non-JSON `Content-Type` + /// returns `415 Unsupported Media Type`, not the `400` that the + /// generic body-parse normaliser would produce. + #[tokio::test] + async fn attester_duties_rejects_non_json_content_type() { + use axum::{ + body::{Body, to_bytes}, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let app = new_router(Arc::new(TestHandler::default()), false); + + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/duties/attester/42") + .header("content-type", "text/plain") + .body(Body::from("[]")) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE); + let body = to_bytes(resp.into_body(), 64 * 1024).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["code"], 415); + assert!( + json["message"] + .as_str() + .is_some_and(|m| m.contains("text/plain")) + ); + } + /// `[]` is a valid request body — the upstream returns an empty duty /// list — and `ValIndexes` should accept it. #[test]