Skip to content

Commit 51969ab

Browse files
committed
Revert "Add ExecutionPlan::apply_expressions() (#20337)" (#22437)
- Reverts #20337 - Addresses concerns raised in #22415 - Closes #22415 `ExecutionPlan::apply_expressions()` was added in #20337 with no default implementation, forcing every custom `ExecutionPlan`, `FileSource`, and `DataSource` implementor to add the method as part of upgrading to DataFusion 54. As discussed on #22415, per @LiaCastaneda and @adriangb the method is not yet called from anywhere in DataFusion and the originally intended use (dynamic-filter discovery/serialization for distributed scenarios) is blocked on other in-progress work (#20009, #21350). The combined effect on downstream users is a required code change with no immediate benefit, and ambiguity about what a "correct" implementation even means today (e.g. is returning `Ok(TreeNodeRecursion::Continue)` is safe right now but becomes incorrect as soon as the method starts being used by an optimizer pass?. The plan agreed in the discussion is to remove the API from the 54.0 release and re-add it together with the concrete consumer that needs it. cc @adriangb @LiaCastaneda @milenkovicm. `git revert -m 1` of the merge commit, with the following manual conflict resolutions and follow-ups: By CI Yes -- this removes the new public API: - `ExecutionPlan::apply_expressions` - `FileSource::apply_expressions` - `DataSource::apply_expressions` These were only added in 54 and are not yet released. Custom implementors no longer need to implement these methods.
1 parent c8dddb8 commit 51969ab

72 files changed

Lines changed: 22 additions & 1531 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

datafusion-examples/examples/custom_data_source/custom_datasource.rs

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ use async_trait::async_trait;
2626
use datafusion::arrow::array::{UInt8Builder, UInt64Builder};
2727
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
2828
use datafusion::arrow::record_batch::RecordBatch;
29-
use datafusion::common::tree_node::TreeNodeRecursion;
3029
use datafusion::datasource::{TableProvider, TableType, provider_as_source};
3130
use datafusion::error::Result;
3231
use datafusion::execution::context::TaskContext;
@@ -275,20 +274,4 @@ impl ExecutionPlan for CustomExec {
275274
None,
276275
)?))
277276
}
278-
279-
fn apply_expressions(
280-
&self,
281-
f: &mut dyn FnMut(
282-
&dyn datafusion::physical_plan::PhysicalExpr,
283-
) -> Result<TreeNodeRecursion>,
284-
) -> Result<TreeNodeRecursion> {
285-
// Visit expressions in the output ordering from equivalence properties
286-
let mut tnr = TreeNodeRecursion::Continue;
287-
if let Some(ordering) = self.cache.output_ordering() {
288-
for sort_expr in ordering {
289-
tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?;
290-
}
291-
}
292-
Ok(tnr)
293-
}
294277
}

datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
use arrow::record_batch::RecordBatch;
3030
use arrow_schema::SchemaRef;
3131
use datafusion::common::record_batch;
32-
use datafusion::common::tree_node::TreeNodeRecursion;
3332
use datafusion::common::{exec_datafusion_err, internal_err};
3433
use datafusion::datasource::{DefaultTableSource, memory::MemTable};
3534
use datafusion::error::Result;
@@ -292,20 +291,4 @@ impl ExecutionPlan for BufferingExecutionPlan {
292291
}),
293292
)))
294293
}
295-
296-
fn apply_expressions(
297-
&self,
298-
f: &mut dyn FnMut(
299-
&dyn datafusion::physical_plan::PhysicalExpr,
300-
) -> Result<TreeNodeRecursion>,
301-
) -> Result<TreeNodeRecursion> {
302-
// Visit expressions in the output ordering from equivalence properties
303-
let mut tnr = TreeNodeRecursion::Continue;
304-
if let Some(ordering) = self.properties.output_ordering() {
305-
for sort_expr in ordering {
306-
tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?;
307-
}
308-
}
309-
Ok(tnr)
310-
}
311294
}

