Skip to content

Commit 33221ad

Browse files
varex83agent and varex83 committed
fix(core): address PR #451 review feedback
Bug fixes (must-fix per review):
- attestation_data: wrap MemDB::await_attestation in tokio::time::timeout (24s) so a request for a slot that never produces consensus output cannot hold a handler task indefinitely. delete_duty now records evicted keys per duty type and notifies waiters, so await_data returns Error::AwaitDutyExpired immediately when the awaited duty is gone instead of spinning until the timeout fires. Maps to 408 on the wire.
- Stop leaking upstream BlindedBlock400Response Debug output (incl. stacktraces) into the client-visible ApiError.message. The variant payload is now attached as `source` for debug logs; the message stays generic.

Hardening:
- new_insecure is gated behind #[cfg(test)] so the insecure_test flag cannot reach production builds.
- new_router applies DefaultBodyLimit::max(64 KiB) on the two POST /duties/{attester,sync}/{epoch} routes — defends against the Vec<u64> parse amplification on the ValIndexes deserializer.
- All upstream eth2_cl calls are wrapped in tokio::time::timeout(12s) so a hanging beacon node cannot stall handler tasks.
- proposer_duties / attester_duties / sync_committee_duties propagate upstream BadRequest as 400 and ServiceUnavailable as 503 instead of collapsing every non-Ok variant to 502 — the VC can now back off on upstream syncing instead of treating it as a gateway failure.
- swap_attester_pubshares / swap_sync_committee_pubshares now return 500 (cluster misconfig) instead of 502 when a pubshare is missing — the upstream returned well-formed data, the failure is local.

ValIndexes:
- Replace #[serde(untagged)] with a streaming Visitor that validates each element via SeqAccess::next_element. Avoids the speculative Vec<u64> parse and the serde Content cache. Now accepts mixed numeric/string elements and rejects negative integers.
- Hard cap at 8192 indices per request.

ApiError:
- with_boxed_source for sources that aren't std::error::Error (e.g. anyhow::Error from auto-gen request builders).

Router:
- attestation_data uses Result<Query<...>, QueryRejection> so 4xx responses from missing/malformed query params share the same { code, message } envelope as the rest of the router.

Tests (+13):
- attestation_data: timeout when data never arrives; 408 when duty is evicted while a waiter is parked; cancellation cleanup when the handler future is dropped; negative lookup on wrong committee_index.
- Status-mapping helpers: confirm upstream Debug output is never serialized into the message.
- Router: ApiError envelope on bad query; oversized body rejection; ValIndexes empty/mixed/oversized/negative cases.

Co-Authored-By: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com>
1 parent 26675fe commit 33221ad

8 files changed

Lines changed: 838 additions & 151 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ tree_hash_derive = "0.12"
106106
tar = "0.4"
107107
flate2 = "1.1"
108108
wiremock = "0.6"
109+
tower = "0.5"
109110
sysinfo = "0.33"
110111
quick-xml = { version = "0.39", features = ["serialize"] }
111112

crates/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ pluto-testutil.workspace = true
5555
pluto-tracing.workspace = true
5656
tokio = { workspace = true, features = ["test-util"] }
5757
wiremock.workspace = true
58+
tower = { workspace = true, features = ["util"] }
5859

5960
[build-dependencies]
6061
pluto-build-proto.workspace = true

crates/core/src/dutydb/memory.rs

Lines changed: 119 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//!
33
//! Equivalent to charon/core/dutydb/memory.go.
44
5-
use std::collections::HashMap;
5+
use std::collections::{HashMap, HashSet};
66

