Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ tree_hash_derive = "0.12"
tar = "0.4"
flate2 = "1.1"
wiremock = "0.6"
tower = "0.5"
sysinfo = "0.33"
quick-xml = { version = "0.39", features = ["serialize"] }

Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pluto-testutil.workspace = true
pluto-tracing.workspace = true
tokio = { workspace = true, features = ["test-util"] }
wiremock.workspace = true
tower = { workspace = true, features = ["util"] }

[build-dependencies]
pluto-build-proto.workspace = true
Expand Down
211 changes: 191 additions & 20 deletions crates/core/src/dutydb/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ pub enum Error {
#[error("dutydb shutdown: query could not be answered")]
Shutdown,

/// The awaited duty was evicted before its unsigned data became
/// available. Distinct from `Shutdown` so callers can map this to a
/// timeout-style error rather than a service-down error.
#[error("dutydb: awaited duty expired before data was stored")]
AwaitDutyExpired,

/// Two validators share the same `(slot, committee_index, valIdx)` with
/// different public keys.
#[error(
Expand Down Expand Up @@ -177,6 +183,17 @@ struct ContribKey {
root: phase0::Root,
}

/// Per-poll outcome handed back by an `await_data` lookup closure.
enum Lookup<V> {
/// The awaited value is now present — return it to the caller.
Found(V),
/// The awaited duty has been evicted; the lookup will never succeed.
/// `await_data` returns [`Error::AwaitDutyExpired`].
Evicted,
/// Neither stored nor evicted yet — park on the notify and retry.
Pending,
}

struct State {
attestation_duties: HashMap<AttKey, phase0::AttestationData>,
attestation_pub_keys: HashMap<PkKey, PubKey>,
Expand All @@ -190,6 +207,27 @@ struct State {
contrib_duties: HashMap<ContribKey, altair::SyncCommitteeContribution>,
contrib_keys_by_slot: HashMap<u64, Vec<ContribKey>>,

/// Highest slot whose attester duty has been evicted by the deadliner.
/// Because the deadliner expires duties in non-decreasing slot order and
/// `store()` refuses already-expired duties (`AddOutcome::AlreadyExpired`),
/// any awaited slot `<=` this mark that is not currently stored will never
/// be stored. Tracking only the high-water mark — rather than the set of
/// every evicted slot, which would grow without bound for the lifetime of
/// the node — lets `await_attestation` return `AwaitDutyExpired`
/// immediately for a gone duty while keeping the bookkeeping O(1) in
/// memory.
max_evicted_attestation_slot: Option<u64>,
/// Highest slot whose proposer duty has been evicted. See
/// [`max_evicted_attestation_slot`](Self::max_evicted_attestation_slot).
max_evicted_proposer_slot: Option<u64>,
/// Highest slot whose sync-contribution duty has been evicted. See
/// [`max_evicted_attestation_slot`](Self::max_evicted_attestation_slot).
max_evicted_contrib_slot: Option<u64>,
// NB: there is no eviction mark for aggregated attestations. They are
// awaited by root only (`await_agg_attestation` has no slot), so there is
// no slot to compare against a high-water mark; an evicted root relies on
// the caller's request timeout instead, matching Charon's Go dutydb which
// keeps no eviction record at all.
deadliner_rx: tokio::sync::mpsc::Receiver<Duty>,
}

Expand Down Expand Up @@ -225,6 +263,9 @@ impl MemDB {
aggregation_keys_by_slot: HashMap::new(),
contrib_duties: HashMap::new(),
contrib_keys_by_slot: HashMap::new(),
max_evicted_attestation_slot: None,
max_evicted_proposer_slot: None,
max_evicted_contrib_slot: None,
deadliner_rx,
}),
attestation_notify: Notify::new(),
Expand Down Expand Up @@ -272,7 +313,6 @@ impl MemDB {
Some(UnsignedDutyData::Proposal(p)) => state.store_proposal(p)?,
Some(_) => return Err(Error::InvalidVersionedProposal),
}
self.proposer_notify.notify_waiters();
}
DutyType::Attester => {
for (pubkey, data) in &unsigned_set {
Expand All @@ -282,7 +322,6 @@ impl MemDB {
};
state.store_attestation(*pubkey, att)?;
}
self.attestation_notify.notify_waiters();
}
DutyType::Aggregator => {
for data in unsigned_set.values() {
Expand All @@ -292,7 +331,6 @@ impl MemDB {
};
state.store_agg_attestation(agg)?;
}
self.aggregation_notify.notify_waiters();
}
DutyType::SyncContribution => {
for data in unsigned_set.values() {
Expand All @@ -302,24 +340,54 @@ impl MemDB {
};
state.store_sync_contribution(contrib)?;
}
self.contrib_notify.notify_waiters();
}
_ => return Err(Error::UnsupportedDutyType),
}

