Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/src/actor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ pub(crate) struct ActorContextInner {
destroy_completed: AtomicBool,
destroy_completion_notify: Notify,
abort_signal: CancellationToken,
shutdown_deadline: CancellationToken,
// Forced-sync: runtime wiring slots are configured through synchronous
// lifecycle setup and cloned before sending events.
inspector: RwLock<Option<Inspector>>,
Expand Down Expand Up @@ -228,6 +229,7 @@ impl ActorContext {
let lifecycle_event_inbox_capacity = config.lifecycle_event_inbox_capacity;
let state_save_interval = config.state_save_interval;
let abort_signal = CancellationToken::new();
let shutdown_deadline = CancellationToken::new();
let sleep = SleepState::new(config.clone());
let ctx = Self(Arc::new(ActorContextInner {
kv,
Expand Down Expand Up @@ -295,6 +297,7 @@ impl ActorContext {
destroy_completed: AtomicBool::new(false),
destroy_completion_notify: Notify::new(),
abort_signal,
shutdown_deadline,
inspector: RwLock::new(None),
inspector_attach_count: RwLock::new(None),
inspector_overlay_tx: RwLock::new(None),
Expand Down Expand Up @@ -465,6 +468,20 @@ impl ActorContext {
self.0.abort_signal.is_cancelled()
}

/// Fires when the shutdown grace deadline has elapsed and core is forcing
/// cleanup. Foreign-runtime adapters should abort any in-flight shutdown
/// work (for example `onSleep` / `onDestroy`) when this token is cancelled
/// so resources like SQLite are not torn down mid-operation.
#[doc(hidden)]
pub fn shutdown_deadline_token(&self) -> CancellationToken {
self.0.shutdown_deadline.clone()
}

#[doc(hidden)]
pub fn cancel_shutdown_deadline(&self) {
self.0.shutdown_deadline.cancel();
}

/// Deprecated no-op. Use `keep_awake` to hold the actor awake for the
/// duration of a future, or `wait_until` to keep work alive across the
/// sleep grace period. Retained only for NAPI bridge compatibility.
Expand Down
1 change: 1 addition & 0 deletions rivetkit-rust/packages/rivetkit-core/src/actor/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1489,6 +1489,7 @@ impl ActorTask {
if let Some(run_handle) = self.run_handle.as_mut() {
run_handle.abort();
}
self.ctx.cancel_shutdown_deadline();
self.ctx.record_shutdown_timeout(grace.reason);
tracing::warn!(
reason = shutdown_reason_label(grace.reason),
Expand Down
51 changes: 32 additions & 19 deletions rivetkit-typescript/packages/rivetkit-napi/src/napi_actor_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,27 +584,40 @@ pub(crate) async fn dispatch_event(
rivetkit_core::actor::StopReason::Destroy => bindings.on_destroy.clone(),
};
let ctx = ctx.clone();
let shutdown_deadline = ctx.inner().shutdown_deadline_token();
tasks.spawn(async move {
let result: Result<()> = async {
if let Some(callback) = callback {
match reason {
rivetkit_core::actor::StopReason::Sleep => {
call_on_sleep(&callback, &ctx).await
}
rivetkit_core::actor::StopReason::Destroy => {
call_on_destroy(&callback, &ctx).await
}
}?;
let work = async {
let result: Result<()> = async {
if let Some(callback) = callback {
match reason {
rivetkit_core::actor::StopReason::Sleep => {
call_on_sleep(&callback, &ctx).await
}
rivetkit_core::actor::StopReason::Destroy => {
call_on_destroy(&callback, &ctx).await
}
}?;
}
Ok(())
}
.await;
if let Err(error) = result {
tracing::error!(
actor_id = %ctx.inner().actor_id(),
?error,
"graceful cleanup callback failed",
);
}
};
tokio::select! {
_ = work => {}
_ = shutdown_deadline.cancelled() => {
tracing::warn!(
actor_id = %ctx.inner().actor_id(),
reason = ?reason,
"graceful cleanup aborted by shutdown grace deadline",
);
}
Ok(())
}
.await;
if let Err(error) = result {
tracing::error!(
actor_id = %ctx.inner().actor_id(),
?error,
"graceful cleanup callback failed",
);
}
reply.send(Ok(()));
});
Expand Down
Loading