Skip to content

Commit 4f00a73

Browse files
committed
Implemented LEFT JOIN support.
1 parent e9d2b11 commit 4f00a73

12 files changed

Lines changed: 239 additions & 55 deletions

File tree

crates/execution/src/lib.rs

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -102,6 +102,7 @@ pub trait DeltaStore {
102102

103103
#[derive(Clone)]
104104
pub enum Row<'a> {
105+
Null,
105106
Ptr(RowRef<'a>),
106107
Ref(&'a ProductValue),
107108
}
@@ -113,6 +114,7 @@ impl PartialEq for Row<'_> {
113114
(Self::Ref(x), Self::Ref(y)) => x == y,
114115
(Self::Ptr(x), Self::Ref(y)) => x == *y,
115116
(Self::Ref(x), Self::Ptr(y)) => y == *x,
117+
(Self::Null, _) | (_, Self::Null) => false,
116118
}
117119
}
118120
}
@@ -122,6 +124,7 @@ impl Eq for Row<'_> {}
122124
impl Hash for Row<'_> {
123125
fn hash<H: Hasher>(&self, state: &mut H) {
124126
match self {
127+
Self::Null => AlgebraicValue::unit().hash(state),
125128
Self::Ptr(x) => x.hash(state),
126129
Self::Ref(x) => x.hash(state),
127130
}
@@ -131,6 +134,7 @@ impl Hash for Row<'_> {
131134
impl Row<'_> {
132135
pub fn to_product_value(&self) -> ProductValue {
133136
match self {
137+
Self::Null => ProductValue { elements: Box::new([]) },
134138
Self::Ptr(ptr) => ptr.to_product_value(),
135139
Self::Ref(val) => (*val).clone(),
136140
}
@@ -145,27 +149,31 @@ impl Row<'_> {
145149
}
146150

147151
impl_serialize!(['a] Row<'a>, (self, ser) => match self {
152+
Self::Null => AlgebraicValue::unit().serialize(ser),
148153
Self::Ptr(row) => row.serialize(ser),
149154
Self::Ref(row) => row.serialize(ser),
150155
});
151156

152157
impl ToBsatn for Row<'_> {
153158
fn static_bsatn_size(&self) -> Option<u16> {
154159
match self {
160+
Self::Null => self.to_product_value().static_bsatn_size(),
155161
Self::Ptr(ptr) => ptr.static_bsatn_size(),
156162
Self::Ref(val) => val.static_bsatn_size(),
157163
}
158164
}
159165

160166
fn to_bsatn_extend(&self, buf: &mut Vec<u8>) -> std::result::Result<(), EncodeError> {
161167
match self {
168+
Self::Null => self.to_product_value().to_bsatn_extend(buf),
162169
Self::Ptr(ptr) => ptr.to_bsatn_extend(buf),
163170
Self::Ref(val) => val.to_bsatn_extend(buf),
164171
}
165172
}
166173

167174
fn to_bsatn_vec(&self) -> std::result::Result<Vec<u8>, EncodeError> {
168175
match self {
176+
Self::Null => self.to_product_value().to_bsatn_vec(),
169177
Self::Ptr(ptr) => ptr.to_bsatn_vec(),
170178
Self::Ref(val) => val.to_bsatn_vec(),
171179
}
@@ -175,6 +183,7 @@ impl ToBsatn for Row<'_> {
175183
impl ProjectField for Row<'_> {
176184
fn project(&self, field: &TupleField) -> AlgebraicValue {
177185
match self {
186+
Self::Null => AlgebraicValue::unit(),
178187
Self::Ptr(ptr) => ptr.project(field),
179188
Self::Ref(val) => val.project(field),
180189
}
@@ -200,7 +209,7 @@ impl ProjectField for Tuple<'_> {
200209
.label_pos
201210
.and_then(|i| ptrs.get(i))
202211
.map(|ptr| ptr.project(field))
203-
.unwrap(),
212+
.unwrap_or(AlgebraicValue::unit()),
204213
}
205214
}
206215
}

crates/execution/src/pipelined.rs

Lines changed: 18 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -354,6 +354,7 @@ impl From<PhysicalPlan> for PipelinedExecutor {
354354
lhs_field,
355355
rhs_field,
356356
unique,
357+
outer,
357358
},
358359
semijoin,
359360
) => Self::HashJoin(BlockingHashJoin {
@@ -362,6 +363,7 @@ impl From<PhysicalPlan> for PipelinedExecutor {
362363
lhs_field,
363364
rhs_field,
364365
unique,
366+
outer,
365367
semijoin,
366368
}),
367369
PhysicalPlan::NLJoin(lhs, rhs) => Self::NLJoin(BlockingNLJoin {
@@ -1172,6 +1174,7 @@ pub struct BlockingHashJoin {
11721174
pub lhs_field: TupleField,
11731175
pub rhs_field: TupleField,
11741176
pub unique: bool,
1177+
pub outer: bool,
11751178
pub semijoin: Semi,
11761179
}
11771180

@@ -1190,12 +1193,18 @@ impl BlockingHashJoin {
11901193
let mut n = 0;
11911194
let mut bytes_scanned = 0;
11921195
match self {
1196+
Self {
1197+
outer: true,
1198+
semijoin: Semi::Lhs | Semi::Rhs,
1199+
..
1200+
} => unreachable!("Outer semijoin is not possible"),
11931201
Self {
11941202
lhs,
11951203
rhs,
11961204
lhs_field,
11971205
rhs_field,
11981206
unique: true,
1207+
outer: false,
11991208
semijoin: Semi::Lhs,
12001209
} => {
12011210
let mut rhs_table = HashSet::new();
@@ -1221,6 +1230,7 @@ impl BlockingHashJoin {
12211230
lhs_field,
12221231
rhs_field,
12231232
unique: true,
1233+
outer: false,
12241234
semijoin: Semi::Rhs,
12251235
} => {
12261236
let mut rhs_table = HashMap::new();
@@ -1246,6 +1256,7 @@ impl BlockingHashJoin {
12461256
lhs_field,
12471257
rhs_field,
12481258
unique: true,
1259+
outer,
12491260
semijoin: Semi::All,
12501261
} => {
12511262
let mut rhs_table = HashMap::new();
@@ -1261,6 +1272,8 @@ impl BlockingHashJoin {
12611272
n += 1;
12621273
if let Some(v) = rhs_table.get(&project(&u, lhs_field, &mut bytes_scanned)) {
12631274
f(u.clone().join(v.clone()))?;
1275+
} else if *outer {
1276+
f(u.clone().append(Row::Null))?;
12641277
}
12651278
Ok(())
12661279
})?;
@@ -1271,6 +1284,7 @@ impl BlockingHashJoin {
12711284
lhs_field,
12721285
rhs_field,
12731286
unique: false,
1287+
outer: false,
12741288
semijoin: Semi::Lhs,
12751289
} => {
12761290
let mut rhs_table = HashMap::new();
@@ -1298,6 +1312,7 @@ impl BlockingHashJoin {
12981312
lhs_field,
12991313
rhs_field,
13001314
unique: false,
1315+
outer: false,
13011316
semijoin: Semi::Rhs,
13021317
} => {
13031318
let mut rhs_table: HashMap<AlgebraicValue, Vec<_>> = HashMap::new();
@@ -1327,6 +1342,7 @@ impl BlockingHashJoin {
13271342
lhs_field,
13281343
rhs_field,
13291344
unique: false,
1345+
outer,
13301346
semijoin: Semi::All,
13311347
} => {
13321348
let mut rhs_table: HashMap<AlgebraicValue, Vec<_>> = HashMap::new();
@@ -1346,6 +1362,8 @@ impl BlockingHashJoin {
13461362
for v in rhs_tuples {
13471363
f(u.clone().join(v.clone()))?;
13481364
}
1365+
} else if *outer {
1366+
f(u.clone().append(Row::Null))?;
13491367
}
13501368
Ok(())
13511369
})?;

crates/expr/src/check.rs

Lines changed: 41 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@ use std::collections::HashMap;
22
use std::ops::{Deref, DerefMut};
33
use std::sync::Arc;
44

5+
use crate::ast::{CrossJoin, InnerJoin, OuterJoin};
56
use crate::expr::LeftDeepJoin;
67
use crate::expr::{Expr, ProjectList, ProjectName, Relvar};
78
use spacetimedb_lib::identity::AuthCtx;
@@ -78,34 +79,56 @@ pub trait TypeChecker {
7879
delta: None,
7980
});
8081

81-
for SqlJoin {
82-
var: SqlIdent(name),
83-
alias: SqlIdent(alias),
84-
on,
85-
} in joins
86-
{
82+
for jn in joins {
8783
// Check for duplicate aliases
88-
if vars.contains_key(&alias) {
89-
return Err(DuplicateName(alias.into_string()).into());
84+
match jn {
85+
SqlJoin::Cross(CrossJoin { alias: SqlIdent(alias), .. })
86+
| SqlJoin::Inner(InnerJoin { alias: SqlIdent(alias), .. })
87+
| SqlJoin::Left(OuterJoin { alias: SqlIdent(alias), .. })
88+
if vars.contains_key(&alias) => {
89+
return Err(DuplicateName(alias.into_string()).into());
90+
}
91+
SqlJoin::Cross(_) => (),
92+
SqlJoin::Inner(_) => (),
93+
SqlJoin::Left(_) => (),
9094
}
9195

9296
let lhs = Box::new(join);
93-
let rhs = Relvar {
94-
schema: Self::type_relvar(tx, &name)?,
95-
alias,
96-
delta: None,
97+
let rhs = match &jn {
98+
SqlJoin::Cross(CrossJoin { var: SqlIdent(name), alias: SqlIdent(alias), .. })
99+
| SqlJoin::Inner(InnerJoin { var: SqlIdent(name), alias: SqlIdent(alias), .. })
100+
| SqlJoin::Left(OuterJoin { var: SqlIdent(name), alias: SqlIdent(alias), .. }) => {
101+
Relvar {
102+
schema: Self::type_relvar(tx, &name)?,
103+
alias: alias.clone(),
104+
delta: None,
105+
}
106+
}
97107
};
98108

99109
vars.insert(rhs.alias.clone(), rhs.schema.clone());
100110

101-
if let Some(on) = on {
102-
if let Expr::BinOp(BinOp::Eq, a, b) = type_expr(vars, on, Some(&AlgebraicType::Bool))? {
103-
if let (Expr::Field(a), Expr::Field(b)) = (*a, *b) {
104-
join = RelExpr::EqJoin(LeftDeepJoin { lhs, rhs }, a, b);
105-
continue;
111+
match jn {
112+
SqlJoin::Cross(_) => (),
113+
SqlJoin::Inner(InnerJoin { on: Some(on), .. }) => {
114+
if let Expr::BinOp(BinOp::Eq, a, b) = type_expr(vars, on, Some(&AlgebraicType::Bool))? {
115+
if let (Expr::Field(a), Expr::Field(b)) = (*a, *b) {
116+
join = RelExpr::InnerEqJoin(LeftDeepJoin { lhs, rhs }, a, b);
117+
continue;
118+
}
119+
}
120+
unreachable!("Unreachability guaranteed by parser")
121+
}
122+
SqlJoin::Inner(_) => (),
123+
SqlJoin::Left(OuterJoin { on, .. }) => {
124+
if let Expr::BinOp(BinOp::Eq, a, b) = type_expr(vars, on, Some(&AlgebraicType::Bool))? {
125+
if let (Expr::Field(a), Expr::Field(b)) = (*a, *b) {
126+
join = RelExpr::LeftOuterEqJoin(LeftDeepJoin { lhs, rhs }, a, b);
127+
continue;
128+
}
106129
}
130+
unreachable!("Unreachability guaranteed by parser")
107131
}
108-
unreachable!("Unreachability guaranteed by parser")
109132
}
110133

111134
join = RelExpr::LeftDeepJoin(LeftDeepJoin { lhs, rhs });

crates/expr/src/expr.rs

Lines changed: 20 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -243,8 +243,10 @@ pub enum RelExpr {
243243
Select(Box<RelExpr>, Expr),
244244
/// A left deep binary cross product
245245
LeftDeepJoin(LeftDeepJoin),
246-
/// A left deep binary equi-join
247-
EqJoin(LeftDeepJoin, FieldProject, FieldProject),
246+
/// A left deep binary inner equi-join
247+
InnerEqJoin(LeftDeepJoin, FieldProject, FieldProject),
248+
/// A left deep binary left outer equi-join
249+
LeftOuterEqJoin(LeftDeepJoin, FieldProject, FieldProject),
248250
}
249251

250252
/// A table reference
@@ -277,7 +279,8 @@ impl RelExpr {
277279
match self {
278280
Self::Select(lhs, _)
279281
| Self::LeftDeepJoin(LeftDeepJoin { lhs, .. })
280-
| Self::EqJoin(LeftDeepJoin { lhs, .. }, ..) => {
282+
| Self::InnerEqJoin(LeftDeepJoin { lhs, .. }, ..)
283+
| Self::LeftOuterEqJoin(LeftDeepJoin { lhs, .. }, ..) => {
281284
lhs.visit(f);
282285
}
283286
Self::RelVar(..) => {}
@@ -290,7 +293,8 @@ impl RelExpr {
290293
match self {
291294
Self::Select(lhs, _)
292295
| Self::LeftDeepJoin(LeftDeepJoin { lhs, .. })
293-
| Self::EqJoin(LeftDeepJoin { lhs, .. }, ..) => {
296+
| Self::InnerEqJoin(LeftDeepJoin { lhs, .. }, ..)
297+
| Self::LeftOuterEqJoin(LeftDeepJoin { lhs, .. }, ..) => {
294298
lhs.visit_mut(f);
295299
}
296300
Self::RelVar(..) => {}
@@ -301,7 +305,11 @@ impl RelExpr {
301305
pub fn nfields(&self) -> usize {
302306
match self {
303307
Self::RelVar(..) => 1,
304-
Self::LeftDeepJoin(join) | Self::EqJoin(join, ..) => join.lhs.nfields() + 1,
308+
Self::LeftDeepJoin(join)
309+
| Self::InnerEqJoin(join, ..)
310+
| Self::LeftOuterEqJoin(join, ..) => {
311+
join.lhs.nfields() + 1
312+
}
305313
Self::Select(input, _) => input.nfields(),
306314
}
307315
}
@@ -310,7 +318,9 @@ impl RelExpr {
310318
pub fn has_field(&self, field: &str) -> bool {
311319
match self {
312320
Self::RelVar(Relvar { alias, .. }) => alias.as_ref() == field,
313-
Self::LeftDeepJoin(join) | Self::EqJoin(join, ..) => {
321+
Self::LeftDeepJoin(join)
322+
| Self::InnerEqJoin(join, ..)
323+
| Self::LeftOuterEqJoin(join, ..) => {
314324
join.rhs.alias.as_ref() == field || join.lhs.has_field(field)
315325
}
316326
Self::Select(input, _) => input.has_field(field),
@@ -322,10 +332,12 @@ impl RelExpr {
322332
match self {
323333
Self::RelVar(relvar) if relvar.alias.as_ref() == alias => Some(&relvar.schema),
324334
Self::Select(input, _) => input.find_table_schema(alias),
325-
Self::EqJoin(LeftDeepJoin { rhs, .. }, ..) if rhs.alias.as_ref() == alias => Some(&rhs.schema),
326-
Self::EqJoin(LeftDeepJoin { lhs, .. }, ..) => lhs.find_table_schema(alias),
327335
Self::LeftDeepJoin(LeftDeepJoin { rhs, .. }) if rhs.alias.as_ref() == alias => Some(&rhs.schema),
328336
Self::LeftDeepJoin(LeftDeepJoin { lhs, .. }) => lhs.find_table_schema(alias),
337+
Self::InnerEqJoin(LeftDeepJoin { rhs, .. }, ..) if rhs.alias.as_ref() == alias => Some(&rhs.schema),
338+
Self::InnerEqJoin(LeftDeepJoin { lhs, .. }, ..) => lhs.find_table_schema(alias),
339+
Self::LeftOuterEqJoin(LeftDeepJoin { rhs, .. }, ..) if rhs.alias.as_ref() == alias => Some(&rhs.schema),
340+
Self::LeftOuterEqJoin(LeftDeepJoin { lhs, .. }, ..) => lhs.find_table_schema(alias),
329341
_ => None,
330342
}
331343
}

0 commit comments

Comments
 (0)