Skip to content

Commit 7764046

Browse files
committed
fix(rivetkit): align destroy abort signal timing
1 parent 314c8ff commit 7764046

14 files changed

Lines changed: 96 additions & 48 deletions

File tree

docs-internal/engine/rivetkit-core-internals.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ Two-phase:
8585
- `SleepGrace` fires `onSleep` immediately and keeps dispatch/save timers live.
8686
- `SleepFinalize` gates dispatch, suspends alarms, and runs teardown.
8787

88-
Sleep grace must fire the actor abort signal on entry and wait for the run handler to exit before finalize. Destroy abort firing remains unchanged.
88+
Sleep grace fires the actor abort signal on entry and waits for the run handler to exit before finalize.
8989

9090
Finalize:
9191

docs-internal/engine/sleep-sequence.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ Removing `preventSleep` deleted both predicate branches. Any future sleep-affect
3939

4040
- `start_grace(reason)` fires at the start of `SleepGrace` / `DestroyGrace`. It cancels the sleep idle timer, cancels the actor abort signal (`actor_abort_signal`), installs a `SleepGraceState` with the effective grace deadline, and resets the sleep timer to arm the grace tick.
4141
- The actor abort signal is a soft signal: "shutdown has started, please wrap up." User code observes it via `c.abortSignal`. It does not force-stop work.
42-
- For destroy, the abort signal may fire earlier than grace entry because `ctx.destroy()` cancels the abort token immediately via `mark_destroy_requested(...)`.
42+
- Destroy requests also use the normal grace path. The actor abort signal fires when destroy grace starts.
4343

4444
## Grace deadline enforcement
4545

rivetkit-rust/packages/rivetkit-core/src/actor/context.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -478,19 +478,17 @@ impl ActorContext {
478478
self.flush_on_shutdown();
479479
self.0.destroy_requested.store(true, Ordering::SeqCst);
480480
self.0.destroy_completed.store(false, Ordering::SeqCst);
481-
self.0.abort_signal.lock().cancel();
482481
}
483482

484483
#[cfg(feature = "wasm-runtime")]
485484
fn mark_destroy_requested_without_spawn(&self) {
486485
self.cancel_sleep_timer();
487486
self.0.destroy_requested.store(true, Ordering::SeqCst);
488487
self.0.destroy_completed.store(false, Ordering::SeqCst);
489-
self.0.abort_signal.lock().cancel();
490488
}
491489

