Skip to content

Commit 49dc34d

Browse files
committed
Support larger read pools too
1 parent b56020d commit 49dc34d

3 files changed

Lines changed: 154 additions & 51 deletions

File tree

packages/web/src/db/adapters/AsyncWebAdapter.ts

Lines changed: 126 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ import {
33
DBAdapterDefaultMixin,
44
DBAdapterListener,
55
DBLockOptions,
6-
LockContext
6+
LockContext,
7+
Mutex,
8+
Semaphore,
9+
UnlockFn
710
} from '@powersync/common';
811
import { SharedConnectionWorker, WebDBAdapter, WebDBAdapterConfiguration } from './WebDBAdapter.js';
912
import { DatabaseClient } from './wa-sqlite/DatabaseClient.js';
@@ -14,77 +17,52 @@ type PendingListener = { listener: Partial<DBAdapterListener>; closeAfterRegiste
1417
* A connection pool implementation delegating to another pool opened asynchronnously.
1518
*/
1619
class AsyncConnectionPool implements ConnectionPool {
17-
protected readonly inner: Promise<PoolConnection>;
18-
20+
protected readonly state: Promise<PoolState>;
1921
protected resolvedWriter?: DatabaseClient;
20-
private activeOnWriter = 0;
21-
private activeOnReader = 0;
2222

2323
private readonly pendingListeners = new Set<PendingListener>();
2424

2525
constructor(
2626
inner: Promise<PoolConnection>,
2727
readonly name: string
2828
) {
29-
this.inner = inner.then((client) => {
29+
this.state = inner.then((client) => {
3030
for (const pending of this.pendingListeners) {
3131
pending.closeAfterRegisteredOnResolvedPool = client.writer.registerListener(pending.listener);
3232
}
3333
this.pendingListeners.clear();
3434

3535
this.resolvedWriter = client.writer;
36-
return client;
36+
if (client.additionalReaders.length) {
37+
return readWritePoolState(client.writer, client.additionalReaders);
38+
}
39+
40+
return singleConnectionPoolState(client.writer);
3741
});
3842
}
3943

4044
async init() {
41-
await this.inner;
45+
await this.state;
4246
}
4347

4448
async close() {
45-
const inner = await this.inner;
46-
47-
await inner.writer.close();
48-
await inner.additionalReader?.close();
49+
const state = await this.state;
50+
await state.close();
4951
}
5052

5153
async readLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
52-
const inner = await this.inner;
53-
54-
// This is a crude load balancing scheme between the writer and an additional read connection (if available).
55-
// Ideally, we should support abortable requests (which would allow us to request a lock from both and just use
56-
// whatever completes first). For now, this at least gives us some concurrency. We can improve this in the future.
57-
if (inner.additionalReader && this.activeOnReader <= this.activeOnWriter) {
58-
try {
59-
this.activeOnReader++;
60-
return await inner.additionalReader.readLock(fn, options);
61-
} finally {
62-
this.activeOnReader--;
63-
}
64-
}
65-
66-
try {
67-
this.activeOnWriter++;
68-
return await inner.writer.readLock(fn, options);
69-
} finally {
70-
this.activeOnWriter--;
71-
}
54+
const state = await this.state;
55+
return state.withConnection(true, fn, options);
7256
}
7357

7458
async writeLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
75-
const inner = await this.inner;
76-
try {
77-
this.activeOnWriter++;
78-
return await inner.writer.writeLock(fn, options);
79-
} finally {
80-
this.activeOnWriter--;
81-
}
59+
const state = await this.state;
60+
return state.withConnection(false, fn, options);
8261
}
8362

8463
async refreshSchema(): Promise<void> {
85-
const inner = await this.inner;
86-
await inner.writer.refreshSchema();
87-
await inner.additionalReader?.refreshSchema();
64+
const state = await this.state;
65+
await state.refreshSchema();
8866
}
8967

9068
registerListener(listener: Partial<DBAdapterListener>): () => void {
@@ -107,13 +85,116 @@ class AsyncConnectionPool implements ConnectionPool {
10785

10886
export interface PoolConnection {
10987
writer: DatabaseClient;
110-
additionalReader?: DatabaseClient;
88+
additionalReaders: DatabaseClient[];
89+
}
90+
91+
interface PoolState {
92+
writer: DatabaseClient;
93+
withConnection<T>(allowReadOnly: boolean, fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T>;
94+
close(): Promise<void>;
95+
refreshSchema(): Promise<void>;
96+
}
97+
98+
function singleConnectionPoolState(connection: DatabaseClient): PoolState {
99+
return {
100+
writer: connection,
101+
withConnection: (allowReadOnly, fn, options) => {
102+
if (allowReadOnly) {
103+
return connection.readLock(fn, options);
104+
} else {
105+
return connection.writeLock(fn, options);
106+
}
107+
},
108+
close: () => connection.close(),
109+
refreshSchema: () => connection.refreshSchema()
110+
};
111+
}
112+
113+
function readWritePoolState(writer: DatabaseClient, readers: DatabaseClient[]): PoolState {
114+
// DatabaseClients have locks internally, so these aren't necessary for correctness. However, our mutex and semaphore
115+
// implementations are very cheap to cancel, which we use to dispatch reads to the first available connection (by
116+
// simply requesting all of them and sticking with the first connection we get).
117+
const writerMutex = new Mutex();
118+
const readerSemaphore = new Semaphore(readers);
119+
120+
return {
121+
writer,
122+
async withConnection(allowReadOnly, fn, options) {
123+
const abortController = new AbortController();
124+
const abortSignal = abortController.signal;
125+
126+
let timeout: any = null;
127+
let release: UnlockFn | undefined;
128+
if (options?.timeoutMs) {
129+
timeout = setTimeout(() => abortController.abort, options.timeoutMs);
130+
}
131+
132+
try {
133+
if (allowReadOnly) {
134+
let connection: DatabaseClient;
135+
136+
// Even if we have a pool of read connections, it's typically very small and we assume that most queries are
137+
// reads. So, we want to request any connection from the read pool and the dedicated write connection (which
138+
// can also serve reads). We race for the first connection we can obtain this way, and then abort the other
139+
// request.
140+
[connection, release] = await new Promise<[DatabaseClient, UnlockFn]>((resolve, reject) => {
141+
let didComplete = false;
142+
function complete() {
143+
didComplete = true;
144+
abortController.abort();
145+
}
146+
147+
function completeSuccess(connection: DatabaseClient, returnFn: UnlockFn) {
148+
if (didComplete) {
149+
// We're not going to use this connection, so return it immediately.
150+
returnFn();
151+
} else {
152+
complete();
153+
resolve([connection, returnFn]);
154+
}
155+
}
156+
157+
function completeError(error: unknown) {
158+
// We either have a working connection already, or we've rejected the promise. Either way, we don't need
159+
// to do either thing again.
160+
if (didComplete) return;
161+
162+
complete();
163+
reject(error);
164+
}
165+
166+
writerMutex.acquire(abortSignal).then((unlock) => completeSuccess(writer, unlock), completeError);
167+
readerSemaphore
168+
.requestOne(abortSignal)
169+
.then(({ item, release }) => completeSuccess(item, release), completeError);
170+
});
171+
172+
return await connection.readLock(fn);
173+
} else {
174+
return await writerMutex.runExclusive(() => writer.writeLock(fn), abortSignal);
175+
}
176+
} finally {
177+
if (timeout != null) {
178+
clearTimeout(timeout);
179+
}
180+
release?.();
181+
}
182+
},
183+
async close() {
184+
await writer.close();
185+
await Promise.all(readers.map((r) => r.close()));
186+
},
187+
async refreshSchema() {
188+
await writer.refreshSchema();
189+
await Promise.all(readers.map((r) => r.refreshSchema()));
190+
}
191+
};
111192
}
112193

113194
export class AsyncDbAdapter extends DBAdapterDefaultMixin(AsyncConnectionPool) implements WebDBAdapter {
114195
async shareConnection(): Promise<SharedConnectionWorker> {
115-
const inner = await this.inner;
116-
return inner.writer.shareConnection();
196+
const state = await this.state;
197+
return state.writer.shareConnection();
117198
}
118199

119200
getConfiguration(): WebDBAdapterConfiguration {

packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,16 @@ import { AsyncDbAdapter, PoolConnection } from '../AsyncWebAdapter.js';
1919

2020
export interface WASQLiteOpenFactoryOptions extends WebSQLOpenFactoryOptions {
2121
vfs?: WASQLiteVFS;
22+
/**
23+
* If the {@link vfs} supports it, an additional amount of read-only connections to open. Using additional read
24+
* connections can speed up queries by dispatching them to multiple workers running them concurrently.
25+
*
26+
* {@link WASQLiteVFS.OPFSWriteAheadVFS} is the only VFS with support for multiple connections, so this option is
27+
* ignored for other VFS implementations.
28+
*
29+
* Defaults to 1.
30+
*/
31+
additionalReaders?: number;
2232
}
2333

2434
export interface ResolvedWASQLiteOpenFactoryOptions extends ResolvedWebSQLOpenOptions {
@@ -113,7 +123,7 @@ export class WASQLiteOpenFactory implements SQLOpenFactory {
113123
});
114124

115125
let client: DatabaseClient;
116-
let additionalReader: DatabaseClient | undefined;
126+
let additionalReaders: DatabaseClient[] = [];
117127
let requiresPersistentTriggers = vfsRequiresDedicatedWorkers(vfs);
118128

119129
if (useWebWorker) {
@@ -167,9 +177,12 @@ export class WASQLiteOpenFactory implements SQLOpenFactory {
167177

168178
if (vfs == WASQLiteVFS.OPFSWriteAheadVFS) {
169179
// This VFS supports concurrent reads, so we can open additional workers to host read-only connections for
170-
// concurrent reads / writes. To avoid excessive resource usage, we currently add one additional reader per
171-
// tab. In the future, we might revisit this to use a growable pool of readers.
172-
additionalReader = await openDatabaseWorker(resolveOptions(true));
180+
// concurrent reads / writes.
181+
const additionalReadersCount = this.options.additionalReaders ?? 1;
182+
for (let i = 0; i < additionalReadersCount; i++) {
183+
const reader = await openDatabaseWorker(resolveOptions(true));
184+
additionalReaders.push(reader);
185+
}
173186
}
174187
} else {
175188
// Don't use a web worker. Instead, open the MultiDatabaseServer a worker would use locally.
@@ -189,7 +202,7 @@ export class WASQLiteOpenFactory implements SQLOpenFactory {
189202

190203
return {
191204
writer: client,
192-
additionalReader
205+
additionalReaders
193206
};
194207
}
195208
}

packages/web/tests/src/db/write_ahead_log_opfs.test.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ test('supports concurrent reads', async () => {
77
const db = generateTestDb({
88
database: new WASQLiteOpenFactory({
99
dbFilename: 'basic-opfs.sqlite',
10-
vfs: WASQLiteVFS.OPFSWriteAheadVFS
10+
vfs: WASQLiteVFS.OPFSWriteAheadVFS,
11+
additionalReaders: 1
1112
}),
1213
schema: TEST_SCHEMA
1314
});
@@ -20,4 +21,12 @@ test('supports concurrent reads', async () => {
2021
});
2122

2223
expect(await db.getAll('SELECT * FROM customers')).toHaveLength(1);
24+
25+
// Despite only using one additional read connection, we should be able to support two concurrent readers by using
26+
// the write connection too.
27+
await db.readLock(async (ctx1) => {
28+
await db.readLock(async (ctx2) => {
29+
await Promise.all([ctx1.execute('SELECT 1'), ctx2.execute('SELECT 2')]);
30+
});
31+
});
2332
});

0 commit comments

Comments
 (0)