Skip to content

Commit 645f8a4

Browse files
committed
fix(rivetkit-core): abort graceful cleanup task when grace deadline elapses
1 parent 734fc88 commit 645f8a4

3 files changed

Lines changed: 50 additions & 19 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/actor/context.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ pub(crate) struct ActorContextInner {
133133
destroy_completed: AtomicBool,
134134
destroy_completion_notify: Notify,
135135
abort_signal: CancellationToken,
136+
shutdown_deadline: CancellationToken,
136137
// Forced-sync: runtime wiring slots are configured through synchronous
137138
// lifecycle setup and cloned before sending events.
138139
inspector: RwLock<Option<Inspector>>,
@@ -228,6 +229,7 @@ impl ActorContext {
228229
let lifecycle_event_inbox_capacity = config.lifecycle_event_inbox_capacity;
229230
let state_save_interval = config.state_save_interval;
230231
let abort_signal = CancellationToken::new();
232+
let shutdown_deadline = CancellationToken::new();
231233
let sleep = SleepState::new(config.clone());
232234
let ctx = Self(Arc::new(ActorContextInner {
233235
kv,
@@ -295,6 +297,7 @@ impl ActorContext {
295297
destroy_completed: AtomicBool::new(false),
296298
destroy_completion_notify: Notify::new(),
297299
abort_signal,
300+
shutdown_deadline,
298301
inspector: RwLock::new(None),
299302
inspector_attach_count: RwLock::new(None),
300303
inspector_overlay_tx: RwLock::new(None),
@@ -465,6 +468,20 @@ impl ActorContext {
465468
self.0.abort_signal.is_cancelled()
466469
}
467470

471+
/// Fires when the shutdown grace deadline has elapsed and core is forcing
472+
/// cleanup. Foreign-runtime adapters should abort any in-flight shutdown
473+
/// work (for example `onSleep` / `onDestroy`) when this token is cancelled
474+
/// so resources like SQLite are not torn down mid-operation.
475+
#[doc(hidden)]
476+
pub fn shutdown_deadline_token(&self) -> CancellationToken {
477+
self.0.shutdown_deadline.clone()
478+
}
479+
480+
#[doc(hidden)]
481+
pub fn cancel_shutdown_deadline(&self) {
482+
self.0.shutdown_deadline.cancel();
483+
}
484+
468485
/// Deprecated no-op. Use `keep_awake` to hold the actor awake for the
469486
/// duration of a future, or `wait_until` to keep work alive across the
470487
/// sleep grace period. Retained only for NAPI bridge compatibility.

rivetkit-rust/packages/rivetkit-core/src/actor/task.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1489,6 +1489,7 @@ impl ActorTask {
14891489
if let Some(run_handle) = self.run_handle.as_mut() {
14901490
run_handle.abort();
14911491
}
1492+
self.ctx.cancel_shutdown_deadline();
14921493
self.ctx.record_shutdown_timeout(grace.reason);
14931494
tracing::warn!(
14941495
reason = shutdown_reason_label(grace.reason),

rivetkit-typescript/packages/rivetkit-napi/src/napi_actor_events.rs

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -584,27 +584,40 @@ pub(crate) async fn dispatch_event(
584584
rivetkit_core::actor::StopReason::Destroy => bindings.on_destroy.clone(),
585585
};
586586
let ctx = ctx.clone();
587+
let shutdown_deadline = ctx.inner().shutdown_deadline_token();
587588
tasks.spawn(async move {
588-
let result: Result<()> = async {
589-
if let Some(callback) = callback {
590-
match reason {
591-
rivetkit_core::actor::StopReason::Sleep => {
592-
call_on_sleep(&callback, &ctx).await
593-
}
594-
rivetkit_core::actor::StopReason::Destroy => {
595-
call_on_destroy(&callback, &ctx).await
596-
}
597-
}?;
589+
let work = async {
590+
let result: Result<()> = async {
591+
if let Some(callback) = callback {
592+
match reason {
593+
rivetkit_core::actor::StopReason::Sleep => {
594+
call_on_sleep(&callback, &ctx).await
595+
}
596+
rivetkit_core::actor::StopReason::Destroy => {
597+
call_on_destroy(&callback, &ctx).await
598+
}
599+
}?;
600+
}
601+
Ok(())
602+
}
603+
.await;
604+
if let Err(error) = result {
605+
tracing::error!(
606+
actor_id = %ctx.inner().actor_id(),
607+
?error,
608+
"graceful cleanup callback failed",
609+
);
610+
}
611+
};
612+
tokio::select! {
613+
_ = work => {}
614+
_ = shutdown_deadline.cancelled() => {
615+
tracing::warn!(
616+
actor_id = %ctx.inner().actor_id(),
617+
reason = ?reason,
618+
"graceful cleanup aborted by shutdown grace deadline",
619+
);
598620
}
599-
Ok(())
600-
}
601-
.await;
602-
if let Err(error) = result {
603-
tracing::error!(
604-
actor_id = %ctx.inner().actor_id(),
605-
?error,
606-
"graceful cleanup callback failed",
607-
);
608621
}
609622
reply.send(Ok(()));
610623
});

0 commit comments

Comments
 (0)