// Drain all expired duties that the deadliner has sent.
// Wake the matching notify for the duty we just stored, plus
// anything we drain below. `notify_waiters` is cheap if no one is
// parked and just bumps a counter, so calling it under the write
// lock is harmless — woken tasks block on `state.read()` until we
// drop.
self.wake(duty.duty_type);

// Drain all expired duties that the deadliner has sent. Waiters
// whose duty just expired need to see `Lookup::Evicted` and exit,
// not re-park — so we wake the matching notify after each eviction.
while let Ok(expired) = state.deadliner_rx.try_recv() {
let duty_type = expired.duty_type.clone();
state.delete_duty(expired)?;
Comment thread
varex83 marked this conversation as resolved.
self.wake(duty_type);
}

Ok(())
}

/// Wakes the [`Notify`] paired with `duty_type`. No-op for duty types
/// the DB doesn't track (e.g. `Exit`, `BuilderRegistration`).
fn wake(&self, duty_type: DutyType) {
let notify = match duty_type {
DutyType::Proposer => &self.proposer_notify,
DutyType::Attester => &self.attestation_notify,
DutyType::Aggregator => &self.aggregation_notify,
DutyType::SyncContribution => &self.contrib_notify,
_ => return,
};
notify.notify_waiters();
}

/// Blocks until a proposal for the given slot is available, then returns
/// it.
pub async fn await_proposal(&self, slot: u64) -> Result<VersionedProposal> {
self.await_data(&self.proposer_notify, |s| s.proposer_duties.get(&slot))
.await
self.await_data(&self.proposer_notify, |s| {
if let Some(v) = s.proposer_duties.get(&slot) {
Lookup::Found(v.clone())
} else if s.max_evicted_proposer_slot.is_some_and(|hw| slot <= hw) {
Lookup::Evicted
} else {
Lookup::Pending
}
})
.await
}

/// Blocks until attestation data for the given slot and committee index is
Expand All @@ -333,8 +401,19 @@ impl MemDB {
slot,
committee_index,
};
self.await_data(&self.attestation_notify, |s| s.attestation_duties.get(&key))
.await
self.await_data(&self.attestation_notify, |s| {
if let Some(v) = s.attestation_duties.get(&key) {
Lookup::Found(v.clone())
} else if s
.max_evicted_attestation_slot
.is_some_and(|hw| key.slot <= hw)
{
Lookup::Evicted
} else {
Lookup::Pending
}
})
.await
}

/// Blocks until an aggregated attestation for the given slot and
Expand All @@ -347,7 +426,14 @@ impl MemDB {
root: attestation_root,
};
self.await_data(&self.aggregation_notify, |s| {
s.aggregation_duties.get(&key).map(|a| &a.0)
// Awaited by root only, so there is no slot to test against an
// eviction high-water mark: an evicted root relies on the caller's
// request timeout to terminate (matching Charon's Go dutydb).
if let Some(v) = s.aggregation_duties.get(&key) {
Lookup::Found(v.0.clone())
} else {
Lookup::Pending
}
})
.await
}
Expand All @@ -365,31 +451,43 @@ impl MemDB {
subcommittee_index,
root: beacon_block_root,
};
self.await_data(&self.contrib_notify, |s| s.contrib_duties.get(&key))
.await
self.await_data(&self.contrib_notify, |s| {
if let Some(v) = s.contrib_duties.get(&key) {
Lookup::Found(v.clone())
} else if s.max_evicted_contrib_slot.is_some_and(|hw| slot <= hw) {
Lookup::Evicted
} else {
Lookup::Pending
}
})
.await
}

