Skip to content

Commit 7158470

Browse files
committed
refactor(rivetkit): switch sqlite runtime to native-only databases
1 parent ebe7c80 commit 7158470

30 files changed

Lines changed: 490 additions & 1669 deletions

File tree

rivetkit-typescript/packages/rivetkit-native/wrapper.d.ts

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -128,19 +128,4 @@ export declare function openDatabaseFromEnvoy(
128128
actorId: string,
129129
preloadedEntries?: readonly [Uint8Array, Uint8Array][] | null,
130130
): Promise<JsNativeDatabase>;
131-
132-
export interface NativeRawDatabase {
133-
execute: <TRow extends Record<string, unknown> = Record<string, unknown>>(
134-
query: string,
135-
...args: unknown[]
136-
) => Promise<TRow[]>;
137-
close: () => Promise<void>;
138-
}
139-
140-
export declare function openRawDatabaseFromEnvoy(
141-
handle: EnvoyHandle,
142-
actorId: string,
143-
preloadedEntries?: readonly [Uint8Array, Uint8Array][] | null,
144-
): Promise<NativeRawDatabase>;
145-
146131
export declare const utils: {};

rivetkit-typescript/packages/rivetkit-native/wrapper.js

Lines changed: 0 additions & 177 deletions
Original file line numberDiff line numberDiff line change
@@ -194,182 +194,6 @@ function decodePreloadedKv(preloadedKv) {
194194
};
195195
}
196196