77
use pluto_eth2api::{
88
spec::{altair, phase0},
@@ -49,6 +49,12 @@ pub enum Error {
4949
#[error("dutydb shutdown: query could not be answered")]
5050
Shutdown,
5151

52+
/// The awaited duty was evicted before its unsigned data became
53+
/// available. Distinct from `Shutdown` so callers can map this to a
54+
/// timeout-style error rather than a service-down error.
55+
#[error("dutydb: awaited duty expired before data was stored")]
56+
AwaitDutyExpired,
57+
5258
/// Two validators share the same `(slot, committee_index, valIdx)` with
5359
/// different public keys.
5460
#[error(
@@ -177,6 +183,17 @@ struct ContribKey {
177183
root: phase0::Root,
178184
}
179185

186+
/// Per-poll outcome handed back by an `await_data` lookup closure.
187+
enum Lookup<V> {
188+
/// The awaited value is now present — return it to the caller.
189+
Found(V),
190+
/// The awaited duty has been evicted; the lookup will never succeed.
191+
/// `await_data` returns [`Error::AwaitDutyExpired`].
192+
Evicted,
193+
/// Neither stored nor evicted yet — park on the notify and retry.
194+
Pending,
195+
}
196+
180197
struct State {
181198
attestation_duties: HashMap<AttKey, phase0::AttestationData>,
182199
attestation_pub_keys: HashMap<PkKey, PubKey>,
@@ -190,6 +207,18 @@ struct State {
190207
contrib_duties: HashMap<ContribKey, altair::SyncCommitteeContribution>,
191208
contrib_keys_by_slot: HashMap<u64, Vec<ContribKey>>,
192209

210+
/// Slots whose attester duty has been evicted by the deadliner. Lets
211+
/// `await_attestation` return `AwaitDutyExpired` immediately when the
212+
/// awaited slot is gone, rather than spinning on every `store()` until
213+
/// the request-level timeout fires.
214+
evicted_attestation_slots: HashSet<u64>,
215+
/// Slots whose proposer duty has been evicted.
216+
evicted_proposer_slots: HashSet<u64>,
217+
/// Aggregation keys whose duty has been evicted.
218+
evicted_aggregation_keys: HashSet<AggKey>,
219+
/// Sync contribution keys whose duty has been evicted.
220+
evicted_contrib_keys: HashSet<ContribKey>,
221+
193222
deadliner_rx: tokio::sync::mpsc::Receiver<Duty>,
194223
}
195224

@@ -225,6 +254,10 @@ impl MemDB {
225254
aggregation_keys_by_slot: HashMap::new(),
226255
contrib_duties: HashMap::new(),
227256
contrib_keys_by_slot: HashMap::new(),
257+
evicted_attestation_slots: HashSet::new(),
258+
evicted_proposer_slots: HashSet::new(),
259+
evicted_aggregation_keys: HashSet::new(),
260+
evicted_contrib_keys: HashSet::new(),
228261
deadliner_rx,
229262
}),
230263
attestation_notify: Notify::new(),
@@ -307,19 +340,55 @@ impl MemDB {
307340
_ => return Err(Error::UnsupportedDutyType),
308341
}
309342

310-
// Drain all expired duties that the deadliner has sent.
343+
// Drain all expired duties that the deadliner has sent. Track which
344+
// duty types were touched so the right `Notify` is woken at the end —
345+
// waiters whose duty just expired need to see the `Evicted` flag and
346+
// exit, not re-park.
347+
let mut woke_attestation = false;
348+
let mut woke_proposer = false;
349+
let mut woke_aggregation = false;
350+
let mut woke_contrib = false;
311351
while let Ok(expired) = state.deadliner_rx.try_recv() {
352+
match expired.duty_type {
353+
DutyType::Proposer => woke_proposer = true,
354+
DutyType::Attester => woke_attestation = true,
355+
DutyType::Aggregator => woke_aggregation = true,
356+
DutyType::SyncContribution => woke_contrib = true,
357+
_ => {}
358+
}
312359
state.delete_duty(expired)?;
313360
}
361+
drop(state);
362+
363+
if woke_proposer {
364+
self.proposer_notify.notify_waiters();
365+
}
366+
if woke_attestation {
367+
self.attestation_notify.notify_waiters();
368+
}
369+
if woke_aggregation {
370+
self.aggregation_notify.notify_waiters();
371+
}
372+
if woke_contrib {
373+
self.contrib_notify.notify_waiters();
374+
}
314375

315376
Ok(())
316377
}
317378

318379
/// Blocks until a proposal for the given slot is available, then returns
319380
/// it.
320381
pub async fn await_proposal(&self, slot: u64) -> Result<VersionedProposal> {
321-
self.await_data(&self.proposer_notify, |s| s.proposer_duties.get(&slot))
322-
.await
382+
self.await_data(&self.proposer_notify, |s| {
383+
if let Some(v) = s.proposer_duties.get(&slot) {
384+
Lookup::Found(v.clone())
385+
} else if s.evicted_proposer_slots.contains(&slot) {
386+
Lookup::Evicted
387+
} else {
388+
Lookup::Pending
389+
}
390+
})
391+
.await
323392
}
324393

325394
/// Blocks until attestation data for the given slot and committee index is
@@ -333,8 +402,16 @@ impl MemDB {
333402
slot,
334403
committee_index,
335404
};
336-
self.await_data(&self.attestation_notify, |s| s.attestation_duties.get(&key))
337-
.await
405+
self.await_data(&self.attestation_notify, |s| {
406+
if let Some(v) = s.attestation_duties.get(&key) {
407+
Lookup::Found(v.clone())
408+
} else if s.evicted_attestation_slots.contains(&key.slot) {
409+
Lookup::Evicted
410+
} else {
411+
Lookup::Pending
412+
}
413+
})
414+
.await
338415
}
339416

