Skip to content

Commit b2fce2b

Browse files
committed
Improve the deadline api
1 parent 81bd360 commit b2fce2b

5 files changed

Lines changed: 78 additions & 111 deletions

File tree

crates/core/src/deadline/mod.rs

Lines changed: 31 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -8,25 +8,22 @@
88
//!
99
//! ```no_run
1010
//! use pluto_core::{
11-
//! deadline::{DeadlinerTask, DutyDeadlineCalculator},
11+
//! deadline::{Deadliner, DeadlinerTask, DutyDeadlineCalculator},
1212
//! types::{Duty, SlotNumber},
1313
//! };
1414
//! use pluto_eth2api::EthBeaconNodeApiClient;
15-
//! use std::sync::Arc;
1615
//! use tokio_util::sync::CancellationToken;
1716
//!
1817
//! # async fn example(client: &EthBeaconNodeApiClient) -> anyhow::Result<()> {
1918
//! let cancel_token = CancellationToken::new();
2019
//! let calculator = DutyDeadlineCalculator::from_client(client).await?;
21-
//! let deadliner = DeadlinerTask::start(cancel_token, "example", calculator);
20+
//! let (deadliner, mut rx) = DeadlinerTask::start(cancel_token, "example", calculator);
2221
//!
2322
//! let duty = Duty::new_attester_duty(SlotNumber::new(1));
2423
//! let added = deadliner.add(duty).await;
2524
//!
26-
//! if let Some(mut rx) = deadliner.c() {
27-
//! while let Some(expired_duty) = rx.recv().await {
28-
//! println!("Duty expired: {}", expired_duty);
29-
//! }
25+
//! while let Some(expired_duty) = rx.recv().await {
26+
//! println!("Duty expired: {}", expired_duty);
3027
//! }
3128
//! # Ok(())
3229
//! # }
@@ -41,11 +38,7 @@ use crate::types::{Duty, DutyType, SlotNumber};
4138
use async_trait::async_trait;
4239
use chrono::{DateTime, Utc};
4340
use pluto_eth2api::EthBeaconNodeApiClientError;
44-
use std::{
45-
collections::HashSet,
46-
sync::{Arc, Mutex},
47-
time::Duration,
48-
};
41+
use std::{collections::HashSet, time::Duration};
4942
use tokio::{
5043
sync::{mpsc, oneshot},
5144
time::sleep,
@@ -87,10 +80,9 @@ fn to_chrono_duration(duration: Duration) -> Result<chrono::Duration> {
8780

8881
/// Deadliner provides duty deadline functionality.
8982
///
90-
/// The `c()` method returns a channel for receiving expired duties.
91-
/// It may only be called once and the returned channel should be used
92-
/// by a single task. Multiple instances are required for different
93-
/// components and use cases.
83+
/// Producers submit duties via [`add`](Self::add). Expired duties are
84+
/// delivered on the receiver paired with the handle at
85+
/// [`DeadlinerTask::start`].
9486
#[async_trait]
9587
pub trait Deadliner: Send + Sync {
9688
/// Adds a duty for deadline scheduling.
@@ -104,12 +96,6 @@ pub trait Deadliner: Send + Sync {
10496
/// - The calculator reports the duty has no deadline (`Ok(None)`)
10597
/// - The calculator failed to compute the deadline (`Err(_)`)
10698
async fn add(&self, duty: Duty) -> bool;
107-
108-
/// Returns the channel for receiving deadlined duties.
109-
///
110-
/// This method may only be called once and returns `None` on subsequent
111-
/// calls. The returned channel should only be used by a single task.
112-
fn c(&self) -> Option<mpsc::Receiver<Duty>>;
11399
}
114100

115101
/// Internal message type for adding duties to the deadliner.
@@ -118,13 +104,13 @@ struct DeadlineInput {
118104
response_tx: oneshot::Sender<bool>,
119105
}
120106

121-
/// Public-facing handle: the `Arc<dyn Deadliner>` returned by
122-
/// [`DeadlinerTask::start`] wraps this. Holds the input channel, the
123-
/// take-once output receiver, and the cancellation token.
124-
struct DeadlinerHandle {
107+
/// Public-facing handle returned (paired with the expired-duty receiver) by
108+
/// [`DeadlinerTask::start`]. Cloning is cheap and shares the same background
109+
/// task — share it freely across producers inside one service.
110+
#[derive(Clone)]
111+
pub struct DeadlinerHandle {
125112
cancel_token: CancellationToken,
126113
input_tx: mpsc::Sender<DeadlineInput>,
127-
output_rx: Mutex<Option<mpsc::Receiver<Duty>>>,
128114
}
129115

130116
#[async_trait]
@@ -146,18 +132,11 @@ impl Deadliner for DeadlinerHandle {
146132
// Wait for response
147133
response_rx.await.unwrap_or(false)
148134
}
149-
150-
fn c(&self) -> Option<mpsc::Receiver<Duty>> {
151-
self.output_rx
152-
.lock()
153-
.ok()
154-
.and_then(|mut guard| guard.take())
155-
}
156135
}
157136

158137
/// Owned state of the background task that drives a [`DeadlinerHandle`]'s
159138
/// duty timers. Held exclusively by the spawned task — that's why it lives
160-
/// outside the `Arc<dyn Deadliner>` and `run_task` can take `mut self`.
139+
/// outside the public handle and `run_task` can take `mut self`.
161140
/// Constructed and spawned via [`DeadlinerTask::start`].
162141
pub struct DeadlinerTask<C> {
163142
cancel_token: CancellationToken,
@@ -172,14 +151,15 @@ pub struct DeadlinerTask<C> {
172151
}
173152

174153
impl<C: DeadlineCalculator> DeadlinerTask<C> {
175-
/// Builds the public-facing [`Deadliner`] handle and spawns the background
176-
/// task that drives it. The background loop exits when `cancel_token` is
177-
/// cancelled.
154+
/// Spawns the background task and returns a `(handle, expired_rx)` pair.
155+
/// The cloneable `handle` is for adding duties from any number of
156+
/// producers; `expired_rx` is the single consumer's receiver of expired
157+
/// duties. The background loop exits when `cancel_token` is cancelled.
178158
pub fn start(
179159
cancel_token: CancellationToken,
180160
label: impl Into<String>,
181161
calculator: C,
182-
) -> Arc<dyn Deadliner> {
162+
) -> (DeadlinerHandle, mpsc::Receiver<Duty>) {
183163
// Matches Charon's `outputBuffer = 10` — big enough for all duty
184164
// types expiring simultaneously while the consumer drains synchronously.
185165
const OUTPUT_BUFFER: usize = 10;
@@ -204,13 +184,12 @@ impl<C: DeadlineCalculator> DeadlinerTask<C> {
204184
};
205185
tokio::spawn(task.run_task());
206186

207-
let link = DeadlinerHandle {
187+
let handle = DeadlinerHandle {
208188
cancel_token,
209189
input_tx,
210-
output_rx: Mutex::new(Some(output_rx)),
211190
};
212191

213-
Arc::new(link)
192+
(handle, output_rx)
214193
}
215194

216195
/// Background task that manages duty deadlines.
@@ -407,7 +386,7 @@ mod tests {
407386
/// channel.
408387
async fn add_duties(
409388
duties: Vec<Duty>,
410-
deadliner: Arc<dyn Deadliner>,
389+
deadliner: DeadlinerHandle,
411390
result_tx: mpsc::Sender<bool>,
412391
) {
413392
for duty in duties {
@@ -461,9 +440,8 @@ mod tests {
461440
};
462441

463442
let cancel_token = CancellationToken::new();
464-
let deadliner = DeadlinerTask::start(cancel_token.clone(), "test", calculator);
465-
466-
let mut output_rx = deadliner.c().context("output receiver already taken")?;
443+
let (deadliner, mut output_rx) =
444+
DeadlinerTask::start(cancel_token.clone(), "test", calculator);
467445

468446
let (expired_tx, mut expired_rx) = mpsc::channel(100);
469447
let (non_expired_tx, mut non_expired_rx) = mpsc::channel(100);
@@ -472,21 +450,15 @@ mod tests {
472450
let non_expired_len = non_expired_duties.len();
473451
let future_duties_len = future_duties.len();
474452

475-
let handler_expired = tokio::spawn(add_duties(
476-
expired_duties,
477-
Arc::clone(&deadliner),
478-
expired_tx,
479-
));
453+
let handler_expired =
454+
tokio::spawn(add_duties(expired_duties, deadliner.clone(), expired_tx));
480455
let handler_non_expired = tokio::spawn(add_duties(
481456
non_expired_duties.clone(),
482-
Arc::clone(&deadliner),
457+
deadliner.clone(),
483458
non_expired_tx.clone(),
484459
));
485-
let handler_future_duties = tokio::spawn(add_duties(
486-
future_duties,
487-
Arc::clone(&deadliner),
488-
non_expired_tx,
489-
));
460+
let handler_future_duties =
461+
tokio::spawn(add_duties(future_duties, deadliner.clone(), non_expired_tx));
490462

491463
let (result_expired, result_non_expired, result_future_duties) =
492464
tokio::join!(handler_expired, handler_non_expired, handler_future_duties);
@@ -545,9 +517,8 @@ mod tests {
545517
};
546518

547519
let cancel_token = CancellationToken::new();
548-
let deadliner = DeadlinerTask::start(cancel_token.clone(), "order-test", calculator);
549-
550-
let mut output_rx = deadliner.c().context("output receiver already taken")?;
520+
let (deadliner, mut output_rx) =
521+
DeadlinerTask::start(cancel_token.clone(), "order-test", calculator);
551522

552523
// TestCalculator: deadline = start_time + slot * 500ms.
553524
// Insert the later one first to make sure ordering is by deadline,

crates/core/src/dutydb/memory.rs

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use pluto_eth2api::{
88
spec::{altair, phase0},
99
versioned,
1010
};
11-
use tokio::sync::{Notify, RwLock};
11+
use tokio::sync::{Notify, RwLock, mpsc};
1212
use tokio_util::sync::CancellationToken;
1313
use tracing::{info, warn};
1414
use tree_hash::TreeHash;
@@ -203,11 +203,13 @@ pub struct MemDB {
203203
}
204204

205205
impl MemDB {
206-
/// Creates a new in-memory DutyDB.
207-
pub fn new(deadliner: Arc<dyn Deadliner>, cancel: &CancellationToken) -> Self {
208-
let deadliner_rx = deadliner.c().expect(
209-
"Deadliner::c() returned None — the receiver was already consumed. Each MemDB must use a fresh Deadliner.",
210-
);
206+
/// Creates a new in-memory DutyDB. `deadliner_rx` is the receiver paired
207+
/// with `deadliner` (typically from `DeadlinerTask::start`).
208+
pub fn new(
209+
deadliner: Arc<dyn Deadliner>,
210+
deadliner_rx: mpsc::Receiver<Duty>,
211+
cancel: &CancellationToken,
212+
) -> Self {
211213
Self {
212214
state: RwLock::new(State {
213215
attestation_duties: HashMap::new(),
@@ -620,29 +622,31 @@ mod tests {
620622
async fn add(&self, _duty: Duty) -> bool {
621623
true
622624
}
625+
}
623626

624-
fn c(&self) -> Option<Receiver<Duty>> {
625-
let (_, rx) = channel(1);
626-
Some(rx)
627-
}
627+
/// Builds a never-firing receiver for tests that don't exercise eviction.
628+
pub(crate) fn noop_deadliner_rx() -> Receiver<Duty> {
629+
let (_, rx) = channel(1);
630+
rx
628631
}
629632

630633
/// Deadliner that collects duties and can flush them to a channel on
631634
/// demand.
632635
pub(crate) struct TestDeadliner {
633636
added: Mutex<Vec<Duty>>,
634637
tx: Sender<Duty>,
635-
rx: Mutex<Option<Receiver<Duty>>>,
636638
}
637639

638640
impl TestDeadliner {
639-
pub(crate) fn new() -> Arc<Self> {
641+
/// Returns the test deadliner alongside the matching expiry receiver
642+
/// — call `expire()` on the deadliner to drive `rx`.
643+
pub(crate) fn new() -> (Arc<Self>, Receiver<Duty>) {
640644
let (tx, rx) = channel(64);
641-
Arc::new(Self {
645+
let deadliner = Arc::new(Self {
642646
added: Mutex::new(Vec::new()),
643647
tx,
644-
rx: Mutex::new(Some(rx)),
645-
})
648+
});
649+
(deadliner, rx)
646650
}
647651

648652
/// Send all collected duties to the expiry channel.
@@ -663,18 +667,21 @@ mod tests {
663667
self.added.lock().unwrap().push(duty);
664668
true
665669
}
666-
667-
fn c(&self) -> Option<Receiver<Duty>> {
668-
self.rx.lock().unwrap().take()
669-
}
670670
}
671671

672672
fn make_db() -> MemDB {
673-
MemDB::new(Arc::new(NoopDeadliner), &CancellationToken::new())
673+
MemDB::new(
674+
Arc::new(NoopDeadliner),
675+
noop_deadliner_rx(),
676+
&CancellationToken::new(),
677+
)
674678
}
675679

676-
fn make_db_with_deadliner(deadliner: Arc<dyn Deadliner>) -> MemDB {
677-
MemDB::new(deadliner, &CancellationToken::new())
680+
fn make_db_with_deadliner(
681+
deadliner: Arc<dyn Deadliner>,
682+
deadliner_rx: Receiver<Duty>,
683+
) -> MemDB {
684+
MemDB::new(deadliner, deadliner_rx, &CancellationToken::new())
678685
}
679686

680687
fn att_data(slot: u64, committee_index: u64, validator_index: u64) -> AttestationData {
@@ -1104,8 +1111,8 @@ mod tests {
11041111

11051112
#[tokio::test]
11061113
async fn duty_expiry() {
1107-
let deadliner = TestDeadliner::new();
1108-
let db = make_db_with_deadliner(Arc::clone(&deadliner) as Arc<dyn Deadliner>);
1114+
let (deadliner, deadliner_rx) = TestDeadliner::new();
1115+
let db = make_db_with_deadliner(deadliner.clone(), deadliner_rx);
11091116

11101117
const SLOT: u64 = 123;
11111118

crates/core/src/parsigdb/memory.rs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{collections::HashMap, error::Error as StdError, future::Future, pin::Pin, sync::Arc};
2-
use tokio::sync::Mutex;
2+
use tokio::sync::{Mutex, mpsc};
33
use tokio_util::sync::CancellationToken;
4-
use tracing::{debug, warn};
4+
use tracing::debug;
55

66
use crate::{
77
deadline::Deadliner,
@@ -300,16 +300,12 @@ impl MemDB {
300300

301301
/// Trims expired duties from the database.
302302
///
303-
/// This method runs in a loop, listening for expired duties from the
304-
/// deadliner and removing their associated data from the database. It
305-
/// should be spawned as a background task and will run until the
306-
/// cancellation token is triggered.
307-
pub async fn trim(&self) {
308-
let Some(mut deadliner_rx) = self.deadliner.c() else {
309-
warn!("Deadliner channel is not available");
310-
return;
311-
};
312-
303+
/// Runs in a loop, listening on `deadliner_rx` for expired duties and
304+
/// removing their associated data. Should be spawned as a background task;
305+
/// returns when the cancellation token is triggered or the receiver
306+
/// closes. The receiver is the one paired with the [`Deadliner`] handle at
307+
/// `DeadlinerTask::start`.
308+
pub async fn trim(&self, mut deadliner_rx: mpsc::Receiver<Duty>) {
313309
loop {
314310
tokio::select! {
315311
biased;

crates/core/src/parsigdb/memory_internal_test.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -103,14 +103,15 @@ async fn memdb_threshold() {
103103
const THRESHOLD: u64 = 7;
104104
const N: usize = 10;
105105

106-
let deadliner = Arc::new(TestDeadliner::new());
106+
let (deadliner, deadliner_rx) = TestDeadliner::new();
107+
let deadliner = Arc::new(deadliner);
107108
let cancel = CancellationToken::new();
108109
let db = Arc::new(MemDB::new(cancel.clone(), THRESHOLD, deadliner.clone()));
109110

110111
let trim_handle = tokio::spawn({
111112
let db = db.clone();
112113
async move {
113-
db.trim().await;
114+
db.trim(deadliner_rx).await;
114115
}
115116
});
116117

@@ -166,17 +167,17 @@ async fn memdb_threshold() {
166167
struct TestDeadliner {
167168
added: StdMutex<Vec<Duty>>,
168169
tx: mpsc::Sender<Duty>,
169-
rx: StdMutex<Option<mpsc::Receiver<Duty>>>,
170170
}
171171

172172
impl TestDeadliner {
173-
fn new() -> Self {
173+
/// Returns the test deadliner with the matching expiry receiver.
174+
fn new() -> (Self, mpsc::Receiver<Duty>) {
174175
let (tx, rx) = mpsc::channel(32);
175-
Self {
176+
let deadliner = Self {
176177
added: StdMutex::new(Vec::new()),
177178
tx,
178-
rx: StdMutex::new(Some(rx)),
179-
}
179+
};
180+
(deadliner, rx)
180181
}
181182

182183
async fn expire(&self) -> bool {
@@ -204,8 +205,4 @@ impl Deadliner for TestDeadliner {
204205
.push(duty);
205206
true
206207
}
207-
208-
fn c(&self) -> Option<mpsc::Receiver<Duty>> {
209-
self.rx.lock().expect("test deadliner lock poisoned").take()
210-
}
211208
}

0 commit comments

Comments
 (0)