Skip to content

Commit 42afb0e

Browse files
authored
Database adapters: Share common logic (#886)
1 parent 0096e96 commit 42afb0e

12 files changed

Lines changed: 360 additions & 581 deletions

File tree

.changeset/slimy-mails-move.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
'@powersync/react-native': minor
3+
'@powersync/common': minor
4+
'@powersync/web': minor
5+
'@powersync/op-sqlite': patch
6+
'@powersync/capacitor': patch
7+
'@powersync/node': patch
8+
---
9+
10+
Share common db adapter implementation logic.

packages/adapter-sql-js/src/SQLJSAdapter.ts

Lines changed: 75 additions & 159 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,18 @@ import {
22
BaseListener,
33
BaseObserver,
44
BatchedUpdateNotification,
5+
ConnectionPool,
56
ControlledExecutor,
67
createLogger,
78
DBAdapter,
9+
DBAdapterDefaultMixin,
810
DBAdapterListener,
11+
DBGetUtilsDefaultMixin,
912
DBLockOptions,
1013
ILogger,
1114
LockContext,
1215
QueryResult,
16+
SqlExecutor,
1317
SQLOpenFactory,
1418
SQLOpenOptions,
1519
Transaction
@@ -56,7 +60,7 @@ interface TableObserverListener extends BaseListener {
5660
}
5761
class TableObserver extends BaseObserver<TableObserverListener> {}
5862

59-
export class SQLJSDBAdapter extends BaseObserver<DBAdapterListener> implements DBAdapter {
63+
class SqlJsConnectionPool extends BaseObserver<DBAdapterListener> implements ConnectionPool {
6064
protected initPromise: Promise<SQLJs.Database>;
6165
protected _db: SQLJs.Database | null;
6266
protected tableUpdateCache: Set<string>;
@@ -136,129 +140,17 @@ export class SQLJSDBAdapter extends BaseObserver<DBAdapterListener> implements D
136140
db.close();
137141
}
138142

139-
protected generateLockContext(): LockContext {
140-
const execute = async (query: string, params?: any[]): Promise<QueryResult> => {
141-
const db = await this.getDB();
142-
const statement = db.prepare(query);
143-
const rawResults: any[][] = [];
144-
let columnNames: string[] | null = null;
145-
try {
146-
if (params) {
147-
statement.bind(params);
148-
}
149-
while (statement.step()) {
150-
if (!columnNames) {
151-
columnNames = statement.getColumnNames();
152-
}
153-
rawResults.push(statement.get());
154-
}
155-
156-
const rows = rawResults.map((row) => {
157-
return Object.fromEntries(row.map((value, index) => [columnNames![index], value]));
158-
});
159-
return {
160-
// `lastInsertId` is not available in the original version of SQL.js or its types, but it's available in the fork we use.
161-
insertId: (db as any).lastInsertId(),
162-
rowsAffected: db.getRowsModified(),
163-
rows: {
164-
_array: rows,
165-
length: rows.length,
166-
item: (idx: number) => rows[idx]
167-
}
168-
};
169-
} finally {
170-
statement.free();
171-
}
172-
};
173-
174-
const getAll = async <T>(query: string, params?: any[]): Promise<T[]> => {
175-
const result = await execute(query, params);
176-
return result.rows?._array ?? ([] as T[]);
177-
};
178-
179-
const getOptional = async <T>(query: string, params?: any[]): Promise<T | null> => {
180-
const results = await getAll<T>(query, params);
181-
return results.length > 0 ? results[0] : null;
182-
};
183-
184-
const get = async <T>(query: string, params?: any[]): Promise<T> => {
185-
const result = await getOptional<T>(query, params);
186-
if (!result) {
187-
throw new Error(`No results for query: ${query}`);
188-
}
189-
return result;
190-
};
191-
192-
const executeRaw = async (query: string, params?: any[]): Promise<any[][]> => {
193-
const db = await this.getDB();
194-
const statement = db.prepare(query);
195-
const rawResults: any[][] = [];
196-
try {
197-
if (params) {
198-
statement.bind(params);
199-
}
200-
while (statement.step()) {
201-
rawResults.push(statement.get());
202-
}
203-
return rawResults;
204-
} finally {
205-
statement.free();
206-
}
207-
};
208-
209-
return {
210-
getAll,
211-
getOptional,
212-
get,
213-
executeRaw,
214-
execute
215-
};
216-
}
217-
218-
execute(query: string, params?: any[]): Promise<QueryResult> {
219-
return this.writeLock((tx) => tx.execute(query, params));
220-
}
221-
222-
executeRaw(query: string, params?: any[]): Promise<any[][]> {
223-
return this.writeLock((tx) => tx.executeRaw(query, params));
224-
}
225-
226-
async executeBatch(query: string, params: any[][] = []): Promise<QueryResult> {
227-
let totalRowsAffected = 0;
228-
const db = await this.getDB();
229-
230-
const stmt = db.prepare(query);
231-
try {
232-
for (const paramSet of params) {
233-
stmt.run(paramSet);
234-
totalRowsAffected += db.getRowsModified();
235-
}
236-
237-
return {
238-
rowsAffected: totalRowsAffected
239-
};
240-
} finally {
241-
stmt.free();
242-
}
243-
}
244-
245143
/**
246144
* We're not using separate read/write locks here because we can't implement connection pools on top of SQL.js.
247145
*/
248146
readLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
249147
return this.writeLock(fn, options);
250148
}
251149

252-
readTransaction<T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions): Promise<T> {
253-
return this.readLock(async (ctx) => {
254-
return this.internalTransaction(ctx, fn);
255-
});
256-
}
257-
258150
writeLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
259151
return this.mutex.runExclusive(async () => {
260152
const db = await this.getDB();
261-
const result = await fn(this.generateLockContext());
153+
const result = await fn(new SqlJsLockContext(db));
262154

263155
// No point to schedule a write if there's no persister.
264156
if (this.options.persister) {
@@ -276,61 +168,85 @@ export class SQLJSDBAdapter extends BaseObserver<DBAdapterListener> implements D
276168
});
277169
}
278170

279-
writeTransaction<T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions): Promise<T> {
280-
return this.writeLock(async (ctx) => {
281-
return this.internalTransaction(ctx, fn);
282-
});
283-
}
284-
285-
refreshSchema(): Promise<void> {
286-
return this.get("PRAGMA table_info('sqlite_master')");
171+
async refreshSchema(): Promise<void> {
172+
await this.writeLock((ctx) => ctx.get("PRAGMA table_info('sqlite_master')"));
287173
}
174+
}
288175

