Skip to content

Commit 63078fe

Browse files
committed
factorized bindings for FusedIntersectMat
1 parent 56a7931 commit 63078fe

1 file changed

Lines changed: 103 additions & 12 deletions

File tree

core-relations/src/free_join/execute.rs

Lines changed: 103 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1567,6 +1567,81 @@ impl<'a> JoinState<'a> {
15671567
binding_info.move_back(atom, prober);
15681568
}
15691569
}
1570+
JoinStage::FusedIntersectMat {
1571+
cover,
1572+
mode,
1573+
bind,
1574+
to_intersect,
1575+
} if leaf_scans[cur]
1576+
&& to_intersect.is_empty()
1577+
&& matches!(
1578+
mode,
1579+
MatScanMode::Full | MatScanMode::KeyOnly | MatScanMode::Value(_)
1580+
) =>
1581+
{
1582+
// Leaf-scan factorization for FusedIntersectMat: flatten the materialization into
1583+
// one `TaggedRowBuffer`, push it onto `binding_sets`, and recurse to the leaf once.
1584+
let cover_mat = binding_info.materializations[*cover].clone();
1585+
let vars: SmallVec<[Variable; 4]> = bind.iter().map(|(_, v)| *v).collect();
1586+
let mut buf = TaggedRowBuffer::new_inline(bind.len());
1587+
let mut row_scratch: SmallVec<[Value; 8]> = SmallVec::new();
1588+
match mode {
1589+
MatScanMode::Full => {
1590+
for group in cover_mat.iter() {
1591+
let group_key = group.0;
1592+
let group_key_len = group_key.len();
1593+
for non_keys in group.1.iter() {
1594+
row_scratch.clear();
1595+
for (col, _) in bind.iter() {
1596+
let val = if col.index() < group_key_len {
1597+
group_key[col.index()]
1598+
} else {
1599+
non_keys[col.index() - group_key_len]
1600+
};
1601+
row_scratch.push(val);
1602+
}
1603+
buf.add_row(RowId::new(0), &row_scratch);
1604+
}
1605+
}
1606+
}
1607+
MatScanMode::KeyOnly => {
1608+
for group in cover_mat.iter() {
1609+
let group_key = group.0;
1610+
row_scratch.clear();
1611+
for (col, _) in bind.iter() {
1612+
debug_assert!(col.index() < group_key.len());
1613+
row_scratch.push(group_key[col.index()]);
1614+
}
1615+
buf.add_row(RowId::new(0), &row_scratch);
1616+
}
1617+
}
1618+
MatScanMode::Value(index_vars) => {
1619+
let keys: Vec<Value> = index_vars
1620+
.iter()
1621+
.map(|var| binding_info.bindings[*var])
1622+
.collect();
1623+
if let Some(group) = cover_mat.get(&keys) {
1624+
for vals in group.iter() {
1625+
debug_assert!(vals.len() == bind.len());
1626+
row_scratch.clear();
1627+
for (col, _) in bind.iter() {
1628+
row_scratch.push(vals[col.index()]);
1629+
}
1630+
buf.add_row(RowId::new(0), &row_scratch);
1631+
}
1632+
}
1633+
}
1634+
MatScanMode::Lookup(_) => unreachable!("guarded above"),
1635+
}
1636+
if buf.is_empty() {
1637+
return;
1638+
}
1639+
binding_info.binding_sets.push((vars, Arc::new(buf)));
1640+
let mut updates = FrameUpdates::with_capacity(1);
1641+
updates.finish_frame();
1642+
drain_updates!(updates);
1643+
binding_info.binding_sets.pop();
1644+
}
15701645
JoinStage::FusedIntersectMat {
15711646
cover,
15721647
mode,
@@ -2234,9 +2309,11 @@ fn sort_plan_by_size(
22342309
}
22352310

22362311
/// Recompute `leaf_scans[i]` for every position `i` in `[start, order.len())` against the
2237-
/// current order. A position is a leaf scan iff its stage is a `FusedIntersect` with empty
2238-
/// `to_intersect`, no later stage references the same cover atom, and no later
2239-
/// `FusedIntersectMat { mode: Value | Lookup }` reads any of the bound variables.
2312+
/// current order. A position is a leaf scan iff its stage is either a `FusedIntersect` or a
2313+
/// `FusedIntersectMat { mode: Full | KeyOnly | Value }`, both with empty `to_intersect`, AND no
2314+
/// later stage either (a) for `FusedIntersect`, references the same cover atom, or (b) reads
2315+
/// any of the bound variables as a scalar via `FusedIntersectMat { mode: Value | Lookup }`.
2316+
/// `FusedIntersectMat::Lookup` itself binds nothing, so it is never marked a leaf scan.
22402317
fn recompute_leaf_scans(
22412318
order: &InstrOrder,
22422319
leaf_scans: &mut LeafScans,
@@ -2252,7 +2329,21 @@ fn recompute_leaf_scans(
22522329
to_intersect,
22532330
} if to_intersect.is_empty() => {
22542331
let vars: SmallVec<[Variable; 4]> = bind.iter().map(|(_, v)| *v).collect();
2255-
(cover.to_index.atom, vars)
2332+
(Some(cover.to_index.atom), vars)
2333+
}
2334+
JoinStage::FusedIntersectMat {
2335+
mode,
2336+
bind,
2337+
to_intersect,
2338+
..
2339+
} if to_intersect.is_empty()
2340+
&& matches!(
2341+
mode,
2342+
MatScanMode::Full | MatScanMode::KeyOnly | MatScanMode::Value(_)
2343+
) =>
2344+
{
2345+
let vars: SmallVec<[Variable; 4]> = bind.iter().map(|(_, v)| *v).collect();
2346+
(None, vars)
22562347
}
22572348
_ => {
22582349
leaf_scans[i] = false;
@@ -2263,7 +2354,9 @@ fn recompute_leaf_scans(
22632354
for j in (i + 1)..order.len() {
22642355
match &instrs[order.get(j)] {
22652356
JoinStage::Intersect { scans, .. } => {
2266-
if scans.iter().any(|scan| scan.atom == cover_atom) {
2357+
if let Some(ca) = cover_atom
2358+
&& scans.iter().any(|scan| scan.atom == ca)
2359+
{
22672360
blocked = true;
22682361
break;
22692362
}
@@ -2273,10 +2366,9 @@ fn recompute_leaf_scans(
22732366
to_intersect,
22742367
..
22752368
} => {
2276-
if cover.to_index.atom == cover_atom
2277-
|| to_intersect
2278-
.iter()
2279-
.any(|(s, _)| s.to_index.atom == cover_atom)
2369+
if let Some(ca) = cover_atom
2370+
&& (cover.to_index.atom == ca
2371+
|| to_intersect.iter().any(|(s, _)| s.to_index.atom == ca))
22802372
{
22812373
blocked = true;
22822374
break;
@@ -2285,9 +2377,8 @@ fn recompute_leaf_scans(
22852377
JoinStage::FusedIntersectMat {
22862378
mode, to_intersect, ..
22872379
} => {
2288-
if to_intersect
2289-
.iter()
2290-
.any(|(s, _)| s.to_index.atom == cover_atom)
2380+
if let Some(ca) = cover_atom
2381+
&& to_intersect.iter().any(|(s, _)| s.to_index.atom == ca)
22912382
{
22922383
blocked = true;
22932384
break;

0 commit comments

Comments
 (0)