Skip to content

Commit c96c285

Browse files
authored
Revert "Skip probe-side consumption when hash join build side is empty (apache#21068)" (#135)
This reverts commit 42ecad5.
1 parent 12d6c81 commit c96c285

4 files changed

Lines changed: 83 additions & 260 deletions

File tree

datafusion/common/src/join_type.rs

Lines changed: 0 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -113,20 +113,6 @@ impl JoinType {
113113
| JoinType::RightMark
114114
)
115115
}
116-
117-
/// Returns true when an empty build side necessarily produces an empty
118-
/// result for this join type.
119-
pub fn empty_build_side_produces_empty_result(self) -> bool {
120-
matches!(
121-
self,
122-
JoinType::Inner
123-
| JoinType::Left
124-
| JoinType::LeftSemi
125-
| JoinType::LeftAnti
126-
| JoinType::LeftMark
127-
| JoinType::RightSemi
128-
)
129-
}
130116
}
131117

132118
impl Display for JoinType {

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 42 additions & 195 deletions
Original file line number | Diff line number | Diff line change
@@ -2215,110 +2215,6 @@ mod tests {
22152215
)
22162216
}
22172217

2218-
fn empty_build_with_probe_error_inputs()
2219-
-> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>, JoinOn) {
2220-
let left_batch =
2221-
build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
2222-
let left_schema = left_batch.schema();
2223-
let left: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
2224-
&[vec![left_batch]],
2225-
Arc::clone(&left_schema),
2226-
None,
2227-
)
2228-
.unwrap();
2229-
2230-
let err = exec_err!("bad data error");
2231-
let right_batch =
2232-
build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
2233-
let right_schema = right_batch.schema();
2234-
let on = vec![(
2235-
Arc::new(Column::new_with_schema("b1", &left_schema).unwrap()) as _,
2236-
Arc::new(Column::new_with_schema("b1", &right_schema).unwrap()) as _,
2237-
)];
2238-
let right: Arc<dyn ExecutionPlan> = Arc::new(
2239-
MockExec::new(vec![Ok(right_batch), err], right_schema).with_use_task(false),
2240-
);
2241-
2242-
(left, right, on)
2243-
}
2244-
2245-
async fn assert_empty_build_probe_behavior(
2246-
join_types: &[JoinType],
2247-
expect_probe_error: bool,
2248-
with_filter: bool,
2249-
) {
2250-
let (left, right, on) = empty_build_with_probe_error_inputs();
2251-
let filter = prepare_join_filter();
2252-
2253-
for join_type in join_types {
2254-
let join = if with_filter {
2255-
join_with_filter(
2256-
Arc::clone(&left),
2257-
Arc::clone(&right),
2258-
on.clone(),
2259-
filter.clone(),
2260-
join_type,
2261-
NullEquality::NullEqualsNothing,
2262-
)
2263-
.unwrap()
2264-
} else {
2265-
join(
2266-
Arc::clone(&left),
2267-
Arc::clone(&right),
2268-
on.clone(),
2269-
join_type,
2270-
NullEquality::NullEqualsNothing,
2271-
)
2272-
.unwrap()
2273-
};
2274-
2275-
let result = common::collect(
2276-
join.execute(0, Arc::new(TaskContext::default())).unwrap(),
2277-
)
2278-
.await;
2279-
2280-
if expect_probe_error {
2281-
let result_string = result.unwrap_err().to_string();
2282-
assert!(
2283-
result_string.contains("bad data error"),
2284-
"actual: {result_string}"
2285-
);
2286-
} else {
2287-
let batches = result.unwrap();
2288-
assert!(
2289-
batches.is_empty(),
2290-
"expected no output batches for {join_type}, got {batches:?}"
2291-
);
2292-
}
2293-
}
2294-
}
2295-
2296-
fn hash_join_with_dynamic_filter(
2297-
left: Arc<dyn ExecutionPlan>,
2298-
right: Arc<dyn ExecutionPlan>,
2299-
on: JoinOn,
2300-
join_type: JoinType,
2301-
) -> Result<(HashJoinExec, Arc<DynamicFilterPhysicalExpr>)> {
2302-
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
2303-
let mut join = HashJoinExec::try_new(
2304-
left,
2305-
right,
2306-
on,
2307-
None,
2308-
&join_type,
2309-
None,
2310-
PartitionMode::CollectLeft,
2311-
NullEquality::NullEqualsNothing,
2312-
false,
2313-
)?;
2314-
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
2315-
filter: Arc::clone(&dynamic_filter),
2316-
build_accumulator: OnceLock::new(),
2317-
});
2318-
2319-
Ok((join, dynamic_filter))
2320-
}
2321-
23222218
async fn join_collect(
23232219
left: Arc<dyn ExecutionPlan>,
23242220
right: Arc<dyn ExecutionPlan>,
@@ -5027,70 +4923,6 @@ mod tests {
50274923
}
50284924
}
50294925

