Commit c2198e9

fix(rivetkit): fire abort signal on shutdown grace
1 parent 314c8ff commit c2198e9

11 files changed

Lines changed: 38 additions & 47 deletions

docs-internal/engine/rivetkit-core-internals.md

Lines changed: 1 addition & 1 deletion
@@ -85,7 +85,7 @@ Two-phase:
 - `SleepGrace` fires `onSleep` immediately and keeps dispatch/save timers live.
 - `SleepFinalize` gates dispatch, suspends alarms, and runs teardown.

-Sleep grace must fire the actor abort signal on entry and wait for the run handler to exit before finalize. Destroy abort firing remains unchanged.
+Sleep grace fires the actor abort signal on entry and waits for the run handler to exit before finalize.

 Finalize:

docs-internal/engine/sleep-sequence.md

Lines changed: 1 addition & 1 deletion
@@ -39,7 +39,7 @@ Removing `preventSleep` deleted both predicate branches. Any future sleep-affect

 - `start_grace(reason)` fires at the start of `SleepGrace` / `DestroyGrace`. It cancels the sleep idle timer, cancels the actor abort signal (`actor_abort_signal`), installs a `SleepGraceState` with the effective grace deadline, and resets the sleep timer to arm the grace tick.
 - The actor abort signal is a soft signal: "shutdown has started, please wrap up." User code observes it via `c.abortSignal`. It does not force-stop work.
-- For destroy, the abort signal may fire earlier than grace entry because `ctx.destroy()` cancels the abort token immediately via `mark_destroy_requested(...)`.
+- Destroy requests also use the normal grace path. The actor abort signal fires when destroy grace starts.

 ## Grace deadline enforcement
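
A minimal sketch of what "soft signal" means for user code, assuming a run loop that checks the signal between units of work. `runLoop`, `processItem`, and `demoSoftAbort` are hypothetical names, and the local `AbortController` stands in for whatever fires `c.abortSignal`; none of this is RivetKit API.

```typescript
// Hypothetical stand-in for a run handler. `signal` plays the role of
// `c.abortSignal`; nothing here is a RivetKit API.
async function runLoop(
	signal: AbortSignal,
	processItem: (n: number) => Promise<void>,
): Promise<number> {
	let processed = 0;
	// The signal is advisory: the loop checks it between units of work and
	// never interrupts a unit that is already in flight.
	while (!signal.aborted) {
		await processItem(processed);
		processed++;
	}
	return processed;
}

// Demo: the abort fires during the third unit of work (n === 2).
// That unit still runs to completion before the loop exits.
async function demoSoftAbort(): Promise<number> {
	const controller = new AbortController();
	return runLoop(controller.signal, async (n) => {
		if (n === 2) controller.abort(); // "shutdown has started"
		await Promise.resolve(); // simulated async work
	});
}
```

In this sketch the loop finishes the in-flight unit and then exits on its own, which is the "please wrap up" behavior the bullet describes; nothing is force-stopped.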

rivetkit-rust/packages/rivetkit-core/src/actor/context.rs

Lines changed: 4 additions & 6 deletions
@@ -478,19 +478,17 @@ impl ActorContext {
         self.flush_on_shutdown();
         self.0.destroy_requested.store(true, Ordering::SeqCst);
         self.0.destroy_completed.store(false, Ordering::SeqCst);
-        self.0.abort_signal.lock().cancel();
     }

     #[cfg(feature = "wasm-runtime")]
     fn mark_destroy_requested_without_spawn(&self) {
         self.cancel_sleep_timer();
         self.0.destroy_requested.store(true, Ordering::SeqCst);
         self.0.destroy_completed.store(false, Ordering::SeqCst);
-        self.0.abort_signal.lock().cancel();
     }

     #[doc(hidden)]
-    pub fn cancel_abort_signal_for_sleep(&self) {
+    pub fn cancel_actor_abort_signal(&self) {
         self.0.abort_signal.lock().cancel();
     }

@@ -500,9 +498,9 @@ impl ActorContext {
             return;
         }

-        // Sleep cancels the generation abort signal to break queue waits and the
-        // run loop out of blocking calls. A restarted actor needs a fresh signal
-        // so the next generation can wait normally.
+        // Sleep or destroy cancels the generation abort signal to break actor
+        // scoped waits. A restarted actor needs a fresh signal so the next
+        // generation can wait normally.
         let next_signal = CancellationToken::new();
         *abort_signal = next_signal.clone();
         *self.0.queue_abort_signal.lock() = next_signal;

rivetkit-rust/packages/rivetkit-core/src/actor/task.rs

Lines changed: 10 additions & 9 deletions
@@ -1528,9 +1528,7 @@ impl ActorTask {
         let grace_period = self.factory.config().effective_sleep_grace_period();
         self.sleep_deadline = None;
         self.ctx.cancel_sleep_timer();
-        // Entering grace cancels the actor abort signal so user code blocked on
-        // queues or other actor scoped waits can unwind and let sleep finalize.
-        self.ctx.cancel_abort_signal_for_sleep();
+        self.ctx.cancel_actor_abort_signal();
         self.sleep_grace = Some(SleepGraceState {
             deadline: Instant::now() + grace_period,
             reason,
@@ -1745,15 +1743,18 @@ impl ActorTask {
             ShutdownKind::Sleep => LifecycleState::SleepFinalize,
             ShutdownKind::Destroy => LifecycleState::Destroying,
         });
-        self.save_final_state().await?;
-        self.close_actor_event_channel();
-        self.join_aborted_run_handle().await;
-        Self::finish_shutdown_cleanup_with_ctx(self.ctx.clone(), reason).await?;
-        if matches!(reason, ShutdownKind::Destroy) {
+        let result: Result<()> = async {
+            self.save_final_state().await?;
+            self.close_actor_event_channel();
+            self.join_aborted_run_handle().await;
+            Self::finish_shutdown_cleanup_with_ctx(self.ctx.clone(), reason).await
+        }
+        .await;
+        if result.is_ok() && matches!(reason, ShutdownKind::Destroy) {
             self.ctx.mark_destroy_completed();
         }
         self.ctx.record_shutdown_wait(reason, started_at.elapsed());
-        Ok(())
+        result
     }

     async fn save_final_state(&mut self) -> Result<()> {
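
As this hunk reads, the fallible steps now run inside an inner `async` block so that an early `?` return no longer skips `record_shutdown_wait`, and `mark_destroy_completed` only runs when every step succeeded. The same control flow can be sketched in TypeScript; `finalizeShutdown`, `ShutdownLog`, and the `steps` callback are hypothetical names used only for illustration.

```typescript
type ShutdownKind = "sleep" | "destroy";

interface ShutdownLog {
	destroyCompleted: boolean;
	waitRecorded: boolean;
}

// Hypothetical analogue of the finalize step: run the fallible steps, capture
// the outcome, always record the wait, and only then surface any error.
async function finalizeShutdown(
	kind: ShutdownKind,
	steps: () => Promise<void>,
	log: ShutdownLog,
): Promise<void> {
	let ok = true;
	let error: unknown;
	try {
		await steps();
	} catch (e) {
		ok = false;
		error = e;
	}
	// Completion is only marked for a destroy whose steps all succeeded.
	if (ok && kind === "destroy") {
		log.destroyCompleted = true;
	}
	// Bookkeeping runs on both the success and the failure path.
	log.waitRecorded = true;
	if (!ok) throw error;
}
```

The caller still sees the original error; the difference is only that the bookkeeping has already happened by the time it propagates.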

rivetkit-rust/packages/rivetkit-core/tests/queue.rs

Lines changed: 1 addition & 1 deletion
@@ -162,7 +162,7 @@ mod moved_tests {
         });

         yield_now().await;
-        queue.mark_destroy_requested();
+        queue.cancel_actor_abort_signal();

         let error = wait
             .await
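
The test change suggests that a pending queue wait is expected to fail once the actor abort signal is cancelled. A rough TypeScript sketch of that behavior follows, assuming a hypothetical `TinyQueue` class that is not the RivetKit queue API and an `"aborted"` error message chosen only for the example.

```typescript
// Hypothetical minimal queue used only to illustrate an abortable wait.
class TinyQueue<T> {
	private items: T[] = [];
	private waiters: Array<(item: T) => void> = [];

	push(item: T): void {
		const waiter = this.waiters.shift();
		if (waiter) waiter(item);
		else this.items.push(item);
	}

	// Resolves with the next item, or rejects once `signal` aborts.
	next(signal: AbortSignal): Promise<T> {
		if (signal.aborted) return Promise.reject(new Error("aborted"));
		if (this.items.length > 0) return Promise.resolve(this.items.shift() as T);
		return new Promise<T>((resolve, reject) => {
			const waiter = (item: T) => {
				signal.removeEventListener("abort", onAbort);
				resolve(item);
			};
			const onAbort = () => {
				// Drop the pending waiter so a later push is not lost.
				const i = this.waiters.indexOf(waiter);
				if (i >= 0) this.waiters.splice(i, 1);
				reject(new Error("aborted"));
			};
			this.waiters.push(waiter);
			signal.addEventListener("abort", onAbort, { once: true });
		});
	}
}
```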

rivetkit-typescript/packages/rivetkit/src/actor/config.ts

Lines changed: 2 additions & 2 deletions
@@ -1227,7 +1227,7 @@ interface BaseActorConfig<
      * shutdown window (`sleepGracePeriod`) cover deferred work.
      *
      * The handler receives an abort signal via `c.abortSignal` and a
-     * `c.aborted` alias for loop checks. Use these to gracefully exit.
+     * `c.aborted` alias. Use these to gracefully exit when shutdown starts.
      *
      * If this handler exits, the actor will follow the normal idle sleep timeout
      * once it becomes idle.
@@ -1770,7 +1770,7 @@ export const DocActorOptionsSchema = z
             .number()
             .optional()
             .describe(
-                `Max time in ms for the graceful shutdown window. Covers lifecycle hooks (onSleep, onDestroy), the run handler abort wait, async raw WebSocket handlers, disconnect callbacks, and final state serialization. Default: ${DEFAULT_SLEEP_GRACE_PERIOD}.`,
+                `Max time in ms for the graceful shutdown window. Covers lifecycle hooks (onSleep, onDestroy), the run handler wait, async raw WebSocket handlers, disconnect callbacks, and final state serialization. Default: ${DEFAULT_SLEEP_GRACE_PERIOD}.`,
             ),
         onDestroyTimeout: z
             .number()
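
The option description treats the grace period as one budget shared by several kinds of shutdown work. A minimal sketch of a single-budget wait, assuming a hypothetical `withinGrace` helper (not part of RivetKit) that reports whether the work settled before the deadline:

```typescript
// Hypothetical helper: wait for `work` for at most `budgetMs` milliseconds.
// Resolves to true when the work finished in time and false on timeout.
// A rejection from `work` before the deadline is passed through.
function withinGrace(work: Promise<unknown>, budgetMs: number): Promise<boolean> {
	// An exhausted budget counts as a timeout without waiting at all.
	if (budgetMs <= 0) return Promise.resolve(false);
	return new Promise<boolean>((resolve, reject) => {
		const timer = setTimeout(() => resolve(false), budgetMs);
		work.then(
			() => {
				clearTimeout(timer);
				resolve(true);
			},
			(err) => {
				clearTimeout(timer);
				reject(err);
			},
		);
	});
}
```

Several tasks can share the budget by passing `Promise.all([...])` as `work`, so the slowest task determines whether the window is exceeded.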

rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts

Lines changed: 14 additions & 22 deletions
@@ -957,14 +957,7 @@ export class ActorInstance<
         // Scheduled events are persisted and will be re-initialized
         // on wake via initializeAlarms().
         this.driver.cancelAlarm?.(this.#actorId);
-
-        // Abort listeners in the canonical stop path.
-        // This must run for all stop modes, including sleep and remote stop.
-        // Destroy may have already triggered an early abort, but repeating abort
-        // is intentional and safe.
-        try {
-            this.#abortController.abort();
-        } catch {}
+        this.#abortActorSignal();

         // The run-handler join, lifecycle hooks, and remaining shutdown
         // tasks all share the single sleepGracePeriod budget.
@@ -1030,11 +1023,8 @@ export class ActorInstance<
             }

             this.driver.cancelAlarm?.(this.#actorId);
+            this.#abortActorSignal();
             this.stateManager.clearPendingSaveTimeout();
-
-            try {
-                this.#abortController.abort();
-            } catch {}
         } finally {
             this.#shutdownComplete = true;
             await this.#cleanupDatabase();
@@ -1086,14 +1076,7 @@ export class ActorInstance<
             return;
         }
         this.#destroyCalled = true;
-
-        // Abort immediately so in flight waits can exit before the driver stop
-        // handshake completes.
-        // The onStop path will call abort again as a safety net for all stop
-        // modes.
-        try {
-            this.#abortController.abort();
-        } catch {}
+        this.#abortActorSignal();

         const destroy = this.driver.startDestroy.bind(
             this.driver,
@@ -1108,6 +1091,15 @@ export class ActorInstance<
         });
     }

+    #abortActorSignal() {
+        if (this.#abortController.signal.aborted) {
+            return;
+        }
+        try {
+            this.#abortController.abort();
+        } catch {}
+    }
+
     // MARK: - HTTP Request Tracking
     beginHonoHttpRequest() {
         this.#activeHonoHttpRequests++;
@@ -2070,7 +2062,7 @@ export class ActorInstance<

         if (timeoutMs <= 0) {
             this.#rLog.warn({
-                msg: "run handler did not complete in time, it may have leaked - ensure you use c.aborted (or the abort signal c.abortSignal) to exit gracefully",
+                msg: "run handler did not complete in time, it may have leaked - ensure long-running work settles before shutdown finalizes",
                 timeoutMs,
             });
             return;
@@ -2085,7 +2077,7 @@ export class ActorInstance<

         if (timedOut) {
             this.#rLog.warn({
-                msg: "run handler did not complete in time, it may have leaked - ensure you use c.aborted (or the abort signal c.abortSignal) to exit gracefully",
+                msg: "run handler did not complete in time, it may have leaked - ensure long-running work settles before shutdown finalizes",
                 timeoutMs,
             });
         } else {
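
The three inline `try { abort() } catch {}` blocks in this file are folded into one guarded helper. A standalone sketch of the same shape follows, assuming a hypothetical `Lifecycle` class in place of `ActorInstance`; the `abortCalls` counter exists only to make the guard observable.

```typescript
// Hypothetical wrapper mirroring the shape of the new private helper.
class Lifecycle {
	private readonly controller = new AbortController();
	// Counts calls that actually reached abort(); for illustration only.
	abortCalls = 0;

	get signal(): AbortSignal {
		return this.controller.signal;
	}

	abortActorSignal(): void {
		// Skip the call entirely when the signal has already fired.
		if (this.controller.signal.aborted) {
			return;
		}
		this.abortCalls++;
		try {
			this.controller.abort();
		} catch {}
	}
}
```

The standard `AbortController.abort()` is already a no-op on an aborted signal and dispatches the `abort` event only once, so in this sketch the guard mainly makes the "fire once" intent explicit at every call site.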

website/src/content/docs/actors/actions.mdx

Lines changed: 1 addition & 1 deletion
@@ -325,7 +325,7 @@ Actions have a single return value. To stream realtime data in response to an ac

 ## Canceling Long-Running Actions

-For operations that should be cancelable on-demand, create your own `AbortController` and chain it with `c.abortSignal` for automatic cleanup on actor shutdown.
+For operations that should be cancelable on-demand, create your own `AbortController`. Chain it with `c.abortSignal` so actor shutdown also cancels the operation.

 ```typescript
 import { actor } from "rivetkit";

website/src/content/docs/actors/index.mdx

Lines changed: 1 addition & 1 deletion
@@ -466,7 +466,7 @@ const userAccount = actor({

 ### Lifecycle Hooks

-Actors support hooks for initialization, background processing, connections, networking, and state changes. Use `run` for long-lived background loops, and exit cleanly on shutdown with `c.aborted` or `c.abortSignal`.
+Actors support hooks for initialization, background processing, connections, networking, and state changes. Use `run` for long-lived background loops, and use `c.aborted` or `c.abortSignal` for graceful shutdown.

 ```ts
 import { actor, event, queue } from "rivetkit";

website/src/content/docs/actors/lifecycle.mdx

Lines changed: 2 additions & 2 deletions
@@ -304,7 +304,7 @@ const tickActor = actor({
       c.state.tickCount++;
       c.log.info({ msg: "tick", count: c.state.tickCount });

-      // Wait 1 second, but exit early if aborted
+      // Wait 1 second. Final shutdown also resolves this wait.
       await new Promise<void>((resolve) => {
         const timeout = setTimeout(resolve, 1000);
         c.abortSignal.addEventListener("abort", () => {
@@ -876,7 +876,7 @@ When an actor sleeps or is destroyed, it enters the graceful shutdown window:

 1. `c.abortSignal` fires and `c.aborted` becomes `true`. New connections and dispatch are rejected. Alarm timeouts are cancelled. On sleep, scheduled events are persisted and will be re-armed when the actor wakes.
 2. `onSleep` or `onDestroy` and `onDisconnect` for each closing connection run during the same window. User `waitUntil` promises and async raw WebSocket handlers are drained. Hibernatable WebSocket connections are preserved on sleep and closed on destroy.
-3. Once graceful work has completed, state is saved and the database is cleaned up.
+3. Once graceful work has completed, state is saved and final cleanup runs.

 The entire window is bounded by `sleepGracePeriod` on both sleep and destroy. Defaults to 15 seconds. If the window is exceeded, the actor proceeds to state save anyway.
