Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs-internal/engine/rivetkit-core-internals.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ Two-phase:
- `SleepGrace` fires `onSleep` immediately and keeps dispatch/save timers live.
- `SleepFinalize` gates dispatch, suspends alarms, and runs teardown.

Sleep grace must fire the actor abort signal on entry and wait for the run handler to exit before finalize. Destroy abort firing remains unchanged.
Sleep grace fires the actor abort signal on entry and waits for the run handler to exit before finalize.

Finalize:

Expand Down
2 changes: 1 addition & 1 deletion docs-internal/engine/sleep-sequence.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ Removing `preventSleep` deleted both predicate branches. Any future sleep-affect

- `start_grace(reason)` fires at the start of `SleepGrace` / `DestroyGrace`. It cancels the sleep idle timer, cancels the actor abort signal (`actor_abort_signal`), installs a `SleepGraceState` with the effective grace deadline, and resets the sleep timer to arm the grace tick.
- The actor abort signal is a soft signal: "shutdown has started, please wrap up." User code observes it via `c.abortSignal`. It does not force-stop work.
- For destroy, the abort signal may fire earlier than grace entry because `ctx.destroy()` cancels the abort token immediately via `mark_destroy_requested(...)`.
- Destroy requests also use the normal grace path. The actor abort signal fires when destroy grace starts.

## Grace deadline enforcement

Expand Down
10 changes: 4 additions & 6 deletions rivetkit-rust/packages/rivetkit-core/src/actor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,19 +478,17 @@ impl ActorContext {
self.flush_on_shutdown();
self.0.destroy_requested.store(true, Ordering::SeqCst);
self.0.destroy_completed.store(false, Ordering::SeqCst);
self.0.abort_signal.lock().cancel();
}

#[cfg(feature = "wasm-runtime")]
fn mark_destroy_requested_without_spawn(&self) {
self.cancel_sleep_timer();
self.0.destroy_requested.store(true, Ordering::SeqCst);
self.0.destroy_completed.store(false, Ordering::SeqCst);
self.0.abort_signal.lock().cancel();
}

#[doc(hidden)]
pub fn cancel_abort_signal_for_sleep(&self) {
pub fn cancel_actor_abort_signal(&self) {
self.0.abort_signal.lock().cancel();
}

Expand All @@ -500,9 +498,9 @@ impl ActorContext {
return;
}

// Sleep cancels the generation abort signal to break queue waits and the
// run loop out of blocking calls. A restarted actor needs a fresh signal
// so the next generation can wait normally.
// Sleep or destroy cancels the generation abort signal to break actor
// scoped waits. A restarted actor needs a fresh signal so the next
// generation can wait normally.
let next_signal = CancellationToken::new();
*abort_signal = next_signal.clone();
*self.0.queue_abort_signal.lock() = next_signal;
Expand Down
19 changes: 10 additions & 9 deletions rivetkit-rust/packages/rivetkit-core/src/actor/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1528,9 +1528,7 @@ impl ActorTask {
let grace_period = self.factory.config().effective_sleep_grace_period();
self.sleep_deadline = None;
self.ctx.cancel_sleep_timer();
// Entering grace cancels the actor abort signal so user code blocked on
// queues or other actor scoped waits can unwind and let sleep finalize.
self.ctx.cancel_abort_signal_for_sleep();
self.ctx.cancel_actor_abort_signal();
self.sleep_grace = Some(SleepGraceState {
deadline: Instant::now() + grace_period,
reason,
Expand Down Expand Up @@ -1745,15 +1743,18 @@ impl ActorTask {
ShutdownKind::Sleep => LifecycleState::SleepFinalize,
ShutdownKind::Destroy => LifecycleState::Destroying,
});
self.save_final_state().await?;
self.close_actor_event_channel();
self.join_aborted_run_handle().await;
Self::finish_shutdown_cleanup_with_ctx(self.ctx.clone(), reason).await?;
if matches!(reason, ShutdownKind::Destroy) {
let result: Result<()> = async {
self.save_final_state().await?;
self.close_actor_event_channel();
self.join_aborted_run_handle().await;
Self::finish_shutdown_cleanup_with_ctx(self.ctx.clone(), reason).await
}
.await;
if result.is_ok() && matches!(reason, ShutdownKind::Destroy) {
self.ctx.mark_destroy_completed();
}
self.ctx.record_shutdown_wait(reason, started_at.elapsed());
Ok(())
result
}

async fn save_final_state(&mut self) -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion rivetkit-rust/packages/rivetkit-core/tests/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ mod moved_tests {
});

yield_now().await;
queue.mark_destroy_requested();
queue.cancel_actor_abort_signal();

let error = wait
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,32 @@ export const destroyActor = actor({
},
},
});

