Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/thick-items-train.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/web': minor
---

Add `WASQLiteVFS.OPFSWriteAheadVFS`, which also supports concurrent reads.
160 changes: 138 additions & 22 deletions packages/web/src/db/adapters/AsyncWebAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ import {
DBAdapterDefaultMixin,
DBAdapterListener,
DBLockOptions,
LockContext
LockContext,
Mutex,
Semaphore,
UnlockFn
} from '@powersync/common';
import { SharedConnectionWorker, WebDBAdapter, WebDBAdapterConfiguration } from './WebDBAdapter.js';
import { DatabaseClient } from './wa-sqlite/DatabaseClient.js';
Expand All @@ -14,52 +17,57 @@ type PendingListener = { listener: Partial<DBAdapterListener>; closeAfterRegiste
* A connection pool implementation delegating to another pool opened asynchronnously.
*/
class AsyncConnectionPool implements ConnectionPool {
protected readonly inner: Promise<DatabaseClient>;
protected readonly state: Promise<PoolState>;
protected resolvedWriter?: DatabaseClient;

protected resolvedClient?: DatabaseClient;
private readonly pendingListeners = new Set<PendingListener>();

constructor(
inner: Promise<DatabaseClient>,
inner: Promise<PoolConnection>,
readonly name: string
) {
this.inner = inner.then((client) => {
this.state = inner.then((client) => {
for (const pending of this.pendingListeners) {
pending.closeAfterRegisteredOnResolvedPool = client.registerListener(pending.listener);
pending.closeAfterRegisteredOnResolvedPool = client.writer.registerListener(pending.listener);
}
this.pendingListeners.clear();

this.resolvedClient = client;
return client;
this.resolvedWriter = client.writer;
if (client.additionalReaders.length) {
return readWritePoolState(client.writer, client.additionalReaders);
}

return singleConnectionPoolState(client.writer);
});
}

async init() {
await this.inner;
await this.state;
}

async close() {
const inner = await this.inner;
return await inner.close();
const state = await this.state;
await state.close();
}

async readLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
const inner = await this.inner;
return await inner.readLock(fn, options);
const state = await this.state;
return state.withConnection(true, fn, options);
}

async writeLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
const inner = await this.inner;
return await inner.writeLock(fn, options);
const state = await this.state;
return state.withConnection(false, fn, options);
}

async refreshSchema(): Promise<void> {
await (await this.inner).refreshSchema();
const state = await this.state;
await state.refreshSchema();
}

registerListener(listener: Partial<DBAdapterListener>): () => void {
if (this.resolvedClient) {
return this.resolvedClient.registerListener(listener);
if (this.resolvedWriter) {
return this.resolvedWriter.registerListener(listener);
} else {
const pending: PendingListener = { listener };
this.pendingListeners.add(pending);
Expand All @@ -75,15 +83,123 @@ class AsyncConnectionPool implements ConnectionPool {
}
}

export interface PoolConnection {
writer: DatabaseClient;
additionalReaders: DatabaseClient[];
}

interface PoolState {
writer: DatabaseClient;
withConnection<T>(allowReadOnly: boolean, fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T>;
close(): Promise<void>;
refreshSchema(): Promise<void>;
}

function singleConnectionPoolState(connection: DatabaseClient): PoolState {
return {
writer: connection,
withConnection: (allowReadOnly, fn, options) => {
if (allowReadOnly) {
return connection.readLock(fn, options);
} else {
return connection.writeLock(fn, options);
}
},
close: () => connection.close(),
refreshSchema: () => connection.refreshSchema()
};
}

function readWritePoolState(writer: DatabaseClient, readers: DatabaseClient[]): PoolState {
// DatabaseClients have locks internally, so these aren't necessary for correctness. However, our mutex and semaphore
// implementations are very cheap to cancel, which we use to dispatch reads to the first available connection (by
// simply requesting all of them and sticking with the first connection we get).
const writerMutex = new Mutex();
const readerSemaphore = new Semaphore(readers);

return {
writer,
async withConnection(allowReadOnly, fn, options) {
const abortController = new AbortController();
const abortSignal = abortController.signal;

let timeout: any = null;
let release: UnlockFn | undefined;
if (options?.timeoutMs) {
timeout = setTimeout(() => abortController.abort, options.timeoutMs);
}

try {
if (allowReadOnly) {
let connection: DatabaseClient;

// Even if we have a pool of read connections, it's typically very small and we assume that most queries are
// reads. So, we want to request any connection from the read pool and the dedicated write connection (which
// can also serve reads). We race for the first connection we can obtain this way, and then abort the other
// request.
[connection, release] = await new Promise<[DatabaseClient, UnlockFn]>((resolve, reject) => {
let didComplete = false;
function complete() {
didComplete = true;
abortController.abort();
}

function completeSuccess(connection: DatabaseClient, returnFn: UnlockFn) {
if (didComplete) {
// We're not going to use this connection, so return it immediately.
returnFn();
} else {
complete();
resolve([connection, returnFn]);
}
}

function completeError(error: unknown) {
// We either have a working connection already, or we've rejected the promise. Either way, we don't need
// to do either thing again.
if (didComplete) return;

complete();
reject(error);
}

writerMutex.acquire(abortSignal).then((unlock) => completeSuccess(writer, unlock), completeError);
readerSemaphore
.requestOne(abortSignal)
.then(({ item, release }) => completeSuccess(item, release), completeError);
});

return await connection.readLock(fn);
} else {
return await writerMutex.runExclusive(() => writer.writeLock(fn), abortSignal);
}
} finally {
if (timeout != null) {
clearTimeout(timeout);
}
release?.();
}
},
async close() {
await writer.close();
await Promise.all(readers.map((r) => r.close()));
},
async refreshSchema() {
await writer.refreshSchema();
await Promise.all(readers.map((r) => r.refreshSchema()));
}
};
}

export class AsyncDbAdapter extends DBAdapterDefaultMixin(AsyncConnectionPool) implements WebDBAdapter {
async shareConnection(): Promise<SharedConnectionWorker> {
const inner = await this.inner;
return inner.shareConnection();
const state = await this.state;
return state.writer.shareConnection();
}

getConfiguration(): WebDBAdapterConfiguration {
if (this.resolvedClient) {
return this.resolvedClient.getConfiguration();
if (this.resolvedWriter) {
return this.resolvedWriter.getConfiguration();
}

throw new Error('AsyncDbAdapter.getConfiguration() can only be called after initializing it.');
Expand Down
6 changes: 4 additions & 2 deletions packages/web/src/db/adapters/wa-sqlite/DatabaseServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,10 @@ export class DatabaseServer {
},
requestAccess: async (write, timeoutMs) => {
requireOpen();
// TODO: Support timeouts, they don't seem to be supported by the async-mutex package.
const lease = await this.#inner.acquireConnection();

const lease = await this.#inner.acquireConnection(
timeoutMs != null ? AbortSignal.timeout(timeoutMs) : undefined
);
if (!isOpen) {
// Race between requestAccess and close(), the connection was closed while we tried to acquire a lease.
await lease.returnLease();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ export class RawSqliteConnection {

async init() {
const api = (this._sqliteAPI = await this.openSQLiteAPI());
this.db = await api.open_v2(this.options.dbFilename);
this.db = await api.open_v2(
this.options.dbFilename,
this.options.isReadOnly ? 1 /* SQLITE_OPEN_READONLY */ : 6 /* SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE */
);
await this.executeRaw(`PRAGMA temp_store = ${this.options.temporaryStorage};`);
if (this.options.encryptionKey) {
const escapedKey = this.options.encryptionKey.replace("'", "''");
Expand Down
Loading
Loading