Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"code": "incoming_too_long",
"group": "message",
"message": "Incoming message too long"
"message": "Incoming message too long."
}
1 change: 0 additions & 1 deletion rivetkit-typescript/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ The log name matches the key in `ActorMetrics.startup`. Internal phases use `per
- Public TS actor `onWake` maps to the native callback bag's `onWake`; `onBeforeActorStart` is an internal driver/NAPI startup hook, not public actor config.
- Static actor `state` values in `packages/rivetkit/src/registry/native.ts` must be `structuredClone(...)`d per actor instance; reusing the literal leaks mutations across different keyed actors.
- JS-only native actor caches in `packages/rivetkit/src/registry/native.ts` should live on `ActorContext.runtimeState()`, not on actorId-keyed module globals. Same-key recreates must get a fresh bag.
- Native database providers in `packages/rivetkit/src/registry/native.ts` must close through `closeDatabase(false)` after `onSleep`; otherwise provider `onDestroy` cleanup never runs for sleep/wake cycles and lifecycle counts drift from destroy behavior.
- Every `NativeConnAdapter` construction path in `packages/rivetkit/src/registry/native.ts` must keep the `CONN_STATE_MANAGER_SYMBOL` hookup; hibernatable conn mutations rely on core `ConnHandle::set_state` dirty tracking to request persistence.
- Durable native actor saves in `packages/rivetkit/src/registry/native.ts` must use `ctx.requestSaveAndWait({ immediate: true })`; state bytes are collected only through the `serializeState` callback.
- Opaque user payloads that must preserve JS `undefined` across Rust JSON/CBOR bridges should go through `encodeCborCompat` / `decodeCborCompat`; do not use those helpers on structural JSON envelopes where omitted fields must stay omitted.
Expand Down
2 changes: 1 addition & 1 deletion rivetkit-typescript/packages/rivetkit-napi/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* tslint:disable */

Check failure on line 1 in rivetkit-typescript/packages/rivetkit-napi/index.d.ts

View workflow job for this annotation

GitHub Actions / RivetKit / Quality Check

format

Formatter would have printed the following content:
/* eslint-disable */

/* auto-generated by NAPI-RS */
Expand Down Expand Up @@ -225,7 +225,7 @@
disconnectConn(id: string): Promise<void>
disconnectConns(predicate: (...args: any[]) => any): Promise<void>
broadcast(name: string, args: Buffer): void
waitUntil(promise: Promise<any>): Promise<void>
waitUntil(promise: Promise<any>): void
keepAwake(promise: Promise<any>): Promise<any>
registerTask(promise: Promise<any>): void
runtimeState(): object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ impl ActorContext {
}

#[napi]
pub async fn wait_until(&self, promise: Promise<serde_json::Value>) -> napi::Result<()> {
pub fn wait_until(&self, promise: Promise<serde_json::Value>) -> napi::Result<()> {
self.inner.wait_until(async move {
if let Err(error) = promise.await {
tracing::warn!(?error, "actor wait_until promise rejected");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@ import { scheduleActorSleep } from "./schedule-sleep";
type LifecycleCounts = {
create: number;
migrate: number;
cleanup: number;
};

const clientActorIds = new WeakMap<object, string>();

const createCounts = new Map<string, number>();
const migrateCounts = new Map<string, number>();
const cleanupCounts = new Map<string, number>();

function increment(map: Map<string, number>, actorId: string) {
map.set(actorId, (map.get(actorId) ?? 0) + 1);
Expand All @@ -22,18 +20,9 @@ function getCounts(actorId: string): LifecycleCounts {
return {
create: createCounts.get(actorId) ?? 0,
migrate: migrateCounts.get(actorId) ?? 0,
cleanup: cleanupCounts.get(actorId) ?? 0,
};
}

function getTotalCleanupCount(): number {
let total = 0;
for (const count of cleanupCounts.values()) {
total += count;
}
return total;
}

const baseProvider = db({
onMigrate: async (dbHandle) => {
await dbHandle.execute(`
Expand Down Expand Up @@ -62,15 +51,6 @@ const lifecycleProvider = {
}
await baseProvider.onMigrate(client);
},
onDestroy: async (
client: Parameters<NonNullable<typeof baseProvider.onDestroy>>[0],
) => {
const actorId = clientActorIds.get(client as object);
if (actorId) {
increment(cleanupCounts, actorId);
}
await baseProvider.onDestroy?.(client);
},
};

