Skip to content

Commit 428ef83

Browse files
authored
Add powersync_trigger_resync (#166)
1 parent c293b18 commit 428ef83

File tree

3 files changed

+131
-4
lines changed

3 files changed

+131
-4
lines changed

crates/core/src/sync/streaming_sync.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ impl SyncClient {
107107
SyncControlRequest::StopSyncStream => self.state.tear_down(),
108108
}
109109
}
110+
111+
/// Whether a sync iteration is currently active on the connection.
112+
pub fn has_sync_iteration(&self) -> bool {
113+
matches!(self.state, ClientState::IterationActive(_))
114+
}
110115
}
111116

112117
enum ClientState {

crates/core/src/view_admin.rs

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use powersync_sqlite_nostd as sqlite;
1010
use powersync_sqlite_nostd::{Connection, Context};
1111
use sqlite::{ResultCode, Value};
1212

13-
use crate::error::PowerSyncError;
13+
use crate::error::{PSResult, PowerSyncError};
1414
use crate::migrations::{LATEST_VERSION, powersync_migrate};
1515
use crate::schema::inspection::ExistingView;
1616
use crate::state::DatabaseState;
@@ -75,7 +75,7 @@ fn powersync_clear_impl(
7575
// speed up the next sync.
7676
local_db.exec_safe("DELETE FROM ps_oplog; DELETE FROM ps_buckets")?;
7777
} else {
78-
local_db.exec_safe("UPDATE ps_buckets SET last_applied_op = 0")?;
78+
trigger_resync(local_db, state)?;
7979
local_db.exec_safe("DELETE FROM ps_buckets WHERE name = '$local'")?;
8080
}
8181

@@ -86,10 +86,10 @@ DELETE FROM ps_crud;
8686
DELETE FROM ps_untyped;
8787
DELETE FROM ps_updated_rows;
8888
DELETE FROM ps_kv WHERE key != 'client_id';
89-
DELETE FROM ps_sync_state;
9089
DELETE FROM ps_stream_subscriptions;
9190
",
9291
)?;
92+
clear_has_synced(local_db)?;
9393

9494
let table_glob = if flags.clear_local() {
9595
"ps_data_*"
@@ -138,6 +138,52 @@ DELETE FROM {table};",
138138
Ok(String::from(""))
139139
}
140140

141+
fn trigger_resync(db: *mut sqlite::sqlite3, state: &DatabaseState) -> Result<(), PowerSyncError> {
142+
{
143+
let client = state.sync_client.borrow();
144+
if let Some(client) = client.as_ref()
145+
&& client.has_sync_iteration()
146+
{
147+
return Err(PowerSyncError::argument_error(
148+
"Cannot clear or trigger resync while a sync iteration is active.",
149+
));
150+
}
151+
}
152+
153+
db.exec_safe("UPDATE ps_buckets SET last_applied_op = 0 WHERE name != '$local'")
154+
.into_db_result(db)?;
155+
Ok(Default::default())
156+
}
157+
158+
fn clear_has_synced(db: *mut sqlite::sqlite3) -> Result<(), PowerSyncError> {
159+
db.exec_safe("DELETE FROM ps_sync_state;")?;
160+
db.exec_safe("UPDATE ps_stream_subscriptions SET last_synced_at = NULL")?;
161+
Ok(())
162+
}
163+
164+
fn powersync_trigger_resync_impl(
165+
ctx: *mut sqlite::context,
166+
args: &[*mut sqlite::value],
167+
) -> Result<String, PowerSyncError> {
168+
let local_db = ctx.db_handle();
169+
let state = unsafe { DatabaseState::from_context(&ctx) };
170+
trigger_resync(local_db, state)?;
171+
172+
let clear_progress = args[0].int() != 0;
173+
if clear_progress {
174+
clear_has_synced(local_db)?;
175+
}
176+
177+
Ok(Default::default())
178+
}
179+
180+
create_auto_tx_function!(powersync_trigger_resync_tx, powersync_trigger_resync_impl);
181+
create_sqlite_text_fn!(
182+
powersync_trigger_resync,
183+
powersync_trigger_resync_tx,
184+
"powersync_trigger_resync"
185+
);
186+
141187
#[derive(Clone, Copy)]
142188
struct PowerSyncClearFlags(i32);
143189

