diff --git a/rivetkit-rust/engine/artifacts/errors/message.incoming_too_long.json b/rivetkit-rust/engine/artifacts/errors/message.incoming_too_long.json index e35ce9f122..729519603c 100644 --- a/rivetkit-rust/engine/artifacts/errors/message.incoming_too_long.json +++ b/rivetkit-rust/engine/artifacts/errors/message.incoming_too_long.json @@ -1,5 +1,5 @@ { "code": "incoming_too_long", "group": "message", - "message": "Incoming message too long" + "message": "Incoming message too long." } \ No newline at end of file diff --git a/rivetkit-typescript/CLAUDE.md b/rivetkit-typescript/CLAUDE.md index cdbf98ac7b..69570413b3 100644 --- a/rivetkit-typescript/CLAUDE.md +++ b/rivetkit-typescript/CLAUDE.md @@ -85,7 +85,6 @@ The log name matches the key in `ActorMetrics.startup`. Internal phases use `per - Public TS actor `onWake` maps to the native callback bag's `onWake`; `onBeforeActorStart` is an internal driver/NAPI startup hook, not public actor config. - Static actor `state` values in `packages/rivetkit/src/registry/native.ts` must be `structuredClone(...)`d per actor instance; reusing the literal leaks mutations across different keyed actors. - JS-only native actor caches in `packages/rivetkit/src/registry/native.ts` should live on `ActorContext.runtimeState()`, not on actorId-keyed module globals. Same-key recreates must get a fresh bag. -- Native database providers in `packages/rivetkit/src/registry/native.ts` must close through `closeDatabase(false)` after `onSleep`; otherwise provider `onDestroy` cleanup never runs for sleep/wake cycles and lifecycle counts drift from destroy behavior. - Every `NativeConnAdapter` construction path in `packages/rivetkit/src/registry/native.ts` must keep the `CONN_STATE_MANAGER_SYMBOL` hookup; hibernatable conn mutations rely on core `ConnHandle::set_state` dirty tracking to request persistence. - Durable native actor saves in `packages/rivetkit/src/registry/native.ts` must use `ctx.requestSaveAndWait({ immediate: true })`; state bytes are collected only through the `serializeState` callback. - Opaque user payloads that must preserve JS `undefined` across Rust JSON/CBOR bridges should go through `encodeCborCompat` / `decodeCborCompat`; do not use those helpers on structural JSON envelopes where omitted fields must stay omitted. diff --git a/rivetkit-typescript/packages/rivetkit-napi/index.d.ts b/rivetkit-typescript/packages/rivetkit-napi/index.d.ts index bd8bcdf00d..5c05e37574 100644 --- a/rivetkit-typescript/packages/rivetkit-napi/index.d.ts +++ b/rivetkit-typescript/packages/rivetkit-napi/index.d.ts @@ -225,7 +225,7 @@ export declare class ActorContext { disconnectConn(id: string): Promise disconnectConns(predicate: (...args: any[]) => any): Promise broadcast(name: string, args: Buffer): void - waitUntil(promise: Promise): Promise + waitUntil(promise: Promise): void keepAwake(promise: Promise): Promise registerTask(promise: Promise): void runtimeState(): object diff --git a/rivetkit-typescript/packages/rivetkit-napi/src/actor_context.rs b/rivetkit-typescript/packages/rivetkit-napi/src/actor_context.rs index 3bd2e71aac..48fcd54200 100644 --- a/rivetkit-typescript/packages/rivetkit-napi/src/actor_context.rs +++ b/rivetkit-typescript/packages/rivetkit-napi/src/actor_context.rs @@ -614,7 +614,7 @@ impl ActorContext { } #[napi] - pub async fn wait_until(&self, promise: Promise) -> napi::Result<()> { + pub fn wait_until(&self, promise: Promise) -> napi::Result<()> { self.inner.wait_until(async move { if let Err(error) = promise.await { tracing::warn!(?error, "actor wait_until promise rejected"); diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/db-lifecycle.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/db-lifecycle.ts index 1fc423c9e9..e32ba0c34a 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/db-lifecycle.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/db-lifecycle.ts @@ -5,14 +5,12 @@ import { scheduleActorSleep } from "./schedule-sleep"; type LifecycleCounts = { create: number; migrate: number; - cleanup: number; }; const clientActorIds = new WeakMap(); const createCounts = new Map(); const migrateCounts = new Map(); -const cleanupCounts = new Map(); function increment(map: Map, actorId: string) { map.set(actorId, (map.get(actorId) ?? 0) + 1); @@ -22,18 +20,9 @@ function getCounts(actorId: string): LifecycleCounts { return { create: createCounts.get(actorId) ?? 0, migrate: migrateCounts.get(actorId) ?? 0, - cleanup: cleanupCounts.get(actorId) ?? 0, }; } -function getTotalCleanupCount(): number { - let total = 0; - for (const count of cleanupCounts.values()) { - total += count; - } - return total; -} - const baseProvider = db({ onMigrate: async (dbHandle) => { await dbHandle.execute(` @@ -62,15 +51,6 @@ const lifecycleProvider = { } await baseProvider.onMigrate(client); }, - onDestroy: async ( - client: Parameters>[0], - ) => { - const actorId = clientActorIds.get(client as object); - if (actorId) { - increment(cleanupCounts, actorId); - } - await baseProvider.onDestroy?.(client); - }, }; const failingLifecycleProvider = { @@ -89,15 +69,6 @@ const failingLifecycleProvider = { } throw new Error("forced migrate failure"); }, - onDestroy: async ( - client: Parameters>[0], - ) => { - const actorId = clientActorIds.get(client as object); - if (actorId) { - increment(cleanupCounts, actorId); - } - await baseProvider.onDestroy?.(client); - }, }; export const dbLifecycle = actor({ @@ -142,8 +113,5 @@ export const dbLifecycleObserver = actor({ getCounts: (_c, actorId: string) => { return getCounts(actorId); }, - getTotalCleanupCount: () => { - return getTotalCleanupCount(); - }, }, }); diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts index 03992173b2..e1eb9d7301 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts @@ -2193,7 +2193,7 @@ export class ActorInstance< } catch (error) { if (client) { try { - await this.#config.db.onDestroy?.(client); + await client.close(); } catch (cleanupError) { this.#rLog.error({ msg: "database setup cleanup failed", @@ -2227,7 +2227,7 @@ export class ActorInstance< if (client && dbConfig) { try { - await dbConfig.onDestroy?.(client); + await client.close(); } catch (error) { this.#rLog.error({ msg: "database cleanup failed", diff --git a/rivetkit-typescript/packages/rivetkit/src/common/database/config.ts b/rivetkit-typescript/packages/rivetkit/src/common/database/config.ts index 8f0c8c09a7..bdace52126 100644 --- a/rivetkit-typescript/packages/rivetkit/src/common/database/config.ts +++ b/rivetkit-typescript/packages/rivetkit/src/common/database/config.ts @@ -28,7 +28,9 @@ export interface SqliteDatabase { ): Promise; run(sql: string, params?: SqliteBindings): Promise; query(sql: string, params?: SqliteBindings): Promise; - getSqliteVfsMetrics?: () => import("./native-database").SqliteVfsMetrics | null; + getSqliteVfsMetrics?: () => + | import("./native-database").SqliteVfsMetrics + | null; close(): Promise; } @@ -94,12 +96,6 @@ export type DatabaseProvider = { * @experimental */ onMigrate: (client: DB) => void | Promise; - /** - * Runs when the actor is being destroyed. - * Use this to clean up database connections and release resources. - * @experimental - */ - onDestroy?: (client: DB) => void | Promise; }; /** diff --git a/rivetkit-typescript/packages/rivetkit/src/common/database/mod.ts b/rivetkit-typescript/packages/rivetkit/src/common/database/mod.ts index d8a7551c2b..a3b14b3017 100644 --- a/rivetkit-typescript/packages/rivetkit/src/common/database/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/common/database/mod.ts @@ -105,12 +105,16 @@ export function db({ const returnsRows = sqlReturnsRows(query); if (!hasMultipleStatements(query)) { if (returnsRows) { - const { rows, columns } = await db.query( - query, - ); + const { rows, columns } = + await db.query(query); result = rows.map((row: unknown[]) => { - const rowObj: Record = {}; - for (let i = 0; i < columns.length; i++) { + const rowObj: Record = + {}; + for ( + let i = 0; + i < columns.length; + i++ + ) { rowObj[columns[i]] = row[i]; } return rowObj; @@ -119,7 +123,8 @@ export function db({ await db.run(query); result = [] as TRow[]; } else { - const results: Record[] = []; + const results: Record[] = + []; let columnNames: string[] | null = null; await db.exec( query, @@ -127,8 +132,15 @@ export function db({ if (!columnNames) { columnNames = columns; } - const rowObj: Record = {}; - for (let i = 0; i < row.length; i++) { + const rowObj: Record< + string, + unknown + > = {}; + for ( + let i = 0; + i < row.length; + i++ + ) { rowObj[columnNames[i]] = row[i]; } results.push(rowObj); @@ -145,7 +157,8 @@ export function db({ if (!columnNames) { columnNames = columns; } - const rowObj: Record = {}; + const rowObj: Record = + {}; for (let i = 0; i < row.length; i++) { rowObj[columnNames[i]] = row[i]; } @@ -192,8 +205,5 @@ export function db({ await onMigrate(client); } }, - onDestroy: async (client) => { - await client.close(); - }, }; } diff --git a/rivetkit-typescript/packages/rivetkit/src/db/drizzle.ts b/rivetkit-typescript/packages/rivetkit/src/db/drizzle.ts index 357244f4fd..fc0144e219 100644 --- a/rivetkit-typescript/packages/rivetkit/src/db/drizzle.ts +++ b/rivetkit-typescript/packages/rivetkit/src/db/drizzle.ts @@ -136,7 +136,8 @@ export function db>({ msg: "sql query", query: query.slice(0, 120), durationMs, - kvReads: ctx.metrics.totalKvReads - kvReadsBefore, + kvReads: + ctx.metrics.totalKvReads - kvReadsBefore, kvWrites: ctx.metrics.totalKvWrites - kvWritesBefore, }); @@ -188,9 +189,6 @@ export function db>({ await onMigrate(client); } }, - onDestroy: async (client) => { - await client.close(); - }, }; } diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/native.ts b/rivetkit-typescript/packages/rivetkit/src/registry/native.ts index 91d3e590ff..f4f2262303 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/native.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/native.ts @@ -137,7 +137,6 @@ type NativeDestroyGate = { }; type NativeDatabaseClientState = { client: unknown; - provider: Exclude; }; type NativeActorRuntimeState = { sql?: ReturnType; @@ -170,7 +169,9 @@ function getNativeRuntimeState( return runtimeState; } -function getNativePersistState(ctx: NativeActorContext): NativePersistActorState { +function getNativePersistState( + ctx: NativeActorContext, +): NativePersistActorState { return getNativeRuntimeState(ctx).persistState!; } @@ -282,9 +283,7 @@ function closeNativeSqlDatabase( async function closeNativeDatabaseClient( ctx: NativeActorContext, - destroy: boolean, ): Promise { - void destroy; const runtimeState = getNativeRuntimeState(ctx); const entry = runtimeState.databaseClient; if (!entry) { @@ -293,11 +292,6 @@ async function closeNativeDatabaseClient( runtimeState.databaseClient = undefined; - if (typeof entry.provider.onDestroy === "function") { - await entry.provider.onDestroy(entry.client as never); - return; - } - if ( entry.client && typeof entry.client === "object" && @@ -1129,12 +1123,9 @@ class NativeConnAdapter { get state(): unknown { const nextState = this.#readState(); - return createWriteThroughProxy( - nextState, - (nextValue) => { - this.#writeState(nextValue, { writeNative: true }); - }, - ); + return createWriteThroughProxy(nextState, (nextValue) => { + this.#writeState(nextValue, { writeNative: true }); + }); } set state(value: unknown) { @@ -2484,7 +2475,6 @@ export class NativeActorContextAdapter { }); runtimeState.databaseClient = { client, - provider: this.#databaseProvider, }; this.#db = client; return client; @@ -2508,10 +2498,10 @@ export class NativeActorContextAdapter { ); } - async closeDatabase(destroy: boolean): Promise { + async closeDatabase(): Promise { this.#db = undefined; this.#sql = undefined; - await closeNativeDatabaseClient(this.#ctx, destroy); + await closeNativeDatabaseClient(this.#ctx); await closeNativeSqlDatabase(this.#ctx); } @@ -2623,9 +2613,7 @@ export class NativeActorContextAdapter { return this.#runHandlerActiveProvider?.() ?? false; } - internalKeepAwake( - run: Promise | (() => Promise), - ): Promise { + internalKeepAwake(run: Promise | (() => Promise)): Promise { const promise = typeof run === "function" ? run() : run; const trackedPromise = promise.then(() => null); try { @@ -2639,20 +2627,13 @@ export class NativeActorContextAdapter { } waitUntil(promise: Promise): void { - // Same counter-arm race as `keepAwake`: increment of the - // shutdown_counter happens on first poll of the Rust future. Acceptable - // because the only consumer is the grace-finalize predicate, which - // debounces through `activity_notify` and re-checks the counter. - callNative(() => this.#ctx.waitUntil(Promise.resolve(promise))).catch( - (error) => { - if (!isClosedTaskRegistrationError(error)) { - logger().warn({ - msg: "waitUntil bridge to native runtime failed", - error: stringifyError(error), - }); - } - }, - ); + try { + callNativeSync(() => this.#ctx.waitUntil(Promise.resolve(promise))); + } catch (error) { + if (!isClosedTaskRegistrationError(error)) { + throw error; + } + } } beginWebSocketCallback(): number { @@ -2660,16 +2641,26 @@ export class NativeActorContextAdapter { } endWebSocketCallback(callbackRegionId: number): void { - callNativeSync(() => - this.#ctx.endWebsocketCallback(callbackRegionId), - ); + callNativeSync(() => this.#ctx.endWebsocketCallback(callbackRegionId)); + } + + // Intentionally a no-op. `setPreventSleep` / `preventSleep` are kept on the + // surface for legacy callers but must not gate sleep here. Callers that + // need to keep an actor awake should use `keepAwake(promise)` or + // `waitUntil(promise)` so the native counter machinery in rivetkit-core + // owns the lifecycle. + /** @deprecated Use `keepAwake(promise)` or `waitUntil(promise)` instead. */ + setPreventSleep(_preventSleep: boolean): void { + logger().warn({ + msg: "setPreventSleep is deprecated and is a no-op; use keepAwake(promise) or waitUntil(promise) instead", + }); } - /** @deprecated No-op. Use `keepAwake(promise)` or `waitUntil(promise)` instead. */ - setPreventSleep(_preventSleep: boolean): void {} - - /** @deprecated No-op. Always returns `false`. */ + /** @deprecated Use `keepAwake(promise)` or `waitUntil(promise)` instead. */ get preventSleep(): boolean { + logger().warn({ + msg: "preventSleep is deprecated and always returns false; use keepAwake(promise) or waitUntil(promise) instead", + }); return false; } @@ -2707,11 +2698,9 @@ export class NativeActorContextAdapter { if (nativeSignal.aborted) { controller.abort(); } else { - nativeSignal.addEventListener( - "abort", - () => controller.abort(), - { once: true }, - ); + nativeSignal.addEventListener("abort", () => controller.abort(), { + once: true, + }); } return controller.signal; } @@ -2719,7 +2708,9 @@ export class NativeActorContextAdapter { #readState(): unknown { const actorState = getNativePersistState(this.#ctx); if (actorState.state === undefined) { - actorState.state = decodeValue(callNativeSync(() => this.#ctx.state())); + actorState.state = decodeValue( + callNativeSync(() => this.#ctx.state()), + ); } return actorState.state; } @@ -2759,7 +2750,10 @@ export class NativeActorContextAdapter { callNativeSync(() => this.#ctx.beginOnStateChange()); let shouldFinish = true; try { - const result = this.#onStateChange(this, actorState.state) as unknown; + const result = this.#onStateChange( + this, + actorState.state, + ) as unknown; if (isPromiseLike(result)) { shouldFinish = false; void Promise.resolve(result) @@ -3012,14 +3006,8 @@ function withConnContext( dispatchCancelToken, ), { - conn: new NativeConnAdapter( - conn, - schemas, - ctx, - (connId) => - callNativeSync(() => - ctx.queueHibernationRemoval(connId), - ), + conn: new NativeConnAdapter(conn, schemas, ctx, (connId) => + callNativeSync(() => ctx.queueHibernationRemoval(connId)), ), }, ); @@ -3121,9 +3109,7 @@ function buildActorConfig( preloadMaxConnectionsBytes: options.preloadMaxConnectionsBytes as | number | undefined, - actions: Object.keys( - (config.actions ?? {}) as Record, - ) + actions: Object.keys((config.actions ?? {}) as Record) .sort() .map((name) => ({ name })), }; @@ -3260,8 +3246,9 @@ export function buildNativeFactory( }; try { await ctx.verifyInspectorAuth( - jsRequest.headers.get("authorization")?.replace(/^Bearer\s+/i, "") ?? - null, + jsRequest.headers + .get("authorization") + ?.replace(/^Bearer\s+/i, "") ?? null, ); } catch (error) { return errorResponse(error, 401); @@ -3335,13 +3322,11 @@ export function buildNativeFactory( const queue = ctx.queue(); const allMessages = await queue.inspectMessages(); const truncated = allMessages.length > limit; - const messages = allMessages - .slice(0, limit) - .map((m) => ({ - id: m.id, - name: m.name, - createdAtMs: m.createdAtMs, - })); + const messages = allMessages.slice(0, limit).map((m) => ({ + id: m.id, + name: m.name, + createdAtMs: m.createdAtMs, + })); return jsonResponse({ size: allMessages.length, maxSize: queue.maxSize(), @@ -3496,7 +3481,9 @@ export function buildNativeFactory( return jsonResponse({ rows: jsonSafe(rows) }); } const args = Array.isArray(body.args) ? body.args : []; - const rows = queryRows(await actorCtx.sql.query(body.sql, args)); + const rows = queryRows( + await actorCtx.sql.query(body.sql, args), + ); return jsonResponse({ rows: jsonSafe(rows) }); } if ( @@ -3581,7 +3568,7 @@ export function buildNativeFactory( } finally { await actorCtx.dispose(); } - }; + }; const callbacks = { createState: hasStaticState || typeof config.createState === "function" @@ -3593,14 +3580,20 @@ export function buildNativeFactory( input?: Buffer; }, ): Promise => { - const { ctx, input } = unwrapTsfnPayload(error, payload); + const { ctx, input } = unwrapTsfnPayload( + error, + payload, + ); const actorCtx = makeActorCtx(ctx); try { const decodedInput = decodeValue(input); const startedAt = performance.now(); const state = hasStaticState ? structuredClone(config.state) - : await config.createState(actorCtx, decodedInput); + : await config.createState( + actorCtx, + decodedInput, + ); logger().debug({ msg: "perf user: createStateMs", durationMs: performance.now() - startedAt, @@ -3623,10 +3616,16 @@ export function buildNativeFactory( input?: Buffer; }, ): Promise => { - const { ctx, input } = unwrapTsfnPayload(error, payload); + const { ctx, input } = unwrapTsfnPayload( + error, + payload, + ); const actorCtx = makeActorCtx(ctx); try { - await config.onCreate(actorCtx, decodeValue(input)); + await config.onCreate( + actorCtx, + decodeValue(input), + ); } finally { await actorCtx.dispose(); } @@ -3646,7 +3645,10 @@ export function buildNativeFactory( const startedAt = performance.now(); const vars = hasStaticVars ? structuredClone(config.vars) - : await config.createVars(actorCtx, undefined); + : await config.createVars( + actorCtx, + undefined, + ); logger().debug({ msg: "perf user: createVarsMs", durationMs: performance.now() - startedAt, @@ -3676,14 +3678,14 @@ export function buildNativeFactory( const actorCtx = makeActorCtx(ctx); try { if (!isNew) { - await actorCtx.closeDatabase(false); + await actorCtx.closeDatabase(); } await actorCtx.runDatabaseMigrations(); if (typeof config.onMigrate === "function") { await config.onMigrate(actorCtx, isNew); } } catch (error) { - await actorCtx.closeDatabase(true); + await actorCtx.closeDatabase(); throw error; } finally { await actorCtx.dispose(); @@ -3731,8 +3733,7 @@ export function buildNativeFactory( ) : undefined, onSleep: - typeof config.onSleep === "function" || - databaseProvider !== undefined + typeof config.onSleep === "function" ? wrapNativeCallback( async ( error: unknown, @@ -3741,12 +3742,9 @@ export function buildNativeFactory( const { ctx } = unwrapTsfnPayload(error, payload); const actorCtx = makeActorCtx(ctx); try { - if (typeof config.onSleep === "function") { - await config.onSleep(actorCtx); - } + await config.onSleep!(actorCtx); + await actorCtx.saveState({ immediate: true }); } finally { - await actorCtx.closeDatabase(false); - callNativeSync(() => ctx.clearRuntimeState()); await actorCtx.dispose(); } }, @@ -3756,7 +3754,10 @@ export function buildNativeFactory( typeof config.onDestroy === "function" || databaseProvider !== undefined ? wrapNativeCallback( - async (error: unknown, payload: { ctx: NativeActorContext }) => { + async ( + error: unknown, + payload: { ctx: NativeActorContext }, + ) => { const { ctx } = unwrapTsfnPayload(error, payload); const actorCtx = makeActorCtx(ctx); try { @@ -3765,8 +3766,7 @@ export function buildNativeFactory( } } finally { resolveNativeDestroy(ctx); - await actorCtx.closeDatabase(true); - callNativeSync(() => ctx.clearRuntimeState()); + await actorCtx.closeDatabase(); await actorCtx.dispose(); } }, @@ -3906,46 +3906,42 @@ export function buildNativeFactory( }, ) : undefined, - onDisconnectFinal: - needsDisconnectCallback - ? wrapNativeCallback( - async ( - error: unknown, - payload: { - ctx: NativeActorContext; - conn: NativeConnHandle; - }, - ) => { - const { ctx, conn } = unwrapTsfnPayload( - error, - payload, - ); - const actorCtx = makeConnCtx(ctx, conn); - try { - // Core already removed the connection; this hook is - // pure user dispatch. - if (typeof config.onDisconnect === "function") { - await config.onDisconnect( - actorCtx, - new NativeConnAdapter( - conn, - schemaConfig, - ctx, - (connId) => - callNativeSync(() => - ctx.queueHibernationRemoval( - connId, - ), + onDisconnectFinal: needsDisconnectCallback + ? wrapNativeCallback( + async ( + error: unknown, + payload: { + ctx: NativeActorContext; + conn: NativeConnHandle; + }, + ) => { + const { ctx, conn } = unwrapTsfnPayload(error, payload); + const actorCtx = makeConnCtx(ctx, conn); + try { + // Core already removed the connection; this hook is + // pure user dispatch. + if (typeof config.onDisconnect === "function") { + await config.onDisconnect( + actorCtx, + new NativeConnAdapter( + conn, + schemaConfig, + ctx, + (connId) => + callNativeSync(() => + ctx.queueHibernationRemoval( + connId, ), - ), - ); - } - } finally { - await actorCtx.dispose(); + ), + ), + ); } - }, - ) - : undefined, + } finally { + await actorCtx.dispose(); + } + }, + ) + : undefined, onBeforeSubscribe: schemaConfig.events && Object.values(schemaConfig.events).some( @@ -4135,8 +4131,10 @@ export function buildNativeFactory( }; }, ) => { - const { ctx, ws, request } = - unwrapTsfnPayload(error, payload); + const { ctx, ws, request } = unwrapTsfnPayload( + error, + payload, + ); const jsRequest = request ? buildRequest(request) : undefined; @@ -4239,10 +4237,8 @@ export function buildNativeFactory( cancelToken?: NativeCancellationToken; }, ) => { - const { ctx, conn, args, cancelToken } = unwrapTsfnPayload( - error, - payload, - ); + const { ctx, conn, args, cancelToken } = + unwrapTsfnPayload(error, payload); const actorCtx = conn != null ? makeConnCtx(ctx, conn, undefined, cancelToken) @@ -4326,17 +4322,18 @@ export function buildNativeFactory( const decodedBody = decodeValue(body); if (wait) { try { - const response = await actorCtx.queue.enqueueAndWait( - name, - decodedBody, - { - timeout: - timeoutMs === undefined || - timeoutMs === null - ? undefined - : Number(timeoutMs), - }, - ); + const response = + await actorCtx.queue.enqueueAndWait( + name, + decodedBody, + { + timeout: + timeoutMs === undefined || + timeoutMs === null + ? undefined + : Number(timeoutMs), + }, + ); return { status: "completed", response: diff --git a/rivetkit-typescript/packages/rivetkit/tests/driver/actor-db.test.ts b/rivetkit-typescript/packages/rivetkit/tests/driver/actor-db.test.ts index 1f9300ac7f..1832f64f2f 100644 --- a/rivetkit-typescript/packages/rivetkit/tests/driver/actor-db.test.ts +++ b/rivetkit-typescript/packages/rivetkit/tests/driver/actor-db.test.ts @@ -493,130 +493,17 @@ describeDriverMatrix("Actor Db", (driverTestConfig) => { }); } - describe("Actor Database Lifecycle Cleanup Tests", () => { - test( - "runs db provider cleanup on sleep", - async (c) => { - const { client } = await setupDriverTest(c, driverTestConfig); - const observer = client.dbLifecycleObserver.getOrCreate([ - "observer", - ]); - - const lifecycle = client.dbLifecycle.getOrCreate([ - `db-lifecycle-sleep-${crypto.randomUUID()}`, - ]); - const actorId = await lifecycle.getActorId(); - - const before = await observer.getCounts(actorId); - - await lifecycle.insertValue("before-sleep"); - await lifecycle.triggerSleep(); - await waitFor(driverTestConfig, SLEEP_WAIT_MS + 100); - await lifecycle.ping(); - - let after = before; - for (let i = 0; i < LIFECYCLE_POLL_ATTEMPTS; i++) { - after = await observer.getCounts(actorId); - if (after.cleanup >= before.cleanup + 1) { - break; - } - await waitFor(driverTestConfig, LIFECYCLE_POLL_INTERVAL_MS); - } - - expect(after.create).toBeGreaterThanOrEqual(before.create); - expect(after.migrate).toBeGreaterThanOrEqual(before.migrate); - expect(after.cleanup).toBeGreaterThanOrEqual( - before.cleanup + 1, - ); - }, - lifecycleTestTimeout, - ); - - test( - "runs db provider cleanup on destroy", - async (c) => { - const { client } = await setupDriverTest(c, driverTestConfig); - const observer = client.dbLifecycleObserver.getOrCreate([ - "observer", - ]); - - const lifecycle = client.dbLifecycle.getOrCreate([ - `db-lifecycle-destroy-${crypto.randomUUID()}`, - ]); - const actorId = await lifecycle.getActorId(); - const before = await observer.getCounts(actorId); - - await lifecycle.insertValue("before-destroy"); - await lifecycle.triggerDestroy(); - await waitFor(driverTestConfig, SLEEP_WAIT_MS + 100); - - let cleanupCount = before.cleanup; - for (let i = 0; i < LIFECYCLE_POLL_ATTEMPTS; i++) { - const counts = await observer.getCounts(actorId); - cleanupCount = counts.cleanup; - if (cleanupCount >= before.cleanup + 1) { - break; - } - await waitFor(driverTestConfig, LIFECYCLE_POLL_INTERVAL_MS); - } - - expect(cleanupCount).toBeGreaterThanOrEqual(before.cleanup + 1); - }, - lifecycleTestTimeout, - ); - - test( - "runs db provider cleanup when migration fails", - async (c) => { - const { client } = await setupDriverTest(c, driverTestConfig); - const observer = client.dbLifecycleObserver.getOrCreate([ - "observer", - ]); - const beforeTotalCleanup = - await observer.getTotalCleanupCount(); - const key = `db-lifecycle-migrate-failure-${crypto.randomUUID()}`; - const lifecycle = client.dbLifecycleFailing.getOrCreate([key]); - - let threw = false; - try { - await lifecycle.ping(); - } catch { - threw = true; - } - expect(threw).toBeTruthy(); - - let cleanupCount = beforeTotalCleanup; - for (let i = 0; i < LIFECYCLE_POLL_ATTEMPTS; i++) { - cleanupCount = await observer.getTotalCleanupCount(); - if (cleanupCount >= beforeTotalCleanup + 1) { - break; - } - await waitFor(driverTestConfig, LIFECYCLE_POLL_INTERVAL_MS); - } - - expect(cleanupCount).toBeGreaterThanOrEqual( - beforeTotalCleanup + 1, - ); - }, - lifecycleTestTimeout, - ); - + describe("Actor Database Lifecycle Tests", () => { test( "handles parallel actor lifecycle churn", async (c) => { const { client } = await setupDriverTest(c, driverTestConfig); - const observer = client.dbLifecycleObserver.getOrCreate([ - "observer", - ]); const actorHandles = Array.from({ length: 12 }, (_, i) => client.dbLifecycle.getOrCreate([ `db-lifecycle-stress-${i}-${crypto.randomUUID()}`, ]), ); - const actorIds = await Promise.all( - actorHandles.map((handle) => handle.getActorId()), - ); await Promise.all( actorHandles.map((handle, i) => @@ -655,29 +542,6 @@ describeDriverMatrix("Actor Db", (driverTestConfig) => { expect(count).toBe(2); } } - - const lifecycleCleanup = new Map(); - for (let i = 0; i < LIFECYCLE_POLL_ATTEMPTS; i++) { - let allCleaned = true; - for (const actorId of actorIds) { - const counts = await observer.getCounts(actorId); - lifecycleCleanup.set(actorId, counts.cleanup); - if (counts.cleanup < 1) { - allCleaned = false; - } - } - - if (allCleaned) { - break; - } - await waitFor(driverTestConfig, LIFECYCLE_POLL_INTERVAL_MS); - } - - for (const actorId of actorIds) { - expect( - lifecycleCleanup.get(actorId) ?? 0, - ).toBeGreaterThanOrEqual(1); - } }, lifecycleTestTimeout, );