5030-
#[tokio::test]
5031-
async fn join_does_not_consume_probe_when_empty_build_fixes_output() {
5032-
assert_empty_build_probe_behavior(
5033-
&[
5034-
JoinType::Inner,
5035-
JoinType::Left,
5036-
JoinType::LeftSemi,
5037-
JoinType::LeftAnti,
5038-
JoinType::LeftMark,
5039-
JoinType::RightSemi,
5040-
],
5041-
false,
5042-
false,
5043-
)
5044-
.await;
5045-
}
5046-
5047-
#[tokio::test]
5048-
async fn join_does_not_consume_probe_when_empty_build_fixes_output_with_filter() {
5049-
assert_empty_build_probe_behavior(
5050-
&[
5051-
JoinType::Inner,
5052-
JoinType::Left,
5053-
JoinType::LeftSemi,
5054-
JoinType::LeftAnti,
5055-
JoinType::LeftMark,
5056-
JoinType::RightSemi,
5057-
],
5058-
false,
5059-
true,
5060-
)
5061-
.await;
5062-
}
5063-
5064-
#[tokio::test]
5065-
async fn join_still_consumes_probe_when_empty_build_needs_probe_rows() {
5066-
assert_empty_build_probe_behavior(
5067-
&[
5068-
JoinType::Right,
5069-
JoinType::Full,
5070-
JoinType::RightAnti,
5071-
JoinType::RightMark,
5072-
],
5073-
true,
5074-
false,
5075-
)
5076-
.await;
5077-
}
5078-
5079-
#[tokio::test]
5080-
async fn join_still_consumes_probe_when_empty_build_needs_probe_rows_with_filter() {
5081-
assert_empty_build_probe_behavior(
5082-
&[
5083-
JoinType::Right,
5084-
JoinType::Full,
5085-
JoinType::RightAnti,
5086-
JoinType::RightMark,
5087-
],
5088-
true,
5089-
true,
5090-
)
5091-
.await;
5092-
}
5093-
50944926
#[tokio::test]
50954927
async fn join_split_batch() {
50964928
let left = build_table(
@@ -5534,16 +5366,34 @@ mod tests {
55345366
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
55355367
)];
55365368

5537-
let (join, dynamic_filter) =
5538-
hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;
5369+
// Create a dynamic filter manually
5370+
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
5371+
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
5372+
5373+
// Create HashJoinExec with the dynamic filter
5374+
let mut join = HashJoinExec::try_new(
5375+
left,
5376+
right,
5377+
on,
5378+
None,
5379+
&JoinType::Inner,
5380+
None,
5381+
PartitionMode::CollectLeft,
5382+
NullEquality::NullEqualsNothing,
5383+
false,
5384+
)?;
5385+
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
5386+
filter: dynamic_filter,
5387+
build_accumulator: OnceLock::new(),
5388+
});
55395389

55405390
// Execute the join
55415391
let stream = join.execute(0, task_ctx)?;
55425392
let _batches = common::collect(stream).await?;
55435393

55445394
// After the join completes, the dynamic filter should be marked as complete
55455395
// wait_complete() should return immediately
5546-
dynamic_filter.wait_complete().await;
5396+
dynamic_filter_clone.wait_complete().await;
55475397

55485398
Ok(())
55495399
}
@@ -5565,37 +5415,34 @@ mod tests {
55655415
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
55665416
)];
55675417

