Skip to content

Commit 360396f

Browse files
committed
fix(trogon-nats): harden lease correctness
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent f3ed3f8 commit 360396f

File tree

4 files changed

+184
-36
lines changed

4 files changed

+184
-36
lines changed

rsworkspace/crates/trogon-cron/src/mocks.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ impl Lease for MockLeaderLock {
151151
}
152152
}
153153

154-
async fn release(&self) -> Result<(), MockLockError> {
154+
async fn release(&self, _revision: u64) -> Result<(), MockLockError> {
155155
Ok(())
156156
}
157157
}

rsworkspace/crates/trogon-cron/src/scheduler.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::time::Duration;
55
use async_nats::jetstream;
66
use chrono::Utc;
77
use futures::{Stream, StreamExt};
8-
use trogon_nats::lease::{LeaderElection, Lease, NatsKvLease, NatsKvLeaseConfig};
8+
use trogon_nats::lease::{LeaderElection, Lease, LeaseTiming, NatsKvLease, NatsKvLeaseConfig};
99
use uuid::Uuid;
1010

1111
use crate::{
@@ -40,7 +40,12 @@ pub struct Scheduler<C = NatsConfigStore, P = jetstream::Context, L = NatsKvLeas
4040
tick_publisher: P,
4141
leader_lock: L,
4242
node_id: String,
43-
leader_renew_interval: Duration,
43+
leader_timing: LeaseTiming,
44+
}
45+
46+
fn default_leader_timing() -> LeaseTiming {
47+
LeaseTiming::new(DEFAULT_LEADER_TTL, DEFAULT_LEADER_RENEW_INTERVAL)
48+
.expect("default leader lease timing must be valid")
4449
}
4550

4651
// ── NATS convenience constructor ──────────────────────────────────────────────
@@ -72,7 +77,7 @@ impl Scheduler<NatsConfigStore, jetstream::Context, NatsKvLease> {
7277
tick_publisher: js,
7378
leader_lock,
7479
node_id: Uuid::new_v4().to_string(),
75-
leader_renew_interval: leader_config.renew_interval(),
80+
leader_timing: leader_config.timing(),
7681
})
7782
}
7883
}
@@ -95,7 +100,7 @@ where
95100
tick_publisher,
96101
leader_lock,
97102
node_id: Uuid::new_v4().to_string(),
98-
leader_renew_interval: DEFAULT_LEADER_RENEW_INTERVAL,
103+
leader_timing: default_leader_timing(),
99104
}
100105
}
101106

@@ -120,11 +125,8 @@ where
120125
let mut jobs = rebuild_jobs_from_snapshot(&HashMap::new(), initial_jobs);
121126

122127
// 3. Leader election.
123-
let mut leader = LeaderElection::new(
124-
self.leader_lock,
125-
self.node_id.clone(),
126-
self.leader_renew_interval,
127-
);
128+
let mut leader =
129+
LeaderElection::new(self.leader_lock, self.node_id.clone(), self.leader_timing);
128130

129131
// 4. Main loop.
130132
let mut tick = tokio::time::interval(TICK_INTERVAL);

0 commit comments

Comments
 (0)