Skip to content

Commit 756a0cf

Browse files
authored
Support OPFSWriteAheadVFS, concurrent reads (#930)
1 parent 3504cfc commit 756a0cf

10 files changed

Lines changed: 313 additions & 122 deletions

File tree

.changeset/thick-items-train.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/web': minor
3+
---
4+
5+
Add `WASQLiteVFS.OPFSWriteAheadVFS`, which also supports concurrent reads.

packages/web/src/db/adapters/AsyncWebAdapter.ts

Lines changed: 138 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ import {
33
DBAdapterDefaultMixin,
44
DBAdapterListener,
55
DBLockOptions,
6-
LockContext
6+
LockContext,
7+
Mutex,
8+
Semaphore,
9+
UnlockFn
710
} from '@powersync/common';
811
import { SharedConnectionWorker, WebDBAdapter, WebDBAdapterConfiguration } from './WebDBAdapter.js';
912
import { DatabaseClient } from './wa-sqlite/DatabaseClient.js';
@@ -14,52 +17,57 @@ type PendingListener = { listener: Partial<DBAdapterListener>; closeAfterRegiste
1417
* A connection pool implementation delegating to another pool opened asynchronnously.
1518
*/
1619
class AsyncConnectionPool implements ConnectionPool {
17-
protected readonly inner: Promise<DatabaseClient>;
20+
protected readonly state: Promise<PoolState>;
21+
protected resolvedWriter?: DatabaseClient;
1822

19-
protected resolvedClient?: DatabaseClient;
2023
private readonly pendingListeners = new Set<PendingListener>();
2124

2225
constructor(
23-
inner: Promise<DatabaseClient>,
26+
inner: Promise<PoolConnection>,
2427
readonly name: string
2528
) {
26-
this.inner = inner.then((client) => {
29+
this.state = inner.then((client) => {
2730
for (const pending of this.pendingListeners) {
28-
pending.closeAfterRegisteredOnResolvedPool = client.registerListener(pending.listener);
31+
pending.closeAfterRegisteredOnResolvedPool = client.writer.registerListener(pending.listener);
2932
}
3033
this.pendingListeners.clear();
3134

32-
this.resolvedClient = client;
33-
return client;
35+
this.resolvedWriter = client.writer;
36+
if (client.additionalReaders.length) {
37+
return readWritePoolState(client.writer, client.additionalReaders);
38+
}
39+
40+
return singleConnectionPoolState(client.writer);
3441
});
3542
}
3643

3744
async init() {
38-
await this.inner;
45+
await this.state;
3946
}
4047

4148
async close() {
42-
const inner = await this.inner;
43-
return await inner.close();
49+
const state = await this.state;
50+
await state.close();
4451
}
4552

4653
async readLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
47-
const inner = await this.inner;
48-
return await inner.readLock(fn, options);
54+
const state = await this.state;
55+
return state.withConnection(true, fn, options);
4956
}
5057

5158
async writeLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
52-
const inner = await this.inner;
53-
return await inner.writeLock(fn, options);
59+
const state = await this.state;
60+
return state.withConnection(false, fn, options);
5461
}
5562

5663
async refreshSchema(): Promise<void> {
57-
await (await this.inner).refreshSchema();
64+
const state = await this.state;
65+
await state.refreshSchema();
5866
}
5967

6068
registerListener(listener: Partial<DBAdapterListener>): () => void {
61-
if (this.resolvedClient) {
62-
return this.resolvedClient.registerListener(listener);
69+
if (this.resolvedWriter) {
70+
return this.resolvedWriter.registerListener(listener);
6371
} else {
6472
const pending: PendingListener = { listener };
6573
this.pendingListeners.add(pending);
@@ -75,15 +83,123 @@ class AsyncConnectionPool implements ConnectionPool {
7583
}
7684
}
7785

86+
export interface PoolConnection {
87+
writer: DatabaseClient;
88+
additionalReaders: DatabaseClient[];
89+
}
90+
91+
interface PoolState {
92+
writer: DatabaseClient;
93+
withConnection<T>(allowReadOnly: boolean, fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T>;
94+
close(): Promise<void>;
95+
refreshSchema(): Promise<void>;
96+
}
97+
98+
function singleConnectionPoolState(connection: DatabaseClient): PoolState {
99+
return {
100+
writer: connection,
101+
withConnection: (allowReadOnly, fn, options) => {
102+
if (allowReadOnly) {
103+
return connection.readLock(fn, options);
104+
} else {
105+
return connection.writeLock(fn, options);
106+
}
107+
},
108+
close: () => connection.close(),
109+
refreshSchema: () => connection.refreshSchema()
110+
};
111+
}
112+
113+
function readWritePoolState(writer: DatabaseClient, readers: DatabaseClient[]): PoolState {
114+
// DatabaseClients have locks internally, so these aren't necessary for correctness. However, our mutex and semaphore
115+
// implementations are very cheap to cancel, which we use to dispatch reads to the first available connection (by
116+
// simply requesting all of them and sticking with the first connection we get).
117+
const writerMutex = new Mutex();
118+
const readerSemaphore = new Semaphore(readers);
119+
120+
return {
121+
writer,
122+
async withConnection(allowReadOnly, fn, options) {
123+
const abortController = new AbortController();
124+
const abortSignal = abortController.signal;
125+
126+
let timeout: any = null;
127+
let release: UnlockFn | undefined;
128+
if (options?.timeoutMs) {
129+
timeout = setTimeout(() => abortController.abort, options.timeoutMs);
130+
}
131+
132+
try {
133+
if (allowReadOnly) {
134+
let connection: DatabaseClient;
135+
136+
// Even if we have a pool of read connections, it's typically very small and we assume that most queries are
137+
// reads. So, we want to request any connection from the read pool and the dedicated write connection (which
138+
// can also serve reads). We race for the first connection we can obtain this way, and then abort the other
139+
// request.
140+
[connection, release] = await new Promise<[DatabaseClient, UnlockFn]>((resolve, reject) => {
141+
let didComplete = false;
142+
function complete() {
143+
didComplete = true;
144+
abortController.abort();
145+
}
146+
147+
function completeSuccess(connection: DatabaseClient, returnFn: UnlockFn) {
148+
if (didComplete) {
149+
// We're not going to use this connection, so return it immediately.
150+
returnFn();
151+
} else {
152+
complete();
153+
resolve([connection, returnFn]);
154+
}
155+
}
156+
157+
function completeError(error: unknown) {
158+
// We either have a working connection already, or we've rejected the promise. Either way, we don't need
159+
// to do either thing again.
160+
if (didComplete) return;
161+
162+
complete();
163+
reject(error);
164+
}
165+
166+
writerMutex.acquire(abortSignal).then((unlock) => completeSuccess(writer, unlock), completeError);
167+
readerSemaphore
168+
.requestOne(abortSignal)
169+
.then(({ item, release }) => completeSuccess(item, release), completeError);
170+
});
171+
172+
return await connection.readLock(fn);
173+
} else {
174+
return await writerMutex.runExclusive(() => writer.writeLock(fn), abortSignal);
175+
}
176+
} finally {
177+
if (timeout != null) {
178+
clearTimeout(timeout);
179+
}
180+
release?.();
181+
}
182+
},
183+
async close() {
184+
await writer.close();
185+
await Promise.all(readers.map((r) => r.close()));
186+
},
187+
async refreshSchema() {
188+
await writer.refreshSchema();
189+
await Promise.all(readers.map((r) => r.refreshSchema()));
190+
}
191+
};
192+
}
193+
78194
export class AsyncDbAdapter extends DBAdapterDefaultMixin(AsyncConnectionPool) implements WebDBAdapter {
79195
async shareConnection(): Promise<SharedConnectionWorker> {
80-
const inner = await this.inner;
81-
return inner.shareConnection();
196+
const state = await this.state;
197+
return state.writer.shareConnection();
82198
}
83199

84200
getConfiguration(): WebDBAdapterConfiguration {
85-
if (this.resolvedClient) {
86-
return this.resolvedClient.getConfiguration();
201+
if (this.resolvedWriter) {
202+
return this.resolvedWriter.getConfiguration();
87203
}
88204

89205
throw new Error('AsyncDbAdapter.getConfiguration() can only be called after initializing it.');

packages/web/src/db/adapters/wa-sqlite/DatabaseServer.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,10 @@ export class DatabaseServer {
112112
},
113113
requestAccess: async (write, timeoutMs) => {
114114
requireOpen();
115-
// TODO: Support timeouts, they don't seem to be supported by the async-mutex package.
116-
const lease = await this.#inner.acquireConnection();
115+
116+
const lease = await this.#inner.acquireConnection(
117+
timeoutMs != null ? AbortSignal.timeout(timeoutMs) : undefined
118+
);
117119
if (!isOpen) {
118120
// Race between requestAccess and close(), the connection was closed while we tried to acquire a lease.
119121
await lease.returnLease();

packages/web/src/db/adapters/wa-sqlite/RawSqliteConnection.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@ export class RawSqliteConnection {
3939

4040
async init() {
4141
const api = (this._sqliteAPI = await this.openSQLiteAPI());
42-
this.db = await api.open_v2(this.options.dbFilename);
42+
this.db = await api.open_v2(
43+
this.options.dbFilename,
44+
this.options.isReadOnly ? 1 /* SQLITE_OPEN_READONLY */ : 6 /* SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE */
45+
);
4346
await this.executeRaw(`PRAGMA temp_store = ${this.options.temporaryStorage};`);
4447
if (this.options.encryptionKey) {
4548
const escapedKey = this.options.encryptionKey.replace("'", "''");

0 commit comments

Comments
 (0)