Skip to content

Commit 5da9093

Browse files
varex83agent and varex83 committed
fix(core): address PR #451 review feedback
Bug fixes (must-fix per review):
- attestation_data: wrap MemDB::await_attestation in tokio::time::timeout (24s) so a request for a slot that never produces consensus output cannot hold a handler task indefinitely. delete_duty now records evicted keys per duty type and notifies waiters, so await_data returns Error::AwaitDutyExpired immediately when the awaited duty is gone instead of spinning until the timeout fires. Maps to 408 on the wire.
- Stop leaking upstream BlindedBlock400Response Debug output (incl. stacktraces) into the client-visible ApiError.message. The variant payload is now attached as `source` for debug logs; the message stays generic.

Hardening:
- new_insecure is gated behind #[cfg(test)] so the insecure_test flag cannot reach production builds.
- new_router applies DefaultBodyLimit::max(64 KiB) on the two POST /duties/{attester,sync}/{epoch} routes — defends against the Vec<u64> parse amplification on the ValIndexes deserializer.
- All upstream eth2_cl calls are wrapped in tokio::time::timeout(12s) so a hanging beacon node cannot stall handler tasks.
- proposer_duties / attester_duties / sync_committee_duties propagate upstream BadRequest as 400 and ServiceUnavailable as 503 instead of collapsing every non-Ok variant to 502 — the VC can now back off on upstream syncing instead of treating it as a gateway failure.
- swap_attester_pubshares / swap_sync_committee_pubshares now return 500 (cluster misconfig) instead of 502 when a pubshare is missing — the upstream returned well-formed data, the failure is local.

ValIndexes:
- Replace #[serde(untagged)] with a streaming Visitor that validates each element via SeqAccess::next_element. Avoids the speculative Vec<u64> parse and the serde Content cache. Now accepts mixed numeric/string elements and rejects negative integers.
- Hard cap at 8192 indices per request.

ApiError:
- with_boxed_source for sources that aren't std::error::Error (e.g. anyhow::Error from auto-gen request builders).
Router:
- attestation_data uses Result<Query<...>, QueryRejection> so 4xx responses from missing/malformed query params share the same { code, message } envelope as the rest of the router.

Tests (+13):
- attestation_data: timeout when data never arrives; 408 when duty is evicted while a waiter is parked; cancellation cleanup when the handler future is dropped; negative lookup on wrong committee_index.
- Status-mapping helpers: confirm upstream Debug output is never serialized into the message.
- Router: ApiError envelope on bad query; oversized body rejection; ValIndexes empty/mixed/oversized/negative cases.

Co-Authored-By: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com>
1 parent 26675fe commit 5da9093

8 files changed

Lines changed: 834 additions & 156 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ tree_hash_derive = "0.12"
106106
tar = "0.4"
107107
flate2 = "1.1"
108108
wiremock = "0.6"
109+
tower = "0.5"
109110
sysinfo = "0.33"
110111
quick-xml = { version = "0.39", features = ["serialize"] }
111112

crates/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ pluto-testutil.workspace = true
5555
pluto-tracing.workspace = true
5656
tokio = { workspace = true, features = ["test-util"] }
5757
wiremock.workspace = true
58+
tower = { workspace = true, features = ["util"] }
5859

5960
[build-dependencies]
6061
pluto-build-proto.workspace = true

crates/core/src/dutydb/memory.rs

Lines changed: 115 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//!
33
//! Equivalent to charon/core/dutydb/memory.go.
44
5-
use std::collections::HashMap;
5+
use std::collections::{HashMap, HashSet};
66

