Skip to content

Commit a5f490e

Browse files
LiaCastanedaalamb
andauthored
Add ExecutionPlan::apply_expressions() (#20337)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #18296 Needed for datafusion-contrib/datafusion-distributed#180 ## Rationale for this change Right now, there is no easy way to know if a given node in the plan holds Dynamic Filters or to traverse all physical expressions in an ExecutionPlan. This PR implements `apply_expressions()` that visits all `PhysicalExpr`s inside an `ExecutionPlan` using a callback pattern, including `DynamicFilterPhysicalExpr`. This is similar to the existing `apply_expressions()` API for `LogicalPlan`. ## What changes are included in this PR? - Added `apply_expressions()` method to the `ExecutionPlan` trait with no default implementation, forcing all implementors to explicitly handle their expressions - Uses a visitor pattern with `FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>` to avoid allocations - Implemented `apply_expressions()` for all `ExecutionPlan` implementations - Also added `apply_expressions()` to `FileSource` and `DataSource` traits (required, no default) ## Are these changes tested? Yes, added a test that traverses the plan and discovers dynamic filters using `apply_expressions()`. ## Are there any user-facing changes? Yes, the new API `ExecutionPlan::apply_expressions()`, `FileSource::apply_expressions()`, and `DataSource::apply_expressions()`. --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 02dae77 commit a5f490e

70 files changed

Lines changed: 1515 additions & 27 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

datafusion-examples/examples/custom_data_source/custom_datasource.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use async_trait::async_trait;
2727
use datafusion::arrow::array::{UInt8Builder, UInt64Builder};
2828
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
2929
use datafusion::arrow::record_batch::RecordBatch;
30+
use datafusion::common::tree_node::TreeNodeRecursion;
3031
use datafusion::datasource::{TableProvider, TableType, provider_as_source};
3132
use datafusion::error::Result;
3233
use datafusion::execution::context::TaskContext;
@@ -283,4 +284,20 @@ impl ExecutionPlan for CustomExec {
283284
None,
284285
)?))
285286
}
287+
288+
fn apply_expressions(
289+
&self,
290+
f: &mut dyn FnMut(
291+
&dyn datafusion::physical_plan::PhysicalExpr,
292+
) -> Result<TreeNodeRecursion>,
293+
) -> Result<TreeNodeRecursion> {
294+
// Visit expressions in the output ordering from equivalence properties
295+
let mut tnr = TreeNodeRecursion::Continue;
296+
if let Some(ordering) = self.cache.output_ordering() {
297+
for sort_expr in ordering {
298+
tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?;
299+
}
300+
}
301+
Ok(tnr)
302+
}
286303
}

datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
use arrow::record_batch::RecordBatch;
3030
use arrow_schema::SchemaRef;
3131
use datafusion::common::record_batch;
32+
use datafusion::common::tree_node::TreeNodeRecursion;
3233
use datafusion::common::{exec_datafusion_err, internal_err};
3334
use datafusion::datasource::{DefaultTableSource, memory::MemTable};
3435
use datafusion::error::Result;
@@ -296,4 +297,20 @@ impl ExecutionPlan for BufferingExecutionPlan {
296297
}),
297298
)))
298299
}
300+
301+
fn apply_expressions(
302+
&self,
303+
f: &mut dyn FnMut(
304+
&dyn datafusion::physical_plan::PhysicalExpr,
305+
) -> Result<TreeNodeRecursion>,
306+
) -> Result<TreeNodeRecursion> {
307+
// Visit expressions in the output ordering from equivalence properties
308+
let mut tnr = TreeNodeRecursion::Continue;
309+
if let Some(ordering) = self.properties.output_ordering() {
310+
for sort_expr in ordering {
311+
tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?;
312+
}
313+
}
314+
Ok(tnr)
315+
}
299316
}

datafusion-examples/examples/proto/composed_extension_codec.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use std::sync::Arc;
3838

