Skip to content

Commit 678a00b

Browse files
committed
Implemented outer for IxJoins.
1 parent 20e8347 commit 678a00b

4 files changed

Lines changed: 49 additions & 3 deletions

File tree

crates/execution/src/pipelined.rs

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ impl ViewProject {
186186
let mut n = 0;
187187
let mut bytes_scanned = 0;
188188
self.inner.execute(tx, metrics, &mut |row| match row {
189-
Row::Null => Ok(()),
189+
Row::Null => f([].into()),
190190
Row::Ptr(ptr) => {
191191
n += 1;
192192
let col_list = ColList::from_iter(self.num_private_cols..self.num_cols);
@@ -312,6 +312,7 @@ impl From<PhysicalPlan> for PipelinedExecutor {
312312
rhs_index,
313313
rhs_field,
314314
unique,
315+
outer,
315316
lhs_field,
316317
rhs_delta: None,
317318
..
@@ -324,6 +325,7 @@ impl From<PhysicalPlan> for PipelinedExecutor {
324325
rhs_field,
325326
lhs_field,
326327
unique,
328+
outer,
327329
semijoin,
328330
}),
329331
PhysicalPlan::IxJoin(
@@ -333,6 +335,7 @@ impl From<PhysicalPlan> for PipelinedExecutor {
333335
rhs_index,
334336
rhs_field,
335337
unique,
338+
outer,
336339
lhs_field,
337340
rhs_delta: Some(rhs_delta),
338341
..
@@ -346,6 +349,7 @@ impl From<PhysicalPlan> for PipelinedExecutor {
346349
rhs_delta,
347350
lhs_field,
348351
unique,
352+
outer,
349353
semijoin,
350354
}),
351355
PhysicalPlan::HashJoin(
@@ -812,6 +816,7 @@ pub struct PipelinedIxJoin {
812816
pub lhs_field: TupleField,
813817
/// Is the index unique?
814818
pub unique: bool,
819+
pub outer: bool,
815820
/// Is this a semijoin?
816821
pub semijoin: Semi,
817822
}
@@ -845,10 +850,16 @@ impl PipelinedIxJoin {
845850
};
846851

847852
match self {
853+
Self {
854+
outer: true,
855+
semijoin: Semi::Lhs | Semi::Rhs,
856+
..
857+
} => unreachable!("Outer semijoin is not possible"),
848858
Self {
849859
lhs,
850860
lhs_field,
851861
unique: true,
862+
outer: false,
852863
semijoin: Semi::Lhs,
853864
..
854865
} => {
@@ -867,6 +878,7 @@ impl PipelinedIxJoin {
867878
lhs,
868879
lhs_field,
869880
unique: true,
881+
outer: false,
870882
semijoin: Semi::Rhs,
871883
..
872884
} => {
@@ -884,6 +896,7 @@ impl PipelinedIxJoin {
884896
lhs,
885897
lhs_field,
886898
unique: true,
899+
outer,
887900
semijoin: Semi::All,
888901
..
889902
} => {
@@ -893,6 +906,8 @@ impl PipelinedIxJoin {
893906
index_seeks += 1;
894907
if let Some(v) = probe_rhs(&u, lhs_field, &mut bytes_scanned)? {
895908
f(u.join(v))?;
909+
} else if *outer {
910+
f(u.append(Row::Null))?;
896911
}
897912
Ok(())
898913
})?;
@@ -901,6 +916,7 @@ impl PipelinedIxJoin {
901916
lhs,
902917
lhs_field,
903918
unique: false,
919+
outer: false,
904920
semijoin: Semi::Lhs,
905921
..
906922
} => {
@@ -919,6 +935,7 @@ impl PipelinedIxJoin {
919935
lhs,
920936
lhs_field,
921937
unique: false,
938+
outer: false,
922939
semijoin: Semi::Rhs,
923940
..
924941
} => {
@@ -936,15 +953,21 @@ impl PipelinedIxJoin {
936953
lhs,
937954
lhs_field,
938955
unique: false,
956+
outer,
939957
semijoin: Semi::All,
940958
..
941959
} => {
942960
// Probe the index and evaluate the matching rhs rows
943961
lhs.execute(tx, metrics, &mut |u| {
944962
n += 1;
945963
index_seeks += 1;
964+
let mut ok = false;
946965
for v in iter_rhs(&u, lhs_field, &mut bytes_scanned)? {
947-
f(u.clone().join(v))?;
966+
f(u.clone().join(v.clone()))?;
967+
if !matches!(v, Tuple::Row(Row::Null)) { ok = true };
968+
}
969+
if !ok && *outer {
970+
f(u.clone().append(Row::Null))?;
948971
}
949972
Ok(())
950973
})?;
@@ -978,6 +1001,7 @@ pub struct PipelinedIxDeltaJoin {
9781001
pub lhs_field: TupleField,
9791002
/// Is the index unique?
9801003
pub unique: bool,
1004+
pub outer: bool,
9811005
/// Is this a semijoin?
9821006
pub semijoin: Semi,
9831007
}
@@ -1002,10 +1026,16 @@ impl PipelinedIxDeltaJoin {
10021026
let mut bytes_scanned = 0;
10031027

10041028
match self {
1029+
Self {
1030+
outer: true,
1031+
semijoin: Semi::Lhs | Semi::Rhs,
1032+
..
1033+
} => unreachable!("Outer semijoin is not possible"),
10051034
Self {
10061035
lhs,
10071036
lhs_field,
10081037
unique: true,
1038+
outer: false,
10091039
semijoin: Semi::Lhs,
10101040
..
10111041
} => {
@@ -1033,6 +1063,7 @@ impl PipelinedIxDeltaJoin {
10331063
lhs,
10341064
lhs_field,
10351065
unique: true,
1066+
outer: false,
10361067
semijoin: Semi::Rhs,
10371068
..
10381069
} => {
@@ -1059,6 +1090,7 @@ impl PipelinedIxDeltaJoin {
10591090
lhs,
10601091
lhs_field,
10611092
unique: true,
1093+
outer,
10621094
semijoin: Semi::All,
10631095
..
10641096
} => {
@@ -1077,6 +1109,8 @@ impl PipelinedIxDeltaJoin {
10771109
.map(Tuple::Row)
10781110
{
10791111
f(u.join(v))?;
1112+
} else if *outer {
1113+
f(u.append(Row::Null))?;
10801114
}
10811115
Ok(())
10821116
})?;
@@ -1085,6 +1119,7 @@ impl PipelinedIxDeltaJoin {
10851119
lhs,
10861120
lhs_field,
10871121
unique: false,
1122+
outer: false,
10881123
semijoin: Semi::Lhs,
10891124
..
10901125
} => {
@@ -1111,6 +1146,7 @@ impl PipelinedIxDeltaJoin {
11111146
lhs,
11121147
lhs_field,
11131148
unique: false,
1149+
outer: false,
11141150
semijoin: Semi::Rhs,
11151151
..
11161152
} => {
@@ -1136,13 +1172,15 @@ impl PipelinedIxDeltaJoin {
11361172
lhs,
11371173
lhs_field,
11381174
unique: false,
1175+
outer,
11391176
semijoin: Semi::All,
11401177
..
11411178
} => {
11421179
// Probe the index and evaluate the matching rhs rows
11431180
lhs.execute(tx, metrics, &mut |u| {
11441181
n += 1;
11451182
index_seeks += 1;
1183+
let mut ok = false;
11461184
for v in tx
11471185
.index_scan_point_for_delta(
11481186
self.rhs_table,
@@ -1153,6 +1191,10 @@ impl PipelinedIxDeltaJoin {
11531191
.map(Tuple::Row)
11541192
{
11551193
f(u.clone().join(v.clone()))?;
1194+
if !matches!(v, Tuple::Row(Row::Null)) { ok = true };
1195+
}
1196+
if !ok && *outer {
1197+
f(u.clone().append(Row::Null))?;
11561198
}
11571199
Ok(())
11581200
})?;

crates/physical-plan/src/plan.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,7 @@ impl PhysicalPlan {
562562
rhs_index,
563563
rhs_field,
564564
unique,
565+
outer,
565566
lhs_field,
566567
rhs_delta,
567568
},
@@ -574,6 +575,7 @@ impl PhysicalPlan {
574575
rhs_index,
575576
rhs_field,
576577
unique,
578+
outer,
577579
lhs_field,
578580
rhs_delta,
579581
},
@@ -1226,6 +1228,7 @@ pub struct IxJoin {
12261228
pub rhs_field: ColId,
12271229
/// Is the index a unique constraint index?
12281230
pub unique: bool,
1231+
pub outer: bool,
12291232
/// The expression for computing probe values.
12301233
/// Values are projected from the lhs,
12311234
/// and used to probe the index on the rhs.

crates/physical-plan/src/rules.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1143,6 +1143,7 @@ impl RewriteRule for HashToIxJoin {
11431143
rhs_field,
11441144
rhs_delta,
11451145
unique: false,
1146+
outer: join.outer,
11461147
lhs_field: join.lhs_field,
11471148
},
11481149
semi,

crates/vm/src/relation.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ pub enum RelValue<'a> {
3333
impl<'a> From<Row<'a>> for RelValue<'a> {
3434
fn from(value: Row<'a>) -> Self {
3535
match value {
36-
Row::Null => Self::Projection(ProductValue { elements: Box::new([]) }),
36+
Row::Null => Self::Projection([].into()),
3737
Row::Ptr(ptr) => Self::Row(ptr),
3838
Row::Ref(ptr) => Self::ProjRef(ptr),
3939
}

0 commit comments

Comments
 (0)