77
use pluto_eth2api::{
88
spec::{altair, phase0},
@@ -49,6 +49,12 @@ pub enum Error {
4949
#[error("dutydb shutdown: query could not be answered")]
5050
Shutdown,
5151

52+
/// The awaited duty was evicted before its unsigned data became
53+
/// available. Distinct from `Shutdown` so callers can map this to a
54+
/// timeout-style error rather than a service-down error.
55+
#[error("dutydb: awaited duty expired before data was stored")]
56+
AwaitDutyExpired,
57+
5258
/// Two validators share the same `(slot, committee_index, valIdx)` with
5359
/// different public keys.
5460
#[error(
@@ -177,6 +183,17 @@ struct ContribKey {
177183
root: phase0::Root,
178184
}
179185

186+
/// Per-poll outcome handed back by an `await_data` lookup closure.
187+
enum Lookup<V> {
188+
/// The awaited value is now present — return it to the caller.
189+
Found(V),
190+
/// The awaited duty has been evicted; the lookup will never succeed.
191+
/// `await_data` returns [`Error::AwaitDutyExpired`].
192+
Evicted,
193+
/// Neither stored nor evicted yet — park on the notify and retry.
194+
Pending,
195+
}
196+
180197
struct State {
181198
attestation_duties: HashMap<AttKey, phase0::AttestationData>,
182199
attestation_pub_keys: HashMap<PkKey, PubKey>,
@@ -190,6 +207,18 @@ struct State {
190207
contrib_duties: HashMap<ContribKey, altair::SyncCommitteeContribution>,
191208
contrib_keys_by_slot: HashMap<u64, Vec<ContribKey>>,
192209

210+
/// Slots whose attester duty has been evicted by the deadliner. Lets
211+
/// `await_attestation` return `AwaitDutyExpired` immediately when the
212+
/// awaited slot is gone, rather than spinning on every `store()` until
213+
/// the request-level timeout fires.
214+
evicted_attestation_slots: HashSet<u64>,
215+
/// Slots whose proposer duty has been evicted.
216+
evicted_proposer_slots: HashSet<u64>,
217+
/// Aggregation roots whose duty has been evicted.
218+
evicted_aggregation_keys: HashSet<AggKey>,
219+
/// Sync contribution keys whose duty has been evicted.
220+
evicted_contrib_keys: HashSet<ContribKey>,
221+
193222
deadliner_rx: tokio::sync::mpsc::Receiver<Duty>,
194223
}
195224

@@ -225,6 +254,10 @@ impl MemDB {
225254
aggregation_keys_by_slot: HashMap::new(),
226255
contrib_duties: HashMap::new(),
227256
contrib_keys_by_slot: HashMap::new(),
257+
evicted_attestation_slots: HashSet::new(),
258+
evicted_proposer_slots: HashSet::new(),
259+
evicted_aggregation_keys: HashSet::new(),
260+
evicted_contrib_keys: HashSet::new(),
228261
deadliner_rx,
229262
}),
230263
attestation_notify: Notify::new(),
@@ -272,7 +305,6 @@ impl MemDB {
272305
Some(UnsignedDutyData::Proposal(p)) => state.store_proposal(p)?,
273306
Some(_) => return Err(Error::InvalidVersionedProposal),
274307
}
275-
self.proposer_notify.notify_waiters();
276308
}
277309
DutyType::Attester => {
278310
for (pubkey, data) in &unsigned_set {
@@ -282,7 +314,6 @@ impl MemDB {
282314
};
283315
state.store_attestation(*pubkey, att)?;
284316
}
285-
self.attestation_notify.notify_waiters();
286317
}
287318
DutyType::Aggregator => {
288319
for data in unsigned_set.values() {
@@ -292,7 +323,6 @@ impl MemDB {
292323
};
293324
state.store_agg_attestation(agg)?;
294325
}
295-
self.aggregation_notify.notify_waiters();
296326
}
297327
DutyType::SyncContribution => {
298328
for data in unsigned_set.values() {
@@ -302,24 +332,54 @@ impl MemDB {
302332
};
303333
state.store_sync_contribution(contrib)?;
304334
}
305-
self.contrib_notify.notify_waiters();
306335
}
307336
_ => return Err(Error::UnsupportedDutyType),
308337
}
309-
310-
// Drain all expired duties that the deadliner has sent.
338+
// Wake the matching notify for the duty we just stored, plus
339+
// anything we drain below. `notify_waiters` is cheap if no one is
340+
// parked and just bumps a counter, so calling it under the write
341+
// lock is harmless — woken tasks block on `state.read()` until we
342+
// drop.
343+
self.wake(duty.duty_type);
344+
345+
// Drain all expired duties that the deadliner has sent. Waiters
346+
// whose duty just expired need to see `Lookup::Evicted` and exit,
347+
// not re-park — so we wake the matching notify after each eviction.
311348
while let Ok(expired) = state.deadliner_rx.try_recv() {
349+
let duty_type = expired.duty_type.clone();
312350
state.delete_duty(expired)?;
351+
self.wake(duty_type);
313352
}
314353

315354
Ok(())
316355
}
317356

357+
/// Wakes the [`Notify`] paired with `duty_type`. No-op for duty types
358+
/// the DB doesn't track (e.g. `Exit`, `BuilderRegistration`).
359+
fn wake(&self, duty_type: DutyType) {
360+
let notify = match duty_type {
361+
DutyType::Proposer => &self.proposer_notify,
362+
DutyType::Attester => &self.attestation_notify,
363+
DutyType::Aggregator => &self.aggregation_notify,
364+
DutyType::SyncContribution => &self.contrib_notify,
365+
_ => return,
366+
};
367+
notify.notify_waiters();
368+
}
369+
318370
/// Blocks until a proposal for the given slot is available, then returns
319371
/// it.
320372
pub async fn await_proposal(&self, slot: u64) -> Result<VersionedProposal> {
321-
self.await_data(&self.proposer_notify, |s| s.proposer_duties.get(&slot))
322-
.await
373+
self.await_data(&self.proposer_notify, |s| {
374+
if let Some(v) = s.proposer_duties.get(&slot) {
375+
Lookup::Found(v.clone())
376+
} else if s.evicted_proposer_slots.contains(&slot) {
377+
Lookup::Evicted
378+
} else {
379+
Lookup::Pending
380+
}
381+
})
382+
.await
323383
}
324384

