Skip to content

Commit 53f0d7c

Browse files
committed
Add outcomes
1 parent b2fce2b commit 53f0d7c

4 files changed

Lines changed: 85 additions & 46 deletions

File tree

crates/core/src/deadline/mod.rs

Lines changed: 73 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
//!
99
//! ```no_run
1010
//! use pluto_core::{
11-
//! deadline::{Deadliner, DeadlinerTask, DutyDeadlineCalculator},
11+
//! deadline::{AddOutcome, Deadliner, DeadlinerTask, DutyDeadlineCalculator},
1212
//! types::{Duty, SlotNumber},
1313
//! };
1414
//! use pluto_eth2api::EthBeaconNodeApiClient;
@@ -20,7 +20,12 @@
2020
//! let (deadliner, mut rx) = DeadlinerTask::start(cancel_token, "example", calculator);
2121
//!
2222
//! let duty = Duty::new_attester_duty(SlotNumber::new(1));
23-
//! let added = deadliner.add(duty).await;
23+
//! match deadliner.add(duty).await {
24+
//! AddOutcome::Scheduled => {}
25+
//! AddOutcome::AlreadyExpired => eprintln!("duty already expired — skipped"),
26+
//! AddOutcome::NoDeadline => {}
27+
//! AddOutcome::FailedToCompute => eprintln!("deadline calculation failed"),
28+
//! }
2429
//!
2530
//! while let Some(expired_duty) = rx.recv().await {
2631
//! println!("Duty expired: {}", expired_duty);
@@ -78,6 +83,32 @@ fn to_chrono_duration(duration: Duration) -> Result<chrono::Duration> {
7883
chrono::Duration::from_std(duration).map_err(|_| DeadlineError::DurationConversion)
7984
}
8085

