Skip to content

Commit ffab710

Browse files
authored
Raw tables: Support rest column (#158)
1 parent b61f87c commit ffab710

File tree

3 files changed

+203
-14
lines changed

3 files changed

+203
-14
lines changed

crates/core/src/schema/table_info.rs

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use alloc::{format, string::String, vec, vec::Vec};
1+
use alloc::string::ToString;
2+
use alloc::vec;
3+
use alloc::{collections::btree_set::BTreeSet, format, string::String, vec::Vec};
24
use serde::{Deserialize, de::Visitor};
35

46
#[derive(Deserialize)]
@@ -252,16 +254,77 @@ impl<'de> Deserialize<'de> for TableInfoFlags {
252254
}
253255
}
254256

255-
#[derive(Deserialize)]
256257
pub struct PendingStatement {
257258
pub sql: String,
258259
/// This vec should contain an entry for each parameter in [sql].
259260
pub params: Vec<PendingStatementValue>,
261+
262+
/// Present if this statement has a [PendingStatementValue::Rest] parameter.
263+
pub named_parameters_index: Option<RestColumnIndex>,
264+
}
265+
266+
pub struct RestColumnIndex {
267+
/// All column names referenced by this statement.
268+
pub named_parameters: BTreeSet<String>,
269+
/// Parameter indices that should be bound to a JSON object containing those values from the
270+
/// source row that haven't been referenced by [PendingStatementValue::Column].
271+
pub rest_parameter_positions: Vec<usize>,
272+
}
273+
274+
impl<'de> Deserialize<'de> for PendingStatement {
275+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
276+
where
277+
D: serde::Deserializer<'de>,
278+
{
279+
#[derive(Deserialize)]
280+
struct PendingStatementSource {
281+
pub sql: String,
282+
/// This vec should contain an entry for each parameter in [sql].
283+
pub params: Vec<PendingStatementValue>,
284+
}
285+
286+
let source = PendingStatementSource::deserialize(deserializer)?;
287+
let mut named_parameters_index = None;
288+
if source
289+
.params
290+
.iter()
291+
.any(|s| matches!(s, PendingStatementValue::Rest))
292+
{
293+
let mut set = BTreeSet::new();
294+
let mut rest_parameter_positions = vec![];
295+
for (i, column) in source.params.iter().enumerate() {
296+
set.insert(match column {
297+
PendingStatementValue::Id => "id".to_string(),
298+
PendingStatementValue::Column(name) => name.clone(),
299+
PendingStatementValue::Rest => {
300+
rest_parameter_positions.push(i);
301+
continue;
302+
}
303+
});
304+
}
305+
306+
named_parameters_index = Some(RestColumnIndex {
307+
named_parameters: set,
308+
rest_parameter_positions,
309+
});
310+
}
311+
312+
return Ok(Self {
313+
sql: source.sql,
314+
params: source.params,
315+
named_parameters_index,
316+
});
317+
}
260318
}
261319

262320
#[derive(Deserialize)]
263321
pub enum PendingStatementValue {
322+
/// Bind to the PowerSync row id of the affected row.
264323
Id,
324+
/// Bind to the value of column in the synced row.
265325
Column(String),
326+
/// Bind to a JSON object containing all columns from the synced row that haven't been matched
327+
/// by other statement values.
328+
Rest,
266329
// TODO: Stuff like a raw object of put data?
267330
}

crates/core/src/sync_local.rs

Lines changed: 77 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ use alloc::collections::btree_map::BTreeMap;
22
use alloc::format;
33
use alloc::string::String;
44
use alloc::vec::Vec;
5-
use serde::Deserialize;
5+
use serde::ser::SerializeMap;
6+
use serde::{Deserialize, Serialize};
67

