Skip to content

Commit c293b18

Browse files
authored
Infer sync statements for raw tables (#164)
1 parent c6ffee3 commit c293b18

File tree

7 files changed

+348
-54
lines changed

7 files changed

+348
-54
lines changed

crates/core/src/schema/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod table_info;
77
use alloc::{rc::Rc, vec::Vec};
88
pub use common::{ColumnFilter, SchemaTable};
99
use powersync_sqlite_nostd::{self as sqlite, Connection, Context, Value, args};
10+
pub use raw_table::InferredSchemaCache;
1011
use serde::Deserialize;
1112
use sqlite::ResultCode;
1213
pub use table_info::{

crates/core/src/schema/raw_table.rs

Lines changed: 207 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,27 @@
1-
use core::fmt::{self, Formatter, Write, from_fn};
1+
use core::{
2+
cell::RefCell,
3+
fmt::{self, Formatter, Write, from_fn},
4+
};
25

36
use alloc::{
7+
collections::btree_map::BTreeMap,
48
format,
9+
rc::Rc,
510
string::{String, ToString},
611
vec,
712
vec::Vec,
813
};
9-
use powersync_sqlite_nostd::{Connection, Destructor, ResultCode};
14+
use powersync_sqlite_nostd::{self as sqlite, Connection, Destructor, ResultCode};
1015

1116
use crate::{
1217
error::PowerSyncError,
13-
schema::{ColumnFilter, RawTable, SchemaTable},
18+
schema::{ColumnFilter, PendingStatement, PendingStatementValue, RawTable, SchemaTable},
1419
utils::{InsertIntoCrud, SqlBuffer, WriteType},
1520
views::table_columns_to_json_object,
1621
};
1722

1823
pub struct InferredTableStructure {
24+
pub name: String,
1925
pub columns: Vec<String>,
2026
}
2127

@@ -24,7 +30,7 @@ impl InferredTableStructure {
2430
table_name: &str,
2531
db: impl Connection,
2632
synced_columns: &Option<ColumnFilter>,
27-
) -> Result<Option<Self>, PowerSyncError> {
33+
) -> Result<Self, PowerSyncError> {
2834
let stmt = db.prepare_v2("select name from pragma_table_info(?)")?;
2935
stmt.bind_text(1, table_name, Destructor::STATIC)?;
3036

@@ -45,17 +51,173 @@ impl InferredTableStructure {
4551
}
4652

4753
if !has_id_column && columns.is_empty() {
48-
Ok(None)
54+
Err(PowerSyncError::argument_error(format!(
55+
"Could not find {table_name} in local schema."
56+
)))
4957
} else if !has_id_column {
5058
Err(PowerSyncError::argument_error(format!(
5159
"Table {table_name} has no id column."
5260
)))
5361
} else {
54-
Ok(Some(Self { columns }))
62+
Ok(Self {
63+
name: table_name.to_string(),
64+
columns,
65+
})
66+
}
67+
}
68+
69+
/// Generates a statement of the form `INSERT INTO $tbl ($cols) VALUES (?, ...) ON CONFLICT (id)
70+
/// DO UPDATE SET ...` for the sync client.
71+
pub fn infer_put_stmt(&self) -> PendingStatement {
72+
let mut buffer = SqlBuffer::new();
73+
let mut params = vec![];
74+
75+
buffer.push_str("INSERT INTO ");
76+
let _ = buffer.identifier().write_str(&self.name);
77+
buffer.push_str(" (id");
78+
for column in &self.columns {
79+
buffer.comma();
80+
let _ = buffer.identifier().write_str(column);
81+
}
82+
buffer.push_str(") VALUES (?1");
83+
params.push(PendingStatementValue::Id);
84+
for (i, column) in self.columns.iter().enumerate() {
85+
buffer.comma();
86+
let _ = write!(&mut buffer, "?{}", i + 2);
87+
params.push(PendingStatementValue::Column(column.clone()));
88+
}
89+
buffer.push_str(") ON CONFLICT (id) DO UPDATE SET ");
90+
let mut do_update = buffer.comma_separated();
91+
// Generated an "x" = ? for all synced columns to update them without affecting local-only
92+
// columns.
93+
for (i, column) in self.columns.iter().enumerate() {
94+
let entry = do_update.element();
95+
let _ = entry.identifier().write_str(column);
96+
let _ = write!(entry, " = ?{}", i + 2);
97+
}
98+
99+
PendingStatement {
100+
sql: buffer.sql,
101+
params,
102+
named_parameters_index: None,
103+
}
104+
}
105+
106+
/// Generates a statement of the form `DELETE FROM $tbl WHERE id = ?` for the sync client.
107+
pub fn infer_delete_stmt(&self) -> PendingStatement {
108+
let mut buffer = SqlBuffer::new();
109+
buffer.push_str("DELETE FROM ");
110+
let _ = buffer.identifier().write_str(&self.name);
111+
buffer.push_str(" WHERE id = ?");
112+
113+
PendingStatement {
114+
sql: buffer.sql,
115+
params: vec![PendingStatementValue::Id],
116+
named_parameters_index: None,
55117
}
56118
}
57119
}
58120

121+
/// A cache of inferred raw table schema and associated put and delete statements for `sync_local`.
122+
///
123+
/// This cache avoids having to re-generate statements on every (partial) checkpoint in the sync
124+
/// client.
125+
#[derive(Default)]
126+
pub struct InferredSchemaCache {
127+
entries: RefCell<BTreeMap<String, SchemaCacheEntry>>,
128+
}
129+
130+
impl InferredSchemaCache {
131+
pub fn current_schema_version(db: *mut sqlite::sqlite3) -> Result<usize, PowerSyncError> {
132+
let version = db.prepare_v2("PRAGMA schema_version")?;
133+
version.step()?;
134+
let version = version.column_int64(0) as usize;
135+
Ok(version)
136+
}
137+
138+
pub fn infer_put_statement(
139+
&self,
140+
db: *mut sqlite::sqlite3,
141+
schema_version: usize,
142+
tbl: &RawTable,
143+
) -> Result<Rc<PendingStatement>, PowerSyncError> {
144+
self.with_entry(db, schema_version, tbl, SchemaCacheEntry::put)
145+
}
146+
147+
pub fn infer_delete_statement(
148+
&self,
149+
db: *mut sqlite::sqlite3,
150+
schema_version: usize,
151+
tbl: &RawTable,
152+
) -> Result<Rc<PendingStatement>, PowerSyncError> {
153+
self.with_entry(db, schema_version, tbl, SchemaCacheEntry::delete)
154+
}
155+
156+
fn with_entry(
157+
&self,
158+
db: *mut sqlite::sqlite3,
159+
schema_version: usize,
160+
tbl: &RawTable,
161+
f: impl FnOnce(&mut SchemaCacheEntry) -> Rc<PendingStatement>,
162+
) -> Result<Rc<PendingStatement>, PowerSyncError> {
163+
let mut entries = self.entries.borrow_mut();
164+
if let Some(value) = entries.get_mut(&tbl.name) {
165+
if value.schema_version != schema_version {
166+
// Values are outdated, refresh.
167+
*value = SchemaCacheEntry::infer(db, schema_version, tbl)?;
168+
}
169+
170+
Ok(f(value))
171+
} else {
172+
let mut entry = SchemaCacheEntry::infer(db, schema_version, tbl)?;
173+
let stmt = f(&mut entry);
174+
entries.insert(tbl.name.clone(), entry);
175+
Ok(stmt)
176+
}
177+
}
178+
}
179+
180+
pub struct SchemaCacheEntry {
181+
schema_version: usize,
182+
structure: InferredTableStructure,
183+
put_stmt: Option<Rc<PendingStatement>>,
184+
delete_stmt: Option<Rc<PendingStatement>>,
185+
}
186+
187+
impl SchemaCacheEntry {
188+
fn infer(
189+
db: *mut sqlite::sqlite3,
190+
schema_version: usize,
191+
table: &RawTable,
192+
) -> Result<Self, PowerSyncError> {
193+
let local_table_name = table.require_table_name()?;
194+
let structure = InferredTableStructure::read_from_database(
195+
local_table_name,
196+
db,
197+
&table.schema.synced_columns,
198+
)?;
199+
200+
Ok(Self {
201+
schema_version,
202+
structure,
203+
put_stmt: None,
204+
delete_stmt: None,
205+
})
206+
}
207+
208+
fn put(&mut self) -> Rc<PendingStatement> {
209+
self.put_stmt
210+
.get_or_insert_with(|| Rc::new(self.structure.infer_put_stmt()))
211+
.clone()
212+
}
213+
214+
fn delete(&mut self) -> Rc<PendingStatement> {
215+
self.delete_stmt
216+
.get_or_insert_with(|| Rc::new(self.structure.infer_delete_stmt()))
217+
.clone()
218+
}
219+
}
220+
59221
/// Generates a `CREATE TRIGGER` statement to capture writes on raw tables and to forward them to
60222
/// ps-crud.
61223
pub fn generate_raw_table_trigger(
@@ -64,19 +226,10 @@ pub fn generate_raw_table_trigger(
64226
trigger_name: &str,
65227
write: WriteType,
66228
) -> Result<String, PowerSyncError> {
67-
let Some(local_table_name) = table.schema.table_name.as_ref() else {
68-
return Err(PowerSyncError::argument_error("Table has no local name"));
69-
};
70-
229+
let local_table_name = table.require_table_name()?;
71230
let synced_columns = &table.schema.synced_columns;
72-
let Some(resolved_table) =
73-
InferredTableStructure::read_from_database(local_table_name, db, synced_columns)?
74-
else {
75-
return Err(PowerSyncError::argument_error(format!(
76-
"Could not find {} in local schema",
77-
local_table_name
78-
)));
79-
};
231+
let resolved_table =
232+
InferredTableStructure::read_from_database(local_table_name, db, synced_columns)?;
80233

81234
let as_schema_table = SchemaTable::Raw {
82235
definition: table,
@@ -167,3 +320,39 @@ pub fn generate_raw_table_trigger(
167320
buffer.trigger_end();
168321
Ok(buffer.sql)
169322
}
323+
324+
#[cfg(test)]
325+
mod test {
326+
use alloc::{string::ToString, vec};
327+
328+
use crate::schema::{PendingStatementValue, raw_table::InferredTableStructure};
329+
330+
#[test]
331+
fn infer_sync_statements() {
332+
let structure = InferredTableStructure {
333+
name: "tbl".to_string(),
334+
columns: vec!["foo".to_string(), "bar".to_string()],
335+
};
336+
337+
let put = structure.infer_put_stmt();
338+
assert_eq!(
339+
put.sql,
340+
r#"INSERT INTO "tbl" (id, "foo", "bar") VALUES (?1, ?2, ?3) ON CONFLICT (id) DO UPDATE SET "foo" = ?2, "bar" = ?3"#
341+
);
342+
assert_eq!(put.params.len(), 3);
343+
assert!(matches!(put.params[0], PendingStatementValue::Id));
344+
assert!(matches!(
345+
put.params[1],
346+
PendingStatementValue::Column(ref name) if name == "foo"
347+
));
348+
assert!(matches!(
349+
put.params[2],
350+
PendingStatementValue::Column(ref name) if name == "bar"
351+
));
352+
353+
let delete = structure.infer_delete_stmt();
354+
assert_eq!(delete.sql, r#"DELETE FROM "tbl" WHERE id = ?"#);
355+
assert_eq!(delete.params.len(), 1);
356+
assert!(matches!(delete.params[0], PendingStatementValue::Id));
357+
}
358+
}

crates/core/src/schema/table_info.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
use alloc::rc::Rc;
12
use alloc::string::ToString;
23
use alloc::vec;
34
use alloc::{collections::btree_set::BTreeSet, format, string::String, vec::Vec};
45
use serde::{Deserialize, de::Visitor};
56

7+
use crate::error::PowerSyncError;
68
use crate::schema::ColumnFilter;
79

810
#[derive(Deserialize)]
@@ -52,8 +54,8 @@ pub struct RawTable {
5254
pub name: String,
5355
#[serde(flatten, default)]
5456
pub schema: RawTableSchema,
55-
pub put: PendingStatement,
56-
pub delete: PendingStatement,
57+
pub put: Option<Rc<PendingStatement>>,
58+
pub delete: Option<Rc<PendingStatement>>,
5759
#[serde(default)]
5860
pub clear: Option<String>,
5961
}
@@ -78,6 +80,18 @@ impl Table {
7880
}
7981
}
8082

83+
impl RawTable {
84+
pub fn require_table_name(&self) -> Result<&str, PowerSyncError> {
85+
let Some(local_table_name) = self.schema.table_name.as_ref() else {
86+
return Err(PowerSyncError::argument_error(format!(
87+
"Raw table {} has no local name",
88+
self.name,
89+
)));
90+
};
91+
Ok(local_table_name)
92+
}
93+
}
94+
8195
#[derive(Deserialize)]
8296
pub struct Column {
8397
pub name: String,

crates/core/src/state.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ use alloc::{
1111
use powersync_sqlite_nostd::{self as sqlite, Context};
1212
use sqlite::{Connection, ResultCode};
1313

14-
use crate::{schema::Schema, sync::SyncClient};
14+
use crate::{
15+
schema::{InferredSchemaCache, Schema},
16+
sync::SyncClient,
17+
};
1518

1619
/// State that is shared for a SQLite database connection after the core extension has been
1720
/// registered on it.
@@ -25,6 +28,9 @@ pub struct DatabaseState {
2528
pending_updates: RefCell<BTreeSet<String>>,
2629
commited_updates: RefCell<BTreeSet<String>>,
2730
pub sync_client: RefCell<Option<SyncClient>>,
31+
/// Cached put and delete statements for raw tables, used by the `sync_local` step of the sync
32+
/// client.
33+
pub inferred_schema_cache: InferredSchemaCache,
2834
}
2935

3036
impl DatabaseState {

0 commit comments

Comments
 (0)