export const destroyAbortSignalActor = actor({
state: {
abortEventCount: 0,
},
onDestroy: async (c) => {
const client = c.client<typeof registry>();
const observer = client.destroyObserver.getOrCreate(["observer"]);
await observer.notifyDestroyed(c.key.join("/"));
},
actions: {
requestDestroy: (c) => {
const beforeDestroyAborted = c.aborted;
const beforeDestroySignalAborted = c.abortSignal.aborted;
c.abortSignal.addEventListener("abort", () => {
c.state.abortEventCount += 1;
});
c.destroy();

return {
beforeDestroyAborted,
beforeDestroySignalAborted,
afterDestroyAborted: c.aborted,
afterDestroySignalAborted: c.abortSignal.aborted,
abortEventCountAfterDestroy: c.state.abortEventCount,
};
},
},
});
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ import {
dbLifecycleFailing,
dbLifecycleObserver,
} from "./db-lifecycle";
import { destroyActor, destroyObserver } from "./destroy";
import {
destroyAbortSignalActor,
destroyActor,
destroyObserver,
} from "./destroy";
import { customTimeoutActor, errorHandlingActor } from "./error-handling";
import { fileSystemHibernationCleanupActor } from "./file-system-hibernation-cleanup";
import { hibernationActor, hibernationSleepWindowActor } from "./hibernation";
Expand Down Expand Up @@ -281,6 +285,7 @@ export const registry = setup({
// From destroy.ts
destroyActor,
destroyObserver,
destroyAbortSignalActor,
// From hibernation.ts
hibernationActor,
hibernationSleepWindowActor,
Expand Down
4 changes: 2 additions & 2 deletions rivetkit-typescript/packages/rivetkit/src/actor/config.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { z } from "zod/v4";

Check failure on line 1 in rivetkit-typescript/packages/rivetkit/src/actor/config.ts

View workflow job for this annotation

GitHub Actions / RivetKit / Quality Check

format

Formatter would have printed the following content:
import type { UniversalWebSocket } from "@/common/websocket-interface";
import type {
AnyDatabaseProvider,
Expand Down Expand Up @@ -1227,7 +1227,7 @@
* shutdown window (`sleepGracePeriod`) cover deferred work.
*
* The handler receives an abort signal via `c.abortSignal` and a
* `c.aborted` alias for loop checks. Use these to gracefully exit.
* `c.aborted` alias. Use these to gracefully exit when shutdown starts.
*
* If this handler exits, the actor will follow the normal idle sleep timeout
* once it becomes idle.
Expand Down Expand Up @@ -1770,7 +1770,7 @@
.number()
.optional()
.describe(
`Max time in ms for the graceful shutdown window. Covers lifecycle hooks (onSleep, onDestroy), the run handler abort wait, async raw WebSocket handlers, disconnect callbacks, and final state serialization. Default: ${DEFAULT_SLEEP_GRACE_PERIOD}.`,
`Max time in ms for the graceful shutdown window. Covers lifecycle hooks (onSleep, onDestroy), the run handler wait, async raw WebSocket handlers, disconnect callbacks, and final state serialization. Default: ${DEFAULT_SLEEP_GRACE_PERIOD}.`,
),
onDestroyTimeout: z
.number()
Expand Down
35 changes: 13 additions & 22 deletions rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -957,14 +957,7 @@ export class ActorInstance<
// Scheduled events are persisted and will be re-initialized
// on wake via initializeAlarms().
this.driver.cancelAlarm?.(this.#actorId);

// Abort listeners in the canonical stop path.
// This must run for all stop modes, including sleep and remote stop.
// Destroy may have already triggered an early abort, but repeating abort
// is intentional and safe.
try {
this.#abortController.abort();
} catch {}
this.#abortActorSignal();

// The run-handler join, lifecycle hooks, and remaining shutdown
// tasks all share the single sleepGracePeriod budget.
Expand Down Expand Up @@ -1030,11 +1023,8 @@ export class ActorInstance<
}

this.driver.cancelAlarm?.(this.#actorId);
this.#abortActorSignal();
this.stateManager.clearPendingSaveTimeout();

try {
this.#abortController.abort();
} catch {}
} finally {
this.#shutdownComplete = true;
await this.#cleanupDatabase();
Expand Down Expand Up @@ -1087,14 +1077,6 @@ export class ActorInstance<
}
this.#destroyCalled = true;

// Abort immediately so in flight waits can exit before the driver stop
// handshake completes.
// The onStop path will call abort again as a safety net for all stop
// modes.
try {
this.#abortController.abort();
} catch {}

const destroy = this.driver.startDestroy.bind(
this.driver,
this.#actorId,
Expand All @@ -1108,6 +1090,15 @@ export class ActorInstance<
});
}

#abortActorSignal() {
if (this.#abortController.signal.aborted) {
return;
}
try {
this.#abortController.abort();
} catch {}
}

