Commit e75a114

varex83varex83agentvarex83
authored
feat(core): implement attester_duties, sync_committee_duties, attestation_data handlers (#451)
* feat(core): implement validatorapi node_version handler

Threads the Handler through Axum state via AppState<H> + with_state, wires the node_version route to the real handler, and adds a TestHandler mock that future PRs will extend per-endpoint.

* fix: linter

* feat(core): implement validatorapi proposer_duties handler (#450)

Re-uses the auto-generated pluto_eth2api envelopes (GetProposerDutiesResponseResponse, GetVersionResponseResponse) as the on-the-wire shape rather than hand-rolling parallel types. node_version is migrated to the same pattern; the body.rs hand-rolled wrapper module is removed.

* refactor(core): use dynamic dispatch for validatorapi Handler

Drops the per-handler generic parameter and routes through Arc<dyn Handler> via AppState. The Handler trait is object-safe (Send + Sync + 'static + async_trait-generated methods), so this is a pure type change with no surface impact.

* feat(core): scaffold validatorapi Component handler

Adds the Handler impl that the router has been calling through. node_version returns the obolnetwork/pluto/{version}-{commit}/{arch}-{os} identity string; proposer_duties calls the upstream beacon node and rewrites known DV root public keys to this node's public share so the validator client sees keys matching its keystore. The remaining 17 trait methods are unimplemented!() stubs that land per-PR as their router handlers are ported.

* feat(core): implement validatorapi attester_duties handler

Wires POST /eth/v1/validator/duties/attester/{epoch}: dual-format (numeric or string-encoded) validator index body, upstream call, pubshare swap.

* feat(core): implement validatorapi sync_committee_duties handler

Wires POST /eth/v1/validator/duties/sync/{epoch}, reusing the ValIndexes dual-format body extractor.

* feat(core): implement validatorapi attestation_data handler

Wires GET /eth/v1/validator/attestation_data. The Component now holds an Arc<MemDB> and awaits unsigned attestation data from the local DutyDB rather than hitting upstream.

* fix(core): address PR #451 review feedback

Bug fixes (must-fix per review):
- attestation_data: wrap MemDB::await_attestation in tokio::time::timeout (24s) so a request for a slot that never produces consensus output cannot hold a handler task indefinitely. delete_duty now records evicted keys per duty type and notifies waiters, so await_data returns Error::AwaitDutyExpired immediately when the awaited duty is gone instead of spinning until the timeout fires. Maps to 408 on the wire.
- Stop leaking upstream BlindedBlock400Response Debug output (incl. stacktraces) into the client-visible ApiError.message. The variant payload is now attached as `source` for debug logs; the message stays generic.

Hardening:
- new_insecure is gated behind #[cfg(test)] so the insecure_test flag cannot reach production builds.
- new_router applies DefaultBodyLimit::max(64 KiB) on the two POST /duties/{attester,sync}/{epoch} routes; this defends against the Vec<u64> parse amplification on the ValIndexes deserializer.
- All upstream eth2_cl calls are wrapped in tokio::time::timeout(12s) so a hanging beacon node cannot stall handler tasks.
- proposer_duties / attester_duties / sync_committee_duties propagate upstream BadRequest as 400 and ServiceUnavailable as 503 instead of collapsing every non-Ok variant to 502; the VC can now back off on upstream syncing instead of treating it as a gateway failure.
- swap_attester_pubshares / swap_sync_committee_pubshares now return 500 (cluster misconfig) instead of 502 when a pubshare is missing: the upstream returned well-formed data, the failure is local.

ValIndexes:
- Replace #[serde(untagged)] with a streaming Visitor that validates each element via SeqAccess::next_element. Avoids the speculative Vec<u64> parse and the serde Content cache. Now accepts mixed numeric/string elements and rejects negative integers.
- Hard cap at 8192 indices per request.

ApiError:
- with_boxed_source for sources that aren't std::error::Error (e.g. anyhow::Error from auto-gen request builders).

Router:
- attestation_data uses Result<Query<...>, QueryRejection> so 4xx responses from missing/malformed query params share the same { code, message } envelope as the rest of the router.

Tests (+13):
- attestation_data: timeout when data never arrives; 408 when duty is evicted while a waiter is parked; cancellation cleanup when the handler future is dropped; negative lookup on wrong committee_index.
- Status-mapping helpers: confirm upstream Debug output is never serialized into the message.
- Router: ApiError envelope on bad query; oversized body rejection; ValIndexes empty/mixed/oversized/negative cases.

Co-Authored-By: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com>

* fix(core): bound dutydb eviction state with high-water marks

The evicted_* HashSets in MemDB were insert-only: delete_duty added an entry per evicted slot/aggregation/contrib key and nothing ever removed them, so on a long-running node they grew without bound (one entry per 12s slot, forever). Charon's Go dutydb keeps no eviction record at all and relies on the per-request context timeout.

Because the deadliner expires duties in non-decreasing slot order and store() refuses already-expired duties, the highest evicted slot is sufficient: any awaited slot <= the mark that is not currently stored will never be stored. Replace the three slot-keyed sets with O(1) Option<u64> high-water marks (attester/proposer/sync-contribution), preserving the immediate-AwaitDutyExpired fast path.

Aggregated attestations are awaited by root only (no slot at await time), so they cannot consult a slot mark; drop their eviction tracking and let an evicted root rely on the request timeout, matching Go.

Add a regression test asserting an evicted slot and any older slot fail fast with AwaitDutyExpired without parking.

Co-Authored-By: varex83 <ivan@nethermind.io>

* fix(core): return {code,message} envelope for malformed duties body

attester_duties and sync_committee_duties extracted the body as bare `Json<ValIndexes>`, so a malformed body produced axum's default plain-text rejection (400 for a JSON syntax error but 422 for a wrong type) instead of the router's standard {code,message} envelope. Charon's unmarshal returns a uniform 400 with its apiError body for every request body parse failure.

Extract as `Result<Json<ValIndexes>, JsonRejection>` and map through a new json_rejection_to_api_error, mirroring the existing query-rejection path. Genuine parse failures normalise to 400; the DefaultBodyLimit size cap still surfaces as 413 (Pluto's DoS defense), preserved explicitly.

Add a router test asserting the 400 envelope for a wrong-shape body.

Co-Authored-By: varex83 <ivan@nethermind.io>

* fix(core): align validatorapi with Charon (timeout, content-type)

Addresses inline review comments from @iamquang95 on PR #451.

UPSTREAM_REQUEST_TIMEOUT was 12s; set to 10s to match Charon's defaultRequestTimeout (core/validatorapi/router.go:61).

POST /eth/v1/validator/duties/{attester,sync}/{epoch} now matches Charon's content-type policy via a new enforce_json_content_type middleware layered onto duties_post:
- missing Content-Type → treated as application/json (Charon parity)
- application/json → passes through to the existing Json extractor
- anything else → 415 Unsupported Media Type with the offending value

Previously axum's Json extractor rejected a missing header with MissingJsonContentType, which json_rejection_to_api_error normalised to 400, diverging from Charon, which lets VCs that don't set the header through. Non-JSON Content-Type was also collapsed to 400; it is now the 415 Charon returns.

Updated json_rejection_to_api_error doc comment: it no longer sees content-type rejections (the middleware intercepts them upstream).

Tests (+2):
- attester_duties_accepts_missing_content_type asserts 200 when Content-Type is absent.
- attester_duties_rejects_non_json_content_type asserts 415 with the offending media type echoed in the message.

Co-Authored-By: varex83 <ivan@nethermind.io>

---------

Co-authored-by: Bohdan Ohorodnii <273991985+varex83agent@users.noreply.github.com>
Co-authored-by: varex83 <ivan@nethermind.io>
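The content-type policy described in the last commit can be sketched as a pure function. This is a minimal illustration, not the project's `enforce_json_content_type` middleware: the names `ContentTypeCheck` and `check_content_type` are hypothetical, the message wording is invented, and ignoring media-type parameters such as `; charset=utf-8` is an assumption the commit message does not state.

```rust
/// Outcome of the content-type check for the duties POST routes.
/// (Hypothetical type, for illustration only.)
#[derive(Debug, PartialEq)]
enum ContentTypeCheck {
    /// Header absent or JSON: hand the body to the JSON extractor.
    AcceptAsJson,
    /// Any other media type: reply 415 and echo the offending value.
    Unsupported { status: u16, message: String },
}

/// Classifies the raw `Content-Type` header value, if any.
fn check_content_type(header: Option<&str>) -> ContentTypeCheck {
    // A missing header is treated as application/json.
    let Some(raw) = header else {
        return ContentTypeCheck::AcceptAsJson;
    };
    // Assumption: compare only the media type and ignore parameters.
    let media_type = raw.split(';').next().unwrap_or("").trim();
    if media_type.eq_ignore_ascii_case("application/json") {
        ContentTypeCheck::AcceptAsJson
    } else {
        ContentTypeCheck::Unsupported {
            status: 415,
            message: format!("unsupported media type: {raw}"),
        }
    }
}
```

Keeping the decision in a function with no framework types makes the three cases easy to unit test; a real middleware would translate the `Unsupported` variant into the router's `{ code, message }` envelope.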
1 parent c896771 commit e75a114

10 files changed

Lines changed: 1539 additions & 108 deletions

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -106,6 +106,7 @@ tree_hash_derive = "0.12"
 tar = "0.4"
 flate2 = "1.1"
 wiremock = "0.6"
+tower = "0.5"
 sysinfo = "0.33"
 quick-xml = { version = "0.39", features = ["serialize"] }

crates/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -56,6 +56,7 @@ pluto-testutil.workspace = true
 pluto-tracing.workspace = true
 tokio = { workspace = true, features = ["test-util"] }
 wiremock.workspace = true
+tower = { workspace = true, features = ["util"] }

 [build-dependencies]
 pluto-build-proto.workspace = true

crates/core/src/dutydb/memory.rs

Lines changed: 191 additions & 20 deletions
@@ -49,6 +49,12 @@ pub enum Error {
     #[error("dutydb shutdown: query could not be answered")]
     Shutdown,

+    /// The awaited duty was evicted before its unsigned data became
+    /// available. Distinct from `Shutdown` so callers can map this to a
+    /// timeout-style error rather than a service-down error.
+    #[error("dutydb: awaited duty expired before data was stored")]
+    AwaitDutyExpired,
+
     /// Two validators share the same `(slot, committee_index, valIdx)` with
     /// different public keys.
     #[error(
@@ -177,6 +183,17 @@ struct ContribKey {
     root: phase0::Root,
 }

+/// Per-poll outcome handed back by an `await_data` lookup closure.
+enum Lookup<V> {
+    /// The awaited value is now present - return it to the caller.
+    Found(V),
+    /// The awaited duty has been evicted; the lookup will never succeed.
+    /// `await_data` returns [`Error::AwaitDutyExpired`].
+    Evicted,
+    /// Neither stored nor evicted yet - park on the notify and retry.
+    Pending,
+}
+
 struct State {
     attestation_duties: HashMap<AttKey, phase0::AttestationData>,
     attestation_pub_keys: HashMap<PkKey, PubKey>,
@@ -190,6 +207,27 @@ struct State {
     contrib_duties: HashMap<ContribKey, altair::SyncCommitteeContribution>,
     contrib_keys_by_slot: HashMap<u64, Vec<ContribKey>>,

+    /// Highest slot whose attester duty has been evicted by the deadliner.
+    /// Because the deadliner expires duties in non-decreasing slot order and
+    /// `store()` refuses already-expired duties (`AddOutcome::AlreadyExpired`),
+    /// any awaited slot `<=` this mark that is not currently stored will never
+    /// be stored. Tracking only the high-water mark - rather than the set of
+    /// every evicted slot, which would grow without bound for the lifetime of
+    /// the node - lets `await_attestation` return `AwaitDutyExpired`
+    /// immediately for a gone duty while keeping the bookkeeping O(1) in
+    /// memory.
+    max_evicted_attestation_slot: Option<u64>,
+    /// Highest slot whose proposer duty has been evicted. See
+    /// [`max_evicted_attestation_slot`](Self::max_evicted_attestation_slot).
+    max_evicted_proposer_slot: Option<u64>,
+    /// Highest slot whose sync-contribution duty has been evicted. See
+    /// [`max_evicted_attestation_slot`](Self::max_evicted_attestation_slot).
+    max_evicted_contrib_slot: Option<u64>,
+    // NB: there is no eviction mark for aggregated attestations. They are
+    // awaited by root only (`await_agg_attestation` has no slot), so there is
+    // no slot to compare against a high-water mark; an evicted root relies on
+    // the caller's request timeout instead, matching Charon's Go dutydb which
+    // keeps no eviction record at all.
     deadliner_rx: tokio::sync::mpsc::Receiver<Duty>,
 }
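The reasoning in the doc comment above (one high-water mark replaces an ever-growing set of evicted slots) can be illustrated with a small standalone model. This is a sketch under simplifying assumptions, not the `MemDB` code: `SlotTracker` and its methods are hypothetical names, and `store` here refuses slots at or below the mark as a stand-in for the deadliner-based `AlreadyExpired` check in the real `store()`.

```rust
use std::collections::HashSet;

/// Toy model of one duty type's storage plus its eviction mark.
#[derive(Default)]
struct SlotTracker {
    stored: HashSet<u64>,
    /// Highest slot evicted so far; `None` until the first eviction.
    max_evicted_slot: Option<u64>,
}

impl SlotTracker {
    /// Stores a slot unless it is already at or below the eviction mark.
    /// Returns whether the slot was accepted.
    fn store(&mut self, slot: u64) -> bool {
        if self.max_evicted_slot.is_some_and(|hw| slot <= hw) {
            return false;
        }
        self.stored.insert(slot);
        true
    }

    /// Removes the slot and raises the mark; the mark never moves backwards.
    fn evict(&mut self, slot: u64) {
        self.stored.remove(&slot);
        let next = self.max_evicted_slot.map_or(slot, |cur| cur.max(slot));
        self.max_evicted_slot = Some(next);
    }

    /// True when waiting for `slot` is pointless: it is not stored and
    /// lies at or below the mark, so it can never be stored later.
    fn will_never_arrive(&self, slot: u64) -> bool {
        !self.stored.contains(&slot)
            && self.max_evicted_slot.is_some_and(|hw| slot <= hw)
    }
}
```

The bookkeeping stays a single `Option<u64>` regardless of how many slots have been evicted, which is the property the commit relies on for long-running nodes.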

@@ -225,6 +263,9 @@ impl MemDB {
                 aggregation_keys_by_slot: HashMap::new(),
                 contrib_duties: HashMap::new(),
                 contrib_keys_by_slot: HashMap::new(),
+                max_evicted_attestation_slot: None,
+                max_evicted_proposer_slot: None,
+                max_evicted_contrib_slot: None,
                 deadliner_rx,
             }),
             attestation_notify: Notify::new(),
@@ -272,7 +313,6 @@ impl MemDB {
                     Some(UnsignedDutyData::Proposal(p)) => state.store_proposal(p)?,
                     Some(_) => return Err(Error::InvalidVersionedProposal),
                 }
-                self.proposer_notify.notify_waiters();
             }
             DutyType::Attester => {
                 for (pubkey, data) in &unsigned_set {
@@ -282,7 +322,6 @@ impl MemDB {
                     };
                     state.store_attestation(*pubkey, att)?;
                 }
-                self.attestation_notify.notify_waiters();
             }
             DutyType::Aggregator => {
                 for data in unsigned_set.values() {
@@ -292,7 +331,6 @@ impl MemDB {
                     };
                     state.store_agg_attestation(agg)?;
                 }
-                self.aggregation_notify.notify_waiters();
             }
             DutyType::SyncContribution => {
                 for data in unsigned_set.values() {
@@ -302,24 +340,54 @@ impl MemDB {
                     };
                     state.store_sync_contribution(contrib)?;
                 }
-                self.contrib_notify.notify_waiters();
             }
             _ => return Err(Error::UnsupportedDutyType),
         }
-
-        // Drain all expired duties that the deadliner has sent.
+        // Wake the matching notify for the duty we just stored, plus
+        // anything we drain below. `notify_waiters` is cheap if no one is
+        // parked and just bumps a counter, so calling it under the write
+        // lock is harmless - woken tasks block on `state.read()` until we
+        // drop.
+        self.wake(duty.duty_type);
+
+        // Drain all expired duties that the deadliner has sent. Waiters
+        // whose duty just expired need to see `Lookup::Evicted` and exit,
+        // not re-park - so we wake the matching notify after each eviction.
         while let Ok(expired) = state.deadliner_rx.try_recv() {
+            let duty_type = expired.duty_type.clone();
             state.delete_duty(expired)?;
+            self.wake(duty_type);
         }

         Ok(())
     }

+    /// Wakes the [`Notify`] paired with `duty_type`. No-op for duty types
+    /// the DB doesn't track (e.g. `Exit`, `BuilderRegistration`).
+    fn wake(&self, duty_type: DutyType) {
+        let notify = match duty_type {
+            DutyType::Proposer => &self.proposer_notify,
+            DutyType::Attester => &self.attestation_notify,
+            DutyType::Aggregator => &self.aggregation_notify,
+            DutyType::SyncContribution => &self.contrib_notify,
+            _ => return,
+        };
+        notify.notify_waiters();
+    }
+
     /// Blocks until a proposal for the given slot is available, then returns
     /// it.
     pub async fn await_proposal(&self, slot: u64) -> Result<VersionedProposal> {
-        self.await_data(&self.proposer_notify, |s| s.proposer_duties.get(&slot))
-            .await
+        self.await_data(&self.proposer_notify, |s| {
+            if let Some(v) = s.proposer_duties.get(&slot) {
+                Lookup::Found(v.clone())
+            } else if s.max_evicted_proposer_slot.is_some_and(|hw| slot <= hw) {
+                Lookup::Evicted
+            } else {
+                Lookup::Pending
+            }
+        })
+        .await
     }

     /// Blocks until attestation data for the given slot and committee index is
@@ -333,8 +401,19 @@ impl MemDB {
             slot,
             committee_index,
         };
-        self.await_data(&self.attestation_notify, |s| s.attestation_duties.get(&key))
-            .await
+        self.await_data(&self.attestation_notify, |s| {
+            if let Some(v) = s.attestation_duties.get(&key) {
+                Lookup::Found(v.clone())
+            } else if s
+                .max_evicted_attestation_slot
+                .is_some_and(|hw| key.slot <= hw)
+            {
+                Lookup::Evicted
+            } else {
+                Lookup::Pending
+            }
+        })
+        .await
     }

     /// Blocks until an aggregated attestation for the given slot and
@@ -347,7 +426,14 @@ impl MemDB {
             root: attestation_root,
         };
         self.await_data(&self.aggregation_notify, |s| {
-            s.aggregation_duties.get(&key).map(|a| &a.0)
+            // Awaited by root only, so there is no slot to test against an
+            // eviction high-water mark: an evicted root relies on the caller's
+            // request timeout to terminate (matching Charon's Go dutydb).
+            if let Some(v) = s.aggregation_duties.get(&key) {
+                Lookup::Found(v.0.clone())
+            } else {
+                Lookup::Pending
+            }
         })
         .await
     }
@@ -365,31 +451,43 @@ impl MemDB {
             subcommittee_index,
             root: beacon_block_root,
         };
-        self.await_data(&self.contrib_notify, |s| s.contrib_duties.get(&key))
-            .await
+        self.await_data(&self.contrib_notify, |s| {
+            if let Some(v) = s.contrib_duties.get(&key) {
+                Lookup::Found(v.clone())
+            } else if s.max_evicted_contrib_slot.is_some_and(|hw| slot <= hw) {
+                Lookup::Evicted
+            } else {
+                Lookup::Pending
+            }
+        })
+        .await
     }

     // A single Notify per duty type wakes all waiters on every store, not only
     // those whose key matches. The number of concurrent waiters per duty type
     // is small (one per validator), so the extra wakeups are cheap. A keyed
     // notify (HashMap<Key, Sender>) would avoid them but adds complexity that
     // isn't worth it here.
+    //
+    // `delete_duty` also wakes the notify so waiters whose duty just expired
+    // exit immediately via the `Lookup::Evicted` branch, instead of parking
+    // for another `notify_waiters` call or for the per-request timeout in
+    // the caller.
     async fn await_data<V>(
         &self,
         notify: &Notify,
-        lookup: impl for<'s> Fn(&'s State) -> Option<&'s V>,
-    ) -> Result<V>
-    where
-        V: Clone,
-    {
+        lookup: impl Fn(&State) -> Lookup<V>,
+    ) -> Result<V> {
         loop {
             let notified = notify.notified();
             tokio::pin!(notified);

             {
                 let state = self.state.read().await;
-                if let Some(v) = lookup(&state) {
-                    return Ok(v.clone());
+                match lookup(&state) {
+                    Lookup::Found(v) => return Ok(v),
+                    Lookup::Evicted => return Err(Error::AwaitDutyExpired),
+                    Lookup::Pending => {}
                 }
             }
@@ -571,12 +669,20 @@ impl State {
         Ok(())
     }

+    /// Raises an eviction high-water mark to `slot` if `slot` is newer (or the
+    /// mark is unset). The deadliner expires duties in non-decreasing slot
+    /// order, so in practice this only ever moves the mark forward.
+    fn bump_high_water(mark: &mut Option<u64>, slot: u64) {
+        *mark = Some(mark.map_or(slot, |current| current.max(slot)));
+    }
+
     fn delete_duty(&mut self, duty: Duty) -> Result<()> {
         let slot = duty.slot.inner();
         info!(slot, duty_type = %duty.duty_type, "dutydb: deleting expired duty");
         match duty.duty_type {
             DutyType::Proposer => {
                 self.proposer_duties.remove(&slot);
+                Self::bump_high_water(&mut self.max_evicted_proposer_slot, slot);
             }
             DutyType::BuilderProposer => return Err(Error::DeprecatedDutyBuilderProposer),
             DutyType::Attester => {
@@ -589,8 +695,11 @@ impl State {
                         });
                     }
                 }
+                Self::bump_high_water(&mut self.max_evicted_attestation_slot, slot);
             }
             DutyType::Aggregator => {
+                // No eviction mark: aggregated attestations are awaited by root
+                // only, so there is nothing for a slot high-water mark to gate.
                 if let Some(keys) = self.aggregation_keys_by_slot.remove(&slot) {
                     for key in keys {
                         self.aggregation_duties.remove(&key);
@@ -603,6 +712,7 @@ impl State {
                         self.contrib_duties.remove(&key);
                     }
                 }
+                Self::bump_high_water(&mut self.max_evicted_contrib_slot, slot);
             }
             _ => return Err(Error::UnknownDutyType),
         }
@@ -1166,6 +1276,67 @@ mod tests {
         assert!(db.pub_key_by_attestation(SLOT, 0, 0).await.is_err());
     }

+    /// After a slot is evicted, `await_attestation` must return
+    /// `AwaitDutyExpired` immediately (not park until the request timeout) for
+    /// that slot AND for any older slot - the eviction state is a single
+    /// high-water mark, so it stays O(1) in memory rather than accumulating one
+    /// entry per evicted slot for the lifetime of the node.
+    #[tokio::test]
+    async fn await_attestation_expired_after_eviction_high_water() {
+        let deadliner = far_future_handle();
+        let (trim_tx, trim_rx) = channel::<Duty>(64);
+        let db = make_db_with_deadliner(deadliner, trim_rx);
+
+        const SLOT: u64 = 123;
+
+        let mut set = UnsignedDataSet::new();
+        set.insert(
+            random_core_pub_key(),
+            UnsignedDutyData::Attestation(att_data(SLOT, 0, 0)),
+        );
+        db.store(Duty::new(SlotNumber::new(SLOT), DutyType::Attester), set)
+            .await
+            .unwrap();
+
+        // Evict SLOT, then trigger expiry processing with an unrelated store.
+        trim_tx
+            .send(Duty::new(SlotNumber::new(SLOT), DutyType::Attester))
+            .await
+            .expect("trim_tx should be open");
+        let mut set2 = UnsignedDataSet::new();
+        set2.insert(
+            random_core_pub_key(),
+            UnsignedDutyData::Proposal(Box::new(phase0_proposal(SLOT.saturating_add(1), 0))),
+        );
+        db.store(
+            Duty::new(SlotNumber::new(SLOT.saturating_add(1)), DutyType::Proposer),
+            set2,
+        )
+        .await
+        .unwrap();
+
+        // The evicted slot resolves to AwaitDutyExpired without parking.
+        let timeout = std::time::Duration::from_secs(5);
+        let evicted = tokio::time::timeout(timeout, db.await_attestation(SLOT, 0))
+            .await
+            .expect("await must not park for an evicted slot");
+        assert!(
+            matches!(evicted, Err(Error::AwaitDutyExpired)),
+            "evicted slot: expected AwaitDutyExpired, got {evicted:?}"
+        );
+
+        // An older, never-stored slot is also below the high-water mark: its
+        // deadline has necessarily passed too, so it must fail fast rather than
+        // park - and we keep no per-slot record to answer this.
+        let older = tokio::time::timeout(timeout, db.await_attestation(SLOT.saturating_sub(1), 0))
+            .await
+            .expect("await must not park for a slot below the eviction high-water");
+        assert!(
+            matches!(older, Err(Error::AwaitDutyExpired)),
+            "older slot: expected AwaitDutyExpired, got {older:?}"
+        );
+    }
+
     #[tokio::test]
     async fn agg_attestation_two_roots_same_slot() {
         const SLOT: u64 = 300;