492490
#[doc(hidden)]
493-
pub fn cancel_abort_signal_for_sleep(&self) {
491+
pub fn cancel_actor_abort_signal(&self) {
494492
self.0.abort_signal.lock().cancel();
495493
}
496494

@@ -500,9 +498,9 @@ impl ActorContext {
500498
return;
501499
}
502500

503-
// Sleep cancels the generation abort signal to break queue waits and the
504-
// run loop out of blocking calls. A restarted actor needs a fresh signal
505-
// so the next generation can wait normally.
501+
// Sleep or destroy cancels the generation abort signal to break actor
502+
// scoped waits. A restarted actor needs a fresh signal so the next
503+
// generation can wait normally.
506504
let next_signal = CancellationToken::new();
507505
*abort_signal = next_signal.clone();
508506
*self.0.queue_abort_signal.lock() = next_signal;

rivetkit-rust/packages/rivetkit-core/src/actor/task.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1528,9 +1528,7 @@ impl ActorTask {
15281528
let grace_period = self.factory.config().effective_sleep_grace_period();
15291529
self.sleep_deadline = None;
15301530
self.ctx.cancel_sleep_timer();
1531-
// Entering grace cancels the actor abort signal so user code blocked on
1532-
// queues or other actor scoped waits can unwind and let sleep finalize.
1533-
self.ctx.cancel_abort_signal_for_sleep();
1531+
self.ctx.cancel_actor_abort_signal();
15341532
self.sleep_grace = Some(SleepGraceState {
15351533
deadline: Instant::now() + grace_period,
15361534
reason,
@@ -1745,15 +1743,18 @@ impl ActorTask {
17451743
ShutdownKind::Sleep => LifecycleState::SleepFinalize,
17461744
ShutdownKind::Destroy => LifecycleState::Destroying,
17471745
});
1748-
self.save_final_state().await?;
1749-
self.close_actor_event_channel();
1750-
self.join_aborted_run_handle().await;
1751-
Self::finish_shutdown_cleanup_with_ctx(self.ctx.clone(), reason).await?;
1752-
if matches!(reason, ShutdownKind::Destroy) {
1746+
let result: Result<()> = async {
1747+
self.save_final_state().await?;
1748+
self.close_actor_event_channel();
1749+
self.join_aborted_run_handle().await;
1750+
Self::finish_shutdown_cleanup_with_ctx(self.ctx.clone(), reason).await
1751+
}
1752+
.await;
1753+
if result.is_ok() && matches!(reason, ShutdownKind::Destroy) {
17531754
self.ctx.mark_destroy_completed();
17541755
}
17551756
self.ctx.record_shutdown_wait(reason, started_at.elapsed());
1756-
Ok(())
1757+
result
17571758
}
17581759

17591760
async fn save_final_state(&mut self) -> Result<()> {

rivetkit-rust/packages/rivetkit-core/tests/queue.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ mod moved_tests {
162162
});
163163

164164
yield_now().await;
165-
queue.mark_destroy_requested();
165+
queue.cancel_actor_abort_signal();
166166

167167
let error = wait
168168
.await

rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/destroy.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,3 +87,32 @@ export const destroyActor = actor({
8787
},
8888
},
8989
});
90+
91+
export const destroyAbortSignalActor = actor({
92+
state: {
93+
abortEventCount: 0,
94+
},
95+
onDestroy: async (c) => {
96+
const client = c.client<typeof registry>();
97+
const observer = client.destroyObserver.getOrCreate(["observer"]);
98+
await observer.notifyDestroyed(c.key.join("/"));
99+
},
100+
actions: {
101+
requestDestroy: (c) => {
102+
const beforeDestroyAborted = c.aborted;
103+
const beforeDestroySignalAborted = c.abortSignal.aborted;
104+
c.abortSignal.addEventListener("abort", () => {
105+
c.state.abortEventCount += 1;
106+
});
107+
c.destroy();
108+
109+
return {
110+
beforeDestroyAborted,
111+
beforeDestroySignalAborted,
112+
afterDestroyAborted: c.aborted,
113+
afterDestroySignalAborted: c.abortSignal.aborted,
114+
abortEventCountAfterDestroy: c.state.abortEventCount,
115+
};
116+
},
117+
},
118+
});

rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry-static.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,11 @@ import {
4242
dbLifecycleFailing,
4343
dbLifecycleObserver,
4444
} from "./db-lifecycle";
45-
import { destroyActor, destroyObserver } from "./destroy";
45+
import {
46+
destroyAbortSignalActor,
47+
destroyActor,
48+
destroyObserver,
49+
} from "./destroy";
4650
import { customTimeoutActor, errorHandlingActor } from "./error-handling";
4751
import { fileSystemHibernationCleanupActor } from "./file-system-hibernation-cleanup";
4852
import { hibernationActor, hibernationSleepWindowActor } from "./hibernation";
@@ -281,6 +285,7 @@ export const registry = setup({
281285
// From destroy.ts
282286
destroyActor,
283287
destroyObserver,
288+
destroyAbortSignalActor,
284289
// From hibernation.ts
285290
hibernationActor,
286291
hibernationSleepWindowActor,

rivetkit-typescript/packages/rivetkit/src/actor/config.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1227,7 +1227,7 @@ interface BaseActorConfig<
12271227
* shutdown window (`sleepGracePeriod`) cover deferred work.
12281228
*
12291229
* The handler receives an abort signal via `c.abortSignal` and a
1230-
* `c.aborted` alias for loop checks. Use these to gracefully exit.
1230+
* `c.aborted` alias. Use these to gracefully exit when shutdown starts.
12311231
*
12321232
* If this handler exits, the actor will follow the normal idle sleep timeout
12331233
* once it becomes idle.
@@ -1770,7 +1770,7 @@ export const DocActorOptionsSchema = z
17701770
.number()
17711771
.optional()
17721772
.describe(
1773-
`Max time in ms for the graceful shutdown window. Covers lifecycle hooks (onSleep, onDestroy), the run handler abort wait, async raw WebSocket handlers, disconnect callbacks, and final state serialization. Default: ${DEFAULT_SLEEP_GRACE_PERIOD}.`,
1773+
`Max time in ms for the graceful shutdown window. Covers lifecycle hooks (onSleep, onDestroy), the run handler wait, async raw WebSocket handlers, disconnect callbacks, and final state serialization. Default: ${DEFAULT_SLEEP_GRACE_PERIOD}.`,
17741774
),
17751775
onDestroyTimeout: z
17761776
.number()

rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -957,14 +957,7 @@ export class ActorInstance<
957957
// Scheduled events are persisted and will be re-initialized
958958
// on wake via initializeAlarms().
959959
this.driver.cancelAlarm?.(this.#actorId);
960-
961-
// Abort listeners in the canonical stop path.
962-
// This must run for all stop modes, including sleep and remote stop.
963-
// Destroy may have already triggered an early abort, but repeating abort
964-
// is intentional and safe.
965-
try {
966-
this.#abortController.abort();
967-
} catch {}
960+
this.#abortActorSignal();
968961

969962
// The run-handler join, lifecycle hooks, and remaining shutdown
970963
// tasks all share the single sleepGracePeriod budget.
@@ -1030,11 +1023,8 @@ export class ActorInstance<
10301023
}
10311024

10321025
this.driver.cancelAlarm?.(this.#actorId);
1026+
this.#abortActorSignal();
10331027
this.stateManager.clearPendingSaveTimeout();
1034-
1035-
try {
1036-
this.#abortController.abort();
1037-
} catch {}
10381028
} finally {
10391029
this.#shutdownComplete = true;
10401030
await this.#cleanupDatabase();
@@ -1087,14 +1077,6 @@ export class ActorInstance<
10871077
}
10881078
this.#destroyCalled = true;
10891079

1090-
// Abort immediately so in flight waits can exit before the driver stop
1091-
// handshake completes.
1092-
// The onStop path will call abort again as a safety net for all stop
1093-
// modes.
1094-
try {
1095-
this.#abortController.abort();
1096-
} catch {}
1097-
10981080
const destroy = this.driver.startDestroy.bind(
10991081
this.driver,
11001082
this.#actorId,
@@ -1108,6 +1090,15 @@ export class ActorInstance<
11081090
});
11091091
}
11101092

1093+
#abortActorSignal() {
1094+
if (this.#abortController.signal.aborted) {
1095+
return;
1096+
}
1097+
try {
1098+
this.#abortController.abort();
1099+
} catch {}
1100+
}
1101+
11111102
// MARK: - HTTP Request Tracking
11121103
beginHonoHttpRequest() {
11131104
this.#activeHonoHttpRequests++;
@@ -2070,7 +2061,7 @@ export class ActorInstance<
20702061

20712062
if (timeoutMs <= 0) {
20722063
this.#rLog.warn({
2073-
msg: "run handler did not complete in time, it may have leaked - ensure you use c.aborted (or the abort signal c.abortSignal) to exit gracefully",
2064+
msg: "run handler did not complete in time, it may have leaked - ensure long-running work settles before shutdown finalizes",
20742065
timeoutMs,
20752066
});
20762067
return;
@@ -2085,7 +2076,7 @@ export class ActorInstance<
20852076

