Commit 11daf92

fix(rivetkit): fire abort signal on shutdown finalize
1 parent 3c7167b commit 11daf92

12 files changed

Lines changed: 46 additions & 55 deletions

docs-internal/engine/rivetkit-core-internals.md

Lines changed: 1 addition & 1 deletion
@@ -85,7 +85,7 @@ Two-phase:
 - `SleepGrace` fires `onSleep` immediately and keeps dispatch/save timers live.
 - `SleepFinalize` gates dispatch, suspends alarms, and runs teardown.

-Sleep grace must fire the actor abort signal on entry and wait for the run handler to exit before finalize. Destroy abort firing remains unchanged.
+Sleep grace waits for the run handler to exit before finalize. The actor abort signal fires once when final shutdown completes, not when sleep grace or destroy starts.

 Finalize:

docs-internal/engine/sleep-sequence.md

Lines changed: 3 additions & 3 deletions
@@ -37,9 +37,9 @@ Removing `preventSleep` deleted both predicate branches. Any future sleep-affect

 ## Grace period and abort signals

-- `start_grace(reason)` fires at the start of `SleepGrace` / `DestroyGrace`. It cancels the sleep idle timer, cancels the actor abort signal (`actor_abort_signal`), installs a `SleepGraceState` with the effective grace deadline, and resets the sleep timer to arm the grace tick.
-- The actor abort signal is a soft signal: "shutdown has started, please wrap up." User code observes it via `c.abortSignal`. It does not force-stop work.
-- For destroy, the abort signal may fire earlier than grace entry because `ctx.destroy()` cancels the abort token immediately via `mark_destroy_requested(...)`.
+- `start_grace(reason)` fires at the start of `SleepGrace` / `DestroyGrace`. It cancels the sleep idle timer, installs a `SleepGraceState` with the effective grace deadline, and resets the sleep timer to arm the grace tick.
+- The actor abort signal is a finalization signal. User code observes it via `c.abortSignal`, and it fires once after shutdown work has finished or the grace deadline has forced finalization.
+- Destroy does not fire the actor abort signal early. `ctx.destroy()` only records the destroy request and starts the same final shutdown path.

 ## Grace deadline enforcement
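
The ordering described in the updated bullets can be sketched in TypeScript. This is a minimal model and not RivetKit's implementation: `ShutdownModel`, `startGrace`, `requestDestroy`, and `finalize` are hypothetical names, and the standard `AbortController` stands in for the actor abort signal.

```typescript
// Hypothetical model of the shutdown ordering; none of these names are RivetKit APIs.
type ShutdownReason = "sleep" | "destroy";

class ShutdownModel {
  private readonly controller = new AbortController();
  readonly abortSignal: AbortSignal = this.controller.signal;
  graceReason: ShutdownReason | null = null;
  destroyRequested = false;

  // Entering grace records the reason but leaves the signal untouched.
  startGrace(reason: ShutdownReason): void {
    this.graceReason = reason;
  }

  // A destroy request is only recorded; it then enters the same grace path.
  requestDestroy(): void {
    this.destroyRequested = true;
    this.startGrace("destroy");
  }

  // Finalization is the single place where the signal fires.
  finalize(): void {
    if (!this.controller.signal.aborted) {
      this.controller.abort();
    }
  }
}

const model = new ShutdownModel();
let abortEvents = 0;
model.abortSignal.addEventListener("abort", () => {
  abortEvents += 1;
});

model.requestDestroy();
const abortedDuringGrace = model.abortSignal.aborted;

model.finalize();
model.finalize(); // a repeated call does not fire the signal again
const abortedAfterFinalize = model.abortSignal.aborted;
```

In this model a listener registered by user code observes nothing while grace is running and exactly one `abort` event once `finalize` has run, which matches the "fires once" wording in the doc change.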

rivetkit-rust/packages/rivetkit-core/src/actor/context.rs

Lines changed: 3 additions & 6 deletions
@@ -478,19 +478,17 @@ impl ActorContext {
         self.flush_on_shutdown();
         self.0.destroy_requested.store(true, Ordering::SeqCst);
         self.0.destroy_completed.store(false, Ordering::SeqCst);
-        self.0.abort_signal.lock().cancel();
     }

     #[cfg(feature = "wasm-runtime")]
     fn mark_destroy_requested_without_spawn(&self) {
         self.cancel_sleep_timer();
         self.0.destroy_requested.store(true, Ordering::SeqCst);
         self.0.destroy_completed.store(false, Ordering::SeqCst);
-        self.0.abort_signal.lock().cancel();
     }

     #[doc(hidden)]
-    pub fn cancel_abort_signal_for_sleep(&self) {
+    pub fn cancel_abort_signal_for_shutdown_finalize(&self) {
         self.0.abort_signal.lock().cancel();
     }

@@ -500,9 +498,8 @@ impl ActorContext {
             return;
         }

-        // Sleep cancels the generation abort signal to break queue waits and the
-        // run loop out of blocking calls. A restarted actor needs a fresh signal
-        // so the next generation can wait normally.
+        // Final shutdown cancels the generation abort signal. A restarted actor
+        // needs a fresh signal so the next generation can wait normally.
         let next_signal = CancellationToken::new();
         *abort_signal = next_signal.clone();
         *self.0.queue_abort_signal.lock() = next_signal;

rivetkit-rust/packages/rivetkit-core/src/actor/task.rs

Lines changed: 10 additions & 9 deletions
@@ -1528,9 +1528,6 @@ impl ActorTask {
         let grace_period = self.factory.config().effective_sleep_grace_period();
         self.sleep_deadline = None;
         self.ctx.cancel_sleep_timer();
-        // Entering grace cancels the actor abort signal so user code blocked on
-        // queues or other actor scoped waits can unwind and let sleep finalize.
-        self.ctx.cancel_abort_signal_for_sleep();
         self.sleep_grace = Some(SleepGraceState {
             deadline: Instant::now() + grace_period,
             reason,
@@ -1745,15 +1742,19 @@ impl ActorTask {
             ShutdownKind::Sleep => LifecycleState::SleepFinalize,
             ShutdownKind::Destroy => LifecycleState::Destroying,
         });
-        self.save_final_state().await?;
-        self.close_actor_event_channel();
-        self.join_aborted_run_handle().await;
-        Self::finish_shutdown_cleanup_with_ctx(self.ctx.clone(), reason).await?;
-        if matches!(reason, ShutdownKind::Destroy) {
+        let result: Result<()> = async {
+            self.save_final_state().await?;
+            self.close_actor_event_channel();
+            self.join_aborted_run_handle().await;
+            Self::finish_shutdown_cleanup_with_ctx(self.ctx.clone(), reason).await
+        }
+        .await;
+        self.ctx.cancel_abort_signal_for_shutdown_finalize();
+        if result.is_ok() && matches!(reason, ShutdownKind::Destroy) {
             self.ctx.mark_destroy_completed();
         }
         self.ctx.record_shutdown_wait(reason, started_at.elapsed());
-        Ok(())
+        result
     }

     async fn save_final_state(&mut self) -> Result<()> {
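
The reshaped finalize path above runs the fallible steps first, fires the abort signal regardless of their outcome, marks destroy as completed only on success, and then returns the captured result. A TypeScript analogue of that control flow is sketched below. It is synchronous for brevity (the Rust code is async), and `finalizeShutdown`, `FinalizeOutcome`, and the `steps` parameter are hypothetical names introduced only for illustration.

```typescript
// Hypothetical analogue of the Rust control flow; not part of RivetKit.
type ShutdownKind = "sleep" | "destroy";

interface FinalizeOutcome {
  error: Error | null;
  destroyCompleted: boolean;
}

function finalizeShutdown(
  kind: ShutdownKind,
  controller: AbortController,
  steps: Array<() => void>,
): FinalizeOutcome {
  // Run the fallible steps in order, stopping at the first failure
  // and keeping the error instead of returning early.
  let error: Error | null = null;
  try {
    for (const step of steps) {
      step();
    }
  } catch (caught) {
    error = caught instanceof Error ? caught : new Error(String(caught));
  }

  // The signal fires whether or not the steps succeeded.
  controller.abort();

  // Destroy is only marked complete when every step succeeded.
  const destroyCompleted = error === null && kind === "destroy";
  return { error, destroyCompleted };
}

const okController = new AbortController();
const ok = finalizeShutdown("destroy", okController, [() => {}]);

const failController = new AbortController();
const failed = finalizeShutdown("destroy", failController, [
  () => {
    throw new Error("save failed");
  },
]);
```

Capturing the result before firing the signal is what lets a failed save still release anything waiting on the signal, while the caller continues to see the original error.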

rivetkit-rust/packages/rivetkit-core/tests/queue.rs

Lines changed: 1 addition & 1 deletion
@@ -162,7 +162,7 @@ mod moved_tests {
         });

         yield_now().await;
-        queue.mark_destroy_requested();
+        queue.cancel_abort_signal_for_shutdown_finalize();

         let error = wait
             .await

rivetkit-rust/packages/rivetkit-core/tests/task.rs

Lines changed: 2 additions & 1 deletion
@@ -2968,7 +2968,7 @@ mod moved_tests {
             .await
             .expect("sleep stop should send");
         wait_for_count(&begin_sleep_count, 1).await;
-        assert!(ctx.actor_aborted());
+        assert!(!ctx.actor_aborted());

         let (sleep_again_tx, sleep_again_rx) = oneshot::channel();
         lifecycle_tx
@@ -3009,6 +3009,7 @@ mod moved_tests {
             .await
             .expect("sleep reply should send")
             .expect("sleep should succeed");
+        assert!(ctx.actor_aborted());
         keep_awake
             .await
             .expect("keep-awake task should finish after release");

rivetkit-typescript/packages/rivetkit/src/actor/config.ts

Lines changed: 2 additions & 2 deletions
@@ -1227,7 +1227,7 @@ interface BaseActorConfig<
    * shutdown window (`sleepGracePeriod`) cover deferred work.
    *
    * The handler receives an abort signal via `c.abortSignal` and a
-   * `c.aborted` alias for loop checks. Use these to gracefully exit.
+   * `c.aborted` alias. The signal fires once when actor shutdown finalizes.
    *
    * If this handler exits, the actor will follow the normal idle sleep timeout
    * once it becomes idle.
@@ -1770,7 +1770,7 @@ export const DocActorOptionsSchema = z
       .number()
       .optional()
       .describe(
-        `Max time in ms for the graceful shutdown window. Covers lifecycle hooks (onSleep, onDestroy), the run handler abort wait, async raw WebSocket handlers, disconnect callbacks, and final state serialization. Default: ${DEFAULT_SLEEP_GRACE_PERIOD}.`,
+        `Max time in ms for the graceful shutdown window. Covers lifecycle hooks (onSleep, onDestroy), the run handler wait, async raw WebSocket handlers, disconnect callbacks, and final state serialization. Default: ${DEFAULT_SLEEP_GRACE_PERIOD}.`,
       ),
     onDestroyTimeout: z
       .number()

rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts

Lines changed: 13 additions & 22 deletions
@@ -958,14 +958,6 @@ export class ActorInstance<
     // on wake via initializeAlarms().
     this.driver.cancelAlarm?.(this.#actorId);

-    // Abort listeners in the canonical stop path.
-    // This must run for all stop modes, including sleep and remote stop.
-    // Destroy may have already triggered an early abort, but repeating abort
-    // is intentional and safe.
-    try {
-      this.#abortController.abort();
-    } catch {}
-
     // The run-handler join, lifecycle hooks, and remaining shutdown
     // tasks all share the single sleepGracePeriod budget.
     const shutdownTaskDeadlineTs =
@@ -1006,6 +998,7 @@ export class ActorInstance<
       await this.stateManager.waitForPendingWrites();
       await this.#scheduleManager.waitForPendingAlarmWrites();
     } finally {
+      this.#abortActorSignal();
       this.#shutdownComplete = true;
       await this.#cleanupDatabase();
     }
@@ -1031,11 +1024,8 @@ export class ActorInstance<

       this.driver.cancelAlarm?.(this.#actorId);
       this.stateManager.clearPendingSaveTimeout();
-
-      try {
-        this.#abortController.abort();
-      } catch {}
     } finally {
+      this.#abortActorSignal();
       this.#shutdownComplete = true;
       await this.#cleanupDatabase();
     }
@@ -1087,14 +1077,6 @@ export class ActorInstance<
     }
     this.#destroyCalled = true;

-    // Abort immediately so in flight waits can exit before the driver stop
-    // handshake completes.
-    // The onStop path will call abort again as a safety net for all stop
-    // modes.
-    try {
-      this.#abortController.abort();
-    } catch {}
-
     const destroy = this.driver.startDestroy.bind(
       this.driver,
       this.#actorId,
@@ -1108,6 +1090,15 @@ export class ActorInstance<
     });
   }

+  #abortActorSignal() {
+    if (this.#abortController.signal.aborted) {
+      return;
+    }
+    try {
+      this.#abortController.abort();
+    } catch {}
+  }
+
   // MARK: - HTTP Request Tracking
   beginHonoHttpRequest() {
     this.#activeHonoHttpRequests++;
@@ -2070,7 +2061,7 @@ export class ActorInstance<

     if (timeoutMs <= 0) {
       this.#rLog.warn({
-        msg: "run handler did not complete in time, it may have leaked - ensure you use c.aborted (or the abort signal c.abortSignal) to exit gracefully",
+        msg: "run handler did not complete in time, it may have leaked - ensure long-running work settles before shutdown finalizes",
         timeoutMs,
       });
       return;
@@ -2085,7 +2076,7 @@ export class ActorInstance<

     if (timedOut) {
       this.#rLog.warn({
-        msg: "run handler did not complete in time, it may have leaked - ensure you use c.aborted (or the abort signal c.abortSignal) to exit gracefully",
+        msg: "run handler did not complete in time, it may have leaked - ensure long-running work settles before shutdown finalizes",
         timeoutMs,
       });
     } else {
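
Because the actor signal now fires only at final shutdown, the updated warning asks long-running work to settle on its own. One way to give such work an on-demand cancellation source while still reacting to final shutdown is sketched below, using only the standard `AbortController` API. `linkToActorSignal` is a hypothetical helper, and `actorController` is a stand-in for whatever backs `c.abortSignal`.

```typescript
// Hypothetical helper: returns a controller that aborts on demand
// or when the given actor-level signal aborts, whichever comes first.
function linkToActorSignal(actorSignal: AbortSignal): AbortController {
  const own = new AbortController();
  if (actorSignal.aborted) {
    own.abort();
    return own;
  }
  const onActorAbort = () => own.abort();
  actorSignal.addEventListener("abort", onActorAbort, { once: true });
  // Drop the link once this controller aborts, for either reason.
  own.signal.addEventListener(
    "abort",
    () => actorSignal.removeEventListener("abort", onActorAbort),
    { once: true },
  );
  return own;
}

// Stand-in for the actor abort signal (c.abortSignal in user code).
const actorController = new AbortController();

// Cancelled by the caller; the actor signal is unaffected.
const onDemand = linkToActorSignal(actorController.signal);
onDemand.abort();
const actorStillLive = !actorController.signal.aborted;

// Left running until final shutdown fires the actor signal.
const untilShutdown = linkToActorSignal(actorController.signal);
actorController.abort();
const cancelledByShutdown = untilShutdown.signal.aborted;
```

The link is one-directional: aborting the derived controller never aborts the actor-level signal, so on-demand cancellation of one operation does not affect other work in the same actor.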

website/src/content/docs/actors/actions.mdx

Lines changed: 1 addition & 1 deletion
@@ -325,7 +325,7 @@ Actions have a single return value. To stream realtime data in response to an ac

 ## Canceling Long-Running Actions

-For operations that should be cancelable on-demand, create your own `AbortController` and chain it with `c.abortSignal` for automatic cleanup on actor shutdown.
+For operations that should be cancelable on-demand, create your own `AbortController`. Chain it with `c.abortSignal` only for final actor shutdown cleanup.

 ```typescript
 import { actor } from "rivetkit";

website/src/content/docs/actors/index.mdx

Lines changed: 1 addition & 1 deletion
@@ -466,7 +466,7 @@ const userAccount = actor({

 ### Lifecycle Hooks

-Actors support hooks for initialization, background processing, connections, networking, and state changes. Use `run` for long-lived background loops, and exit cleanly on shutdown with `c.aborted` or `c.abortSignal`.
+Actors support hooks for initialization, background processing, connections, networking, and state changes. Use `run` for long-lived background loops, and use `c.aborted` or `c.abortSignal` for final shutdown cleanup.

 ```ts
 import { actor, event, queue } from "rivetkit";