86+
/// Outcome of [`Deadliner::add`].
87+
///
88+
/// Spells out the four distinct cases the previous `bool` return value
89+
/// conflated, so callers can react specifically (e.g. drop a duty that
90+
/// already expired vs. log a calculator error).
91+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
92+
pub enum AddOutcome {
93+
/// The duty was accepted and a timer is now armed for its deadline.
94+
Scheduled,
95+
/// The duty's deadline is already in the past — nothing scheduled.
96+
AlreadyExpired,
97+
/// The calculator reports this duty type has no deadline (e.g. Exit,
98+
/// BuilderRegistration). Not an error — just not tracked.
99+
NoDeadline,
100+
/// The calculator returned an error while computing the deadline.
101+
FailedToCompute,
102+
}
103+
104+
impl AddOutcome {
105+
/// `true` only for [`AddOutcome::Scheduled`] — convenient for callers
106+
/// that still treat "added" as a yes/no question.
107+
pub fn is_scheduled(self) -> bool {
108+
matches!(self, AddOutcome::Scheduled)
109+
}
110+
}
111+
81112
/// Deadliner provides duty deadline functionality.
82113
///
83114
/// Producers submit duties via [`add`](Self::add). Expired duties are
@@ -87,21 +118,16 @@ fn to_chrono_duration(duration: Duration) -> Result<chrono::Duration> {
87118
pub trait Deadliner: Send + Sync {
88119
/// Adds a duty for deadline scheduling.
89120
///
90-
/// Returns `true` if the duty was added for future deadline scheduling.
91-
/// This method is idempotent and returns `true` if the duty was previously
92-
/// added and still awaits deadline scheduling.
93-
///
94-
/// Returns `false` if:
95-
/// - The duty has already expired and cannot be scheduled
96-
/// - The calculator reports the duty has no deadline (`Ok(None)`)
97-
/// - The calculator failed to compute the deadline (`Err(_)`)
98-
async fn add(&self, duty: Duty) -> bool;
121+
/// Idempotent: re-adding a duty already tracked returns
122+
/// [`AddOutcome::Scheduled`] again. See [`AddOutcome`] for the meaning of
123+
/// each variant.
124+
async fn add(&self, duty: Duty) -> AddOutcome;
99125
}
100126

101127
/// Internal message type for adding duties to the deadliner.
102128
struct DeadlineInput {
103129
duty: Duty,
104-
response_tx: oneshot::Sender<bool>,
130+
response_tx: oneshot::Sender<AddOutcome>,
105131
}
106132

107133
/// Public-facing handle returned (paired with the expired-duty receiver) by
@@ -115,22 +141,23 @@ pub struct DeadlinerHandle {
115141

116142
#[async_trait]
117143
impl Deadliner for DeadlinerHandle {
118-
async fn add(&self, duty: Duty) -> bool {
144+
async fn add(&self, duty: Duty) -> AddOutcome {
119145
// Check if shut down
120146
if self.cancel_token.is_cancelled() {
121-
return false;
147+
return AddOutcome::FailedToCompute;
122148
}
123149

124150
let (response_tx, response_rx) = oneshot::channel();
125151
let input = DeadlineInput { duty, response_tx };
126152

127153
// Send the duty to the background task
128154
if self.input_tx.send(input).await.is_err() {
129-
return false;
155+
return AddOutcome::FailedToCompute;
130156
}
131157

132-
// Wait for response
133-
response_rx.await.unwrap_or(false)
158+
// Wait for response — `FailedToCompute` if the task dropped the
159+
// sender (shutdown race).
160+
response_rx.await.unwrap_or(AddOutcome::FailedToCompute)
134161
}
135162
}
136163

@@ -274,11 +301,11 @@ impl<C: DeadlineCalculator> DeadlinerTask<C> {
274301
let duty = input.duty;
275302
match self.calculator.deadline(&duty) {
276303
Ok(Some(deadline)) => {
277-
let expired = deadline < Utc::now();
278-
let _ = input.response_tx.send(!expired);
279-
if expired {
304+
if deadline < Utc::now() {
305+
let _ = input.response_tx.send(AddOutcome::AlreadyExpired);
280306
return None;
281307
}
308+
let _ = input.response_tx.send(AddOutcome::Scheduled);
282309
self.duties.insert(duty);
283310
if deadline < self.curr_deadline {
284311
self.recompute_curr();
@@ -294,12 +321,13 @@ impl<C: DeadlineCalculator> DeadlinerTask<C> {
294321
error = %err,
295322
"Failed to compute deadline for duty"
296323
);
297-
let _ = input.response_tx.send(false);
324+
let _ = input.response_tx.send(AddOutcome::FailedToCompute);
298325
None
299326
}
300327
Ok(None) => {
301-
// Drop duties that never expire
302-
let _ = input.response_tx.send(false);
328+
// Duty type has no deadline (Exit, BuilderRegistration) —
329+
// not tracked.
330+
let _ = input.response_tx.send(AddOutcome::NoDeadline);
303331
None
304332
}
305333
}
@@ -340,7 +368,7 @@ impl<C: DeadlineCalculator> DeadlinerTask<C> {
340368
mod tests {
341369
use super::{msecs::Msecs, *};
342370
use crate::types::SlotNumber;
343-
use anyhow::{Context, Result, bail};
371+
use anyhow::{Context, Result, bail, ensure};
344372
use pluto_testutil::BeaconMock;
345373
use tokio::time::timeout;
346374

@@ -387,11 +415,11 @@ mod tests {
387415
async fn add_duties(
388416
duties: Vec<Duty>,
389417
deadliner: DeadlinerHandle,
390-
result_tx: mpsc::Sender<bool>,
418+
result_tx: mpsc::Sender<AddOutcome>,
391419
) {
392420
for duty in duties {
393-
let added = deadliner.add(duty).await;
394-
let _ = result_tx.send(added).await;
421+
let outcome = deadliner.add(duty).await;
422+
let _ = result_tx.send(outcome).await;
395423
}
396424
}
397425

@@ -467,19 +495,25 @@ mod tests {
467495
result_future_duties?;
468496

469497
for _ in 0..expired_len {
470-
let result = expired_rx.recv().await.context("expected expired ack")?;
471-
assert!(!result, "expired duties should return false");
498+
let outcome = expired_rx.recv().await.context("expected expired ack")?;
499+
ensure!(
500+
outcome == AddOutcome::AlreadyExpired,
501+
"expired duties should report AlreadyExpired, got {outcome:?}"
502+
);
472503
}
473504

474505
let added_count = non_expired_len
475506
.checked_add(future_duties_len)
476507
.context("added_count overflow")?;
477508
for _ in 0..added_count {
478-
let result = non_expired_rx
509+
let outcome = non_expired_rx
479510
.recv()
480511
.await
481512
.context("expected non-expired ack")?;
482-
assert!(result, "non-expired duties should return true");
513+
ensure!(
514+
outcome == AddOutcome::Scheduled,
515+
"non-expired duties should be Scheduled, got {outcome:?}"
516+
);
483517
}
484518

485519
// Collect expired duties from output channel.
@@ -508,8 +542,6 @@ mod tests {
508542
/// deadliner.
509543
#[tokio::test]
510544
async fn expired_duties_arrive_in_deadline_order() -> Result<()> {
511-
use anyhow::ensure;
512-
513545
let start_time = Utc::now();
514546
let calculator = TestCalculator {
515547
start_time,
@@ -527,9 +559,15 @@ mod tests {
527559
let earlier = Duty::new_attester_duty(SlotNumber::new(1));
528560

529561
let added_later = deadliner.add(later.clone()).await;
530-
ensure!(added_later, "later duty should be added");
562+
ensure!(
563+
added_later == AddOutcome::Scheduled,
564+
"later duty should be Scheduled, got {added_later:?}"
565+
);
531566
let added_earlier = deadliner.add(earlier.clone()).await;
532-
ensure!(added_earlier, "earlier duty should be added");
567+
ensure!(
568+
added_earlier == AddOutcome::Scheduled,
569+
"earlier duty should be Scheduled, got {added_earlier:?}"
570+
);
533571

534572
let first = timeout(Duration::from_secs(5), output_rx.recv())
535573
.await

crates/core/src/dutydb/memory.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ impl MemDB {
247247

248248
let mut state = self.state.write().await;
249249

250-
if !self.deadliner.add(duty.clone()).await {
250+
if !self.deadliner.add(duty.clone()).await.is_scheduled() {
251251
return Err(Error::ExpiredDuty);
252252
}
253253

@@ -609,6 +609,7 @@ mod tests {
609609

610610
use super::*;
611611
use crate::{
612+
deadline::AddOutcome,
612613
signeddata::{AttesterDuty, ProposalBlock},
613614
testutils::random_core_pub_key,
614615
types::{DutyType, SlotNumber},
@@ -619,8 +620,8 @@ mod tests {
619620

620621
#[async_trait]
621622
impl Deadliner for NoopDeadliner {
622-
async fn add(&self, _duty: Duty) -> bool {
623-
true
623+
async fn add(&self, _duty: Duty) -> AddOutcome {
624+
AddOutcome::Scheduled
624625
}
625626
}
626627

@@ -663,9 +664,9 @@ mod tests {
663664

664665
#[async_trait]
665666
impl Deadliner for TestDeadliner {
666-
async fn add(&self, duty: Duty) -> bool {
667+
async fn add(&self, duty: Duty) -> AddOutcome {
667668
self.added.lock().unwrap().push(duty);
668-
true
669+
AddOutcome::Scheduled
669670
}
670671
}
671672

crates/core/src/parsigdb/memory_internal_test.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use tokio_util::sync::CancellationToken;
1111

1212
use super::{MemDB, get_threshold_matching, threshold_subscriber};
1313
use crate::{
14-
deadline::Deadliner,
14+
deadline::{AddOutcome, Deadliner},
1515
signeddata::{BeaconCommitteeSelection, SignedSyncMessage, VersionedAttestation},
1616
testutils::random_core_pub_key,
1717
types::{Duty, DutyType, ParSignedData, ParSignedDataSet, SlotNumber},
@@ -198,11 +198,11 @@ impl TestDeadliner {
198198

199199
#[async_trait::async_trait]
200200
impl Deadliner for TestDeadliner {
201-
async fn add(&self, duty: Duty) -> bool {
201+
async fn add(&self, duty: Duty) -> AddOutcome {
202202
self.added
203203
.lock()
204204
.expect("test deadliner lock poisoned")
205205
.push(duty);
206-
true
206+
AddOutcome::Scheduled
207207
}
208208
}

crates/dkg/src/exchanger.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use tokio_util::sync::CancellationToken;
4949
use tracing::warn;
5050

5151
use pluto_core::{
52-
deadline::Deadliner,
52+
deadline::{AddOutcome, Deadliner},
5353
parsigdb::memory::{
5454
InternalSubscriberError, MemDB, MemDBError, internal_subscriber, threshold_subscriber,
5555
},
@@ -140,8 +140,8 @@ struct NoopDeadliner;
140140

141141
#[async_trait]
142142
impl Deadliner for NoopDeadliner {
143-
async fn add(&self, _duty: Duty) -> bool {
144-
true
143+
async fn add(&self, _duty: Duty) -> AddOutcome {
144+
AddOutcome::Scheduled
145145
}
146146
}
147147

0 commit comments

Comments
 (0)