20862077
if (timedOut) {
20872078
this.#rLog.warn({
2088-
msg: "run handler did not complete in time, it may have leaked - ensure you use c.aborted (or the abort signal c.abortSignal) to exit gracefully",
2079+
msg: "run handler did not complete in time, it may have leaked - ensure long-running work settles before shutdown finalizes",
20892080
timeoutMs,
20902081
});
20912082
} else {

rivetkit-typescript/packages/rivetkit/tests/driver/actor-destroy.test.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,30 @@ describeDriverMatrix("Actor Destroy", (driverTestConfig) => {
104104
expect(newValue).toBe(0);
105105
});
106106

107+
test("ctx.destroy does not synchronously abort actor signal", async (c) => {
108+
const { client } = await setupDriverTest(c, driverTestConfig);
109+
const actorKey = `test-destroy-abort-signal-${crypto.randomUUID()}`;
110+
const observer = client.destroyObserver.getOrCreate(["observer"]);
111+
await observer.reset();
112+
113+
const result = await client.destroyAbortSignalActor
114+
.getOrCreate([actorKey])
115+
.requestDestroy();
116+
117+
expect(result).toEqual({
118+
beforeDestroyAborted: false,
119+
beforeDestroySignalAborted: false,
120+
afterDestroyAborted: false,
121+
afterDestroySignalAborted: false,
122+
abortEventCountAfterDestroy: 0,
123+
});
124+
125+
// Poll until onDestroy records so this action covered the real destroy path.
126+
await vi.waitFor(async () => {
127+
expect(await observer.wasDestroyed(actorKey)).toBe(true);
128+
});
129+
});
130+
107131
test("actor destroy clears ephemeral vars on same-key recreation", async (c) => {
108132
const { client } = await setupDriverTest(c, driverTestConfig);
109133

0 commit comments

Comments
 (0)