Commit 189218c

adriangb and claude committed
refactor: drop the opener's is_dynamic gate and remove is_dynamic_physical_expr
Now that `FilePruner` watches dynamic filters through `DynamicFilterTracker`, a non-dynamic predicate costs nothing beyond the one-shot prune: the tracker reports no changes (so `should_prune` never rebuilds) and `is_watching()` is false (so the scan is not wrapped in `EarlyStoppingStream`). The creation-time "is this dynamic?" gate therefore no longer earns its keep.

Drop the `is_dynamic_physical_expr(p) || has_statistics()` filter and let `FilePruner::try_new` (which already requires `statistics.is_some()`) plus the self-regulating tracker decide. Behavior is identical for every real case: files with column statistics already passed the gate via `has_statistics()`, and dynamic partition-column pruning relies on a present (if all-`Absent`) statistics struct, which `try_new` still accepts. The only difference is a rare, harmless one-shot prune for static files whose stats struct is present but entirely `Absent`.

With its sole caller gone, remove `is_dynamic_physical_expr`. `snapshot_generation` itself is untouched (still used by the FFI vtable and proto roundtrip); its free function is now unused in-workspace and can be retired in a later cleanup.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
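To make the cost argument concrete, here is a minimal, self-contained sketch of a generation-based tracker. It is a toy model, not DataFusion's implementation: `ToyDynamicFilter`, `ToyTracker`, and `ToyPruner` are hypothetical types, and only the method names `should_prune` and `is_watching` echo the commit message. The sketch assumes the pruner evaluates once up front and again only when a watched filter's generation has changed.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a dynamic filter: a counter bumped on each update.
#[derive(Default)]
struct ToyDynamicFilter {
    generation: AtomicU64,
}

impl ToyDynamicFilter {
    /// Simulate the filter tightening during execution.
    fn tighten(&self) {
        self.generation.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical tracker: remembers the last generation seen for each filter.
struct ToyTracker {
    watched: Vec<(Arc<ToyDynamicFilter>, u64)>,
}

impl ToyTracker {
    fn new(filters: Vec<Arc<ToyDynamicFilter>>) -> Self {
        let watched = filters
            .into_iter()
            .map(|f| {
                let seen = f.generation.load(Ordering::SeqCst);
                (f, seen)
            })
            .collect();
        Self { watched }
    }

    /// A static predicate has nothing to watch.
    fn is_watching(&self) -> bool {
        !self.watched.is_empty()
    }

    /// True if any watched filter changed since the previous call.
    fn changed(&mut self) -> bool {
        let mut changed = false;
        for (filter, seen) in self.watched.iter_mut() {
            let now = filter.generation.load(Ordering::SeqCst);
            if now != *seen {
                *seen = now;
                changed = true;
            }
        }
        changed
    }
}

/// Toy pruner that only counts how often it would re-evaluate the predicate.
struct ToyPruner {
    tracker: ToyTracker,
    evaluated_once: bool,
    evaluations: usize,
}

impl ToyPruner {
    fn new(tracker: ToyTracker) -> Self {
        Self { tracker, evaluated_once: false, evaluations: 0 }
    }

    /// Evaluate on the first call, then only after a watched filter changes.
    fn should_prune(&mut self) -> bool {
        let changed = self.tracker.changed();
        if !self.evaluated_once || changed {
            self.evaluated_once = true;
            self.evaluations += 1;
        }
        false // the toy never actually prunes
    }
}

fn main() {
    // Static predicate: one-shot evaluation, nothing watched afterwards.
    let mut static_pruner = ToyPruner::new(ToyTracker::new(vec![]));
    for _ in 0..5 {
        static_pruner.should_prune();
    }
    assert!(!static_pruner.tracker.is_watching());
    assert_eq!(static_pruner.evaluations, 1);

    // Dynamic predicate: re-evaluated only after the filter tightens.
    let filter = Arc::new(ToyDynamicFilter::default());
    let mut dynamic_pruner = ToyPruner::new(ToyTracker::new(vec![Arc::clone(&filter)]));
    dynamic_pruner.should_prune();
    dynamic_pruner.should_prune();
    filter.tighten();
    dynamic_pruner.should_prune();
    assert!(dynamic_pruner.tracker.is_watching());
    assert_eq!(dynamic_pruner.evaluations, 2);
}
```

In this model the static pruner does the same work whether or not a creation-time gate exists, which is the property the commit relies on.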
1 parent c611461 commit 189218c

3 files changed

Lines changed: 33 additions & 52 deletions

datafusion/datasource-parquet/src/opener/mod.rs

Lines changed: 30 additions & 38 deletions
@@ -52,9 +52,7 @@ use datafusion_common::{ColumnStatistics, Result, ScalarValue, Statistics, exec_
 use datafusion_datasource::{PartitionedFile, TableSchema};
 use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
 use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
-use datafusion_physical_expr_common::physical_expr::{
-    PhysicalExpr, is_dynamic_physical_expr,
-};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_plan::metrics::{
     BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory,
@@ -618,18 +616,22 @@ impl ParquetMorselizer {
             .with_category(MetricCategory::Rows)
             .global_counter("num_predicate_creation_errors");
 
-        // Apply literal replacements to projection and predicate
-        let file_pruner = predicate
-            .as_ref()
-            .filter(|p| is_dynamic_physical_expr(p) || partitioned_file.has_statistics())
-            .and_then(|p| {
-                FilePruner::try_new(
-                    Arc::clone(p),
-                    &logical_file_schema,
-                    &partitioned_file,
-                    predicate_creation_errors.clone(),
-                )
-            });
+        // Build a file pruner whenever the file carries statistics; `try_new`
+        // returns `None` when it does not. The pruner runs a one-shot prune
+        // before reading (catching dynamic filters that tightened since planning,
+        // or per-file statistics that weren't available then) and only keeps
+        // re-checking as the scan proceeds while it still watches an active
+        // dynamic filter. A static predicate's tracker reports no changes, so
+        // there is no ongoing re-pruning cost — which is why no separate
+        // "is this dynamic?" gate is needed here.
+        let file_pruner = predicate.as_ref().and_then(|p| {
+            FilePruner::try_new(
+                Arc::clone(p),
+                &logical_file_schema,
+                &partitioned_file,
+                predicate_creation_errors.clone(),
+            )
+        });
 
         Ok(PreparedParquetOpen {
             partition_index: self.partition_index,
@@ -677,30 +679,20 @@ impl PreparedParquetOpen {
     /// Returns `None` if the file can be skipped completely.
     fn prune_file(mut self) -> Result<Option<Self>> {
         // Prune this file using the file level statistics and partition values.
-        // Since dynamic filters may have been updated since planning it is possible that we are able
-        // to prune files now that we couldn't prune at planning time.
-        // It is assumed that there is no point in doing pruning here if the predicate is not dynamic,
-        // as it would have been done at planning time.
-        // We'll also check this after every record batch we read,
-        // and if at some point we are able to prove we can prune the file using just the file level statistics
-        // we can end the stream early.
-        //
-        // Make a FilePruner only if there is either
-        // 1. a dynamic expr in the predicate
-        // 2. the file has file-level statistics.
-        //
-        // File-level statistics may prune the file without loading
-        // any row groups or metadata.
-        //
-        // Dynamic filters may prune the file after initial
-        // planning, as the dynamic filter is updated during
-        // execution.
+        // Since dynamic filters may have been updated since planning it is
+        // possible that we are able to prune files now that we couldn't prune at
+        // planning time. The `FilePruner` (built whenever the file carries
+        // statistics) also watches any still-active dynamic filter, so the
+        // `EarlyStoppingStream` wrapping the scan can re-check after each batch
+        // and end the stream early once a tightened filter proves the file can
+        // be skipped.
         //
-        // The case where there is a dynamic filter but no
-        // statistics corresponds to a dynamic filter that
-        // references partition columns. While rare, this is possible
-        // e.g. `select * from table order by partition_col limit
-        // 10` could hit this condition.
+        // File-level statistics may prune the file without loading any row
+        // groups or metadata. Partition column predicates are already folded to
+        // literals (see `replace_columns_with_literals` above), so a dynamic
+        // filter that references only partition columns can prune here too even
+        // when the file has no column statistics, e.g.
+        // `select * from t order by partition_col limit 10`.
         if let Some(file_pruner) = &mut self.file_pruner
             && file_pruner.should_prune()?
         {
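The effect of dropping the `.filter(...)` step can be checked exhaustively on a toy model. The sketch below is an illustration under stated assumptions, not the opener's code: `ToyStats`, `ToyPredicate`, `ToyPruner`, `old_gate`, and `new_gate` are hypothetical, `try_new` is assumed to succeed whenever a statistics struct is present, and `has_statistics` is assumed to be true only when that struct carries at least one real value (both assumptions are inferred from the commit message).

```rust
/// Hypothetical three-way model of a file's statistics.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ToyStats {
    Missing,    // no statistics struct at all
    AllAbsent,  // struct present, every value `Absent`
    WithValues, // struct present with real column statistics
}

#[derive(Clone, Copy, Debug, PartialEq)]
struct ToyPredicate {
    is_dynamic: bool,
}

#[derive(Debug, PartialEq)]
struct ToyPruner;

/// Assumed behaviour: a pruner is built whenever a statistics struct exists.
fn try_new(stats: ToyStats) -> Option<ToyPruner> {
    (stats != ToyStats::Missing).then_some(ToyPruner)
}

/// Assumed behaviour: true only when real column statistics are present.
fn has_statistics(stats: ToyStats) -> bool {
    stats == ToyStats::WithValues
}

/// Shape of the removed code: gate first, then try to build.
fn old_gate(predicate: Option<ToyPredicate>, stats: ToyStats) -> Option<ToyPruner> {
    predicate
        .filter(|p| p.is_dynamic || has_statistics(stats))
        .and_then(|_| try_new(stats))
}

/// Shape of the new code: let `try_new` decide on its own.
fn new_gate(predicate: Option<ToyPredicate>, stats: ToyStats) -> Option<ToyPruner> {
    predicate.and_then(|_| try_new(stats))
}

/// Enumerate every (is_dynamic, stats) pair where the two versions disagree.
fn differing_cases() -> Vec<(bool, ToyStats)> {
    let mut out = Vec::new();
    for is_dynamic in [false, true] {
        for stats in [ToyStats::Missing, ToyStats::AllAbsent, ToyStats::WithValues] {
            let predicate = Some(ToyPredicate { is_dynamic });
            if old_gate(predicate, stats) != new_gate(predicate, stats) {
                out.push((is_dynamic, stats));
            }
        }
    }
    out
}

fn main() {
    // Prints: [(false, AllAbsent)]
    println!("{:?}", differing_cases());
}
```

Under these assumptions the only disagreement is a static predicate on a file whose statistics struct is present but entirely `Absent`, which matches the single behavioral difference the commit message describes.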

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 0 additions & 8 deletions
@@ -867,14 +867,6 @@ pub fn snapshot_generation(expr: &Arc<dyn PhysicalExpr>) -> u64 {
     generation
 }
 
-/// Check if the given `PhysicalExpr` is dynamic.
-/// Internally this calls [`snapshot_generation`] to check if the generation is non-zero,
-/// any dynamic `PhysicalExpr` should have a non-zero generation.
-pub fn is_dynamic_physical_expr(expr: &Arc<dyn PhysicalExpr>) -> bool {
-    // If the generation is non-zero, then this `PhysicalExpr` is dynamic.
-    snapshot_generation(expr) != 0
-}
-
 /// Returns true if the expression is volatile, i.e. whether it can return different
 /// results when evaluated multiple times with the same input.
 ///

datafusion/physical-expr/src/dynamic_filter_tracker.rs

Lines changed: 3 additions & 6 deletions
@@ -93,12 +93,9 @@ impl DynamicFilterTracking {
         }
     }
 
-    /// `true` if the predicate contains any dynamic filter (complete or not).
-    ///
-    /// This matches the question previously answered by
-    /// [`is_dynamic_physical_expr`](datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr):
-    /// whether the predicate's value may differ from what was known at planning
-    /// time, and is therefore worth re-evaluating at least once.
+    /// `true` if the predicate contains any dynamic filter (complete or not),
+    /// i.e. its value may differ from what was known at planning time and is
+    /// therefore worth re-evaluating at least once.
     pub fn contains_dynamic_filter(&self) -> bool {
         !matches!(self, DynamicFilterTracking::Static)
     }
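The `!matches!(self, ...::Static)` idiom treats every variant other than `Static` as "contains a dynamic filter". A minimal sketch of that pattern follows; `ToyTracking` and its `Watching` and `Complete` variants are hypothetical, since the diff only confirms that a `Static` variant exists.

```rust
/// Hypothetical mirror of a tracking state; only `Static` is taken from the diff.
#[derive(Debug)]
enum ToyTracking {
    Static,   // no dynamic filter in the predicate
    Watching, // assumed: a dynamic filter that may still change
    Complete, // assumed: a dynamic filter that has reached its final value
}

impl ToyTracking {
    /// True for every variant except `Static`, complete or not.
    fn contains_dynamic_filter(&self) -> bool {
        !matches!(self, ToyTracking::Static)
    }
}

fn main() {
    for state in [ToyTracking::Static, ToyTracking::Watching, ToyTracking::Complete] {
        println!("{:?} -> {}", state, state.contains_dynamic_filter());
    }
}
```

Writing the check as a negated match on the one static variant means any variant added later counts as dynamic by default.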
