Skip to content

Commit f3ed3f8

Browse files
committed
fix(scheduling): harden timing invariants
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 28ac071 commit f3ed3f8

4 files changed

Lines changed: 47 additions & 42 deletions

File tree

rsworkspace/crates/trogon-cron/src/domain.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ impl JobSchedule {
177177
fn next_fire_after(&self, from: DateTime<Utc>) -> Option<DateTime<Utc>> {
178178
match self {
179179
Self::Interval(interval) => {
180-
Some(from + chrono::Duration::seconds(interval.as_chrono_seconds()))
180+
from.checked_add_signed(chrono::Duration::seconds(interval.as_chrono_seconds()))
181181
}
182182
Self::Cron(expr) => expr.next_after(from),
183183
}
@@ -499,4 +499,11 @@ mod tests {
499499
JobSchedule::Interval(_) => panic!("expected cron schedule"),
500500
}
501501
}
502+
503+
#[test]
504+
fn interval_next_fire_overflow_returns_none() {
505+
let schedule = JobSchedule::Interval(IntervalSeconds::new(1).unwrap());
506+
507+
assert_eq!(schedule.next_fire_after(DateTime::<Utc>::MAX_UTC), None);
508+
}
502509
}

rsworkspace/crates/trogon-cron/src/scheduler.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ impl Scheduler<NatsConfigStore, jetstream::Context, NatsKvLease> {
7272
tick_publisher: js,
7373
leader_lock,
7474
node_id: Uuid::new_v4().to_string(),
75-
leader_renew_interval: leader_config.renew_interval().get(),
75+
leader_renew_interval: leader_config.renew_interval(),
7676
})
7777
}
7878
}

rsworkspace/crates/trogon-nats/src/lease.rs

Lines changed: 36 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -46,44 +46,47 @@ impl LeaseKey {
4646
}
4747
}
4848

49-
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
50-
pub struct LeaseTtl(Duration);
49+
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
50+
pub struct LeaseTiming {
51+
ttl: Duration,
52+
renew_interval: Duration,
53+
}
5154

52-
impl LeaseTtl {
53-
pub fn new(ttl: Duration) -> Result<Self, LeaseConfigError> {
55+
impl LeaseTiming {
56+
pub fn new(ttl: Duration, renew_interval: Duration) -> Result<Self, LeaseConfigError> {
5457
if ttl.is_zero() {
5558
return Err(LeaseConfigError::ZeroTtl);
5659
}
57-
Ok(Self(ttl))
58-
}
59-
60-
pub fn get(self) -> Duration {
61-
self.0
62-
}
63-
}
64-
65-
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
66-
pub struct LeaseRenewInterval(Duration);
67-
68-
impl LeaseRenewInterval {
69-
pub fn new(renew_interval: Duration) -> Result<Self, LeaseConfigError> {
7060
if renew_interval.is_zero() {
7161
return Err(LeaseConfigError::ZeroRenewInterval);
7262
}
73-
Ok(Self(renew_interval))
63+
if renew_interval >= ttl {
64+
return Err(LeaseConfigError::RenewIntervalNotLessThanTtl {
65+
renew_interval,
66+
ttl,
67+
});
68+
}
69+
70+
Ok(Self {
71+
ttl,
72+
renew_interval,
73+
})
74+
}
75+
76+
pub fn ttl(self) -> Duration {
77+
self.ttl
7478
}
7579

76-
pub fn get(self) -> Duration {
77-
self.0
80+
pub fn renew_interval(self) -> Duration {
81+
self.renew_interval
7882
}
7983
}
8084

8185
#[derive(Debug, Clone, Eq, PartialEq)]
8286
pub struct NatsKvLeaseConfig {
8387
bucket: LeaseBucket,
8488
key: LeaseKey,
85-
ttl: LeaseTtl,
86-
renew_interval: LeaseRenewInterval,
89+
timing: LeaseTiming,
8790
}
8891

8992
impl NatsKvLeaseConfig {
@@ -95,21 +98,12 @@ impl NatsKvLeaseConfig {
9598
) -> Result<Self, LeaseConfigError> {
9699
let bucket = LeaseBucket::new(bucket)?;
97100
let key = LeaseKey::new(key)?;
98-
let ttl = LeaseTtl::new(ttl)?;
99-
let renew_interval = LeaseRenewInterval::new(renew_interval)?;
100-
101-
if renew_interval.get() >= ttl.get() {
102-
return Err(LeaseConfigError::RenewIntervalNotLessThanTtl {
103-
renew_interval: renew_interval.get(),
104-
ttl: ttl.get(),
105-
});
106-
}
101+
let timing = LeaseTiming::new(ttl, renew_interval)?;
107102

108103
Ok(Self {
109104
bucket,
110105
key,
111-
ttl,
112-
renew_interval,
106+
timing,
113107
})
114108
}
115109

@@ -121,12 +115,16 @@ impl NatsKvLeaseConfig {
121115
&self.key
122116
}
123117

124-
pub fn ttl(&self) -> LeaseTtl {
125-
self.ttl
118+
pub fn timing(&self) -> LeaseTiming {
119+
self.timing
126120
}
127121

128-
pub fn renew_interval(&self) -> LeaseRenewInterval {
129-
self.renew_interval
122+
pub fn ttl(&self) -> Duration {
123+
self.timing.ttl()
124+
}
125+
126+
pub fn renew_interval(&self) -> Duration {
127+
self.timing.renew_interval()
130128
}
131129
}
132130

@@ -295,7 +293,7 @@ impl NatsKvLease {
295293
.create_key_value(kv::Config {
296294
bucket: config.bucket().as_str().to_owned(),
297295
history: 1,
298-
max_age: config.ttl().get(),
296+
max_age: config.ttl(),
299297
..Default::default()
300298
})
301299
.await

rsworkspace/crates/trogon-nats/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ pub use client::{FlushClient, PublishClient, RequestClient, SubscribeClient};
5656
pub use connect::{ConnectError, connect};
5757
pub use constants::REQ_ID_HEADER;
5858
pub use lease::{
59-
LeaderElection, Lease, LeaseBucket, LeaseConfigError, LeaseError, LeaseKey, LeaseRenewInterval,
60-
LeaseTtl, NatsKvLease, NatsKvLeaseConfig, TryAcquireOutcome,
59+
LeaderElection, Lease, LeaseBucket, LeaseConfigError, LeaseError, LeaseKey, LeaseTiming,
60+
NatsKvLease, NatsKvLeaseConfig, TryAcquireOutcome,
6161
};
6262
pub use messaging::{
6363
FlushPolicy, NatsError, PublishOperationError, PublishOptions, PublishOptionsBuilder,

0 commit comments

Comments
 (0)