-
Notifications
You must be signed in to change notification settings - Fork 78
Expand file tree
/
Copy pathCapacitorSQLiteAdapter.ts
More file actions
326 lines (288 loc) · 11.4 KB
/
Copy pathCapacitorSQLiteAdapter.ts
File metadata and controls
326 lines (288 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
import { CapacitorSQLite, SQLiteConnection, SQLiteDBConnection } from '@capacitor-community/sqlite';
import { Capacitor } from '@capacitor/core';
import {
BaseObserver,
BatchedUpdateNotification,
ConnectionPool,
DBAdapter,
DBAdapterDefaultMixin,
DBAdapterListener,
DBLockOptions,
LockContext,
Mutex,
QueryResult,
timeoutSignal
} from '@powersync/web';
import { PowerSyncCore } from '../plugin/PowerSyncCore.js';
import { messageForErrorCode } from '../plugin/PowerSyncPlugin.js';
import { CapacitorSQLiteOpenFactoryOptions, DEFAULT_SQLITE_OPTIONS } from './CapacitorSQLiteOpenFactory.js';
/**
 * Runs a query and records how long it took on the performance timeline.
 *
 * A `[SQL] <sql>` measure is recorded when the executor resolves, and a
 * `[SQL] [ERROR: <message>] <sql>` measure when it rejects. The executor's result is
 * returned untouched and its rejection value is always rethrown untouched.
 *
 * @param sql - The SQL text; only used to label the performance entry.
 * @param executor - Callback which actually performs the query.
 * @returns The result produced by `executor`.
 * @throws Whatever `executor` throws or rejects with.
 */
async function monitorQuery(sql: string, executor: () => Promise<QueryResult>): Promise<QueryResult> {
  const start = performance.now();
  try {
    const result = await executor();
    performance.measure(`[SQL] ${sql}`, { start });
    return result;
  } catch (e: unknown) {
    // A rejection is not guaranteed to be an Error instance. Reading `.message` directly from
    // `null`/`undefined` would throw a TypeError here and hide the original failure, so the
    // label is derived defensively before the original value is rethrown.
    const message =
      typeof e === 'object' && e !== null && 'message' in e ? String((e as { message: unknown }).message) : String(e);
    performance.measure(`[SQL] [ERROR: ${message}] ${sql}`, { start });
    throw e;
  }
}
/**
 * Converts query parameters into the shapes the Capacitor Community SQLite plugin accepts.
 *
 * Only `Uint8Array` values (including subclasses) are rewritten, and only on iOS and Android.
 * Every other value, and binary values on any other platform, is passed through untouched.
 *
 * @param platform - Platform identifier; `'ios'` and `'android'` receive special handling.
 * @param values - The positional parameters of a single statement.
 * @returns A new array with the same length and order as `values`.
 */
function mapSQLiteParameterValues({ platform, values }: { platform: string; values: any[] }) {
  const targetsIOS = platform === 'ios';
  const targetsAndroid = platform === 'android';

  return values.map((param) => {
    // Anything that is not binary data is forwarded unchanged.
    if (!(param instanceof Uint8Array)) {
      return param;
    }

    if (targetsIOS) {
      /**
       * The Buffer polyfill used in @powersync/common subclasses Uint8Array and puts extra fields
       * such as `_isBuffer` and `parent` on its prototype. Those fields get serialized across the
       * native bridge, but the SQLite library expects a dictionary of index -> byte only and fails
       * with "Error in reading buffer" when it encounters them.
       *
       * A plain Uint8Array view over the same backing buffer drops the subclass wrapper. This is a
       * new view rather than a copy of the bytes, so the cost should be minimal.
       */
      return new Uint8Array(param.buffer, param.byteOffset, param.byteLength);
    }

    if (targetsAndroid) {
      // Android expects an object of the form { type: 'Buffer', data: [...] }.
      return { type: 'Buffer', data: Array.from(param) };
    }

    // Binary value on any other platform: forwarded unchanged.
    return param;
  });
}
/**
 * Connection pool built on the Capacitor Community SQLite plugin.
 *
 * The pool owns two connections to the same database file: one read-write and one read-only.
 * Each connection is guarded by its own mutex, so at most one read lock and one write lock are
 * active at a time. Initialization starts in the constructor; every lock waits for it to finish
 * before touching a connection.
 */
class CapacitorConnectionPool extends BaseObserver<DBAdapterListener> implements ConnectionPool {
  protected _writeConnection: SQLiteDBConnection | null;
  protected _readConnection: SQLiteDBConnection | null;
  /** Settles once {@link init} has finished; rejects if initialization failed. */
  protected initializedPromise: Promise<void>;
  protected writeMutex: Mutex;
  protected readMutex: Mutex;

  /**
   * @param options - Open options; `dbFilename`, `sqliteOptions` and `debugMode` are read by this class.
   */
  constructor(protected options: CapacitorSQLiteOpenFactoryOptions) {
    super();
    this._writeConnection = null;
    this._readConnection = null;
    this.writeMutex = new Mutex();
    this.readMutex = new Mutex();
    this.initializedPromise = this.init();
  }

  /**
   * The read-write connection.
   * @throws If initialization has not assigned the connection yet.
   */
  protected get writeConnection(): SQLiteDBConnection {
    if (!this._writeConnection) {
      throw new Error('Init not completed yet');
    }
    return this._writeConnection;
  }

  /**
   * The read-only connection.
   * @throws If initialization has not assigned the connection yet.
   */
  protected get readConnection(): SQLiteDBConnection {
    if (!this._readConnection) {
      throw new Error('Init not completed yet');
    }
    return this._readConnection;
  }

  /** The database filename this pool was opened with. */
  get name() {
    return this.options.dbFilename;
  }

  /**
   * Registers the PowerSync core extension, creates and opens both connections, applies the
   * configured PRAGMAs on the write connection and installs the PowerSync update hooks.
   *
   * @throws If the core extension could not be registered, or if any plugin call fails.
   */
  private async init() {
    const { responseCode: registrationResponseCode } = await PowerSyncCore.registerCore();
    if (registrationResponseCode != 0) {
      throw new Error(`Could not register PowerSync core extension: ${messageForErrorCode(registrationResponseCode)}`);
    }

    const sqlite = new SQLiteConnection(CapacitorSQLite);

    // It seems like the isConnection and retrieveConnection methods
    // only check a JS side map of connections.
    // On hot reload this JS cache can be cleared while the connection
    // still exists natively, and `createConnection` will fail if it already exists.
    // Closing is therefore attempted unconditionally and failures are deliberately ignored.
    await sqlite.closeConnection(this.options.dbFilename, false).catch(() => {});
    await sqlite.closeConnection(this.options.dbFilename, true).catch(() => {});

    // TODO support encryption eventually
    this._writeConnection = await sqlite.createConnection(this.options.dbFilename, false, 'no-encryption', 1, false);
    this._readConnection = await sqlite.createConnection(this.options.dbFilename, false, 'no-encryption', 1, true);

    await this._writeConnection.open();

    // Caller supplied options override the defaults.
    const { cacheSizeKb, journalSizeLimit, synchronous } = { ...DEFAULT_SQLITE_OPTIONS, ...this.options.sqliteOptions };
    await this.writeConnection.query('PRAGMA journal_mode = WAL');
    await this.writeConnection.query(`PRAGMA journal_size_limit = ${journalSizeLimit}`);
    await this.writeConnection.query(`PRAGMA temp_store = memory`);
    await this.writeConnection.query(`PRAGMA synchronous = ${synchronous}`);
    await this.writeConnection.query(`PRAGMA cache_size = -${cacheSizeKb}`);

    await this._readConnection.open();

    const platform = Capacitor.getPlatform();
    if (platform === 'android') {
      /**
       * SQLCipher for Android enables dynamic loading of extensions.
       * On iOS we use a static auto extension registration.
       */
      const extensionQuery = "SELECT load_extension('libpowersync.so', 'sqlite3_powersync_init')";
      await this.writeConnection.query(extensionQuery);
      await this.readConnection.query(extensionQuery);
    }

    await this.writeConnection.query("SELECT powersync_update_hooks('install')");
  }

  /**
   * Closes both connections once initialization has completed.
   *
   * @throws If initialization failed, or if closing either connection fails.
   */
  async close(): Promise<void> {
    await this.initializedPromise;
    try {
      await this.writeConnection.close();
    } finally {
      // Always attempt to close the read connection, even if closing the write connection failed,
      // so that a failure on one handle does not leak the other.
      await this.readConnection.close();
    }
  }

  /**
   * Builds the query helpers handed to lock callbacks, all bound to the given connection.
   *
   * @param db - The connection every returned helper operates on.
   * @returns A lock context exposing `getAll`, `getOptional`, `get`, `executeRaw`, `execute` and `executeBatch`.
   */
  protected generateLockContext(db: SQLiteDBConnection): LockContext {
    // Row-returning path: always goes through the plugin's `query` call and reports zero affected rows.
    const _query = async (query: string, params: any[] = []) => {
      const mappedParams = mapSQLiteParameterValues({
        platform: Capacitor.getPlatform(),
        values: params
      });
      const result = await db.query(query, mappedParams);
      const arrayResult = result.values ?? [];
      return {
        rowsAffected: 0,
        rows: {
          _array: arrayResult,
          length: arrayResult.length,
          item: (idx: number) => arrayResult[idx]
        }
      };
    };

    const _execute = async (query: string, params: any[] = []): Promise<QueryResult> => {
      const platform = Capacitor.getPlatform();

      if (
        db.getConnectionReadOnly() ||
        // Android: use query for SELECT and executeSet for mutations
        // We cannot use `run` here for both cases.
        (platform === 'android' && query.toLowerCase().trim().startsWith('select'))
      ) {
        return _query(query, params);
      }

      const mappedParams = mapSQLiteParameterValues({
        platform,
        values: params
      });

      if (platform === 'android') {
        // This branch never returns rows; only the change count and last insert id are reported.
        const result = await db.executeSet([{ statement: query, values: mappedParams }], false);
        return {
          insertId: result.changes?.lastId,
          rowsAffected: result.changes?.changes ?? 0,
          rows: {
            _array: [],
            length: 0,
            item: () => null
          }
        };
      }

      // iOS (and other platforms): use run("all")
      const result = await db.run(query, mappedParams, false, 'all');
      const resultSet = result.changes?.values ?? [];
      return {
        insertId: result.changes?.lastId,
        rowsAffected: result.changes?.changes ?? 0,
        rows: {
          _array: resultSet,
          length: resultSet.length,
          item: (idx) => resultSet[idx]
        }
      };
    };

    // In debug mode every statement is timed on the performance timeline.
    const execute = this.options.debugMode
      ? (sql: string, params?: any[]) => monitorQuery(sql, () => _execute(sql, params))
      : _execute;

    const executeQuery = this.options.debugMode
      ? (sql: string, params?: any[]) => monitorQuery(sql, () => _query(sql, params))
      : _query;

    const getAll = async <T>(query: string, params?: any[]): Promise<T[]> => {
      const result = await executeQuery(query, params);
      return result.rows?._array ?? ([] as T[]);
    };

    const getOptional = async <T>(query: string, params?: any[]): Promise<T | null> => {
      const results = await getAll<T>(query, params);
      return results.length > 0 ? results[0] : null;
    };

    const get = async <T>(query: string, params?: any[]): Promise<T> => {
      const result = await getOptional<T>(query, params);
      if (!result) {
        throw new Error(`No results for query: ${query}`);
      }
      return result;
    };

    const executeRaw = async (query: string, params?: any[]): Promise<any[][]> => {
      // This is a workaround, we don't support multiple columns of the same name
      const results = await execute(query, params);
      return results.rows?._array.map((row) => Object.values(row)) ?? [];
    };

    const executeBatch = async (query: string, params: any[][] = []): Promise<QueryResult> => {
      // Nothing to run: skip the native call entirely.
      // NOTE(review): the plugin is understood to reject an empty statement set - confirm against its docs.
      if (params.length === 0) {
        return { rowsAffected: 0 };
      }

      const platform = Capacitor.getPlatform();
      const result = await db.executeSet(
        params.map((param) => ({
          statement: query,
          values: mapSQLiteParameterValues({
            platform,
            values: param
          })
        }))
      );

      return {
        rowsAffected: result.changes?.changes ?? 0,
        insertId: result.changes?.lastId
      };
    };

    return {
      getAll,
      getOptional,
      get,
      executeRaw,
      execute,
      executeBatch
    };
  }

  /**
   * Runs `fn` with exclusive access to the read connection.
   *
   * @param fn - Callback receiving a lock context bound to the read connection.
   * @param options - Optional lock options; `timeoutMs` is forwarded to the mutex as a timeout signal.
   * @returns Whatever `fn` resolves with.
   */
  readLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
    return this.readMutex.runExclusive(async () => {
      await this.initializedPromise;
      return fn(this.generateLockContext(this.readConnection));
    }, timeoutSignal(options?.timeoutMs));
  }

  /**
   * Runs `fn` with exclusive access to the write connection, then notifies listeners about the
   * tables reported by the PowerSync update hooks.
   *
   * NOTE(review): the update hooks are only drained when `fn` resolves; if `fn` rejects, any
   * pending table updates presumably stay queued until the next write lock - confirm.
   *
   * @param fn - Callback receiving a lock context bound to the write connection.
   * @param options - Optional lock options; `timeoutMs` is forwarded to the mutex as a timeout signal.
   * @returns Whatever `fn` resolves with.
   * @throws If the table updates could not be fetched after `fn` completed.
   */
  writeLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
    return this.writeMutex.runExclusive(async () => {
      await this.initializedPromise;
      const result = await fn(this.generateLockContext(this.writeConnection));

      // Fetch table updates
      const updates = await this.writeConnection.query("SELECT powersync_update_hooks('get') AS table_name");
      const jsonUpdates = updates.values?.[0];
      if (!jsonUpdates || !jsonUpdates.table_name) {
        throw new Error('Could not fetch table updates');
      }

      const notification: BatchedUpdateNotification = {
        rawUpdates: [],
        tables: JSON.parse(jsonUpdates.table_name),
        groupedUpdates: {}
      };
      this.iterateListeners((l) => l.tablesUpdated?.(notification));

      return result;
    }, timeoutSignal(options?.timeoutMs));
  }

  /**
   * Queries `PRAGMA table_info('sqlite_master')` on both connections while holding the write
   * lock and the read lock.
   *
   * NOTE(review): presumably this forces each connection to reload its view of the schema - confirm.
   */
  refreshSchema(): Promise<void> {
    return this.writeLock(async (writeTx) => {
      return this.readLock(async (readTx) => {
        const updateQuery = `PRAGMA table_info('sqlite_master')`;
        await writeTx.get(updateQuery);
        await readTx.get(updateQuery);
      });
    });
  }
}
/**
 * An implementation of {@link DBAdapter} using the Capacitor Community SQLite [plugin](https://github.com/capacitor-community/sqlite).
 *
 * The class body is intentionally empty: all behavior comes from applying `DBAdapterDefaultMixin`
 * to the `CapacitorConnectionPool` class defined in this file.
 *
 * @experimental
 * @alpha This is currently experimental and may change without a major version bump.
 */
export class CapacitorSQLiteAdapter extends DBAdapterDefaultMixin(CapacitorConnectionPool) {}