datafusion-examples/examples/proto/composed_extension_codec.rs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ use std::sync::Arc;
3737

3838
use datafusion::common::Result;
3939
use datafusion::common::internal_err;
40-
use datafusion::common::tree_node::TreeNodeRecursion;
4140
use datafusion::execution::TaskContext;
4241
use datafusion::physical_plan::{DisplayAs, ExecutionPlan};
4342
use datafusion::prelude::SessionContext;
@@ -124,15 +123,6 @@ impl ExecutionPlan for ParentExec {
124123
) -> Result<datafusion::physical_plan::SendableRecordBatchStream> {
125124
unreachable!()
126125
}
127-
128-
fn apply_expressions(
129-
&self,
130-
_f: &mut dyn FnMut(
131-
&dyn datafusion::physical_plan::PhysicalExpr,
132-
) -> Result<TreeNodeRecursion>,
133-
) -> Result<TreeNodeRecursion> {
134-
Ok(TreeNodeRecursion::Continue)
135-
}
136126
}
137127

138128
/// A PhysicalExtensionCodec that can serialize and deserialize ParentExec
@@ -205,15 +195,6 @@ impl ExecutionPlan for ChildExec {
205195
) -> Result<datafusion::physical_plan::SendableRecordBatchStream> {
206196
unreachable!()
207197
}
208-
209-
fn apply_expressions(
210-
&self,
211-
_f: &mut dyn FnMut(
212-
&dyn datafusion::physical_plan::PhysicalExpr,
213-
) -> Result<TreeNodeRecursion>,
214-
) -> Result<TreeNodeRecursion> {
215-
Ok(TreeNodeRecursion::Continue)
216-
}
217198
}
218199

219200
/// A PhysicalExtensionCodec that can serialize and deserialize ChildExec

datafusion-examples/examples/relation_planner/table_sample.rs

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ use datafusion::{
116116
};
117117
use datafusion_common::{
118118
DFSchemaRef, DataFusionError, Result, Statistics, internal_err, not_impl_err,
119-
plan_datafusion_err, plan_err, tree_node::TreeNodeRecursion,
119+
plan_datafusion_err, plan_err,
120120
};
121121
use datafusion_expr::{
122122
UserDefinedLogicalNode, UserDefinedLogicalNodeCore,
@@ -738,22 +738,6 @@ impl ExecutionPlan for SampleExec {
738738

739739
Ok(Arc::new(stats))
740740
}
741-
742-
fn apply_expressions(
743-
&self,
744-
f: &mut dyn FnMut(
745-
&dyn datafusion::physical_plan::PhysicalExpr,
746-
) -> Result<TreeNodeRecursion>,
747-
) -> Result<TreeNodeRecursion> {
748-
// Visit expressions in the output ordering from equivalence properties
749-
let mut tnr = TreeNodeRecursion::Continue;
750-
if let Some(ordering) = self.cache.output_ordering() {
751-
for sort_expr in ordering {
752-
tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?;
753-
}
754-
}
755-
Ok(tnr)
756-
}
757741
}
758742

759743
/// Bernoulli sampler: includes each row with probability `(upper - lower)`.

datafusion/catalog/src/memory/table.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ use arrow::compute::{and, filter_record_batch};
3131
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
3232
use arrow::record_batch::RecordBatch;
3333
use datafusion_common::error::Result;
34-
use datafusion_common::tree_node::TreeNodeRecursion;
3534
use datafusion_common::{Constraints, DFSchema, SchemaExt, not_impl_err, plan_err};
3635
use datafusion_common_runtime::JoinSet;
3736
use datafusion_datasource::memory::{MemSink, MemorySourceConfig};
@@ -40,13 +39,13 @@ use datafusion_datasource::source::DataSourceExec;
4039
use datafusion_expr::dml::InsertOp;
4140
use datafusion_expr::{Expr, SortExpr, TableType};
4241
use datafusion_physical_expr::{
43-
LexOrdering, create_physical_expr, create_physical_sort_exprs,
42+
LexOrdering, PhysicalExpr, create_physical_expr, create_physical_sort_exprs,
4443
};
4544
use datafusion_physical_plan::repartition::RepartitionExec;
4645
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
4746
use datafusion_physical_plan::{
4847
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
49-
PhysicalExpr, PlanProperties, common,
48+
PlanProperties, common,
5049
};
5150
use datafusion_session::Session;
5251

