@@ -920,12 +920,14 @@ impl<'a> JoinState<'a> {
920920 }
921921 }
922922 let mut order = InstrOrder :: from_iter ( 0 ..stages. instrs . len ( ) ) ;
923- sort_plan_by_size ( & mut order, 0 , & stages. instrs , binding_info) ;
923+ let mut leaf_scans: LeafScans = smallvec:: smallvec![ false ; stages. instrs. len( ) ] ;
924+ sort_plan_by_size ( & mut order, & mut leaf_scans, 0 , & stages. instrs , binding_info) ;
924925 self . run_plan (
925926 stages,
926927 atoms,
927928 action,
928929 & mut order,
930+ & mut leaf_scans,
929931 0 ,
930932 binding_info,
931933 action_buf,
@@ -947,6 +949,7 @@ impl<'a> JoinState<'a> {
947949 atoms : & ' buf Arc < DenseIdMap < AtomId , Atom > > ,
948950 action : A ,
949951 instr_order : & mut InstrOrder ,
952+ leaf_scans : & mut LeafScans ,
950953 cur : usize ,
951954 binding_info : & mut BindingInfo ,
952955 action_buf : & mut BUF ,
@@ -971,7 +974,7 @@ impl<'a> JoinState<'a> {
971974 if cur_size > 32 && cur % 3 == 1 && cur < instr_order. len ( ) - 1 {
972975 // If we have a reasonable number of tuples to process, adjust the variable order every
973976 // 3 rounds, but always make sure to readjust on the second round.
974- sort_plan_by_size ( instr_order, cur, & stages. instrs , binding_info) ;
977+ sort_plan_by_size ( instr_order, leaf_scans , cur, & stages. instrs , binding_info) ;
975978 cur_size = estimate_size ( & stages. instrs [ instr_order. get ( cur) ] , binding_info) ;
976979 }
977980
@@ -1014,6 +1017,7 @@ impl<'a> JoinState<'a> {
10141017 atoms,
10151018 action,
10161019 instr_order,
1020+ leaf_scans,
10171021 cur + 1 ,
10181022 binding_info,
10191023 action_buf,
@@ -1036,12 +1040,14 @@ impl<'a> JoinState<'a> {
10361040 BorrowedLocalState {
10371041 binding_info,
10381042 instr_order,
1043+ leaf_scans,
10391044 updates: & mut $updates,
10401045 } ,
10411046 move || exec_state_for_factory. clone( ) ,
10421047 move |BorrowedLocalState {
10431048 binding_info,
10441049 instr_order,
1050+ leaf_scans,
10451051 updates,
10461052 } ,
10471053 buf| {
@@ -1069,6 +1075,7 @@ impl<'a> JoinState<'a> {
10691075 atoms,
10701076 action,
10711077 instr_order,
1078+ leaf_scans,
10721079 cur + 1 ,
10731080 binding_info,
10741081 buf,
@@ -1370,92 +1377,86 @@ impl<'a> JoinState<'a> {
13701377 cover,
13711378 bind,
13721379 to_intersect,
1373- is_leaf_scan : true ,
13741380 } if to_intersect. is_empty ( ) => {
1381+ let is_leaf_scan = leaf_scans[ cur] ;
13751382 let cover_atom = cover. to_index . atom ;
13761383 if binding_info. has_empty_subset ( cover_atom) {
13771384 return ;
13781385 }
1379- let table = self . db . tables [ atoms[ cover_atom] . table ] . table . as_ref ( ) ;
1380- let cover_node = binding_info. unwrap_val ( cover_atom) ;
1381- let cover_subset = cover_node. subset . as_ref ( ) ;
1382-
1383- let proj = SmallVec :: < [ ColumnId ; 4 ] > :: from_iter ( bind. iter ( ) . map ( |( col, _) | * col) ) ;
1384- let vars = bind. iter ( ) . map ( |( _, var) | * var) . collect ( ) ;
1385- let mut buf = TaggedRowBuffer :: new_inline ( bind. len ( ) ) ;
1386- table. scan_project (
1387- cover_subset,
1388- & proj,
1389- Offset :: new ( 0 ) ,
1390- usize:: MAX ,
1391- & cover. constraints ,
1392- & mut buf,
1393- ) ;
1394-
1395- if buf. is_empty ( ) {
1396- return ;
1397- }
1398-
1399- binding_info. binding_sets . push ( ( vars, Arc :: new ( buf) ) ) ;
1400- let mut updates = FrameUpdates :: with_capacity ( 1 ) ;
1401- updates. finish_frame ( ) ;
1402- drain_updates ! ( updates) ;
1403- binding_info. binding_sets . pop ( ) ;
1404- binding_info. move_back_node ( cover_atom, cover_node) ;
1405- }
1406- JoinStage :: FusedIntersect {
1407- cover,
1408- bind,
1409- to_intersect,
1410- is_leaf_scan : false ,
1411- } if to_intersect. is_empty ( ) => {
1412- let cover_atom = cover. to_index . atom ;
1413- if binding_info. has_empty_subset ( cover_atom) {
1414- return ;
1415- }
1416- let proj = SmallVec :: < [ ColumnId ; 4 ] > :: from_iter ( bind. iter ( ) . map ( |( col, _) | * col) ) ;
1417- let cover_node = binding_info. unwrap_val ( cover_atom) ;
1418- let cover_subset = cover_node. subset . as_ref ( ) ;
1419- let mut cur = Offset :: new ( 0 ) ;
1420- let mut buffer = TaggedRowBuffer :: new ( bind. len ( ) ) ;
1421- let mut updates = FrameUpdates :: with_capacity ( cmp:: min ( chunk_size, cur_size) ) ;
1422- loop {
1423- buffer. clear ( ) ;
1424- let table = & self . db . tables [ atoms[ cover_atom] . table ] . table ;
1425- let next = table. scan_project (
1386+ if is_leaf_scan {
1387+ let table = self . db . tables [ atoms[ cover_atom] . table ] . table . as_ref ( ) ;
1388+ let cover_node = binding_info. unwrap_val ( cover_atom) ;
1389+ let cover_subset = cover_node. subset . as_ref ( ) ;
1390+
1391+ let proj =
1392+ SmallVec :: < [ ColumnId ; 4 ] > :: from_iter ( bind. iter ( ) . map ( |( col, _) | * col) ) ;
1393+ let vars = bind. iter ( ) . map ( |( _, var) | * var) . collect ( ) ;
1394+ let mut buf = TaggedRowBuffer :: new_inline ( bind. len ( ) ) ;
1395+ table. scan_project (
14261396 cover_subset,
14271397 & proj,
1428- cur ,
1429- chunk_size ,
1398+ Offset :: new ( 0 ) ,
1399+ usize :: MAX ,
14301400 & cover. constraints ,
1431- & mut buffer ,
1401+ & mut buf ,
14321402 ) ;
1433- for ( row, key) in buffer. iter ( ) {
1434- updates. refine_atom_dense ( cover_atom, OffsetRange :: new ( row, row. inc ( ) ) ) ;
1435- // bind the values
1436- for ( i, ( _, var) ) in bind. iter ( ) . enumerate ( ) {
1437- updates. push_binding ( * var, key[ i] ) ;
1403+
1404+ if buf. is_empty ( ) {
1405+ binding_info. move_back_node ( cover_atom, cover_node) ;
1406+ return ;
1407+ }
1408+
1409+ binding_info. binding_sets . push ( ( vars, Arc :: new ( buf) ) ) ;
1410+ let mut updates = FrameUpdates :: with_capacity ( 1 ) ;
1411+ updates. finish_frame ( ) ;
1412+ drain_updates ! ( updates) ;
1413+ binding_info. binding_sets . pop ( ) ;
1414+ binding_info. move_back_node ( cover_atom, cover_node) ;
1415+ } else {
1416+ let proj =
1417+ SmallVec :: < [ ColumnId ; 4 ] > :: from_iter ( bind. iter ( ) . map ( |( col, _) | * col) ) ;
1418+ let cover_node = binding_info. unwrap_val ( cover_atom) ;
1419+ let cover_subset = cover_node. subset . as_ref ( ) ;
1420+ let mut offset = Offset :: new ( 0 ) ;
1421+ let mut buffer = TaggedRowBuffer :: new ( bind. len ( ) ) ;
1422+ let mut updates = FrameUpdates :: with_capacity ( cmp:: min ( chunk_size, cur_size) ) ;
1423+ loop {
1424+ buffer. clear ( ) ;
1425+ let table = & self . db . tables [ atoms[ cover_atom] . table ] . table ;
1426+ let next = table. scan_project (
1427+ cover_subset,
1428+ & proj,
1429+ offset,
1430+ chunk_size,
1431+ & cover. constraints ,
1432+ & mut buffer,
1433+ ) ;
1434+ for ( row, key) in buffer. iter ( ) {
1435+ updates. refine_atom_dense ( cover_atom, OffsetRange :: new ( row, row. inc ( ) ) ) ;
1436+ // bind the values
1437+ for ( i, ( _, var) ) in bind. iter ( ) . enumerate ( ) {
1438+ updates. push_binding ( * var, key[ i] ) ;
1439+ }
1440+ updates. finish_frame ( ) ;
1441+ if updates. frames ( ) >= chunk_size {
1442+ drain_updates ! ( updates) ;
1443+ }
14381444 }
1439- updates . finish_frame ( ) ;
1440- if updates . frames ( ) >= chunk_size {
1441- drain_updates ! ( updates ) ;
1445+ if let Some ( next ) = next {
1446+ offset = next ;
1447+ continue ;
14421448 }
1449+ break ;
14431450 }
1444- if let Some ( next) = next {
1445- cur = next;
1446- continue ;
1447- }
1448- break ;
1451+ drain_updates ! ( updates) ;
1452+ // Restore the subsets we swapped out.
1453+ binding_info. move_back_node ( cover_atom, cover_node) ;
14491454 }
1450- drain_updates ! ( updates) ;
1451- // Restore the subsets we swapped out.
1452- binding_info. move_back_node ( cover_atom, cover_node) ;
14531455 }
14541456 JoinStage :: FusedIntersect {
14551457 cover,
14561458 bind,
14571459 to_intersect,
1458- is_leaf_scan : _,
14591460 } => {
14601461 let cover_atom = cover. to_index . atom ;
14611462 if binding_info. has_empty_subset ( cover_atom) {
@@ -2209,6 +2210,7 @@ fn num_intersected_rels(join_stage: &JoinStage) -> i32 {
22092210
22102211fn sort_plan_by_size (
22112212 order : & mut InstrOrder ,
2213+ leaf_scans : & mut LeafScans ,
22122214 start : usize ,
22132215 instrs : & [ JoinStage ] ,
22142216 binding_info : & mut BindingInfo ,
@@ -2228,6 +2230,79 @@ fn sort_plan_by_size(
22282230 }
22292231 }
22302232 sort_plan_by_size_inner ( order, last_pos..instrs. len ( ) , instrs, binding_info) ;
2233+ recompute_leaf_scans ( order, leaf_scans, instrs, start) ;
2234+ }
2235+
2236+ /// Recompute `leaf_scans[i]` for every position `i` in `[start, order.len())` against the
2237+ /// current order. A position is a leaf scan iff its stage is a `FusedIntersect` with empty
2238+ /// `to_intersect`, no later stage references the same cover atom, and no later
2239+ /// `FusedIntersectMat { mode: Value | Lookup }` reads any of the bound variables. Flags at positions before `start` are not modified.
2240+ fn recompute_leaf_scans (
2241+ order : & InstrOrder ,
2242+ leaf_scans : & mut LeafScans ,
2243+ instrs : & [ JoinStage ] ,
2244+ start : usize ,
2245+ ) {
2246+ for i in start..order. len ( ) {
2247+ let stage_idx = order. get ( i) ; // index into `instrs` of the stage scheduled at position `i`
2248+ let ( cover_atom, bind_vars) = match & instrs[ stage_idx] {
2249+ JoinStage :: FusedIntersect {
2250+ cover,
2251+ bind,
2252+ to_intersect,
2253+ } if to_intersect. is_empty ( ) => {
2254+ let vars: SmallVec < [ Variable ; 4 ] > = bind. iter ( ) . map ( |( _, v) | * v) . collect ( ) ; // the variable half of each `bind` pair
2255+ ( cover. to_index . atom , vars)
2256+ }
2257+ _ => { // every other stage, including a `FusedIntersect` with non-empty `to_intersect`, is never a leaf scan
2258+ leaf_scans[ i] = false ;
2259+ continue ;
2260+ }
2261+ } ;
2262+ let mut blocked = false ; // set once some later stage disqualifies position `i`
2263+ for j in ( i + 1 ) ..order. len ( ) { // only stages scheduled after position `i` are inspected
2264+ match & instrs[ order. get ( j) ] {
2265+ JoinStage :: Intersect { scans, .. } => { // a later `Intersect` that scans the cover atom disqualifies
2266+ if scans. iter ( ) . any ( |scan| scan. atom == cover_atom) {
2267+ blocked = true ;
2268+ break ;
2269+ }
2270+ }
2271+ JoinStage :: FusedIntersect {
2272+ cover,
2273+ to_intersect,
2274+ ..
2275+ } => { // a later `FusedIntersect` whose cover or intersected scans use the cover atom disqualifies
2276+ if cover. to_index . atom == cover_atom
2277+ || to_intersect
2278+ . iter ( )
2279+ . any ( |( s, _) | s. to_index . atom == cover_atom)
2280+ {
2281+ blocked = true ;
2282+ break ;
2283+ }
2284+ }
2285+ JoinStage :: FusedIntersectMat {
2286+ mode, to_intersect, ..
2287+ } => { // NOTE(review): only `to_intersect` is compared against the cover atom here; if this variant has other atom-bearing fields behind `..`, confirm they cannot match
2288+ if to_intersect
2289+ . iter ( )
2290+ . any ( |( s, _) | s. to_index . atom == cover_atom)
2291+ {
2292+ blocked = true ;
2293+ break ;
2294+ }
2295+ if let MatScanMode :: Value ( vars) | MatScanMode :: Lookup ( vars) = mode // both modes carry a variable list; any overlap with `bind_vars` disqualifies
2296+ && vars. iter ( ) . any ( |v| bind_vars. contains ( v) )
2297+ {
2298+ blocked = true ;
2299+ break ;
2300+ }
2301+ }
2302+ }
2303+ }
2304+ leaf_scans[ i] = !blocked; // leaf scan only when no later stage objected
2305+ }
22312306}
22322307
22332308fn sort_plan_by_size_inner (
@@ -2365,8 +2440,14 @@ impl InstrOrder {
23652440 }
23662441}
23672442
2443+ /// Per-position leaf-scan flags. `leaf_scans[i] == true` means the stage currently scheduled at
2444+ /// position `i` (i.e. `instrs[instr_order.get(i)]`) can take the factorized-binding fast path.
2445+ /// Recomputed by [`sort_plan_by_size`] whenever the order changes.
2446+ type LeafScans = SmallVec < [ bool ; 8 ] > ;
2447+
23682448struct BorrowedLocalState < ' a > { // mutable borrows over the same four fields `LocalState` owns; `LocalState::borrow_mut` produces one
23692449 instr_order : & ' a mut InstrOrder ,
2450+ leaf_scans : & ' a mut LeafScans , // one leaf-scan flag per position of `instr_order`
23702451 binding_info : & ' a mut BindingInfo ,
23712452 updates : & ' a mut FrameUpdates ,
23722453}
@@ -2375,6 +2456,7 @@ impl BorrowedLocalState<'_> {
23752456 fn clone_state ( & mut self ) -> LocalState {
23762457 LocalState {
23772458 instr_order : self . instr_order . clone ( ) ,
2459+ leaf_scans : self . leaf_scans . clone ( ) ,
23782460 binding_info : self . binding_info . clone ( ) ,
23792461 updates : std:: mem:: take ( self . updates ) ,
23802462 }
@@ -2383,6 +2465,7 @@ impl BorrowedLocalState<'_> {
23832465
23842466struct LocalState { // owned counterpart of `BorrowedLocalState`; built by `BorrowedLocalState::clone_state`
23852467 instr_order : InstrOrder , // cloned from the borrowed state
2468+ leaf_scans : LeafScans , // cloned alongside `instr_order`
23862469 binding_info : BindingInfo , // cloned from the borrowed state
23872470 updates : FrameUpdates , // moved out of the borrowed state with `std::mem::take`, not cloned
23882471}
@@ -2391,6 +2474,7 @@ impl LocalState {
23912474 fn borrow_mut < ' a > ( & ' a mut self ) -> BorrowedLocalState < ' a > {
23922475 BorrowedLocalState {
23932476 instr_order : & mut self . instr_order ,
2477+ leaf_scans : & mut self . leaf_scans ,
23942478 binding_info : & mut self . binding_info ,
23952479 updates : & mut self . updates ,
23962480 }
0 commit comments