Skip to content

Commit ac411f0

Browse files
committed
Return Transformed::no when possible
Tests ported from #21964
1 parent 4f727f4 commit ac411f0

1 file changed

Lines changed: 61 additions & 19 deletions

File tree

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 61 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,7 @@ fn push_down_all_join(
423423
let mut keep_predicates = vec![];
424424
let mut join_conditions = vec![];
425425
let mut checker = ColumnChecker::new(left_schema, right_schema);
426+
let mut join_condition_added = false;
426427
for predicate in predicates {
427428
if left_preserved && checker.is_left_only(&predicate) {
428429
left_push.push(predicate);
@@ -432,6 +433,7 @@ fn push_down_all_join(
432433
// Here we do not differ it is eq or non-eq predicate, ExtractEquijoinPredicate will extract the eq predicate
433434
// and convert to the join on condition
434435
join_conditions.push(predicate);
436+
join_condition_added = true;
435437
} else {
436438
keep_predicates.push(predicate);
437439
}
@@ -448,16 +450,13 @@ fn push_down_all_join(
448450

449451
let mut on_filter_join_conditions = vec![];
450452
let (on_left_preserved, on_right_preserved) = on_lr_is_preserved(join.join_type);
451-
452-
if !on_filter.is_empty() {
453-
for on in on_filter {
454-
if on_left_preserved && checker.is_left_only(&on) {
455-
left_push.push(on)
456-
} else if on_right_preserved && checker.is_right_only(&on) {
457-
right_push.push(on)
458-
} else {
459-
on_filter_join_conditions.push(on)
460-
}
453+
for on in on_filter {
454+
if on_left_preserved && checker.is_left_only(&on) {
455+
left_push.push(on)
456+
} else if on_right_preserved && checker.is_right_only(&on) {
457+
right_push.push(on)
458+
} else {
459+
on_filter_join_conditions.push(on)
461460
}
462461
}
463462

@@ -473,6 +472,7 @@ fn push_down_all_join(
473472
&left_schema_columns,
474473
));
475474
}
475+
476476
if right_preserved {
477477
right_push.extend(extract_or_clauses_for_join(
478478
&keep_predicates,
@@ -492,13 +492,26 @@ fn push_down_all_join(
492492
&left_schema_columns,
493493
));
494494
}
495+
495496
if on_right_preserved {
496497
right_push.extend(extract_or_clauses_for_join(
497498
&on_filter_join_conditions,
498499
&right_schema_columns,
499500
));
500501
}
501502

503+
// Add any new join conditions as the non join predicates
504+
join_conditions.extend(on_filter_join_conditions);
505+
join.filter = conjunction(join_conditions);
506+
507+
if !join_condition_added && left_push.is_empty() && right_push.is_empty() {
508+
// wrap the join on the filter whose predicates must be kept, if any
509+
return Ok(Transformed::no(with_filters(
510+
keep_predicates,
511+
LogicalPlan::Join(join),
512+
)));
513+
}
514+
502515
if let Some(predicate) = conjunction(left_push) {
503516
join.left = Arc::new(LogicalPlan::Filter(Filter::new_unchecked(
504517
predicate, join.left,
@@ -511,10 +524,6 @@ fn push_down_all_join(
511524
)));
512525
}
513526

514-
// Add any new join conditions as the non join predicates
515-
join_conditions.extend(on_filter_join_conditions);
516-
join.filter = conjunction(join_conditions);
517-
518527
// wrap the join on the filter whose predicates must be kept, if any
519528
Ok(Transformed::yes(with_filters(
520529
keep_predicates,
@@ -1140,14 +1149,25 @@ impl OptimizerRule for PushDownFilter {
11401149
.map(|(&pred, _)| pred);
11411150

11421151
// Add new scan filters
1143-
scan.filters = scan
1152+
let new_scan_filters = scan
11441153
.filters
11451154
.iter()
11461155
.chain(new_scan_filters)
11471156
.unique()
11481157
.cloned()
11491158
.collect();
11501159

1160+
if supported_filters
1161+
.iter()
1162+
.all(|res| res == &TableProviderFilterPushDown::Inexact)
1163+
&& scan.filters == new_scan_filters
1164+
{
1165+
filter.input = Arc::new(LogicalPlan::TableScan(scan));
1166+
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
1167+
} else {
1168+
scan.filters = new_scan_filters;
1169+
}
1170+
11511171
// Compose predicates to be of `Unsupported` or `Inexact` pushdown type,
11521172
// and also include volatile and subquery-containing filters
11531173
let new_predicate: Vec<Expr> = zip
@@ -1605,6 +1625,10 @@ mod tests {
16051625
.aggregate(vec![col("a")], vec![sum(col("b")).alias("b")])?
16061626
.filter(col("b").gt(lit(10i64)))?
16071627
.build()?;
1628+
let transformed = PushDownFilter::new()
1629+
.rewrite(plan.clone(), &OptimizerContext::new())
1630+
.expect("failed to optimize plan");
1631+
assert!(!transformed.transformed);
16081632
// filter of aggregate is after aggregation since they are non-commutative
16091633
assert_optimized_plan_equal!(
16101634
plan,
@@ -1797,6 +1821,10 @@ mod tests {
17971821
.window(vec![window])?
17981822
.filter(col("c").gt(lit(10i64)))?
17991823
.build()?;
1824+
let transformed = PushDownFilter::new()
1825+
.rewrite(plan.clone(), &OptimizerContext::new())
1826+
.expect("failed to optimize plan");
1827+
assert!(!transformed.transformed);
18001828

18011829
assert_optimized_plan_equal!(
18021830
plan,
@@ -3022,6 +3050,10 @@ mod tests {
30223050
Some(filter),
30233051
)?
30243052
.build()?;
3053+
let transformed = PushDownFilter::new()
3054+
.rewrite(plan.clone(), &OptimizerContext::new())
3055+
.expect("failed to optimize plan");
3056+
assert!(!transformed.transformed);
30253057

30263058
// not part of the test, just good to know:
30273059
assert_snapshot!(plan,
@@ -3128,15 +3160,20 @@ mod tests {
31283160
let plan =
31293161
table_scan_with_pushdown_provider(TableProviderFilterPushDown::Inexact)?;
31303162

3131-
let optimized_plan = PushDownFilter::new()
3163+
let optimized = PushDownFilter::new()
31323164
.rewrite(plan, &OptimizerContext::new())
3133-
.expect("failed to optimize plan")
3134-
.data;
3165+
.expect("failed to optimize plan");
3166+
assert!(optimized.transformed);
3167+
3168+
let optimized_again = PushDownFilter::new()
3169+
.rewrite(optimized.data.clone(), &OptimizerContext::new())
3170+
.expect("failed to optimize plan");
3171+
assert!(!optimized_again.transformed);
31353172

31363173
// Optimizing the same plan multiple times should produce the same plan
31373174
// each time.
31383175
assert_optimized_plan_equal!(
3139-
optimized_plan,
3176+
optimized_again.data,
31403177
@r"
31413178
Filter: a = Int64(1)
31423179
TableScan: test, partial_filters=[a = Int64(1)]
@@ -3149,6 +3186,11 @@ mod tests {
31493186
let plan =
31503187
table_scan_with_pushdown_provider(TableProviderFilterPushDown::Unsupported)?;
31513188

3189+
let transformed = PushDownFilter::new()
3190+
.rewrite(plan.clone(), &OptimizerContext::new())
3191+
.expect("failed to optimize plan");
3192+
assert!(!transformed.transformed);
3193+
31523194
assert_optimized_plan_equal!(
31533195
plan,
31543196
@r"

0 commit comments

Comments
 (0)