Skip to content

Commit 4de5c84

Browse files
committed
Implemented LEFT JOIN support.
1 parent 8118dd8 commit 4de5c84

12 files changed

Lines changed: 239 additions & 55 deletions

File tree

crates/execution/src/lib.rs

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -88,6 +88,7 @@ pub trait DeltaStore {
8888

8989
#[derive(Clone)]
9090
pub enum Row<'a> {
91+
Null,
9192
Ptr(RowRef<'a>),
9293
Ref(&'a ProductValue),
9394
}
@@ -99,6 +100,7 @@ impl PartialEq for Row<'_> {
99100
(Self::Ref(x), Self::Ref(y)) => x == y,
100101
(Self::Ptr(x), Self::Ref(y)) => x == *y,
101102
(Self::Ref(x), Self::Ptr(y)) => y == *x,
103+
(Self::Null, _) | (_, Self::Null) => false,
102104
}
103105
}
104106
}
@@ -108,6 +110,7 @@ impl Eq for Row<'_> {}
108110
impl Hash for Row<'_> {
109111
fn hash<H: Hasher>(&self, state: &mut H) {
110112
match self {
113+
Self::Null => AlgebraicValue::unit().hash(state),
111114
Self::Ptr(x) => x.hash(state),
112115
Self::Ref(x) => x.hash(state),
113116
}
@@ -117,34 +120,39 @@ impl Hash for Row<'_> {
117120
impl Row<'_> {
118121
pub fn to_product_value(&self) -> ProductValue {
119122
match self {
123+
Self::Null => ProductValue { elements: Box::new([]) },
120124
Self::Ptr(ptr) => ptr.to_product_value(),
121125
Self::Ref(val) => (*val).clone(),
122126
}
123127
}
124128
}
125129

126130
impl_serialize!(['a] Row<'a>, (self, ser) => match self {
131+
Self::Null => AlgebraicValue::unit().serialize(ser),
127132
Self::Ptr(row) => row.serialize(ser),
128133
Self::Ref(row) => row.serialize(ser),
129134
});
130135

131136
impl ToBsatn for Row<'_> {
132137
fn static_bsatn_size(&self) -> Option<u16> {
133138
match self {
139+
Self::Null => self.to_product_value().static_bsatn_size(),
134140
Self::Ptr(ptr) => ptr.static_bsatn_size(),
135141
Self::Ref(val) => val.static_bsatn_size(),
136142
}
137143
}
138144

139145
fn to_bsatn_extend(&self, buf: &mut Vec<u8>) -> std::result::Result<(), EncodeError> {
140146
match self {
147+
Self::Null => self.to_product_value().to_bsatn_extend(buf),
141148
Self::Ptr(ptr) => ptr.to_bsatn_extend(buf),
142149
Self::Ref(val) => val.to_bsatn_extend(buf),
143150
}
144151
}
145152

146153
fn to_bsatn_vec(&self) -> std::result::Result<Vec<u8>, EncodeError> {
147154
match self {
155+
Self::Null => self.to_product_value().to_bsatn_vec(),
148156
Self::Ptr(ptr) => ptr.to_bsatn_vec(),
149157
Self::Ref(val) => val.to_bsatn_vec(),
150158
}
@@ -154,6 +162,7 @@ impl ToBsatn for Row<'_> {
154162
impl ProjectField for Row<'_> {
155163
fn project(&self, field: &TupleField) -> AlgebraicValue {
156164
match self {
165+
Self::Null => AlgebraicValue::unit(),
157166
Self::Ptr(ptr) => ptr.project(field),
158167
Self::Ref(val) => val.project(field),
159168
}
@@ -179,7 +188,7 @@ impl ProjectField for Tuple<'_> {
179188
.label_pos
180189
.and_then(|i| ptrs.get(i))
181190
.map(|ptr| ptr.project(field))
182-
.unwrap(),
191+
.unwrap_or(AlgebraicValue::unit()),
183192
}
184193
}
185194
}

crates/execution/src/pipelined.rs

