Skip to content

Commit ef5217a

Browse files
adriangbclaude
andcommitted
refactor: fold the FilePruner creation gate into try_new; deprecate is_dynamic_physical_expr
The Parquet opener gated `FilePruner` creation on `is_dynamic_physical_expr(p) || has_statistics()`, where `is_dynamic_physical_expr` answered "is it dynamic?" via `snapshot_generation(p) != 0`. `FilePruner` already classifies its predicate once (`DynamicFilterTracking`) to drive the change tracker, so the same classification can answer the gate. Move the decision into `FilePruner::try_new`: it returns `None` when the file has no statistics struct, or when the predicate is purely static and the file has no usable column statistics (planning already did everything such a pruner could). A dynamic predicate is still accepted (it may prune via partition-value folding even without column statistics). The opener now just calls `try_new`. With its last internal caller gone, `is_dynamic_physical_expr` is deprecated (since 55.0.0) rather than removed; downstream users should downcast to `DynamicFilterPhysicalExpr` or use `DynamicFilterTracking`. See the 55.0.0 upgrade guide. `snapshot_generation` itself is unchanged (FFI vtable + proto). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 6e3db5b commit ef5217a

5 files changed

Lines changed: 127 additions & 41 deletions

File tree

datafusion/datasource-parquet/src/opener/mod.rs