@@ -627,11 +626,4 @@ impl ExecutionPlan for DmlResultExec {
627626
stream,
628627
)))
629628
}
630-
631-
fn apply_expressions(
632-
&self,
633-
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
634-
) -> Result<TreeNodeRecursion> {
635-
Ok(TreeNodeRecursion::Continue)
636-
}
637629
}

datafusion/core/src/physical_planner.rs

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4475,20 +4475,6 @@ mod tests {
44754475
) -> Result<SendableRecordBatchStream> {
44764476
unimplemented!("NoOpExecutionPlan::execute");
44774477
}
4478-
4479-
fn apply_expressions(
4480-
&self,
4481-
f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
4482-
) -> Result<TreeNodeRecursion> {
4483-
// Visit expressions in the output ordering from equivalence properties
4484-
let mut tnr = TreeNodeRecursion::Continue;
4485-
if let Some(ordering) = self.cache.output_ordering() {
4486-
for sort_expr in ordering {
4487-
tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?;
4488-
}
4489-
}
4490-
Ok(tnr)
4491-
}
44924478
}
44934479

44944480
// Produces an execution plan where the schema is mismatched from
@@ -4628,12 +4614,6 @@ digraph {
46284614
) -> Result<SendableRecordBatchStream> {
46294615
unimplemented!()
46304616
}
4631-
fn apply_expressions(
4632-
&self,
4633-
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
4634-
) -> Result<TreeNodeRecursion> {
4635-
Ok(TreeNodeRecursion::Continue)
4636-
}
46374617
}
46384618
impl DisplayAs for OkExtensionNode {
46394619
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
@@ -4680,12 +4660,6 @@ digraph {
46804660
) -> Result<SendableRecordBatchStream> {
46814661
unimplemented!()
46824662
}
4683-
fn apply_expressions(
4684-
&self,
4685-
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
4686-
) -> Result<TreeNodeRecursion> {
4687-
Ok(TreeNodeRecursion::Continue)
4688-
}
46894663
}
46904664
impl DisplayAs for InvariantFailsExtensionNode {
46914665
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
@@ -4804,12 +4778,6 @@ digraph {
48044778
) -> Result<SendableRecordBatchStream> {
48054779
unimplemented!()
48064780
}
4807-
fn apply_expressions(
4808-
&self,
4809-
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
4810-
) -> Result<TreeNodeRecursion> {
4811-
Ok(TreeNodeRecursion::Continue)
4812-
}
48134781
}
48144782
impl DisplayAs for ExecutableInvariantFails {
48154783
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {

datafusion/core/tests/custom_sources_cases/mod.rs

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ use datafusion_catalog::Session;
3838
use datafusion_common::cast::as_primitive_array;
3939
use datafusion_common::project_schema;
4040
use datafusion_common::stats::Precision;
41-
use datafusion_common::tree_node::TreeNodeRecursion;
4241
use datafusion_physical_expr::EquivalenceProperties;
4342
use datafusion_physical_plan::PlanProperties;
4443
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
@@ -205,22 +204,6 @@ impl ExecutionPlan for CustomExecutionPlan {
205204
.collect(),
206205
}))
207206
}
208-
209-
fn apply_expressions(
210-
&self,
211-
f: &mut dyn FnMut(
212-
&dyn datafusion::physical_plan::PhysicalExpr,
213-
) -> Result<TreeNodeRecursion>,
214-
) -> Result<TreeNodeRecursion> {
215-
// Visit expressions in the output ordering from equivalence properties
216-
let mut tnr = TreeNodeRecursion::Continue;
217-
if let Some(ordering) = self.cache.output_ordering() {
218-
for sort_expr in ordering {
219-
tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?;
220-
}
221-
}
222-
Ok(tnr)
223-
}
224207
}
225208

