Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/core/src/crud_vtab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ impl SimpleCrudTransactionMode {
prepare_lazy(&mut self.set_updated_rows, || {
// language=SQLite
db.prepare_v3(
"INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?, ?)",
"INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id, bucket) VALUES(?, ?, 0)",
0,
)
})
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/fix_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ pub fn apply_v035_fix(db: *mut sqlite::sqlite3) -> Result<i64, PowerSyncError> {
// language=SQLite
let statement = db.prepare_v2(&format!(
"
INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id)
SELECT ?1, id FROM {}
INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id, bucket)
SELECT ?1, id, 0 FROM {}
WHERE NOT EXISTS (
SELECT 1 FROM ps_oplog
WHERE row_type = ?1 AND row_id = {}.id
Expand Down
26 changes: 25 additions & 1 deletion crates/core/src/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::fix_data::apply_v035_fix;
use crate::schema::inspection::ExistingView;
use crate::sync::BucketPriority;

pub const LATEST_VERSION: i32 = 12;
pub const LATEST_VERSION: i32 = 13;

pub fn powersync_migrate(
ctx: *mut sqlite::context,
Expand Down Expand Up @@ -424,5 +424,29 @@ json_object('sql', 'DELETE FROM ps_migration WHERE id >= 12')
local_db.exec_safe(stmt).into_db_result(local_db)?;
}

if current_version < 13 && target_version >= 13 {
let stmt = "\
ALTER TABLE ps_updated_rows RENAME TO ps_updated_rows_old;
CREATE TABLE ps_updated_rows(
row_type TEXT,
row_id TEXT,
bucket INTEGER NOT NULL,
PRIMARY KEY(bucket, row_type, row_id)) STRICT, WITHOUT ROWID;
INSERT INTO ps_updated_rows(row_type, row_id, bucket)
SELECT row_type, row_id, 0 FROM ps_updated_rows_old;
INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id, bucket)
SELECT row_type, row_id, bucket FROM ps_oplog;
DROP TABLE ps_updated_rows_old;
INSERT INTO ps_migration(id, down_migrations) VALUES(13, json_array(
json_object('sql', 'ALTER TABLE ps_updated_rows RENAME TO ps_updated_rows_13'),
json_object('sql', 'CREATE TABLE ps_updated_rows(\n row_type TEXT,\n row_id TEXT,\n PRIMARY KEY(row_type, row_id)) STRICT, WITHOUT ROWID'),
json_object('sql', 'INSERT INTO ps_updated_rows(row_type, row_id)\n SELECT row_type, row_id FROM ps_updated_rows_13\n GROUP BY row_type, row_id'),
json_object('sql', 'DROP TABLE ps_updated_rows_13'),
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 13')
));
";
local_db.exec_safe(stmt).into_db_result(local_db)?;
}

Ok(())
}
4 changes: 2 additions & 2 deletions crates/core/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ pub fn delete_bucket(db: *mut sqlite::sqlite3, name: &str) -> Result<(), ResultC
// language=SQLite
let updated_statement = db.prepare_v2(
"\
INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id)
SELECT row_type, row_id
INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id, bucket)
SELECT row_type, row_id, 0
FROM ps_oplog
WHERE bucket = ?1",
)?;
Expand Down
13 changes: 10 additions & 3 deletions crates/core/src/sync/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (?

let updated_row_statement = db.prepare_v2(
"\
INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id, bucket) VALUES(?1, ?2, ?3)",
)?;
updated_row_statement.bind_int64(3, bucket_id)?;

let mut last_op: Option<i64> = None;
let mut add_checksum = Checksum::zero();
Expand Down Expand Up @@ -150,6 +151,12 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
insert_statement.bind_int(7, checksum.bitcast_i32())?;
insert_statement.exec()?;

if let (Some(object_type), Some(object_id)) = (object_type, object_id) {
updated_row_statement.bind_text(1, object_type, sqlite::Destructor::STATIC)?;
updated_row_statement.bind_text(2, object_id, sqlite::Destructor::STATIC)?;
updated_row_statement.exec()?;
}

op_checksum += checksum;
} else if op == OpType::MOVE {
add_checksum += checksum;
Expand All @@ -158,8 +165,8 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
// language=SQLite
let clear_statement1 = db
.prepare_v2(
"INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id)
SELECT row_type, row_id
"INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id, bucket)
SELECT row_type, row_id, ?1
FROM ps_oplog
WHERE bucket = ?1",
)
Expand Down
30 changes: 20 additions & 10 deletions crates/core/src/sync_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,7 @@ impl<'a> SyncOperation<'a> {
.prepare_v2(
"\
WITH updated_rows AS (
SELECT b.row_type, b.row_id FROM ps_buckets AS buckets
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
AND (b.op_id > buckets.last_applied_op)
UNION ALL SELECT row_type, row_id FROM ps_updated_rows
SELECT row_type, row_id FROM ps_updated_rows
)

SELECT
Expand All @@ -325,7 +322,7 @@ SELECT
.db
.prepare_v2(
"\
-- 1. Filter oplog by the ops added but not applied yet (oplog b).
-- 1. Filter updated rows by the buckets included in this partial checkpoint.
-- We do not do any DISTINCT operation here, since that introduces a temp b-tree.
-- We filter out duplicates using the GROUP BY below.
WITH
Expand All @@ -334,10 +331,8 @@ WITH
OR name IN (SELECT value FROM json_each(json_extract(?1, '$.buckets')))
),
updated_rows AS (
SELECT b.row_type, b.row_id FROM ps_buckets AS buckets
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
AND (b.op_id > buckets.last_applied_op)
WHERE buckets.id IN (SELECT id FROM involved_buckets)
SELECT row_type, row_id FROM ps_updated_rows
WHERE bucket IN (SELECT id FROM involved_buckets)
)

-- 2. Find *all* current ops over different buckets for those objects (oplog r).
Expand Down Expand Up @@ -406,7 +401,22 @@ SELECT
.into_db_result(self.db)?;
BucketPriority::SENTINEL
}
Some(partial) => partial.priority,
Some(partial) => {
let stmt = self
.db
.prepare_v2(
"DELETE FROM ps_updated_rows
WHERE bucket IN (
SELECT id FROM ps_buckets
WHERE ?1 IS NULL
OR name IN (SELECT value FROM json_each(json_extract(?1, '$.buckets')))
)",
)
.into_db_result(self.db)?;
stmt.bind_text(1, partial.args, Destructor::STATIC)?;
stmt.exec()?;
partial.priority
}
}
.into();

