Skip to content

Commit 947ca21

Browse files
authored
Fix readTransaction errors on node and op-sqlite drivers (#941)
1 parent a656afa commit 947ca21

13 files changed

Lines changed: 159 additions & 67 deletions

File tree

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@powersync/node': patch
3+
'@powersync/common': patch
4+
'@powersync/op-sqlite': patch
5+
---
6+
7+
Fix `attempt to write a readonly database` error in `readTransaction`.

packages/common/src/db/DBAdapter.ts

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,17 @@ export interface SqlExecutor {
6969
executeBatch: (query: string, params?: any[][]) => Promise<QueryResult>;
7070
}
7171

72-
export interface LockContext extends SqlExecutor, DBGetUtils {}
72+
export interface LockContext extends SqlExecutor, DBGetUtils {
73+
/**
74+
* How the connection has been opened.
75+
*
76+
* `writer` indicates that the lock context is capable of writing to the database.
77+
* `queryOnly` indicates that the lock context has been opened in a readwrite mode, but a `PRAGMA query_only = TRUE`
78+
* disabled writes.
79+
* `readOnly` indicates that the lock context has been opened by passing `SQLITE_OPEN_READONLY` to `sqlite3_open_v2`.
80+
*/
81+
connectionType?: 'writer' | 'queryOnly' | 'readOnly';
82+
}
7383

7484
/**
7585
* Implements {@link DBGetUtils} on a {@link SqlRunner}.
@@ -263,8 +273,16 @@ class TransactionImplementation extends DBGetUtilsDefaultMixin(BaseTransaction)
263273
static async runWith<T>(ctx: LockContext, fn: (tx: Transaction) => Promise<T>): Promise<T> {
264274
let tx = new TransactionImplementation(ctx);
265275

276+
// For write transactions, use BEGIN IMMEDIATE to immediately obtain a write lock on the database (instead of doing
277+
// that on the first statement). If we have a genuine read-only connection, we also use BEGIN IMMEDIATE there: In
278+
// WAL mode, that ensures we pin the current state of the database (instead of the state at the first statement in
279+
// the transaction). But if we have a "fake" read-only connection implemented through `pragma query_only = true`, we
280+
// can't use this trick because it would attempt to lock the connection. So there, we use a regular `BEGIN`
281+
// statement.
282+
const useBeginImmediate = ctx.connectionType != 'queryOnly';
283+
266284
try {
267-
await ctx.execute('BEGIN IMMEDIATE');
285+
await ctx.execute(useBeginImmediate ? 'BEGIN IMMEDIATE' : 'BEGIN');
268286

269287
const result = await fn(tx);
270288
await tx.commit();

packages/node/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
"@powersync/drizzle-driver": "workspace:*",
7575
"@types/node": "catalog:",
7676
"better-sqlite3": "^12.2.0",
77+
"@types/better-sqlite3": "^7.6.13",
7778
"bson": "catalog:",
7879
"drizzle-orm": "catalog:",
7980
"js-logger": "catalog:",

packages/node/src/db/BetterSqliteWorker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class BlockingAsyncDatabase implements AsyncDatabase {
6464

6565
export async function openDatabase(worker: PowerSyncWorkerOptions, options: AsyncDatabaseOpenOptions) {
6666
const BetterSQLite3Database = await worker.loadBetterSqlite3();
67-
const baseDB = new BetterSQLite3Database(options.path);
67+
const baseDB = new BetterSQLite3Database(options.path, { readonly: !options.isWriter });
6868
baseDB.loadExtension(worker.extensionPath(), 'sqlite3_powersync_init');
6969

7070
const asyncDb = new BlockingAsyncDatabase(baseDB);

packages/node/src/db/NodeSqliteWorker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ export async function openDatabase(worker: PowerSyncWorkerOptions, options: Asyn
5656
// end, since that would make us incompatible with older Node.JS versions.
5757
const { DatabaseSync } = await dynamicImport('node:sqlite');
5858

59-
const baseDB = new DatabaseSync(options.path, { allowExtension: true });
59+
const baseDB = new DatabaseSync(options.path, { allowExtension: true, readOnly: !options.isWriter });
6060
baseDB.loadExtension(worker.extensionPath(), 'sqlite3_powersync_init');
6161

6262
return new BlockingNodeDatabase(baseDB, options.isWriter);

packages/node/src/db/RemoteConnection.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,12 @@ class BaseRemoteConnection implements SqlExecutor {
1919

2020
private readonly notifyWorkerClosed = new AbortController();
2121

22-
constructor(worker: Worker, comlink: Remote<AsyncDatabaseOpener>, database: Remote<AsyncDatabase>) {
22+
constructor(
23+
worker: Worker,
24+
comlink: Remote<AsyncDatabaseOpener>,
25+
database: Remote<AsyncDatabase>,
26+
private readonly readonly: boolean
27+
) {
2328
this.worker = worker;
2429
this.comlink = comlink;
2530
this.database = database;
@@ -29,6 +34,10 @@ class BaseRemoteConnection implements SqlExecutor {
2934
});
3035
}
3136

37+
public get connectionType() {
38+
return this.readonly ? 'readOnly' : 'writer';
39+
}
40+
3241
/**
3342
* Runs the inner function, but appends the stack trace where this function was called. This is useful for workers
3443
* because stack traces from worker errors are otherwise unrelated to the application issue that has caused them.

packages/node/src/db/WorkerConnectionPool.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,13 +130,11 @@ export class WorkerConnectionPool extends BaseObserver<DBAdapterListener> implem
130130
await database.execute("SELECT powersync_update_hooks('install');", []);
131131
}
132132

133-
const connection = new RemoteConnection(worker, comlink, database);
133+
const connection = new RemoteConnection(worker, comlink, database, !isWriter);
134134
if (this.options.initializeConnection) {
135135
await this.options.initializeConnection(connection, isWriter);
136136
}
137-
if (!isWriter) {
138-
await connection.execute('pragma query_only = true');
139-
} else {
137+
if (isWriter) {
140138
// We only need to enable this on the writer connection.
141139
// We can get `database is locked` errors if we enable this on concurrently opening read connections.
142140
await connection.execute('pragma journal_mode = WAL');

packages/node/tests/PowerSyncDatabase.test.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,3 +252,25 @@ databaseTest('execute batch', async ({ database }) => {
252252
await database.executeBatch('INSERT INTO users (id, name) VALUES (uuid(), ?)', [['a'], ['b'], ['c']]);
253253
expect(await database.getAll('SELECT * FROM users')).toHaveLength(3);
254254
});
255+
256+
databaseTest('read transaction', async ({ database }) => {
257+
await database.execute('INSERT INTO lists (id, name) VALUES (uuid(), ?)', ['before tx']);
258+
259+
let completeHasRead: () => void, completeDidWrite: () => void;
260+
const hasReadTx = new Promise<void>((resolve) => (completeHasRead = resolve));
261+
const didWrite = new Promise<void>((resolve) => (completeDidWrite = resolve));
262+
263+
const listsInReadTx = database.readTransaction(async (tx) => {
264+
completeHasRead();
265+
await didWrite;
266+
267+
// Because this transaction was started before the write, we shouldn't see it in here.
268+
return await tx.getAll<{ name: string }>('SELECT name FROM lists');
269+
});
270+
271+
await hasReadTx;
272+
await database.execute('INSERT INTO lists (id, name) VALUES (uuid(), ?)', ['after tx']);
273+
completeDidWrite!();
274+
275+
expect(await listsInReadTx).toEqual([{ name: 'before tx' }]);
276+
});

packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313

1414
export type OPSQLiteConnectionOptions = {
1515
baseDB: DB;
16+
readonly: boolean;
1617
};
1718

1819
export type OPSQLiteUpdateNotification = {
@@ -116,6 +117,10 @@ class OPSQLiteExecutor extends BaseObserver<DBAdapterListener> implements Omit<S
116117
}
117118

118119
export class OPSQLiteConnection extends DBGetUtilsDefaultMixin(OPSQLiteExecutor) implements LockContext {
120+
get connectionType() {
121+
return this.options.readonly ? 'queryOnly' : 'writer';
122+
}
123+
119124
async refreshSchema() {
120125
await this.get("PRAGMA table_info('sqlite_master')");
121126
}

packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class OPSQLiteConnectionPool extends BaseObserver<DBAdapterListener> implements
5151
this.options.sqliteOptions!;
5252
const dbFilename = this.options.name;
5353

54-
const underlyingWriteConnection = await this.openConnection(dbFilename);
54+
const underlyingWriteConnection = await this.openConnection(false, dbFilename);
5555

5656
const baseStatements = [
5757
`PRAGMA busy_timeout = ${lockTimeoutMs}`,
@@ -90,7 +90,7 @@ class OPSQLiteConnectionPool extends BaseObserver<DBAdapterListener> implements
9090

9191
const underlyingReadConnections = [];
9292
for (let i = 0; i < READ_CONNECTIONS; i++) {
93-
const conn = await this.openConnection(dbFilename);
93+
const conn = await this.openConnection(true, dbFilename);
9494
for (let statement of readConnectionStatements) {
9595
await conn.execute(statement);
9696
}
@@ -101,7 +101,7 @@ class OPSQLiteConnectionPool extends BaseObserver<DBAdapterListener> implements
101101
this.readConnections = new Semaphore(underlyingReadConnections);
102102
}
103103

104-
protected async openConnection(filenameOverride?: string): Promise<OPSQLiteConnection> {
104+
protected async openConnection(readonly: boolean, filenameOverride?: string): Promise<OPSQLiteConnection> {
105105
const dbFilename = filenameOverride ?? this.options.name;
106106
const DB: DB = this.openDatabase(dbFilename, this.options.sqliteOptions?.encryptionKey ?? undefined);
107107

@@ -112,7 +112,8 @@ class OPSQLiteConnectionPool extends BaseObserver<DBAdapterListener> implements
112112
await DB.execute('SELECT powersync_init()');
113113

114114
return new OPSQLiteConnection({
115-
baseDB: DB
115+
baseDB: DB,
116+
readonly
116117
});
117118
}
118119

0 commit comments

Comments
 (0)