diff --git a/.changeset/thick-items-train.md b/.changeset/thick-items-train.md new file mode 100644 index 000000000..0f396f8d1 --- /dev/null +++ b/.changeset/thick-items-train.md @@ -0,0 +1,5 @@ +--- +'@powersync/web': minor +--- + +Add `WASQLiteVFS.OPFSWriteAheadVFS`, which also supports concurrent reads. diff --git a/packages/web/src/db/adapters/AsyncWebAdapter.ts b/packages/web/src/db/adapters/AsyncWebAdapter.ts index 84b77f8da..b667a9f36 100644 --- a/packages/web/src/db/adapters/AsyncWebAdapter.ts +++ b/packages/web/src/db/adapters/AsyncWebAdapter.ts @@ -3,7 +3,10 @@ import { DBAdapterDefaultMixin, DBAdapterListener, DBLockOptions, - LockContext + LockContext, + Mutex, + Semaphore, + UnlockFn } from '@powersync/common'; import { SharedConnectionWorker, WebDBAdapter, WebDBAdapterConfiguration } from './WebDBAdapter.js'; import { DatabaseClient } from './wa-sqlite/DatabaseClient.js'; @@ -14,52 +17,57 @@ type PendingListener = { listener: Partial; closeAfterRegiste * A connection pool implementation delegating to another pool opened asynchronnously. */ class AsyncConnectionPool implements ConnectionPool { - protected readonly inner: Promise; + protected readonly state: Promise; + protected resolvedWriter?: DatabaseClient; - protected resolvedClient?: DatabaseClient; private readonly pendingListeners = new Set(); constructor( - inner: Promise, + inner: Promise, readonly name: string ) { - this.inner = inner.then((client) => { + this.state = inner.then((client) => { for (const pending of this.pendingListeners) { - pending.closeAfterRegisteredOnResolvedPool = client.registerListener(pending.listener); + pending.closeAfterRegisteredOnResolvedPool = client.writer.registerListener(pending.listener); } this.pendingListeners.clear(); - this.resolvedClient = client; - return client; + this.resolvedWriter = client.writer; + if (client.additionalReaders.length) { + return readWritePoolState(client.writer, client.additionalReaders); + } + + return singleConnectionPoolState(client.writer); }); } async init() { - await this.inner; + await this.state; } async close() { - const inner = await this.inner; - return await inner.close(); + const state = await this.state; + await state.close(); } async readLock(fn: (tx: LockContext) => Promise, options?: DBLockOptions): Promise { - const inner = await this.inner; - return await inner.readLock(fn, options); + const state = await this.state; + return state.withConnection(true, fn, options); } async writeLock(fn: (tx: LockContext) => Promise, options?: DBLockOptions): Promise { - const inner = await this.inner; - return await inner.writeLock(fn, options); + const state = await this.state; + return state.withConnection(false, fn, options); } async refreshSchema(): Promise { - await (await this.inner).refreshSchema(); + const state = await this.state; + await state.refreshSchema(); } registerListener(listener: Partial): () => void { - if (this.resolvedClient) { - return this.resolvedClient.registerListener(listener); + if (this.resolvedWriter) { + return this.resolvedWriter.registerListener(listener); } else { const pending: PendingListener = { listener }; this.pendingListeners.add(pending); @@ -75,15 +83,123 @@ class AsyncConnectionPool implements ConnectionPool { } } +export interface PoolConnection { + writer: DatabaseClient; + additionalReaders: DatabaseClient[]; +} + +interface PoolState { + writer: DatabaseClient; + withConnection(allowReadOnly: boolean, fn: (tx: LockContext) => Promise, options?: DBLockOptions): Promise; + close(): Promise; + refreshSchema(): Promise; +} + +function singleConnectionPoolState(connection: DatabaseClient): PoolState { + return { + writer: connection, + withConnection: (allowReadOnly, fn, options) => { + if (allowReadOnly) { + return connection.readLock(fn, options); + } else { + return connection.writeLock(fn, options); + } + }, + close: () => connection.close(), + refreshSchema: () => connection.refreshSchema() + }; +} + +function readWritePoolState(writer: DatabaseClient, readers: DatabaseClient[]): PoolState { + // DatabaseClients have locks internally, so these aren't necessary for correctness. However, our mutex and semaphore + // implementations are very cheap to cancel, which we use to dispatch reads to the first available connection (by + // simply requesting all of them and sticking with the first connection we get). + const writerMutex = new Mutex(); + const readerSemaphore = new Semaphore(readers); + + return { + writer, + async withConnection(allowReadOnly, fn, options) { + const abortController = new AbortController(); + const abortSignal = abortController.signal; + + let timeout: any = null; + let release: UnlockFn | undefined; + if (options?.timeoutMs) { + timeout = setTimeout(() => abortController.abort, options.timeoutMs); + } + + try { + if (allowReadOnly) { + let connection: DatabaseClient; + + // Even if we have a pool of read connections, it's typically very small and we assume that most queries are + // reads. So, we want to request any connection from the read pool and the dedicated write connection (which + // can also serve reads). We race for the first connection we can obtain this way, and then abort the other + // request. + [connection, release] = await new Promise<[DatabaseClient, UnlockFn]>((resolve, reject) => { + let didComplete = false; + function complete() { + didComplete = true; + abortController.abort(); + } + + function completeSuccess(connection: DatabaseClient, returnFn: UnlockFn) { + if (didComplete) { + // We're not going to use this connection, so return it immediately. + returnFn(); + } else { + complete(); + resolve([connection, returnFn]); + } + } + + function completeError(error: unknown) { + // We either have a working connection already, or we've rejected the promise. Either way, we don't need + // to do either thing again. + if (didComplete) return; + + complete(); + reject(error); + } + + writerMutex.acquire(abortSignal).then((unlock) => completeSuccess(writer, unlock), completeError); + readerSemaphore + .requestOne(abortSignal) + .then(({ item, release }) => completeSuccess(item, release), completeError); + }); + + return await connection.readLock(fn); + } else { + return await writerMutex.runExclusive(() => writer.writeLock(fn), abortSignal); + } + } finally { + if (timeout != null) { + clearTimeout(timeout); + } + release?.(); + } + }, + async close() { + await writer.close(); + await Promise.all(readers.map((r) => r.close())); + }, + async refreshSchema() { + await writer.refreshSchema(); + await Promise.all(readers.map((r) => r.refreshSchema())); + } + }; +} + export class AsyncDbAdapter extends DBAdapterDefaultMixin(AsyncConnectionPool) implements WebDBAdapter { async shareConnection(): Promise { - const inner = await this.inner; - return inner.shareConnection(); + const state = await this.state; + return state.writer.shareConnection(); } getConfiguration(): WebDBAdapterConfiguration { - if (this.resolvedClient) { - return this.resolvedClient.getConfiguration(); + if (this.resolvedWriter) { + return this.resolvedWriter.getConfiguration(); } throw new Error('AsyncDbAdapter.getConfiguration() can only be called after initializing it.'); diff --git a/packages/web/src/db/adapters/wa-sqlite/DatabaseServer.ts b/packages/web/src/db/adapters/wa-sqlite/DatabaseServer.ts index 42d9e92e8..f4cf863bc 100644 --- a/packages/web/src/db/adapters/wa-sqlite/DatabaseServer.ts +++ b/packages/web/src/db/adapters/wa-sqlite/DatabaseServer.ts @@ -112,8 +112,10 @@ export class DatabaseServer { }, requestAccess: async (write, timeoutMs) => { requireOpen(); - // TODO: Support timeouts, they don't seem to be supported by the async-mutex package. - const lease = await this.#inner.acquireConnection(); + + const lease = await this.#inner.acquireConnection( + timeoutMs != null ? AbortSignal.timeout(timeoutMs) : undefined + ); if (!isOpen) { // Race between requestAccess and close(), the connection was closed while we tried to acquire a lease. await lease.returnLease(); diff --git a/packages/web/src/db/adapters/wa-sqlite/RawSqliteConnection.ts b/packages/web/src/db/adapters/wa-sqlite/RawSqliteConnection.ts index a2724f6ef..3cf958eb1 100644 --- a/packages/web/src/db/adapters/wa-sqlite/RawSqliteConnection.ts +++ b/packages/web/src/db/adapters/wa-sqlite/RawSqliteConnection.ts @@ -39,7 +39,10 @@ export class RawSqliteConnection { async init() { const api = (this._sqliteAPI = await this.openSQLiteAPI()); - this.db = await api.open_v2(this.options.dbFilename); + this.db = await api.open_v2( + this.options.dbFilename, + this.options.isReadOnly ? 1 /* SQLITE_OPEN_READONLY */ : 6 /* SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE */ + ); await this.executeRaw(`PRAGMA temp_store = ${this.options.temporaryStorage};`); if (this.options.encryptionKey) { const escapedKey = this.options.encryptionKey.replace("'", "''"); diff --git a/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts b/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts index beed81fca..864592386 100644 --- a/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts +++ b/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts @@ -13,16 +13,31 @@ import { import { SSRDBAdapter } from '../SSRDBAdapter.js'; import { vfsRequiresDedicatedWorkers, WASQLiteVFS } from './vfs.js'; import { MultiDatabaseServer } from '../../../worker/db/MultiDatabaseServer.js'; -import { ClientOptions, DatabaseClient, OpenWorkerConnection } from './DatabaseClient.js'; +import { DatabaseClient, OpenWorkerConnection } from './DatabaseClient.js'; import { generateTabCloseSignal } from '../../../shared/tab_close_signal.js'; -import { AsyncDbAdapter } from '../AsyncWebAdapter.js'; +import { AsyncDbAdapter, PoolConnection } from '../AsyncWebAdapter.js'; export interface WASQLiteOpenFactoryOptions extends WebSQLOpenFactoryOptions { vfs?: WASQLiteVFS; + /** + * If the {@link vfs} supports it, an additional amount of read-only connections to open. Using additional read + * connections can speed up queries by dispatching them to multiple workers running them concurrently. + * + * {@link WASQLiteVFS.OPFSWriteAheadVFS} is the only VFS with support for multiple connections, so this option is + * ignored for other VFS implementations. + * + * Defaults to 1. + */ + additionalReaders?: number; } export interface ResolvedWASQLiteOpenFactoryOptions extends ResolvedWebSQLOpenOptions { vfs: WASQLiteVFS; + + /** + * Whether this is a read-only connection opened for the `OPFSWriteAheadVFS` file system. + */ + isReadOnly: boolean; } export interface WorkerDBOpenerOptions extends ResolvedWASQLiteOpenFactoryOptions { @@ -82,7 +97,7 @@ export class WASQLiteOpenFactory implements SQLOpenFactory { return this.openAdapter(); } - async openConnection(): Promise { + async openConnection(): Promise { const { enableMultiTabs, useWebWorker } = this.resolvedFlags; const { vfs = WASQLiteVFS.IDBBatchAtomicVFS, @@ -95,7 +110,7 @@ export class WASQLiteOpenFactory implements SQLOpenFactory { this.logger.warn('Multiple tabs are not enabled in this browser'); } - const resolvedOptions: ResolvedWASQLiteOpenFactoryOptions = { + const resolveOptions = (isReadOnly: boolean): ResolvedWASQLiteOpenFactoryOptions => ({ dbFilename: this.options.dbFilename, dbLocation: this.options.dbLocation, debugMode: this.options.debugMode, @@ -103,62 +118,92 @@ export class WASQLiteOpenFactory implements SQLOpenFactory { temporaryStorage, cacheSizeKb, flags: this.resolvedFlags, - encryptionKey: encryptionKey - }; + encryptionKey: encryptionKey, + isReadOnly + }); - let clientOptions: ClientOptions; + let client: DatabaseClient; + let additionalReaders: DatabaseClient[] = []; let requiresPersistentTriggers = vfsRequiresDedicatedWorkers(vfs); if (useWebWorker) { const optionsDbWorker = this.options.worker; - const workerPort = - typeof optionsDbWorker == 'function' - ? resolveWorkerDatabasePortFactory(() => - optionsDbWorker({ - ...this.options, - temporaryStorage, - cacheSizeKb, - flags: this.resolvedFlags, - encryptionKey - }) - ) - : openWorkerDatabasePort(this.options.dbFilename, enableMultiTabs, optionsDbWorker, this.waOptions.vfs); - - const source = Comlink.wrap(workerPort); - const closeSignal = new AbortController(); - const connection = await source.connect({ - ...resolvedOptions, - logLevel: this.logger.getLevel(), - lockName: await generateTabCloseSignal(closeSignal.signal) - }); - clientOptions = { - connection, - source, - // This tab owns the worker, so we're guaranteed to outlive it. - remoteCanCloseUnexpectedly: false, - onClose: () => { - closeSignal.abort(); - if (workerPort instanceof Worker) { - workerPort.terminate(); - } else { - workerPort.close(); + const openDatabaseWorker = async ( + resolvedOptions: ResolvedWASQLiteOpenFactoryOptions + ): Promise => { + const workerPort = + typeof optionsDbWorker == 'function' + ? resolveWorkerDatabasePortFactory(() => + optionsDbWorker({ + ...this.options, + temporaryStorage, + cacheSizeKb, + flags: this.resolvedFlags, + encryptionKey + }) + ) + : openWorkerDatabasePort(this.options.dbFilename, enableMultiTabs, optionsDbWorker, this.waOptions.vfs); + + const source = Comlink.wrap(workerPort); + const closeSignal = new AbortController(); + const connection = await source.connect({ + ...resolvedOptions, + logLevel: this.logger.getLevel(), + lockName: await generateTabCloseSignal(closeSignal.signal) + }); + const clientOptions = { + connection, + source, + // This tab owns the worker, so we're guaranteed to outlive it. + remoteCanCloseUnexpectedly: false, + onClose: () => { + closeSignal.abort(); + if (workerPort instanceof Worker) { + workerPort.terminate(); + } else { + workerPort.close(); + } } - } + }; + + return new DatabaseClient(clientOptions, { + ...resolvedOptions, + requiresPersistentTriggers + }); }; + + client = await openDatabaseWorker(resolveOptions(false)); + + if (vfs == WASQLiteVFS.OPFSWriteAheadVFS) { + // This VFS supports concurrent reads, so we can open additional workers to host read-only connections for + // concurrent reads / writes. + const additionalReadersCount = this.options.additionalReaders ?? 1; + for (let i = 0; i < additionalReadersCount; i++) { + const reader = await openDatabaseWorker(resolveOptions(true)); + additionalReaders.push(reader); + } + } } else { // Don't use a web worker. Instead, open the MultiDatabaseServer a worker would use locally. const localServer = new MultiDatabaseServer(this.logger); requiresPersistentTriggers = true; + const resolvedOptions = resolveOptions(false); const connection = await localServer.openConnectionLocally(resolvedOptions); - clientOptions = { connection, source: null, remoteCanCloseUnexpectedly: false }; + client = new DatabaseClient( + { connection, source: null, remoteCanCloseUnexpectedly: false }, + { + ...resolvedOptions, + requiresPersistentTriggers + } + ); } - return new DatabaseClient(clientOptions, { - ...resolvedOptions, - requiresPersistentTriggers - }); + return { + writer: client, + additionalReaders + }; } } diff --git a/packages/web/src/db/adapters/wa-sqlite/vfs.ts b/packages/web/src/db/adapters/wa-sqlite/vfs.ts index cf6b8ee80..6f3db97b7 100644 --- a/packages/web/src/db/adapters/wa-sqlite/vfs.ts +++ b/packages/web/src/db/adapters/wa-sqlite/vfs.ts @@ -6,7 +6,8 @@ import type * as SQLite from '@journeyapps/wa-sqlite'; export enum WASQLiteVFS { IDBBatchAtomicVFS = 'IDBBatchAtomicVFS', OPFSCoopSyncVFS = 'OPFSCoopSyncVFS', - AccessHandlePoolVFS = 'AccessHandlePoolVFS' + AccessHandlePoolVFS = 'AccessHandlePoolVFS', + OPFSWriteAheadVFS = 'OPFSWriteAheadVFS' } export function vfsRequiresDedicatedWorkers(vfs: WASQLiteVFS) { @@ -30,49 +31,32 @@ export type WASQLiteModuleFactory = ( options: WASQLiteModuleFactoryOptions ) => Promise<{ module: SQLiteModule; vfs: SQLiteVFS }>; -/** - * @internal - */ -export const AsyncWASQLiteModuleFactory = async () => { - const { default: factory } = await import('@journeyapps/wa-sqlite/dist/wa-sqlite-async.mjs'); - return factory(); -}; - -/** - * @internal - */ -export const MultiCipherAsyncWASQLiteModuleFactory = async () => { - const { default: factory } = await import('@journeyapps/wa-sqlite/dist/mc-wa-sqlite-async.mjs'); - return factory(); -}; - -/** - * @internal - */ -export const SyncWASQLiteModuleFactory = async () => { - const { default: factory } = await import('@journeyapps/wa-sqlite/dist/wa-sqlite.mjs'); - return factory(); -}; +async function asyncModuleFactory(encryptionKey: unknown) { + if (encryptionKey) { + const { default: factory } = await import('@journeyapps/wa-sqlite/dist/mc-wa-sqlite-async.mjs'); + return factory(); + } else { + const { default: factory } = await import('@journeyapps/wa-sqlite/dist/wa-sqlite-async.mjs'); + return factory(); + } +} -/** - * @internal - */ -export const MultiCipherSyncWASQLiteModuleFactory = async () => { - const { default: factory } = await import('@journeyapps/wa-sqlite/dist/mc-wa-sqlite.mjs'); - return factory(); -}; +async function syncModuleFactory(encryptionKey: unknown) { + if (encryptionKey) { + const { default: factory } = await import('@journeyapps/wa-sqlite/dist/mc-wa-sqlite.mjs'); + return factory(); + } else { + const { default: factory } = await import('@journeyapps/wa-sqlite/dist/wa-sqlite.mjs'); + return factory(); + } +} /** * @internal */ export const DEFAULT_MODULE_FACTORIES = { [WASQLiteVFS.IDBBatchAtomicVFS]: async (options: WASQLiteModuleFactoryOptions) => { - let module; - if (options.encryptionKey) { - module = await MultiCipherAsyncWASQLiteModuleFactory(); - } else { - module = await AsyncWASQLiteModuleFactory(); - } + const module = await asyncModuleFactory(options.encryptionKey); const { IDBBatchAtomicVFS } = await import('@journeyapps/wa-sqlite/src/examples/IDBBatchAtomicVFS.js'); return { module, @@ -81,12 +65,7 @@ export const DEFAULT_MODULE_FACTORIES = { }; }, [WASQLiteVFS.AccessHandlePoolVFS]: async (options: WASQLiteModuleFactoryOptions) => { - let module; - if (options.encryptionKey) { - module = await MultiCipherSyncWASQLiteModuleFactory(); - } else { - module = await SyncWASQLiteModuleFactory(); - } + const module = await syncModuleFactory(options.encryptionKey); // @ts-expect-error The types for this static method are missing upstream const { AccessHandlePoolVFS } = await import('@journeyapps/wa-sqlite/src/examples/AccessHandlePoolVFS.js'); return { @@ -95,12 +74,7 @@ export const DEFAULT_MODULE_FACTORIES = { }; }, [WASQLiteVFS.OPFSCoopSyncVFS]: async (options: WASQLiteModuleFactoryOptions) => { - let module; - if (options.encryptionKey) { - module = await MultiCipherSyncWASQLiteModuleFactory(); - } else { - module = await SyncWASQLiteModuleFactory(); - } + const module = await syncModuleFactory(options.encryptionKey); // @ts-expect-error The types for this static method are missing upstream const { OPFSCoopSyncVFS } = await import('@journeyapps/wa-sqlite/src/examples/OPFSCoopSyncVFS.js'); const vfs = await OPFSCoopSyncVFS.create(options.dbFileName, module); @@ -108,5 +82,15 @@ export const DEFAULT_MODULE_FACTORIES = { module, vfs }; + }, + [WASQLiteVFS.OPFSWriteAheadVFS]: async (options: WASQLiteModuleFactoryOptions) => { + const module = await syncModuleFactory(options.encryptionKey); + // @ts-expect-error The types for this static method are missing upstream + const { OPFSWriteAheadVFS } = await import('@journeyapps/wa-sqlite/src/examples/OPFSWriteAheadVFS.js'); + const vfs = await OPFSWriteAheadVFS.create(options.dbFileName, module, {}); + return { + module, + vfs + }; } }; diff --git a/packages/web/src/worker/db/MultiDatabaseServer.ts b/packages/web/src/worker/db/MultiDatabaseServer.ts index e883319c6..6345a6625 100644 --- a/packages/web/src/worker/db/MultiDatabaseServer.ts +++ b/packages/web/src/worker/db/MultiDatabaseServer.ts @@ -61,7 +61,10 @@ export class MultiDatabaseServer { let server: DatabaseServer | undefined = this.activeDatabases.get(dbFilename); if (server == null) { - const needsNavigatorLocks = !isSharedWorker; + // We don't need navigator locks for shared workers because all queries run in this shared worker exclusively. + // For read-only connections, we use a VFS that supports concurrent reads (so a single lock on the connection is + // fine). + const needsNavigatorLocks = !(isSharedWorker || options.isReadOnly); const connection = new RawSqliteConnection(options); const withSafeConcurrency = new ConcurrentSqliteConnection(connection, needsNavigatorLocks); diff --git a/packages/web/src/worker/db/open-worker-database.ts b/packages/web/src/worker/db/open-worker-database.ts index 460de1cac..c8b160578 100644 --- a/packages/web/src/worker/db/open-worker-database.ts +++ b/packages/web/src/worker/db/open-worker-database.ts @@ -1,5 +1,5 @@ import * as Comlink from 'comlink'; -import { WASQLiteVFS } from '../../db/adapters/wa-sqlite/vfs.js'; +import { vfsRequiresDedicatedWorkers, WASQLiteVFS } from '../../db/adapters/wa-sqlite/vfs.js'; import { OpenWorkerConnection } from '../../db/adapters/wa-sqlite/DatabaseClient.js'; /** @@ -11,7 +11,7 @@ export function openWorkerDatabasePort( worker: string | URL = '', vfs?: WASQLiteVFS ) { - const needsDedicated = vfs == WASQLiteVFS.AccessHandlePoolVFS || vfs == WASQLiteVFS.OPFSCoopSyncVFS; + const needsDedicated = vfs && vfsRequiresDedicatedWorkers(vfs); if (worker) { return !needsDedicated && multipleTabs diff --git a/packages/web/tests/open.test.ts b/packages/web/tests/open.test.ts index 4eca6ad87..f636a7b4d 100644 --- a/packages/web/tests/open.test.ts +++ b/packages/web/tests/open.test.ts @@ -90,7 +90,8 @@ describe('Open Methods', { sequential: true }, () => { }, temporaryStorage: TemporaryStorageOption.MEMORY, cacheSizeKb: 0, - dbFilename: '' + dbFilename: '', + isReadOnly: false }; const connection = await server.openConnectionLocally(options); const Adapter = DBAdapterDefaultMixin(DatabaseClient); diff --git a/packages/web/tests/src/db/write_ahead_log_opfs.test.ts b/packages/web/tests/src/db/write_ahead_log_opfs.test.ts new file mode 100644 index 000000000..ae77793d6 --- /dev/null +++ b/packages/web/tests/src/db/write_ahead_log_opfs.test.ts @@ -0,0 +1,32 @@ +import { expect, test } from 'vitest'; +import { generateTestDb } from '../../utils/testDb.js'; +import { WASQLiteOpenFactory, WASQLiteVFS } from '@powersync/web'; +import { TEST_SCHEMA } from '../../utils/test-schema.js'; + +test('supports concurrent reads', async () => { + const db = generateTestDb({ + database: new WASQLiteOpenFactory({ + dbFilename: 'basic-opfs.sqlite', + vfs: WASQLiteVFS.OPFSWriteAheadVFS, + additionalReaders: 1 + }), + schema: TEST_SCHEMA + }); + + await db.writeTransaction(async (tx) => { + expect(await db.getAll('SELECT * FROM customers')).toHaveLength(0); + await tx.execute('INSERT INTO customers (id, name) VALUES (uuid(), ?)', ['name']); + + expect(await db.getAll('SELECT * FROM customers')).toHaveLength(0); // No commit yet... + }); + + expect(await db.getAll('SELECT * FROM customers')).toHaveLength(1); + + // Despite only using one additional read connection, we should be able to support two concurrent readers by using + // the write connection too. + await db.readLock(async (ctx1) => { + await db.readLock(async (ctx2) => { + await Promise.all([ctx1.execute('SELECT 1'), ctx2.execute('SELECT 2')]); + }); + }); +});