Lines changed: 28 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,7 @@ use datafusion_common::{ColumnStatistics, Result, ScalarValue, Statistics, exec_
5252
use datafusion_datasource::{PartitionedFile, TableSchema};
5353
use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
5454
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
55-
use datafusion_physical_expr_common::physical_expr::{
56-
PhysicalExpr, is_dynamic_physical_expr,
57-
};
55+
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
5856
use datafusion_physical_expr_common::sort_expr::LexOrdering;
5957
use datafusion_physical_plan::metrics::{
6058
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory,
@@ -618,18 +616,19 @@ impl ParquetMorselizer {
618616
.with_category(MetricCategory::Rows)
619617
.global_counter("num_predicate_creation_errors");
620618

621-
// Apply literal replacements to projection and predicate
622-
let file_pruner = predicate
623-
.as_ref()
624-
.filter(|p| is_dynamic_physical_expr(p) || partitioned_file.has_statistics())
625-
.and_then(|p| {
626-
FilePruner::try_new(
627-
Arc::clone(p),
628-
&logical_file_schema,
629-
&partitioned_file,
630-
predicate_creation_errors.clone(),
631-
)
632-
});
619+
// `FilePruner::try_new` decides whether a pruner is worthwhile (it needs
620+
// a statistics struct, and either real column statistics or a dynamic
621+
// filter that can prune via partition-value folding) and returns `None`
622+
// otherwise. For a static predicate the pruner's tracker reports no
623+
// changes, so it runs once and adds no ongoing cost.
624+
let file_pruner = predicate.as_ref().and_then(|p| {
625+
FilePruner::try_new(
626+
Arc::clone(p),
627+
&logical_file_schema,
628+
&partitioned_file,
629+
predicate_creation_errors.clone(),
630+
)
631+
});
633632

634633
Ok(PreparedParquetOpen {
635634
partition_index: self.partition_index,
@@ -677,30 +676,21 @@ impl PreparedParquetOpen {
677676
/// Returns `None` if the file can be skipped completely.
678677
fn prune_file(mut self) -> Result<Option<Self>> {
679678
// Prune this file using the file level statistics and partition values.
680-
// Since dynamic filters may have been updated since planning it is possible that we are able
681-
// to prune files now that we couldn't prune at planning time.
682-
// It is assumed that there is no point in doing pruning here if the predicate is not dynamic,
683-
// as it would have been done at planning time.
684-
// We'll also check this after every record batch we read,
685-
// and if at some point we are able to prove we can prune the file using just the file level statistics
686-
// we can end the stream early.
687-
//
688-
// Make a FilePruner only if there is either
689-
// 1. a dynamic expr in the predicate
690-
// 2. the file has file-level statistics.
691-
//
692-
// File-level statistics may prune the file without loading
693-
// any row groups or metadata.
694-
//
695-
// Dynamic filters may prune the file after initial
696-
// planning, as the dynamic filter is updated during
697-
// execution.
679+
// Since dynamic filters may have been updated since planning it is
680+
// possible that we are able to prune files now that we couldn't prune at
681+
// planning time. The `FilePruner` (built when the predicate is dynamic or
682+
// the file carries statistics) also watches any still-active dynamic
683+
// filter, so the
684+
// `EarlyStoppingStream` wrapping the scan can re-check after each batch
685+
// and end the stream early once a tightened filter proves the file can
686+
// be skipped.
698687
//
699-
// The case where there is a dynamic filter but no
700-
// statistics corresponds to a dynamic filter that
701-
// references partition columns. While rare, this is possible
702-
// e.g. `select * from table order by partition_col limit
703-
// 10` could hit this condition.
688+
// File-level statistics may prune the file without loading any row
689+
// groups or metadata. Partition column predicates are already folded to
690+
// literals (see `replace_columns_with_literals` above), so a dynamic
691+
// filter that references only partition columns can prune here too even
692+
// when the file has no column statistics, e.g.
693+
// `select * from t order by partition_col limit 10`.
704694
if let Some(file_pruner) = &mut self.file_pruner
705695
&& file_pruner.should_prune()?
706696
{

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -870,6 +870,12 @@ pub fn snapshot_generation(expr: &Arc<dyn PhysicalExpr>) -> u64 {
870870
/// Check if the given `PhysicalExpr` is dynamic.
871871
/// Internally this calls [`snapshot_generation`] to check if the generation is non-zero,
872872
/// any dynamic `PhysicalExpr` should have a non-zero generation.
873+
#[deprecated(
874+
since = "55.0.0",
875+
note = "Downcast to `DynamicFilterPhysicalExpr`, or use \
876+
`DynamicFilterTracking::classify(expr).contains_dynamic_filter()` from \
877+
`datafusion_physical_expr`"
878+
)]
873879
pub fn is_dynamic_physical_expr(expr: &Arc<dyn PhysicalExpr>) -> bool {
874880
// If the generation is non-zero, then this `PhysicalExpr` is dynamic.
875881
snapshot_generation(expr) != 0

datafusion/pruning/src/file_pruner.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,18 +76,35 @@ impl FilePruner {
7676
})
7777
}
7878

79-
/// Create a new file pruner if statistics are available.
80-
/// Returns None if this file does not have statistics.
79+
/// Create a file pruner for this file, or `None` when pruning it cannot
80+
/// help.
81+
///
82+
/// Returns `None` when the file has no statistics struct to evaluate a
83+
/// pruning predicate against, or when the predicate is purely static and the
84+
/// file has no usable column statistics — in that case planning already did
85+
/// everything such a pruner could. A predicate carrying a dynamic filter is
86+
/// always accepted (given a statistics struct), since it may prune via
87+
/// partition-value folding even without column statistics.
8188
pub fn try_new(
8289
predicate: Arc<dyn PhysicalExpr>,
8390
file_schema: &SchemaRef,
8491
partitioned_file: &PartitionedFile,
8592
predicate_creation_errors: Count,
8693
) -> Option<Self> {
94+
// A pruning predicate is evaluated against a statistics struct, so one
95+
// must exist (its columns may all be `Absent`).
8796
let file_stats = partitioned_file.statistics.as_ref()?;
97+
let tracking = DynamicFilterTracking::classify(&predicate);
98+
// Only build a pruner when it could prune something planning didn't
99+
// already: the file has real column statistics, or the predicate carries
100+
// a dynamic filter (whose value, or folded partition columns, can prune
101+
// even without column statistics). For a purely static predicate with no
102+
// usable stats there is nothing to gain.
103+
if !partitioned_file.has_statistics() && !tracking.contains_dynamic_filter() {
104+
return None;
105+
}
88106
let file_stats_pruning =
89107
PrunableStatistics::new(vec![file_stats.clone()], Arc::clone(file_schema));
90-
let tracking = DynamicFilterTracking::classify(&predicate);
91108
Some(Self {
92109
predicate,
93110
tracking,
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
<!---
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
-->
19+
20+
# Upgrade Guides
21+
22+
## DataFusion 55.0.0
23+
24+
**Note:** DataFusion `55.0.0` has not been released yet. The information provided
25+
in this section pertains to features and changes that have already been merged
26+
to the main branch and are awaiting release in this version.
27+
28+
### `is_dynamic_physical_expr` is deprecated
29+
30+
`datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr` is
31+
deprecated. It was a thin wrapper over `snapshot_generation(expr) != 0` used to
32+
ask "does this predicate contain a dynamic filter?".
33+
34+
Prefer asking the question directly against the concrete type. For a one-off
35+
check, downcast to `DynamicFilterPhysicalExpr`:
36+
37+
```rust
38+
use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
39+
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
40+
41+
let mut is_dynamic = false;
42+
predicate.apply(|e| {
43+
if e.downcast_ref::<DynamicFilterPhysicalExpr>().is_some() {
44+
is_dynamic = true;
45+
Ok(TreeNodeRecursion::Stop)
46+
} else {
47+
Ok(TreeNodeRecursion::Continue)
48+
}
49+
})?;
50+
```
51+
52+
If you also need to know whether the dynamic filters can still change (and to be
53+
notified when they do), use the new `DynamicFilterTracking` /
54+
`DynamicFilterTracker` API in `datafusion_physical_expr`:
55+
56+
```rust
57+
use datafusion_physical_expr::DynamicFilterTracking;
58+
59+
let tracking = DynamicFilterTracking::classify(&predicate);
60+
if tracking.contains_dynamic_filter() {
61+
// worth re-evaluating the predicate at runtime
62+
}
63+
```
64+
65+
### `FilePruner::try_new` no longer builds a pruner for static predicates without statistics
66+
67+
`datafusion_pruning::FilePruner::try_new` now returns `None` when the predicate
68+
is purely static _and_ the file carries no usable column statistics, because
69+
such a pruner can never prune anything beyond what planning already did.
70+
Previously it returned `Some` whenever a statistics struct was present (the
71+
"is this worth pruning?" decision lived in the Parquet opener). Files with column
72+
statistics, and predicates that carry a dynamic filter, are unaffected.

docs/source/library-user-guide/upgrading/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ Upgrade Guides
2121
.. toctree::
2222
:maxdepth: 1
2323

24+
DataFusion 55.0.0 <55.0.0>
2425
DataFusion 54.0.0 <54.0.0>
2526
DataFusion 53.0.0 <53.0.0>
2627
DataFusion 52.0.0 <52.0.0>

0 commit comments

Comments
 (0)