const failingLifecycleProvider = {
Expand All @@ -89,15 +69,6 @@ const failingLifecycleProvider = {
}
throw new Error("forced migrate failure");
},
onDestroy: async (
client: Parameters<NonNullable<typeof baseProvider.onDestroy>>[0],
) => {
const actorId = clientActorIds.get(client as object);
if (actorId) {
increment(cleanupCounts, actorId);
}
await baseProvider.onDestroy?.(client);
},
};

export const dbLifecycle = actor({
Expand Down Expand Up @@ -142,8 +113,5 @@ export const dbLifecycleObserver = actor({
getCounts: (_c, actorId: string) => {
return getCounts(actorId);
},
getTotalCleanupCount: () => {
return getTotalCleanupCount();
},
},
});
Original file line number Diff line number Diff line change
Expand Up @@ -2193,7 +2193,7 @@ export class ActorInstance<
} catch (error) {
if (client) {
try {
await this.#config.db.onDestroy?.(client);
await client.close();
} catch (cleanupError) {
this.#rLog.error({
msg: "database setup cleanup failed",
Expand Down Expand Up @@ -2227,7 +2227,7 @@ export class ActorInstance<

if (client && dbConfig) {
try {
await dbConfig.onDestroy?.(client);
await client.close();
} catch (error) {
this.#rLog.error({
msg: "database cleanup failed",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ export interface SqliteDatabase {
): Promise<void>;
run(sql: string, params?: SqliteBindings): Promise<void>;
query(sql: string, params?: SqliteBindings): Promise<SqliteQueryResult>;
getSqliteVfsMetrics?: () => import("./native-database").SqliteVfsMetrics | null;
getSqliteVfsMetrics?: () =>
| import("./native-database").SqliteVfsMetrics
| null;
close(): Promise<void>;
}

Expand Down Expand Up @@ -94,12 +96,6 @@ export type DatabaseProvider<DB extends RawAccess> = {
* @experimental
*/
onMigrate: (client: DB) => void | Promise<void>;
/**
* Runs when the actor is being destroyed.
* Use this to clean up database connections and release resources.
* @experimental
*/
onDestroy?: (client: DB) => void | Promise<void>;
};

/**
Expand Down
34 changes: 22 additions & 12 deletions rivetkit-typescript/packages/rivetkit/src/common/database/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,16 @@ export function db({
const returnsRows = sqlReturnsRows(query);
if (!hasMultipleStatements(query)) {
if (returnsRows) {
const { rows, columns } = await db.query(
query,
);
const { rows, columns } =
await db.query(query);
result = rows.map((row: unknown[]) => {
const rowObj: Record<string, unknown> = {};
for (let i = 0; i < columns.length; i++) {
const rowObj: Record<string, unknown> =
{};
for (
let i = 0;
i < columns.length;
i++
) {
rowObj[columns[i]] = row[i];
}
return rowObj;
Expand All @@ -119,16 +123,24 @@ export function db({
await db.run(query);
result = [] as TRow[];
} else {
const results: Record<string, unknown>[] = [];
const results: Record<string, unknown>[] =
[];
let columnNames: string[] | null = null;
await db.exec(
query,
(row: unknown[], columns: string[]) => {
if (!columnNames) {
columnNames = columns;
}
const rowObj: Record<string, unknown> = {};
for (let i = 0; i < row.length; i++) {
const rowObj: Record<
string,
unknown
> = {};
for (
let i = 0;
i < row.length;
i++
) {
rowObj[columnNames[i]] = row[i];
}
results.push(rowObj);
Expand All @@ -145,7 +157,8 @@ export function db({
if (!columnNames) {
columnNames = columns;
}
const rowObj: Record<string, unknown> = {};
const rowObj: Record<string, unknown> =
{};
for (let i = 0; i < row.length; i++) {
rowObj[columnNames[i]] = row[i];
}
Expand Down Expand Up @@ -192,8 +205,5 @@ export function db({
await onMigrate(client);
}
},
onDestroy: async (client) => {
await client.close();
},
};
}
6 changes: 2 additions & 4 deletions rivetkit-typescript/packages/rivetkit/src/db/drizzle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ export function db<TSchema extends DrizzleSchema = Record<string, never>>({
msg: "sql query",
query: query.slice(0, 120),
durationMs,
kvReads: ctx.metrics.totalKvReads - kvReadsBefore,
kvReads:
ctx.metrics.totalKvReads - kvReadsBefore,
kvWrites:
ctx.metrics.totalKvWrites - kvWritesBefore,
});
Expand Down Expand Up @@ -188,9 +189,6 @@ export function db<TSchema extends DrizzleSchema = Record<string, never>>({
await onMigrate(client);
}
},
onDestroy: async (client) => {
await client.close();
},
};
}

Expand Down
Loading
Loading