Skip to content

Commit efe8a6e

Browse files
committed
fix(rivetkit): drain native sleep side tasks reliably
1 parent a4b1187 commit efe8a6e

3 files changed

Lines changed: 33 additions & 23 deletions

File tree

rivetkit-typescript/packages/rivetkit-napi/index.d.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ export declare class ActorContext {
225225
disconnectConn(id: string): Promise<void>
226226
disconnectConns(predicate: (...args: any[]) => any): Promise<void>
227227
broadcast(name: string, args: Buffer): void
228-
waitUntil(promise: Promise<any>): Promise<void>
228+
waitUntil(promise: Promise<any>): void
229229
keepAwake(promise: Promise<any>): Promise<any>
230230
registerTask(promise: Promise<any>): void
231231
runtimeState(): object

rivetkit-typescript/packages/rivetkit-napi/src/actor_context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,7 @@ impl ActorContext {
614614
}
615615

616616
#[napi]
617-
pub async fn wait_until(&self, promise: Promise<serde_json::Value>) -> napi::Result<()> {
617+
pub fn wait_until(&self, promise: Promise<serde_json::Value>) -> napi::Result<()> {
618618
self.inner.wait_until(async move {
619619
if let Err(error) = promise.await {
620620
tracing::warn!(?error, "actor wait_until promise rejected");

rivetkit-typescript/packages/rivetkit/src/registry/native.ts

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ type NativeActorRuntimeState = {
146146
vars?: unknown;
147147
destroyGate?: NativeDestroyGate;
148148
persistState?: NativePersistActorState;
149+
preventSleepRelease?: () => void;
149150
};
150151

151152
// Keep JS-only actor caches on the NAPI ActorContext runtime-state bag instead
@@ -2639,20 +2640,13 @@ export class NativeActorContextAdapter {
26392640
}
26402641

26412642
waitUntil(promise: Promise<unknown>): void {
2642-
// Same counter-arm race as `keepAwake`: increment of the
2643-
// shutdown_counter happens on first poll of the Rust future. Acceptable
2644-
// because the only consumer is the grace-finalize predicate, which
2645-
// debounces through `activity_notify` and re-checks the counter.
2646-
callNative(() => this.#ctx.waitUntil(Promise.resolve(promise))).catch(
2647-
(error) => {
2648-
if (!isClosedTaskRegistrationError(error)) {
2649-
logger().warn({
2650-
msg: "waitUntil bridge to native runtime failed",
2651-
error: stringifyError(error),
2652-
});
2653-
}
2654-
},
2655-
);
2643+
try {
2644+
callNativeSync(() => this.#ctx.waitUntil(Promise.resolve(promise)));
2645+
} catch (error) {
2646+
if (!isClosedTaskRegistrationError(error)) {
2647+
throw error;
2648+
}
2649+
}
26562650
}
26572651

26582652
beginWebSocketCallback(): number {
@@ -2665,12 +2659,30 @@ export class NativeActorContextAdapter {
26652659
);
26662660
}
26672661

2668-
/** @deprecated No-op. Use `keepAwake(promise)` or `waitUntil(promise)` instead. */
2669-
setPreventSleep(_preventSleep: boolean): void {}
2662+
/** @deprecated Use `keepAwake(promise)` or `waitUntil(promise)` instead. */
2663+
setPreventSleep(preventSleep: boolean): void {
2664+
const runtimeState = getNativeRuntimeState(this.#ctx);
2665+
if (preventSleep) {
2666+
if (runtimeState.preventSleepRelease) {
2667+
return;
2668+
}
2669+
2670+
let release: () => void = () => {};
2671+
const promise = new Promise<void>((resolve) => {
2672+
release = resolve;
2673+
});
2674+
runtimeState.preventSleepRelease = release;
2675+
this.keepAwake(promise);
2676+
return;
2677+
}
2678+
2679+
runtimeState.preventSleepRelease?.();
2680+
runtimeState.preventSleepRelease = undefined;
2681+
}
26702682

2671-
/** @deprecated No-op. Always returns `false`. */
2683+
/** @deprecated Use `keepAwake(promise)` or `waitUntil(promise)` instead. */
26722684
get preventSleep(): boolean {
2673-
return false;
2685+
return getNativeRuntimeState(this.#ctx).preventSleepRelease !== undefined;
26742686
}
26752687

26762688
sleep(): void {
@@ -3743,10 +3755,9 @@ export function buildNativeFactory(
37433755
try {
37443756
if (typeof config.onSleep === "function") {
37453757
await config.onSleep(actorCtx);
3758+
await actorCtx.saveState({ immediate: true });
37463759
}
37473760
} finally {
3748-
await actorCtx.closeDatabase(false);
3749-
callNativeSync(() => ctx.clearRuntimeState());
37503761
await actorCtx.dispose();
37513762
}
37523763
},
@@ -3766,7 +3777,6 @@ export function buildNativeFactory(
37663777
} finally {
37673778
resolveNativeDestroy(ctx);
37683779
await actorCtx.closeDatabase(true);
3769-
callNativeSync(() => ctx.clearRuntimeState());
37703780
await actorCtx.dispose();
37713781
}
37723782
},

0 commit comments

Comments
 (0)