197-
function isPlainObject(value) {
198-
return (
199-
!!value &&
200-
typeof value === "object" &&
201-
!Array.isArray(value) &&
202-
Object.getPrototypeOf(value) === Object.prototype
203-
);
204-
}
205-
206-
function toNativeBinding(value) {
207-
if (value === null || value === undefined) {
208-
return { kind: "null" };
209-
}
210-
if (typeof value === "bigint") {
211-
return { kind: "int", intValue: Number(value) };
212-
}
213-
if (typeof value === "number") {
214-
return Number.isInteger(value)
215-
? { kind: "int", intValue: value }
216-
: { kind: "float", floatValue: value };
217-
}
218-
if (typeof value === "string") {
219-
return { kind: "text", textValue: value };
220-
}
221-
if (value instanceof ArrayBuffer) {
222-
return { kind: "blob", blobValue: Buffer.from(value) };
223-
}
224-
if (ArrayBuffer.isView(value)) {
225-
return {
226-
kind: "blob",
227-
blobValue: Buffer.from(value.buffer, value.byteOffset, value.byteLength),
228-
};
229-
}
230-
231-
throw new Error(`unsupported sqlite binding type: ${typeof value}`);
232-
}
233-
234-
function extractNamedSqliteParameters(sql) {
235-
return [...sql.matchAll(/([:@$][A-Za-z_][A-Za-z0-9_]*)/g)].map(
236-
(match) => match[1],
237-
);
238-
}
239-
240-
function getNamedSqliteBinding(bindings, name) {
241-
if (name in bindings) {
242-
return bindings[name];
243-
}
244-
245-
const bareName = name.slice(1);
246-
if (bareName in bindings) {
247-
return bindings[bareName];
248-
}
249-
250-
for (const prefix of [":", "@", "$"]) {
251-
const candidate = `${prefix}${bareName}`;
252-
if (candidate in bindings) {
253-
return bindings[candidate];
254-
}
255-
}
256-
257-
return undefined;
258-
}
259-
260-
function normalizeBindings(sql, args) {
261-
if (!args || args.length === 0) {
262-
return [];
263-
}
264-
265-
if (
266-
args.length === 1 &&
267-
isPlainObject(args[0]) &&
268-
!(args[0] instanceof Uint8Array)
269-
) {
270-
const names = extractNamedSqliteParameters(sql);
271-
if (names.length === 0) {
272-
throw new Error(
273-
"native sqlite object bindings require named placeholders in the SQL statement",
274-
);
275-
}
276-
return names.map((name) => {
277-
const value = getNamedSqliteBinding(args[0], name);
278-
if (value === undefined) {
279-
throw new Error(`missing bind parameter: ${name}`);
280-
}
281-
return toNativeBinding(value);
282-
});
283-
}
284-
285-
return args.map(toNativeBinding);
286-
}
287-
288-
function mapRows(rows, columns) {
289-
return rows.map((row) => {
290-
const rowObject = {};
291-
for (let i = 0; i < columns.length; i++) {
292-
rowObject[columns[i]] = row[i];
293-
}
294-
return rowObject;
295-
});
296-
}
297-
298-
function wrapNativeStorageError(nativeDb, error) {
299-
const lastKvError =
300-
typeof nativeDb.takeLastKvError === "function"
301-
? nativeDb.takeLastKvError()
302-
: null;
303-
if (!lastKvError) {
304-
throw error;
305-
}
306-
throw new Error(
307-
`Database query failed because the underlying storage is no longer available (${lastKvError}). This usually means the actor is stopping. Use c.abortSignal to cancel long-running work before the actor shuts down.`,
308-
{ cause: error },
309-
);
310-
}
311-
312-
async function openRawDatabaseFromEnvoy(handle, actorId, preloadedEntries) {
313-
const nativeDb = await openDatabaseFromEnvoy(
314-
handle,
315-
actorId,
316-
preloadedEntries,
317-
);
318-
let closed = false;
319-
320-
const ensureOpen = () => {
321-
if (closed) {
322-
throw new Error("database is closed");
323-
}
324-
};
325-
326-
return {
327-
execute: async (query, ...args) => {
328-
ensureOpen();
329-
330-
if (args.length > 0) {
331-
const bindings = normalizeBindings(query, args);
332-
const token = query.trimStart().slice(0, 16).toUpperCase();
333-
const returnsRows =
334-
token.startsWith("SELECT") ||
335-
token.startsWith("PRAGMA") ||
336-
token.startsWith("WITH") ||
337-
/\bRETURNING\b/i.test(query);
338-
339-
if (returnsRows) {
340-
try {
341-
const result = await nativeDb.query(query, bindings);
342-
return mapRows(result.rows, result.columns);
343-
} catch (error) {
344-
wrapNativeStorageError(nativeDb, error);
345-
}
346-
}
347-
348-
try {
349-
await nativeDb.run(query, bindings);
350-
} catch (error) {
351-
wrapNativeStorageError(nativeDb, error);
352-
}
353-
return [];
354-
}
355-
356-
try {
357-
const result = await nativeDb.exec(query);
358-
return mapRows(result.rows, result.columns);
359-
} catch (error) {
360-
wrapNativeStorageError(nativeDb, error);
361-
}
362-
},
363-
close: async () => {
364-
if (closed) {
365-
return;
366-
}
367-
closed = true;
368-
await nativeDb.close();
369-
},
370-
};
371-
}
372-
373197
/**
374198
* Route callback envelopes from the native addon to EnvoyConfig callbacks.
375199
*/
@@ -635,4 +459,3 @@ function handleEvent(event, config, wrappedHandle) {
635459
module.exports.startEnvoy = startEnvoy;
636460
module.exports.startEnvoySync = startEnvoySync;
637461
module.exports.openDatabaseFromEnvoy = openDatabaseFromEnvoy;
638-
module.exports.openRawDatabaseFromEnvoy = openRawDatabaseFromEnvoy;

rivetkit-typescript/packages/rivetkit/dynamic-isolate-runtime/src/index.cts

Lines changed: 69 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import { createActorRouter } from "../../src/actor/router";
1717
import { routeWebSocket } from "../../src/actor/router-websocket-endpoints";
1818
import { HEADER_CONN_PARAMS } from "../../src/common/actor-router-consts";
1919
import { InlineWebSocketAdapter } from "../../src/common/inline-websocket-adapter";
20-
import type { ISqliteVfs } from "@rivetkit/sqlite-wasm";
20+
import type { NativeDatabaseProvider, SqliteDatabase } from "../../src/db/config";
2121
import {
2222
DYNAMIC_BOOTSTRAP_CONFIG_GLOBAL_KEY,
2323
DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS,
@@ -34,8 +34,6 @@ import {
3434
} from "../../src/dynamic/runtime-bridge";
3535
import { RegistryConfigSchema } from "../../src/registry/config";
3636

37-
const { SqliteVfsPool } = require("@rivetkit/sqlite-wasm") as typeof import("@rivetkit/sqlite-wasm");
38-
3937
interface IsolateReferenceLike {
4038
applySyncPromise(
4139
receiver: unknown,
@@ -60,6 +58,10 @@ interface DynamicHostBridge {
6058
kvDeleteRange: IsolateReferenceLike;
6159
kvListPrefix: IsolateReferenceLike;
6260
kvListRange: IsolateReferenceLike;
61+
dbExec: IsolateReferenceLike;
62+
dbQuery: IsolateReferenceLike;
63+
dbRun: IsolateReferenceLike;
64+
dbClose: IsolateReferenceLike;
6365
setAlarm: IsolateReferenceLike;
6466
clientCall: IsolateReferenceLike;
6567
ackHibernatableWebSocketMessage: IsolateReferenceLike;
@@ -108,7 +110,7 @@ interface DynamicActorDriver {
108110
},
109111
): Promise<Array<[Uint8Array, Uint8Array]>>;
110112
setAlarm(actor: { id: string }, timestamp: number): Promise<void>;
111-
createSqliteVfs(actorId: string): Promise<ISqliteVfs>;
113+
getNativeDatabaseProvider(): NativeDatabaseProvider;
112114
startSleep(actorId: string): void;
113115
ackHibernatableWebSocketMessage(
114116
gatewayId: ArrayBuffer,
@@ -282,12 +284,7 @@ const webSocketSessions = new Map<
282284
}
283285
>();
284286
const CLIENT_ACCESSOR_METHODS = new Set(["get", "getOrCreate", "getForId", "create"]);
285-
let sqliteVfsPoolPromise:
286-
| Promise<{
287-
acquire(actorId: string): Promise<ISqliteVfs>;
288-
shutdown(): Promise<void>;
289-
}>
290-
| undefined;
287+
const nativeDatabaseCache = new Map<string, SqliteDatabase>();
291288

292289
type DynamicActorRouter = ReturnType<typeof createActorRouter>;
293290

@@ -371,6 +368,10 @@ function readHostBridge(): DynamicHostBridge {
371368
kvListRange: getRequiredHostRef(
372369
DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS.kvListRange,
373370
),
371+
dbExec: getRequiredHostRef(DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS.dbExec),
372+
dbQuery: getRequiredHostRef(DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS.dbQuery),
373+
dbRun: getRequiredHostRef(DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS.dbRun),
374+
dbClose: getRequiredHostRef(DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS.dbClose),
374375
setAlarm: getRequiredHostRef(DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS.setAlarm),
375376
clientCall: getRequiredHostRef(DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS.clientCall),
376377
ackHibernatableWebSocketMessage: getRequiredHostRef(
@@ -452,23 +453,6 @@ async function getRuntimeState(): Promise<DynamicRuntimeState> {
452453
return await runtimeStatePromise;
453454
}
454455

455-
async function loadSqliteVfsPool(): Promise<{
456-
acquire(actorId: string): Promise<ISqliteVfs>;
457-
shutdown(): Promise<void>;
458-
}> {
459-
if (!sqliteVfsPoolPromise) {
460-
sqliteVfsPoolPromise = Promise.resolve().then(
461-
() =>
462-
new SqliteVfsPool({
463-
actorsPerInstance: 50,
464-
idleDestroyMs: 30_000,
465-
}),
466-
);
467-
}
468-
469-
return await sqliteVfsPoolPromise;
470-
}
471-
472456
function dynamicHostLog(level: "debug" | "warn", message: string): void {
473457
if (!hostBridge.log) {
474458
return;
@@ -512,6 +496,45 @@ function bridgeCallSync<T>(ref: IsolateReferenceLike, args: unknown[]): T {
512496
}) as T;
513497
}
514498

499+
function createNativeDatabaseBridge(actorIdValue: string): SqliteDatabase {
500+
return {
501+
async exec(
502+
sql: string,
503+
callback?: (row: unknown[], columns: string[]) => void,
504+
): Promise<void> {
505+
const result = await bridgeCall<{
506+
columns: string[];
507+
rows: unknown[][];
508+
}>(hostBridge.dbExec, [actorIdValue, sql]);
509+
if (!callback) {
510+
return;
511+
}
512+
for (const row of result.rows) {
513+
callback(row, result.columns);
514+
}
515+
},
516+
async run(
517+
sql: string,
518+
params?: unknown[] | Record<string, unknown>,
519+
): Promise<void> {
520+
await bridgeCall(hostBridge.dbRun, [actorIdValue, sql, params]);
521+
},
522+
async query(
523+
sql: string,
524+
params?: unknown[] | Record<string, unknown>,
525+
): Promise<{ rows: unknown[][]; columns: string[] }> {
526+
return await bridgeCall(hostBridge.dbQuery, [actorIdValue, sql, params]);
527+
},
528+
async close(): Promise<void> {
529+
try {
530+
await bridgeCall(hostBridge.dbClose, [actorIdValue]);
531+
} finally {
532+
nativeDatabaseCache.delete(actorIdValue);
533+
}
534+
},
535+
};
536+
}
537+
515538
function toArrayBuffer(input: Uint8Array | ArrayBuffer): ArrayBuffer {
516539
if (input instanceof ArrayBuffer) {
517540
return input;
@@ -851,9 +874,18 @@ const actorDriver: DynamicActorDriver = {
851874
async setAlarm(actor, timestamp: number): Promise<void> {
852875
await bridgeCall(hostBridge.setAlarm, [actor.id, timestamp]);
853876
},
854-
async createSqliteVfs(actorIdValue: string): Promise<ISqliteVfs> {
855-
const pool = await loadSqliteVfsPool();
856-
return await pool.acquire(actorIdValue);
877+
getNativeDatabaseProvider(): NativeDatabaseProvider {
878+
return {
879+
open: async (actorIdValue: string): Promise<SqliteDatabase> => {
880+
const existing = nativeDatabaseCache.get(actorIdValue);
881+
if (existing) {
882+
return existing;
883+
}
884+
const database = createNativeDatabaseBridge(actorIdValue);
885+
nativeDatabaseCache.set(actorIdValue, database);
886+
return database;
887+
},
888+
};
857889
},
858890
startSleep(requestActorId: string): void {
859891
bridgeCallSync(hostBridge.startSleep, [requestActorId]);
@@ -1306,10 +1338,13 @@ async function dynamicDisposeEnvelope(): Promise<boolean> {
13061338
}
13071339
webSocketSessions.clear();
13081340
runtimeStopMode = undefined;
1309-
if (sqliteVfsPoolPromise) {
1310-
const sqliteVfsPool = await sqliteVfsPoolPromise;
1311-
await sqliteVfsPool.shutdown();
1312-
sqliteVfsPoolPromise = undefined;
1341+
for (const [actorId, database] of nativeDatabaseCache.entries()) {
1342+
try {
1343+
await database.close();
1344+
} catch {
1345+
// noop
1346+
}
1347+
nativeDatabaseCache.delete(actorId);
13131348
}
13141349
return true;
13151350
}

rivetkit-typescript/packages/rivetkit/package.json

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,8 +332,6 @@
332332
"@rivetkit/engine-runner": "workspace:*",
333333
"@rivetkit/fast-json-patch": "^3.1.2",
334334
"@rivetkit/on-change": "^6.0.2-rc.1",
335-
"@rivetkit/sqlite": "^0.1.1",
336-
"@rivetkit/sqlite-wasm": "workspace:*",
337335
"@rivetkit/traces": "workspace:*",
338336
"@rivetkit/virtual-websocket": "workspace:*",
339337
"@rivetkit/workflow-engine": "workspace:*",

0 commit comments

Comments
 (0)