Skip to content

Commit 30a29b6

Browse files
committed
Infer sync_local statements for raw tables
1 parent 31d90dd commit 30a29b6

File tree

6 files changed

+232
-54
lines changed

6 files changed

+232
-54
lines changed

crates/core/src/schema/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod table_info;
77
use alloc::{rc::Rc, vec::Vec};
88
pub use common::{ColumnFilter, SchemaTable};
99
use powersync_sqlite_nostd::{self as sqlite, Connection, Context, Value, args};
10+
pub use raw_table::InferredSchemaCache;
1011
use serde::Deserialize;
1112
use sqlite::ResultCode;
1213
pub use table_info::{

crates/core/src/schema/raw_table.rs

Lines changed: 166 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,27 @@
1-
use core::fmt::{self, Formatter, Write, from_fn};
1+
use core::{
2+
cell::RefCell,
3+
fmt::{self, Formatter, Write, from_fn},
4+
};
25

36
use alloc::{
7+
collections::btree_map::{BTreeMap, Entry},
48
format,
9+
rc::Rc,
510
string::{String, ToString},
611
vec,
712
vec::Vec,
813
};
9-
use powersync_sqlite_nostd::{Connection, Destructor, ResultCode};
14+
use powersync_sqlite_nostd::{self as sqlite, Connection, Destructor, ResultCode};
1015

1116
use crate::{
1217
error::PowerSyncError,
13-
schema::{ColumnFilter, RawTable, SchemaTable},
18+
schema::{ColumnFilter, PendingStatement, PendingStatementValue, RawTable, SchemaTable},
1419
utils::{InsertIntoCrud, SqlBuffer, WriteType},
1520
views::table_columns_to_json_object,
1621
};
1722

1823
pub struct InferredTableStructure {
24+
pub name: String,
1925
pub columns: Vec<String>,
2026
}
2127

@@ -24,7 +30,7 @@ impl InferredTableStructure {
2430
table_name: &str,
2531
db: impl Connection,
2632
ignored_local_columns: &ColumnFilter,
27-
) -> Result<Option<Self>, PowerSyncError> {
33+
) -> Result<Self, PowerSyncError> {
2834
let stmt = db.prepare_v2("select name from pragma_table_info(?)")?;
2935
stmt.bind_text(1, table_name, Destructor::STATIC)?;
3036

@@ -41,17 +47,168 @@ impl InferredTableStructure {
4147
}
4248

4349
if !has_id_column && columns.is_empty() {
44-
Ok(None)
50+
Err(PowerSyncError::argument_error(format!(
51+
"Could not find {table_name} in local schema."
52+
)))
4553
} else if !has_id_column {
4654
Err(PowerSyncError::argument_error(format!(
4755
"Table {table_name} has no id column."
4856
)))
4957
} else {
50-
Ok(Some(Self { columns }))
58+
Ok(Self {
59+
name: table_name.to_string(),
60+
columns,
61+
})
62+
}
63+
}
64+
65+
/// Generates a statement of the form `INSERT OR REPLACE INTO $tbl ($cols) VALUES (?, ...)` for
66+
/// the sync client.
67+
pub fn infer_put_stmt(&self) -> PendingStatement {
68+
let mut buffer = SqlBuffer::new();
69+
let mut params = vec![];
70+
71+
buffer.push_str("INSERT OR REPLACE INTO ");
72+
let _ = buffer.identifier().write_str(&self.name);
73+
buffer.push_str(" (id");
74+
for column in &self.columns {
75+
buffer.comma();
76+
buffer.push_str(column);
77+
}
78+
buffer.push_str(") VALUES (?");
79+
params.push(PendingStatementValue::Id);
80+
for column in &self.columns {
81+
buffer.comma();
82+
buffer.push_str("?");
83+
params.push(PendingStatementValue::Column(column.clone()));
84+
}
85+
buffer.push_str(")");
86+
87+
PendingStatement {
88+
sql: buffer.sql,
89+
params,
90+
named_parameters_index: None,
91+
}
92+
}
93+
94+
/// Generates a statement of the form `DELETE FROM $tbl WHERE id = ?` for the sync client.
95+
pub fn infer_delete_stmt(&self) -> PendingStatement {
96+
let mut buffer = SqlBuffer::new();
97+
buffer.push_str("DELETE FROM ");
98+
let _ = buffer.identifier().write_str(&self.name);
99+
buffer.push_str(" WHERE id = ?");
100+
101+
PendingStatement {
102+
sql: buffer.sql,
103+
params: vec![PendingStatementValue::Id],
104+
named_parameters_index: None,
51105
}
52106
}
53107
}
54108

