Skip to content

Commit 3f92424

Browse files
committed
refactor(rivetkit): switch sqlite runtime to native-only databases
1 parent ec1d965 commit 3f92424

31 files changed

Lines changed: 489 additions & 1680 deletions

File tree

rivetkit-typescript/packages/rivetkit-native/wrapper.d.ts

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -128,19 +128,4 @@ export declare function openDatabaseFromEnvoy(
128128
actorId: string,
129129
preloadedEntries?: readonly [Uint8Array, Uint8Array][] | null,
130130
): Promise<JsNativeDatabase>;
131-
132-
export interface NativeRawDatabase {
133-
execute: <TRow extends Record<string, unknown> = Record<string, unknown>>(
134-
query: string,
135-
...args: unknown[]
136-
) => Promise<TRow[]>;
137-
close: () => Promise<void>;
138-
}
139-
140-
export declare function openRawDatabaseFromEnvoy(
141-
handle: EnvoyHandle,
142-
actorId: string,
143-
preloadedEntries?: readonly [Uint8Array, Uint8Array][] | null,
144-
): Promise<NativeRawDatabase>;
145-
146131
export declare const utils: {};

rivetkit-typescript/packages/rivetkit-native/wrapper.js

Lines changed: 0 additions & 177 deletions
Original file line numberDiff line numberDiff line change
@@ -195,182 +195,6 @@ function decodePreloadedKv(preloadedKv) {
195195
};
196196
}
197197