226209
#[async_trait]

datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ use datafusion::prelude::*;
3535
use datafusion::scalar::ScalarValue;
3636
use datafusion_catalog::Session;
3737
use datafusion_common::cast::as_primitive_array;
38-
use datafusion_common::tree_node::TreeNodeRecursion;
3938
use datafusion_common::{DataFusionError, internal_err, not_impl_err};
4039
use datafusion_expr::expr::{BinaryExpr, Cast};
4140
use datafusion_functions_aggregate::expr_fn::count;
@@ -149,22 +148,6 @@ impl ExecutionPlan for CustomPlan {
149148
})),
150149
)))
151150
}
152-
153-
fn apply_expressions(
154-
&self,
155-
f: &mut dyn FnMut(
156-
&dyn datafusion::physical_plan::PhysicalExpr,
157-
) -> Result<TreeNodeRecursion>,
158-
) -> Result<TreeNodeRecursion> {
159-
// Visit expressions in the output ordering from equivalence properties
160-
let mut tnr = TreeNodeRecursion::Continue;
161-
if let Some(ordering) = self.cache.output_ordering() {
162-
for sort_expr in ordering {
163-
tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?;
164-
}
165-
}
166-
Ok(tnr)
167-
}
168151
}
169152

170153
#[derive(Clone, Debug)]

datafusion/core/tests/custom_sources_cases/statistics.rs

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ use datafusion::{
3333
scalar::ScalarValue,
3434
};
3535
use datafusion_catalog::Session;
36-
use datafusion_common::tree_node::TreeNodeRecursion;
3736
use datafusion_common::{project_schema, stats::Precision};
3837
use datafusion_physical_expr::EquivalenceProperties;
3938
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
@@ -181,22 +180,6 @@ impl ExecutionPlan for StatisticsValidation {
181180
Ok(Arc::new(self.stats.clone()))
182181
}
183182
}
184-
185-
fn apply_expressions(
186-
&self,
187-
f: &mut dyn FnMut(
188-
&dyn datafusion::physical_plan::PhysicalExpr,
189-
) -> Result<TreeNodeRecursion>,
190-
) -> Result<TreeNodeRecursion> {
191-
// Visit expressions in the output ordering from equivalence properties
192-
let mut tnr = TreeNodeRecursion::Continue;
193-
if let Some(ordering) = self.cache.output_ordering() {
194-
for sort_expr in ordering {
195-
tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?;
196-
}
197-
}
198-
Ok(tnr)
199-
}
200183
}
201184

202185
fn init_ctx(stats: Statistics, schema: Schema) -> Result<SessionContext> {

datafusion/core/tests/fuzz_cases/once_exec.rs

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
use arrow_schema::SchemaRef;
1919
use datafusion_common::internal_datafusion_err;
20-
use datafusion_common::tree_node::TreeNodeRecursion;
2120
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
2221
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
2322
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
@@ -106,20 +105,4 @@ impl ExecutionPlan for OnceExec {
106105

107106
stream.ok_or_else(|| internal_datafusion_err!("Stream already consumed"))
108107
}
109-
110-
fn apply_expressions(
111-
&self,
112-
f: &mut dyn FnMut(
113-
&dyn datafusion_physical_plan::PhysicalExpr,
114-
) -> datafusion_common::Result<TreeNodeRecursion>,
115-
) -> datafusion_common::Result<TreeNodeRecursion> {
116-
// Visit expressions in the output ordering from equivalence properties
117-
let mut tnr = TreeNodeRecursion::Continue;
118-
if let Some(ordering) = self.cache.output_ordering() {
119-
for sort_expr in ordering {
120-
tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?;
121-
}
122-
}
123-
Ok(tnr)
124-
}
125108
}

0 commit comments

Comments
 (0)