Expand Down
70 changes: 70 additions & 0 deletions dart/test/sync_local_performance_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ SELECT

FROM generate_bucket_rows;

INSERT OR IGNORE INTO ps_updated_rows (bucket, row_type, row_id)
SELECT b.bucket, b.row_type, b.row_id FROM ps_buckets AS buckets
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
AND (b.op_id > buckets.last_applied_op);

COMMIT;
''');
// Enable this to see stats for initial data generation
Expand Down Expand Up @@ -120,6 +125,71 @@ COMMIT;
// The tests below are for comparing different queries, not run as part of the
// standard test suite.

test('sync_local new new query', () {
var timer = Stopwatch()..start();
final q = '''
-- 1. Filter by the ops added but not applied yet
WITH updated_rows AS (
SELECT row_type, row_id FROM ps_updated_rows
)

-- 2. Find *all* current ops over different buckets for those objects (oplog r).
SELECT
b.row_type,
b.row_id,
(
-- 3. For each unique row, select the data from the latest oplog entry.
-- The max(r.op_id) clause is used to select the latest oplog entry.
-- The iif is to avoid the max(r.op_id) column ending up in the results.
SELECT iif(max(r.op_id), r.data, null)
FROM ps_oplog r
WHERE r.row_type = b.row_type
AND r.row_id = b.row_id

) as data
FROM updated_rows b
-- Group for (2)
GROUP BY b.row_type, b.row_id;
''';
db.select(q);
print('${timer.elapsed.inMilliseconds}ms ${vfs.stats()}');
}, skip: skip);

test('sync_local new old query', () {
// Same as "new query", but ignoring the data in ps_updated_rows since it's unfair to that test.
var timer = Stopwatch()..start();
final q = '''
-- 1. Filter oplog by the ops added but not applied yet (oplog b).
-- We do not do any DISTINCT operation here, since that introduces a temp b-tree.
-- We filter out duplicates using the GROUP BY below.
WITH updated_rows AS (
SELECT b.row_type, b.row_id FROM ps_buckets AS buckets
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
AND (b.op_id > buckets.last_applied_op)
)

-- 2. Find *all* current ops over different buckets for those objects (oplog r).
SELECT
b.row_type,
b.row_id,
(
-- 3. For each unique row, select the data from the latest oplog entry.
-- The max(r.op_id) clause is used to select the latest oplog entry.
-- The iif is to avoid the max(r.op_id) column ending up in the results.
SELECT iif(max(r.op_id), r.data, null)
FROM ps_oplog r
WHERE r.row_type = b.row_type
AND r.row_id = b.row_id

) as data
FROM updated_rows b
-- Group for (2)
GROUP BY b.row_type, b.row_id;
''';
db.select(q);
print('${timer.elapsed.inMilliseconds}ms ${vfs.stats()}');
}, skip: skip);

test('sync_local new query', () {
// This is the query we're using now.
// This query only uses a single TEMP B-TREE for the GROUP BY operation,
Expand Down
6 changes: 3 additions & 3 deletions dart/test/utils/fix_035_fixtures.dart
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ const dataMigrated = '''
(1, 1, 'todos', 't1', '', '{}', 100),
(1, 2, 'todos', 't2', '', '{}', 20),
(2, 3, 'lists', 'l1', '', '{}', 3)
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
('lists', 'l3'),
('todos', 't3')
;INSERT INTO ps_updated_rows(row_type, row_id, bucket) VALUES
('lists', 'l3', 0),
('todos', 't3', 0)
;INSERT INTO ps_data__lists(id, data) VALUES
('l1', '{}'),
('l3', '{}')
Expand Down
Loading
Loading