Commit 4c0f944

adriangb and claude authored
feat(physical-expr): DynamicFilterTracker for cheap dynamic-filter change detection (#22460)
Extracted from a design discussion around the duplicated "does this filter have a dynamic portion that might change?" / "has the filter changed?" patterns (e.g. #22450, `FilePruner`).

## Rationale for this change

`DynamicFilterPhysicalExpr` has a rich *producer* API (`update()`, `mark_complete()`, `wait_update()`, `wait_complete()`), but *consumers* that hold a predicate which *contains* dynamic filters only had a bare, recursive `snapshot_generation() -> u64`. Call sites hand-rolled the same boilerplate around it: store a `last_generation`, recompute `snapshot_generation(&predicate)` (a full tree walk) on **every** check, diff it, and rebuild an expensive `PruningPredicate` on change. `FilePruner` did exactly this, and none of these consumers exploited `mark_complete()`.

This adds a small consumer-side counterpart so the pattern lives in one place, driven by the existing `watch` channel rather than by re-walking the tree. This immediately eliminates some tree traversals (we were constantly traversing the expression tree to check whether any filters had updated). Long term I hope this makes changes like #22450 easier.

## What changes are included in this PR?

**New public API (`datafusion_physical_expr`):**

- `DynamicFilterTracking` (`classify` → `Static` / `AllComplete` / `Watching`, plus `contains_dynamic_filter` / `watcher`) and `DynamicFilterTracker` (`changed`). A tracker walks a (possibly composite) predicate **once**, subscribes to every still-incomplete dynamic filter, and answers `changed()` by polling only that shrinking set. Steady state is one atomic load per filter: no tree walk, and no lock until something actually moves.
- Lives in the `expressions::dynamic_filters` module (`dynamic_filters.rs` → `dynamic_filters/mod.rs`, tracker in `dynamic_filters/tracker.rs`). The subscription plumbing (`subscribe`, `DynamicFilterSubscription`, `DynamicFilterChange`, `observe`, `is_complete`) is `pub(crate)`; test-only constructors are `#[cfg(test)]`.

**Consumers:**

- `FilePruner` is driven by `DynamicFilterTracking` instead of `snapshot_generation` polling, and now decides its own existence in `try_new` (a static predicate with no usable stats builds no pruner).
- The Parquet opener skips wrapping the scan in `EarlyStoppingStream` when nothing can change, and no longer needs an "is it dynamic?" gate.

## Are these changes tested?

Yes. There are unit tests for the tracker (classification, detect-update-once, `mark_complete` is not a change, coalesced update+complete, multi-filter), and the existing `datafusion-pruning` / `datafusion-datasource-parquet` suites (incl. the static/dynamic/partition opener pruning test) pass unchanged.

## Are there any user-facing changes?

New public API as above (additive). One **deprecation** and one behavior change, both documented in the [DataFusion 55.0.0 upgrade guide](docs/source/library-user-guide/upgrading/55.0.0.md):

- `datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr` is **deprecated (since 55.0.0)**: downcast to `DynamicFilterPhysicalExpr` or use `DynamicFilterTracking` instead. (`snapshot_generation` itself is unchanged; it still backs the FFI vtable and proto roundtrip.)
- `FilePruner::try_new` now returns `None` for a purely static predicate over a file with no usable column statistics (previously `Some` whenever a statistics struct was present).

## Followups

I noticed a possible follow-up gate refinement, tracked in #22495.

This also opens up the possibility of deprecating or removing the `snapshot` / `generation` machinery from the public physical expr APIs. These new APIs (the watchers, the tracker) subsume much of that functionality, and I don't think we want to add `PhysicalExpr::watch`. After several releases, the only thing using that machinery is dynamic filters, i.e. no other legitimate use case has materialized.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
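The "walk once, then poll a shrinking set" idea described above can be sketched with the standard library alone. This is a simplified model, not the DataFusion implementation: `ToyFilter`, `ToyExpr`, and `ToyTracker` are hypothetical names, and plain atomics stand in for the `tokio::sync::watch` channel the real tracker is described as using.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

/// Hypothetical stand-in for a dynamic filter: a version counter bumped on
/// every update, plus a completion flag.
#[derive(Default)]
struct ToyFilter {
    version: AtomicU64,
    complete: AtomicBool,
}

impl ToyFilter {
    fn update(&self) {
        self.version.fetch_add(1, Ordering::Release);
    }
    fn mark_complete(&self) {
        self.complete.store(true, Ordering::Release);
    }
}

/// Hypothetical composite predicate containing zero or more dynamic filters.
enum ToyExpr {
    Literal,
    Dynamic(Arc<ToyFilter>),
    And(Box<ToyExpr>, Box<ToyExpr>),
}

/// Walks the predicate once at construction, then only polls the filters
/// that were still incomplete at that time.
struct ToyTracker {
    watched: Vec<(Arc<ToyFilter>, u64)>,
}

impl ToyTracker {
    fn new(expr: &ToyExpr) -> Self {
        let mut watched = Vec::new();
        collect(expr, &mut watched);
        Self { watched }
    }

    /// True if any watched filter advanced since the last call. Filters that
    /// have completed are dropped from the watched set after this check.
    fn changed(&mut self) -> bool {
        let mut any = false;
        self.watched.retain_mut(|(filter, seen)| {
            // Read completion first so an update that preceded it is not lost.
            let complete = filter.complete.load(Ordering::Acquire);
            let now = filter.version.load(Ordering::Acquire);
            if now != *seen {
                *seen = now;
                any = true;
            }
            !complete
        });
        any
    }

    fn is_watching(&self) -> bool {
        !self.watched.is_empty()
    }
}

/// The single tree walk: record every still-incomplete dynamic filter.
fn collect(expr: &ToyExpr, out: &mut Vec<(Arc<ToyFilter>, u64)>) {
    match expr {
        ToyExpr::Literal => {}
        ToyExpr::Dynamic(f) => {
            if !f.complete.load(Ordering::Acquire) {
                out.push((Arc::clone(f), f.version.load(Ordering::Acquire)));
            }
        }
        ToyExpr::And(l, r) => {
            collect(l, out);
            collect(r, out);
        }
    }
}
```

In this model a bare `mark_complete()` is not reported as a change, but it does shrink the watched set, so a tracker over an all-complete predicate ends up with nothing left to poll.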
1 parent e634472 commit 4c0f944

9 files changed

Lines changed: 612 additions & 67 deletions

datafusion/datasource-parquet/src/opener/mod.rs

Lines changed: 43 additions & 48 deletions
@@ -52,9 +52,7 @@ use datafusion_common::{ColumnStatistics, Result, ScalarValue, Statistics, exec_
 use datafusion_datasource::{PartitionedFile, TableSchema};
 use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
 use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
-use datafusion_physical_expr_common::physical_expr::{
-    PhysicalExpr, is_dynamic_physical_expr,
-};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_plan::metrics::{
     BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory,
@@ -618,18 +616,19 @@ impl ParquetMorselizer {
             .with_category(MetricCategory::Rows)
             .global_counter("num_predicate_creation_errors");

-        // Apply literal replacements to projection and predicate
-        let file_pruner = predicate
-            .as_ref()
-            .filter(|p| is_dynamic_physical_expr(p) || partitioned_file.has_statistics())
-            .and_then(|p| {
-                FilePruner::try_new(
-                    Arc::clone(p),
-                    &logical_file_schema,
-                    &partitioned_file,
-                    predicate_creation_errors.clone(),
-                )
-            });
+        // `FilePruner::try_new` decides whether a pruner is worthwhile (it needs
+        // a statistics struct, and either real column statistics or a dynamic
+        // filter that can prune via partition-value folding) and returns `None`
+        // otherwise. For a static predicate the pruner's tracker reports no
+        // changes, so it runs once and adds no ongoing cost.
+        let file_pruner = predicate.as_ref().and_then(|p| {
+            FilePruner::try_new(
+                Arc::clone(p),
+                &logical_file_schema,
+                &partitioned_file,
+                predicate_creation_errors.clone(),
+            )
+        });

         Ok(PreparedParquetOpen {
             partition_index: self.partition_index,
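The gate that the new comment attributes to `FilePruner::try_new` can be sketched as a small decision function. This is only an illustration of the rule as stated in the comment (a statistics struct is required, plus either real column statistics or a dynamic filter); `ToyFile`, `ToyPruner`, and `try_new_pruner` are hypothetical names, and the real constructor takes different arguments.

```rust
/// Hypothetical summary of what a file carries.
struct ToyFile {
    has_statistics_struct: bool,
    has_column_statistics: bool,
}

/// Hypothetical pruner; the real one holds a predicate and a tracker.
#[derive(Debug)]
struct ToyPruner;

/// Returns a pruner only when it could ever prune something: the file must
/// carry a statistics struct, and there must be either usable column
/// statistics or a dynamic filter in the predicate.
fn try_new_pruner(file: &ToyFile, contains_dynamic_filter: bool) -> Option<ToyPruner> {
    let worthwhile = file.has_statistics_struct
        && (file.has_column_statistics || contains_dynamic_filter);
    worthwhile.then_some(ToyPruner)
}
```

Moving the decision into the constructor is what lets the call site drop its own `.filter(...)` gate and simply use `and_then`.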
@@ -677,30 +676,21 @@ impl PreparedParquetOpen {
     /// Returns `None` if the file can be skipped completely.
     fn prune_file(mut self) -> Result<Option<Self>> {
         // Prune this file using the file level statistics and partition values.
-        // Since dynamic filters may have been updated since planning it is possible that we are able
-        // to prune files now that we couldn't prune at planning time.
-        // It is assumed that there is no point in doing pruning here if the predicate is not dynamic,
-        // as it would have been done at planning time.
-        // We'll also check this after every record batch we read,
-        // and if at some point we are able to prove we can prune the file using just the file level statistics
-        // we can end the stream early.
-        //
-        // Make a FilePruner only if there is either
-        // 1. a dynamic expr in the predicate
-        // 2. the file has file-level statistics.
-        //
-        // File-level statistics may prune the file without loading
-        // any row groups or metadata.
+        // Since dynamic filters may have been updated since planning it is
+        // possible that we are able to prune files now that we couldn't prune at
+        // planning time. The `FilePruner` (built when the predicate is dynamic or
+        // the file carries statistics) also watches any still-active dynamic
+        // filter, so the
+        // `EarlyStoppingStream` wrapping the scan can re-check after each batch
+        // and end the stream early once a tightened filter proves the file can
+        // be skipped.
         //
-        // Dynamic filters may prune the file after initial
-        // planning, as the dynamic filter is updated during
-        // execution.
-        //
-        // The case where there is a dynamic filter but no
-        // statistics corresponds to a dynamic filter that
-        // references partition columns. While rare, this is possible
-        // e.g. `select * from table order by partition_col limit
-        // 10` could hit this condition.
+        // File-level statistics may prune the file without loading any row
+        // groups or metadata. Partition column predicates are already folded to
+        // literals (see `replace_columns_with_literals` above), so a dynamic
+        // filter that references only partition columns can prune here too even
+        // when the file has no column statistics, e.g.
+        // `select * from t order by partition_col limit 10`.
         if let Some(file_pruner) = &mut self.file_pruner
             && file_pruner.should_prune()?
         {
@@ -1247,16 +1237,21 @@ impl RowGroupsPrunedParquetOpen {
         }
         .into_stream();

-        // Wrap the stream so a dynamic filter can stop the file scan early.
-        if let Some(file_pruner) = prepared.file_pruner {
-            Ok(EarlyStoppingStream::new(
-                stream,
-                file_pruner,
-                files_ranges_pruned_statistics,
-            )
-            .boxed())
-        } else {
-            Ok(stream)
+        // Wrap the stream so a dynamic filter can stop the file scan early, but
+        // only when the pruner is still watching a filter that can change
+        // mid-scan. For a static (or already-complete) predicate the up-front
+        // `prune_file` check already captured everything that can be pruned, so
+        // per-batch re-checking would only add overhead.
+        match prepared.file_pruner {
+            Some(file_pruner) if file_pruner.is_watching() => {
+                Ok(EarlyStoppingStream::new(
+                    stream,
+                    file_pruner,
+                    files_ranges_pruned_statistics,
+                )
+                .boxed())
+            }
+            _ => Ok(stream),
         }
     }
 }
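The conditional wrapping in the hunk above can be illustrated with a synchronous iterator in place of the async stream. This is a rough sketch under stated assumptions: `ToyPruner`, `EarlyStopping`, and `wrap_scan` are hypothetical, the pruner's decision is faked with a counter, and the check runs before each item is pulled, which only approximates "re-check after each batch".

```rust
/// Hypothetical pruner: `watching` models a still-active dynamic filter, and
/// `should_prune` starts returning true after `prune_after` checks.
struct ToyPruner {
    watching: bool,
    prune_after: usize,
    checks: usize,
}

impl ToyPruner {
    fn is_watching(&self) -> bool {
        self.watching
    }
    fn should_prune(&mut self) -> bool {
        self.checks += 1;
        self.checks > self.prune_after
    }
}

/// Iterator adapter that ends the scan once the pruner says the rest of the
/// file can be skipped.
struct EarlyStopping<I> {
    inner: I,
    pruner: ToyPruner,
}

impl<I: Iterator> Iterator for EarlyStopping<I> {
    type Item = I::Item;
    fn next(&mut self) -> Option<Self::Item> {
        if self.pruner.should_prune() {
            return None;
        }
        self.inner.next()
    }
}

/// Wrap only when the pruner can still observe a change; otherwise return the
/// scan untouched so no per-item check is paid.
fn wrap_scan<'a, I>(batches: I, pruner: Option<ToyPruner>) -> Box<dyn Iterator<Item = I::Item> + 'a>
where
    I: Iterator + 'a,
{
    match pruner {
        Some(p) if p.is_watching() => Box::new(EarlyStopping { inner: batches, pruner: p }),
        _ => Box::new(batches),
    }
}
```

The match guard keeps both "no pruner" and "pruner with nothing left to watch" on the same unwrapped path, which mirrors the `_ => Ok(stream)` arm in the diff.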

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 6 additions & 0 deletions
@@ -870,6 +870,12 @@ pub fn snapshot_generation(expr: &Arc<dyn PhysicalExpr>) -> u64 {
 /// Check if the given `PhysicalExpr` is dynamic.
 /// Internally this calls [`snapshot_generation`] to check if the generation is non-zero,
 /// any dynamic `PhysicalExpr` should have a non-zero generation.
+#[deprecated(
+    since = "55.0.0",
+    note = "Downcast to `DynamicFilterPhysicalExpr`, or use \
+            `DynamicFilterTracking::classify(expr).contains_dynamic_filter()` from \
+            `datafusion_physical_expr`"
+)]
 pub fn is_dynamic_physical_expr(expr: &Arc<dyn PhysicalExpr>) -> bool {
     // If the generation is non-zero, then this `PhysicalExpr` is dynamic.
     snapshot_generation(expr) != 0

datafusion/physical-expr/src/expressions/dynamic_filters.rs renamed to datafusion/physical-expr/src/expressions/dynamic_filters/mod.rs

Lines changed: 99 additions & 0 deletions
@@ -29,6 +29,9 @@ use datafusion_common::{
 use datafusion_expr::ColumnarValue;
 use datafusion_physical_expr_common::physical_expr::DynHash;

+mod tracker;
+pub use tracker::{DynamicFilterTracker, DynamicFilterTracking};
+
 /// State of a dynamic filter, tracking both updates and completion.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 enum FilterState {
@@ -326,6 +329,31 @@ impl DynamicFilterPhysicalExpr {
             .await;
     }

+    /// Returns `true` if this filter has been marked complete via
+    /// [`Self::mark_complete`] and will therefore never change again.
+    pub(crate) fn is_complete(&self) -> bool {
+        self.inner.read().is_complete
+    }
+
+    /// Subscribe to this filter's updates for cheap, synchronous change
+    /// detection.
+    ///
+    /// The returned [`DynamicFilterSubscription`] lets a consumer poll whether
+    /// the filter's expression has advanced since it last looked, without
+    /// re-walking a predicate tree or re-deriving a generation on every check.
+    /// This is the building block used by [`DynamicFilterTracker`] to watch
+    /// every dynamic filter inside a (possibly composite) predicate.
+    pub(crate) fn subscribe(&self) -> DynamicFilterSubscription {
+        let mut receiver = self.state_watch.subscribe();
+        // Mark the current state as already-seen so the first `observe()` only
+        // reports updates that happen *after* subscription.
+        let last_generation = receiver.borrow_and_update().generation();
+        DynamicFilterSubscription {
+            receiver,
+            last_generation,
+        }
+    }
+
     /// Check if this dynamic filter is being actively used by any consumers.
     ///
     /// Returns `true` if there are references beyond the producer (e.g., the HashJoinExec
@@ -522,6 +550,77 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
     }
 }

+/// The result of polling a [`DynamicFilterSubscription`].
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) struct DynamicFilterChange {
+    /// The filter's expression advanced since the previous observation.
+    pub(crate) changed: bool,
+    /// The filter has been marked complete; it will never change again and the
+    /// subscription can be dropped.
+    pub(crate) complete: bool,
+}
+
+/// A cheap, synchronous handle for observing updates to a single
+/// [`DynamicFilterPhysicalExpr`].
+///
+/// Obtained via [`DynamicFilterPhysicalExpr::subscribe`]. Steady-state polling
+/// via [`Self::observe`] is a single atomic load (the underlying
+/// [`tokio::sync::watch`] version counter); the lock is only taken when the
+/// filter has actually been updated.
+#[derive(Debug)]
+pub(crate) struct DynamicFilterSubscription {
+    receiver: watch::Receiver<FilterState>,
+    /// Last generation we reported as "seen". Used to distinguish a real
+    /// expression update from a bare [`DynamicFilterPhysicalExpr::mark_complete`]
+    /// (which re-broadcasts the current generation without changing the
+    /// expression).
+    last_generation: u64,
+}
+
+impl DynamicFilterSubscription {
+    /// Observe the latest state of the filter.
+    ///
+    /// Reports whether the filter's expression advanced since the previous call
+    /// and whether it has since been marked complete. Cheap when nothing has
+    /// changed: a single atomic comparison with no lock acquisition.
+    pub(crate) fn observe(&mut self) -> DynamicFilterChange {
+        match self.receiver.has_changed() {
+            Ok(true) => {
+                let state = *self.receiver.borrow_and_update();
+                let changed = state.generation() > self.last_generation;
+                if changed {
+                    self.last_generation = state.generation();
+                }
+                DynamicFilterChange {
+                    changed,
+                    complete: matches!(state, FilterState::Complete { .. }),
+                }
+            }
+            Ok(false) => DynamicFilterChange {
+                changed: false,
+                complete: false,
+            },
+            // The watch sender lives inside the predicate's
+            // `DynamicFilterPhysicalExpr`, which the owner of this subscription
+            // keeps alive, so observing a dropped sender signals a bug rather
+            // than normal completion. Flag it loudly in debug builds; in release
+            // degrade to "complete" (no further updates are possible) instead of
+            // silently masking it.
+            Err(_) => {
+                debug_assert!(
+                    false,
+                    "DynamicFilterSubscription observed a dropped watch sender; \
+                     the owning predicate should keep it alive"
+                );
+                DynamicFilterChange {
+                    changed: false,
+                    complete: true,
+                }
+            }
+        }
+    }
+}
+
 /// An atomic counter used to generate monotonic u64 ids.
 struct ExpressionIdAtomicCounter {
     inner: AtomicU64,
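The `observe` logic above separates "the channel fired" from "the expression advanced", which is how a bare `mark_complete` avoids being reported as a change. The following single-threaded model may help make that concrete. It is a sketch under assumptions: `ToyChannel` imitates a watch channel with an atomic version plus a mutex, the `InProgress` variant name is a guess (only `Complete { .. }` and `generation()` appear in the diff), and none of these types exist in DataFusion.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

/// Assumed shape of the filter state; only `Complete` is visible in the diff.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ToyState {
    InProgress { generation: u64 },
    Complete { generation: u64 },
}

impl ToyState {
    fn generation(self) -> u64 {
        match self {
            ToyState::InProgress { generation } | ToyState::Complete { generation } => generation,
        }
    }
}

/// Toy watch channel: a version counter bumped on every send, plus the value.
struct ToyChannel {
    version: AtomicU64,
    state: Mutex<ToyState>,
}

impl ToyChannel {
    fn new() -> Arc<Self> {
        Arc::new(Self {
            version: AtomicU64::new(0),
            state: Mutex::new(ToyState::InProgress { generation: 1 }),
        })
    }
    fn send(&self, state: ToyState) {
        *self.state.lock().unwrap() = state;
        self.version.fetch_add(1, Ordering::Release);
    }
    /// A real update: the generation advances.
    fn update(&self) {
        let g = self.state.lock().unwrap().generation();
        self.send(ToyState::InProgress { generation: g + 1 });
    }
    /// Completion re-broadcasts the current generation unchanged.
    fn mark_complete(&self) {
        let g = self.state.lock().unwrap().generation();
        self.send(ToyState::Complete { generation: g });
    }
}

#[derive(Debug, PartialEq, Eq)]
struct ToyChange {
    changed: bool,
    complete: bool,
}

struct ToySubscription {
    channel: Arc<ToyChannel>,
    seen_version: u64,
    last_generation: u64,
}

impl ToySubscription {
    /// Treat the state at subscription time as already seen.
    fn new(channel: Arc<ToyChannel>) -> Self {
        let seen_version = channel.version.load(Ordering::Acquire);
        let last_generation = channel.state.lock().unwrap().generation();
        Self { channel, seen_version, last_generation }
    }

    fn observe(&mut self) -> ToyChange {
        // Fast path: one atomic load, no lock, when nothing was sent.
        let version = self.channel.version.load(Ordering::Acquire);
        if version == self.seen_version {
            return ToyChange { changed: false, complete: false };
        }
        self.seen_version = version;
        let state = *self.channel.state.lock().unwrap();
        let changed = state.generation() > self.last_generation;
        if changed {
            self.last_generation = state.generation();
        }
        ToyChange { changed, complete: matches!(state, ToyState::Complete { .. }) }
    }
}
```

As in the diff's `Ok(false)` arm, this model reports `complete: false` on calls made after completion has already been observed, so a caller that wants to stop polling needs to act on the first `complete: true` it sees.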