3939
use datafusion::common::Result;
4040
use datafusion::common::internal_err;
41+
use datafusion::common::tree_node::TreeNodeRecursion;
4142
use datafusion::execution::TaskContext;
4243
use datafusion::physical_plan::{DisplayAs, ExecutionPlan};
4344
use datafusion::prelude::SessionContext;
@@ -128,6 +129,15 @@ impl ExecutionPlan for ParentExec {
128129
) -> Result<datafusion::physical_plan::SendableRecordBatchStream> {
129130
unreachable!()
130131
}
132+
133+
fn apply_expressions(
134+
&self,
135+
_f: &mut dyn FnMut(
136+
&dyn datafusion::physical_plan::PhysicalExpr,
137+
) -> Result<TreeNodeRecursion>,
138+
) -> Result<TreeNodeRecursion> {
139+
Ok(TreeNodeRecursion::Continue)
140+
}
131141
}
132142

133143
/// A PhysicalExtensionCodec that can serialize and deserialize ParentExec
@@ -204,6 +214,15 @@ impl ExecutionPlan for ChildExec {
204214
) -> Result<datafusion::physical_plan::SendableRecordBatchStream> {
205215
unreachable!()
206216
}
217+
218+
fn apply_expressions(
219+
&self,
220+
_f: &mut dyn FnMut(
221+
&dyn datafusion::physical_plan::PhysicalExpr,
222+
) -> Result<TreeNodeRecursion>,
223+
) -> Result<TreeNodeRecursion> {
224+
Ok(TreeNodeRecursion::Continue)
225+
}
207226
}
208227

209228
/// A PhysicalExtensionCodec that can serialize and deserialize ChildExec

datafusion-examples/examples/relation_planner/table_sample.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ use datafusion::{
117117
};
118118
use datafusion_common::{
119119
DFSchemaRef, DataFusionError, Result, Statistics, internal_err, not_impl_err,
120-
plan_datafusion_err, plan_err,
120+
plan_datafusion_err, plan_err, tree_node::TreeNodeRecursion,
121121
};
122122
use datafusion_expr::{
123123
UserDefinedLogicalNode, UserDefinedLogicalNodeCore,
@@ -743,6 +743,22 @@ impl ExecutionPlan for SampleExec {
743743

744744
Ok(stats)
745745
}
746+
747+
fn apply_expressions(
748+
&self,
749+
f: &mut dyn FnMut(
750+
&dyn datafusion::physical_plan::PhysicalExpr,
751+
) -> Result<TreeNodeRecursion>,
752+
) -> Result<TreeNodeRecursion> {
753+
// Visit expressions in the output ordering from equivalence properties
754+
let mut tnr = TreeNodeRecursion::Continue;
755+
if let Some(ordering) = self.cache.output_ordering() {
756+
for sort_expr in ordering {
757+
tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?;
758+
}
759+
}
760+
Ok(tnr)
761+
}
746762
}
747763

748764
/// Bernoulli sampler: includes each row with probability `(upper - lower)`.

datafusion/catalog/src/memory/table.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use arrow::compute::{and, filter_record_batch};
3232
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
3333
use arrow::record_batch::RecordBatch;
3434
use datafusion_common::error::Result;
35+
use datafusion_common::tree_node::TreeNodeRecursion;
3536
use datafusion_common::{Constraints, DFSchema, SchemaExt, not_impl_err, plan_err};
3637
use datafusion_common_runtime::JoinSet;
3738
use datafusion_datasource::memory::{MemSink, MemorySourceConfig};
@@ -46,7 +47,7 @@ use datafusion_physical_plan::repartition::RepartitionExec;
4647
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
4748
use datafusion_physical_plan::{
4849
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
49-
PlanProperties, common,
50+
PhysicalExpr, PlanProperties, common,
5051
};
5152
use datafusion_session::Session;
5253