109+
#[derive(Default)]
110+
pub struct InferredSchemaCache {
111+
entries: RefCell<BTreeMap<String, SchemaCacheEntry>>,
112+
}
113+
114+
impl InferredSchemaCache {
115+
pub fn current_schema_version(db: *mut sqlite::sqlite3) -> Result<usize, PowerSyncError> {
116+
let version = db.prepare_v2("PRAGMA schema_version")?;
117+
version.step()?;
118+
let version = version.column_int64(0) as usize;
119+
Ok(version)
120+
}
121+
122+
pub fn infer_put_statement(
123+
&self,
124+
db: *mut sqlite::sqlite3,
125+
schema_version: usize,
126+
tbl: &RawTable,
127+
) -> Result<Rc<PendingStatement>, PowerSyncError> {
128+
let mut entries = self.entries.borrow_mut();
129+
let mut entry = entries.entry(tbl.name.clone());
130+
let entry = match entry {
131+
Entry::Vacant(entry) => entry.insert(SchemaCacheEntry::infer(db, schema_version, tbl)?),
132+
Entry::Occupied(ref mut entry) => {
133+
let value = entry.get_mut();
134+
if value.schema_version != schema_version {
135+
// Values are outdated, refresh.
136+
*value = SchemaCacheEntry::infer(db, schema_version, tbl)?;
137+
}
138+
139+
value
140+
}
141+
};
142+
143+
Ok(entry.put())
144+
}
145+
146+
pub fn infer_delete_statement(
147+
&self,
148+
db: *mut sqlite::sqlite3,
149+
schema_version: usize,
150+
tbl: &RawTable,
151+
) -> Result<Rc<PendingStatement>, PowerSyncError> {
152+
let mut entries = self.entries.borrow_mut();
153+
let mut entry = entries.entry(tbl.name.clone());
154+
let entry = match entry {
155+
Entry::Vacant(entry) => entry.insert(SchemaCacheEntry::infer(db, schema_version, tbl)?),
156+
Entry::Occupied(ref mut entry) => {
157+
let value = entry.get_mut();
158+
if value.schema_version != schema_version {
159+
// Values are outdated, refresh.
160+
*value = SchemaCacheEntry::infer(db, schema_version, tbl)?;
161+
}
162+
163+
value
164+
}
165+
};
166+
167+
Ok(entry.delete())
168+
}
169+
}
170+
171+
pub struct SchemaCacheEntry {
172+
schema_version: usize,
173+
structure: InferredTableStructure,
174+
put_stmt: Option<Rc<PendingStatement>>,
175+
delete_stmt: Option<Rc<PendingStatement>>,
176+
}
177+
178+
impl SchemaCacheEntry {
179+
fn infer(
180+
db: *mut sqlite::sqlite3,
181+
schema_version: usize,
182+
table: &RawTable,
183+
) -> Result<Self, PowerSyncError> {
184+
let local_table_name = table.require_table_name()?;
185+
let structure = InferredTableStructure::read_from_database(
186+
local_table_name,
187+
db,
188+
&table.schema.local_only_columns,
189+
)?;
190+
191+
Ok(Self {
192+
schema_version,
193+
structure,
194+
put_stmt: None,
195+
delete_stmt: None,
196+
})
197+
}
198+
199+
fn put(&mut self) -> Rc<PendingStatement> {
200+
self.put_stmt
201+
.get_or_insert_with(|| Rc::new(self.structure.infer_put_stmt()))
202+
.clone()
203+
}
204+
205+
fn delete(&mut self) -> Rc<PendingStatement> {
206+
self.delete_stmt
207+
.get_or_insert_with(|| Rc::new(self.structure.infer_delete_stmt()))
208+
.clone()
209+
}
210+
}
211+
55212
/// Generates a `CREATE TRIGGER` statement to capture writes on raw tables and to forward them to
56213
/// ps-crud.
57214
pub fn generate_raw_table_trigger(
@@ -60,19 +217,10 @@ pub fn generate_raw_table_trigger(
60217
trigger_name: &str,
61218
write: WriteType,
62219
) -> Result<String, PowerSyncError> {
63-
let Some(local_table_name) = table.schema.table_name.as_ref() else {
64-
return Err(PowerSyncError::argument_error("Table has no local name"));
65-
};
66-
220+
let local_table_name = table.require_table_name()?;
67221
let local_only_columns = &table.schema.local_only_columns;
68-
let Some(resolved_table) =
69-
InferredTableStructure::read_from_database(local_table_name, db, local_only_columns)?
70-
else {
71-
return Err(PowerSyncError::argument_error(format!(
72-
"Could not find {} in local schema",
73-
local_table_name
74-
)));
75-
};
222+
let resolved_table =
223+
InferredTableStructure::read_from_database(local_table_name, db, local_only_columns)?;
76224

77225
let as_schema_table = SchemaTable::Raw {
78226
definition: table,

crates/core/src/schema/table_info.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
use alloc::rc::Rc;
12
use alloc::string::ToString;
23
use alloc::vec;
34
use alloc::{collections::btree_set::BTreeSet, format, string::String, vec::Vec};
45
use serde::{Deserialize, de::Visitor};
56

7+
use crate::error::PowerSyncError;
68
use crate::schema::ColumnFilter;
79

810
#[derive(Deserialize)]
@@ -52,8 +54,8 @@ pub struct RawTable {
5254
pub name: String,
5355
#[serde(flatten, default)]
5456
pub schema: RawTableSchema,
55-
pub put: PendingStatement,
56-
pub delete: PendingStatement,
57+
pub put: Option<Rc<PendingStatement>>,
58+
pub delete: Option<Rc<PendingStatement>>,
5759
#[serde(default)]
5860
pub clear: Option<String>,
5961
}
@@ -78,6 +80,18 @@ impl Table {
7880
}
7981
}
8082

83+
impl RawTable {
84+
pub fn require_table_name(&self) -> Result<&str, PowerSyncError> {
85+
let Some(local_table_name) = self.schema.table_name.as_ref() else {
86+
return Err(PowerSyncError::argument_error(format!(
87+
"Raw table {} has no local name",
88+
self.name,
89+
)));
90+
};
91+
Ok(local_table_name)
92+
}
93+
}
94+
8195
#[derive(Deserialize)]
8296
pub struct Column {
8397
pub name: String,

crates/core/src/state.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ use alloc::{
1111
use powersync_sqlite_nostd::{self as sqlite, Context};
1212
use sqlite::{Connection, ResultCode};
1313

14-
use crate::{schema::Schema, sync::SyncClient};
14+
use crate::{
15+
schema::{InferredSchemaCache, Schema},
16+
sync::SyncClient,
17+
};
1518

1619
/// State that is shared for a SQLite database connection after the core extension has been
1720
/// registered on it.
@@ -25,6 +28,9 @@ pub struct DatabaseState {
2528
pending_updates: RefCell<BTreeSet<String>>,
2629
commited_updates: RefCell<BTreeSet<String>>,
2730
pub sync_client: RefCell<Option<SyncClient>>,
31+
/// Cached put and delete statements for raw tables, used by the `sync_local` step of the sync
32+
/// client.
33+
pub inferred_schema_cache: InferredSchemaCache,
2834
}
2935

3036
impl DatabaseState {

0 commit comments

Comments
 (0)