Skip to content

Commit 207827d

Browse files
committed
Remove the deadliner trait
1 parent 53f0d7c commit 207827d

6 files changed

Lines changed: 92 additions & 161 deletions

File tree

crates/core/src/deadline/calculator.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,17 @@ pub trait DeadlineCalculator: Send + Sync + 'static {
8686
fn deadline(&self, duty: &Duty) -> Result<Option<DateTime<Utc>>>;
8787
}
8888

89+
/// Calculator that reports every duty as never expiring. Useful for
90+
/// scenarios that need to plug into the deadliner API but don't actually want
91+
/// any eviction (e.g. DKG, which is one-shot and outside the slot timeline).
92+
pub struct NeverExpiringCalculator;
93+
94+
impl DeadlineCalculator for NeverExpiringCalculator {
95+
fn deadline(&self, _duty: &Duty) -> Result<Option<DateTime<Utc>>> {
96+
Ok(None)
97+
}
98+
}
99+
89100
impl DeadlineCalculator for DutyDeadlineCalculator {
90101
fn deadline(&self, duty: &Duty) -> Result<Option<DateTime<Utc>>> {
91102
if duty.duty_type.never_expires() {

crates/core/src/deadline/mod.rs

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
//!
99
//! ```no_run
1010
//! use pluto_core::{
11-
//! deadline::{AddOutcome, Deadliner, DeadlinerTask, DutyDeadlineCalculator},
11+
//! deadline::{AddOutcome, DeadlinerTask, DutyDeadlineCalculator},
1212
//! types::{Duty, SlotNumber},
1313
//! };
1414
//! use pluto_eth2api::EthBeaconNodeApiClient;
@@ -37,10 +37,9 @@
3737
mod calculator;
3838
mod msecs;
3939

40-
pub use calculator::{DeadlineCalculator, DutyDeadlineCalculator};
40+
pub use calculator::{DeadlineCalculator, DutyDeadlineCalculator, NeverExpiringCalculator};
4141

4242
use crate::types::{Duty, DutyType, SlotNumber};
43-
use async_trait::async_trait;
4443
use chrono::{DateTime, Utc};
4544
use pluto_eth2api::EthBeaconNodeApiClientError;
4645
use std::{collections::HashSet, time::Duration};
@@ -109,21 +108,6 @@ impl AddOutcome {
109108
}
110109
}
111110

112-
/// Deadliner provides duty deadline functionality.
113-
///
114-
/// Producers submit duties via [`add`](Self::add). Expired duties are
115-
/// delivered on the receiver paired with the handle at
116-
/// [`DeadlinerTask::start`].
117-
#[async_trait]
118-
pub trait Deadliner: Send + Sync {
119-
/// Adds a duty for deadline scheduling.
120-
///
121-
/// Idempotent: re-adding a duty already tracked returns
122-
/// [`AddOutcome::Scheduled`] again. See [`AddOutcome`] for the meaning of
123-
/// each variant.
124-
async fn add(&self, duty: Duty) -> AddOutcome;
125-
}
126-
127111
/// Internal message type for adding duties to the deadliner.
128112
struct DeadlineInput {
129113
duty: Duty,
@@ -139,24 +123,25 @@ pub struct DeadlinerHandle {
139123
input_tx: mpsc::Sender<DeadlineInput>,
140124
}
141125

142-
#[async_trait]
143-
impl Deadliner for DeadlinerHandle {
144-
async fn add(&self, duty: Duty) -> AddOutcome {
145-
// Check if shut down
126+
impl DeadlinerHandle {
127+
/// Adds a duty for deadline scheduling.
128+
///
129+
/// Idempotent: re-adding a duty already tracked returns
130+
/// [`AddOutcome::Scheduled`] again. See [`AddOutcome`] for the meaning of
131+
/// each variant.
132+
pub async fn add(&self, duty: Duty) -> AddOutcome {
146133
if self.cancel_token.is_cancelled() {
147134
return AddOutcome::FailedToCompute;
148135
}
149136

150137
let (response_tx, response_rx) = oneshot::channel();
151138
let input = DeadlineInput { duty, response_tx };
152139

153-
// Send the duty to the background task
154140
if self.input_tx.send(input).await.is_err() {
155141
return AddOutcome::FailedToCompute;
156142
}
157143

158-
// Wait for response — `FailedToCompute` if the task dropped the
159-
// sender (shutdown race).
144+
// `FailedToCompute` if the task dropped the sender (shutdown race).
160145
response_rx.await.unwrap_or(AddOutcome::FailedToCompute)
161146
}
162147
}

crates/core/src/dutydb/memory.rs

Lines changed: 39 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//!
33
//! Equivalent to charon/core/dutydb/memory.go.
44
5-
use std::{collections::HashMap, sync::Arc};
5+
use std::collections::HashMap;
66

77
use pluto_eth2api::{
88
spec::{altair, phase0},
@@ -14,7 +14,7 @@ use tracing::{info, warn};
1414
use tree_hash::TreeHash;
1515

1616
use crate::{
17-
deadline::Deadliner,
17+
deadline::DeadlinerHandle,
1818
signeddata::{
1919
AttestationData, SyncContribution, VersionedAggregatedAttestation, VersionedProposal,
2020
},
@@ -199,14 +199,14 @@ pub struct MemDB {
199199
aggregation_notify: Notify,
200200
contrib_notify: Notify,
201201
cancel: CancellationToken,
202-
deadliner: Arc<dyn Deadliner>,
202+
deadliner: DeadlinerHandle,
203203
}
204204

205205
impl MemDB {
206206
/// Creates a new in-memory DutyDB. `deadliner_rx` is the receiver paired
207207
/// with `deadliner` (typically from `DeadlinerTask::start`).
208208
pub fn new(
209-
deadliner: Arc<dyn Deadliner>,
209+
deadliner: DeadlinerHandle,
210210
deadliner_rx: mpsc::Receiver<Duty>,
211211
cancel: &CancellationToken,
212212
) -> Self {
@@ -601,27 +601,29 @@ impl State {
601601

602602
#[cfg(test)]
603603
mod tests {
604-
use std::sync::{Arc, Mutex};
604+
use std::sync::Arc;
605605

606-
use async_trait::async_trait;
607-
use tokio::sync::mpsc::{Receiver, Sender, channel};
606+
use chrono::{DateTime, Utc};
607+
use tokio::sync::mpsc::{Receiver, channel};
608608
use tokio_util::sync::CancellationToken;
609609

610610
use super::*;
611611
use crate::{
612-
deadline::AddOutcome,
612+
deadline::{self, DeadlineCalculator, DeadlinerTask},
613613
signeddata::{AttesterDuty, ProposalBlock},
614614
testutils::random_core_pub_key,
615615
types::{DutyType, SlotNumber},
616616
};
617617

618-
/// Deadliner that always accepts duties and never expires them.
619-
pub(crate) struct NoopDeadliner;
618+
/// Test calculator whose every duty is `Scheduled` (deadline is `MAX_UTC`).
619+
/// The deadliner never actually fires for this calculator, so the paired
620+
/// output receiver stays silent — eviction in tests is driven manually
621+
/// through a separate channel (see `duty_expiry`).
622+
struct FarFutureCalculator;
620623

621-
#[async_trait]
622-
impl Deadliner for NoopDeadliner {
623-
async fn add(&self, _duty: Duty) -> AddOutcome {
624-
AddOutcome::Scheduled
624+
impl DeadlineCalculator for FarFutureCalculator {
625+
fn deadline(&self, _: &Duty) -> deadline::Result<Option<DateTime<Utc>>> {
626+
Ok(Some(DateTime::<Utc>::MAX_UTC))
625627
}
626628
}
627629

@@ -631,57 +633,26 @@ mod tests {
631633
rx
632634
}
633635

634-
/// Deadliner that collects duties and can flush them to a channel on
635-
/// demand.
636-
pub(crate) struct TestDeadliner {
637-
added: Mutex<Vec<Duty>>,
638-
tx: Sender<Duty>,
639-
}
640-
641-
impl TestDeadliner {
642-
/// Returns the test deadliner alongside the matching expiry receiver
643-
/// — call `expire()` on the deadliner to drive `rx`.
644-
pub(crate) fn new() -> (Arc<Self>, Receiver<Duty>) {
645-
let (tx, rx) = channel(64);
646-
let deadliner = Arc::new(Self {
647-
added: Mutex::new(Vec::new()),
648-
tx,
649-
});
650-
(deadliner, rx)
651-
}
652-
653-
/// Send all collected duties to the expiry channel.
654-
pub(crate) async fn expire(&self) {
655-
let duties: Vec<Duty> = {
656-
let mut added = self.added.lock().unwrap();
657-
std::mem::take(&mut *added)
658-
};
659-
for duty in duties {
660-
let _ = self.tx.send(duty).await;
661-
}
662-
}
663-
}
664-
665-
#[async_trait]
666-
impl Deadliner for TestDeadliner {
667-
async fn add(&self, duty: Duty) -> AddOutcome {
668-
self.added.lock().unwrap().push(duty);
669-
AddOutcome::Scheduled
670-
}
636+
/// Creates a real deadliner handle backed by [`FarFutureCalculator`] —
637+
/// `add()` always reports `Scheduled` but nothing naturally expires.
638+
fn far_future_handle() -> DeadlinerHandle {
639+
let (handle, _drop_rx) = DeadlinerTask::start(
640+
CancellationToken::new(),
641+
"dutydb-tests",
642+
FarFutureCalculator,
643+
);
644+
handle
671645
}
672646

673647
fn make_db() -> MemDB {
674648
MemDB::new(
675-
Arc::new(NoopDeadliner),
649+
far_future_handle(),
676650
noop_deadliner_rx(),
677651
&CancellationToken::new(),
678652
)
679653
}
680654

681-
fn make_db_with_deadliner(
682-
deadliner: Arc<dyn Deadliner>,
683-
deadliner_rx: Receiver<Duty>,
684-
) -> MemDB {
655+
fn make_db_with_deadliner(deadliner: DeadlinerHandle, deadliner_rx: Receiver<Duty>) -> MemDB {
685656
MemDB::new(deadliner, deadliner_rx, &CancellationToken::new())
686657
}
687658

@@ -1112,8 +1083,12 @@ mod tests {
11121083

11131084
#[tokio::test]
11141085
async fn duty_expiry() {
1115-
let (deadliner, deadliner_rx) = TestDeadliner::new();
1116-
let db = make_db_with_deadliner(deadliner.clone(), deadliner_rx);
1086+
// Real handle so `store()`'s `add(...).is_scheduled()` check passes.
1087+
// Eviction is driven manually via `trim_tx` so the test stays
1088+
// deterministic instead of racing the deadliner's timer.
1089+
let deadliner = far_future_handle();
1090+
let (trim_tx, trim_rx) = channel::<Duty>(64);
1091+
let db = make_db_with_deadliner(deadliner, trim_rx);
11171092

11181093
const SLOT: u64 = 123;
11191094

@@ -1130,8 +1105,12 @@ mod tests {
11301105
// Should be findable now.
11311106
db.pub_key_by_attestation(SLOT, 0, 0).await.unwrap();
11321107

1133-
// Expire the duty.
1134-
deadliner.expire().await;
1108+
// Expire the duty: simulate the deadliner emitting it.
1109+
let expired_duty = Duty::new(SlotNumber::new(SLOT), DutyType::Attester);
1110+
trim_tx
1111+
.send(expired_duty)
1112+
.await
1113+
.expect("trim_tx should be open");
11351114

11361115
// Trigger expiry processing by storing another duty.
11371116
let proposal = phase0_proposal(SLOT.saturating_add(1), 0);

crates/core/src/parsigdb/memory.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use tokio_util::sync::CancellationToken;
44
use tracing::debug;
55

66
use crate::{
7-
deadline::Deadliner,
7+
deadline::DeadlinerHandle,
88
parsigdb::metrics::PARSIG_DB_METRICS,
99
signeddata::SignedDataError,
1010
types::{Duty, DutyType, ParSignedData, ParSignedDataSet, PubKey},
@@ -178,7 +178,7 @@ pub struct MemDBInner {
178178
pub struct MemDB {
179179
ct: CancellationToken,
180180
inner: Arc<Mutex<MemDBInner>>,
181-
deadliner: Arc<dyn Deadliner>,
181+
deadliner: DeadlinerHandle,
182182
threshold: u64,
183183
}
184184

@@ -189,7 +189,7 @@ impl MemDB {
189189
/// * `ct` - Cancellation token for graceful shutdown
190190
/// * `threshold` - Number of matching partial signatures required
191191
/// * `deadliner` - Deadliner for managing duty expiration
192-
pub fn new(ct: CancellationToken, threshold: u64, deadliner: Arc<dyn Deadliner>) -> Self {
192+
pub fn new(ct: CancellationToken, threshold: u64, deadliner: DeadlinerHandle) -> Self {
193193
Self {
194194
ct,
195195
inner: Arc::new(Mutex::new(MemDBInner {

0 commit comments

Comments
 (0)