Skip to content

Commit 8f8ecec

Browse files
committed
fix(rivetkit): minor sleep-cleanup follow-ups
1 parent 1b59a25 commit 8f8ecec

4 files changed

Lines changed: 65 additions & 13 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/actor/context.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -439,10 +439,11 @@ impl ActorContext {
439439
return Err(ActorLifecycleError::Stopping.build())
440440
.context("destroy already requested for this generation");
441441
}
442-
self.cancel_sleep_timer();
443-
self.flush_on_shutdown();
444-
self.0.destroy_completed.store(false, Ordering::SeqCst);
445-
self.0.abort_signal.cancel();
442+
// Reuse the shared teardown sequence used by the registry shutdown
443+
// path so future changes to `mark_destroy_requested` cannot drift.
444+
// `destroy_requested` is already true from the swap above; the redundant
445+
// `store(true)` inside is harmless.
446+
self.mark_destroy_requested();
446447

447448
let ctx = self.clone();
448449
if Handle::try_current().is_ok() {

rivetkit-rust/packages/rivetkit-core/src/actor/sleep.rs

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -905,27 +905,65 @@ mod tests {
905905
ctx.set_sleep_started(true);
906906
ctx.set_ready(true);
907907

908+
// The stub must never flip the underlying flag.
908909
ctx.set_prevent_sleep(true);
909910
assert!(
910911
!ctx.prevent_sleep(),
911912
"prevent_sleep must stay false because the stub is a no-op"
912913
);
913-
// The sleep predicate ignores prevent_sleep entirely.
914-
assert_eq!(ctx.can_sleep().await, CanSleep::Yes);
914+
915+
// Exhaustive match guards against reintroducing a `PreventSleep` enum
916+
// variant. If a future change adds the variant back, this match stops
917+
// compiling — surfacing the regression at build time rather than via a
918+
// runtime assertion that could silently pass.
919+
match ctx.can_sleep().await {
920+
CanSleep::Yes
921+
| CanSleep::NotReady
922+
| CanSleep::NoSleep
923+
| CanSleep::ActiveHttpRequests
924+
| CanSleep::ActiveKeepAwake
925+
| CanSleep::ActiveInternalKeepAwake
926+
| CanSleep::ActiveRunHandler
927+
| CanSleep::ActiveDisconnectCallbacks
928+
| CanSleep::ActiveConnections
929+
| CanSleep::ActiveWebSocketCallbacks => {}
930+
}
915931

916932
ctx.set_prevent_sleep(false);
917-
assert_eq!(ctx.can_sleep().await, CanSleep::Yes);
933+
assert!(!ctx.prevent_sleep());
918934
}
919935

920936
#[tokio::test(start_paused = true)]
921-
async fn shutdown_deadline_token_cancels_on_request() {
937+
async fn shutdown_deadline_token_aborts_select_awaiting_task() {
938+
// Mirrors the NAPI `RunGracefulCleanup` pattern: a task awaits user
939+
// work and the shutdown_deadline cancellation in a `tokio::select!`.
940+
// If `cancel_shutdown_deadline()` does not propagate to clones of the
941+
// token (a regression we cannot catch with `is_cancelled()` alone),
942+
// the spawned task would hang and the test would time out.
922943
let ctx = ActorContext::new_for_sleep_tests("actor-shutdown-deadline");
923-
924944
let token = ctx.shutdown_deadline_token();
925945
assert!(!token.is_cancelled());
926946

947+
let aborted = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
948+
let aborted_clone = aborted.clone();
949+
let task = tokio::spawn(async move {
950+
tokio::select! {
951+
_ = token.cancelled() => {
952+
aborted_clone.store(true, Ordering::SeqCst);
953+
}
954+
_ = futures::future::pending::<()>() => {}
955+
}
956+
});
957+
958+
yield_now().await;
959+
assert!(!aborted.load(Ordering::SeqCst));
960+
927961
ctx.cancel_shutdown_deadline();
928-
assert!(token.is_cancelled());
962+
task.await.expect("select task should join after cancel");
963+
assert!(
964+
aborted.load(Ordering::SeqCst),
965+
"select-awaiting task must observe cancel via the cloned token"
966+
);
929967
}
930968

931969
#[tokio::test(start_paused = true)]

rivetkit-rust/packages/rivetkit-core/src/actor/task.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1671,7 +1671,13 @@ impl ActorTask {
16711671
return self.ctx.save_state(Vec::new()).await;
16721672
}
16731673

1674-
let deltas = match timeout(SERIALIZE_STATE_SHUTDOWN_SANITY_CAP, reply_rx).await {
1674+
// Cap at the larger of the default sanity bound or the user-configured
1675+
// sleep grace period. Without this, an actor with `sleepGracePeriod`
1676+
// raised above the default would silently truncate large state writes
1677+
// to empty deltas inside the user's own grace budget.
1678+
let cap = SERIALIZE_STATE_SHUTDOWN_SANITY_CAP
1679+
.max(self.factory.config().effective_sleep_grace_period());
1680+
let deltas = match timeout(cap, reply_rx).await {
16751681
Ok(Ok(Ok(deltas))) => deltas,
16761682
Ok(Ok(Err(error))) => {
16771683
tracing::error!(?error, "serializeState callback returned error");
@@ -1682,7 +1688,11 @@ impl ActorTask {
16821688
Vec::new()
16831689
}
16841690
Err(_) => {
1685-
tracing::error!("serializeState timed out");
1691+
tracing::error!(
1692+
actor_id = %self.ctx.actor_id(),
1693+
cap_ms = cap.as_millis() as u64,
1694+
"serializeState timed out; saving with empty deltas, prior persisted state retained"
1695+
);
16861696
Vec::new()
16871697
}
16881698
};

rivetkit-typescript/packages/rivetkit/src/registry/native.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2678,8 +2678,11 @@ export class NativeActorContextAdapter {
26782678
}
26792679

26802680
destroy(): void {
2681-
markNativeDestroyRequested(this.#ctx);
2681+
// Call the native destroy first so it can throw `actor/starting` or
2682+
// `actor/stopping` without leaving an unresolved destroyCompletion
2683+
// promise behind in the native runtime state.
26822684
callNativeSync(() => this.#ctx.destroy());
2685+
markNativeDestroyRequested(this.#ctx);
26832686
}
26842687

26852688
client<T = AnyClient>(): T extends Registry<any> ? Client<T> : T {

0 commit comments

Comments
 (0)