Skip to content

Commit 334ca91

Browse files
rename
1 parent 921f64c commit 334ca91

6 files changed

Lines changed: 75 additions & 74 deletions

File tree

datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2835,7 +2835,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_is_used() {
28352835

28362836
// Verify that a dynamic filter was created
28372837
let dynamic_filter = hash_join
2838-
.dynamic_filter()
2838+
.dynamic_filter_expr()
28392839
.expect("Dynamic filter should be created");
28402840

28412841
// Verify that is_used() returns the expected value based on probe side support.

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 51 additions & 52 deletions
Original file line number | Diff line number | Diff line change
@@ -893,6 +893,47 @@ impl AggregateExec {
893893
&self.filter_expr
894894
}
895895

896+
/// Returns the dynamic filter expression for this aggregate, if set.
897+
pub fn dynamic_filter_expr(&self) -> Option<&Arc<DynamicFilterPhysicalExpr>> {
898+
self.dynamic_filter.as_ref().map(|df| &df.filter)
899+
}
900+
901+
/// Replace the dynamic filter expression. This method errors if the aggregate does not
902+
/// support dynamic filtering or if the filter expression is incompatible with this
903+
/// [`AggregateExec`].
904+
pub fn with_dynamic_filter_expr(
905+
mut self,
906+
filter: Arc<DynamicFilterPhysicalExpr>,
907+
) -> Result<Self> {
908+
// If there is no dynamic filter state initialized via `try_new`, then
909+
// we can safely assume that the aggregate does not support dynamic filtering.
910+
let Some(dyn_filter) = self.dynamic_filter.as_ref() else {
911+
return internal_err!("Aggregate does not support dynamic filtering");
912+
};
913+
914+
// Validate that the filter is compatible with the aggregation columns.
915+
let cols = self.cols_for_dynamic_filter(&dyn_filter.supported_accumulators_info);
916+
if cols.len() != filter.children().len() {
917+
return internal_err!(
918+
"Dynamic filter expression is incompatible with aggregate due to mismatched number of columns"
919+
);
920+
}
921+
for (col, child) in cols.iter().zip(filter.children()) {
922+
if !col.eq(child) {
923+
return internal_err!(
924+
"Dynamic filter expression is incompatible with aggregate due to mismatched column references {col} != {child}"
925+
);
926+
}
927+
}
928+
929+
// Overwrite our filter
930+
self.dynamic_filter = Some(Arc::new(AggrDynFilter {
931+
filter,
932+
supported_accumulators_info: dyn_filter.supported_accumulators_info.clone(),
933+
}));
934+
Ok(self)
935+
}
936+
896937
/// Input plan
897938
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
898939
&self.input
@@ -1048,47 +1089,6 @@ impl AggregateExec {
10481089
&self.input_order_mode
10491090
}
10501091

1051-
/// Returns the dynamic filter expression for this aggregate, if set.
1052-
pub fn dynamic_filter(&self) -> Option<&Arc<DynamicFilterPhysicalExpr>> {
1053-
self.dynamic_filter.as_ref().map(|df| &df.filter)
1054-
}
1055-
1056-
/// Replace the dynamic filter expression. This method errors if the aggregate does not
1057-
/// support dynamic filtering or if the filter expression is incompatible with this
1058-
/// [`AggregateExec`].
1059-
pub fn with_dynamic_filter(
1060-
mut self,
1061-
filter: Arc<DynamicFilterPhysicalExpr>,
1062-
) -> Result<Self> {
1063-
// If there is no dynamic filter state initialized via `try_new`, then
1064-
// we can safely assume that the aggregate does not support dynamic filtering.
1065-
let Some(dyn_filter) = self.dynamic_filter.as_ref() else {
1066-
return internal_err!("Aggregate does not support dynamic filtering");
1067-
};
1068-
1069-
// Validate that the filter is compatible with the aggregation columns.
1070-
let cols = self.cols_for_dynamic_filter(&dyn_filter.supported_accumulators_info);
1071-
if cols.len() != filter.children().len() {
1072-
return internal_err!(
1073-
"Dynamic filter expression is incompatible with aggregate due to mismatched number of columns"
1074-
);
1075-
}
1076-
for (col, child) in cols.iter().zip(filter.children()) {
1077-
if !col.eq(child) {
1078-
return internal_err!(
1079-
"Dynamic filter expression is incompatible with aggregate due to mismatched column references {col} != {child}"
1080-
);
1081-
}
1082-
}
1083-
1084-
// Overwrite our filter
1085-
self.dynamic_filter = Some(Arc::new(AggrDynFilter {
1086-
filter,
1087-
supported_accumulators_info: dyn_filter.supported_accumulators_info.clone(),
1088-
}));
1089-
Ok(self)
1090-
}
1091-
10921092
/// Estimates output statistics for this aggregate node.
10931093
///
10941094
/// For grouped aggregations with known input row count > 1, the output row
@@ -4842,7 +4842,7 @@ mod tests {
48424842
Ok(())
48434843
}
48444844

4845-
/// Test that [`AggregateExec::with_dynamic_filter`] overrides the existing dynamic filter
4845+
/// Test that [`AggregateExec::with_dynamic_filter_expr`] overrides the existing dynamic filter
48464846
#[test]
48474847
fn test_with_dynamic_filter() -> Result<()> {
48484848
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
@@ -4869,11 +4869,11 @@ mod tests {
48694869
vec![col("a", &schema)?],
48704870
lit(false),
48714871
));
4872-
let agg = agg.with_dynamic_filter(Arc::clone(&new_df))?;
4872+
let agg = agg.with_dynamic_filter_expr(Arc::clone(&new_df))?;
48734873

48744874
// The aggregate's filter should now resolve to the new inner expression.
48754875
let swapped = agg
4876-
.dynamic_filter()
4876+
.dynamic_filter_expr()
48774877
.expect("should still have dynamic filter")
48784878
.current()?;
48794879
assert_eq!(format!("{swapped}"), format!("{}", lit(false)));
@@ -4884,19 +4884,18 @@ mod tests {
48844884
Arc::<DynamicFilterPhysicalExpr>::clone(&new_df);
48854885
let remapped_pexpr =
48864886
new_df_as_pexpr.with_new_children(vec![col("a", &schema)?])?;
4887-
let Ok(remapped_df) = (remapped_pexpr
4888-
as Arc<dyn std::any::Any + Send + Sync>)
4887+
let Ok(remapped_df) = (remapped_pexpr as Arc<dyn std::any::Any + Send + Sync>)
48894888
.downcast::<DynamicFilterPhysicalExpr>()
48904889
else {
48914890
panic!("should be DynamicFilterPhysicalExpr after with_new_children");
48924891
};
48934892
// Hard to assert this because the filter is identical. No error means
48944893
// the filter was accepted. That's a good enough assertion for now.
4895-
let _agg = agg.with_dynamic_filter(remapped_df)?;
4894+
let _agg = agg.with_dynamic_filter_expr(remapped_df)?;
48964895
Ok(())
48974896
}
48984897

4899-
/// Test that [`AggregateExec::with_dynamic_filter`] errors when the aggregate does not support dynamic filtering
4898+
/// Test that [`AggregateExec::with_dynamic_filter_expr`] errors when the aggregate does not support dynamic filtering
49004899
#[test]
49014900
fn test_with_dynamic_filter_error_unsupported() -> Result<()> {
49024901
let schema = Arc::new(Schema::new(vec![
@@ -4919,17 +4918,17 @@ mod tests {
49194918
child,
49204919
Arc::clone(&schema),
49214920
)?;
4922-
assert!(agg.dynamic_filter().is_none());
4921+
assert!(agg.dynamic_filter_expr().is_none());
49234922

49244923
let df = Arc::new(DynamicFilterPhysicalExpr::new(
49254924
vec![col("a", &schema)?],
49264925
lit(true),
49274926
));
4928-
assert!(agg.with_dynamic_filter(df).is_err());
4927+
assert!(agg.with_dynamic_filter_expr(df).is_err());
49294928
Ok(())
49304929
}
49314930

4932-
/// Test that [`AggregateExec::with_dynamic_filter`] errors when the column is not in the schema
4931+
/// Test that [`AggregateExec::with_dynamic_filter_expr`] errors when the column is not in the schema
49334932
#[test]
49344933
fn test_with_dynamic_filter_error_column_mismatch() -> Result<()> {
49354934
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
@@ -4953,7 +4952,7 @@ mod tests {
49534952
vec![Arc::new(Column::new("bad", 99)) as _],
49544953
lit(true),
49554954
));
4956-
assert!(agg.with_dynamic_filter(df).is_err());
4955+
assert!(agg.with_dynamic_filter_expr(df).is_err());
49574956
Ok(())
49584957
}
49594958
}

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -904,7 +904,7 @@ impl HashJoinExec {
904904

905905
/// Get the dynamic filter expression for testing purposes.
906906
/// Returns the dynamic filter expression for this hash join, if set.
907-
pub fn dynamic_filter(&self) -> Option<&Arc<DynamicFilterPhysicalExpr>> {
907+
pub fn dynamic_filter_expr(&self) -> Option<&Arc<DynamicFilterPhysicalExpr>> {
908908
self.dynamic_filter.as_ref().map(|df| &df.filter)
909909
}
910910

@@ -914,7 +914,7 @@ impl HashJoinExec {
914914
///
915915
/// Validates that the filter's children reference valid columns in
916916
/// the probe (right) side's schema.
917-
pub fn with_dynamic_filter(
917+
pub fn with_dynamic_filter_expr(
918918
mut self,
919919
filter: Arc<DynamicFilterPhysicalExpr>,
920920
) -> Result<Self> {
@@ -6345,15 +6345,17 @@ mod tests {
63456345
NullEquality::NullEqualsNothing,
63466346
false,
63476347
)?;
6348-
assert!(join.dynamic_filter().is_none());
6348+
assert!(join.dynamic_filter_expr().is_none());
63496349

63506350
let df = Arc::new(DynamicFilterPhysicalExpr::new(
63516351
vec![Arc::new(Column::new("b1", 1)) as _],
63526352
lit(true),
63536353
));
6354-
let join = join.with_dynamic_filter(Arc::clone(&df))?;
6354+
let join = join.with_dynamic_filter_expr(Arc::clone(&df))?;
63556355

6356-
let restored = join.dynamic_filter().expect("should have dynamic filter");
6356+
let restored = join
6357+
.dynamic_filter_expr()
6358+
.expect("should have dynamic filter");
63576359
assert_eq!(
63586360
restored
63596361
.expression_id()
@@ -6387,7 +6389,7 @@ mod tests {
63876389
vec![Arc::new(Column::new("bad", 99)) as _],
63886390
lit(true),
63896391
));
6390-
assert!(join.with_dynamic_filter(df).is_err());
6392+
assert!(join.with_dynamic_filter_expr(df).is_err());
63916393
Ok(())
63926394
}
63936395
}

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -979,7 +979,7 @@ impl SortExec {
979979
}
980980

981981
/// Returns the dynamic filter expression for this sort (TopK), if set.
982-
pub fn dynamic_filter(&self) -> Option<Arc<DynamicFilterPhysicalExpr>> {
982+
pub fn dynamic_filter_expr(&self) -> Option<Arc<DynamicFilterPhysicalExpr>> {
983983
self.filter.as_ref().map(|f| f.read().expr())
984984
}
985985

@@ -990,7 +990,7 @@ impl SortExec {
990990
///
991991
/// Validates that the filter's children reference valid columns in
992992
/// the sort's input schema.
993-
pub fn with_dynamic_filter(
993+
pub fn with_dynamic_filter_expr(
994994
mut self,
995995
filter: Arc<DynamicFilterPhysicalExpr>,
996996
) -> Result<Self> {
@@ -2764,7 +2764,7 @@ mod tests {
27642764

27652765
// SortExec with fetch creates a dynamic filter automatically.
27662766
let original_id = sort
2767-
.dynamic_filter()
2767+
.dynamic_filter_expr()
27682768
.expect("should have dynamic filter with fetch")
27692769
.expression_id()
27702770
.expect("DynamicFilterPhysicalExpr always has an expression_id");
@@ -2777,9 +2777,9 @@ mod tests {
27772777
let new_id = new_df
27782778
.expression_id()
27792779
.expect("DynamicFilterPhysicalExpr always has an expression_id");
2780-
let sort = sort.with_dynamic_filter(Arc::clone(&new_df))?;
2780+
let sort = sort.with_dynamic_filter_expr(Arc::clone(&new_df))?;
27812781
let restored_id = sort
2782-
.dynamic_filter()
2782+
.dynamic_filter_expr()
27832783
.expect("should still have dynamic filter")
27842784
.expression_id()
27852785
.expect("DynamicFilterPhysicalExpr always has an expression_id");
@@ -2808,7 +2808,7 @@ mod tests {
28082808
vec![Arc::new(Column::new("bad", 99)) as _],
28092809
lit(true),
28102810
));
2811-
assert!(sort.with_dynamic_filter(df).is_err());
2811+
assert!(sort.with_dynamic_filter_expr(df).is_err());
28122812
Ok(())
28132813
}
28142814

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1313,7 +1313,7 @@ impl protobuf::PhysicalPlanNode {
13131313
if let Ok(df) = (dynamic_filter_expr as Arc<dyn Any + Send + Sync>)
13141314
.downcast::<DynamicFilterPhysicalExpr>()
13151315
{
1316-
agg.with_dynamic_filter(df)?
1316+
agg.with_dynamic_filter_expr(df)?
13171317
} else {
13181318
agg
13191319
}
@@ -1448,7 +1448,7 @@ impl protobuf::PhysicalPlanNode {
14481448
if let Ok(df) = (dynamic_filter_expr as Arc<dyn Any + Send + Sync>)
14491449
.downcast::<DynamicFilterPhysicalExpr>()
14501450
{
1451-
hash_join = hash_join.with_dynamic_filter(df)?;
1451+
hash_join = hash_join.with_dynamic_filter_expr(df)?;
14521452
}
14531453
}
14541454

@@ -1699,7 +1699,7 @@ impl protobuf::PhysicalPlanNode {
16991699
if let Ok(df) = (dynamic_filter_expr as Arc<dyn Any + Send + Sync>)
17001700
.downcast::<DynamicFilterPhysicalExpr>()
17011701
{
1702-
new_sort.with_dynamic_filter(df)?
1702+
new_sort.with_dynamic_filter_expr(df)?
17031703
} else {
17041704
new_sort
17051705
}
@@ -2514,7 +2514,7 @@ impl protobuf::PhysicalPlanNode {
25142514
};
25152515

25162516
let dynamic_filter = exec
2517-
.dynamic_filter()
2517+
.dynamic_filter_expr()
25182518
.map(|df| {
25192519
let df_expr: Arc<dyn PhysicalExpr> =
25202520
Arc::clone(df) as Arc<dyn PhysicalExpr>;
@@ -2867,7 +2867,7 @@ impl protobuf::PhysicalPlanNode {
28672867
limit,
28682868
has_grouping_set: exec.group_expr().has_grouping_set(),
28692869
dynamic_filter: exec
2870-
.dynamic_filter()
2870+
.dynamic_filter_expr()
28712871
.map(|df| {
28722872
let df_expr: Arc<dyn PhysicalExpr> =
28732873
Arc::clone(df) as Arc<dyn PhysicalExpr>;
@@ -3168,7 +3168,7 @@ impl protobuf::PhysicalPlanNode {
31683168
})
31693169
.collect::<Result<Vec<_>>>()?;
31703170
let dynamic_filter = exec
3171-
.dynamic_filter()
3171+
.dynamic_filter_expr()
31723172
.map(|df| {
31733173
let df_expr: Arc<dyn PhysicalExpr> = df as Arc<dyn PhysicalExpr>;
31743174
proto_converter.physical_expr_to_proto(&df_expr, codec)

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3538,7 +3538,7 @@ fn test_hash_join_with_dynamic_filter_roundtrip() -> Result<()> {
35383538
.downcast_ref::<HashJoinExec>()
35393539
.expect("Should be HashJoinExec");
35403540
let deserialized_hash_join_df = deserialized_join
3541-
.dynamic_filter()
3541+
.dynamic_filter_expr()
35423542
.expect("HashJoinExec should have a dynamic filter after roundtrip");
35433543

35443544
// Extract the dynamic filter pushed down to the probe side's ParquetSource.
@@ -3691,7 +3691,7 @@ fn test_aggregate_with_dynamic_filter_roundtrip() -> Result<()> {
36913691
.downcast_ref::<AggregateExec>()
36923692
.expect("Should be AggregateExec");
36933693
let deserialized_agg_df = deserialized_agg
3694-
.dynamic_filter()
3694+
.dynamic_filter_expr()
36953695
.expect("AggregateExec should have a dynamic filter after roundtrip");
36963696

36973697
// Extract the dynamic filter pushed down to the child ParquetSource.
@@ -3759,7 +3759,7 @@ fn test_sort_topk_with_dynamic_filter_roundtrip() -> Result<()> {
37593759
.downcast_ref::<SortExec>()
37603760
.expect("Should be SortExec");
37613761
let deserialized_sort_df = deserialized_sort
3762-
.dynamic_filter()
3762+
.dynamic_filter_expr()
37633763
.expect("SortExec should have a dynamic filter after roundtrip");
37643764

37653765
// Extract the dynamic filter pushed down to the child ParquetSource.

0 commit comments

Comments
 (0)