From 2f518e4c2eec74d823c4be8e2a6aed32dd49b654 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 14 Apr 2026 15:53:18 +0200 Subject: [PATCH 1/5] Support `OPFSWriteAheadVFS`, concurrent reads --- .../web/src/db/adapters/AsyncWebAdapter.ts | 53 ++++++-- .../db/adapters/wa-sqlite/DatabaseServer.ts | 6 +- .../adapters/wa-sqlite/RawSqliteConnection.ts | 5 +- .../adapters/wa-sqlite/WASQLiteOpenFactory.ts | 120 +++++++++++------- packages/web/src/db/adapters/wa-sqlite/vfs.ts | 82 +++++------- .../web/src/worker/db/MultiDatabaseServer.ts | 5 +- .../web/src/worker/db/open-worker-database.ts | 4 +- packages/web/tests/open.test.ts | 3 +- .../tests/src/db/write_ahead_log_opfs.test.ts | 23 ++++ 9 files changed, 192 insertions(+), 109 deletions(-) create mode 100644 packages/web/tests/src/db/write_ahead_log_opfs.test.ts diff --git a/packages/web/src/db/adapters/AsyncWebAdapter.ts b/packages/web/src/db/adapters/AsyncWebAdapter.ts index 84b77f8da..3dc0756cf 100644 --- a/packages/web/src/db/adapters/AsyncWebAdapter.ts +++ b/packages/web/src/db/adapters/AsyncWebAdapter.ts @@ -14,22 +14,25 @@ type PendingListener = { listener: Partial; closeAfterRegiste * A connection pool implementation delegating to another pool opened asynchronnously. */ class AsyncConnectionPool implements ConnectionPool { - protected readonly inner: Promise; + protected readonly inner: Promise; protected resolvedClient?: DatabaseClient; + private activeOnWriter = 0; + private activeOnReader = 0; + private readonly pendingListeners = new Set(); constructor( - inner: Promise, + inner: Promise, readonly name: string ) { this.inner = inner.then((client) => { for (const pending of this.pendingListeners) { - pending.closeAfterRegisteredOnResolvedPool = client.registerListener(pending.listener); + pending.closeAfterRegisteredOnResolvedPool = client.writer.registerListener(pending.listener); } this.pendingListeners.clear(); - this.resolvedClient = client; + this.resolvedClient = client.writer; return client; }); } @@ -40,21 +43,48 @@ class AsyncConnectionPool implements ConnectionPool { async close() { const inner = await this.inner; - return await inner.close(); + + await inner.writer.close(); + await inner.additionalReader?.close(); } async readLock(fn: (tx: LockContext) => Promise, options?: DBLockOptions): Promise { const inner = await this.inner; - return await inner.readLock(fn, options); + + // This is a crude load balancing scheme between the writer and an additional read connection (if available). + // Ideally, we should support abortable requests (which would allow us to request a lock from both and just use + // whatever completes first). For now, this at least gives us some concurrency. We can improve this in the future. + if (inner.additionalReader && this.activeOnReader <= this.activeOnWriter) { + try { + this.activeOnReader++; + return await inner.additionalReader.readLock(fn, options); + } finally { + this.activeOnReader--; + } + } + + try { + this.activeOnWriter++; + return await inner.writer.readLock(fn, options); + } finally { + this.activeOnWriter--; + } } async writeLock(fn: (tx: LockContext) => Promise, options?: DBLockOptions): Promise { const inner = await this.inner; - return await inner.writeLock(fn, options); + try { + this.activeOnWriter++; + return await inner.writer.writeLock(fn, options); + } finally { + this.activeOnWriter--; + } } async refreshSchema(): Promise { - await (await this.inner).refreshSchema(); + const inner = await this.inner; + await inner.writer.refreshSchema(); + await inner.additionalReader?.refreshSchema(); } registerListener(listener: Partial): () => void { @@ -75,10 +105,15 @@ class AsyncConnectionPool implements ConnectionPool { } } +export interface PoolConnection { + writer: DatabaseClient; + additionalReader?: DatabaseClient; +} + export class AsyncDbAdapter extends DBAdapterDefaultMixin(AsyncConnectionPool) implements WebDBAdapter { async shareConnection(): Promise { const inner = await this.inner; - return inner.shareConnection(); + return inner.writer.shareConnection(); } getConfiguration(): WebDBAdapterConfiguration { diff --git a/packages/web/src/db/adapters/wa-sqlite/DatabaseServer.ts b/packages/web/src/db/adapters/wa-sqlite/DatabaseServer.ts index 42d9e92e8..f4cf863bc 100644 --- a/packages/web/src/db/adapters/wa-sqlite/DatabaseServer.ts +++ b/packages/web/src/db/adapters/wa-sqlite/DatabaseServer.ts @@ -112,8 +112,10 @@ export class DatabaseServer { }, requestAccess: async (write, timeoutMs) => { requireOpen(); - // TODO: Support timeouts, they don't seem to be supported by the async-mutex package. - const lease = await this.#inner.acquireConnection(); + + const lease = await this.#inner.acquireConnection( + timeoutMs != null ? AbortSignal.timeout(timeoutMs) : undefined + ); if (!isOpen) { // Race between requestAccess and close(), the connection was closed while we tried to acquire a lease. await lease.returnLease(); diff --git a/packages/web/src/db/adapters/wa-sqlite/RawSqliteConnection.ts b/packages/web/src/db/adapters/wa-sqlite/RawSqliteConnection.ts index a2724f6ef..3cf958eb1 100644 --- a/packages/web/src/db/adapters/wa-sqlite/RawSqliteConnection.ts +++ b/packages/web/src/db/adapters/wa-sqlite/RawSqliteConnection.ts @@ -39,7 +39,10 @@ export class RawSqliteConnection { async init() { const api = (this._sqliteAPI = await this.openSQLiteAPI()); - this.db = await api.open_v2(this.options.dbFilename); + this.db = await api.open_v2( + this.options.dbFilename, + this.options.isReadOnly ? 1 /* SQLITE_OPEN_READONLY */ : 6 /* SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE */ + ); await this.executeRaw(`PRAGMA temp_store = ${this.options.temporaryStorage};`); if (this.options.encryptionKey) { const escapedKey = this.options.encryptionKey.replace("'", "''"); diff --git a/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts b/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts index beed81fca..33176ba32 100644 --- a/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts +++ b/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts @@ -13,9 +13,9 @@ import { import { SSRDBAdapter } from '../SSRDBAdapter.js'; import { vfsRequiresDedicatedWorkers, WASQLiteVFS } from './vfs.js'; import { MultiDatabaseServer } from '../../../worker/db/MultiDatabaseServer.js'; -import { ClientOptions, DatabaseClient, OpenWorkerConnection } from './DatabaseClient.js'; +import { DatabaseClient, OpenWorkerConnection } from './DatabaseClient.js'; import { generateTabCloseSignal } from '../../../shared/tab_close_signal.js'; -import { AsyncDbAdapter } from '../AsyncWebAdapter.js'; +import { AsyncDbAdapter, PoolConnection } from '../AsyncWebAdapter.js'; export interface WASQLiteOpenFactoryOptions extends WebSQLOpenFactoryOptions { vfs?: WASQLiteVFS; @@ -23,6 +23,11 @@ export interface WASQLiteOpenFactoryOptions extends WebSQLOpenFactoryOptions { export interface ResolvedWASQLiteOpenFactoryOptions extends ResolvedWebSQLOpenOptions { vfs: WASQLiteVFS; + + /** + * Whether this is a read-only connection opened for the `OPFSWriteAheadVFS` file system. + */ + isReadOnly: boolean; } export interface WorkerDBOpenerOptions extends ResolvedWASQLiteOpenFactoryOptions { @@ -82,7 +87,7 @@ export class WASQLiteOpenFactory implements SQLOpenFactory { return this.openAdapter(); } - async openConnection(): Promise { + async openConnection(): Promise { const { enableMultiTabs, useWebWorker } = this.resolvedFlags; const { vfs = WASQLiteVFS.IDBBatchAtomicVFS, @@ -95,7 +100,7 @@ export class WASQLiteOpenFactory implements SQLOpenFactory { this.logger.warn('Multiple tabs are not enabled in this browser'); } - const resolvedOptions: ResolvedWASQLiteOpenFactoryOptions = { + const resolveOptions = (isReadOnly: boolean): ResolvedWASQLiteOpenFactoryOptions => ({ dbFilename: this.options.dbFilename, dbLocation: this.options.dbLocation, debugMode: this.options.debugMode, @@ -103,62 +108,89 @@ export class WASQLiteOpenFactory implements SQLOpenFactory { temporaryStorage, cacheSizeKb, flags: this.resolvedFlags, - encryptionKey: encryptionKey - }; + encryptionKey: encryptionKey, + isReadOnly + }); - let clientOptions: ClientOptions; + let client: DatabaseClient; + let additionalReader: DatabaseClient | undefined; let requiresPersistentTriggers = vfsRequiresDedicatedWorkers(vfs); if (useWebWorker) { const optionsDbWorker = this.options.worker; - const workerPort = - typeof optionsDbWorker == 'function' - ? resolveWorkerDatabasePortFactory(() => - optionsDbWorker({ - ...this.options, - temporaryStorage, - cacheSizeKb, - flags: this.resolvedFlags, - encryptionKey - }) - ) - : openWorkerDatabasePort(this.options.dbFilename, enableMultiTabs, optionsDbWorker, this.waOptions.vfs); - - const source = Comlink.wrap(workerPort); - const closeSignal = new AbortController(); - const connection = await source.connect({ - ...resolvedOptions, - logLevel: this.logger.getLevel(), - lockName: await generateTabCloseSignal(closeSignal.signal) - }); - clientOptions = { - connection, - source, - // This tab owns the worker, so we're guaranteed to outlive it. - remoteCanCloseUnexpectedly: false, - onClose: () => { - closeSignal.abort(); - if (workerPort instanceof Worker) { - workerPort.terminate(); - } else { - workerPort.close(); + const openDatabaseWorker = async ( + resolvedOptions: ResolvedWASQLiteOpenFactoryOptions + ): Promise => { + const workerPort = + typeof optionsDbWorker == 'function' + ? resolveWorkerDatabasePortFactory(() => + optionsDbWorker({ + ...this.options, + temporaryStorage, + cacheSizeKb, + flags: this.resolvedFlags, + encryptionKey + }) + ) + : openWorkerDatabasePort(this.options.dbFilename, enableMultiTabs, optionsDbWorker, this.waOptions.vfs); + + const source = Comlink.wrap(workerPort); + const closeSignal = new AbortController(); + const connection = await source.connect({ + ...resolvedOptions, + logLevel: this.logger.getLevel(), + lockName: await generateTabCloseSignal(closeSignal.signal) + }); + const clientOptions = { + connection, + source, + // This tab owns the worker, so we're guaranteed to outlive it. + remoteCanCloseUnexpectedly: false, + onClose: () => { + closeSignal.abort(); + if (workerPort instanceof Worker) { + workerPort.terminate(); + } else { + workerPort.close(); + } } - } + }; + + return new DatabaseClient(clientOptions, { + ...resolvedOptions, + requiresPersistentTriggers + }); }; + + client = await openDatabaseWorker(resolveOptions(false)); + + if (vfs == WASQLiteVFS.OPFSWriteAheadVFS) { + // This VFS supports concurrent reads, so we can open additional workers to host read-only connections for + // concurrent reads / writes. To avoid excessive resource usage, we currently add one additional reader per + // tab. In the future, we might revisit this to use a growable pool of readers. + additionalReader = await openDatabaseWorker(resolveOptions(true)); + } } else { // Don't use a web worker. Instead, open the MultiDatabaseServer a worker would use locally. const localServer = new MultiDatabaseServer(this.logger); requiresPersistentTriggers = true; + const resolvedOptions = resolveOptions(false); const connection = await localServer.openConnectionLocally(resolvedOptions); - clientOptions = { connection, source: null, remoteCanCloseUnexpectedly: false }; + client = new DatabaseClient( + { connection, source: null, remoteCanCloseUnexpectedly: false }, + { + ...resolvedOptions, + requiresPersistentTriggers + } + ); } - return new DatabaseClient(clientOptions, { - ...resolvedOptions, - requiresPersistentTriggers - }); + return { + writer: client, + additionalReader + }; } } diff --git a/packages/web/src/db/adapters/wa-sqlite/vfs.ts b/packages/web/src/db/adapters/wa-sqlite/vfs.ts index cf6b8ee80..6f3db97b7 100644 --- a/packages/web/src/db/adapters/wa-sqlite/vfs.ts +++ b/packages/web/src/db/adapters/wa-sqlite/vfs.ts @@ -6,7 +6,8 @@ import type * as SQLite from '@journeyapps/wa-sqlite'; export enum WASQLiteVFS { IDBBatchAtomicVFS = 'IDBBatchAtomicVFS', OPFSCoopSyncVFS = 'OPFSCoopSyncVFS', - AccessHandlePoolVFS = 'AccessHandlePoolVFS' + AccessHandlePoolVFS = 'AccessHandlePoolVFS', + OPFSWriteAheadVFS = 'OPFSWriteAheadVFS' } export function vfsRequiresDedicatedWorkers(vfs: WASQLiteVFS) { @@ -30,49 +31,32 @@ export type WASQLiteModuleFactory = ( options: WASQLiteModuleFactoryOptions ) => Promise<{ module: SQLiteModule; vfs: SQLiteVFS }>; -/** - * @internal - */ -export const AsyncWASQLiteModuleFactory = async () => { - const { default: factory } = await import('@journeyapps/wa-sqlite/dist/wa-sqlite-async.mjs'); - return factory(); -}; - -/** - * @internal - */ -export const MultiCipherAsyncWASQLiteModuleFactory = async () => { - const { default: factory } = await import('@journeyapps/wa-sqlite/dist/mc-wa-sqlite-async.mjs'); - return factory(); -}; - -/** - * @internal - */ -export const SyncWASQLiteModuleFactory = async () => { - const { default: factory } = await import('@journeyapps/wa-sqlite/dist/wa-sqlite.mjs'); - return factory(); -}; +async function asyncModuleFactory(encryptionKey: unknown) { + if (encryptionKey) { + const { default: factory } = await import('@journeyapps/wa-sqlite/dist/mc-wa-sqlite-async.mjs'); + return factory(); + } else { + const { default: factory } = await import('@journeyapps/wa-sqlite/dist/wa-sqlite-async.mjs'); + return factory(); + } +} -/** - * @internal - */ -export const MultiCipherSyncWASQLiteModuleFactory = async () => { - const { default: factory } = await import('@journeyapps/wa-sqlite/dist/mc-wa-sqlite.mjs'); - return factory(); -}; +async function syncModuleFactory(encryptionKey: unknown) { + if (encryptionKey) { + const { default: factory } = await import('@journeyapps/wa-sqlite/dist/mc-wa-sqlite.mjs'); + return factory(); + } else { + const { default: factory } = await import('@journeyapps/wa-sqlite/dist/wa-sqlite.mjs'); + return factory(); + } +} /** * @internal */ export const DEFAULT_MODULE_FACTORIES = { [WASQLiteVFS.IDBBatchAtomicVFS]: async (options: WASQLiteModuleFactoryOptions) => { - let module; - if (options.encryptionKey) { - module = await MultiCipherAsyncWASQLiteModuleFactory(); - } else { - module = await AsyncWASQLiteModuleFactory(); - } + const module = await asyncModuleFactory(options.encryptionKey); const { IDBBatchAtomicVFS } = await import('@journeyapps/wa-sqlite/src/examples/IDBBatchAtomicVFS.js'); return { module, @@ -81,12 +65,7 @@ export const DEFAULT_MODULE_FACTORIES = { }; }, [WASQLiteVFS.AccessHandlePoolVFS]: async (options: WASQLiteModuleFactoryOptions) => { - let module; - if (options.encryptionKey) { - module = await MultiCipherSyncWASQLiteModuleFactory(); - } else { - module = await SyncWASQLiteModuleFactory(); - } + const module = await syncModuleFactory(options.encryptionKey); // @ts-expect-error The types for this static method are missing upstream const { AccessHandlePoolVFS } = await import('@journeyapps/wa-sqlite/src/examples/AccessHandlePoolVFS.js'); return { @@ -95,12 +74,7 @@ export const DEFAULT_MODULE_FACTORIES = { }; }, [WASQLiteVFS.OPFSCoopSyncVFS]: async (options: WASQLiteModuleFactoryOptions) => { - let module; - if (options.encryptionKey) { - module = await MultiCipherSyncWASQLiteModuleFactory(); - } else { - module = await SyncWASQLiteModuleFactory(); - } + const module = await syncModuleFactory(options.encryptionKey); // @ts-expect-error The types for this static method are missing upstream const { OPFSCoopSyncVFS } = await import('@journeyapps/wa-sqlite/src/examples/OPFSCoopSyncVFS.js'); const vfs = await OPFSCoopSyncVFS.create(options.dbFileName, module); @@ -108,5 +82,15 @@ export const DEFAULT_MODULE_FACTORIES = { module, vfs }; + }, + [WASQLiteVFS.OPFSWriteAheadVFS]: async (options: WASQLiteModuleFactoryOptions) => { + const module = await syncModuleFactory(options.encryptionKey); + // @ts-expect-error The types for this static method are missing upstream + const { OPFSWriteAheadVFS } = await import('@journeyapps/wa-sqlite/src/examples/OPFSWriteAheadVFS.js'); + const vfs = await OPFSWriteAheadVFS.create(options.dbFileName, module, {}); + return { + module, + vfs + }; } }; diff --git a/packages/web/src/worker/db/MultiDatabaseServer.ts b/packages/web/src/worker/db/MultiDatabaseServer.ts index e883319c6..6345a6625 100644 --- a/packages/web/src/worker/db/MultiDatabaseServer.ts +++ b/packages/web/src/worker/db/MultiDatabaseServer.ts @@ -61,7 +61,10 @@ export class MultiDatabaseServer { let server: DatabaseServer | undefined = this.activeDatabases.get(dbFilename); if (server == null) { - const needsNavigatorLocks = !isSharedWorker; + // We don't need navigator locks for shared workers because all queries run in this shared worker exclusively. + // For read-only connections, we use a VFS that supports concurrent reads (so a single lock on the connection is + // fine). + const needsNavigatorLocks = !(isSharedWorker || options.isReadOnly); const connection = new RawSqliteConnection(options); const withSafeConcurrency = new ConcurrentSqliteConnection(connection, needsNavigatorLocks); diff --git a/packages/web/src/worker/db/open-worker-database.ts b/packages/web/src/worker/db/open-worker-database.ts index 460de1cac..c8b160578 100644 --- a/packages/web/src/worker/db/open-worker-database.ts +++ b/packages/web/src/worker/db/open-worker-database.ts @@ -1,5 +1,5 @@ import * as Comlink from 'comlink'; -import { WASQLiteVFS } from '../../db/adapters/wa-sqlite/vfs.js'; +import { vfsRequiresDedicatedWorkers, WASQLiteVFS } from '../../db/adapters/wa-sqlite/vfs.js'; import { OpenWorkerConnection } from '../../db/adapters/wa-sqlite/DatabaseClient.js'; /** @@ -11,7 +11,7 @@ export function openWorkerDatabasePort( worker: string | URL = '', vfs?: WASQLiteVFS ) { - const needsDedicated = vfs == WASQLiteVFS.AccessHandlePoolVFS || vfs == WASQLiteVFS.OPFSCoopSyncVFS; + const needsDedicated = vfs && vfsRequiresDedicatedWorkers(vfs); if (worker) { return !needsDedicated && multipleTabs diff --git a/packages/web/tests/open.test.ts b/packages/web/tests/open.test.ts index 4eca6ad87..f636a7b4d 100644 --- a/packages/web/tests/open.test.ts +++ b/packages/web/tests/open.test.ts @@ -90,7 +90,8 @@ describe('Open Methods', { sequential: true }, () => { }, temporaryStorage: TemporaryStorageOption.MEMORY, cacheSizeKb: 0, - dbFilename: '' + dbFilename: '', + isReadOnly: false }; const connection = await server.openConnectionLocally(options); const Adapter = DBAdapterDefaultMixin(DatabaseClient); diff --git a/packages/web/tests/src/db/write_ahead_log_opfs.test.ts b/packages/web/tests/src/db/write_ahead_log_opfs.test.ts new file mode 100644 index 000000000..7c007407b --- /dev/null +++ b/packages/web/tests/src/db/write_ahead_log_opfs.test.ts @@ -0,0 +1,23 @@ +import { expect, test } from 'vitest'; +import { generateTestDb } from '../../utils/testDb.js'; +import { WASQLiteOpenFactory, WASQLiteVFS } from '@powersync/web'; +import { TEST_SCHEMA } from '../../utils/test-schema.js'; + +test('supports concurrent reads', async () => { + const db = generateTestDb({ + database: new WASQLiteOpenFactory({ + dbFilename: 'basic-opfs.sqlite', + vfs: WASQLiteVFS.OPFSWriteAheadVFS + }), + schema: TEST_SCHEMA + }); + + await db.writeTransaction(async (tx) => { + expect(await db.getAll('SELECT * FROM customers')).toHaveLength(0); + await tx.execute('INSERT INTO customers (id, name) VALUES (uuid(), ?)', ['name']); + + expect(await db.getAll('SELECT * FROM customers')).toHaveLength(0); // No commit yet... + }); + + expect(await db.getAll('SELECT * FROM customers')).toHaveLength(1); +}); From 2c3f243d3954e9ece2ca0d0434b7602fb9ddba22 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 14 Apr 2026 16:35:59 +0200 Subject: [PATCH 2/5] Add changeset entry --- .changeset/thick-items-train.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/thick-items-train.md diff --git a/.changeset/thick-items-train.md b/.changeset/thick-items-train.md new file mode 100644 index 000000000..0f396f8d1 --- /dev/null +++ b/.changeset/thick-items-train.md @@ -0,0 +1,5 @@ +--- +'@powersync/web': minor +--- + +Add `WASQLiteVFS.OPFSWriteAheadVFS`, which also supports concurrent reads. From 307a08ff9cdaa4052ec731d60fc9f7c7f255c8ea Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 14 Apr 2026 16:53:23 +0200 Subject: [PATCH 3/5] Rename for clarity --- .../src/components/providers/SystemProvider.tsx | 2 +- packages/web/src/db/adapters/AsyncWebAdapter.ts | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx b/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx index f4e800b5f..edf25a413 100644 --- a/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx +++ b/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx @@ -23,7 +23,7 @@ export const db = new PowerSyncDatabase({ schema: AppSchema, database: new WASQLiteOpenFactory({ dbFilename: 'example.db', - vfs: WASQLiteVFS.OPFSCoopSyncVFS, + vfs: WASQLiteVFS.OPFSWriteAheadVFS, flags: { enableMultiTabs: typeof SharedWorker !== 'undefined' } diff --git a/packages/web/src/db/adapters/AsyncWebAdapter.ts b/packages/web/src/db/adapters/AsyncWebAdapter.ts index 3dc0756cf..13ba879e2 100644 --- a/packages/web/src/db/adapters/AsyncWebAdapter.ts +++ b/packages/web/src/db/adapters/AsyncWebAdapter.ts @@ -16,7 +16,7 @@ type PendingListener = { listener: Partial; closeAfterRegiste class AsyncConnectionPool implements ConnectionPool { protected readonly inner: Promise; - protected resolvedClient?: DatabaseClient; + protected resolvedWriter?: DatabaseClient; private activeOnWriter = 0; private activeOnReader = 0; @@ -32,7 +32,7 @@ class AsyncConnectionPool implements ConnectionPool { } this.pendingListeners.clear(); - this.resolvedClient = client.writer; + this.resolvedWriter = client.writer; return client; }); } @@ -88,8 +88,8 @@ class AsyncConnectionPool implements ConnectionPool { } registerListener(listener: Partial): () => void { - if (this.resolvedClient) { - return this.resolvedClient.registerListener(listener); + if (this.resolvedWriter) { + return this.resolvedWriter.registerListener(listener); } else { const pending: PendingListener = { listener }; this.pendingListeners.add(pending); @@ -117,8 +117,8 @@ export class AsyncDbAdapter extends DBAdapterDefaultMixin(AsyncConnectionPool) i } getConfiguration(): WebDBAdapterConfiguration { - if (this.resolvedClient) { - return this.resolvedClient.getConfiguration(); + if (this.resolvedWriter) { + return this.resolvedWriter.getConfiguration(); } throw new Error('AsyncDbAdapter.getConfiguration() can only be called after initializing it.'); From b56020d7e7ce1b3a0440a40062582b8735b87e89 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 14 Apr 2026 16:57:26 +0200 Subject: [PATCH 4/5] Revert changes to demo --- .../src/components/providers/SystemProvider.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx b/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx index edf25a413..f4e800b5f 100644 --- a/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx +++ b/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx @@ -23,7 +23,7 @@ export const db = new PowerSyncDatabase({ schema: AppSchema, database: new WASQLiteOpenFactory({ dbFilename: 'example.db', - vfs: WASQLiteVFS.OPFSWriteAheadVFS, + vfs: WASQLiteVFS.OPFSCoopSyncVFS, flags: { enableMultiTabs: typeof SharedWorker !== 'undefined' } From 49dc34d00acf0c499eadce82d57452e8a29ddc52 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 20 Apr 2026 09:57:18 +0200 Subject: [PATCH 5/5] Support larger read pools too --- .../web/src/db/adapters/AsyncWebAdapter.ts | 171 +++++++++++++----- .../adapters/wa-sqlite/WASQLiteOpenFactory.ts | 23 ++- .../tests/src/db/write_ahead_log_opfs.test.ts | 11 +- 3 files changed, 154 insertions(+), 51 deletions(-) diff --git a/packages/web/src/db/adapters/AsyncWebAdapter.ts b/packages/web/src/db/adapters/AsyncWebAdapter.ts index 13ba879e2..b667a9f36 100644 --- a/packages/web/src/db/adapters/AsyncWebAdapter.ts +++ b/packages/web/src/db/adapters/AsyncWebAdapter.ts @@ -3,7 +3,10 @@ import { DBAdapterDefaultMixin, DBAdapterListener, DBLockOptions, - LockContext + LockContext, + Mutex, + Semaphore, + UnlockFn } from '@powersync/common'; import { SharedConnectionWorker, WebDBAdapter, WebDBAdapterConfiguration } from './WebDBAdapter.js'; import { DatabaseClient } from './wa-sqlite/DatabaseClient.js'; @@ -14,11 +17,8 @@ type PendingListener = { listener: Partial; closeAfterRegiste * A connection pool implementation delegating to another pool opened asynchronnously. */ class AsyncConnectionPool implements ConnectionPool { - protected readonly inner: Promise; - + protected readonly state: Promise; protected resolvedWriter?: DatabaseClient; - private activeOnWriter = 0; - private activeOnReader = 0; private readonly pendingListeners = new Set(); @@ -26,65 +26,43 @@ class AsyncConnectionPool implements ConnectionPool { inner: Promise, readonly name: string ) { - this.inner = inner.then((client) => { + this.state = inner.then((client) => { for (const pending of this.pendingListeners) { pending.closeAfterRegisteredOnResolvedPool = client.writer.registerListener(pending.listener); } this.pendingListeners.clear(); this.resolvedWriter = client.writer; - return client; + if (client.additionalReaders.length) { + return readWritePoolState(client.writer, client.additionalReaders); + } + + return singleConnectionPoolState(client.writer); }); } async init() { - await this.inner; + await this.state; } async close() { - const inner = await this.inner; - - await inner.writer.close(); - await inner.additionalReader?.close(); + const state = await this.state; + await state.close(); } async readLock(fn: (tx: LockContext) => Promise, options?: DBLockOptions): Promise { - const inner = await this.inner; - - // This is a crude load balancing scheme between the writer and an additional read connection (if available). - // Ideally, we should support abortable requests (which would allow us to request a lock from both and just use - // whatever completes first). For now, this at least gives us some concurrency. We can improve this in the future. - if (inner.additionalReader && this.activeOnReader <= this.activeOnWriter) { - try { - this.activeOnReader++; - return await inner.additionalReader.readLock(fn, options); - } finally { - this.activeOnReader--; - } - } - - try { - this.activeOnWriter++; - return await inner.writer.readLock(fn, options); - } finally { - this.activeOnWriter--; - } + const state = await this.state; + return state.withConnection(true, fn, options); } async writeLock(fn: (tx: LockContext) => Promise, options?: DBLockOptions): Promise { - const inner = await this.inner; - try { - this.activeOnWriter++; - return await inner.writer.writeLock(fn, options); - } finally { - this.activeOnWriter--; - } + const state = await this.state; + return state.withConnection(false, fn, options); } async refreshSchema(): Promise { - const inner = await this.inner; - await inner.writer.refreshSchema(); - await inner.additionalReader?.refreshSchema(); + const state = await this.state; + await state.refreshSchema(); } registerListener(listener: Partial): () => void { @@ -107,13 +85,116 @@ class AsyncConnectionPool implements ConnectionPool { export interface PoolConnection { writer: DatabaseClient; - additionalReader?: DatabaseClient; + additionalReaders: DatabaseClient[]; +} + +interface PoolState { + writer: DatabaseClient; + withConnection(allowReadOnly: boolean, fn: (tx: LockContext) => Promise, options?: DBLockOptions): Promise; + close(): Promise; + refreshSchema(): Promise; +} + +function singleConnectionPoolState(connection: DatabaseClient): PoolState { + return { + writer: connection, + withConnection: (allowReadOnly, fn, options) => { + if (allowReadOnly) { + return connection.readLock(fn, options); + } else { + return connection.writeLock(fn, options); + } + }, + close: () => connection.close(), + refreshSchema: () => connection.refreshSchema() + }; +} + +function readWritePoolState(writer: DatabaseClient, readers: DatabaseClient[]): PoolState { + // DatabaseClients have locks internally, so these aren't necessary for correctness. However, our mutex and semaphore + // implementations are very cheap to cancel, which we use to dispatch reads to the first available connection (by + // simply requesting all of them and sticking with the first connection we get). + const writerMutex = new Mutex(); + const readerSemaphore = new Semaphore(readers); + + return { + writer, + async withConnection(allowReadOnly, fn, options) { + const abortController = new AbortController(); + const abortSignal = abortController.signal; + + let timeout: any = null; + let release: UnlockFn | undefined; + if (options?.timeoutMs) { + timeout = setTimeout(() => abortController.abort, options.timeoutMs); + } + + try { + if (allowReadOnly) { + let connection: DatabaseClient; + + // Even if we have a pool of read connections, it's typically very small and we assume that most queries are + // reads. So, we want to request any connection from the read pool and the dedicated write connection (which + // can also serve reads). We race for the first connection we can obtain this way, and then abort the other + // request. + [connection, release] = await new Promise<[DatabaseClient, UnlockFn]>((resolve, reject) => { + let didComplete = false; + function complete() { + didComplete = true; + abortController.abort(); + } + + function completeSuccess(connection: DatabaseClient, returnFn: UnlockFn) { + if (didComplete) { + // We're not going to use this connection, so return it immediately. + returnFn(); + } else { + complete(); + resolve([connection, returnFn]); + } + } + + function completeError(error: unknown) { + // We either have a working connection already, or we've rejected the promise. Either way, we don't need + // to do either thing again. + if (didComplete) return; + + complete(); + reject(error); + } + + writerMutex.acquire(abortSignal).then((unlock) => completeSuccess(writer, unlock), completeError); + readerSemaphore + .requestOne(abortSignal) + .then(({ item, release }) => completeSuccess(item, release), completeError); + }); + + return await connection.readLock(fn); + } else { + return await writerMutex.runExclusive(() => writer.writeLock(fn), abortSignal); + } + } finally { + if (timeout != null) { + clearTimeout(timeout); + } + release?.(); + } + }, + async close() { + await writer.close(); + await Promise.all(readers.map((r) => r.close())); + }, + async refreshSchema() { + await writer.refreshSchema(); + await Promise.all(readers.map((r) => r.refreshSchema())); + } + }; } export class AsyncDbAdapter extends DBAdapterDefaultMixin(AsyncConnectionPool) implements WebDBAdapter { async shareConnection(): Promise { - const inner = await this.inner; - return inner.writer.shareConnection(); + const state = await this.state; + return state.writer.shareConnection(); } getConfiguration(): WebDBAdapterConfiguration { diff --git a/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts b/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts index 33176ba32..864592386 100644 --- a/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts +++ b/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts @@ -19,6 +19,16 @@ import { AsyncDbAdapter, PoolConnection } from '../AsyncWebAdapter.js'; export interface WASQLiteOpenFactoryOptions extends WebSQLOpenFactoryOptions { vfs?: WASQLiteVFS; + /** + * If the {@link vfs} supports it, an additional amount of read-only connections to open. Using additional read + * connections can speed up queries by dispatching them to multiple workers running them concurrently. + * + * {@link WASQLiteVFS.OPFSWriteAheadVFS} is the only VFS with support for multiple connections, so this option is + * ignored for other VFS implementations. + * + * Defaults to 1. + */ + additionalReaders?: number; } export interface ResolvedWASQLiteOpenFactoryOptions extends ResolvedWebSQLOpenOptions { @@ -113,7 +123,7 @@ export class WASQLiteOpenFactory implements SQLOpenFactory { }); let client: DatabaseClient; - let additionalReader: DatabaseClient | undefined; + let additionalReaders: DatabaseClient[] = []; let requiresPersistentTriggers = vfsRequiresDedicatedWorkers(vfs); if (useWebWorker) { @@ -167,9 +177,12 @@ export class WASQLiteOpenFactory implements SQLOpenFactory { if (vfs == WASQLiteVFS.OPFSWriteAheadVFS) { // This VFS supports concurrent reads, so we can open additional workers to host read-only connections for - // concurrent reads / writes. To avoid excessive resource usage, we currently add one additional reader per - // tab. In the future, we might revisit this to use a growable pool of readers. - additionalReader = await openDatabaseWorker(resolveOptions(true)); + // concurrent reads / writes. + const additionalReadersCount = this.options.additionalReaders ?? 1; + for (let i = 0; i < additionalReadersCount; i++) { + const reader = await openDatabaseWorker(resolveOptions(true)); + additionalReaders.push(reader); + } } } else { // Don't use a web worker. Instead, open the MultiDatabaseServer a worker would use locally. @@ -189,7 +202,7 @@ export class WASQLiteOpenFactory implements SQLOpenFactory { return { writer: client, - additionalReader + additionalReaders }; } } diff --git a/packages/web/tests/src/db/write_ahead_log_opfs.test.ts b/packages/web/tests/src/db/write_ahead_log_opfs.test.ts index 7c007407b..ae77793d6 100644 --- a/packages/web/tests/src/db/write_ahead_log_opfs.test.ts +++ b/packages/web/tests/src/db/write_ahead_log_opfs.test.ts @@ -7,7 +7,8 @@ test('supports concurrent reads', async () => { const db = generateTestDb({ database: new WASQLiteOpenFactory({ dbFilename: 'basic-opfs.sqlite', - vfs: WASQLiteVFS.OPFSWriteAheadVFS + vfs: WASQLiteVFS.OPFSWriteAheadVFS, + additionalReaders: 1 }), schema: TEST_SCHEMA }); @@ -20,4 +21,12 @@ test('supports concurrent reads', async () => { }); expect(await db.getAll('SELECT * FROM customers')).toHaveLength(1); + + // Despite only using one additional read connection, we should be able to support two concurrent readers by using + // the write connection too. + await db.readLock(async (ctx1) => { + await db.readLock(async (ctx2) => { + await Promise.all([ctx1.execute('SELECT 1'), ctx2.execute('SELECT 2')]); + }); + }); });