feat(physical-expr): DynamicFilterTracker for cheap dynamic-filter change detection#22460
Draft
adriangb wants to merge 2 commits into
Draft
feat(physical-expr): DynamicFilterTracker for cheap dynamic-filter change detection#22460adriangb wants to merge 2 commits into
adriangb wants to merge 2 commits into
Conversation
Introduce a consumer-side counterpart to the producer's `update()` / `mark_complete()` API on `DynamicFilterPhysicalExpr`. `DynamicFilterPhysicalExpr::subscribe()` returns a `DynamicFilterSubscription` that observes updates to a single filter through its existing `watch` channel: steady-state polling is a single atomic load, and the lock is taken only when the filter has actually moved. A bare `mark_complete()` (which re-broadcasts the current generation) is correctly distinguished from a real expression change. `DynamicFilterTracker` walks a (possibly composite) predicate once, subscribing to every still-incomplete dynamic filter, then answers "did anything change since I last looked?" by polling only that shrinking set — replacing the pattern of recursively folding `snapshot_generation()` over the whole tree on every check. `DynamicFilterTracking::classify` distinguishes Static / AllComplete / Watching in a single traversal so callers can decide both whether a one-shot prune is worthwhile and whether runtime re-checking is needed. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replace `FilePruner`'s hand-rolled `snapshot_generation()` polling (store last `u64`, recompute + diff on every `should_prune`) with a `DynamicFilterTracking` classification computed once at construction. The pruner now rebuilds the pruning predicate on the first check and thereafter only when a watched dynamic filter has actually moved. This also lets the Parquet opener skip wrapping the scan in `EarlyStoppingStream` when the predicate is static or its dynamic filters are already complete: the up-front `prune_file` check already captured everything such a predicate can prune, so per-batch re-checking was pure overhead. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
N/A — extracted from a design discussion around the duplicated "does this filter have a dynamic portion that might change?" / "has the filter changed?" patterns (e.g. #22450,
FilePruner). Happy to file a tracking issue if preferred.Rationale for this change
DynamicFilterPhysicalExprhas a rich producer API (update(),mark_complete(),wait_update(),wait_complete()), but consumers that hold a predicate which contains dynamic filters have only a bare, recursivesnapshot_generation() -> u64. Several call sites hand-roll the same boilerplate around it:last_generation: Option<u64>,snapshot_generation(&predicate)(a full tree walk) on every check,PruningPredicate) on change.FilePrunerdoes exactly this today, and the runtime row-group pruner in #22450 reimplements the identical dance. None of them exploitmark_complete(), so they keep re-walking the tree even after the filters can no longer change.This PR adds a small consumer-side counterpart so the pattern lives in one place.
What changes are included in this PR?
New API (
datafusion-physical-expr):DynamicFilterPhysicalExpr::subscribe() -> DynamicFilterSubscriptionandis_complete(). A subscription observes a single filter through its existingtokio::sync::watchchannel — steady-state polling is a single atomic load; the lock is taken only when the filter actually moved. A baremark_complete()(which re-broadcasts the current generation) is distinguished from a real expression change, so it does not trigger a spurious rebuild.DynamicFilterTrackerwalks a (possibly composite) predicate once, subscribing to every still-incomplete dynamic filter, then answerschanged()by polling only that shrinking set (completed filters are dropped). No more re-foldingsnapshot_generation()over the whole tree on every batch.DynamicFilterTracking::classifyreturnsStatic/AllComplete/Watching(..)in one traversal, so a caller can decide both "is a one-shot prune worthwhile?" and "do I need to keep re-checking?".Consumer migration:
FilePrunernow classifies its predicate once at construction and rebuilds the pruning predicate on the first check + only when a watched filter moves.EarlyStoppingStreamwhen the predicate isStatic/AllComplete— the up-frontprune_filecheck already captured everything such a predicate can prune, so per-batch re-checking was pure overhead.This is intentionally scoped as a draft. Natural follow-ups: migrate the runtime
RowGroupPruner(#22450) onto the same tracker; replace the remainingis_dynamic_physical_expr/ free-functionsnapshot_generationcall sites; and (separately) decide whether thesnapshot_generation()trait method + FFI entry can eventually be retired.Are these changes tested?
Yes — new unit tests in
dynamic_filter_tracker.rscover: static / already-complete / watching classification, detecting an update exactly once,mark_complete()not counting as a change, a coalesced update+complete reported once, and independent tracking of multiple filters in a composite predicate. Existingdatafusion-pruninganddatafusion-datasource-parquetsuites pass unchanged.Are there any user-facing changes?
New public API in
datafusion-physical-expr(DynamicFilterTracker,DynamicFilterTracking,DynamicFilterSubscription,DynamicFilterChange, andDynamicFilterPhysicalExpr::{subscribe, is_complete}). No behavior change for end users beyond avoiding redundant per-batch re-pruning work for non-dynamic predicates.🤖 Generated with Claude Code