Skip to content

Commit 1a68ed6

Browse files
committed
Generate triggers for raw tables
1 parent 4fee3cb commit 1a68ed6

7 files changed

Lines changed: 461 additions & 172 deletions

File tree

crates/core/src/schema/common.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
use core::slice;
2+
3+
use alloc::{string::String, vec::Vec};
4+
5+
use crate::schema::{
6+
Column, CommonTableOptions, RawTable, Table, raw_table::InferredTableStructure,
7+
};
8+
9+
pub enum SchemaTable<'a> {
10+
Json(&'a Table),
11+
Raw {
12+
definition: &'a RawTable,
13+
schema: &'a InferredTableStructure,
14+
},
15+
}
16+
17+
impl<'a> SchemaTable<'a> {
18+
pub fn common_options(&self) -> &CommonTableOptions {
19+
match self {
20+
Self::Json(table) => &table.options,
21+
Self::Raw {
22+
definition,
23+
schema: _,
24+
} => &definition.schema.options,
25+
}
26+
}
27+
28+
pub fn column_names(&self) -> impl Iterator<Item = &'a str> {
29+
match self {
30+
Self::Json(table) => SchemaTableColumnIterator::Json(table.columns.iter()),
31+
Self::Raw {
32+
definition: _,
33+
schema,
34+
} => SchemaTableColumnIterator::Raw(schema.columns.iter()),
35+
}
36+
}
37+
}
38+
39+
impl<'a> From<&'a Table> for SchemaTable<'a> {
40+
fn from(value: &'a Table) -> Self {
41+
Self::Json(value)
42+
}
43+
}
44+
45+
enum SchemaTableColumnIterator<'a> {
46+
Json(slice::Iter<'a, Column>),
47+
Raw(slice::Iter<'a, String>),
48+
}
49+
50+
impl<'a> Iterator for SchemaTableColumnIterator<'a> {
51+
type Item = &'a str;
52+
53+
fn next(&mut self) -> Option<Self::Item> {
54+
Some(match self {
55+
Self::Json(iter) => &iter.next()?.name,
56+
Self::Raw(iter) => iter.next()?.as_ref(),
57+
})
58+
}
59+
}
60+
61+
pub struct ColumnFilter {
62+
sorted_names: Vec<String>,
63+
}
64+
65+
impl From<Vec<String>> for ColumnFilter {
66+
fn from(mut value: Vec<String>) -> Self {
67+
value.sort();
68+
Self {
69+
sorted_names: value,
70+
}
71+
}
72+
}
73+
74+
impl ColumnFilter {
75+
/// Whether this filter matches the given column name.
76+
pub fn matches(&self, column: &str) -> bool {
77+
self.sorted_names
78+
.binary_search_by(|item| item.as_str().cmp(column))
79+
.is_ok()
80+
}
81+
}

crates/core/src/schema/mod.rs

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,23 @@
1+
mod common;
12
pub mod inspection;
23
mod management;
34
mod raw_table;
45
mod table_info;
56

67
use alloc::{rc::Rc, vec::Vec};
7-
use powersync_sqlite_nostd as sqlite;
8+
pub use common::{ColumnFilter, SchemaTable};
9+
use powersync_sqlite_nostd::{self as sqlite, Connection, Context, Value, args};
810
use serde::Deserialize;
911
use sqlite::ResultCode;
1012
pub use table_info::{
11-
Column, DiffIncludeOld, PendingStatement, PendingStatementValue, RawTable, Table,
13+
Column, CommonTableOptions, PendingStatement, PendingStatementValue, RawTable, Table,
1214
TableInfoFlags,
1315
};
1416

15-
use crate::state::DatabaseState;
17+
use crate::{
18+
error::PowerSyncError, schema::raw_table::generate_raw_table_trigger, state::DatabaseState,
19+
utils::WriteType,
20+
};
1621

1722
#[derive(Deserialize, Default)]
1823
pub struct Schema {
@@ -22,5 +27,47 @@ pub struct Schema {
2227
}
2328

2429
pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<(), ResultCode> {
25-
management::register(db, state)
30+
management::register(db, state)?;
31+
32+
{
33+
fn create_trigger(
34+
context: *mut sqlite::context,
35+
args: &[*mut sqlite::value],
36+
) -> Result<(), PowerSyncError> {
37+
// Args: Table (JSON), trigger_name, write_type
38+
let table: RawTable =
39+
serde_json::from_str(args[0].text()).map_err(PowerSyncError::as_argument_error)?;
40+
let trigger_name = args[1].text();
41+
let write_type: WriteType = args[2].text().parse()?;
42+
43+
let db = context.db_handle();
44+
let create_trigger_stmt =
45+
generate_raw_table_trigger(db, &table, trigger_name, write_type)?;
46+
db.exec_safe(&create_trigger_stmt)?;
47+
Ok(())
48+
}
49+
50+
extern "C" fn create_raw_trigger_sqlite(
51+
context: *mut sqlite::context,
52+
argc: i32,
53+
args: *mut *mut sqlite::value,
54+
) {
55+
let args = args!(argc, args);
56+
if let Err(e) = create_trigger(context, args) {
57+
e.apply_to_ctx("powersync_create_raw_table_crud_trigger", context);
58+
}
59+
}
60+
61+
db.create_function_v2(
62+
"powersync_create_raw_table_crud_trigger",
63+
3,
64+
sqlite::UTF8,
65+
None,
66+
Some(create_raw_trigger_sqlite),
67+
None,
68+
None,
69+
Some(DatabaseState::destroy_rc),
70+
)?;
71+
}
72+
Ok(())
2673
}

crates/core/src/schema/raw_table.rs

Lines changed: 114 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
1+
use core::fmt::from_fn;
2+
13
use alloc::{
4+
format,
25
string::{String, ToString},
36
vec,
47
vec::Vec,
58
};
69
use powersync_sqlite_nostd::{Connection, Destructor, ResultCode};
710

8-
use crate::error::PowerSyncError;
11+
use crate::{
12+
error::PowerSyncError,
13+
schema::{RawTable, SchemaTable},
14+
utils::{InsertIntoCrud, SqlBuffer, WriteType},
15+
views::table_columns_to_json_object,
16+
};
917

1018
pub struct InferredTableStructure {
11-
pub table_name: String,
12-
pub has_id_column: bool,
1319
pub columns: Vec<String>,
1420
}
1521

@@ -35,12 +41,112 @@ impl InferredTableStructure {
3541

3642
if !has_id_column && columns.is_empty() {
3743
Ok(None)
44+
} else if !has_id_column {
45+
Err(PowerSyncError::argument_error(format!(
46+
"Table {table_name} has no id column."
47+
)))
48+
} else {
49+
Ok(Some(Self { columns }))
50+
}
51+
}
52+
}
53+
54+
/// Generates a `CREATE TRIGGER` statement to capture writes on raw tables and to forward them to
55+
/// ps-crud.
56+
pub fn generate_raw_table_trigger(
57+
db: impl Connection,
58+
table: &RawTable,
59+
trigger_name: &str,
60+
write: WriteType,
61+
) -> Result<String, PowerSyncError> {
62+
let Some(local_table_name) = table.schema.table_name.as_ref() else {
63+
return Err(PowerSyncError::argument_error("Table has no local name"));
64+
};
65+
66+
let Some(resolved_table) = InferredTableStructure::read_from_database(local_table_name, db)?
67+
else {
68+
return Err(PowerSyncError::argument_error(format!(
69+
"Could not find {} in local schema",
70+
local_table_name
71+
)));
72+
};
73+
74+
let as_schema_table = SchemaTable::Raw {
75+
definition: table,
76+
schema: &resolved_table,
77+
};
78+
79+
let mut buffer = SqlBuffer::new();
80+
buffer.create_trigger("", trigger_name);
81+
buffer.trigger_after(write, local_table_name);
82+
// Skip the trigger for writes during sync_local, these aren't crud writes.
83+
buffer.push_str("WHEN NOT powersync_in_sync_operation() BEGIN\n");
84+
85+
if table.schema.options.flags.insert_only() {
86+
if write != WriteType::Insert {
87+
// Prevent illegal writes to a table marked as insert-only by raising errors here.
88+
buffer.push_str("SELECT RAISE(FAIL, 'Unexpected update on insert-only table');\n");
3889
} else {
39-
Ok(Some(Self {
40-
table_name: table_name.to_string(),
41-
has_id_column,
42-
columns,
43-
}))
90+
// Write directly to powersync_crud_ to skip writing the $local bucket for insert-only
91+
// tables.
92+
let fragment = table_columns_to_json_object("NEW", &as_schema_table)?;
93+
buffer.powersync_crud_manual_put(&table.name, &fragment);
94+
}
95+
} else {
96+
if write == WriteType::Update {
97+
// Updates must not change the id.
98+
buffer.check_id_not_changed();
4499
}
100+
101+
let json_fragment_new = table_columns_to_json_object("NEW", &as_schema_table)?;
102+
let json_fragment_old = if write == WriteType::Update {
103+
Some(table_columns_to_json_object("OLD", &as_schema_table)?)
104+
} else {
105+
None
106+
};
107+
108+
buffer.insert_into_powersync_crud(InsertIntoCrud {
109+
op: write,
110+
table: &as_schema_table,
111+
id_expr: from_fn(|f| {
112+
if write == WriteType::Delete {
113+
f.write_str("OLD.")
114+
} else {
115+
f.write_str("NEW.")
116+
}?;
117+
f.write_str(".id")
118+
}),
119+
type_name: &table.name,
120+
data: Some(from_fn(|f| {
121+
match write {
122+
WriteType::Insert => {}
123+
WriteType::Update => todo!(),
124+
WriteType::Delete => {
125+
// There is no data for deleted rows, don't emit anything.
126+
}
127+
}
128+
129+
if write == WriteType::Delete {
130+
// There is no data for deleted rows.
131+
return Ok(());
132+
}
133+
134+
write!(f, "json(powersync_diff(")?;
135+
136+
if let Some(ref old) = json_fragment_old {
137+
f.write_str(old)?;
138+
} else {
139+
// We don't have OLD values for inserts, we diff from an empty JSON object
140+
// instead.
141+
f.write_str("'{}'")?;
142+
};
143+
144+
write!(f, ", {json_fragment_new}))")
145+
})),
146+
metadata: None::<&'static str>,
147+
})?;
45148
}
149+
150+
buffer.trigger_end();
151+
Ok(buffer.sql)
46152
}

0 commit comments

Comments
 (0)