289-
getAll<T>(sql: string, parameters?: any[]): Promise<T[]> {
290-
return this.readLock((tx) => tx.getAll<T>(sql, parameters));
291-
}
176+
class SqlJsExecutor implements SqlExecutor {
177+
constructor(readonly db: SQLJs.Database) {}
292178

293-
getOptional<T>(sql: string, parameters?: any[]): Promise<T | null> {
294-
return this.readLock((tx) => tx.getOptional<T>(sql, parameters));
295-
}
179+
async execute(query: string, params?: any[]): Promise<QueryResult> {
180+
const db = this.db;
181+
const statement = db.prepare(query);
182+
const rawResults: any[][] = [];
183+
let columnNames: string[] | null = null;
184+
try {
185+
if (params) {
186+
statement.bind(params);
187+
}
188+
while (statement.step()) {
189+
if (!columnNames) {
190+
columnNames = statement.getColumnNames();
191+
}
192+
rawResults.push(statement.get());
193+
}
296194

297-
get<T>(sql: string, parameters?: any[]): Promise<T> {
298-
return this.readLock((tx) => tx.get<T>(sql, parameters));
195+
const rows = rawResults.map((row) => {
196+
return Object.fromEntries(row.map((value, index) => [columnNames![index], value]));
197+
});
198+
return {
199+
// `lastInsertId` is not available in the original version of SQL.js or its types, but it's available in the fork we use.
200+
insertId: (db as any).lastInsertId(),
201+
rowsAffected: db.getRowsModified(),
202+
rows: {
203+
_array: rows,
204+
length: rows.length,
205+
item: (idx: number) => rows[idx]
206+
}
207+
};
208+
} finally {
209+
statement.free();
210+
}
299211
}
300212

301-
protected async internalTransaction<T>(ctx: LockContext, fn: (tx: Transaction) => Promise<T>): Promise<T> {
302-
let finalized = false;
303-
const commit = async (): Promise<QueryResult> => {
304-
if (finalized) {
305-
return { rowsAffected: 0 };
213+
async executeRaw(query: string, params?: any[]): Promise<any[][]> {
214+
const db = this.db;
215+
const statement = db.prepare(query);
216+
const rawResults: any[][] = [];
217+
try {
218+
if (params) {
219+
statement.bind(params);
306220
}
307-
finalized = true;
308-
return ctx.execute('COMMIT');
309-
};
310-
const rollback = async (): Promise<QueryResult> => {
311-
if (finalized) {
312-
return { rowsAffected: 0 };
221+
while (statement.step()) {
222+
rawResults.push(statement.get());
313223
}
314-
finalized = true;
315-
return ctx.execute('ROLLBACK');
316-
};
224+
return rawResults;
225+
} finally {
226+
statement.free();
227+
}
228+
}
229+
230+
async executeBatch(query: string, params: any[][] = []): Promise<QueryResult> {
231+
let totalRowsAffected = 0;
232+
const db = this.db;
233+
234+
const stmt = db.prepare(query);
317235
try {
318-
await ctx.execute('BEGIN');
319-
const result = await fn({
320-
...ctx,
321-
commit,
322-
rollback
323-
});
324-
await commit();
325-
return result;
326-
} catch (ex) {
327-
try {
328-
await rollback();
329-
} catch (ex2) {
330-
// In rare cases, a rollback may fail.
331-
// Safe to ignore.
236+
for (const paramSet of params) {
237+
stmt.run(paramSet);
238+
totalRowsAffected += db.getRowsModified();
332239
}
333-
throw ex;
240+
241+
return {
242+
rowsAffected: totalRowsAffected
243+
};
244+
} finally {
245+
stmt.free();
334246
}
335247
}
336248
}
249+
250+
class SqlJsLockContext extends DBGetUtilsDefaultMixin(SqlJsExecutor) implements LockContext {}
251+
252+
export class SQLJSDBAdapter extends DBAdapterDefaultMixin(SqlJsConnectionPool) implements DBAdapter {}

0 commit comments

Comments
 (0)