340417
/// Blocks until an aggregated attestation for the given slot and
@@ -347,7 +424,13 @@ impl MemDB {
347424
root: attestation_root,
348425
};
349426
self.await_data(&self.aggregation_notify, |s| {
350-
s.aggregation_duties.get(&key).map(|a| &a.0)
427+
if let Some(v) = s.aggregation_duties.get(&key) {
428+
Lookup::Found(v.0.clone())
429+
} else if s.evicted_aggregation_keys.contains(&key) {
430+
Lookup::Evicted
431+
} else {
432+
Lookup::Pending
433+
}
351434
})
352435
.await
353436
}
@@ -365,31 +448,43 @@ impl MemDB {
365448
subcommittee_index,
366449
root: beacon_block_root,
367450
};
368-
self.await_data(&self.contrib_notify, |s| s.contrib_duties.get(&key))
369-
.await
451+
self.await_data(&self.contrib_notify, |s| {
452+
if let Some(v) = s.contrib_duties.get(&key) {
453+
Lookup::Found(v.clone())
454+
} else if s.evicted_contrib_keys.contains(&key) {
455+
Lookup::Evicted
456+
} else {
457+
Lookup::Pending
458+
}
459+
})
460+
.await
370461
}
371462

372463
// A single Notify per duty type wakes all waiters on every store, not only
373464
// those whose key matches. The number of concurrent waiters per duty type
374465
// is small (one per validator), so the extra wakeups are cheap. A keyed
375466
// notify (HashMap<Key, Sender>) would avoid them but adds complexity that
376467
// isn't worth it here.
468+
//
469+
// Draining expired duties (via `delete_duty`) also wakes the notify so waiters whose duty just expired
470+
// exit immediately via the `Lookup::Evicted` branch, instead of parking
471+
// for another `notify_waiters` call or for the per-request timeout in
472+
// the caller.
377473
async fn await_data<V>(
378474
&self,
379475
notify: &Notify,
380-
lookup: impl for<'s> Fn(&'s State) -> Option<&'s V>,
381-
) -> Result<V>
382-
where
383-
V: Clone,
384-
{
476+
lookup: impl Fn(&State) -> Lookup<V>,
477+
) -> Result<V> {
385478
loop {
386479
let notified = notify.notified();
387480
tokio::pin!(notified);
388481

389482
{
390483
let state = self.state.read().await;
391-
if let Some(v) = lookup(&state) {
392-
return Ok(v.clone());
484+
match lookup(&state) {
485+
Lookup::Found(v) => return Ok(v),
486+
Lookup::Evicted => return Err(Error::AwaitDutyExpired),
487+
Lookup::Pending => {}
393488
}
394489
}
395490

@@ -577,6 +672,7 @@ impl State {
577672
match duty.duty_type {
578673
DutyType::Proposer => {
579674
self.proposer_duties.remove(&slot);
675+
self.evicted_proposer_slots.insert(slot);
580676
}
581677
DutyType::BuilderProposer => return Err(Error::DeprecatedDutyBuilderProposer),
582678
DutyType::Attester => {
@@ -589,19 +685,22 @@ impl State {
589685
});
590686
}
591687
}
688+
self.evicted_attestation_slots.insert(slot);
592689
}
593690
DutyType::Aggregator => {
594691
if let Some(keys) = self.aggregation_keys_by_slot.remove(&slot) {
595-
for key in keys {
596-
self.aggregation_duties.remove(&key);
692+
for key in &keys {
693+
self.aggregation_duties.remove(key);
597694
}
695+
self.evicted_aggregation_keys.extend(keys);
598696
}
599697
}
600698
DutyType::SyncContribution => {
601699
if let Some(keys) = self.contrib_keys_by_slot.remove(&slot) {
602-
for key in keys {
603-
self.contrib_duties.remove(&key);
700+
for key in &keys {
701+
self.contrib_duties.remove(key);
604702
}
703+
self.evicted_contrib_keys.extend(keys);
605704
}
606705
}
607706
_ => return Err(Error::UnknownDutyType),

0 commit comments

Comments
 (0)