5568-
let (join, dynamic_filter) =
5569-
hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;
5418+
// Create a dynamic filter manually
5419+
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
5420+
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
5421+
5422+
// Create HashJoinExec with the dynamic filter
5423+
let mut join = HashJoinExec::try_new(
5424+
left,
5425+
right,
5426+
on,
5427+
None,
5428+
&JoinType::Inner,
5429+
None,
5430+
PartitionMode::CollectLeft,
5431+
NullEquality::NullEqualsNothing,
5432+
false,
5433+
)?;
5434+
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
5435+
filter: dynamic_filter,
5436+
build_accumulator: OnceLock::new(),
5437+
});
55705438

55715439
// Execute the join
55725440
let stream = join.execute(0, task_ctx)?;
55735441
let _batches = common::collect(stream).await?;
55745442

55755443
// Even with empty build side, the dynamic filter should be marked as complete
55765444
// wait_complete() should return immediately
5577-
dynamic_filter.wait_complete().await;
5578-
5579-
Ok(())
5580-
}
5581-
5582-
#[tokio::test]
5583-
async fn test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report()
5584-
-> Result<()> {
5585-
let task_ctx = Arc::new(TaskContext::default());
5586-
let (left, right, on) = empty_build_with_probe_error_inputs();
5587-
5588-
// Keep an extra consumer reference so execute() enables dynamic filter pushdown
5589-
// and enters the WaitPartitionBoundsReport path before deciding whether to poll
5590-
// the probe side.
5591-
let (join, dynamic_filter) =
5592-
hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;
5593-
5594-
let stream = join.execute(0, task_ctx)?;
5595-
let batches = common::collect(stream).await?;
5596-
assert!(batches.is_empty());
5597-
5598-
dynamic_filter.wait_complete().await;
5445+
dynamic_filter_clone.wait_complete().await;
55995446

56005447
Ok(())
56015448
}

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 4 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -406,21 +406,6 @@ impl HashJoinStream {
406406
}
407407
}
408408

409-
/// Returns the next state after the build side has been fully collected
410-
/// and any required build-side coordination has completed.
411-
fn state_after_build_ready(
412-
join_type: JoinType,
413-
left_data: &JoinLeftData,
414-
) -> HashJoinStreamState {
415-
if left_data.map().is_empty()
416-
&& join_type.empty_build_side_produces_empty_result()
417-
{
418-
HashJoinStreamState::Completed
419-
} else {
420-
HashJoinStreamState::FetchProbeBatch
421-
}
422-
}
423-
424409
/// Separate implementation function that unpins the [`HashJoinStream`] so
425410
/// that partial borrows work correctly
426411
fn poll_next_impl(
@@ -484,9 +469,7 @@ impl HashJoinStream {
484469
if let Some(ref mut fut) = self.build_waiter {
485470
ready!(fut.get_shared(cx))?;
486471
}
487-
let build_side = self.build_side.try_as_ready()?;
488-
self.state =
489-
Self::state_after_build_ready(self.join_type, build_side.left_data.as_ref());
472+
self.state = HashJoinStreamState::FetchProbeBatch;
490473
Poll::Ready(Ok(StatefulStreamResult::Continue))
491474
}
492475

@@ -557,8 +540,7 @@ impl HashJoinStream {
557540
}));
558541
self.state = HashJoinStreamState::WaitPartitionBoundsReport;
559542
} else {
560-
self.state =
561-
Self::state_after_build_ready(self.join_type, left_data.as_ref());
543+
self.state = HashJoinStreamState::FetchProbeBatch;
562544
}
563545

564546
self.build_side = BuildSide::Ready(BuildSideReadyState { left_data });
@@ -661,14 +643,10 @@ impl HashJoinStream {
661643
}
662644
}
663645

664-
// If the build side is empty, this stream only reaches ProcessProbeBatch for
665-
// join types whose output still depends on probe rows.
646+
// if the left side is empty, we can skip the (potentially expensive) join operation
666647
let is_empty = build_side.left_data.map().is_empty();
667648

668-
if is_empty {
669-
// Invariant: state_after_build_ready should have already completed
670-
// join types whose result is fixed to empty when the build side is empty.
671-
debug_assert!(!self.join_type.empty_build_side_produces_empty_result());
649+
if is_empty && self.filter.is_none() {
672650
let result = build_batch_empty_build_side(
673651
&self.schema,
674652
build_side.left_data.batch(),

0 commit comments

Comments
 (0)