Skip to content

Commit 00897e1

Browse files
committed
Add powersync_trigger_resync
1 parent c293b18 commit 00897e1

File tree

3 files changed

+106
-3
lines changed

3 files changed

+106
-3
lines changed

crates/core/src/sync/streaming_sync.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ impl SyncClient {
107107
SyncControlRequest::StopSyncStream => self.state.tear_down(),
108108
}
109109
}
110+
111+
/// Whether a sync iteration is currently active on the connection.
112+
pub fn has_sync_iteration(&self) -> bool {
113+
matches!(self.state, ClientState::IterationActive(_))
114+
}
110115
}
111116

112117
enum ClientState {

crates/core/src/view_admin.rs

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use powersync_sqlite_nostd as sqlite;
1010
use powersync_sqlite_nostd::{Connection, Context};
1111
use sqlite::{ResultCode, Value};
1212

13-
use crate::error::PowerSyncError;
13+
use crate::error::{PSResult, PowerSyncError};
1414
use crate::migrations::{LATEST_VERSION, powersync_migrate};
1515
use crate::schema::inspection::ExistingView;
1616
use crate::state::DatabaseState;
@@ -75,7 +75,7 @@ fn powersync_clear_impl(
7575
// speed up the next sync.
7676
local_db.exec_safe("DELETE FROM ps_oplog; DELETE FROM ps_buckets")?;
7777
} else {
78-
local_db.exec_safe("UPDATE ps_buckets SET last_applied_op = 0")?;
78+
trigger_resync(local_db, state)?;
7979
local_db.exec_safe("DELETE FROM ps_buckets WHERE name = '$local'")?;
8080
}
8181

@@ -138,6 +138,41 @@ DELETE FROM {table};",
138138
Ok(String::from(""))
139139
}
140140

141+
fn trigger_resync(db: *mut sqlite::sqlite3, state: &DatabaseState) -> Result<(), PowerSyncError> {
142+
{
143+
let client = state.sync_client.borrow();
144+
if let Some(client) = client.as_ref()
145+
&& client.has_sync_iteration()
146+
{
147+
return Err(PowerSyncError::argument_error(
148+
"Cannot clear or trigger resync while a sync iteration is active.",
149+
));
150+
}
151+
}
152+
153+
db.exec_safe("UPDATE ps_buckets SET last_applied_op = 0 WHERE name != '$local'")
154+
.into_db_result(db)?;
155+
Ok(Default::default())
156+
}
157+
158+
fn powersync_trigger_resync_impl(
159+
ctx: *mut sqlite::context,
160+
_args: &[*mut sqlite::value],
161+
) -> Result<String, PowerSyncError> {
162+
let local_db = ctx.db_handle();
163+
let state = unsafe { DatabaseState::from_context(&ctx) };
164+
trigger_resync(local_db, state)?;
165+
166+
Ok(Default::default())
167+
}
168+
169+
create_auto_tx_function!(powersync_trigger_resync_tx, powersync_trigger_resync_impl);
170+
create_sqlite_text_fn!(
171+
powersync_trigger_resync,
172+
powersync_trigger_resync_tx,
173+
"powersync_trigger_resync"
174+
);
175+
141176
#[derive(Clone, Copy)]
142177
struct PowerSyncClearFlags(i32);
143178

@@ -199,12 +234,23 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<()
199234
"powersync_clear",
200235
1,
201236
sqlite::UTF8,
202-
Some(Rc::into_raw(state) as *mut c_void),
237+
Some(Rc::into_raw(state.clone()) as *mut c_void),
203238
Some(powersync_clear),
204239
None,
205240
None,
206241
Some(DatabaseState::destroy_rc),
207242
)?;
208243

244+
db.create_function_v2(
245+
"powersync_trigger_resync",
246+
0,
247+
sqlite::UTF8,
248+
Some(Rc::into_raw(state) as *mut c_void),
249+
Some(powersync_trigger_resync),
250+
None,
251+
None,
252+
Some(DatabaseState::destroy_rc),
253+
)?;
254+
209255
Ok(())
210256
}

dart/test/sync_test.dart

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,7 @@ void _syncTests<T>({
400400
db.execute(
401401
'insert into items (id, col) values (uuid(), ?)', ['local item']);
402402
expect(db.select('SELECT * FROM items'), hasLength(2));
403+
invokeControl('stop', null);
403404

404405
// Soft clear
405406
db.execute('SELECT powersync_clear(2)');
@@ -427,6 +428,57 @@ void _syncTests<T>({
427428
expect(db.select('SELECT * FROM items'), hasLength(1));
428429
});
429430

431+
group('trigger resync', () {
432+
test('forbidden during sync', () {
433+
invokeControl('start', null);
434+
435+
expect(
436+
() => db.select('SELECT powersync_trigger_resync()'),
437+
throwsA(
438+
isSqliteException(
439+
3091,
440+
contains(
441+
'Cannot clear or trigger resync while a sync iteration is active.'),
442+
),
443+
),
444+
);
445+
});
446+
447+
test('re-applies data', () {
448+
invokeControl('start', null);
449+
pushCheckpoint(buckets: [bucketDescription('a', count: 1)]);
450+
pushSyncData('a', '1', 'row-0', 'PUT', {'col': 'hi'});
451+
pushCheckpointComplete();
452+
invokeControl('stop', null);
453+
454+
db.execute('delete from ps_data__items');
455+
db.execute('select powersync_trigger_resync()');
456+
457+
final instructions = invokeControl('start', null);
458+
expect(
459+
instructions,
460+
contains(
461+
containsPair(
462+
'EstablishSyncStream',
463+
containsPair(
464+
'request',
465+
containsPair('buckets', [
466+
{'name': 'a', 'after': '1'}
467+
]),
468+
),
469+
),
470+
),
471+
);
472+
473+
pushCheckpoint(buckets: [bucketDescription('a', count: 1)]);
474+
pushCheckpointComplete();
475+
476+
expect(db.select('select * from items'), [
477+
{'id': 'row-0', 'col': 'hi'}
478+
]);
479+
});
480+
});
481+
430482
test('persists download progress', () {
431483
const bucket = 'bkt';
432484
void expectProgress(int atLast, int sinceLast) {

0 commit comments

Comments
 (0)