@@ -400,10 +401,7 @@ impl TableProvider for MemTable {
400401
let df_schema = DFSchema::try_from(Arc::clone(&self.schema))?;
401402

402403
// Create physical expressions for assignments upfront (outside batch loop)
403-
let physical_assignments: HashMap<
404-
String,
405-
Arc<dyn datafusion_physical_plan::PhysicalExpr>,
406-
> = assignments
404+
let physical_assignments: HashMap<String, Arc<dyn PhysicalExpr>> = assignments
407405
.iter()
408406
.map(|(name, expr)| {
409407
let physical_expr =
@@ -638,4 +636,11 @@ impl ExecutionPlan for DmlResultExec {
638636
stream,
639637
)))
640638
}
639+
640+
fn apply_expressions(
641+
&self,
642+
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
643+
) -> Result<TreeNodeRecursion> {
644+
Ok(TreeNodeRecursion::Continue)
645+
}
641646
}

datafusion/core/src/physical_planner.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3891,6 +3891,20 @@ mod tests {
38913891
) -> Result<SendableRecordBatchStream> {
38923892
unimplemented!("NoOpExecutionPlan::execute");
38933893
}
3894+
3895+
fn apply_expressions(
3896+
&self,
3897+
f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
3898+
) -> Result<TreeNodeRecursion> {
3899+
// Visit expressions in the output ordering from equivalence properties
3900+
let mut tnr = TreeNodeRecursion::Continue;
3901+
if let Some(ordering) = self.cache.output_ordering() {
3902+
for sort_expr in ordering {
3903+
tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?;
3904+
}
3905+
}
3906+
Ok(tnr)
3907+
}
38943908
}
38953909

38963910
// Produces an execution plan where the schema is mismatched from
@@ -4033,6 +4047,12 @@ digraph {
40334047
) -> Result<SendableRecordBatchStream> {
40344048
unimplemented!()
40354049
}
4050+
fn apply_expressions(
4051+
&self,
4052+
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
4053+
) -> Result<TreeNodeRecursion> {
4054+
Ok(TreeNodeRecursion::Continue)
4055+
}
40364056
}
40374057
impl DisplayAs for OkExtensionNode {
40384058
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
@@ -4082,6 +4102,12 @@ digraph {
40824102
) -> Result<SendableRecordBatchStream> {
40834103
unimplemented!()
40844104
}
4105+
fn apply_expressions(
4106+
&self,
4107+
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
4108+
) -> Result<TreeNodeRecursion> {
4109+
Ok(TreeNodeRecursion::Continue)
4110+
}
40854111
}
40864112
impl DisplayAs for InvariantFailsExtensionNode {
40874113
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
@@ -4203,6 +4229,12 @@ digraph {
42034229
) -> Result<SendableRecordBatchStream> {
42044230
unimplemented!()
42054231
}
4232+
fn apply_expressions(
4233+
&self,
4234+
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
4235+
) -> Result<TreeNodeRecursion> {
4236+
Ok(TreeNodeRecursion::Continue)
4237+
}
42064238
}
42074239
impl DisplayAs for ExecutableInvariantFails {
42084240
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {

datafusion/core/tests/custom_sources_cases/mod.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use datafusion_catalog::Session;
3939
use datafusion_common::cast::as_primitive_array;
4040
use datafusion_common::project_schema;
4141
use datafusion_common::stats::Precision;
42+
use datafusion_common::tree_node::TreeNodeRecursion;
4243
use datafusion_physical_expr::EquivalenceProperties;
4344
use datafusion_physical_plan::PlanProperties;
4445
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
@@ -209,6 +210,22 @@ impl ExecutionPlan for CustomExecutionPlan {
209210
.collect(),
210211
})
211212
}
213+
214+
fn apply_expressions(
215+
&self,
216+
f: &mut dyn FnMut(
217+
&dyn datafusion::physical_plan::PhysicalExpr,
218+
) -> Result<TreeNodeRecursion>,
219+
) -> Result<TreeNodeRecursion> {
220+
// Visit expressions in the output ordering from equivalence properties
221+
let mut tnr = TreeNodeRecursion::Continue;
222+
if let Some(ordering) = self.cache.output_ordering() {
223+
for sort_expr in ordering {
224+
tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?;
225+
}
226+
}
227+
Ok(tnr)
228+
}
212229
}
213230

