Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/core/src/sync/streaming_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ impl SyncClient {
SyncControlRequest::StopSyncStream => self.state.tear_down(),
}
}

/// Whether a sync iteration is currently active on the connection.
pub fn has_sync_iteration(&self) -> bool {
matches!(self.state, ClientState::IterationActive(_))
}
}

enum ClientState {
Expand Down
52 changes: 49 additions & 3 deletions crates/core/src/view_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use powersync_sqlite_nostd as sqlite;
use powersync_sqlite_nostd::{Connection, Context};
use sqlite::{ResultCode, Value};

use crate::error::PowerSyncError;
use crate::error::{PSResult, PowerSyncError};
use crate::migrations::{LATEST_VERSION, powersync_migrate};
use crate::schema::inspection::ExistingView;
use crate::state::DatabaseState;
Expand Down Expand Up @@ -75,7 +75,7 @@ fn powersync_clear_impl(
// speed up the next sync.
local_db.exec_safe("DELETE FROM ps_oplog; DELETE FROM ps_buckets")?;
} else {
local_db.exec_safe("UPDATE ps_buckets SET last_applied_op = 0")?;
trigger_resync(local_db, state)?;
local_db.exec_safe("DELETE FROM ps_buckets WHERE name = '$local'")?;
}

Expand Down Expand Up @@ -138,6 +138,41 @@ DELETE FROM {table};",
Ok(String::from(""))
}

fn trigger_resync(db: *mut sqlite::sqlite3, state: &DatabaseState) -> Result<(), PowerSyncError> {
{
let client = state.sync_client.borrow();
if let Some(client) = client.as_ref()
&& client.has_sync_iteration()
{
return Err(PowerSyncError::argument_error(
"Cannot clear or trigger resync while a sync iteration is active.",
));
}
}

db.exec_safe("UPDATE ps_buckets SET last_applied_op = 0 WHERE name != '$local'")
.into_db_result(db)?;
Ok(Default::default())
}

fn powersync_trigger_resync_impl(
ctx: *mut sqlite::context,
_args: &[*mut sqlite::value],
) -> Result<String, PowerSyncError> {
let local_db = ctx.db_handle();
let state = unsafe { DatabaseState::from_context(&ctx) };
trigger_resync(local_db, state)?;

Ok(Default::default())
}

create_auto_tx_function!(powersync_trigger_resync_tx, powersync_trigger_resync_impl);
create_sqlite_text_fn!(
powersync_trigger_resync,
powersync_trigger_resync_tx,
"powersync_trigger_resync"
);

#[derive(Clone, Copy)]
struct PowerSyncClearFlags(i32);

Expand Down Expand Up @@ -199,12 +234,23 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<()
"powersync_clear",
1,
sqlite::UTF8,
Some(Rc::into_raw(state) as *mut c_void),
Some(Rc::into_raw(state.clone()) as *mut c_void),
Some(powersync_clear),
None,
None,
Some(DatabaseState::destroy_rc),
)?;

db.create_function_v2(
"powersync_trigger_resync",
0,
sqlite::UTF8,
Some(Rc::into_raw(state) as *mut c_void),
Some(powersync_trigger_resync),
None,
None,
Some(DatabaseState::destroy_rc),
)?;

Ok(())
}
52 changes: 52 additions & 0 deletions dart/test/sync_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ void _syncTests<T>({
db.execute(
'insert into items (id, col) values (uuid(), ?)', ['local item']);
expect(db.select('SELECT * FROM items'), hasLength(2));
invokeControl('stop', null);

// Soft clear
db.execute('SELECT powersync_clear(2)');
Expand Down Expand Up @@ -427,6 +428,57 @@ void _syncTests<T>({
expect(db.select('SELECT * FROM items'), hasLength(1));
});

group('trigger resync', () {
test('forbidden during sync', () {
invokeControl('start', null);

expect(
() => db.select('SELECT powersync_trigger_resync()'),
throwsA(
isSqliteException(
3091,
contains(
'Cannot clear or trigger resync while a sync iteration is active.'),
),
),
);
});

test('re-applies data', () {
invokeControl('start', null);
pushCheckpoint(buckets: [bucketDescription('a', count: 1)]);
pushSyncData('a', '1', 'row-0', 'PUT', {'col': 'hi'});
pushCheckpointComplete();
invokeControl('stop', null);

db.execute('delete from ps_data__items');
db.execute('select powersync_trigger_resync()');

final instructions = invokeControl('start', null);
expect(
instructions,
contains(
containsPair(
'EstablishSyncStream',
containsPair(
'request',
containsPair('buckets', [
{'name': 'a', 'after': '1'}
]),
),
),
),
);

pushCheckpoint(buckets: [bucketDescription('a', count: 1)]);
pushCheckpointComplete();

expect(db.select('select * from items'), [
{'id': 'row-0', 'col': 'hi'}
]);
});
});

test('persists download progress', () {
const bucket = 'bkt';
void expectProgress(int atLast, int sinceLast) {
Expand Down