-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathsync_local_performance_test.dart
More file actions
399 lines (354 loc) · 13.4 KB
/
sync_local_performance_test.dart
File metadata and controls
399 lines (354 loc) · 13.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
@Tags(['slow'])
library;
import 'dart:convert';
import 'package:sqlite3/common.dart';
import 'package:sqlite3/sqlite3.dart';
import 'package:test/test.dart';
import 'utils/native_test_utils.dart';
import 'utils/tracking_vfs.dart';
import './schema_test.dart' show schema;
// These test how many filesystem reads and writes are performed during sync_local.
// The real world performane of filesystem operations depend a lot on the specific system.
// For example, on native desktop systems, the performance of temporary filesystem storage could
// be close to memory performance. However, on web and mobile, (temporary) filesystem operations
// could drastically slow down performance. So rather than only testing the real time for these
// queries, we count the number of filesystem operations.
void testFilesystemOperations(
{bool unique = true,
int count = 200000,
int alreadyApplied = 10000,
int buckets = 10,
bool rawQueries = false}) {
late TrackingFileSystem vfs;
late CommonDatabase db;
final skip = rawQueries == false ? 'For manual query testing only' : null;
setUpAll(() {
loadExtension();
});
setUp(() async {
// Needs an unique name per test file to avoid concurrency issues
vfs =
TrackingFileSystem(parent: InMemoryFileSystem(), name: 'perf-test-vfs');
sqlite3.registerVirtualFileSystem(vfs, makeDefault: false);
db = openTestDatabase(vfs: vfs, fileName: 'test.db');
});
tearDown(() {
sqlite3.unregisterVirtualFileSystem(vfs);
});
setUp(() {
// Optional: set a custom cache size - it affects the number of filesystem operations.
// db.execute('PRAGMA cache_size=-50000');
db.execute('SELECT powersync_replace_schema(?)', [json.encode(schema)]);
// Generate dummy data
// We can replace this with actual similated download operations later
db.execute('''
BEGIN TRANSACTION;
WITH RECURSIVE generate_rows(n) AS (
SELECT 1
UNION ALL
SELECT n + 1 FROM generate_rows WHERE n < $count
)
INSERT INTO ps_oplog (bucket, op_id, row_type, row_id, key, data, hash)
SELECT
(n % $buckets), -- Generate n different buckets
n,
'assets',
${unique ? 'uuid()' : "'duplicated_id'"},
uuid(),
'{"description": "' || n || '", "make": "test", "model": "this is just filler data. this is just filler data. this is just filler data. this is just filler data. this is just filler data. this is just filler data. this is just filler data. "}',
(n * 17) % 1000000000 -- Some pseudo-random hash
FROM generate_rows;
WITH RECURSIVE generate_bucket_rows(n) AS (
SELECT 1
UNION ALL
SELECT n + 1 FROM generate_bucket_rows WHERE n < $buckets
)
INSERT INTO ps_buckets (id, name, last_applied_op)
SELECT
(n % $buckets),
'bucket' || n,
$alreadyApplied -- simulate a percentage of operations previously applied
FROM generate_bucket_rows;
INSERT OR IGNORE INTO ps_updated_rows (bucket, row_type, row_id)
SELECT b.bucket, b.row_type, b.row_id FROM ps_buckets AS buckets
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
AND (b.op_id > buckets.last_applied_op);
COMMIT;
''');
// Enable this to see stats for initial data generation
// print('init stats: ${vfs.stats()}');
vfs.clearStats();
});
test('sync_local (full)', () {
var timer = Stopwatch()..start();
db.select('insert into powersync_operations(op, data) values(?, ?)',
['sync_local', '']);
print('${timer.elapsed.inMilliseconds}ms ${vfs.stats()}');
// These are fairly generous limits, to catch significant regressions only.
expect(vfs.tempWrites, lessThan(count / 50));
expect(timer.elapsed,
lessThan(Duration(milliseconds: 100 + (count / 50).round())));
});
test('sync_local (partial)', () {
var timer = Stopwatch()..start();
db.select('insert into powersync_operations(op, data) values(?, ?)', [
'sync_local',
jsonEncode({
'buckets': ['bucket0', 'bucket3', 'bucket4', 'bucket5', 'bucket6'],
'priority': 2
})
]);
print('${timer.elapsed.inMilliseconds}ms ${vfs.stats()}');
expect(vfs.tempWrites, lessThan(count / 50));
expect(timer.elapsed,
lessThan(Duration(milliseconds: 100 + (count / 50).round())));
});
// The tests below are for comparing different queries, not run as part of the
// standard test suite.
test('sync_local new new query', () {
var timer = Stopwatch()..start();
final q = '''
-- 1. Filter by the ops added but not applied yet
WITH updated_rows AS (
SELECT row_type, row_id FROM ps_updated_rows
)
-- 2. Find *all* current ops over different buckets for those objects (oplog r).
SELECT
b.row_type,
b.row_id,
(
-- 3. For each unique row, select the data from the latest oplog entry.
-- The max(r.op_id) clause is used to select the latest oplog entry.
-- The iif is to avoid the max(r.op_id) column ending up in the results.
SELECT iif(max(r.op_id), r.data, null)
FROM ps_oplog r
WHERE r.row_type = b.row_type
AND r.row_id = b.row_id
) as data
FROM updated_rows b
-- Group for (2)
GROUP BY b.row_type, b.row_id;
''';
db.select(q);
print('${timer.elapsed.inMilliseconds}ms ${vfs.stats()}');
}, skip: skip);
test('sync_local new old query', () {
// Same as "new query", but ignoring the data in ps_updated_rows since it's unfair to that test.
var timer = Stopwatch()..start();
final q = '''
-- 1. Filter oplog by the ops added but not applied yet (oplog b).
-- We do not do any DISTINCT operation here, since that introduces a temp b-tree.
-- We filter out duplicates using the GROUP BY below.
WITH updated_rows AS (
SELECT b.row_type, b.row_id FROM ps_buckets AS buckets
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
AND (b.op_id > buckets.last_applied_op)
)
-- 2. Find *all* current ops over different buckets for those objects (oplog r).
SELECT
b.row_type,
b.row_id,
(
-- 3. For each unique row, select the data from the latest oplog entry.
-- The max(r.op_id) clause is used to select the latest oplog entry.
-- The iif is to avoid the max(r.op_id) column ending up in the results.
SELECT iif(max(r.op_id), r.data, null)
FROM ps_oplog r
WHERE r.row_type = b.row_type
AND r.row_id = b.row_id
) as data
FROM updated_rows b
-- Group for (2)
GROUP BY b.row_type, b.row_id;
''';
db.select(q);
print('${timer.elapsed.inMilliseconds}ms ${vfs.stats()}');
}, skip: skip);
test('sync_local new query', () {
// This is the query we're using now.
// This query only uses a single TEMP B-TREE for the GROUP BY operation,
// leading to fairly efficient execution.
// QUERY PLAN
// |--CO-ROUTINE updated_rows
// | `--COMPOUND QUERY
// | |--LEFT-MOST SUBQUERY
// | | |--SCAN buckets
// | | `--SEARCH b USING INDEX ps_oplog_opid (bucket=? AND op_id>?)
// | `--UNION ALL
// | `--SCAN ps_updated_rows
// |--SCAN b
// |--USE TEMP B-TREE FOR GROUP BY
// `--CORRELATED SCALAR SUBQUERY 3
// `--SEARCH r USING INDEX ps_oplog_row (row_type=? AND row_id=?)
//
// For details on the max(r.op_id) clause, see:
// https://sqlite.org/lang_select.html#bare_columns_in_an_aggregate_query
// > If there is exactly one min() or max() aggregate in the query, then all bare columns in the result
// > set take values from an input row which also contains the minimum or maximum.
var timer = Stopwatch()..start();
final q = '''
-- 1. Filter oplog by the ops added but not applied yet (oplog b).
-- We do not do any DISTINCT operation here, since that introduces a temp b-tree.
-- We filter out duplicates using the GROUP BY below.
WITH updated_rows AS (
SELECT b.row_type, b.row_id FROM ps_buckets AS buckets
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
AND (b.op_id > buckets.last_applied_op)
UNION ALL SELECT row_type, row_id FROM ps_updated_rows
)
-- 2. Find *all* current ops over different buckets for those objects (oplog r).
SELECT
b.row_type,
b.row_id,
(
-- 3. For each unique row, select the data from the latest oplog entry.
-- The max(r.op_id) clause is used to select the latest oplog entry.
-- The iif is to avoid the max(r.op_id) column ending up in the results.
SELECT iif(max(r.op_id), r.data, null)
FROM ps_oplog r
WHERE r.row_type = b.row_type
AND r.row_id = b.row_id
) as data
FROM updated_rows b
-- Group for (2)
GROUP BY b.row_type, b.row_id;
''';
db.select(q);
print('${timer.elapsed.inMilliseconds}ms ${vfs.stats()}');
}, skip: skip);
test('old query', () {
// This query used a TEMP B-TREE for the first part of finding unique updated rows,
// then another TEMP B-TREE for the second GROUP BY. This redundant B-TREE causes
// a lot of temporary storage overhead.
// QUERY PLAN
// |--CO-ROUTINE updated_rows
// | `--COMPOUND QUERY
// | |--LEFT-MOST SUBQUERY
// | | |--SCAN buckets
// | | `--SEARCH b USING INDEX ps_oplog_opid (bucket=? AND op_id>?)
// | `--UNION USING TEMP B-TREE
// | `--SCAN ps_updated_rows
// |--SCAN b
// |--SEARCH r USING INDEX ps_oplog_row (row_type=? AND row_id=?) LEFT-JOIN
// `--USE TEMP B-TREE FOR GROUP BY
var timer = Stopwatch()..start();
final q = '''
WITH updated_rows AS (
SELECT DISTINCT b.row_type, b.row_id FROM ps_buckets AS buckets
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
AND (b.op_id > buckets.last_applied_op)
UNION SELECT row_type, row_id FROM ps_updated_rows
)
SELECT b.row_type as type,
b.row_id as id,
r.data as data,
count(r.bucket) as buckets,
max(r.op_id) as op_id
FROM updated_rows b
LEFT OUTER JOIN ps_oplog AS r
ON r.row_type = b.row_type
AND r.row_id = b.row_id
GROUP BY b.row_type, b.row_id;
''';
db.select(q);
print('${timer.elapsed.inMilliseconds}ms ${vfs.stats()}');
}, skip: skip);
test('group_by query', () {
// This is similar to the new query, but uses a GROUP BY .. LIMIT 1 clause instead of the max(op_id) hack.
// It is similar in the number of filesystem operations, but slightly slower in real time.
// QUERY PLAN
// |--CO-ROUTINE updated_rows
// | `--COMPOUND QUERY
// | |--LEFT-MOST SUBQUERY
// | | |--SCAN buckets
// | | `--SEARCH b USING INDEX ps_oplog_opid (bucket=? AND op_id>?)
// | `--UNION ALL
// | `--SCAN ps_updated_rows
// |--SCAN b
// |--USE TEMP B-TREE FOR GROUP BY
// `--CORRELATED SCALAR SUBQUERY 3
// |--SEARCH r USING INDEX ps_oplog_row (row_type=? AND row_id=?)
// `--USE TEMP B-TREE FOR ORDER BY
var timer = Stopwatch()..start();
final q = '''
WITH updated_rows AS (
SELECT b.row_type, b.row_id FROM ps_buckets AS buckets
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
AND (b.op_id > buckets.last_applied_op)
UNION ALL SELECT row_type, row_id FROM ps_updated_rows
)
SELECT
b.row_type,
b.row_id,
(
SELECT r.data FROM ps_oplog r
WHERE r.row_type = b.row_type
AND r.row_id = b.row_id
ORDER BY r.op_id DESC
LIMIT 1
) as data
FROM updated_rows b
GROUP BY b.row_type, b.row_id;
''';
db.select(q);
print('${timer.elapsed.inMilliseconds}ms ${vfs.stats()}');
}, skip: skip);
test('full scan query', () {
// This is a nice alternative for initial sync or resyncing large amounts of data.
// This is very efficient for reading all data, but not for incremental updates.
// QUERY PLAN
// |--SCAN r USING INDEX ps_oplog_row
// |--CORRELATED SCALAR SUBQUERY 1
// | `--SEARCH ps_buckets USING INTEGER PRIMARY KEY (rowid=?)
// `--CORRELATED SCALAR SUBQUERY 1
// `--SEARCH ps_buckets USING INTEGER PRIMARY KEY (rowid=?)
var timer = Stopwatch()..start();
final q = '''
SELECT r.row_type as type,
r.row_id as id,
r.data as data,
max(r.op_id) as op_id,
sum((select 1 from ps_buckets where ps_buckets.id = r.bucket and r.op_id > ps_buckets.last_applied_op)) as buckets
FROM ps_oplog r
GROUP BY r.row_type, r.row_id
HAVING buckets > 0;
''';
db.select(q);
print('${timer.elapsed.inMilliseconds}ms ${vfs.stats()}');
}, skip: skip);
}
main() {
group('test filesystem operations with unique ids', () {
testFilesystemOperations(
unique: true,
count: 500000,
alreadyApplied: 10000,
buckets: 10,
rawQueries: false);
});
group('test filesytem operations with duplicate ids', () {
// If this takes more than a couple of milliseconds to complete, there is a performance bug
testFilesystemOperations(
unique: false,
count: 500000,
alreadyApplied: 1000,
buckets: 10,
rawQueries: false);
});
group('test filesystem operations with a small number of changes', () {
testFilesystemOperations(
unique: true,
count: 100000,
alreadyApplied: 95000,
buckets: 10,
rawQueries: false);
});
group('test filesystem operations with a large number of buckets', () {
testFilesystemOperations(
unique: true,
count: 100000,
alreadyApplied: 10000,
buckets: 1000,
rawQueries: false);
});
}