214231
#[async_trait]

datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use datafusion::prelude::*;
3535
use datafusion::scalar::ScalarValue;
3636
use datafusion_catalog::Session;
3737
use datafusion_common::cast::as_primitive_array;
38+
use datafusion_common::tree_node::TreeNodeRecursion;
3839
use datafusion_common::{DataFusionError, internal_err, not_impl_err};
3940
use datafusion_expr::expr::{BinaryExpr, Cast};
4041
use datafusion_functions_aggregate::expr_fn::count;
@@ -152,6 +153,22 @@ impl ExecutionPlan for CustomPlan {
152153
})),
153154
)))
154155
}
156+
157+
fn apply_expressions(
158+
&self,
159+
f: &mut dyn FnMut(
160+
&dyn datafusion::physical_plan::PhysicalExpr,
161+
) -> Result<TreeNodeRecursion>,
162+
) -> Result<TreeNodeRecursion> {
163+
// Visit expressions in the output ordering from equivalence properties
164+
let mut tnr = TreeNodeRecursion::Continue;
165+
if let Some(ordering) = self.cache.output_ordering() {
166+
for sort_expr in ordering {
167+
tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?;
168+
}
169+
}
170+
Ok(tnr)
171+
}
155172
}
156173

157174
#[derive(Clone, Debug)]

datafusion/core/tests/custom_sources_cases/statistics.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use datafusion::{
3333
scalar::ScalarValue,
3434
};
3535
use datafusion_catalog::Session;
36+
use datafusion_common::tree_node::TreeNodeRecursion;
3637
use datafusion_common::{project_schema, stats::Precision};
3738
use datafusion_physical_expr::EquivalenceProperties;
3839
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
@@ -188,6 +189,22 @@ impl ExecutionPlan for StatisticsValidation {
188189
Ok(self.stats.clone())
189190
}
190191
}
192+
193+
fn apply_expressions(
194+
&self,
195+
f: &mut dyn FnMut(
196+
&dyn datafusion::physical_plan::PhysicalExpr,
197+
) -> Result<TreeNodeRecursion>,
198+
) -> Result<TreeNodeRecursion> {
199+
// Visit expressions in the output ordering from equivalence properties
200+
let mut tnr = TreeNodeRecursion::Continue;
201+
if let Some(ordering) = self.cache.output_ordering() {
202+
for sort_expr in ordering {
203+
tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?;
204+
}
205+
}
206+
Ok(tnr)
207+
}
191208
}
192209

193210
fn init_ctx(stats: Statistics, schema: Schema) -> Result<SessionContext> {

datafusion/core/tests/fuzz_cases/once_exec.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use arrow_schema::SchemaRef;
1919
use datafusion_common::internal_datafusion_err;
20+
use datafusion_common::tree_node::TreeNodeRecursion;
2021
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
2122
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
2223
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
@@ -110,4 +111,20 @@ impl ExecutionPlan for OnceExec {
110111

111112
stream.ok_or_else(|| internal_datafusion_err!("Stream already consumed"))
112113
}
114+
115+
fn apply_expressions(
116+
&self,
117+
f: &mut dyn FnMut(
118+
&dyn datafusion_physical_plan::PhysicalExpr,
119+
) -> datafusion_common::Result<TreeNodeRecursion>,
120+
) -> datafusion_common::Result<TreeNodeRecursion> {
121+
// Visit expressions in the output ordering from equivalence properties
122+
let mut tnr = TreeNodeRecursion::Continue;
123+
if let Some(ordering) = self.cache.output_ordering() {
124+
for sort_expr in ordering {
125+
tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?;
126+
}
127+
}
128+
Ok(tnr)
129+
}
113130
}

0 commit comments

Comments
 (0)