198-
function isPlainObject(value) {
199-
return (
200-
!!value &&
201-
typeof value === "object" &&
202-
!Array.isArray(value) &&
203-
Object.getPrototypeOf(value) === Object.prototype
204-
);
205-
}
206-
207-
function toNativeBinding(value) {
208-
if (value === null || value === undefined) {
209-
return { kind: "null" };
210-
}
211-
if (typeof value === "bigint") {
212-
return { kind: "int", intValue: Number(value) };
213-
}
214-
if (typeof value === "number") {
215-
return Number.isInteger(value)
216-
? { kind: "int", intValue: value }
217-
: { kind: "float", floatValue: value };
218-
}
219-
if (typeof value === "string") {
220-
return { kind: "text", textValue: value };
221-
}
222-
if (value instanceof ArrayBuffer) {
223-
return { kind: "blob", blobValue: Buffer.from(value) };
224-
}
225-
if (ArrayBuffer.isView(value)) {
226-
return {
227-
kind: "blob",
228-
blobValue: Buffer.from(value.buffer, value.byteOffset, value.byteLength),
229-
};
230-
}
231-
232-
throw new Error(`unsupported sqlite binding type: ${typeof value}`);
233-
}
234-
235-
function extractNamedSqliteParameters(sql) {
236-
return [...sql.matchAll(/([:@$][A-Za-z_][A-Za-z0-9_]*)/g)].map(
237-
(match) => match[1],
238-
);
239-
}
240-
241-
function getNamedSqliteBinding(bindings, name) {
242-
if (name in bindings) {
243-
return bindings[name];
244-
}
245-
246-
const bareName = name.slice(1);
247-
if (bareName in bindings) {
248-
return bindings[bareName];
249-
}
250-
251-
for (const prefix of [":", "@", "$"]) {
252-
const candidate = `${prefix}${bareName}`;
253-
if (candidate in bindings) {
254-
return bindings[candidate];
255-
}
256-
}
257-
258-
return undefined;
259-
}
260-
261-
function normalizeBindings(sql, args) {
262-
if (!args || args.length === 0) {
263-
return [];
264-
}
265-
266-
if (
267-
args.length === 1 &&
268-
isPlainObject(args[0]) &&
269-
!(args[0] instanceof Uint8Array)
270-
) {
271-
const names = extractNamedSqliteParameters(sql);
272-
if (names.length === 0) {
273-
throw new Error(
274-
"native sqlite object bindings require named placeholders in the SQL statement",
275-
);
276-
}
277-
return names.map((name) => {
278-
const value = getNamedSqliteBinding(args[0], name);
279-
if (value === undefined) {
280-
throw new Error(`missing bind parameter: ${name}`);
281-
}
282-
return toNativeBinding(value);
283-
});
284-
}
285-
286-
return args.map(toNativeBinding);
287-
}
288-
289-
function mapRows(rows, columns) {
290-
return rows.map((row) => {
291-
const rowObject = {};
292-
for (let i = 0; i < columns.length; i++) {
293-
rowObject[columns[i]] = row[i];
294-
}
295-
return rowObject;
296-
});
297-
}
298-
299-
function wrapNativeStorageError(nativeDb, error) {
300-
const lastKvError =
301-
typeof nativeDb.takeLastKvError === "function"
302-
? nativeDb.takeLastKvError()
303-
: null;
304-
if (!lastKvError) {
305-
throw error;
306-
}
307-
throw new Error(
308-
`Database query failed because the underlying storage is no longer available (${lastKvError}). This usually means the actor is stopping. Use c.abortSignal to cancel long-running work before the actor shuts down.`,
309-
{ cause: error },
310-
);
311-
}
312-
313-
async function openRawDatabaseFromEnvoy(handle, actorId, preloadedEntries) {
314-
const nativeDb = await openDatabaseFromEnvoy(
315-
handle,
316-
actorId,
317-
preloadedEntries,
318-
);
319-
let closed = false;
320-
321-
const ensureOpen = () => {
322-
if (closed) {
323-
throw new Error("database is closed");
324-
}
325-
};
326-
327-
return {
328-
execute: async (query, ...args) => {
329-
ensureOpen();
330-
331-
if (args.length > 0) {
332-
const bindings = normalizeBindings(query, args);
333-
const token = query.trimStart().slice(0, 16).toUpperCase();
334-
const returnsRows =
335-
token.startsWith("SELECT") ||
336-
token.startsWith("PRAGMA") ||
337-
token.startsWith("WITH") ||
338-
/\bRETURNING\b/i.test(query);
339-
340-
if (returnsRows) {
341-
try {
342-
const result = await nativeDb.query(query, bindings);
343-
return mapRows(result.rows, result.columns);
344-
} catch (error) {
345-
wrapNativeStorageError(nativeDb, error);
346-
}
347-
}
348-
349-
try {
350-
await nativeDb.run(query, bindings);
351-
} catch (error) {
352-
wrapNativeStorageError(nativeDb, error);
353-
}
354-
return [];
355-
}
356-
357-
try {
358-
const result = await nativeDb.exec(query);
359-
return mapRows(result.rows, result.columns);
360-
} catch (error) {
361-
wrapNativeStorageError(nativeDb, error);
362-
}
363-
},
364-
close: async () => {
365-
if (closed) {
366-
return;
367-
}
368-
closed = true;
369-
await nativeDb.close();
370-
},
371-
};
372-
}
373-
374198
/**
375199
* Route callback envelopes from the native addon to EnvoyConfig callbacks.
376200
*/
@@ -670,4 +494,3 @@ function handleEvent(event, config, wrappedHandle) {
670494
module.exports.startEnvoy = startEnvoy;
671495
module.exports.startEnvoySync = startEnvoySync;
672496
module.exports.openDatabaseFromEnvoy = openDatabaseFromEnvoy;
673-
module.exports.openRawDatabaseFromEnvoy = openRawDatabaseFromEnvoy;

rivetkit-typescript/packages/rivetkit/dynamic-isolate-runtime/src/index.cts

Lines changed: 69 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import { createActorRouter } from "../../src/actor/router";
1717
import { routeWebSocket } from "../../src/actor/router-websocket-endpoints";
1818
import { HEADER_CONN_PARAMS } from "../../src/common/actor-router-consts";
1919
import { InlineWebSocketAdapter } from "../../src/common/inline-websocket-adapter";
20-
import type { ISqliteVfs } from "@rivetkit/sqlite-wasm";
20+
import type { NativeDatabaseProvider, SqliteDatabase } from "../../src/db/config";
2121
import {
2222
DYNAMIC_BOOTSTRAP_CONFIG_GLOBAL_KEY,
2323
DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS,
@@ -34,8 +34,6 @@ import {
3434
} from "../../src/dynamic/runtime-bridge";
3535
import { RegistryConfigSchema } from "../../src/registry/config";
3636

37-
const { SqliteVfsPool } = require("@rivetkit/sqlite-wasm") as typeof import("@rivetkit/sqlite-wasm");
38-
3937
interface IsolateReferenceLike {
4038
applySyncPromise(
4139
receiver: unknown,
@@ -60,6 +58,10 @@ interface DynamicHostBridge {
6058
kvDeleteRange: IsolateReferenceLike;
6159
kvListPrefix: IsolateReferenceLike;
6260
kvListRange: IsolateReferenceLike;
61+
dbExec: IsolateReferenceLike;
62+
dbQuery: IsolateReferenceLike;
63+
dbRun: IsolateReferenceLike;
64+
dbClose: IsolateReferenceLike;
6365
setAlarm: IsolateReferenceLike;
6466
clientCall: IsolateReferenceLike;
6567
ackHibernatableWebSocketMessage: IsolateReferenceLike;
@@ -108,7 +110,7 @@ interface DynamicActorDriver {
108110
},
109111
): Promise<Array<[Uint8Array, Uint8Array]>>;
110112
setAlarm(actor: { id: string }, timestamp: number): Promise<void>;
111-
createSqliteVfs(actorId: string): Promise<ISqliteVfs>;
113+
getNativeDatabaseProvider(): NativeDatabaseProvider;
112114
startSleep(actorId: string): void;
113115
ackHibernatableWebSocketMessage(
114116
gatewayId: ArrayBuffer,
@@ -282,12 +284,7 @@ const webSocketSessions = new Map<
282284
}
283285
>();
284286
const CLIENT_ACCESSOR_METHODS = new Set(["get", "getOrCreate", "getForId", "create"]);
285-
let sqliteVfsPoolPromise:
286-
| Promise<{
287-
acquire(actorId: string): Promise<ISqliteVfs>;
288-
shutdown(): Promise<void>;
289-
}>
290-
| undefined;
287+
const nativeDatabaseCache = new Map<string, SqliteDatabase>();
291288

292289
type DynamicActorRouter = ReturnType<typeof createActorRouter>;
293290

@@ -371,6 +368,10 @@ function readHostBridge(): DynamicHostBridge {
371368
kvListRange: getRequiredHostRef(
372369
DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS.kvListRange,
373370
),
371+
dbExec: getRequiredHostRef(DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS.dbExec),
372+
dbQuery: getRequiredHostRef(DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS.dbQuery),
373+
dbRun: getRequiredHostRef(DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS.dbRun),
374+
dbClose: getRequiredHostRef(DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS.dbClose),
374375
setAlarm: getRequiredHostRef(DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS.setAlarm),
375376
clientCall: getRequiredHostRef(DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS.clientCall),
376377
ackHibernatableWebSocketMessage: getRequiredHostRef(
@@ -452,23 +453,6 @@ async function getRuntimeState(): Promise<DynamicRuntimeState> {
452453
return await runtimeStatePromise;
453454
}
454455

455-
async function loadSqliteVfsPool(): Promise<{
456-
acquire(actorId: string): Promise<ISqliteVfs>;
457-
shutdown(): Promise<void>;
458-
}> {
459-
if (!sqliteVfsPoolPromise) {
460-
sqliteVfsPoolPromise = Promise.resolve().then(
461-
() =>
462-
new SqliteVfsPool({
463-
actorsPerInstance: 50,
464-
idleDestroyMs: 30_000,
465-
}),
466-
);
467-
}
468-
469-
return await sqliteVfsPoolPromise;
470-
}
471-
472456
function dynamicHostLog(level: "debug" | "warn", message: string): void {
473457
if (!hostBridge.log) {
474458
return;
@@ -512,6 +496,45 @@ function bridgeCallSync<T>(ref: IsolateReferenceLike, args: unknown[]): T {
512496
}) as T;
513497
}
514498

499+
function createNativeDatabaseBridge(actorIdValue: string): SqliteDatabase {
500+
return {
501+
async exec(
502+
sql: string,
503+
callback?: (row: unknown[], columns: string[]) => void,
504+
): Promise<void> {
505+
const result = await bridgeCall<{
506+
columns: string[];
507+
rows: unknown[][];
508+
}>(hostBridge.dbExec, [actorIdValue, sql]);
509+
if (!callback) {
510+
return;
511+
}
512+
for (const row of result.rows) {
513+
callback(row, result.columns);
514+
}
515+
},
516+
async run(
517+
sql: string,
518+
params?: unknown[] | Record<string, unknown>,
519+
): Promise<void> {
520+
await bridgeCall(hostBridge.dbRun, [actorIdValue, sql, params]);
521+
},
522+
async query(
523+
sql: string,
524+
params?: unknown[] | Record<string, unknown>,
525+
): Promise<{ rows: unknown[][]; columns: string[] }> {
526+
return await bridgeCall(hostBridge.dbQuery, [actorIdValue, sql, params]);
527+
},
528+
async close(): Promise<void> {
529+
try {
530+
await bridgeCall(hostBridge.dbClose, [actorIdValue]);
531+
} finally {
532+
nativeDatabaseCache.delete(actorIdValue);
533+
}
534+
},
535+
};
536+
}
537+
515538
function toArrayBuffer(input: Uint8Array | ArrayBuffer): ArrayBuffer {
516539
if (input instanceof ArrayBuffer) {
517540
return input;
@@ -851,9 +874,18 @@ const actorDriver: DynamicActorDriver = {
851874
async setAlarm(actor, timestamp: number): Promise<void> {
852875
await bridgeCall(hostBridge.setAlarm, [actor.id, timestamp]);
853876
},
854-
async createSqliteVfs(actorIdValue: string): Promise<ISqliteVfs> {
855-
const pool = await loadSqliteVfsPool();
856-
return await pool.acquire(actorIdValue);
877+
getNativeDatabaseProvider(): NativeDatabaseProvider {
878+
return {
879+
open: async (actorIdValue: string): Promise<SqliteDatabase> => {
880+
const existing = nativeDatabaseCache.get(actorIdValue);
881+
if (existing) {
882+
return existing;
883+
}
884+
const database = createNativeDatabaseBridge(actorIdValue);
885+
nativeDatabaseCache.set(actorIdValue, database);
886+
return database;
887+
},
888+
};
857889
},
858890
startSleep(requestActorId: string): void {
859891
bridgeCallSync(hostBridge.startSleep, [requestActorId]);
@@ -1306,10 +1338,13 @@ async function dynamicDisposeEnvelope(): Promise<boolean> {
13061338
}
13071339
webSocketSessions.clear();
13081340
runtimeStopMode = undefined;
1309-
if (sqliteVfsPoolPromise) {
1310-
const sqliteVfsPool = await sqliteVfsPoolPromise;
1311-
await sqliteVfsPool.shutdown();
1312-
sqliteVfsPoolPromise = undefined;
1341+
for (const [actorId, database] of nativeDatabaseCache.entries()) {
1342+
try {
1343+
await database.close();
1344+
} catch {
1345+
// noop
1346+
}
1347+
nativeDatabaseCache.delete(actorId);
13131348
}
13141349
return true;
13151350
}

rivetkit-typescript/packages/rivetkit/package.json

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,8 +332,6 @@
332332
"@rivetkit/engine-runner": "workspace:*",
333333
"@rivetkit/fast-json-patch": "^3.1.2",
334334
"@rivetkit/on-change": "^6.0.2-rc.1",
335-
"@rivetkit/sqlite": "^0.1.1",
336-
"@rivetkit/sqlite-wasm": "workspace:*",
337335
"@rivetkit/traces": "workspace:*",
338336
"@rivetkit/virtual-websocket": "workspace:*",
339337
"@rivetkit/workflow-engine": "workspace:*",

0 commit comments

Comments
 (0)