From 4f00a737b875aebfea8ef1c026b11aee3770c0e9 Mon Sep 17 00:00:00 2001 From: Egor Vorontsov Date: Tue, 24 Jun 2025 00:26:40 +0300 Subject: [PATCH 1/5] Implemented `LEFT JOIN` support. --- crates/execution/src/lib.rs | 11 ++++- crates/execution/src/pipelined.rs | 18 ++++++++ crates/expr/src/check.rs | 59 ++++++++++++++++++-------- crates/expr/src/expr.rs | 28 ++++++++---- crates/expr/src/rls.rs | 36 +++++++++++++--- crates/physical-plan/src/compile.rs | 42 +++++++++++++++++- crates/physical-plan/src/plan.rs | 20 ++++++--- crates/physical-plan/src/rules.rs | 2 + crates/sql-parser/src/ast/mod.rs | 40 ++++++++++++++--- crates/sql-parser/src/parser/errors.rs | 4 +- crates/sql-parser/src/parser/mod.rs | 33 +++++++++++--- crates/vm/src/relation.rs | 1 + 12 files changed, 239 insertions(+), 55 deletions(-) diff --git a/crates/execution/src/lib.rs b/crates/execution/src/lib.rs index d9583cbcef7..8e8ba385d33 100644 --- a/crates/execution/src/lib.rs +++ b/crates/execution/src/lib.rs @@ -102,6 +102,7 @@ pub trait DeltaStore { #[derive(Clone)] pub enum Row<'a> { + Null, Ptr(RowRef<'a>), Ref(&'a ProductValue), } @@ -113,6 +114,7 @@ impl PartialEq for Row<'_> { (Self::Ref(x), Self::Ref(y)) => x == y, (Self::Ptr(x), Self::Ref(y)) => x == *y, (Self::Ref(x), Self::Ptr(y)) => y == *x, + (Self::Null, _) | (_, Self::Null) => false, } } } @@ -122,6 +124,7 @@ impl Eq for Row<'_> {} impl Hash for Row<'_> { fn hash(&self, state: &mut H) { match self { + Self::Null => AlgebraicValue::unit().hash(state), Self::Ptr(x) => x.hash(state), Self::Ref(x) => x.hash(state), } @@ -131,6 +134,7 @@ impl Hash for Row<'_> { impl Row<'_> { pub fn to_product_value(&self) -> ProductValue { match self { + Self::Null => ProductValue { elements: Box::new([]) }, Self::Ptr(ptr) => ptr.to_product_value(), Self::Ref(val) => (*val).clone(), } @@ -145,6 +149,7 @@ impl Row<'_> { } impl_serialize!(['a] Row<'a>, (self, ser) => match self { + Self::Null => AlgebraicValue::unit().serialize(ser), Self::Ptr(row) => row.serialize(ser), Self::Ref(row) => row.serialize(ser), }); @@ -152,6 +157,7 @@ impl_serialize!(['a] Row<'a>, (self, ser) => match self { impl ToBsatn for Row<'_> { fn static_bsatn_size(&self) -> Option { match self { + Self::Null => self.to_product_value().static_bsatn_size(), Self::Ptr(ptr) => ptr.static_bsatn_size(), Self::Ref(val) => val.static_bsatn_size(), } @@ -159,6 +165,7 @@ impl ToBsatn for Row<'_> { fn to_bsatn_extend(&self, buf: &mut Vec) -> std::result::Result<(), EncodeError> { match self { + Self::Null => self.to_product_value().to_bsatn_extend(buf), Self::Ptr(ptr) => ptr.to_bsatn_extend(buf), Self::Ref(val) => val.to_bsatn_extend(buf), } @@ -166,6 +173,7 @@ impl ToBsatn for Row<'_> { fn to_bsatn_vec(&self) -> std::result::Result, EncodeError> { match self { + Self::Null => self.to_product_value().to_bsatn_vec(), Self::Ptr(ptr) => ptr.to_bsatn_vec(), Self::Ref(val) => val.to_bsatn_vec(), } @@ -175,6 +183,7 @@ impl ToBsatn for Row<'_> { impl ProjectField for Row<'_> { fn project(&self, field: &TupleField) -> AlgebraicValue { match self { + Self::Null => AlgebraicValue::unit(), Self::Ptr(ptr) => ptr.project(field), Self::Ref(val) => val.project(field), } @@ -200,7 +209,7 @@ impl ProjectField for Tuple<'_> { .label_pos .and_then(|i| ptrs.get(i)) .map(|ptr| ptr.project(field)) - .unwrap(), + .unwrap_or(AlgebraicValue::unit()), } } } diff --git a/crates/execution/src/pipelined.rs b/crates/execution/src/pipelined.rs index 1c60bc066b0..26fe9261a31 100644 --- a/crates/execution/src/pipelined.rs +++ b/crates/execution/src/pipelined.rs @@ -354,6 +354,7 @@ impl From for PipelinedExecutor { lhs_field, rhs_field, unique, + outer, }, semijoin, ) => Self::HashJoin(BlockingHashJoin { @@ -362,6 +363,7 @@ impl From for PipelinedExecutor { lhs_field, rhs_field, unique, + outer, semijoin, }), PhysicalPlan::NLJoin(lhs, rhs) => Self::NLJoin(BlockingNLJoin { @@ -1172,6 +1174,7 @@ pub struct BlockingHashJoin { pub lhs_field: TupleField, pub rhs_field: TupleField, pub unique: bool, + pub outer: bool, pub semijoin: Semi, } @@ -1190,12 +1193,18 @@ impl BlockingHashJoin { let mut n = 0; let mut bytes_scanned = 0; match self { + Self { + outer: true, + semijoin: Semi::Lhs | Semi::Rhs, + .. + } => unreachable!("Outer semijoin is not possible"), Self { lhs, rhs, lhs_field, rhs_field, unique: true, + outer: false, semijoin: Semi::Lhs, } => { let mut rhs_table = HashSet::new(); @@ -1221,6 +1230,7 @@ impl BlockingHashJoin { lhs_field, rhs_field, unique: true, + outer: false, semijoin: Semi::Rhs, } => { let mut rhs_table = HashMap::new(); @@ -1246,6 +1256,7 @@ impl BlockingHashJoin { lhs_field, rhs_field, unique: true, + outer, semijoin: Semi::All, } => { let mut rhs_table = HashMap::new(); @@ -1261,6 +1272,8 @@ impl BlockingHashJoin { n += 1; if let Some(v) = rhs_table.get(&project(&u, lhs_field, &mut bytes_scanned)) { f(u.clone().join(v.clone()))?; + } else if *outer { + f(u.clone().append(Row::Null))?; } Ok(()) })?; @@ -1271,6 +1284,7 @@ impl BlockingHashJoin { lhs_field, rhs_field, unique: false, + outer: false, semijoin: Semi::Lhs, } => { let mut rhs_table = HashMap::new(); @@ -1298,6 +1312,7 @@ impl BlockingHashJoin { lhs_field, rhs_field, unique: false, + outer: false, semijoin: Semi::Rhs, } => { let mut rhs_table: HashMap> = HashMap::new(); @@ -1327,6 +1342,7 @@ impl BlockingHashJoin { lhs_field, rhs_field, unique: false, + outer, semijoin: Semi::All, } => { let mut rhs_table: HashMap> = HashMap::new(); @@ -1346,6 +1362,8 @@ impl BlockingHashJoin { for v in rhs_tuples { f(u.clone().join(v.clone()))?; } + } else if *outer { + f(u.clone().append(Row::Null))?; } Ok(()) })?; diff --git a/crates/expr/src/check.rs b/crates/expr/src/check.rs index 9c4717f9a9f..81a48c5b487 100644 --- a/crates/expr/src/check.rs +++ b/crates/expr/src/check.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::ops::{Deref, DerefMut}; use std::sync::Arc; +use crate::ast::{CrossJoin, InnerJoin, OuterJoin}; use crate::expr::LeftDeepJoin; use crate::expr::{Expr, ProjectList, ProjectName, Relvar}; use spacetimedb_lib::identity::AuthCtx; @@ -78,34 +79,56 @@ pub trait TypeChecker { delta: None, }); - for SqlJoin { - var: SqlIdent(name), - alias: SqlIdent(alias), - on, - } in joins - { + for jn in joins { // Check for duplicate aliases - if vars.contains_key(&alias) { - return Err(DuplicateName(alias.into_string()).into()); + match jn { + SqlJoin::Cross(CrossJoin { alias: SqlIdent(alias), .. }) + | SqlJoin::Inner(InnerJoin { alias: SqlIdent(alias), .. }) + | SqlJoin::Left(OuterJoin { alias: SqlIdent(alias), .. }) + if vars.contains_key(&alias) => { + return Err(DuplicateName(alias.into_string()).into()); + } + SqlJoin::Cross(_) => (), + SqlJoin::Inner(_) => (), + SqlJoin::Left(_) => (), } let lhs = Box::new(join); - let rhs = Relvar { - schema: Self::type_relvar(tx, &name)?, - alias, - delta: None, + let rhs = match &jn { + SqlJoin::Cross(CrossJoin { var: SqlIdent(name), alias: SqlIdent(alias), .. }) + | SqlJoin::Inner(InnerJoin { var: SqlIdent(name), alias: SqlIdent(alias), .. }) + | SqlJoin::Left(OuterJoin { var: SqlIdent(name), alias: SqlIdent(alias), .. }) => { + Relvar { + schema: Self::type_relvar(tx, &name)?, + alias: alias.clone(), + delta: None, + } + } }; vars.insert(rhs.alias.clone(), rhs.schema.clone()); - if let Some(on) = on { - if let Expr::BinOp(BinOp::Eq, a, b) = type_expr(vars, on, Some(&AlgebraicType::Bool))? { - if let (Expr::Field(a), Expr::Field(b)) = (*a, *b) { - join = RelExpr::EqJoin(LeftDeepJoin { lhs, rhs }, a, b); - continue; + match jn { + SqlJoin::Cross(_) => (), + SqlJoin::Inner(InnerJoin { on: Some(on), .. }) => { + if let Expr::BinOp(BinOp::Eq, a, b) = type_expr(vars, on, Some(&AlgebraicType::Bool))? { + if let (Expr::Field(a), Expr::Field(b)) = (*a, *b) { + join = RelExpr::InnerEqJoin(LeftDeepJoin { lhs, rhs }, a, b); + continue; + } + } + unreachable!("Unreachability guaranteed by parser") + } + SqlJoin::Inner(_) => (), + SqlJoin::Left(OuterJoin { on, .. }) => { + if let Expr::BinOp(BinOp::Eq, a, b) = type_expr(vars, on, Some(&AlgebraicType::Bool))? { + if let (Expr::Field(a), Expr::Field(b)) = (*a, *b) { + join = RelExpr::LeftOuterEqJoin(LeftDeepJoin { lhs, rhs }, a, b); + continue; + } } + unreachable!("Unreachability guaranteed by parser") } - unreachable!("Unreachability guaranteed by parser") } join = RelExpr::LeftDeepJoin(LeftDeepJoin { lhs, rhs }); diff --git a/crates/expr/src/expr.rs b/crates/expr/src/expr.rs index 451b9f3498c..06d307a700a 100644 --- a/crates/expr/src/expr.rs +++ b/crates/expr/src/expr.rs @@ -243,8 +243,10 @@ pub enum RelExpr { Select(Box, Expr), /// A left deep binary cross product LeftDeepJoin(LeftDeepJoin), - /// A left deep binary equi-join - EqJoin(LeftDeepJoin, FieldProject, FieldProject), + /// A left deep binary inner equi-join + InnerEqJoin(LeftDeepJoin, FieldProject, FieldProject), + /// A left deep binary left outer equi-join + LeftOuterEqJoin(LeftDeepJoin, FieldProject, FieldProject), } /// A table reference @@ -277,7 +279,8 @@ impl RelExpr { match self { Self::Select(lhs, _) | Self::LeftDeepJoin(LeftDeepJoin { lhs, .. }) - | Self::EqJoin(LeftDeepJoin { lhs, .. }, ..) => { + | Self::InnerEqJoin(LeftDeepJoin { lhs, .. }, ..) + | Self::LeftOuterEqJoin(LeftDeepJoin { lhs, .. }, ..) => { lhs.visit(f); } Self::RelVar(..) => {} @@ -290,7 +293,8 @@ impl RelExpr { match self { Self::Select(lhs, _) | Self::LeftDeepJoin(LeftDeepJoin { lhs, .. }) - | Self::EqJoin(LeftDeepJoin { lhs, .. }, ..) => { + | Self::InnerEqJoin(LeftDeepJoin { lhs, .. }, ..) + | Self::LeftOuterEqJoin(LeftDeepJoin { lhs, .. }, ..) => { lhs.visit_mut(f); } Self::RelVar(..) => {} @@ -301,7 +305,11 @@ impl RelExpr { pub fn nfields(&self) -> usize { match self { Self::RelVar(..) => 1, - Self::LeftDeepJoin(join) | Self::EqJoin(join, ..) => join.lhs.nfields() + 1, + Self::LeftDeepJoin(join) + | Self::InnerEqJoin(join, ..) + | Self::LeftOuterEqJoin(join, ..) => { + join.lhs.nfields() + 1 + } Self::Select(input, _) => input.nfields(), } } @@ -310,7 +318,9 @@ impl RelExpr { pub fn has_field(&self, field: &str) -> bool { match self { Self::RelVar(Relvar { alias, .. }) => alias.as_ref() == field, - Self::LeftDeepJoin(join) | Self::EqJoin(join, ..) => { + Self::LeftDeepJoin(join) + | Self::InnerEqJoin(join, ..) + | Self::LeftOuterEqJoin(join, ..) => { join.rhs.alias.as_ref() == field || join.lhs.has_field(field) } Self::Select(input, _) => input.has_field(field), @@ -322,10 +332,12 @@ impl RelExpr { match self { Self::RelVar(relvar) if relvar.alias.as_ref() == alias => Some(&relvar.schema), Self::Select(input, _) => input.find_table_schema(alias), - Self::EqJoin(LeftDeepJoin { rhs, .. }, ..) if rhs.alias.as_ref() == alias => Some(&rhs.schema), - Self::EqJoin(LeftDeepJoin { lhs, .. }, ..) => lhs.find_table_schema(alias), Self::LeftDeepJoin(LeftDeepJoin { rhs, .. }) if rhs.alias.as_ref() == alias => Some(&rhs.schema), Self::LeftDeepJoin(LeftDeepJoin { lhs, .. }) => lhs.find_table_schema(alias), + Self::InnerEqJoin(LeftDeepJoin { rhs, .. }, ..) if rhs.alias.as_ref() == alias => Some(&rhs.schema), + Self::InnerEqJoin(LeftDeepJoin { lhs, .. }, ..) => lhs.find_table_schema(alias), + Self::LeftOuterEqJoin(LeftDeepJoin { rhs, .. }, ..) if rhs.alias.as_ref() == alias => Some(&rhs.schema), + Self::LeftOuterEqJoin(LeftDeepJoin { lhs, .. }, ..) => lhs.find_table_schema(alias), _ => None, } } diff --git a/crates/expr/src/rls.rs b/crates/expr/src/rls.rs index 75f096ae50c..33fadae1f9f 100644 --- a/crates/expr/src/rls.rs +++ b/crates/expr/src/rls.rs @@ -219,7 +219,8 @@ fn resolve_views_for_expr( view.visit(&mut |expr| match expr { RelExpr::RelVar(rhs) | RelExpr::LeftDeepJoin(LeftDeepJoin { rhs, .. }) - | RelExpr::EqJoin(LeftDeepJoin { rhs, .. }, ..) + | RelExpr::InnerEqJoin(LeftDeepJoin { rhs, .. }, ..) + | RelExpr::LeftOuterEqJoin(LeftDeepJoin { rhs, .. }, ..) if !is_return_table(rhs) => { names.push((rhs.schema.table_id, rhs.alias.clone())); @@ -365,7 +366,8 @@ fn alpha_rename(expr: &mut RelExpr, f: &mut impl FnMut(&str) -> Box) { RelExpr::RelVar(rhs) | RelExpr::LeftDeepJoin(LeftDeepJoin { rhs, .. }) => { rename(rhs, f); } - RelExpr::EqJoin(LeftDeepJoin { rhs, .. }, a, b) => { + RelExpr::InnerEqJoin(LeftDeepJoin { rhs, .. }, a, b) + | RelExpr::LeftOuterEqJoin(LeftDeepJoin { rhs, .. }, a, b) => { rename(rhs, f); rename_field(a, f); rename_field(b, f); @@ -427,7 +429,15 @@ fn extend_lhs(expr: RelExpr, with: RelExpr) -> RelExpr { lhs: Box::new(extend_lhs(*join.lhs, with)), ..join }), - RelExpr::EqJoin(join, a, b) => RelExpr::EqJoin( + RelExpr::InnerEqJoin(join, a, b) => RelExpr::InnerEqJoin( + LeftDeepJoin { + lhs: Box::new(extend_lhs(*join.lhs, with)), + ..join + }, + a, + b, + ), + RelExpr::LeftOuterEqJoin(join, a, b) => RelExpr::LeftOuterEqJoin( LeftDeepJoin { lhs: Box::new(extend_lhs(*join.lhs, with)), ..join @@ -451,11 +461,23 @@ fn expand_leaf(expr: RelExpr, table_id: TableId, alias: &str, with: &RelExpr) -> lhs: Box::new(expand_leaf(*lhs, table_id, alias, with)), rhs, }), - RelExpr::EqJoin(join, a, b) if ok(&join.rhs) => RelExpr::Select( - Box::new(extend_lhs(with.clone(), *join.lhs)), - Expr::BinOp(BinOp::Eq, Box::new(Expr::Field(a)), Box::new(Expr::Field(b))), + RelExpr::InnerEqJoin(join, a, b) + | RelExpr::LeftOuterEqJoin(join, a, b) + if ok(&join.rhs) => { + RelExpr::Select( + Box::new(extend_lhs(with.clone(), *join.lhs)), + Expr::BinOp(BinOp::Eq, Box::new(Expr::Field(a)), Box::new(Expr::Field(b))), + ) + } + RelExpr::InnerEqJoin(LeftDeepJoin { lhs, rhs }, a, b) => RelExpr::InnerEqJoin( + LeftDeepJoin { + lhs: Box::new(expand_leaf(*lhs, table_id, alias, with)), + rhs, + }, + a, + b, ), - RelExpr::EqJoin(LeftDeepJoin { lhs, rhs }, a, b) => RelExpr::EqJoin( + RelExpr::LeftOuterEqJoin(LeftDeepJoin { lhs, rhs }, a, b) => RelExpr::LeftOuterEqJoin( LeftDeepJoin { lhs: Box::new(expand_leaf(*lhs, table_id, alias, with)), rhs, diff --git a/crates/physical-plan/src/compile.rs b/crates/physical-plan/src/compile.rs index 0997fa0b968..5b08c2295da 100644 --- a/crates/physical-plan/src/compile.rs +++ b/crates/physical-plan/src/compile.rs @@ -80,7 +80,7 @@ fn compile_rel_expr(var: &mut impl VarLabel, ast: RelExpr) -> PhysicalPlan { let input = Box::new(input); PhysicalPlan::Filter(input, compile_expr(expr, var)) } - RelExpr::EqJoin( + RelExpr::InnerEqJoin( LeftDeepJoin { lhs, rhs: @@ -115,6 +115,46 @@ fn compile_rel_expr(var: &mut impl VarLabel, ast: RelExpr) -> PhysicalPlan { field_pos: b, }, unique: false, + outer: false, + }, + Semi::All, + ), + RelExpr::LeftOuterEqJoin( + LeftDeepJoin { + lhs, + rhs: + Relvar { + schema: rhs_schema, + alias: rhs_alias, + delta, + .. + }, + }, + FieldProject { table: u, field: a, .. }, + FieldProject { table: v, field: b, .. }, + ) => PhysicalPlan::HashJoin( + HashJoin { + lhs: Box::new(compile_rel_expr(var, *lhs)), + rhs: Box::new(PhysicalPlan::TableScan( + TableScan { + schema: rhs_schema, + limit: None, + delta, + }, + var.label(&rhs_alias), + )), + lhs_field: TupleField { + label: var.label(u.as_ref()), + label_pos: None, + field_pos: a, + }, + rhs_field: TupleField { + label: var.label(v.as_ref()), + label_pos: None, + field_pos: b, + }, + unique: false, + outer: true, }, Semi::All, ), diff --git a/crates/physical-plan/src/plan.rs b/crates/physical-plan/src/plan.rs index 81933829ef9..e5125bf4bda 100644 --- a/crates/physical-plan/src/plan.rs +++ b/crates/physical-plan/src/plan.rs @@ -654,6 +654,7 @@ impl PhysicalPlan { lhs_field, rhs_field, unique, + outer, }, semi, ) if rhs.has_label(&lhs_field.label) || lhs.has_label(&rhs_field.label) => Self::HashJoin( @@ -663,6 +664,7 @@ impl PhysicalPlan { lhs_field: rhs_field, rhs_field: lhs_field, unique, + outer, }, semi, ), @@ -783,15 +785,18 @@ impl PhysicalPlan { lhs_field: lhs_field @ TupleField { label: u, .. }, rhs_field: rhs_field @ TupleField { label: v, .. }, unique, + outer, }, Semi::All, ) => { - let semi = reqs - .iter() - .all(|label| lhs.has_label(label)) - .then_some(Semi::Lhs) - .or_else(|| reqs.iter().all(|label| rhs.has_label(label)).then_some(Semi::Rhs)) - .unwrap_or(Semi::All); + let semi = if !outer { + reqs + .iter() + .all(|label| lhs.has_label(label)) + .then_some(Semi::Lhs) + .or_else(|| reqs.iter().all(|label| rhs.has_label(label)).then_some(Semi::Rhs)) + .unwrap_or(Semi::All) + } else { Semi::All }; let mut lhs_reqs = vec![u]; let mut rhs_reqs = vec![v]; for var in reqs { @@ -809,6 +814,7 @@ impl PhysicalPlan { lhs_field, rhs_field, unique, + outer, }, semi, ) @@ -1197,6 +1203,7 @@ pub struct HashJoin { pub lhs_field: TupleField, pub rhs_field: TupleField, pub unique: bool, + pub outer: bool, } /// An index join is a left deep join tree, @@ -1914,6 +1921,7 @@ mod tests { lhs_field: TupleField { field_pos: 1, .. }, rhs_field: TupleField { field_pos: 1, .. }, unique: true, + outer: false, }, Semi::Rhs, ) => (*rhs, *lhs), diff --git a/crates/physical-plan/src/rules.rs b/crates/physical-plan/src/rules.rs index 37905f41e27..50653423931 100644 --- a/crates/physical-plan/src/rules.rs +++ b/crates/physical-plan/src/rules.rs @@ -949,6 +949,7 @@ impl RewriteRule for ReorderHashJoin { lhs_field: join.rhs_field, rhs_field: join.lhs_field, unique: join.unique, + outer: join.outer, }, Semi::All, )), @@ -999,6 +1000,7 @@ impl RewriteRule for ReorderDeltaJoinRhs { lhs_field: join.rhs_field, rhs_field: join.lhs_field, unique: join.unique, + outer: join.outer, }, Semi::All, )), diff --git a/crates/sql-parser/src/ast/mod.rs b/crates/sql-parser/src/ast/mod.rs index 776d4fc5006..eaf113b8534 100644 --- a/crates/sql-parser/src/ast/mod.rs +++ b/crates/sql-parser/src/ast/mod.rs @@ -22,20 +22,48 @@ impl SqlFrom { } } -/// An inner join in a FROM clause +/// A join in a FROM clause #[derive(Debug)] -pub struct SqlJoin { - pub var: SqlIdent, - pub alias: SqlIdent, - pub on: Option, +pub enum SqlJoin { + Cross(CrossJoin), + Inner(InnerJoin), + Left(OuterJoin), } impl SqlJoin { pub fn has_unqualified_vars(&self) -> bool { - self.on.as_ref().is_some_and(|expr| expr.has_unqualified_vars()) + match self { + SqlJoin::Cross(_) => false, + SqlJoin::Inner(InnerJoin { on: None, .. }) => false, + SqlJoin::Inner(InnerJoin { on: Some(on), .. }) => on.has_unqualified_vars(), + SqlJoin::Left(OuterJoin { on, .. }) => on.has_unqualified_vars(), + } } } +/// Cross join +#[derive(Debug)] +pub struct CrossJoin { + pub var: SqlIdent, + pub alias: SqlIdent, +} + +/// Inner join +#[derive(Debug)] +pub struct InnerJoin { + pub var: SqlIdent, + pub alias: SqlIdent, + pub on: Option, +} + +/// Outer join +#[derive(Debug)] +pub struct OuterJoin { + pub var: SqlIdent, + pub alias: SqlIdent, + pub on: SqlExpr, +} + /// A projection expression in a SELECT clause #[derive(Debug)] pub struct ProjectElem(pub ProjectExpr, pub SqlIdent); diff --git a/crates/sql-parser/src/parser/errors.rs b/crates/sql-parser/src/parser/errors.rs index 953a031b8b8..4d7450c5ce3 100644 --- a/crates/sql-parser/src/parser/errors.rs +++ b/crates/sql-parser/src/parser/errors.rs @@ -63,8 +63,8 @@ pub enum SqlUnsupported { MultiPartName(ObjectName), #[error("Unsupported: {0}")] Feature(String), - #[error("Non-inner joins are not supported")] - JoinType, + #[error("Non-column join constraints are not supported")] + JoinConstraintType, #[error("Implicit joins are not supported")] ImplicitJoins, #[error("Mixed wildcard projections are not supported")] diff --git a/crates/sql-parser/src/parser/mod.rs b/crates/sql-parser/src/parser/mod.rs index 9e6e5642bda..3b632ae6cfd 100644 --- a/crates/sql-parser/src/parser/mod.rs +++ b/crates/sql-parser/src/parser/mod.rs @@ -6,7 +6,8 @@ use sqlparser::ast::{ }; use crate::ast::{ - BinOp, LogOp, Parameter, Project, ProjectElem, ProjectExpr, SqlExpr, SqlFrom, SqlIdent, SqlJoin, SqlLiteral, + BinOp, CrossJoin, InnerJoin, LogOp, OuterJoin, Parameter, Project, ProjectElem, ProjectExpr, + SqlExpr, SqlFrom, SqlIdent, SqlJoin, SqlLiteral, }; pub mod errors; @@ -50,8 +51,8 @@ trait RelParser { fn parse_join(join: Join) -> SqlParseResult { let (var, alias) = Self::parse_relvar(join.relation)?; match join.join_operator { - JoinOperator::CrossJoin => Ok(SqlJoin { var, alias, on: None }), - JoinOperator::Inner(JoinConstraint::None) => Ok(SqlJoin { var, alias, on: None }), + JoinOperator::CrossJoin => Ok(SqlJoin::Cross(CrossJoin { var, alias })), + JoinOperator::Inner(JoinConstraint::None) => Ok(SqlJoin::Inner(InnerJoin { var, alias, on: None })), JoinOperator::Inner(JoinConstraint::On(Expr::BinaryOp { left, op: BinaryOperator::Eq, @@ -59,7 +60,7 @@ trait RelParser { })) if matches!(*left, Expr::Identifier(..) | Expr::CompoundIdentifier(..)) && matches!(*right, Expr::Identifier(..) | Expr::CompoundIdentifier(..)) => { - Ok(SqlJoin { + Ok(SqlJoin::Inner(InnerJoin { var, alias, on: Some(parse_expr( @@ -70,9 +71,29 @@ trait RelParser { }, 0, )?), - }) + })) } - _ => Err(SqlUnsupported::JoinType.into()), + JoinOperator::LeftOuter(JoinConstraint::On(Expr::BinaryOp { + left, + op: BinaryOperator::Eq, + right, + })) if matches!(*left, Expr::Identifier(..) | Expr::CompoundIdentifier(..)) + && matches!(*right, Expr::Identifier(..) | Expr::CompoundIdentifier(..)) => + { + Ok(SqlJoin::Left(OuterJoin { + var, + alias, + on: parse_expr( + Expr::BinaryOp { + left, + op: BinaryOperator::Eq, + right, + }, + 0, + )?, + })) + } + _ => Err(SqlUnsupported::JoinConstraintType.into()), } } diff --git a/crates/vm/src/relation.rs b/crates/vm/src/relation.rs index 2d625369f87..7a00bdf8048 100644 --- a/crates/vm/src/relation.rs +++ b/crates/vm/src/relation.rs @@ -33,6 +33,7 @@ pub enum RelValue<'a> { impl<'a> From> for RelValue<'a> { fn from(value: Row<'a>) -> Self { match value { + Row::Null => Self::Projection(ProductValue { elements: Box::new([]) }), Row::Ptr(ptr) => Self::Row(ptr), Row::Ref(ptr) => Self::ProjRef(ptr), } From 83a0357e9278ed8f830e26cd4f275c8c2a9e3b71 Mon Sep 17 00:00:00 2001 From: Egor Vorontsov Date: Wed, 5 Nov 2025 16:54:19 +0300 Subject: [PATCH 2/5] Adjusted for Views support. --- crates/physical-plan/src/compile.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/physical-plan/src/compile.rs b/crates/physical-plan/src/compile.rs index 5b08c2295da..4dba6e5adf1 100644 --- a/crates/physical-plan/src/compile.rs +++ b/crates/physical-plan/src/compile.rs @@ -137,7 +137,7 @@ fn compile_rel_expr(var: &mut impl VarLabel, ast: RelExpr) -> PhysicalPlan { lhs: Box::new(compile_rel_expr(var, *lhs)), rhs: Box::new(PhysicalPlan::TableScan( TableScan { - schema: rhs_schema, + schema: rhs_schema.inner(), limit: None, delta, }, From 7ccfa61cbde268622d83370ea5ab22432566cc9c Mon Sep 17 00:00:00 2001 From: Egor Vorontsov Date: Sat, 8 Nov 2025 01:10:59 +0300 Subject: [PATCH 3/5] Added support for Views. --- crates/execution/src/pipelined.rs | 1 + crates/physical-plan/src/plan.rs | 2 ++ 2 files changed, 3 insertions(+) diff --git a/crates/execution/src/pipelined.rs b/crates/execution/src/pipelined.rs index 26fe9261a31..da517c84989 100644 --- a/crates/execution/src/pipelined.rs +++ b/crates/execution/src/pipelined.rs @@ -186,6 +186,7 @@ impl ViewProject { let mut n = 0; let mut bytes_scanned = 0; self.inner.execute(tx, metrics, &mut |row| match row { + Row::Null => Ok(()), Row::Ptr(ptr) => { n += 1; let col_list = ColList::from_iter(self.num_private_cols..self.num_cols); diff --git a/crates/physical-plan/src/plan.rs b/crates/physical-plan/src/plan.rs index e5125bf4bda..09cc2086a50 100644 --- a/crates/physical-plan/src/plan.rs +++ b/crates/physical-plan/src/plan.rs @@ -586,6 +586,7 @@ impl PhysicalPlan { lhs_field, rhs_field, unique, + outer, }, semi, ) => Self::HashJoin( @@ -595,6 +596,7 @@ impl PhysicalPlan { lhs_field, rhs_field, unique, + outer, }, semi, ), From 20e8347bf8a3b00139a7fa0a382115ce5c1e0e11 Mon Sep 17 00:00:00 2001 From: Egor Vorontsov Date: Thu, 11 Dec 2025 22:35:46 +0300 Subject: [PATCH 4/5] Fixed for updated Views. --- crates/execution/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/execution/src/lib.rs b/crates/execution/src/lib.rs index 8e8ba385d33..04f7fad1f47 100644 --- a/crates/execution/src/lib.rs +++ b/crates/execution/src/lib.rs @@ -134,7 +134,7 @@ impl Hash for Row<'_> { impl Row<'_> { pub fn to_product_value(&self) -> ProductValue { match self { - Self::Null => ProductValue { elements: Box::new([]) }, + Self::Null => [].into(), Self::Ptr(ptr) => ptr.to_product_value(), Self::Ref(val) => (*val).clone(), } @@ -142,6 +142,7 @@ impl Row<'_> { pub fn project_product(self, cols: &ColList) -> Result { match self { + Self::Null => Ok([].into()), Self::Ptr(ptr) => ptr.project_product(cols), Self::Ref(val) => val.project_product(cols), } From 678a00bdb954756faa158de2f5c523c7532f8c44 Mon Sep 17 00:00:00 2001 From: Egor Vorontsov Date: Thu, 18 Dec 2025 04:37:01 +0300 Subject: [PATCH 5/5] Implemented `outer` for `IxJoin`s. --- crates/execution/src/pipelined.rs | 46 +++++++++++++++++++++++++++++-- crates/physical-plan/src/plan.rs | 3 ++ crates/physical-plan/src/rules.rs | 1 + crates/vm/src/relation.rs | 2 +- 4 files changed, 49 insertions(+), 3 deletions(-) diff --git a/crates/execution/src/pipelined.rs b/crates/execution/src/pipelined.rs index da517c84989..41e05f4330b 100644 --- a/crates/execution/src/pipelined.rs +++ b/crates/execution/src/pipelined.rs @@ -186,7 +186,7 @@ impl ViewProject { let mut n = 0; let mut bytes_scanned = 0; self.inner.execute(tx, metrics, &mut |row| match row { - Row::Null => Ok(()), + Row::Null => f([].into()), Row::Ptr(ptr) => { n += 1; let col_list = ColList::from_iter(self.num_private_cols..self.num_cols); @@ -312,6 +312,7 @@ impl From for PipelinedExecutor { rhs_index, rhs_field, unique, + outer, lhs_field, rhs_delta: None, .. @@ -324,6 +325,7 @@ impl From for PipelinedExecutor { rhs_field, lhs_field, unique, + outer, semijoin, }), PhysicalPlan::IxJoin( @@ -333,6 +335,7 @@ impl From for PipelinedExecutor { rhs_index, rhs_field, unique, + outer, lhs_field, rhs_delta: Some(rhs_delta), .. @@ -346,6 +349,7 @@ impl From for PipelinedExecutor { rhs_delta, lhs_field, unique, + outer, semijoin, }), PhysicalPlan::HashJoin( @@ -812,6 +816,7 @@ pub struct PipelinedIxJoin { pub lhs_field: TupleField, /// Is the index unique? pub unique: bool, + pub outer: bool, /// Is this a semijoin? pub semijoin: Semi, } @@ -845,10 +850,16 @@ impl PipelinedIxJoin { }; match self { + Self { + outer: true, + semijoin: Semi::Lhs | Semi::Rhs, + .. + } => unreachable!("Outer semijoin is not possible"), Self { lhs, lhs_field, unique: true, + outer: false, semijoin: Semi::Lhs, .. } => { @@ -867,6 +878,7 @@ impl PipelinedIxJoin { lhs, lhs_field, unique: true, + outer: false, semijoin: Semi::Rhs, .. } => { @@ -884,6 +896,7 @@ impl PipelinedIxJoin { lhs, lhs_field, unique: true, + outer, semijoin: Semi::All, .. } => { @@ -893,6 +906,8 @@ impl PipelinedIxJoin { index_seeks += 1; if let Some(v) = probe_rhs(&u, lhs_field, &mut bytes_scanned)? { f(u.join(v))?; + } else if *outer { + f(u.append(Row::Null))?; } Ok(()) })?; @@ -901,6 +916,7 @@ impl PipelinedIxJoin { lhs, lhs_field, unique: false, + outer: false, semijoin: Semi::Lhs, .. } => { @@ -919,6 +935,7 @@ impl PipelinedIxJoin { lhs, lhs_field, unique: false, + outer: false, semijoin: Semi::Rhs, .. } => { @@ -936,6 +953,7 @@ impl PipelinedIxJoin { lhs, lhs_field, unique: false, + outer, semijoin: Semi::All, .. } => { @@ -943,8 +961,13 @@ impl PipelinedIxJoin { lhs.execute(tx, metrics, &mut |u| { n += 1; index_seeks += 1; + let mut ok = false; for v in iter_rhs(&u, lhs_field, &mut bytes_scanned)? { - f(u.clone().join(v))?; + f(u.clone().join(v.clone()))?; + if !matches!(v, Tuple::Row(Row::Null)) { ok = true }; + } + if !ok && *outer { + f(u.clone().append(Row::Null))?; } Ok(()) })?; @@ -978,6 +1001,7 @@ pub struct PipelinedIxDeltaJoin { pub lhs_field: TupleField, /// Is the index unique? pub unique: bool, + pub outer: bool, /// Is this a semijoin? pub semijoin: Semi, } @@ -1002,10 +1026,16 @@ impl PipelinedIxDeltaJoin { let mut bytes_scanned = 0; match self { + Self { + outer: true, + semijoin: Semi::Lhs | Semi::Rhs, + .. + } => unreachable!("Outer semijoin is not possible"), Self { lhs, lhs_field, unique: true, + outer: false, semijoin: Semi::Lhs, .. } => { @@ -1033,6 +1063,7 @@ impl PipelinedIxDeltaJoin { lhs, lhs_field, unique: true, + outer: false, semijoin: Semi::Rhs, .. } => { @@ -1059,6 +1090,7 @@ impl PipelinedIxDeltaJoin { lhs, lhs_field, unique: true, + outer, semijoin: Semi::All, .. } => { @@ -1077,6 +1109,8 @@ impl PipelinedIxDeltaJoin { .map(Tuple::Row) { f(u.join(v))?; + } else if *outer { + f(u.append(Row::Null))?; } Ok(()) })?; @@ -1085,6 +1119,7 @@ impl PipelinedIxDeltaJoin { lhs, lhs_field, unique: false, + outer: false, semijoin: Semi::Lhs, .. } => { @@ -1111,6 +1146,7 @@ impl PipelinedIxDeltaJoin { lhs, lhs_field, unique: false, + outer: false, semijoin: Semi::Rhs, .. } => { @@ -1136,6 +1172,7 @@ impl PipelinedIxDeltaJoin { lhs, lhs_field, unique: false, + outer, semijoin: Semi::All, .. } => { @@ -1143,6 +1180,7 @@ impl PipelinedIxDeltaJoin { lhs.execute(tx, metrics, &mut |u| { n += 1; index_seeks += 1; + let mut ok = false; for v in tx .index_scan_point_for_delta( self.rhs_table, @@ -1153,6 +1191,10 @@ impl PipelinedIxDeltaJoin { .map(Tuple::Row) { f(u.clone().join(v.clone()))?; + if !matches!(v, Tuple::Row(Row::Null)) { ok = true }; + } + if !ok && *outer { + f(u.clone().append(Row::Null))?; } Ok(()) })?; diff --git a/crates/physical-plan/src/plan.rs b/crates/physical-plan/src/plan.rs index 09cc2086a50..1e4b7db06bd 100644 --- a/crates/physical-plan/src/plan.rs +++ b/crates/physical-plan/src/plan.rs @@ -562,6 +562,7 @@ impl PhysicalPlan { rhs_index, rhs_field, unique, + outer, lhs_field, rhs_delta, }, @@ -574,6 +575,7 @@ impl PhysicalPlan { rhs_index, rhs_field, unique, + outer, lhs_field, rhs_delta, }, @@ -1226,6 +1228,7 @@ pub struct IxJoin { pub rhs_field: ColId, /// Is the index a unique constraint index? pub unique: bool, + pub outer: bool, /// The expression for computing probe values. /// Values are projected from the lhs, /// and used to probe the index on the rhs. diff --git a/crates/physical-plan/src/rules.rs b/crates/physical-plan/src/rules.rs index 50653423931..e4bef11ec66 100644 --- a/crates/physical-plan/src/rules.rs +++ b/crates/physical-plan/src/rules.rs @@ -1143,6 +1143,7 @@ impl RewriteRule for HashToIxJoin { rhs_field, rhs_delta, unique: false, + outer: join.outer, lhs_field: join.lhs_field, }, semi, diff --git a/crates/vm/src/relation.rs b/crates/vm/src/relation.rs index 7a00bdf8048..5b2b8cce3a0 100644 --- a/crates/vm/src/relation.rs +++ b/crates/vm/src/relation.rs @@ -33,7 +33,7 @@ pub enum RelValue<'a> { impl<'a> From> for RelValue<'a> { fn from(value: Row<'a>) -> Self { match value { - Row::Null => Self::Projection(ProductValue { elements: Box::new([]) }), + Row::Null => Self::Projection([].into()), Row::Ptr(ptr) => Self::Row(ptr), Row::Ref(ptr) => Self::ProjRef(ptr), }