Skip to content

Commit 936f959

Browse files
Add FilterExecBuilder to avoid recomputing properties multiple times (#19854)
- Closes #19608
- Replaces #19619

Co-authored-by: Ganesh Patil <7030871503ganeshpatil@gmail.com>
1 parent fc7d090 commit 936f959

File tree

7 files changed

+550
-98
lines changed

7 files changed

+550
-98
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 12 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,7 @@ use crate::physical_expr::{create_physical_expr, create_physical_exprs};
3939
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
4040
use crate::physical_plan::analyze::AnalyzeExec;
4141
use crate::physical_plan::explain::ExplainExec;
42-
use crate::physical_plan::filter::FilterExec;
42+
use crate::physical_plan::filter::FilterExecBuilder;
4343
use crate::physical_plan::joins::utils as join_utils;
4444
use crate::physical_plan::joins::{
4545
CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec,
@@ -938,8 +938,12 @@ impl DefaultPhysicalPlanner {
938938
input_schema.as_arrow(),
939939
)? {
940940
PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => {
941-
FilterExec::try_new(Arc::clone(&runtime_expr[0]), physical_input)?
942-
.with_batch_size(session_state.config().batch_size())?
941+
FilterExecBuilder::new(
942+
Arc::clone(&runtime_expr[0]),
943+
physical_input,
944+
)
945+
.with_batch_size(session_state.config().batch_size())
946+
.build()?
943947
}
944948
PlanAsyncExpr::Async(
945949
async_map,
@@ -949,16 +953,17 @@ impl DefaultPhysicalPlanner {
949953
async_map.async_exprs,
950954
physical_input,
951955
)?;
952-
FilterExec::try_new(
956+
FilterExecBuilder::new(
953957
Arc::clone(&runtime_expr[0]),
954958
Arc::new(async_exec),
955-
)?
959+
)
956960
// project the output columns excluding the async functions
957961
// The async functions are always appended to the end of the schema.
958-
.with_projection(Some(
962+
.apply_projection(Some(
959963
(0..input.schema().fields().len()).collect(),
960964
))?
961-
.with_batch_size(session_state.config().batch_size())?
965+
.with_batch_size(session_state.config().batch_size())
966+
.build()?
962967
}
963968
_ => {
964969
return internal_err!(

datafusion/core/tests/parquet/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -576,9 +576,9 @@ fn make_uint_batches(start: u8, end: u8) -> RecordBatch {
576576
Field::new("u64", DataType::UInt64, true),
577577
]));
578578
let v8: Vec<u8> = (start..end).collect();
579-
let v16: Vec<u16> = (start as _..end as _).collect();
580-
let v32: Vec<u32> = (start as _..end as _).collect();
581-
let v64: Vec<u64> = (start as _..end as _).collect();
579+
let v16: Vec<u16> = (start as u16..end as u16).collect();
580+
let v32: Vec<u32> = (start as u32..end as u32).collect();
581+
let v64: Vec<u64> = (start as u64..end as u64).collect();
582582
RecordBatch::try_new(
583583
schema,
584584
vec![

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 11 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -58,7 +58,7 @@ use datafusion_physical_plan::{
5858
aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
5959
coalesce_partitions::CoalescePartitionsExec,
6060
collect,
61-
filter::FilterExec,
61+
filter::{FilterExec, FilterExecBuilder},
6262
repartition::RepartitionExec,
6363
sorts::sort::SortExec,
6464
};
@@ -480,9 +480,10 @@ fn test_filter_with_projection() {
480480
let projection = vec![1, 0];
481481
let predicate = col_lit_predicate("a", "foo", &schema());
482482
let plan = Arc::new(
483-
FilterExec::try_new(predicate, Arc::clone(&scan))
483+
FilterExecBuilder::new(predicate, Arc::clone(&scan))
484+
.apply_projection(Some(projection))
484485
.unwrap()
485-
.with_projection(Some(projection))
486+
.build()
486487
.unwrap(),
487488
);
488489

@@ -505,9 +506,10 @@ fn test_filter_with_projection() {
505506
let projection = vec![1];
506507
let predicate = col_lit_predicate("a", "foo", &schema());
507508
let plan = Arc::new(
508-
FilterExec::try_new(predicate, scan)
509+
FilterExecBuilder::new(predicate, scan)
510+
.apply_projection(Some(projection))
509511
.unwrap()
510-
.with_projection(Some(projection))
512+
.build()
511513
.unwrap(),
512514
);
513515
insta::assert_snapshot!(
@@ -564,9 +566,9 @@ fn test_pushdown_through_aggregates_on_grouping_columns() {
564566
let scan = TestScanBuilder::new(schema()).with_support(true).build();
565567

566568
let filter = Arc::new(
567-
FilterExec::try_new(col_lit_predicate("a", "foo", &schema()), scan)
568-
.unwrap()
569+
FilterExecBuilder::new(col_lit_predicate("a", "foo", &schema()), scan)
569570
.with_batch_size(10)
571+
.build()
570572
.unwrap(),
571573
);
572574

@@ -596,9 +598,9 @@ fn test_pushdown_through_aggregates_on_grouping_columns() {
596598

597599
let predicate = col_lit_predicate("b", "bar", &schema());
598600
let plan = Arc::new(
599-
FilterExec::try_new(predicate, aggregate)
600-
.unwrap()
601+
FilterExecBuilder::new(predicate, aggregate)
601602
.with_batch_size(100)
603+
.build()
602604
.unwrap(),
603605
);
604606

0 commit comments

Comments
 (0)