Add statistics-based thresholds for deferring RowFilter selection in ReadPlanBuilder #9659

sdf-jkl wants to merge 34 commits into

Conversation
Previously, every predicate in the RowFilter received the same ProjectionMask containing ALL filter columns. This caused unnecessary decoding of expensive string columns when evaluating cheap integer predicates. Now each predicate receives a mask with only the single column it needs.

Key sync improvements (vs baseline):

- Q37: 63.7ms -> 7.3ms (-88.6%, Title LIKE with CounterID=62 filter)
- Q36: 117ms -> 24ms (-79.5%, URL <> '' with CounterID=62 filter)
- Q40: 17.9ms -> 5.1ms (-71.5%, multi-pred with RefererHash eq)
- Q41: 17.3ms -> 5.5ms (-68.1%, multi-pred with URLHash eq)
- Q22: 303ms -> 127ms (-58.2%, 3 string predicates)
- Q42: 7.6ms -> 3.9ms (-48.5%, int-only multi-predicate)
- Q38: 19.1ms -> 12.4ms (-34.9%, 5 int predicates)
- Q21: 159ms -> 98ms (-38.5%, URL LIKE + SearchPhrase)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
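The masking change above can be illustrated with a minimal sketch. A projection mask is modeled here as one boolean per leaf column; the real code uses `parquet::arrow::ProjectionMask`, and the function names below are illustrative, not the PR's actual API:

```rust
// Hypothetical sketch: a projection mask as one boolean per leaf column.
// The real arrow-rs type is parquet::arrow::ProjectionMask.
fn single_column_mask(num_columns: usize, column: usize) -> Vec<bool> {
    (0..num_columns).map(|i| i == column).collect()
}

fn union_mask(num_columns: usize, columns: &[usize]) -> Vec<bool> {
    let mut mask = vec![false; num_columns];
    for &c in columns {
        mask[c] = true;
    }
    mask
}

fn main() {
    // Before: every predicate decoded all filter columns
    // (e.g. CounterID, Title, URL over a 4-column schema).
    let all = union_mask(4, &[0, 2, 3]);
    assert_eq!(all, vec![true, false, true, true]);

    // After: the cheap CounterID = 62 predicate decodes only column 0,
    // so the expensive string columns are never touched for it.
    let counter_id_only = single_column_mask(4, 0);
    assert_eq!(counter_id_only, vec![true, false, false, false]);
    println!("per-predicate masks avoid decoding unused filter columns");
}
```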
Use page-level min/max statistics (via StatisticsConverter) to compute a RowSelection that skips pages where equality predicates cannot match. For each equality predicate with an integer literal, we check if the literal falls within each page's [min, max] range and skip pages where it doesn't. Impact is data-dependent - most effective when data is sorted/clustered by the filter column. For this particular 100K-row sample file the data isn't sorted by filter columns, so improvements are modest (~5% for some CounterID=62 queries). Would show larger gains on sorted datasets. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
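The page-pruning idea can be sketched as follows. `PageStats` below is a simplified, hypothetical stand-in for what StatisticsConverter yields; the output is a list of (skip, row_count) runs from which a RowSelection could be built:

```rust
/// Simplified, hypothetical shape of per-page statistics as might be
/// obtained via StatisticsConverter: min/max values plus the page's row count.
struct PageStats {
    min: i64,
    max: i64,
    rows: usize,
}

/// Build a (skip, row_count) run list: a page is skipped when the equality
/// literal cannot fall inside its [min, max] range.
fn prune_pages(pages: &[PageStats], literal: i64) -> Vec<(bool, usize)> {
    pages
        .iter()
        .map(|p| (literal < p.min || literal > p.max, p.rows))
        .collect()
}

fn main() {
    let pages = [
        PageStats { min: 1, max: 50, rows: 1000 },
        PageStats { min: 51, max: 100, rows: 1000 },
    ];
    // CounterID = 62 can only match the second page; skip the first.
    assert_eq!(prune_pages(&pages, 62), vec![(true, 1000), (false, 1000)]);
}
```

On sorted or clustered data, ranges rarely overlap across pages, so most pages prune away; on unsorted data every page's [min, max] tends to cover the literal, which is why the sample-file gains are modest.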
Put the cheapest/most selective predicate first: SearchPhrase <> '' filters ~87% of rows before expensive LIKE predicates run. This reduces string column decoding for Title and URL significantly. Q22 sync: ~6% improvement, Q22 async: ~13% improvement. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… full clone

- Measure selectivity against absolute row count (after and_then) instead of relative to current selection, making predicates comparable regardless of prior filtering
- Avoid cloning the full ReadPlanBuilder (including deferred_selection) by constructing a minimal builder for the predicate reader

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Measuring selectivity against the absolute result makes the threshold less intuitive since it becomes more scattered after and_then. Revert to measuring against the raw predicate result (relative to current selection). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Instead of measuring selectivity (fraction of rows passing), measure scattering: how much applying a predicate would fragment the selection. A predicate is deferred if it would increase the selector count beyond `current_selectors * scatter_threshold`. This directly targets what makes fragmented selections expensive: many small skip/read transitions during decoding.

- Rename selectivity_threshold -> scatter_threshold
- Add RowSelection::selector_count() (O(1) via Vec::len)
- Use selector count ratio instead of row selectivity ratio

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Instead of comparing selector count ratios, measure selector density: selectors / total_rows. A density of 0.25 means at most 25 selectors per 100 rows — anything more fragmented gets deferred. This is more intuitive and directly proportional to the per-row cost of skip/read transitions during decoding. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
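The density metric itself is a one-line ratio; a minimal sketch (function name assumed, not the PR's actual code):

```rust
/// Selector density: number of RowSelector runs per total row, a proxy for
/// the per-row cost of skip/read transitions during decoding.
/// (Illustrative sketch; not the PR's actual function.)
fn selector_density(selector_count: usize, total_rows: usize) -> f64 {
    selector_count as f64 / total_rows as f64
}

fn main() {
    // 25 selectors over 100 rows -> density 0.25, the example from above.
    assert_eq!(selector_density(25, 100), 0.25);
    // A clean predicate: 2 runs over 100_000 rows is far below any threshold.
    assert!(selector_density(2, 100_000) < 0.001);
}
```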
Store total row count in RowSelection at construction time, enabling O(1) total_row_count() instead of iterating all selectors. Also add selector_count() for O(1) fragmentation measurement. Update split_off() and limit() to maintain the cached row_count. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The total row count needed for scatter density calculation is already available at both call sites (sync reader sums row group sizes, async reader has row_count in scope). Pass it as a parameter instead of storing it in RowSelection. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Based on ClickBench profiling, scattering predicates have densities of 0.008-0.054 while clean predicates are <0.001. A threshold of 0.01 defers the scattering ones while applying the clean ones. Also removes the eprintln debug instrumentation. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Don't defer a predicate if applying it would reduce the selector count (make the selection less fragmented). Only defer when the predicate both increases selectors AND exceeds the density threshold. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
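Putting the last two commits together, the deferral gate reads as a sketch like this (names and the single-gate simplification are assumptions; the 0.01 threshold is the ClickBench-derived value mentioned above):

```rust
/// Illustrative deferral gate: defer only when the predicate both makes the
/// selection MORE fragmented AND pushes selector density past the threshold.
fn should_defer(
    selectors_before: usize,
    selectors_after: usize,
    total_rows: usize,
    density_threshold: f64, // e.g. 0.01 from ClickBench profiling
) -> bool {
    if selectors_after <= selectors_before {
        // Predicate reduces (or keeps) fragmentation: always apply it.
        return false;
    }
    (selectors_after as f64 / total_rows as f64) > density_threshold
}

fn main() {
    // Scattering predicate: 5_400 selectors over 100_000 rows -> density 0.054.
    assert!(should_defer(10, 5_400, 100_000, 0.01));
    // Clean predicate: density well under 0.001 is applied eagerly.
    assert!(!should_defer(10, 80, 100_000, 0.01));
    // Reduces fragmentation -> never deferred, regardless of density.
    assert!(!should_defer(5_000, 4_000, 100_000, 0.01));
}
```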
…ivity_threshold_with_statistics
@Dandandan @adriangb Please take a look when you get a chance! The benchmarks looked good locally (I did 30 runs per query).
run benchmarks arrow_reader_clickbench |
🤖 Arrow criterion benchmark running (GKE): comparing selectivity_threshold_with_statistics (2c4787d) to d53df60 (merge-base)
🤖 Arrow criterion benchmark completed (GKE)
@Dandandan not too bad
I wonder: does it trigger in more cases than purely on the other threshold? #9414 (comment) For ClickBench I found it is often best to just use the first one or two row filters; the others don't seem to bring much extra.
I'd even say pretty good!
I think we can handle that on the DataFusion side; what we have here (more efficient handling of the filters that do get pushed down) is good. We'll end up with more coarse control of IO/CPU patterns on the DataFusion side and more granular, dynamic adaptivity in arrow-rs. I do like that.
I agree with that, although I wonder what the minimum number of levers we need is 🤔
I was also thinking: instead of adding heuristics, as we try to do here, we could avoid "fragmented" selections.
Hey @Dandandan, sorry for taking a while. I think with this strict logic, we'd be missing out on many potential skips. I could prototype and run local benchmarks, but I'm not very optimistic about it.
…b.com/sdf-jkl/arrow-rs into selectivity_threshold_with_statistics
Conflicts resolved:

- read_plan.rs: unify with_predicate(row_count) as delegator to with_predicate_options; carry row_count via PredicateOptions::with_total_rows so LIMIT/OFFSET (upstream apache#9766) and deferral metrics coexist.
- push_decoder/reader_builder/mod.rs: call site uses with_predicate_options with always-set total_rows plus conditional with_limit.
- RowGroupDecoderState size assertion updated to 288 to reflect both upstream and PR field additions.
The threshold value controls the long-skip-share gate of the filter deferral heuristic (long_skip_rows / skipped_rows); rename so the public API name describes what it actually controls. Pure rename, no behavior change.
- Make `ArrowReaderBuilder::with_long_skip_share_threshold` the canonical home for the heuristic's prose; slim `ReadPlanBuilder` setter and field docs to a one-line summary plus link.
- Add a block comment above the deferral constants explaining their intentionally-private status and relationship to the public knob.
- Tighten per-constant docs with explicit formulas (`skipped_rows / row_count`, `new_skipped_rows / row_count`).
The all-selected fast path and the main path both constructed identical 16-field `FilterSelectivityStat` records inline. Pull the construction into a single helper that takes a `DeferralDecision` and bumps `predicate_index`, and add `DeferralDecision::all_selected_fast_path()` for the zero-stats case so the fast path is a single call. Drops two low-value inline comments at the if/else that branch labels already convey.
The `#[cfg(test)]` shim only unwrapped `evaluate_deferral(...).should_defer`. Tests now call `evaluate_deferral` directly.
Match the file's existing test naming convention (used by both the pre-existing `preferred_selection_strategy_*` tests and the newer upstream `truncate_filter_*` / `with_predicate_options_*` tests).
`PARQUET_BENCH_LONG_SKIP_SHARE_THRESHOLD` lets bench runs sweep alternative threshold values; the default `0.75` is the value that produced the best ClickBench results during PR apache#9659 testing. Setting the variable to `none` disables deferral entirely; any other non-parseable value panics so misconfigured runs fail loudly.
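The described override semantics (default `0.75`, `none` disables, anything else must parse or panic) can be sketched like this; the parsing helper is hypothetical, only the variable name and semantics come from the commit message:

```rust
/// Sketch of the bench-only override described above. `raw` is the value of
/// PARQUET_BENCH_LONG_SKIP_SHARE_THRESHOLD (None when the variable is unset).
/// Returns None to mean "deferral disabled entirely".
fn parse_threshold(raw: Option<&str>) -> Option<f64> {
    match raw {
        // Unset: fall back to the default tuned on ClickBench.
        None => Some(0.75),
        // "none" disables deferral entirely.
        Some("none") => None,
        // Anything else must parse as f64, or the run fails loudly.
        Some(v) => Some(
            v.parse()
                .expect("invalid PARQUET_BENCH_LONG_SKIP_SHARE_THRESHOLD"),
        ),
    }
}

fn main() {
    assert_eq!(parse_threshold(None), Some(0.75));
    assert_eq!(parse_threshold(Some("none")), None);
    assert_eq!(parse_threshold(Some("0.5")), Some(0.5));
}
```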
The longer `long_skip_share_threshold` identifier and the new `.should_defer` suffix in tests pushed some lines past the column limit; rustfmt re-wrapped them.
The function has three early-return branches (threshold disabled, zero rows, fragmentation unchanged) before the four-gate test. Add a doc comment listing the precedence so readers don't have to recover it from the body.
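The precedence being documented might look like the skeleton below. This is a reconstruction for illustration only: the signature is assumed, and the final "four-gate test" is collapsed to the single density gate for brevity:

```rust
/// Illustrative skeleton of the documented precedence:
/// 1. threshold disabled              -> never defer
/// 2. zero rows                       -> never defer
/// 3. fragmentation unchanged/reduced -> never defer
/// 4. otherwise run the gate test (simplified to one gate here;
///    the real function applies four).
fn evaluate_deferral(
    threshold: Option<f64>,
    row_count: usize,
    selectors_before: usize,
    selectors_after: usize,
) -> bool {
    let Some(threshold) = threshold else {
        return false; // 1. heuristic disabled
    };
    if row_count == 0 {
        return false; // 2. nothing to measure
    }
    if selectors_after <= selectors_before {
        return false; // 3. predicate does not fragment the selection
    }
    // 4. gate test
    (selectors_after as f64 / row_count as f64) > threshold
}

fn main() {
    assert!(!evaluate_deferral(None, 100_000, 10, 5_400));
    assert!(!evaluate_deferral(Some(0.01), 0, 10, 5_400));
    assert!(!evaluate_deferral(Some(0.01), 100_000, 5_000, 4_000));
    assert!(evaluate_deferral(Some(0.01), 100_000, 10, 5_400));
}
```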
Which issue does this PR close?
Shows my idea for implementing the issue discussed in #9414 (comment).
Rationale for this change
Check issue
What changes are included in this PR?
ReadPlanBuilder.

Are these changes tested?
Are there any user-facing changes?