@@ -199,12 +245,23 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<()
199245
"powersync_clear",
200246
1,
201247
sqlite::UTF8,
202-
Some(Rc::into_raw(state) as *mut c_void),
248+
Some(Rc::into_raw(state.clone()) as *mut c_void),
203249
Some(powersync_clear),
204250
None,
205251
None,
206252
Some(DatabaseState::destroy_rc),
207253
)?;
208254

255+
db.create_function_v2(
256+
"powersync_trigger_resync",
257+
1,
258+
sqlite::UTF8,
259+
Some(Rc::into_raw(state) as *mut c_void),
260+
Some(powersync_trigger_resync),
261+
None,
262+
None,
263+
Some(DatabaseState::destroy_rc),
264+
)?;
265+
209266
Ok(())
210267
}

dart/test/sync_test.dart

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,7 @@ void _syncTests<T>({
400400
db.execute(
401401
'insert into items (id, col) values (uuid(), ?)', ['local item']);
402402
expect(db.select('SELECT * FROM items'), hasLength(2));
403+
invokeControl('stop', null);
403404

404405
// Soft clear
405406
db.execute('SELECT powersync_clear(2)');
@@ -427,6 +428,70 @@ void _syncTests<T>({
427428
expect(db.select('SELECT * FROM items'), hasLength(1));
428429
});
429430

431+
group('trigger resync', () {
432+
test('forbidden during sync', () {
433+
invokeControl('start', null);
434+
435+
expect(
436+
() => db.select('SELECT powersync_trigger_resync(1)'),
437+
throwsA(
438+
isSqliteException(
439+
3091,
440+
contains(
441+
'Cannot clear or trigger resync while a sync iteration is active.'),
442+
),
443+
),
444+
);
445+
});
446+
447+
test('re-applies data', () {
448+
invokeControl('start', null);
449+
pushCheckpoint(buckets: [bucketDescription('a', count: 1)]);
450+
pushSyncData('a', '1', 'row-0', 'PUT', {'col': 'hi'});
451+
pushCheckpointComplete();
452+
invokeControl('stop', null);
453+
454+
db.execute('delete from ps_data__items');
455+
db.execute('select powersync_trigger_resync(0)');
456+
457+
final instructions = invokeControl('start', null);
458+
expect(
459+
instructions,
460+
contains(
461+
containsPair(
462+
'EstablishSyncStream',
463+
containsPair(
464+
'request',
465+
containsPair('buckets', [
466+
{'name': 'a', 'after': '1'}
467+
]),
468+
),
469+
),
470+
),
471+
);
472+
473+
pushCheckpoint(buckets: [bucketDescription('a', count: 1)]);
474+
pushCheckpointComplete();
475+
476+
expect(db.select('select * from items'), [
477+
{'id': 'row-0', 'col': 'hi'}
478+
]);
479+
});
480+
481+
test('can clear has synced', () {
482+
invokeControl('start', null);
483+
pushCheckpoint(buckets: [bucketDescription('a', count: 1)]);
484+
pushSyncData('a', '1', 'row-0', 'PUT', {'col': 'hi'});
485+
pushCheckpointComplete();
486+
invokeControl('stop', null);
487+
488+
db.execute('select powersync_trigger_resync(1)');
489+
final [row] = db.select('select powersync_offline_sync_status()');
490+
expect(json.decode(row.columnAt(0)),
491+
containsPair('priority_status', isEmpty));
492+
});
493+
});
494+
430495
test('persists download progress', () {
431496
const bucket = 'bkt';
432497
void expectProgress(int atLast, int sinceLast) {

0 commit comments

Comments
 (0)