diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs index de689d402c..514ff9646f 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs @@ -133,6 +133,7 @@ pub(crate) struct ActorContextInner { destroy_completed: AtomicBool, destroy_completion_notify: Notify, abort_signal: CancellationToken, + shutdown_deadline: CancellationToken, // Forced-sync: runtime wiring slots are configured through synchronous // lifecycle setup and cloned before sending events. inspector: RwLock>, @@ -228,6 +229,7 @@ impl ActorContext { let lifecycle_event_inbox_capacity = config.lifecycle_event_inbox_capacity; let state_save_interval = config.state_save_interval; let abort_signal = CancellationToken::new(); + let shutdown_deadline = CancellationToken::new(); let sleep = SleepState::new(config.clone()); let ctx = Self(Arc::new(ActorContextInner { kv, @@ -295,6 +297,7 @@ impl ActorContext { destroy_completed: AtomicBool::new(false), destroy_completion_notify: Notify::new(), abort_signal, + shutdown_deadline, inspector: RwLock::new(None), inspector_attach_count: RwLock::new(None), inspector_overlay_tx: RwLock::new(None), @@ -465,6 +468,20 @@ impl ActorContext { self.0.abort_signal.is_cancelled() } + /// Fires when the shutdown grace deadline has elapsed and core is forcing + /// cleanup. Foreign-runtime adapters should abort any in-flight shutdown + /// work (for example `onSleep` / `onDestroy`) when this token is cancelled + /// so resources like SQLite are not torn down mid-operation. + #[doc(hidden)] + pub fn shutdown_deadline_token(&self) -> CancellationToken { + self.0.shutdown_deadline.clone() + } + + #[doc(hidden)] + pub fn cancel_shutdown_deadline(&self) { + self.0.shutdown_deadline.cancel(); + } + /// Deprecated no-op. Use `keep_awake` to hold the actor awake for the /// duration of a future, or `wait_until` to keep work alive across the /// sleep grace period. Retained only for NAPI bridge compatibility. diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs index 54150bafa8..d618ede5c2 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs @@ -1489,6 +1489,7 @@ impl ActorTask { if let Some(run_handle) = self.run_handle.as_mut() { run_handle.abort(); } + self.ctx.cancel_shutdown_deadline(); self.ctx.record_shutdown_timeout(grace.reason); tracing::warn!( reason = shutdown_reason_label(grace.reason), diff --git a/rivetkit-typescript/packages/rivetkit-napi/src/napi_actor_events.rs b/rivetkit-typescript/packages/rivetkit-napi/src/napi_actor_events.rs index f4f4222fb8..66e28bdef0 100644 --- a/rivetkit-typescript/packages/rivetkit-napi/src/napi_actor_events.rs +++ b/rivetkit-typescript/packages/rivetkit-napi/src/napi_actor_events.rs @@ -584,27 +584,40 @@ pub(crate) async fn dispatch_event( rivetkit_core::actor::StopReason::Destroy => bindings.on_destroy.clone(), }; let ctx = ctx.clone(); + let shutdown_deadline = ctx.inner().shutdown_deadline_token(); tasks.spawn(async move { - let result: Result<()> = async { - if let Some(callback) = callback { - match reason { - rivetkit_core::actor::StopReason::Sleep => { - call_on_sleep(&callback, &ctx).await - } - rivetkit_core::actor::StopReason::Destroy => { - call_on_destroy(&callback, &ctx).await - } - }?; + let work = async { + let result: Result<()> = async { + if let Some(callback) = callback { + match reason { + rivetkit_core::actor::StopReason::Sleep => { + call_on_sleep(&callback, &ctx).await + } + rivetkit_core::actor::StopReason::Destroy => { + call_on_destroy(&callback, &ctx).await + } + }?; + } + Ok(()) + } + .await; + if let Err(error) = result { + tracing::error!( + actor_id = %ctx.inner().actor_id(), + ?error, + "graceful cleanup callback failed", + ); + } + }; + tokio::select! { + _ = work => {} + _ = shutdown_deadline.cancelled() => { + tracing::warn!( + actor_id = %ctx.inner().actor_id(), + reason = ?reason, + "graceful cleanup aborted by shutdown grace deadline", + ); } - Ok(()) - } - .await; - if let Err(error) = result { - tracing::error!( - actor_id = %ctx.inner().actor_id(), - ?error, - "graceful cleanup callback failed", - ); } reply.send(Ok(())); });