Skip to content

Commit 1987a40

Browse files
committed
New: add sqlite-workers submodule for async SQLite reads
1 parent c21deca commit 1987a40

11 files changed

Lines changed: 880 additions & 0 deletions

File tree

package.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@
4848
"require": "./dist/cjs/sqlite/index.js",
4949
"import": "./dist/esm/sqlite/index.js"
5050
},
51+
"./sqlite-workers": {
52+
"types": "./dist/types/sqlite-workers/index.d.ts",
53+
"require": "./dist/cjs/sqlite-workers/index.js",
54+
"import": "./dist/esm/sqlite-workers/index.js"
55+
},
5156
"./redis": {
5257
"types": "./dist/types/redis/index.d.ts",
5358
"require": "./dist/cjs/redis/index.js",
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import * as Comlink from 'comlink';
2+
import type {
3+
SqliteRunResult,
4+
ISqliteWorkerApi,
5+
SqliteWorkerQueryParams,
6+
SqliteWorkerStatementHandle
7+
} from './protocol.ts';
8+
9+
export class AsyncSqliteStatement {
10+
11+
readonly #workerApi: Comlink.Remote<ISqliteWorkerApi>;
12+
readonly #handle: SqliteWorkerStatementHandle;
13+
14+
constructor(
15+
workerApi: Comlink.Remote<ISqliteWorkerApi>,
16+
handle: SqliteWorkerStatementHandle
17+
) {
18+
this.#workerApi = workerApi;
19+
this.#handle = handle;
20+
}
21+
22+
async all<TRow>(params?: SqliteWorkerQueryParams): Promise<TRow[]> {
23+
return this.#workerApi.allPrepared(this.#handle, params) as Promise<TRow[]>;
24+
}
25+
26+
async get<TRow>(params?: SqliteWorkerQueryParams): Promise<TRow | undefined> {
27+
return this.#workerApi.getPrepared(this.#handle, params) as Promise<TRow | undefined>;
28+
}
29+
30+
async run(params?: SqliteWorkerQueryParams): Promise<SqliteRunResult> {
31+
return this.#workerApi.runPrepared(this.#handle, params);
32+
}
33+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import { Worker } from 'node:worker_threads';
2+
import * as Comlink from 'comlink';
3+
import type {
4+
SqliteWorkerProxyParams,
5+
SqliteWorkerQueryParams,
6+
ISqliteWorkerApi,
7+
SqliteRunResult
8+
} from './protocol.ts';
9+
import { createSqliteWorker, nodeEndpoint } from './utils/index.ts';
10+
import { SqliteWorkerRunner } from './SqliteWorkerRunner.ts';
11+
import { AsyncSqliteStatement } from './AsyncSqliteStatement.ts';
12+
13+
export class SqliteWorkerProxy {
14+
15+
readonly #config: SqliteWorkerProxyParams;
16+
#worker: Worker | undefined;
17+
#workerPromise: Promise<Worker> | undefined;
18+
#workerApi: Comlink.Remote<ISqliteWorkerApi> | undefined;
19+
#workerApiPromise: Promise<Comlink.Remote<ISqliteWorkerApi>> | undefined;
20+
#disposePromise: Promise<void> | undefined;
21+
22+
constructor(config: SqliteWorkerProxyParams) {
23+
this.#config = config;
24+
}
25+
26+
async all<TRow>(sql: string, params?: SqliteWorkerQueryParams): Promise<TRow[]> {
27+
const workerApi = await this.#assertWorkerApi();
28+
return workerApi.all(sql, params) as Promise<TRow[]>;
29+
}
30+
31+
async get<TRow>(sql: string, params?: SqliteWorkerQueryParams): Promise<TRow | undefined> {
32+
const workerApi = await this.#assertWorkerApi();
33+
return workerApi.get(sql, params) as Promise<TRow | undefined>;
34+
}
35+
36+
async run(sql: string, params?: SqliteWorkerQueryParams): Promise<SqliteRunResult> {
37+
const workerApi = await this.#assertWorkerApi();
38+
return workerApi.run(sql, params);
39+
}
40+
41+
async prepare(sql: string): Promise<AsyncSqliteStatement> {
42+
const workerApi = await this.#assertWorkerApi();
43+
const handle = await workerApi.prepare(sql);
44+
45+
return new AsyncSqliteStatement(workerApi, handle);
46+
}
47+
48+
async dispose(): Promise<void> {
49+
this.#disposePromise ??= this.#disposeWorker()
50+
.finally(() => {
51+
this.#disposePromise = undefined;
52+
});
53+
54+
return this.#disposePromise;
55+
}
56+
57+
/** @internal */
58+
async assertWorker(): Promise<Worker> {
59+
if (this.#worker)
60+
return this.#worker;
61+
62+
const {
63+
sqliteWorkerRunnerLocation = SqliteWorkerRunner.location,
64+
...dbParams
65+
} = this.#config;
66+
67+
this.#workerPromise ??= createSqliteWorker({
68+
...dbParams,
69+
sqliteWorkerRunnerLocation
70+
})
71+
.then(worker => {
72+
this.#worker = worker;
73+
worker.once('error', this.handleWorkerError);
74+
worker.once('exit', this.handleWorkerExit);
75+
76+
return worker;
77+
});
78+
79+
return this.#workerPromise;
80+
}
81+
82+
async #assertWorkerApi(): Promise<Comlink.Remote<ISqliteWorkerApi>> {
83+
if (this.#workerApi)
84+
return this.#workerApi;
85+
86+
this.#workerApiPromise ??= this.assertWorker()
87+
.then(worker => {
88+
this.#workerApi = Comlink.wrap<ISqliteWorkerApi>(nodeEndpoint(worker));
89+
return this.#workerApi;
90+
});
91+
92+
return this.#workerApiPromise;
93+
}
94+
95+
async #disposeWorker(): Promise<void> {
96+
if (!this.#worker && !this.#workerPromise)
97+
return;
98+
99+
const workerApi = this.#workerApi ?? await this.#workerApiPromise?.catch(() => undefined);
100+
const worker = this.#worker ?? await this.#workerPromise?.catch(() => undefined);
101+
102+
this.#worker = undefined;
103+
this.#workerPromise = undefined;
104+
this.#workerApi = undefined;
105+
this.#workerApiPromise = undefined;
106+
107+
workerApi?.[Comlink.releaseProxy]();
108+
109+
if (!worker)
110+
return;
111+
112+
worker.off('error', this.handleWorkerError);
113+
worker.off('exit', this.handleWorkerExit);
114+
await worker.terminate();
115+
}
116+
117+
handleWorkerError = (_err: Error) => {
118+
void this.dispose().catch(() => undefined);
119+
};
120+
121+
handleWorkerExit = (_exitCode: number) => {
122+
void this.dispose().catch(() => undefined);
123+
};
124+
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
import { parentPort, workerData } from 'node:worker_threads';
2+
import * as path from 'node:path';
3+
import { fileURLToPath } from 'node:url';
4+
import * as Comlink from 'comlink';
5+
import Database, { type Statement } from 'better-sqlite3';
6+
import {
7+
isSqliteWorkerData,
8+
type SqliteRunResult,
9+
type ISqliteWorkerApi,
10+
type SqliteWorkerRunnerDbParams,
11+
type SqliteWorkerQueryParams,
12+
type SqliteWorkerStatementHandle
13+
} from './protocol.ts';
14+
import { nodeEndpoint } from './utils/index.ts';
15+
import { assertString, assertStringArray } from '../utils/index.ts';
16+
17+
declare const __filename: string | undefined;
18+
19+
function all<TRow>(statement: Statement<unknown[], TRow>, params?: SqliteWorkerQueryParams): TRow[] {
20+
return params === undefined ? statement.all() : statement.all(params);
21+
}
22+
23+
function get<TRow>(statement: Statement<unknown[], TRow>, params?: SqliteWorkerQueryParams): TRow | undefined {
24+
return params === undefined ? statement.get() : statement.get(params);
25+
}
26+
27+
function run(statement: Statement, params?: SqliteWorkerQueryParams): SqliteRunResult {
28+
return params === undefined ? statement.run() : statement.run(params);
29+
}
30+
31+
/** @internal */
32+
export function resolveCurrentFileLocationFromStack(stack = new Error().stack) {
33+
const stackFilename = stack?.match(/\((file:\/\/[^)]+SqliteWorkerRunner\.js):\d+:\d+\)/)?.[1];
34+
if (!stackFilename)
35+
throw new Error('Worker location could not be resolved from Error stack, pass sqliteWorkerRunnerLocation');
36+
37+
return fileURLToPath(stackFilename);
38+
}
39+
40+
export class SqliteWorkerRunner implements ISqliteWorkerApi {
41+
42+
static get location() {
43+
if (typeof __filename !== 'undefined' && path.isAbsolute(__filename))
44+
return __filename;
45+
46+
/* istanbul ignore next -- exercised by ESM consumers, not ts-jest's CJS transform */
47+
return resolveCurrentFileLocationFromStack();
48+
}
49+
50+
readonly #db;
51+
#nextStatementHandle = 1;
52+
readonly #statements = new Map<SqliteWorkerStatementHandle, Statement<unknown[], unknown>>();
53+
54+
constructor(dbParams: SqliteWorkerRunnerDbParams) {
55+
assertString(dbParams.location, 'dbParams.location');
56+
57+
this.#db = new Database(dbParams.location, {
58+
readonly: true,
59+
fileMustExist: true
60+
});
61+
62+
if (dbParams.pragmas?.length) {
63+
assertStringArray(dbParams.pragmas, 'dbParams.pragmas');
64+
65+
for (const pragma of dbParams.pragmas)
66+
this.#db.pragma(pragma);
67+
}
68+
}
69+
70+
all<TRow>(sql: string, params?: SqliteWorkerQueryParams): TRow[] {
71+
const statement = this.#db.prepare<unknown[], TRow>(sql);
72+
return all(statement, params);
73+
}
74+
75+
get<TRow>(sql: string, params?: SqliteWorkerQueryParams): TRow | undefined {
76+
const statement = this.#db.prepare<unknown[], TRow>(sql);
77+
return get(statement, params);
78+
}
79+
80+
run(sql: string, params?: SqliteWorkerQueryParams): SqliteRunResult {
81+
const statement = this.#db.prepare(sql);
82+
return run(statement, params);
83+
}
84+
85+
prepare(sql: string): SqliteWorkerStatementHandle {
86+
const handle = this.#nextStatementHandle++;
87+
this.#statements.set(handle, this.#db.prepare(sql));
88+
89+
return handle;
90+
}
91+
92+
/** @internal */
93+
allPrepared<TRow>(handle: SqliteWorkerStatementHandle, params?: SqliteWorkerQueryParams): TRow[] {
94+
return all(this.#getStatement<TRow>(handle), params);
95+
}
96+
97+
/** @internal */
98+
getPrepared<TRow>(handle: SqliteWorkerStatementHandle, params?: SqliteWorkerQueryParams): TRow | undefined {
99+
return get(this.#getStatement<TRow>(handle), params);
100+
}
101+
102+
/** @internal */
103+
runPrepared(handle: SqliteWorkerStatementHandle, params?: SqliteWorkerQueryParams): SqliteRunResult {
104+
return run(this.#getStatement(handle), params);
105+
}
106+
107+
#getStatement<TRow>(handle: SqliteWorkerStatementHandle): Statement<unknown[], TRow> {
108+
const statement = this.#statements.get(handle);
109+
if (!statement)
110+
throw new Error(`SQLite worker statement '${handle}' does not exist`);
111+
112+
return statement as Statement<unknown[], TRow>;
113+
}
114+
}
115+
116+
/* istanbul ignore next -- this branch runs inside the spawned worker process */
117+
if (parentPort) {
118+
if (!isSqliteWorkerData(workerData))
119+
throw new Error('workerData does not contain SQLite worker db parameters');
120+
121+
const runner = new SqliteWorkerRunner(workerData.db);
122+
123+
parentPort.postMessage({ type: 'ready' });
124+
Comlink.expose(runner, nodeEndpoint(parentPort));
125+
}

src/sqlite-workers/index.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
export * from './AsyncSqliteStatement.ts';
2+
export * from './SqliteWorkerProxy.ts';
3+
export type {
4+
SqliteRunResult,
5+
SqliteWorkerProxyParams,
6+
SqliteWorkerQueryParams
7+
} from './protocol.ts';

src/sqlite-workers/protocol.ts

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import type { Statement } from 'better-sqlite3';
2+
3+
export type SqliteWorkerProxyParams = {
4+
5+
/** SQLite database file opened readonly by the worker. */
6+
dbLocation: string;
7+
8+
/** PRAGMA statements applied to the worker connection. */
9+
pragmas?: readonly string[];
10+
11+
/** Worker runner script location; defaults to the bundled runner. */
12+
sqliteWorkerRunnerLocation?: string | URL;
13+
};
14+
15+
export type SqliteWorkerRunnerDbParams = {
16+
location: string;
17+
pragmas?: readonly string[];
18+
};
19+
20+
export type SqliteWorkerData = {
21+
db: SqliteWorkerRunnerDbParams;
22+
};
23+
24+
export type SqliteWorkerQueryParams = readonly unknown[] | Record<string, unknown>;
25+
26+
export type SqliteWorkerStatementHandle = number;
27+
28+
export type SqliteRunResult = ReturnType<Statement['run']>;
29+
30+
export interface ISqliteWorkerApi {
31+
all<TRow>(sql: string, params?: SqliteWorkerQueryParams): TRow[];
32+
get<TRow>(sql: string, params?: SqliteWorkerQueryParams): TRow | undefined;
33+
run(sql: string, params?: SqliteWorkerQueryParams): SqliteRunResult;
34+
35+
prepare(sql: string): SqliteWorkerStatementHandle;
36+
37+
/** @internal */
38+
allPrepared<TRow>(handle: SqliteWorkerStatementHandle, params?: SqliteWorkerQueryParams): TRow[];
39+
40+
/** @internal */
41+
getPrepared<TRow>(handle: SqliteWorkerStatementHandle, params?: SqliteWorkerQueryParams): TRow | undefined;
42+
43+
/** @internal */
44+
runPrepared(handle: SqliteWorkerStatementHandle, params?: SqliteWorkerQueryParams): SqliteRunResult;
45+
}
46+
47+
export type SqliteWorkerReadyMessage = {
48+
type: 'ready';
49+
};
50+
51+
export function isSqliteWorkerData(value: unknown): value is SqliteWorkerData {
52+
return typeof value === 'object' &&
53+
value !== null &&
54+
'db' in value &&
55+
typeof value.db === 'object' &&
56+
value.db !== null &&
57+
'location' in value.db &&
58+
typeof value.db.location === 'string';
59+
}
60+
61+
export function isSqliteWorkerReadyMessage(value: unknown): value is SqliteWorkerReadyMessage {
62+
return typeof value === 'object' &&
63+
value !== null &&
64+
'type' in value &&
65+
value.type === 'ready';
66+
}

0 commit comments

Comments
 (0)