Skip to content

Commit 28cc21e

Browse files
committed
Add schedule_timer_until absolute-deadline timer
Add OrchestrationContext::schedule_timer_until(deadline), an absolute wall-clock deadline timer variant that complements the existing relative schedule_timer. A deadline already in the past fires immediately. The change is purely additive: it reuses the existing timer future and replay path via a shared schedule_timer_at_ms helper, with no changes to the action/event schema, the provider's visible_at handling, or replay semantics. Closes #34
1 parent 08c4e12 commit 28cc21e

3 files changed

Lines changed: 132 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Added
11+
12+
- **`ctx.schedule_timer_until(deadline)`** — A new absolute-deadline timer
13+
variant that fires at a wall-clock `SystemTime` instead of a relative
14+
`Duration`. A deadline already in the past fires immediately. It is a thin
15+
constructor over the existing timer machinery (the durable record is already
16+
an absolute `fire_at_ms`), so there are no changes to the action/event schema
17+
or replay semantics. `schedule_timer` now shares the same internal code path.
18+
1019
### Changed
1120

1221
- **`ctx.new_guid()` now returns a standard UUID v4.** The previous

src/lib.rs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3535,10 +3535,44 @@ impl OrchestrationContext {
35353535
pub fn schedule_timer(&self, delay: std::time::Duration) -> DurableFuture<()> {
35363536
let delay_ms = delay.as_millis() as u64;
35373537

3538+
let now = {
3539+
let inner = self.inner.lock().expect("Mutex should not be poisoned");
3540+
inner.now_ms()
3541+
};
3542+
let fire_at_ms = now.saturating_add(delay_ms);
3543+
self.schedule_timer_at_ms(fire_at_ms)
3544+
}
3545+
3546+
/// Schedule a timer that fires at an absolute wall-clock deadline.
3547+
///
3548+
/// Equivalent to [`schedule_timer`](Self::schedule_timer) but anchored to an
3549+
/// absolute point in time rather than a delay from "now". A deadline already in
3550+
/// the past fires immediately (next turn). The deadline is recorded durably and
3551+
/// replayed verbatim, exactly like the existing relative timer.
3552+
///
3553+
/// This is useful when the caller already has an absolute target time (a cron
3554+
/// tick, an SLA, a scheduled-at timestamp) and wants to avoid the manual
3555+
/// `deadline - now` clamp and the extra recorded `utc_now` reading that the
3556+
/// relative form would require.
3557+
///
3558+
/// # Example
3559+
/// ```ignore
3560+
/// use std::time::{Duration, SystemTime};
3561+
/// let deadline = SystemTime::now() + Duration::from_secs(30);
3562+
/// ctx.schedule_timer_until(deadline).await;
3563+
/// ```
3564+
pub fn schedule_timer_until(&self, deadline: std::time::SystemTime) -> DurableFuture<()> {
3565+
let fire_at_ms = deadline
3566+
.duration_since(std::time::UNIX_EPOCH)
3567+
.map(|d| d.as_millis() as u64)
3568+
.unwrap_or(0);
3569+
self.schedule_timer_at_ms(fire_at_ms)
3570+
}
3571+
3572+
/// Shared timer constructor over an absolute ms-since-epoch fire time.
3573+
fn schedule_timer_at_ms(&self, fire_at_ms: u64) -> DurableFuture<()> {
35383574
let mut inner = self.inner.lock().expect("Mutex should not be poisoned");
35393575

3540-
let now = inner.now_ms();
3541-
let fire_at_ms = now.saturating_add(delay_ms);
35423576
let token = inner.emit_action(Action::CreateTimer {
35433577
scheduling_event_id: 0,
35443578
fire_at_ms,

tests/timer_tests.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,3 +603,90 @@ async fn timer_fires_at_correct_time_after_previous_timer() {
603603
_ => panic!("Unexpected status: {status:?}"),
604604
}
605605
}
606+
607+
// ============================================================================
608+
// ABSOLUTE-DEADLINE TIMER TESTS (schedule_timer_until)
609+
// ============================================================================
610+
611+
#[tokio::test]
612+
async fn schedule_timer_until_fires_at_deadline() {
613+
let (store, _td) = create_sqlite_store().await;
614+
615+
const DELAY_MS: u64 = 50;
616+
let orch = |ctx: OrchestrationContext, _input: String| async move {
617+
let deadline = std::time::SystemTime::now() + Duration::from_millis(DELAY_MS);
618+
ctx.schedule_timer_until(deadline).await;
619+
Ok("done".to_string())
620+
};
621+
622+
let reg = OrchestrationRegistry::builder()
623+
.register("TimerUntil", orch)
624+
.build();
625+
let acts = ActivityRegistry::builder().build();
626+
let rt = runtime::Runtime::start_with_store(store.clone(), acts, reg).await;
627+
let client = duroxide::Client::new(store.clone());
628+
629+
let start = std::time::Instant::now();
630+
client
631+
.start_orchestration("inst-until", "TimerUntil", "")
632+
.await
633+
.unwrap();
634+
635+
let status = client
636+
.wait_for_orchestration("inst-until", Duration::from_secs(5))
637+
.await
638+
.unwrap();
639+
let elapsed = start.elapsed().as_millis() as u64;
640+
641+
assert!(
642+
elapsed >= DELAY_MS,
643+
"Timer fired too early: expected >={DELAY_MS}ms, got {elapsed}ms"
644+
);
645+
646+
match status {
647+
duroxide::runtime::OrchestrationStatus::Completed { output, .. } => {
648+
assert_eq!(output, "done");
649+
}
650+
other => panic!("Unexpected status: {other:?}"),
651+
}
652+
653+
drop(rt);
654+
}
655+
656+
#[tokio::test]
657+
async fn schedule_timer_until_past_deadline_fires_immediately() {
658+
let (store, _td) = create_sqlite_store().await;
659+
660+
// A deadline well in the past should fire immediately (next turn).
661+
let orch = |ctx: OrchestrationContext, _input: String| async move {
662+
let deadline = std::time::UNIX_EPOCH; // far in the past
663+
ctx.schedule_timer_until(deadline).await;
664+
Ok("done".to_string())
665+
};
666+
667+
let reg = OrchestrationRegistry::builder()
668+
.register("TimerUntilPast", orch)
669+
.build();
670+
let acts = ActivityRegistry::builder().build();
671+
let rt = runtime::Runtime::start_with_store(store.clone(), acts, reg).await;
672+
let client = duroxide::Client::new(store.clone());
673+
674+
client
675+
.start_orchestration("inst-until-past", "TimerUntilPast", "")
676+
.await
677+
.unwrap();
678+
679+
let status = client
680+
.wait_for_orchestration("inst-until-past", Duration::from_secs(5))
681+
.await
682+
.unwrap();
683+
684+
match status {
685+
duroxide::runtime::OrchestrationStatus::Completed { output, .. } => {
686+
assert_eq!(output, "done");
687+
}
688+
other => panic!("Unexpected status: {other:?}"),
689+
}
690+
691+
drop(rt);
692+
}

0 commit comments

Comments
 (0)