// A single Notify per duty type wakes all waiters on every store, not only
// those whose key matches. The number of concurrent waiters per duty type
// is small (one per validator), so the extra wakeups are cheap. A keyed
// notify (HashMap<Key, Sender>) would avoid them but adds complexity that
// isn't worth it here.
//
// `delete_duty` also wakes the notify so waiters whose duty just expired
// exit immediately via the `Lookup::Evicted` branch, instead of parking
// for another `notify_waiters` call or for the per-request timeout in
// the caller.
async fn await_data<V>(
&self,
notify: &Notify,
lookup: impl for<'s> Fn(&'s State) -> Option<&'s V>,
) -> Result<V>
where
V: Clone,
{
lookup: impl Fn(&State) -> Lookup<V>,
) -> Result<V> {
loop {
let notified = notify.notified();
tokio::pin!(notified);

{
let state = self.state.read().await;
if let Some(v) = lookup(&state) {
return Ok(v.clone());
match lookup(&state) {
Lookup::Found(v) => return Ok(v),
Lookup::Evicted => return Err(Error::AwaitDutyExpired),
Lookup::Pending => {}
}
}

Expand Down Expand Up @@ -571,12 +669,20 @@ impl State {
Ok(())
}

/// Raises an eviction high-water mark to `slot` if `slot` is newer (or the
/// mark is unset). The deadliner expires duties in non-decreasing slot
/// order, so in practice this only ever moves the mark forward.
fn bump_high_water(mark: &mut Option<u64>, slot: u64) {
*mark = Some(mark.map_or(slot, |current| current.max(slot)));
}

fn delete_duty(&mut self, duty: Duty) -> Result<()> {
let slot = duty.slot.inner();
info!(slot, duty_type = %duty.duty_type, "dutydb: deleting expired duty");
match duty.duty_type {
DutyType::Proposer => {
self.proposer_duties.remove(&slot);
Self::bump_high_water(&mut self.max_evicted_proposer_slot, slot);
}
DutyType::BuilderProposer => return Err(Error::DeprecatedDutyBuilderProposer),
DutyType::Attester => {
Expand All @@ -589,8 +695,11 @@ impl State {
});
}
}
Self::bump_high_water(&mut self.max_evicted_attestation_slot, slot);
}
DutyType::Aggregator => {
// No eviction mark: aggregated attestations are awaited by root
// only, so there is nothing for a slot high-water mark to gate.
if let Some(keys) = self.aggregation_keys_by_slot.remove(&slot) {
for key in keys {
self.aggregation_duties.remove(&key);
Expand All @@ -603,6 +712,7 @@ impl State {
self.contrib_duties.remove(&key);
}
}
Self::bump_high_water(&mut self.max_evicted_contrib_slot, slot);
}
_ => return Err(Error::UnknownDutyType),
}
Expand Down Expand Up @@ -1166,6 +1276,67 @@ mod tests {
assert!(db.pub_key_by_attestation(SLOT, 0, 0).await.is_err());
}

/// After a slot is evicted, `await_attestation` must return
/// `AwaitDutyExpired` immediately (not park until the request timeout) for
/// that slot AND for any older slot — the eviction state is a single
/// high-water mark, so it stays O(1) in memory rather than accumulating one
/// entry per evicted slot for the lifetime of the node.
#[tokio::test]
async fn await_attestation_expired_after_eviction_high_water() {
let deadliner = far_future_handle();
let (trim_tx, trim_rx) = channel::<Duty>(64);
let db = make_db_with_deadliner(deadliner, trim_rx);

const SLOT: u64 = 123;

let mut set = UnsignedDataSet::new();
set.insert(
random_core_pub_key(),
UnsignedDutyData::Attestation(att_data(SLOT, 0, 0)),
);
db.store(Duty::new(SlotNumber::new(SLOT), DutyType::Attester), set)
.await
.unwrap();

// Evict SLOT, then trigger expiry processing with an unrelated store.
trim_tx
.send(Duty::new(SlotNumber::new(SLOT), DutyType::Attester))
.await
.expect("trim_tx should be open");
let mut set2 = UnsignedDataSet::new();
set2.insert(
random_core_pub_key(),
UnsignedDutyData::Proposal(Box::new(phase0_proposal(SLOT.saturating_add(1), 0))),
);
db.store(
Duty::new(SlotNumber::new(SLOT.saturating_add(1)), DutyType::Proposer),
set2,
)
.await
.unwrap();

// The evicted slot resolves to AwaitDutyExpired without parking.
let timeout = std::time::Duration::from_secs(5);
let evicted = tokio::time::timeout(timeout, db.await_attestation(SLOT, 0))
.await
.expect("await must not park for an evicted slot");
assert!(
matches!(evicted, Err(Error::AwaitDutyExpired)),
"evicted slot: expected AwaitDutyExpired, got {evicted:?}"
);

// An older, never-stored slot is also below the high-water mark: its
// deadline has necessarily passed too, so it must fail fast rather than
// park — and we keep no per-slot record to answer this.
let older = tokio::time::timeout(timeout, db.await_attestation(SLOT.saturating_sub(1), 0))
.await
.expect("await must not park for a slot below the eviction high-water");
assert!(
matches!(older, Err(Error::AwaitDutyExpired)),
"older slot: expected AwaitDutyExpired, got {older:?}"
);
}

#[tokio::test]
async fn agg_attestation_two_roots_same_slot() {
const SLOT: u64 = 300;
Expand Down
Loading
Loading