Lines changed: 18 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -257,6 +257,7 @@ impl From<PhysicalPlan> for PipelinedExecutor {
257257
lhs_field,
258258
rhs_field,
259259
unique,
260+
outer,
260261
},
261262
semijoin,
262263
) => Self::HashJoin(BlockingHashJoin {
@@ -265,6 +266,7 @@ impl From<PhysicalPlan> for PipelinedExecutor {
265266
lhs_field,
266267
rhs_field,
267268
unique,
269+
outer,
268270
semijoin,
269271
}),
270272
PhysicalPlan::NLJoin(lhs, rhs) => Self::NLJoin(BlockingNLJoin {
@@ -1075,6 +1077,7 @@ pub struct BlockingHashJoin {
10751077
pub lhs_field: TupleField,
10761078
pub rhs_field: TupleField,
10771079
pub unique: bool,
1080+
pub outer: bool,
10781081
pub semijoin: Semi,
10791082
}
10801083

@@ -1093,12 +1096,18 @@ impl BlockingHashJoin {
10931096
let mut n = 0;
10941097
let mut bytes_scanned = 0;
10951098
match self {
1099+
Self {
1100+
outer: true,
1101+
semijoin: Semi::Lhs | Semi::Rhs,
1102+
..
1103+
} => unreachable!("Outer semijoin is not possible"),
10961104
Self {
10971105
lhs,
10981106
rhs,
10991107
lhs_field,
11001108
rhs_field,
11011109
unique: true,
1110+
outer: false,
11021111
semijoin: Semi::Lhs,
11031112
} => {
11041113
let mut rhs_table = HashSet::new();
@@ -1124,6 +1133,7 @@ impl BlockingHashJoin {
11241133
lhs_field,
11251134
rhs_field,
11261135
unique: true,
1136+
outer: false,
11271137
semijoin: Semi::Rhs,
11281138
} => {
11291139
let mut rhs_table = HashMap::new();
@@ -1149,6 +1159,7 @@ impl BlockingHashJoin {
11491159
lhs_field,
11501160
rhs_field,
11511161
unique: true,
1162+
outer,
11521163
semijoin: Semi::All,
11531164
} => {
11541165
let mut rhs_table = HashMap::new();
@@ -1164,6 +1175,8 @@ impl BlockingHashJoin {
11641175
n += 1;
11651176
if let Some(v) = rhs_table.get(&project(&u, lhs_field, &mut bytes_scanned)) {
11661177
f(u.clone().join(v.clone()))?;
1178+
} else if *outer {
1179+
f(u.clone().append(Row::Null))?;
11671180
}
11681181
Ok(())
11691182
})?;
@@ -1174,6 +1187,7 @@ impl BlockingHashJoin {
11741187
lhs_field,
11751188
rhs_field,
11761189
unique: false,
1190+
outer: false,
11771191
semijoin: Semi::Lhs,
11781192
} => {
11791193
let mut rhs_table = HashMap::new();
@@ -1201,6 +1215,7 @@ impl BlockingHashJoin {
12011215
lhs_field,
12021216
rhs_field,
12031217
unique: false,
1218+
outer: false,
12041219
semijoin: Semi::Rhs,
12051220
} => {
12061221
let mut rhs_table: HashMap<AlgebraicValue, Vec<_>> = HashMap::new();
@@ -1230,6 +1245,7 @@ impl BlockingHashJoin {
12301245
lhs_field,
12311246
rhs_field,
12321247
unique: false,
1248+
outer,
12331249
semijoin: Semi::All,
12341250
} => {
12351251
let mut rhs_table: HashMap<AlgebraicValue, Vec<_>> = HashMap::new();
@@ -1249,6 +1265,8 @@ impl BlockingHashJoin {
12491265
for v in rhs_tuples {
12501266
f(u.clone().join(v.clone()))?;
12511267
}
1268+
} else if *outer {
1269+
f(u.clone().append(Row::Null))?;
12521270
}
12531271
Ok(())
12541272
})?;

crates/expr/src/check.rs

Lines changed: 41 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@ use std::collections::HashMap;
22
use std::ops::{Deref, DerefMut};
33
use std::sync::Arc;
44

5+
use crate::ast::{CrossJoin, InnerJoin, OuterJoin};
56
use crate::expr::LeftDeepJoin;
67
use crate::expr::{Expr, ProjectList, ProjectName, Relvar};
78
use spacetimedb_lib::identity::AuthCtx;
@@ -78,34 +79,56 @@ pub trait TypeChecker {
7879
delta: None,
7980
});
8081

81-
for SqlJoin {
82-
var: SqlIdent(name),
83-
alias: SqlIdent(alias),
84-
on,
85-
} in joins
86-
{
82+
for jn in joins {
8783
// Check for duplicate aliases
88-
if vars.contains_key(&alias) {
89-
return Err(DuplicateName(alias.into_string()).into());
84+
match jn {
85+
SqlJoin::Cross(CrossJoin { alias: SqlIdent(alias), .. })
86+
| SqlJoin::Inner(InnerJoin { alias: SqlIdent(alias), .. })
87+
| SqlJoin::Left(OuterJoin { alias: SqlIdent(alias), .. })
88+
if vars.contains_key(&alias) => {
89+
return Err(DuplicateName(alias.into_string()).into());
90+
}
91+
SqlJoin::Cross(_) => (),
92+
SqlJoin::Inner(_) => (),
93+
SqlJoin::Left(_) => (),
9094
}
9195

9296
let lhs = Box::new(join);
93-
let rhs = Relvar {
94-
schema: Self::type_relvar(tx, &name)?,
95-
alias,
96-
delta: None,
97+
let rhs = match &jn {
98+
SqlJoin::Cross(CrossJoin { var: SqlIdent(name), alias: SqlIdent(alias), .. })
99+
| SqlJoin::Inner(InnerJoin { var: SqlIdent(name), alias: SqlIdent(alias), .. })
100+
| SqlJoin::Left(OuterJoin { var: SqlIdent(name), alias: SqlIdent(alias), .. }) => {
101+
Relvar {
102+
schema: Self::type_relvar(tx, &name)?,
103+
alias: alias.clone(),
104+
delta: None,
105+
}
106+
}
97107
};
98108

99109
vars.insert(rhs.alias.clone(), rhs.schema.clone());
100110

101-
if let Some(on) = on {
102-
if let Expr::BinOp(BinOp::Eq, a, b) = type_expr(vars, on, Some(&AlgebraicType::Bool))? {
103-
if let (Expr::Field(a), Expr::Field(b)) = (*a, *b) {
104-
join = RelExpr::EqJoin(LeftDeepJoin { lhs, rhs }, a, b);
105-
continue;
111+
match jn {
112+
SqlJoin::Cross(_) => (),
113+
SqlJoin::Inner(InnerJoin { on: Some(on), .. }) => {
114+
if let Expr::BinOp(BinOp::Eq, a, b) = type_expr(vars, on, Some(&AlgebraicType::Bool))? {
115+
if let (Expr::Field(a), Expr::Field(b)) = (*a, *b) {
116+
join = RelExpr::InnerEqJoin(LeftDeepJoin { lhs, rhs }, a, b);
117+
continue;
118+
}
119+
}
120+
unreachable!("Unreachability guaranteed by parser")
121+
}
122+
SqlJoin::Inner(_) => (),
123+
SqlJoin::Left(OuterJoin { on, .. }) => {
124+
if let Expr::BinOp(BinOp::Eq, a, b) = type_expr(vars, on, Some(&AlgebraicType::Bool))? {
125+
if let (Expr::Field(a), Expr::Field(b)) = (*a, *b) {
126+
join = RelExpr::LeftOuterEqJoin(LeftDeepJoin { lhs, rhs }, a, b);
127+
continue;
128+
}
106129
}
130+
unreachable!("Unreachability guaranteed by parser")
107131
}
108-
unreachable!("Unreachability guaranteed by parser")
109132
}
110133

111134
join = RelExpr::LeftDeepJoin(LeftDeepJoin { lhs, rhs });

crates/expr/src/expr.rs

Lines changed: 20 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -197,8 +197,10 @@ pub enum RelExpr {
197197
Select(Box<RelExpr>, Expr),
198198
/// A left deep binary cross product
199199
LeftDeepJoin(LeftDeepJoin),
200-
/// A left deep binary equi-join
201-
EqJoin(LeftDeepJoin, FieldProject, FieldProject),
200+
/// A left deep binary inner equi-join
201+
InnerEqJoin(LeftDeepJoin, FieldProject, FieldProject),
202+
/// A left deep binary left outer equi-join
203+
LeftOuterEqJoin(LeftDeepJoin, FieldProject, FieldProject),
202204
}
203205

204206
/// A table reference
@@ -219,7 +221,8 @@ impl RelExpr {
219221
match self {
220222
Self::Select(lhs, _)
221223
| Self::LeftDeepJoin(LeftDeepJoin { lhs, .. })
222-
| Self::EqJoin(LeftDeepJoin { lhs, .. }, ..) => {
224+
| Self::InnerEqJoin(LeftDeepJoin { lhs, .. }, ..)
225+
| Self::LeftOuterEqJoin(LeftDeepJoin { lhs, .. }, ..) => {
223226
lhs.visit(f);
224227
}
225228
Self::RelVar(..) => {}
@@ -232,7 +235,8 @@ impl RelExpr {
232235
match self {
233236
Self::Select(lhs, _)
234237
| Self::LeftDeepJoin(LeftDeepJoin { lhs, .. })
235-
| Self::EqJoin(LeftDeepJoin { lhs, .. }, ..) => {
238+
| Self::InnerEqJoin(LeftDeepJoin { lhs, .. }, ..)
239+
| Self::LeftOuterEqJoin(LeftDeepJoin { lhs, .. }, ..) => {
236240
lhs.visit_mut(f);
237241
}
238242
Self::RelVar(..) => {}
@@ -243,7 +247,11 @@ impl RelExpr {
243247
pub fn nfields(&self) -> usize {
244248
match self {
245249
Self::RelVar(..) => 1,
246-
Self::LeftDeepJoin(join) | Self::EqJoin(join, ..) => join.lhs.nfields() + 1,
250+
Self::LeftDeepJoin(join)
251+
| Self::InnerEqJoin(join, ..)
252+
| Self::LeftOuterEqJoin(join, ..) => {
253+
join.lhs.nfields() + 1
254+
}
247255
Self::Select(input, _) => input.nfields(),
248256
}
249257
}
@@ -252,7 +260,9 @@ impl RelExpr {
252260
pub fn has_field(&self, field: &str) -> bool {
253261
match self {
254262
Self::RelVar(Relvar { alias, .. }) => alias.as_ref() == field,
255-
Self::LeftDeepJoin(join) | Self::EqJoin(join, ..) => {
263+
Self::LeftDeepJoin(join)
264+
| Self::InnerEqJoin(join, ..)
265+
| Self::LeftOuterEqJoin(join, ..) => {
256266
join.rhs.alias.as_ref() == field || join.lhs.has_field(field)
257267
}
258268
Self::Select(input, _) => input.has_field(field),
@@ -264,10 +274,12 @@ impl RelExpr {
264274
match self {
265275
Self::RelVar(relvar) if relvar.alias.as_ref() == alias => Some(&relvar.schema),
266276
Self::Select(input, _) => input.find_table_schema(alias),
267-
Self::EqJoin(LeftDeepJoin { rhs, .. }, ..) if rhs.alias.as_ref() == alias => Some(&rhs.schema),
268-
Self::EqJoin(LeftDeepJoin { lhs, .. }, ..) => lhs.find_table_schema(alias),
269277
Self::LeftDeepJoin(LeftDeepJoin { rhs, .. }) if rhs.alias.as_ref() == alias => Some(&rhs.schema),
270278
Self::LeftDeepJoin(LeftDeepJoin { lhs, .. }) => lhs.find_table_schema(alias),
279+
Self::InnerEqJoin(LeftDeepJoin { rhs, .. }, ..) if rhs.alias.as_ref() == alias => Some(&rhs.schema),
280+
Self::InnerEqJoin(LeftDeepJoin { lhs, .. }, ..) => lhs.find_table_schema(alias),
281+
Self::LeftOuterEqJoin(LeftDeepJoin { rhs, .. }, ..) if rhs.alias.as_ref() == alias => Some(&rhs.schema),
282+
Self::LeftOuterEqJoin(LeftDeepJoin { lhs, .. }, ..) => lhs.find_table_schema(alias),
271283
_ => None,
272284
}
273285
}

0 commit comments

Comments
 (0)