Skip to content

Commit 6e3db5b

Browse files
adriangbclaude
andcommitted
refactor(pruning): drive FilePruner via DynamicFilterTracker
Replace `FilePruner`'s hand-rolled `snapshot_generation()` polling (store last `u64`, recompute + diff on every `should_prune`) with a `DynamicFilterTracking` classification computed once at construction. The pruner rebuilds the pruning predicate on the first check and thereafter only when a watched dynamic filter has actually moved. This also lets the Parquet opener skip wrapping the scan in `EarlyStoppingStream` when the predicate is static or its dynamic filters are already complete: the up-front `prune_file` check already captured everything such a predicate can prune, so per-batch re-checking was pure overhead. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 7c20e0b commit 6e3db5b

2 files changed

Lines changed: 51 additions & 26 deletions

File tree

datafusion/datasource-parquet/src/opener/mod.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1247,16 +1247,21 @@ impl RowGroupsPrunedParquetOpen {
12471247
}
12481248
.into_stream();
12491249

1250-
// Wrap the stream so a dynamic filter can stop the file scan early.
1251-
if let Some(file_pruner) = prepared.file_pruner {
1252-
Ok(EarlyStoppingStream::new(
1253-
stream,
1254-
file_pruner,
1255-
files_ranges_pruned_statistics,
1256-
)
1257-
.boxed())
1258-
} else {
1259-
Ok(stream)
1250+
// Wrap the stream so a dynamic filter can stop the file scan early, but
1251+
// only when the pruner is still watching a filter that can change
1252+
// mid-scan. For a static (or already-complete) predicate the up-front
1253+
// `prune_file` check already captured everything that can be pruned, so
1254+
// per-batch re-checking would only add overhead.
1255+
match prepared.file_pruner {
1256+
Some(file_pruner) if file_pruner.is_watching() => {
1257+
Ok(EarlyStoppingStream::new(
1258+
stream,
1259+
file_pruner,
1260+
files_ranges_pruned_statistics,
1261+
)
1262+
.boxed())
1263+
}
1264+
_ => Ok(stream),
12601265
}
12611266
}
12621267
}

datafusion/pruning/src/file_pruner.rs

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ use std::sync::Arc;
2222
use arrow::datatypes::{FieldRef, SchemaRef};
2323
use datafusion_common::{Result, internal_datafusion_err, pruning::PrunableStatistics};
2424
use datafusion_datasource::PartitionedFile;
25-
use datafusion_physical_expr_common::physical_expr::{PhysicalExpr, snapshot_generation};
25+
use datafusion_physical_expr::DynamicFilterTracking;
26+
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
2627
use datafusion_physical_plan::metrics::Count;
2728
use log::debug;
2829

@@ -34,8 +35,14 @@ use crate::build_pruning_predicate;
3435
/// which substitutes partition column references with their literal values before
3536
/// the predicate reaches this pruner.
3637
pub struct FilePruner {
37-
predicate_generation: Option<u64>,
3838
predicate: Arc<dyn PhysicalExpr>,
39+
/// Tracks the dynamic filters inside `predicate` so we only rebuild the
40+
/// pruning predicate when one of them has actually moved.
41+
tracking: DynamicFilterTracking,
42+
/// Whether [`Self::should_prune`] has built+evaluated the pruning predicate
43+
/// at least once. The first check always runs; subsequent checks only run
44+
/// when a watched dynamic filter changed.
45+
checked_once: bool,
3946
/// Schema used for pruning (the logical file schema).
4047
file_schema: SchemaRef,
4148
file_stats_pruning: PrunableStatistics,
@@ -80,31 +87,44 @@ impl FilePruner {
8087
let file_stats = partitioned_file.statistics.as_ref()?;
8188
let file_stats_pruning =
8289
PrunableStatistics::new(vec![file_stats.clone()], Arc::clone(file_schema));
90+
let tracking = DynamicFilterTracking::classify(&predicate);
8391
Some(Self {
84-
predicate_generation: None,
8592
predicate,
93+
tracking,
94+
checked_once: false,
8695
file_schema: Arc::clone(file_schema),
8796
file_stats_pruning,
8897
predicate_creation_errors,
8998
})
9099
}
91100

101+
/// Returns `true` if this pruner watches a dynamic filter that can still
102+
/// change, meaning [`Self::should_prune`] is worth re-checking as the scan
103+
/// progresses. When `false`, the predicate is effectively static for the
104+
/// remainder of the scan and the caller can avoid wrapping the stream in a
105+
/// per-batch re-pruning adapter.
106+
pub fn is_watching(&self) -> bool {
107+
matches!(self.tracking, DynamicFilterTracking::Watching(_))
108+
}
109+
92110
pub fn should_prune(&mut self) -> Result<bool> {
93-
// Check if the predicate has changed since last invocation by tracking
94-
// its "generation". Dynamic filter expressions can change their values
95-
// during query execution, so we use generation tracking to detect when
96-
// the predicate has been updated and needs to be rebuilt.
111+
// Building the pruning predicate is expensive (it involves expression
112+
// analysis), so we only do it on the first check and whenever a dynamic
113+
// filter inside the predicate has actually moved.
97114
//
98-
// If the generation hasn't changed, we can skip rebuilding the pruning
99-
// predicate, which is an expensive operation involving expression analysis.
100-
let new_generation = snapshot_generation(&self.predicate);
101-
if let Some(current_generation) = self.predicate_generation.as_mut() {
102-
if *current_generation == new_generation {
103-
return Ok(false);
104-
}
105-
*current_generation = new_generation;
115+
// Dynamic filter expressions can change their values during query
116+
// execution; `DynamicFilterTracking` watches the still-incomplete
117+
// filters and reports a change at most once per update. A purely static
118+
// predicate (or one whose dynamic filters have all completed) is checked
119+
// exactly once.
120+
let should_build = if self.checked_once {
121+
self.tracking.watcher().is_some_and(|w| w.changed())
106122
} else {
107-
self.predicate_generation = Some(new_generation);
123+
self.checked_once = true;
124+
true
125+
};
126+
if !should_build {
127+
return Ok(false);
108128
}
109129
let pruning_predicate = build_pruning_predicate(
110130
Arc::clone(&self.predicate),

0 commit comments

Comments
 (0)