// MARK: - HTTP Request Tracking
beginHonoHttpRequest() {
this.#activeHonoHttpRequests++;
Expand Down Expand Up @@ -2070,7 +2061,7 @@ export class ActorInstance<

if (timeoutMs <= 0) {
this.#rLog.warn({
msg: "run handler did not complete in time, it may have leaked - ensure you use c.aborted (or the abort signal c.abortSignal) to exit gracefully",
msg: "run handler did not complete in time, it may have leaked - ensure long-running work settles before shutdown finalizes",
timeoutMs,
});
return;
Expand All @@ -2085,7 +2076,7 @@ export class ActorInstance<

if (timedOut) {
this.#rLog.warn({
msg: "run handler did not complete in time, it may have leaked - ensure you use c.aborted (or the abort signal c.abortSignal) to exit gracefully",
msg: "run handler did not complete in time, it may have leaked - ensure long-running work settles before shutdown finalizes",
timeoutMs,
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,30 @@ describeDriverMatrix("Actor Destroy", (driverTestConfig) => {
expect(newValue).toBe(0);
});

test("ctx.destroy does not synchronously abort actor signal", async (c) => {
const { client } = await setupDriverTest(c, driverTestConfig);
const actorKey = `test-destroy-abort-signal-${crypto.randomUUID()}`;
const observer = client.destroyObserver.getOrCreate(["observer"]);
await observer.reset();

const result = await client.destroyAbortSignalActor
.getOrCreate([actorKey])
.requestDestroy();

expect(result).toEqual({
beforeDestroyAborted: false,
beforeDestroySignalAborted: false,
afterDestroyAborted: false,
afterDestroySignalAborted: false,
abortEventCountAfterDestroy: 0,
});

// Poll until onDestroy records so this action covered the real destroy path.
await vi.waitFor(async () => {
expect(await observer.wasDestroyed(actorKey)).toBe(true);
});
});

test("actor destroy clears ephemeral vars on same-key recreation", async (c) => {
const { client } = await setupDriverTest(c, driverTestConfig);

Expand Down
2 changes: 1 addition & 1 deletion website/src/content/docs/actors/actions.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ Actions have a single return value. To stream realtime data in response to an ac

## Canceling Long-Running Actions

For operations that should be cancelable on-demand, create your own `AbortController` and chain it with `c.abortSignal` for automatic cleanup on actor shutdown.
For operations that should be cancelable on-demand, create your own `AbortController`. Chain it with `c.abortSignal` so actor shutdown also cancels the operation.

```typescript
import { actor } from "rivetkit";
Expand Down
2 changes: 1 addition & 1 deletion website/src/content/docs/actors/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ const userAccount = actor({

### Lifecycle Hooks

Actors support hooks for initialization, background processing, connections, networking, and state changes. Use `run` for long-lived background loops, and exit cleanly on shutdown with `c.aborted` or `c.abortSignal`.
Actors support hooks for initialization, background processing, connections, networking, and state changes. Use `run` for long-lived background loops, and use `c.aborted` or `c.abortSignal` for graceful shutdown.

```ts
import { actor, event, queue } from "rivetkit";
Expand Down
4 changes: 2 additions & 2 deletions website/src/content/docs/actors/lifecycle.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ const tickActor = actor({
c.state.tickCount++;
c.log.info({ msg: "tick", count: c.state.tickCount });

// Wait 1 second, but exit early if aborted
// Wait 1 second. Final shutdown also resolves this wait.
await new Promise<void>((resolve) => {
const timeout = setTimeout(resolve, 1000);
c.abortSignal.addEventListener("abort", () => {
Expand Down Expand Up @@ -876,7 +876,7 @@ When an actor sleeps or is destroyed, it enters the graceful shutdown window:

1. `c.abortSignal` fires and `c.aborted` becomes `true`. New connections and dispatch are rejected. Alarm timeouts are cancelled. On sleep, scheduled events are persisted and will be re-armed when the actor wakes.
2. `onSleep` or `onDestroy` and `onDisconnect` for each closing connection run during the same window. User `waitUntil` promises and async raw WebSocket handlers are drained. Hibernatable WebSocket connections are preserved on sleep and closed on destroy.
3. Once graceful work has completed, state is saved and the database is cleaned up.
3. Once graceful work has completed, state is saved and final cleanup runs.

The entire window is bounded by `sleepGracePeriod` on both sleep and destroy. Defaults to 15 seconds. If the window is exceeded, the actor proceeds to state save anyway.

Expand Down
2 changes: 1 addition & 1 deletion website/src/content/docs/actors/queues.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ This means you can run normal code in `run` without worrying about sleep interru
- Implement connection auth in `onBeforeConnect`. See [Authentication](/docs/actors/authentication).
- Route most state changes through one queue loop so ordering stays predictable.
- If you need more complex multi-step run loops, consider using workflows.
- Use `c.aborted` and `c.abortSignal` for graceful shutdown and cancellation.
- Use `c.aborted` and `c.abortSignal` for actor shutdown. Use your own `AbortController` for earlier loop cancellation.
- Add `timeout` when callers need bounded wait behavior.
- Use `wait: true` only when the caller actually needs a response.

Expand Down
Loading