Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 99 additions & 45 deletions crates/core/src/crud_vtab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,20 @@ struct ActiveCrudTransaction {
}

enum CrudTransactionMode {
Manual {
stmt: ManagedStmt,
},
Simple {
stmt: ManagedStmt,
set_updated_rows: ManagedStmt,
update_local_bucket: ManagedStmt,
},
Manual(ManualCrudTransactionMode),
Simple(SimpleCrudTransactionMode),
}

#[derive(Default)]
struct ManualCrudTransactionMode {
stmt: Option<ManagedStmt>,
}

#[derive(Default)]
struct SimpleCrudTransactionMode {
stmt: Option<ManagedStmt>,
set_updated_rows: Option<ManagedStmt>,
had_writes: bool,
}

impl VirtualTable {
Expand All @@ -73,31 +79,29 @@ impl VirtualTable {
}
}

fn handle_insert(&self, args: &[*mut sqlite::value]) -> Result<(), SQLiteError> {
fn handle_insert(&mut self, args: &[*mut sqlite::value]) -> Result<(), SQLiteError> {
let current_tx = self
.current_tx
.as_ref()
.as_mut()
.ok_or_else(|| SQLiteError(ResultCode::MISUSE, Some(String::from("No tx_id"))))?;
let db = self.db;

match &current_tx.mode {
CrudTransactionMode::Manual { stmt } => {
match &mut current_tx.mode {
CrudTransactionMode::Manual(manual) => {
// Columns are (data TEXT, options INT HIDDEN)
let data = args[0].text();
let flags = match args[1].value_type() {
sqlite_nostd::ColumnType::Null => TableInfoFlags::default(),
_ => TableInfoFlags(args[1].int() as u32),
};

let stmt = manual.raw_crud_statement(db)?;
stmt.bind_int64(1, current_tx.tx_id)?;
stmt.bind_text(2, data, sqlite::Destructor::STATIC)?;
stmt.bind_int(3, flags.0 as i32)?;
stmt.exec()?;
}
CrudTransactionMode::Simple {
stmt,
set_updated_rows,
update_local_bucket,
} => {
CrudTransactionMode::Simple(simple) => {
// Columns are (op TEXT, id TEXT, type TEXT, data TEXT, old_values TEXT, metadata TEXT, options INT HIDDEN)
let flags = match args[6].value_type() {
sqlite_nostd::ColumnType::Null => TableInfoFlags::default(),
Expand Down Expand Up @@ -133,6 +137,7 @@ impl VirtualTable {

// First, we insert into ps_crud like the manual vtab would too. We have to create
// the JSON out of the individual components for that.
let stmt = simple.raw_crud_statement(db)?;
stmt.bind_int64(1, current_tx.tx_id)?;

let serialized = serde_json::to_string(&CrudEntry {
Expand All @@ -151,10 +156,11 @@ impl VirtualTable {
stmt.exec()?;

// However, we also set ps_updated_rows and update the $local bucket
let set_updated_rows = simple.set_updated_rows_statement(db)?;
set_updated_rows.bind_text(1, row_type, sqlite::Destructor::STATIC)?;
set_updated_rows.bind_text(2, id, sqlite::Destructor::STATIC)?;
set_updated_rows.exec()?;
update_local_bucket.exec()?;
simple.had_writes = true;
}
}

Expand All @@ -176,41 +182,90 @@ impl VirtualTable {
self.current_tx = Some(ActiveCrudTransaction {
tx_id,
mode: if self.is_simple {
CrudTransactionMode::Simple {
// language=SQLite
stmt: db.prepare_v3("INSERT INTO ps_crud(tx_id, data) VALUES (?, ?)", 0)?,
// language=SQLite
set_updated_rows: db.prepare_v3(
"INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?, ?)",
0,
)?,
update_local_bucket: db.prepare_v3(formatcp!("INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {MAX_OP_ID})"), 0)?,
}
CrudTransactionMode::Simple(Default::default())
} else {
const SQL: &str = formatcp!(
"\
CrudTransactionMode::Manual(Default::default())
},
});

Ok(())
}

fn end_transaction(&mut self) -> Result<(), SQLiteError> {
let tx = self.current_tx.take();
if let Some(tx) = tx {
let needs_local_bucket_update = match tx.mode {
CrudTransactionMode::Manual { .. } => {
// In manual mode, users need to update the $local bucket themselves.
false
}
CrudTransactionMode::Simple(simple) => simple.had_writes,
};

if needs_local_bucket_update {
self.db.exec_safe(formatcp!("INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {MAX_OP_ID})"))?;
Comment thread
simolus3 marked this conversation as resolved.
Outdated
}
}

Ok(())
}

fn clear_transaction_state(&mut self) {
self.current_tx = None;
}
}

impl ManualCrudTransactionMode {
fn raw_crud_statement(&mut self, db: *mut sqlite::sqlite3) -> Result<&ManagedStmt, ResultCode> {
prepare_lazy(&mut self.stmt, || {
const SQL: &str = formatcp!(
"\
WITH insertion (tx_id, data) AS (VALUES (?1, ?2))
INSERT INTO ps_crud(tx_id, data)
SELECT * FROM insertion WHERE (NOT (?3 & {})) OR data->>'op' != 'PATCH' OR data->'data' != '{{}}';
",
TableInfoFlags::IGNORE_EMPTY_UPDATE
);
TableInfoFlags::IGNORE_EMPTY_UPDATE
);

let insert_statement = db.prepare_v3(SQL, 0)?;
CrudTransactionMode::Manual {
stmt: insert_statement,
}
},
});
db.prepare_v3(SQL, 0)
})
}
}

Ok(())
impl SimpleCrudTransactionMode {
fn raw_crud_statement(&mut self, db: *mut sqlite::sqlite3) -> Result<&ManagedStmt, ResultCode> {
prepare_lazy(&mut self.stmt, || {
// language=SQLite
db.prepare_v3("INSERT INTO ps_crud(tx_id, data) VALUES (?, ?)", 0)
})
}

fn end_transaction(&mut self) {
self.current_tx = None;
fn set_updated_rows_statement(
&mut self,
db: *mut sqlite::sqlite3,
) -> Result<&ManagedStmt, ResultCode> {
prepare_lazy(&mut self.set_updated_rows, || {
// language=SQLite
db.prepare_v3(
"INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?, ?)",
0,
)
})
}
}

/// A variant of `Option.get_or_insert` that handles insertions returning errors.
fn prepare_lazy(
stmt: &mut Option<ManagedStmt>,
prepare: impl FnOnce() -> Result<ManagedStmt, ResultCode>,
) -> Result<&ManagedStmt, ResultCode> {
if let None = stmt {
*stmt = Some(prepare()?);
}

return Ok(unsafe { stmt.as_ref().unwrap_unchecked() });
}

extern "C" fn connect(
db: *mut sqlite::sqlite3,
_aux: *mut c_void,
Expand Down Expand Up @@ -269,13 +324,12 @@ extern "C" fn begin(vtab: *mut sqlite::vtab) -> c_int {

extern "C" fn commit(vtab: *mut sqlite::vtab) -> c_int {
let tab = unsafe { &mut *(vtab.cast::<VirtualTable>()) };
tab.end_transaction();
ResultCode::OK as c_int
vtab_result(vtab, tab.end_transaction())
}

extern "C" fn rollback(vtab: *mut sqlite::vtab) -> c_int {
let tab = unsafe { &mut *(vtab.cast::<VirtualTable>()) };
tab.end_transaction();
tab.clear_transaction_state();
// ps_tx will be rolled back automatically
ResultCode::OK as c_int
}
Expand All @@ -295,7 +349,7 @@ extern "C" fn update(
ResultCode::MISUSE as c_int
} else if rowid.value_type() == sqlite::ColumnType::Null {
// INSERT
let tab = unsafe { &*(vtab.cast::<VirtualTable>()) };
let tab = unsafe { &mut *(vtab.cast::<VirtualTable>()) };
let result = tab.handle_insert(&args[2..]);
vtab_result(vtab, result)
} else {
Expand Down
45 changes: 45 additions & 0 deletions dart/test/crud_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,51 @@ void main() {
'{"op":"PUT","id":"foo","type":"users","data":{"my":"value"},"old":{"previous":"value"}}',
});
});

test('resets state after commit', () {
db.execute('BEGIN');
db.execute(
'INSERT INTO powersync_crud (op, id, type) VALUES (?, ?, ?)', [
'DELETE',
'foo',
'users',
]);
db.execute('commit');

db.execute(
'INSERT INTO powersync_crud (op, id, type) VALUES (?, ?, ?)', [
'DELETE',
'foo',
'users',
]);
expect(db.select('SELECT * FROM ps_crud').map((r) => r['tx_id']),
[1, 2]);
});

test('resets state after rollback', () {
db.execute('BEGIN');
db.execute(
'INSERT INTO powersync_crud (op, id, type) VALUES (?, ?, ?)', [
'DELETE',
'foo',
'users',
]);
db.execute('rollback');

db.execute(
'INSERT INTO powersync_crud (op, id, type) VALUES (?, ?, ?)', [
'DELETE',
'foo2',
'users',
]);
expect(db.select('SELECT * FROM ps_crud'), [
{
'id': 1,
'data': '{"op":"DELETE","id":"foo2","type":"users"}',
'tx_id': 1,
}
]);
});
});
});

Expand Down