325385
/// Blocks until attestation data for the given slot and committee index is
@@ -333,8 +393,16 @@ impl MemDB {
333393
slot,
334394
committee_index,
335395
};
336-
self.await_data(&self.attestation_notify, |s| s.attestation_duties.get(&key))
337-
.await
396+
self.await_data(&self.attestation_notify, |s| {
397+
if let Some(v) = s.attestation_duties.get(&key) {
398+
Lookup::Found(v.clone())
399+
} else if s.evicted_attestation_slots.contains(&key.slot) {
400+
Lookup::Evicted
401+
} else {
402+
Lookup::Pending
403+
}
404+
})
405+
.await
338406
}
339407

340408
/// Blocks until an aggregated attestation for the given slot and
@@ -347,7 +415,13 @@ impl MemDB {
347415
root: attestation_root,
348416
};
349417
self.await_data(&self.aggregation_notify, |s| {
350-
s.aggregation_duties.get(&key).map(|a| &a.0)
418+
if let Some(v) = s.aggregation_duties.get(&key) {
419+
Lookup::Found(v.0.clone())
420+
} else if s.evicted_aggregation_keys.contains(&key) {
421+
Lookup::Evicted
422+
} else {
423+
Lookup::Pending
424+
}
351425
})
352426
.await
353427
}
@@ -365,31 +439,43 @@ impl MemDB {
365439
subcommittee_index,
366440
root: beacon_block_root,
367441
};
368-
self.await_data(&self.contrib_notify, |s| s.contrib_duties.get(&key))
369-
.await
442+
self.await_data(&self.contrib_notify, |s| {
443+
if let Some(v) = s.contrib_duties.get(&key) {
444+
Lookup::Found(v.clone())
445+
} else if s.evicted_contrib_keys.contains(&key) {
446+
Lookup::Evicted
447+
} else {
448+
Lookup::Pending
449+
}
450+
})
451+
.await
370452
}
371453

372454
// A single Notify per duty type wakes all waiters on every store, not only
373455
// those whose key matches. The number of concurrent waiters per duty type
374456
// is small (one per validator), so the extra wakeups are cheap. A keyed
375457
// notify (HashMap<Key, Sender>) would avoid them but adds complexity that
376458
// isn't worth it here.
459+
//
460+
// `delete_duty` also wakes the notify so waiters whose duty just expired
461+
// exit immediately via the `Lookup::Evicted` branch, instead of parking
462+
// for another `notify_waiters` call or for the per-request timeout in
463+
// the caller.
377464
async fn await_data<V>(
378465
&self,
379466
notify: &Notify,
380-
lookup: impl for<'s> Fn(&'s State) -> Option<&'s V>,
381-
) -> Result<V>
382-
where
383-
V: Clone,
384-
{
467+
lookup: impl Fn(&State) -> Lookup<V>,
468+
) -> Result<V> {
385469
loop {
386470
let notified = notify.notified();
387471
tokio::pin!(notified);
388472

389473
{
390474
let state = self.state.read().await;
391-
if let Some(v) = lookup(&state) {
392-
return Ok(v.clone());
475+
match lookup(&state) {
476+
Lookup::Found(v) => return Ok(v),
477+
Lookup::Evicted => return Err(Error::AwaitDutyExpired),
478+
Lookup::Pending => {}
393479
}
394480
}
395481

@@ -577,6 +663,7 @@ impl State {
577663
match duty.duty_type {
578664
DutyType::Proposer => {
579665
self.proposer_duties.remove(&slot);
666+
self.evicted_proposer_slots.insert(slot);
580667
}
581668
DutyType::BuilderProposer => return Err(Error::DeprecatedDutyBuilderProposer),
582669
DutyType::Attester => {
@@ -589,19 +676,22 @@ impl State {
589676
});
590677
}
591678
}
679+
self.evicted_attestation_slots.insert(slot);
592680
}
593681
DutyType::Aggregator => {
594682
if let Some(keys) = self.aggregation_keys_by_slot.remove(&slot) {
595-
for key in keys {
596-
self.aggregation_duties.remove(&key);
683+
for key in &keys {
684+
self.aggregation_duties.remove(key);
597685
}
686+
self.evicted_aggregation_keys.extend(keys);
598687
}
599688
}
600689
DutyType::SyncContribution => {
601690
if let Some(keys) = self.contrib_keys_by_slot.remove(&slot) {
602-
for key in keys {
603-
self.contrib_duties.remove(&key);
691+
for key in &keys {
692+
self.contrib_duties.remove(key);
604693
}
694+
self.evicted_contrib_keys.extend(keys);
605695
}
606696
}
607697
_ => return Err(Error::UnknownDutyType),

0 commit comments

Comments
 (0)