78
use crate::error::{PSResult, PowerSyncError};
89
use crate::schema::inspection::ExistingTable;
@@ -153,7 +154,14 @@ impl<'a> SyncOperation<'a> {
153154
let stmt = raw.put_statement(self.db)?;
154155
let parsed: serde_json::Value = serde_json::from_str(data)
155156
.map_err(PowerSyncError::json_local_error)?;
156-
stmt.bind_for_put(id, &parsed)?;
157+
let json_object = parsed.as_object().ok_or_else(|| {
158+
PowerSyncError::argument_error(
159+
"expected oplog data to be an object",
160+
)
161+
})?;
162+
163+
let rest = stmt.render_rest_object(json_object)?;
164+
stmt.bind_for_put(id, &json_object, &rest)?;
157165
stmt.exec(self.db, type_name, id, Some(&parsed))?;
158166
}
159167
Err(_) => {
@@ -510,7 +518,7 @@ impl<'a> ParsedSchemaTable<'a> {
510518

511519
struct PreparedPendingStatement<'a> {
512520
stmt: ManagedStmt,
513-
params: &'a [PendingStatementValue],
521+
definition: &'a PendingStatement,
514522
}
515523

516524
impl<'a> PreparedPendingStatement<'a> {
@@ -532,29 +540,72 @@ impl<'a> PreparedPendingStatement<'a> {
532540

533541
Ok(Self {
534542
stmt,
535-
params: &pending.params,
543+
definition: pending,
544+
})
545+
}
546+
547+
pub fn render_rest_object(
548+
&self,
549+
json_data: &serde_json::Map<String, serde_json::Value>,
550+
) -> Result<Option<String>, PowerSyncError> {
551+
use serde_json::Value;
552+
553+
let Some(ref index) = self.definition.named_parameters_index else {
554+
return Ok(None);
555+
};
556+
557+
struct UnmatchedValues<'a>(BTreeMap<&'a String, &'a Value>);
558+
559+
impl<'a> Serialize for UnmatchedValues<'a> {
560+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
561+
where
562+
S: serde::Serializer,
563+
{
564+
let mut map = serializer.serialize_map(Some(self.0.len()))?;
565+
566+
for (k, v) in &self.0 {
567+
map.serialize_entry(k, v)?;
568+
}
569+
570+
map.end()
571+
}
572+
}
573+
574+
let mut unmatched_values: Option<UnmatchedValues> = None;
575+
for (key, value) in json_data {
576+
if !index.named_parameters.contains(key) {
577+
unmatched_values
578+
.get_or_insert_with(|| UnmatchedValues(BTreeMap::new()))
579+
.0
580+
.insert(key, value);
581+
}
582+
}
583+
584+
Ok(match unmatched_values {
585+
None => None,
586+
Some(unmatched) => {
587+
Some(serde_json::to_string(&unmatched).map_err(|e| PowerSyncError::internal(e))?)
588+
}
536589
})
537590
}
538591

539592
pub fn bind_for_put(
540593
&self,
541594
id: &str,
542-
json_data: &serde_json::Value,
595+
json_data: &serde_json::Map<String, serde_json::Value>,
596+
rest: &Option<String>,
543597
) -> Result<(), PowerSyncError> {
544598
use serde_json::Value;
545-
for (i, source) in self.params.iter().enumerate() {
599+
600+
for (i, source) in self.definition.params.iter().enumerate() {
546601
let i = (i + 1) as i32;
547602

548603
match source {
549604
PendingStatementValue::Id => {
550605
self.stmt.bind_text(i, id, Destructor::STATIC)?;
551606
}
552607
PendingStatementValue::Column(column) => {
553-
let parsed = json_data.as_object().ok_or_else(|| {
554-
PowerSyncError::argument_error("expected oplog data to be an object")
555-
})?;
556-
557-
match parsed.get(column) {
608+
match json_data.get(column) {
558609
Some(Value::Bool(value)) => {
559610
self.stmt.bind_int(i, if *value { 1 } else { 0 })
560611
}
@@ -573,14 +624,28 @@ impl<'a> PreparedPendingStatement<'a> {
573624
_ => self.stmt.bind_null(i),
574625
}?;
575626
}
627+
PendingStatementValue::Rest => {
628+
// These are bound later.
629+
debug_assert!(self.definition.named_parameters_index.is_some());
630+
}
631+
}
632+
}
633+
634+
if let Some(index) = &self.definition.named_parameters_index {
635+
for target in &index.rest_parameter_positions {
636+
let index = (*target + 1) as i32;
637+
match rest {
638+
None => self.stmt.bind_null(index),
639+
Some(value) => self.stmt.bind_text(index, &*value, Destructor::STATIC),
640+
}?;
576641
}
577642
}
578643

579644
Ok(())
580645
}
581646

582647
pub fn bind_for_delete(&self, id: &str) -> Result<(), PowerSyncError> {
583-
for (i, source) in self.params.iter().enumerate() {
648+
for (i, source) in self.definition.params.iter().enumerate() {
584649
if let PendingStatementValue::Id = source {
585650
self.stmt
586651
.bind_text((i + 1) as i32, id, Destructor::STATIC)?;

dart/test/sync_test.dart

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1197,6 +1197,67 @@ CREATE TRIGGER users_delete
11971197
);
11981198
});
11991199

1200+
test('rest column', () {
1201+
db.execute(
1202+
'CREATE TABLE users (id TEXT NOT NULL, name TEXT, _rest TEXT)');
1203+
invokeControl(
1204+
'start',
1205+
json.encode({
1206+
'schema': {
1207+
'tables': [],
1208+
'raw_tables': [
1209+
{
1210+
'name': 'users',
1211+
'put': {
1212+
'sql':
1213+
'INSERT OR REPLACE INTO users (id, name, _rest) VALUES (?, ?, ?);',
1214+
'params': [
1215+
'Id',
1216+
{'Column': 'name'},
1217+
'Rest'
1218+
],
1219+
},
1220+
'delete': {
1221+
'sql': 'DELETE FROM users WHERE id = ?',
1222+
'params': ['Id'],
1223+
},
1224+
'clear': 'DELETE FROM users;',
1225+
}
1226+
]
1227+
}
1228+
}),
1229+
);
1230+
1231+
pushCheckpoint(buckets: [bucketDescription('a')]);
1232+
pushSyncData(
1233+
'a',
1234+
'1',
1235+
'user1',
1236+
'PUT',
1237+
{'name': 'First user'},
1238+
objectType: 'users',
1239+
);
1240+
pushSyncData(
1241+
'a',
1242+
'2',
1243+
'user2',
1244+
'PUT',
1245+
{'name': 'Second user', 'foo': 'bar', 'another': 3},
1246+
objectType: 'users',
1247+
);
1248+
pushCheckpointComplete();
1249+
1250+
final users = db.select('SELECT * FROM users;');
1251+
expect(users, [
1252+
{'id': 'user1', 'name': 'First user', '_rest': null},
1253+
{
1254+
'id': 'user2',
1255+
'name': 'Second user',
1256+
'_rest': json.encode({'another': 3, 'foo': 'bar'})
1257+
},
1258+
]);
1259+
});
1260+
12001261
test('crud vtab', () {
12011262
// This is mostly a test for the triggers, validating the suggestions we
12021263
// give on https://docs.powersync.com/usage/use-case-examples/raw-tables#capture-local-writes-with-triggers

0 commit comments

Comments
 (0)