Skip to content

Commit 48d47fa

Browse files
enforce schema integrity primitives
1 parent 08301e8 commit 48d47fa

9 files changed

Lines changed: 291 additions & 24 deletions

File tree

crates/contextdb-core/src/table_meta.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ pub struct TableMeta {
99
pub state_machine: Option<StateMachineConstraint>,
1010
#[serde(default)]
1111
pub dag_edge_types: Vec<String>,
12+
#[serde(default)]
13+
pub unique_constraints: Vec<Vec<String>>,
1214
pub natural_key_column: Option<String>,
1315
#[serde(default)]
1416
pub propagation_rules: Vec<PropagationRule>,
@@ -61,9 +63,17 @@ pub struct ColumnDef {
6163
#[serde(default)]
6264
pub default: Option<String>,
6365
#[serde(default)]
66+
pub references: Option<ForeignKeyReference>,
67+
#[serde(default)]
6468
pub expires: bool,
6569
}
6670

71+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
72+
pub struct ForeignKeyReference {
73+
pub table: String,
74+
pub column: String,
75+
}
76+
6777
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
6878
pub enum ColumnType {
6979
Integer,
@@ -89,6 +99,15 @@ impl TableMeta {
8999
let dag_bytes = self.dag_edge_types.iter().fold(0usize, |acc, edge_type| {
90100
acc.saturating_add(32 + edge_type.len() * 16)
91101
});
102+
let unique_constraint_bytes =
103+
self.unique_constraints.iter().fold(0usize, |acc, columns| {
104+
acc.saturating_add(
105+
24 + columns
106+
.iter()
107+
.map(|column| 16 + column.len() * 16)
108+
.sum::<usize>(),
109+
)
110+
});
92111
let natural_key_bytes = self
93112
.natural_key_column
94113
.as_ref()
@@ -106,6 +125,7 @@ impl TableMeta {
106125
16 + columns_bytes
107126
+ state_machine_bytes
108127
+ dag_bytes
128+
+ unique_constraint_bytes
109129
+ natural_key_bytes
110130
+ propagation_bytes
111131
+ expires_bytes
@@ -160,7 +180,16 @@ impl ColumnDef {
160180
.as_ref()
161181
.map(|value| 32 + value.len() * 16)
162182
.unwrap_or(0);
163-
8 + self.name.len() * 16 + self.column_type.estimated_bytes() + default_bytes + 8
183+
let reference_bytes = self
184+
.references
185+
.as_ref()
186+
.map(|reference| 32 + reference.table.len() * 16 + reference.column.len() * 16)
187+
.unwrap_or(0);
188+
8 + self.name.len() * 16
189+
+ self.column_type.estimated_bytes()
190+
+ default_bytes
191+
+ reference_bytes
192+
+ 8
164193
}
165194
}
166195

crates/contextdb-engine/src/database.rs

Lines changed: 153 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,13 @@ impl Database {
446446
fn commit_with_source(&self, tx: TxId, source: CommitSource) -> Result<()> {
447447
let mut ws = self.tx_mgr.cloned_write_set(tx)?;
448448

449+
if !ws.is_empty()
450+
&& let Err(err) = self.validate_foreign_keys_in_tx(tx)
451+
{
452+
let _ = self.rollback(tx);
453+
return Err(err);
454+
}
455+
449456
if !ws.is_empty()
450457
&& let Err(err) = self.plugin.pre_commit(&ws, source)
451458
{
@@ -662,6 +669,12 @@ impl Database {
662669
.default
663670
.as_ref()
664671
.map(crate::executor::stored_default_expr),
672+
references: col.references.as_ref().map(|reference| {
673+
contextdb_core::ForeignKeyReference {
674+
table: reference.table.clone(),
675+
column: reference.column.clone(),
676+
}
677+
}),
665678
expires: col.expires,
666679
});
667680
if col.expires {
@@ -751,6 +764,7 @@ impl Database {
751764
table: &str,
752765
values: HashMap<ColName, Value>,
753766
) -> Result<RowId> {
767+
self.validate_row_constraints(tx, table, &values, None)?;
754768
self.relational.insert(tx, table, values)
755769
}
756770

@@ -761,6 +775,17 @@ impl Database {
761775
conflict_col: &str,
762776
values: HashMap<ColName, Value>,
763777
) -> Result<UpsertResult> {
778+
let snapshot = self.snapshot();
779+
let existing_row_id = values
780+
.get(conflict_col)
781+
.map(|conflict_value| {
782+
self.point_lookup_in_tx(tx, table, conflict_col, conflict_value, snapshot)
783+
.map(|row| row.map(|row| row.row_id))
784+
})
785+
.transpose()?
786+
.flatten();
787+
self.validate_row_constraints(tx, table, &values, existing_row_id)?;
788+
764789
let row_uuid = values.get("id").and_then(Value::as_uuid).copied();
765790
let meta = self.table_meta(table);
766791
let new_state = meta
@@ -772,7 +797,7 @@ impl Database {
772797

773798
let result = self
774799
.relational
775-
.upsert(tx, table, conflict_col, values, self.snapshot())?;
800+
.upsert(tx, table, conflict_col, values, snapshot)?;
776801

777802
if let (Some(uuid), Some(state), Some(_meta)) =
778803
(row_uuid, new_state.as_deref(), meta.as_ref())
@@ -784,6 +809,116 @@ impl Database {
784809
Ok(result)
785810
}
786811

812+
fn validate_row_constraints(
813+
&self,
814+
tx: TxId,
815+
table: &str,
816+
values: &HashMap<ColName, Value>,
817+
skip_row_id: Option<RowId>,
818+
) -> Result<()> {
819+
let meta = self
820+
.table_meta(table)
821+
.ok_or_else(|| Error::TableNotFound(table.to_string()))?;
822+
let snapshot = self.snapshot();
823+
824+
let visible_rows =
825+
self.relational
826+
.scan_filter_with_tx(Some(tx), table, snapshot, &|row| {
827+
skip_row_id.is_none_or(|row_id| row.row_id != row_id)
828+
})?;
829+
830+
for column in meta
831+
.columns
832+
.iter()
833+
.filter(|column| column.unique && !column.primary_key)
834+
{
835+
let Some(value) = values.get(&column.name) else {
836+
continue;
837+
};
838+
if *value == Value::Null {
839+
continue;
840+
}
841+
if visible_rows
842+
.iter()
843+
.any(|existing| existing.values.get(&column.name) == Some(value))
844+
{
845+
return Err(Error::UniqueViolation {
846+
table: table.to_string(),
847+
column: column.name.clone(),
848+
});
849+
}
850+
}
851+
852+
for unique_constraint in &meta.unique_constraints {
853+
let mut candidate_values = Vec::with_capacity(unique_constraint.len());
854+
let mut has_null = false;
855+
856+
for column_name in unique_constraint {
857+
match values.get(column_name) {
858+
Some(Value::Null) | None => {
859+
has_null = true;
860+
break;
861+
}
862+
Some(value) => candidate_values.push(value),
863+
}
864+
}
865+
866+
if has_null {
867+
continue;
868+
}
869+
870+
if visible_rows.iter().any(|existing| {
871+
unique_constraint
872+
.iter()
873+
.zip(candidate_values.iter())
874+
.all(|(column_name, value)| existing.values.get(column_name) == Some(*value))
875+
}) {
876+
return Err(Error::UniqueViolation {
877+
table: table.to_string(),
878+
column: unique_constraint.join(","),
879+
});
880+
}
881+
}
882+
883+
Ok(())
884+
}
885+
886+
fn validate_foreign_keys_in_tx(&self, tx: TxId) -> Result<()> {
887+
let snapshot = self.snapshot();
888+
let relational_inserts = self
889+
.tx_mgr
890+
.with_write_set(tx, |ws| ws.relational_inserts.clone())?;
891+
892+
for (table, row) in relational_inserts {
893+
let meta = self
894+
.table_meta(&table)
895+
.ok_or_else(|| Error::TableNotFound(table.clone()))?;
896+
for column in &meta.columns {
897+
let Some(reference) = &column.references else {
898+
continue;
899+
};
900+
let Some(value) = row.values.get(&column.name) else {
901+
continue;
902+
};
903+
if *value == Value::Null {
904+
continue;
905+
}
906+
if self
907+
.point_lookup_in_tx(tx, &reference.table, &reference.column, value, snapshot)?
908+
.is_none()
909+
{
910+
return Err(Error::ForeignKeyViolation {
911+
table: table.clone(),
912+
column: column.name.clone(),
913+
ref_table: reference.table.clone(),
914+
});
915+
}
916+
}
917+
}
918+
919+
Ok(())
920+
}
921+
787922
pub(crate) fn propagate_state_change_if_needed(
788923
&self,
789924
tx: TxId,
@@ -3561,11 +3696,19 @@ fn sql_type_for_meta_column(col: &contextdb_core::ColumnDef, rules: &[Propagatio
35613696
})
35623697
.collect::<Vec<_>>();
35633698

3564-
if let Some((referenced_table, referenced_column, ..)) = fk_rules.first() {
3699+
if let Some(reference) = &col.references {
3700+
ty.push_str(&format!(
3701+
" REFERENCES {}({})",
3702+
reference.table, reference.column
3703+
));
3704+
} else if let Some((referenced_table, referenced_column, ..)) = fk_rules.first() {
35653705
ty.push_str(&format!(
35663706
" REFERENCES {}({})",
35673707
referenced_table, referenced_column
35683708
));
3709+
}
3710+
3711+
if col.references.is_some() || !fk_rules.is_empty() {
35693712
for (_, _, trigger_state, target_state, max_depth, abort_on_failure) in fk_rules {
35703713
ty.push_str(&format!(
35713714
" ON STATE {} PROPAGATE SET {}",
@@ -3630,6 +3773,10 @@ fn create_table_constraints_from_ast(ct: &CreateTable) -> Vec<String> {
36303773
constraints.push(clause);
36313774
}
36323775

3776+
for unique_constraint in &ct.unique_constraints {
3777+
constraints.push(format!("UNIQUE ({})", unique_constraint.join(", ")));
3778+
}
3779+
36333780
for rule in &ct.propagation_rules {
36343781
match rule {
36353782
contextdb_parser::ast::AstPropagationRule::EdgeState {
@@ -3700,6 +3847,10 @@ fn create_table_constraints_from_meta(meta: &TableMeta) -> Vec<String> {
37003847
constraints.push(clause);
37013848
}
37023849

3850+
for unique_constraint in &meta.unique_constraints {
3851+
constraints.push(format!("UNIQUE ({})", unique_constraint.join(", ")));
3852+
}
3853+
37033854
for rule in &meta.propagation_rules {
37043855
match rule {
37053856
PropagationRule::Edge {

crates/contextdb-engine/src/executor.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ pub(crate) fn execute_plan(
3636
primary_key: c.primary_key,
3737
unique: c.unique,
3838
default: c.default.as_ref().map(stored_default_expr),
39+
references: c.references.as_ref().map(|reference| {
40+
contextdb_core::ForeignKeyReference {
41+
table: reference.table.clone(),
42+
column: reference.column.clone(),
43+
}
44+
}),
3945
expires: c.expires,
4046
})
4147
.collect(),
@@ -49,6 +55,7 @@ pub(crate) fn execute_plan(
4955
.collect(),
5056
}),
5157
dag_edge_types: p.dag_edge_types.clone(),
58+
unique_constraints: p.unique_constraints.clone(),
5259
natural_key_column: None,
5360
propagation_rules: p.propagation_rules.clone(),
5461
default_ttl_seconds: p.retain.as_ref().map(|r| r.duration_seconds),
@@ -97,6 +104,12 @@ pub(crate) fn execute_plan(
97104
primary_key: col.primary_key,
98105
unique: col.unique,
99106
default: col.default.as_ref().map(stored_default_expr),
107+
references: col.references.as_ref().map(|reference| {
108+
contextdb_core::ForeignKeyReference {
109+
table: reference.table.clone(),
110+
column: reference.column.clone(),
111+
}
112+
}),
100113
expires: col.expires,
101114
};
102115
store

crates/contextdb-parser/src/ast.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ pub struct OnConflict {
251251
pub struct CreateTable {
252252
pub name: String,
253253
pub columns: Vec<ColumnDef>,
254+
pub unique_constraints: Vec<Vec<String>>,
254255
pub if_not_exists: bool,
255256
pub immutable: bool,
256257
pub state_machine: Option<StateMachineDef>,

crates/contextdb-parser/src/grammar.pest

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,12 +119,14 @@ create_table_stmt = {
119119
~ if_not_exists?
120120
~ identifier
121121
~ "("
122-
~ column_def
123-
~ ("," ~ column_def)*
122+
~ table_element
123+
~ ("," ~ table_element)*
124124
~ ")"
125125
~ table_option*
126126
}
127127
if_not_exists = { ^"IF" ~ ^"NOT" ~ ^"EXISTS" }
128+
table_element = { column_def | unique_table_constraint }
129+
unique_table_constraint = { ^"UNIQUE" ~ "(" ~ identifier ~ ("," ~ identifier)* ~ ")" }
128130

129131
alter_table_stmt = { ^"ALTER" ~ ^"TABLE" ~ identifier ~ alter_action }
130132
alter_action = { add_column_action | drop_column_action | rename_column_action | set_retain_action | drop_retain_action | set_table_conflict_policy | drop_table_conflict_policy }

0 commit comments

Comments
 (0)