Skip to content

Commit 2f518e4

Browse files
committed
Support OPFSWriteAheadVFS, concurrent reads
1 parent 3f8f3f6 commit 2f518e4

9 files changed

Lines changed: 192 additions & 109 deletions

File tree

packages/web/src/db/adapters/AsyncWebAdapter.ts

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,25 @@ type PendingListener = { listener: Partial<DBAdapterListener>; closeAfterRegiste
1414
* A connection pool implementation delegating to another pool opened asynchronnously.
1515
*/
1616
class AsyncConnectionPool implements ConnectionPool {
17-
protected readonly inner: Promise<DatabaseClient>;
17+
protected readonly inner: Promise<PoolConnection>;
1818

1919
protected resolvedClient?: DatabaseClient;
20+
private activeOnWriter = 0;
21+
private activeOnReader = 0;
22+
2023
private readonly pendingListeners = new Set<PendingListener>();
2124

2225
constructor(
23-
inner: Promise<DatabaseClient>,
26+
inner: Promise<PoolConnection>,
2427
readonly name: string
2528
) {
2629
this.inner = inner.then((client) => {
2730
for (const pending of this.pendingListeners) {
28-
pending.closeAfterRegisteredOnResolvedPool = client.registerListener(pending.listener);
31+
pending.closeAfterRegisteredOnResolvedPool = client.writer.registerListener(pending.listener);
2932
}
3033
this.pendingListeners.clear();
3134

32-
this.resolvedClient = client;
35+
this.resolvedClient = client.writer;
3336
return client;
3437
});
3538
}
@@ -40,21 +43,48 @@ class AsyncConnectionPool implements ConnectionPool {
4043

4144
async close() {
4245
const inner = await this.inner;
43-
return await inner.close();
46+
47+
await inner.writer.close();
48+
await inner.additionalReader?.close();
4449
}
4550

4651
async readLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
4752
const inner = await this.inner;
48-
return await inner.readLock(fn, options);
53+
54+
// This is a crude load balancing scheme between the writer and an additional read connection (if available).
55+
// Ideally, we should support abortable requests (which would allow us to request a lock from both and just use
56+
// whatever completes first). For now, this at least gives us some concurrency. We can improve this in the future.
57+
if (inner.additionalReader && this.activeOnReader <= this.activeOnWriter) {
58+
try {
59+
this.activeOnReader++;
60+
return await inner.additionalReader.readLock(fn, options);
61+
} finally {
62+
this.activeOnReader--;
63+
}
64+
}
65+
66+
try {
67+
this.activeOnWriter++;
68+
return await inner.writer.readLock(fn, options);
69+
} finally {
70+
this.activeOnWriter--;
71+
}
4972
}
5073

5174
async writeLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
5275
const inner = await this.inner;
53-
return await inner.writeLock(fn, options);
76+
try {
77+
this.activeOnWriter++;
78+
return await inner.writer.writeLock(fn, options);
79+
} finally {
80+
this.activeOnWriter--;
81+
}
5482
}
5583

5684
async refreshSchema(): Promise<void> {
57-
await (await this.inner).refreshSchema();
85+
const inner = await this.inner;
86+
await inner.writer.refreshSchema();
87+
await inner.additionalReader?.refreshSchema();
5888
}
5989

6090
registerListener(listener: Partial<DBAdapterListener>): () => void {
@@ -75,10 +105,15 @@ class AsyncConnectionPool implements ConnectionPool {
75105
}
76106
}
77107

108+
export interface PoolConnection {
109+
writer: DatabaseClient;
110+
additionalReader?: DatabaseClient;
111+
}
112+
78113
export class AsyncDbAdapter extends DBAdapterDefaultMixin(AsyncConnectionPool) implements WebDBAdapter {
79114
async shareConnection(): Promise<SharedConnectionWorker> {
80115
const inner = await this.inner;
81-
return inner.shareConnection();
116+
return inner.writer.shareConnection();
82117
}
83118

84119
getConfiguration(): WebDBAdapterConfiguration {

packages/web/src/db/adapters/wa-sqlite/DatabaseServer.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,10 @@ export class DatabaseServer {
112112
},
113113
requestAccess: async (write, timeoutMs) => {
114114
requireOpen();
115-
// TODO: Support timeouts, they don't seem to be supported by the async-mutex package.
116-
const lease = await this.#inner.acquireConnection();
115+
116+
const lease = await this.#inner.acquireConnection(
117+
timeoutMs != null ? AbortSignal.timeout(timeoutMs) : undefined
118+
);
117119
if (!isOpen) {
118120
// Race between requestAccess and close(), the connection was closed while we tried to acquire a lease.
119121
await lease.returnLease();

packages/web/src/db/adapters/wa-sqlite/RawSqliteConnection.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@ export class RawSqliteConnection {
3939

4040
async init() {
4141
const api = (this._sqliteAPI = await this.openSQLiteAPI());
42-
this.db = await api.open_v2(this.options.dbFilename);
42+
this.db = await api.open_v2(
43+
this.options.dbFilename,
44+
this.options.isReadOnly ? 1 /* SQLITE_OPEN_READONLY */ : 6 /* SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE */
45+
);
4346
await this.executeRaw(`PRAGMA temp_store = ${this.options.temporaryStorage};`);
4447
if (this.options.encryptionKey) {
4548
const escapedKey = this.options.encryptionKey.replace("'", "''");

packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts

Lines changed: 76 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,21 @@ import {
1313
import { SSRDBAdapter } from '../SSRDBAdapter.js';
1414
import { vfsRequiresDedicatedWorkers, WASQLiteVFS } from './vfs.js';
1515
import { MultiDatabaseServer } from '../../../worker/db/MultiDatabaseServer.js';
16-
import { ClientOptions, DatabaseClient, OpenWorkerConnection } from './DatabaseClient.js';
16+
import { DatabaseClient, OpenWorkerConnection } from './DatabaseClient.js';
1717
import { generateTabCloseSignal } from '../../../shared/tab_close_signal.js';
18-
import { AsyncDbAdapter } from '../AsyncWebAdapter.js';
18+
import { AsyncDbAdapter, PoolConnection } from '../AsyncWebAdapter.js';
1919

2020
export interface WASQLiteOpenFactoryOptions extends WebSQLOpenFactoryOptions {
2121
vfs?: WASQLiteVFS;
2222
}
2323

2424
export interface ResolvedWASQLiteOpenFactoryOptions extends ResolvedWebSQLOpenOptions {
2525
vfs: WASQLiteVFS;
26+
27+
/**
28+
* Whether this is a read-only connection opened for the `OPFSWriteAheadVFS` file system.
29+
*/
30+
isReadOnly: boolean;
2631
}
2732

2833
export interface WorkerDBOpenerOptions extends ResolvedWASQLiteOpenFactoryOptions {
@@ -82,7 +87,7 @@ export class WASQLiteOpenFactory implements SQLOpenFactory {
8287
return this.openAdapter();
8388
}
8489

85-
async openConnection(): Promise<DatabaseClient> {
90+
async openConnection(): Promise<PoolConnection> {
8691
const { enableMultiTabs, useWebWorker } = this.resolvedFlags;
8792
const {
8893
vfs = WASQLiteVFS.IDBBatchAtomicVFS,
@@ -95,70 +100,97 @@ export class WASQLiteOpenFactory implements SQLOpenFactory {
95100
this.logger.warn('Multiple tabs are not enabled in this browser');
96101
}
97102

98-
const resolvedOptions: ResolvedWASQLiteOpenFactoryOptions = {
103+
const resolveOptions = (isReadOnly: boolean): ResolvedWASQLiteOpenFactoryOptions => ({
99104
dbFilename: this.options.dbFilename,
100105
dbLocation: this.options.dbLocation,
101106
debugMode: this.options.debugMode,
102107
vfs,
103108
temporaryStorage,
104109
cacheSizeKb,
105110
flags: this.resolvedFlags,
106-
encryptionKey: encryptionKey
107-
};
111+
encryptionKey: encryptionKey,
112+
isReadOnly
113+
});
108114

109-
let clientOptions: ClientOptions;
115+
let client: DatabaseClient;
116+
let additionalReader: DatabaseClient | undefined;
110117
let requiresPersistentTriggers = vfsRequiresDedicatedWorkers(vfs);
111118

112119
if (useWebWorker) {
113120
const optionsDbWorker = this.options.worker;
114121

115-
const workerPort =
116-
typeof optionsDbWorker == 'function'
117-
? resolveWorkerDatabasePortFactory(() =>
118-
optionsDbWorker({
119-
...this.options,
120-
temporaryStorage,
121-
cacheSizeKb,
122-
flags: this.resolvedFlags,
123-
encryptionKey
124-
})
125-
)
126-
: openWorkerDatabasePort(this.options.dbFilename, enableMultiTabs, optionsDbWorker, this.waOptions.vfs);
127-
128-
const source = Comlink.wrap<OpenWorkerConnection>(workerPort);
129-
const closeSignal = new AbortController();
130-
const connection = await source.connect({
131-
...resolvedOptions,
132-
logLevel: this.logger.getLevel(),
133-
lockName: await generateTabCloseSignal(closeSignal.signal)
134-
});
135-
clientOptions = {
136-
connection,
137-
source,
138-
// This tab owns the worker, so we're guaranteed to outlive it.
139-
remoteCanCloseUnexpectedly: false,
140-
onClose: () => {
141-
closeSignal.abort();
142-
if (workerPort instanceof Worker) {
143-
workerPort.terminate();
144-
} else {
145-
workerPort.close();
122+
const openDatabaseWorker = async (
123+
resolvedOptions: ResolvedWASQLiteOpenFactoryOptions
124+
): Promise<DatabaseClient> => {
125+
const workerPort =
126+
typeof optionsDbWorker == 'function'
127+
? resolveWorkerDatabasePortFactory(() =>
128+
optionsDbWorker({
129+
...this.options,
130+
temporaryStorage,
131+
cacheSizeKb,
132+
flags: this.resolvedFlags,
133+
encryptionKey
134+
})
135+
)
136+
: openWorkerDatabasePort(this.options.dbFilename, enableMultiTabs, optionsDbWorker, this.waOptions.vfs);
137+
138+
const source = Comlink.wrap<OpenWorkerConnection>(workerPort);
139+
const closeSignal = new AbortController();
140+
const connection = await source.connect({
141+
...resolvedOptions,
142+
logLevel: this.logger.getLevel(),
143+
lockName: await generateTabCloseSignal(closeSignal.signal)
144+
});
145+
const clientOptions = {
146+
connection,
147+
source,
148+
// This tab owns the worker, so we're guaranteed to outlive it.
149+
remoteCanCloseUnexpectedly: false,
150+
onClose: () => {
151+
closeSignal.abort();
152+
if (workerPort instanceof Worker) {
153+
workerPort.terminate();
154+
} else {
155+
workerPort.close();
156+
}
146157
}
147-
}
158+
};
159+
160+
return new DatabaseClient(clientOptions, {
161+
...resolvedOptions,
162+
requiresPersistentTriggers
163+
});
148164
};
165+
166+
client = await openDatabaseWorker(resolveOptions(false));
167+
168+
if (vfs == WASQLiteVFS.OPFSWriteAheadVFS) {
169+
// This VFS supports concurrent reads, so we can open additional workers to host read-only connections for
170+
// concurrent reads / writes. To avoid excessive resource usage, we currently add one additional reader per
171+
// tab. In the future, we might revisit this to use a growable pool of readers.
172+
additionalReader = await openDatabaseWorker(resolveOptions(true));
173+
}
149174
} else {
150175
// Don't use a web worker. Instead, open the MultiDatabaseServer a worker would use locally.
151176
const localServer = new MultiDatabaseServer(this.logger);
152177
requiresPersistentTriggers = true;
153178

179+
const resolvedOptions = resolveOptions(false);
154180
const connection = await localServer.openConnectionLocally(resolvedOptions);
155-
clientOptions = { connection, source: null, remoteCanCloseUnexpectedly: false };
181+
client = new DatabaseClient(
182+
{ connection, source: null, remoteCanCloseUnexpectedly: false },
183+
{
184+
...resolvedOptions,
185+
requiresPersistentTriggers
186+
}
187+
);
156188
}
157189

158-
return new DatabaseClient(clientOptions, {
159-
...resolvedOptions,
160-
requiresPersistentTriggers
161-
});
190+
return {
191+
writer: client,
192+
additionalReader
193+
};
162194
}
163195
}
164196

0 commit comments

Comments
 (0)