feat(core): implement attester_duties, sync_committee_duties, attestation_data handlers#451
Conversation
Threads the Handler through Axum state via AppState<H> + with_state, wires the node_version route to the real handler, and adds a TestHandler mock that future PRs will extend per-endpoint.
Re-uses the auto-generated pluto_eth2api envelopes (GetProposerDutiesResponseResponse, GetVersionResponseResponse) as the on-the-wire shape rather than hand-rolling parallel types. node_version is migrated to the same pattern; the body.rs hand-rolled wrapper module is removed.
Drops the per-handler generic parameter and routes through Arc<dyn Handler> via AppState. The Handler trait is object-safe (Send + Sync + 'static + async_trait-generated methods), so this is a pure type change with no surface impact.
Adds the Handler impl that the router has been calling through.
node_version returns the obolnetwork/pluto/{version}-{commit}/{arch}-{os}
identity string; proposer_duties calls the upstream beacon node and
rewrites known DV root public keys to this node's public share so the
validator client sees keys matching its keystore. The remaining 17
trait methods are unimplemented!() stubs that land per-PR as their
router handlers are ported.
Wires POST /eth/v1/validator/duties/attester/{epoch}: dual-format
(numeric or string-encoded) validator index body, upstream call,
pubshare swap.
Wires POST /eth/v1/validator/duties/sync/{epoch}, reusing the
ValIndexes dual-format body extractor.
Wires GET /eth/v1/validator/attestation_data. The Component now holds an Arc<MemDB> and awaits unsigned attestation data from the local DutyDB rather than hitting upstream.
|
@varex83agent /review-pr |
varex83agent
left a comment
There was a problem hiding this comment.
Implements three validator-API handlers (attester_duties, sync_committee_duties, attestation_data) with reasonable parity to Charon and unit tests for the swap helpers, the dual-format ValIndexes deserializer, and the DutyDB-backed attestation path. Code style is idiomatic. However, two bug-level issues need fixing before merge, plus several major hardening gaps on this attacker-reachable surface.
Bugs (must fix):
attestation_dataawaitsMemDBwith no timeout, no request-cancel propagation, and no eviction signal — a request for a slot that never arrives becomes a permanent zombie task, contending on every legitimatestorevianotify_waiters. Charon plumbs the requestctxthroughawaitAttFunc; the Rust port loses this entirely. Wrap the await in atokio::time::timeout(matching the slot deadline) and either wake ondelete_dutyor plumb a per-requestCancellationToken.format!("unexpected upstream … response: {other:?}")interpolates the upstream beacon node's Debug output — includingBlindedBlock400Response.messageand anystacktracesfield — directly into the client-visibleApiError.message. This forwards upstream internals (and possibly file paths) to anyone reachable on the validator API. Move{other:?}intowith_source(...)and use a genericmessage.
Majors:
Component::new_insecure(which disables submit-side signature verification) is plainpub fn, not#[cfg(test)]— once submit endpoints land, this is a footgun.new_routerinstalls noDefaultBodyLimit, noTimeoutLayer, and no concurrency cap. Combined with the unbounded await above, slowloris-style requests are trivially possible.- All non-
Okupstream variants (BadRequest/InternalServerError/ServiceUnavailable/Unknown) are collapsed under 502. A syncing upstream (503) should propagate as 503 so the VC retries with backoff; a 400 from upstream means our forwarded request was bad. - Test coverage is missing for: cancellation/timeout on
attestation_data, the upstream non-Ok variants, malformedslot/committee_indexquery params, and emptyindicesbody.
Minors and a nit inline. Verdict: REQUEST_CHANGES — please address the two bug-level findings before merge; majors can land in a follow-up or this PR at your discretion.
Bug fixes (must-fix per review):
- attestation_data: wrap MemDB::await_attestation in tokio::time::timeout
(24s) so a request for a slot that never produces consensus output
cannot hold a handler task indefinitely. delete_duty now records
evicted keys per duty type and notifies waiters, so await_data returns
Error::AwaitDutyExpired immediately when the awaited duty is gone
instead of spinning until the timeout fires. Maps to 408 on the wire.
- Stop leaking upstream BlindedBlock400Response Debug output (incl.
stacktraces) into the client-visible ApiError.message. The variant
payload is now attached as `source` for debug logs; the message stays
generic.
Hardening:
- new_insecure is gated behind #[cfg(test)] so the insecure_test flag
cannot reach production builds.
- new_router applies DefaultBodyLimit::max(64 KiB) on the two
POST /duties/{attester,sync}/{epoch} routes — defends against the
Vec<u64> parse amplification on the ValIndexes deserializer.
- All upstream eth2_cl calls are wrapped in tokio::time::timeout(12s)
so a hanging beacon node cannot stall handler tasks.
- proposer_duties / attester_duties / sync_committee_duties propagate
upstream BadRequest as 400 and ServiceUnavailable as 503 instead of
collapsing every non-Ok variant to 502 — the VC can now back off on
upstream syncing instead of treating it as a gateway failure.
- swap_attester_pubshares / swap_sync_committee_pubshares now return
500 (cluster misconfig) instead of 502 when a pubshare is missing —
the upstream returned well-formed data, the failure is local.
ValIndexes:
- Replace #[serde(untagged)] with a streaming Visitor that validates
each element via SeqAccess::next_element. Avoids the speculative
Vec<u64> parse and the serde Content cache. Now accepts mixed
numeric/string elements and rejects negative integers.
- Hard cap at 8192 indices per request.
ApiError:
- with_boxed_source for sources that aren't std::error::Error (e.g.
anyhow::Error from auto-gen request builders).
Router:
- attestation_data uses Result<Query<...>, QueryRejection> so 4xx
responses from missing/malformed query params share the same
{ code, message } envelope as the rest of the router.
Tests (+13):
- attestation_data: timeout when data never arrives; 408 when duty is
evicted while a waiter is parked; cancellation cleanup when the
handler future is dropped; negative lookup on wrong committee_index.
- Status-mapping helpers: confirm upstream Debug output is never
serialized into the message.
- Router: ApiError envelope on bad query; oversized body rejection;
ValIndexes empty/mixed/oversized/negative cases.
Co-Authored-By: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com>
Bug fixes (must-fix per review):
- attestation_data: wrap MemDB::await_attestation in tokio::time::timeout
(24s) so a request for a slot that never produces consensus output
cannot hold a handler task indefinitely. delete_duty now records
evicted keys per duty type and notifies waiters, so await_data returns
Error::AwaitDutyExpired immediately when the awaited duty is gone
instead of spinning until the timeout fires. Maps to 408 on the wire.
- Stop leaking upstream BlindedBlock400Response Debug output (incl.
stacktraces) into the client-visible ApiError.message. The variant
payload is now attached as `source` for debug logs; the message stays
generic.
Hardening:
- new_insecure is gated behind #[cfg(test)] so the insecure_test flag
cannot reach production builds.
- new_router applies DefaultBodyLimit::max(64 KiB) on the two
POST /duties/{attester,sync}/{epoch} routes — defends against the
Vec<u64> parse amplification on the ValIndexes deserializer.
- All upstream eth2_cl calls are wrapped in tokio::time::timeout(12s)
so a hanging beacon node cannot stall handler tasks.
- proposer_duties / attester_duties / sync_committee_duties propagate
upstream BadRequest as 400 and ServiceUnavailable as 503 instead of
collapsing every non-Ok variant to 502 — the VC can now back off on
upstream syncing instead of treating it as a gateway failure.
- swap_attester_pubshares / swap_sync_committee_pubshares now return
500 (cluster misconfig) instead of 502 when a pubshare is missing —
the upstream returned well-formed data, the failure is local.
ValIndexes:
- Replace #[serde(untagged)] with a streaming Visitor that validates
each element via SeqAccess::next_element. Avoids the speculative
Vec<u64> parse and the serde Content cache. Now accepts mixed
numeric/string elements and rejects negative integers.
- Hard cap at 8192 indices per request.
ApiError:
- with_boxed_source for sources that aren't std::error::Error (e.g.
anyhow::Error from auto-gen request builders).
Router:
- attestation_data uses Result<Query<...>, QueryRejection> so 4xx
responses from missing/malformed query params share the same
{ code, message } envelope as the rest of the router.
Tests (+13):
- attestation_data: timeout when data never arrives; 408 when duty is
evicted while a waiter is parked; cancellation cleanup when the
handler future is dropped; negative lookup on wrong committee_index.
- Status-mapping helpers: confirm upstream Debug output is never
serialized into the message.
- Router: ApiError envelope on bad query; oversized body rejection;
ValIndexes empty/mixed/oversized/negative cases.
Co-Authored-By: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com>
33221ad to
5da9093
Compare
|
@varex83agent /review-pr |
varex83agent
left a comment
There was a problem hiding this comment.
This PR wires up three validator-API handlers (attester_duties, sync_committee_duties, attestation_data) and is generally high quality — the hardening from the prior review round is solid (request timeouts, 64 KiB body cap, streaming ValIndexes visitor with an 8192 cap, no info leak into client-visible error messages, new_insecure gated to #[cfg(test)]), the await_data refactor is free of the classic lost-wakeup race (notified() is created before the lock-check), and the pubshare-swap parity with Charon (reject-unknown for attester/sync, pass-through for proposer) is correct. Test coverage is good.
Two must-fix issues before merge:
- Unbounded memory growth in the
evicted_*HashSets (dutydb/memory.rs). The four eviction-tracking sets introduced in the fix commit are only ever inserted into and never pruned, so every evicted slot/key is retained for the entire process lifetime — an unbounded leak in a long-running daemon and a divergence from Charon, which keeps no such set. This optimization needs a bound (e.g. amin_live_slotwatermark, or pruning below a retention horizon). - Missing
committee_index=0dual-write breaks post-Electra VCs (dutydb/memory.rs). Charon'sstoreAttestationUnsafedeliberately stores each attestation twice — under the real committee index and under index 0 — so post-Electra VCs that requestattestation_data?committee_index=0resolve. Pluto'sstore_attestationwrites only the real index; this PR makes theattestation_dataendpoint live, so a conformant Electra VC asking for index 0 will now time out (408) and fail to attest. (Root cause is in pre-existing store logic, but it becomes reachable here.)
The remaining findings are minor/nit: an error-envelope inconsistency on the {epoch} path param, a ValIndexes parity superset (mixed numeric/string accepted where Charon is all-or-nothing), the std::io::Error::other debug-string carrier smell, and the fact that the source attached "for debug logging" is never actually logged. Requesting changes for the two majors; everything else is author discretion.
The evicted_* HashSets in MemDB were insert-only: delete_duty added an entry per evicted slot/aggregation/contrib key and nothing ever removed them, so on a long-running node they grew without bound (one entry per 12s slot, forever). Charon's Go dutydb keeps no eviction record at all and relies on the per-request context timeout. Because the deadliner expires duties in non-decreasing slot order and store() refuses already-expired duties, the highest evicted slot is sufficient: any awaited slot <= the mark that is not currently stored will never be stored. Replace the three slot-keyed sets with O(1) Option<u64> high-water marks (attester/proposer/sync-contribution), preserving the immediate-AwaitDutyExpired fast path. Aggregated attestations are awaited by root only (no slot at await time), so they cannot consult a slot mark; drop their eviction tracking and let an evicted root rely on the request timeout, matching Go. Add a regression test asserting an evicted slot and any older slot fail fast with AwaitDutyExpired without parking. Co-Authored-By: varex83 <ivan@nethermind.io>
attester_duties and sync_committee_duties extracted the body as bare
`Json<ValIndexes>`, so a malformed body produced axum's default
plain-text rejection — 400 for a JSON syntax error but 422 for a wrong
type — instead of the router's standard {code,message} envelope. Charon's
unmarshal returns a uniform 400 with its apiError body for every request
body parse failure.
Extract as `Result<Json<ValIndexes>, JsonRejection>` and map through a new
json_rejection_to_api_error, mirroring the existing query-rejection path.
Genuine parse failures normalise to 400; the DefaultBodyLimit size cap
still surfaces as 413 (Pluto's DoS defense), preserved explicitly. Add a
router test asserting the 400 envelope for a wrong-shape body.
Co-Authored-By: varex83 <ivan@nethermind.io>
/loop-review-pr summaryRan 3 review-and-fix iterations (plus 2 verification passes) against this PR, each driving the same four-agent pipeline as Terminated by: completion (PR reached ideal state) — internal review produced zero Quality gates (final, full workspace,
|
Resolve conflicts in crates/core/src/validatorapi/{component,handler,router,testutils,types}.rs.
Main re-landed the node_version + proposer_duties scaffold via #449 with
EthResponse<Vec<...>> placeholder return types and unimplemented stubs
for attester_duties / sync_committee_duties / attestation_data. This
branch had already implemented all three handlers on top of the
attester_duties / sync_committee_duties / attestation_data work plus
typed *Response wrappers, timeout-bounded upstream calls, status-mapping
helpers, ValIndexes streaming deserializer, and DutyDB integration.
Taking ours for all five files preserves the implemented handlers; main's
side was a strict subset (the same scaffold without the new handlers).
error.rs auto-merged with both with_source and with_boxed_source intact.
Verified: cargo check --workspace passes; pluto-core clippy passes; the
28 validatorapi tests pass.
Co-Authored-By: varex83 <ivan@nethermind.io>
|
|
||
| /// Hard deadline for upstream beacon-node calls. Bounds the worst-case | ||
| /// handler latency when the upstream hangs or stalls. Roughly one slot. | ||
| const UPSTREAM_REQUEST_TIMEOUT: Duration = Duration::from_secs(12); |
There was a problem hiding this comment.
Why 12? Should be 10 seconds right?
defaultRequestTimeout = 10 * time.Second
There was a problem hiding this comment.
Good catch — fixed in 12e2263 (timeout 12s → 10s, doc comment now points to core/validatorapi/router.go:61).
| /// instead of axum's default plain-text response. | ||
| /// | ||
| /// Every genuine parse failure — malformed JSON (`400`), wrong element type | ||
| /// (`422`), missing `content-type` (`415`) — is normalised to a uniform |
There was a problem hiding this comment.
missing content-type is accepted in charon
if contentHeader == "" || strings.Contains(contentHeader, string(contentTypeJSON)) {
typ = contentTypeJSON
} else if strings.Contains(contentHeader, string(contentTypeSSZ)) {
typ = contentTypeSSZ
} else {
writeError(ctx, w, endpoint, apiError{
StatusCode: http.StatusUnsupportedMediaType,
Message: "unsupported media type " + contentHeader,
})
return
}
There was a problem hiding this comment.
Fixed in 12e2263 — added an enforce_json_content_type middleware on the duties POST routes that mirrors Charon's policy (missing or JSON Content-Type passes; anything else → 415 with the offending value in the message). SSZ is not supported yet, so for now non-JSON is rejected; the middleware is the right seam to add SSZ later. Two new router tests cover the missing-CT and non-JSON cases.
Addresses inline review comments from @iamquang95 on PR #451. UPSTREAM_REQUEST_TIMEOUT was 12s — set to 10s to match Charon's defaultRequestTimeout (core/validatorapi/router.go:61). POST /eth/v1/validator/duties/{attester,sync}/{epoch} now matches Charon's content-type policy via a new enforce_json_content_type middleware layered onto duties_post: - missing Content-Type → treated as application/json (Charon parity) - application/json → passes through to the existing Json extractor - anything else → 415 Unsupported Media Type with the offending value Previously axum's Json extractor rejected a missing header with MissingJsonContentType, which json_rejection_to_api_error normalised to 400 — diverging from Charon, which lets VCs that don't set the header through. Non-JSON Content-Type was also collapsed to 400; it is now the 415 Charon returns. Updated json_rejection_to_api_error doc comment: it no longer sees content-type rejections (the middleware intercepts them upstream). Tests (+2): - attester_duties_accepts_missing_content_type asserts 200 when Content-Type is absent. - attester_duties_rejects_non_json_content_type asserts 415 with the offending media type echoed in the message. Co-Authored-By: varex83 <ivan@nethermind.io>
Summary
Implements three validator-API handlers, stacked over PR #449.
POST /eth/v1/validator/duties/attester/{epoch}) — uses auto-genGetAttesterDuties*envelope, accepts both numeric and string-encoded validator index arrays via a newValIndexesdual-format deserializer, performs pubshare swap on the upstream response (unknown pubkey rejected).POST /eth/v1/validator/duties/sync/{epoch}) — same shape as above, reusesValIndexes.GET /eth/v1/validator/attestation_data) — query-stringslot/committee_index, served from the local DutyDB (MemDB::await_attestation) rather than upstream. Component now holds anArc<MemDB>.All three return the auto-generated response envelope directly;
attestation_datauses a small hand-rolled wrapper aroundphase0::AttestationDatato avoid pointless typed-↔-string conversion.Test plan
cargo +nightly fmt --all --checkcargo clippy -p pluto-core --all-targets --all-features -- -D warningscargo test -p pluto-core validatorapi::— 14/14 passing (5 new tests across router + component)