feat: add ExecutionPlan::benefits_from_output_partitioning #22440
adriangb wants to merge 5 commits into
…ream

`RowGroupsPrunedParquetOpen::build_stream` used to inline the `build_projection_read_plan` + `reassign_expr_columns` + `make_projector` + `replace_schema` triple right next to the decoder / stream wiring, which made the opener's main orchestration body harder to follow.

Move that triple into a new `post_scan_filter` module exposing a single `DecoderProjection::build(projection, physical_file_schema, parquet_schema, output_schema)` entry point that returns the projection mask, projector, and replace_schema flag. The opener becomes a single call.

`replace_schema` is now derived from the projector's output schema (rather than the read plan's projected schema) so it stays correct under future widening of the decoder mask.

`DecoderBuilderConfig` now carries the projection mask directly (`projection_mask: &ProjectionMask`) instead of the full `ParquetReadPlan`, since the read plan's `projected_schema` is no longer needed in this layer.

No behaviour change. All existing parquet tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`build_row_filter` (and its `RowFilterGenerator` wrapper) silently dropped
conjuncts that `FilterCandidateBuilder::build` rejected (`Ok(None)` was
`.flatten()`-ed away) and swallowed whole-build errors. By the time
`build_row_filter` runs, `ParquetSource::try_pushdown_filters` has already
accepted the filter and the parent `FilterExec` has been removed, so those
dropped conjuncts were never applied anywhere — wrong results.
Most reproducible trigger: the per-file expr adapter rewrites a predicate
that was pushable at *table schema* time into something the
`PushdownChecker` rejects at *physical file schema* time (schema
evolution / coercion / whole-struct references introduced by the rewrite).
Surface the rejected conjuncts instead of dropping them:
- `build_row_filter` now returns
`Result<(Option<RowFilter>, Vec<Arc<dyn PhysicalExpr>>)>`. The second
element is the conjuncts it could not place. Bench / in-file test call
sites updated.
- `RowFilterGenerator` exposes `rejected_conjuncts()`. On a whole-file
build error it routes every conjunct through that list, so an error no
longer relaxes the predicate.
- `DecoderProjection::build` grows a `post_scan_conjuncts` parameter and
a `post_scan_filter: Option<PostScanFilter>` field. When non-empty it
widens the decoder mask (over the user projection ∪ post-scan filter
columns), rebases the conjuncts onto the stream schema, and returns a
`PostScanFilter` that the stream applies to every decoded batch with
SQL `WHERE` semantics (mirroring `FilterExec`'s `batch_filter`).
- `PushDecoderStreamState` carries the optional `PostScanFilter` and
applies it in the `DecodeResult::Data` arm, skipping empty batches.
- The decoder-local LIMIT is unsafe with a post-scan filter (the decoder
would short-circuit before the filter rejects enough rows), so the
opener routes the limit to `remaining_limit` whenever a post-scan
filter is present.
- New `post_scan_rows_pruned` / `post_scan_rows_matched` counters and
`post_scan_filter_eval_time` `Time` on `ParquetFileMetrics`, mirroring
the existing `pushdown_rows_*` / `row_pushdown_eval_time` so
`EXPLAIN ANALYZE` keeps surfacing filter cost.
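Why the decoder-local LIMIT is unsafe next to a post-scan filter can be shown with a toy model. This is a minimal sketch over plain integer rows, not the opener's code; `limit_then_filter` and `filter_then_limit` are hypothetical names introduced only for illustration.

```rust
/// Decoder-local limit: stop after `limit` decoded rows, then filter.
/// Unsafe with a post-scan filter, because rows that the filter later
/// rejects still count toward the limit.
fn limit_then_filter(rows: &[i64], limit: usize, pred: impl Fn(i64) -> bool) -> Vec<i64> {
    rows.iter().copied().take(limit).filter(|&r| pred(r)).collect()
}

/// Limit tracked after the filter (the role the commit gives to
/// `remaining_limit`): only rows that pass the filter count.
fn filter_then_limit(rows: &[i64], limit: usize, pred: impl Fn(i64) -> bool) -> Vec<i64> {
    rows.iter().copied().filter(|&r| pred(r)).take(limit).collect()
}
```

For rows `[1, 2, 3, 1]`, predicate `a = 1` and `LIMIT 2`, the first ordering yields a single row while the second yields the two matching rows.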
Two regression tests:
- `build_row_filter_surfaces_rejected_struct_conjunct` (`row_filter.rs`)
asserts the new API contract directly — the rejected struct conjunct
is returned, not dropped.
- `rejected_struct_conjunct_runs_post_scan_not_dropped` (`opener/mod.rs`)
is end-to-end: with `pushdown_filters=true` and an `s IS NOT NULL`
predicate over a struct column where one row is NULL, `main` returns 3
rows (conjunct silently dropped, predicate relaxed); after this fix it
correctly returns 2.
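The contract these tests pin down can be approximated in a few lines. The sketch below is an assumption-laden toy: `Conjunct`, `split_conjuncts` and `apply_post_scan` are hypothetical stand-ins, not DataFusion's `PhysicalExpr`, `build_row_filter` or `PostScanFilter`. It only illustrates that rejected conjuncts are returned rather than dropped, and that the post-scan filter keeps a row only when the predicate is true (SQL `WHERE` semantics).

```rust
/// Toy conjunct (hypothetical type, not DataFusion's `PhysicalExpr`).
#[derive(Debug, Clone, PartialEq)]
struct Conjunct {
    text: &'static str,
    /// Whether the row-filter builder accepts it against the physical file schema.
    row_filterable: bool,
}

/// Returns (conjuncts placed in the row filter, rejected conjuncts).
/// Nothing is dropped: every input conjunct ends up in one of the two.
fn split_conjuncts(conjuncts: &[Conjunct]) -> (Option<Vec<Conjunct>>, Vec<Conjunct>) {
    let (placed, rejected): (Vec<Conjunct>, Vec<Conjunct>) =
        conjuncts.iter().cloned().partition(|c| c.row_filterable);
    let row_filter = if placed.is_empty() { None } else { Some(placed) };
    (row_filter, rejected)
}

/// SQL WHERE semantics: keep a row only when the predicate is `Some(true)`;
/// both `Some(false)` and NULL (`None`) drop the row.
fn apply_post_scan<T: Clone>(batch: &[T], pred: impl Fn(&T) -> Option<bool>) -> Vec<T> {
    batch
        .iter()
        .filter(|row| pred(*row) == Some(true))
        .cloned()
        .collect()
}
```

Applied to a three-row batch with one NULL struct value, an `IS NOT NULL` predicate routed through `apply_post_scan` keeps two rows, matching the end-to-end regression described above.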
The `pushdown_filters = false` path is intentionally unchanged in this
commit: `try_pushdown_filters` still leaves the `FilterExec` above the
scan in that case. Always accepting filters and removing the `FilterExec`
unconditionally is a separate behaviour change in a follow-up commit.
`push_down_filter_parquet.slt` updated for the new `post_scan_rows_*`
metric lines on `EXPLAIN ANALYZE` output.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…st-scan
`ParquetSource::try_pushdown_filters` always returns the per-filter
`Yes` / `No` discriminant from `can_expr_be_pushed_down_with_schemas`,
regardless of the `pushdown_filters` config. The parent `FilterExec` is
always removed for pushable filters, and the scan owns the predicate.
The opener routes the predicate to the post-scan filter when
`pushdown_filters = false`, in addition to the rejected-conjunct path
that already exists for `pushdown_filters = true`:
- `pushdown_filters = true` → row-filterable conjuncts via the parquet
`RowFilter`; any rejected conjuncts via the post-scan filter (the
correctness fix from the previous commit).
- `pushdown_filters = false` → the whole predicate runs as a post-scan
filter on decoded batches (behaviorally identical to a `FilterExec`).
The `pushdown_filters` config keeps its meaning ("build a parquet
`RowFilter`"); doc comments updated.
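The routing described above reduces to a two-input decision. A minimal sketch, assuming a per-conjunct `row_filterable` flag; the `Placement` enum and `place_conjunct` function are hypothetical names, not part of the opener:

```rust
/// Where a pushable conjunct is evaluated inside the scan.
#[derive(Debug, PartialEq, Eq)]
enum Placement {
    /// Evaluated during decode via the parquet `RowFilter`.
    RowFilter,
    /// Evaluated on decoded batches by the post-scan filter.
    PostScan,
}

/// `pushdown_filters` only decides whether a `RowFilter` is built;
/// every conjunct is applied somewhere in the scan either way.
fn place_conjunct(pushdown_filters: bool, row_filterable: bool) -> Placement {
    match (pushdown_filters, row_filterable) {
        (true, true) => Placement::RowFilter,
        // Rejected conjuncts, or the whole predicate when pushdown is off.
        _ => Placement::PostScan,
    }
}
```

The point of the sketch is that no branch returns "not applied": the scan owns the predicate in every configuration.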
Plan / test consequences (all results unchanged, plan shape and metrics
change):
- The `FilterExec` no longer appears above a `DataSourceExec` for
pushable parquet filters. The predicate appears as `predicate=…` on
the `DataSourceExec`. Parquet `.slt` files are regenerated to reflect
this (clickbench, push_down_filter_parquet, projection_pushdown,
parquet*, etc.). Spurious whitespace churn from `--complete` was
reverted.
- Opener / integration tests that asserted "row group not pruned ⇒ all
rows returned" (e.g. `a = 1` over `[1, 2, 3]` returning 3 rows) are
updated to reflect the matching-row count, since the scan now applies
the predicate row-level via the post-scan filter.
- `FilterExec: id@0 = 1` assertions in DataFrame / view tests become
`predicate=id@0 = 1` on the `DataSourceExec`.
- Insta inline snapshots in `parquet.rs` and `explain_analyze.rs` are
re-accepted (`output_rows=8` → `output_rows=5` plus
`post_scan_rows_pruned=3`, multi-line plans collapse where the
`FilterExec`/`RepartitionExec` chain is gone).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
When a parquet scan owns a filter and runs it post-decode inside the scan thread (which the post-scan-filter work in apache#22384 introduces for the `pushdown_filters = false` case), there is no sibling `FilterExec` above the scan, and `EnforceDistribution` no longer inserts the `RoundRobinBatch(target_partitions)` repartition it used to trigger from the filter's `benefits_from_input_partitioning`. Single-partition consumers (`SortExec`, `CoalescePartitionsExec`, a `CollectLeft` hash join build) therefore inherit a single-thread scan + filter, even when the cluster has plenty of idle cores.

Add `ExecutionPlan::benefits_from_output_partitioning() -> bool` (default `false`) as the symmetric counterpart of `benefits_from_input_partitioning`. The optimizer consults it in the same branch that already decides whether to wrap a child in a round-robin, so the existing `add_roundrobin_on_top` path does the work, with no special handling in `repartitioned()` or `DistributionContext` bookkeeping.

Wire it through the data-source stack:

    ExecutionPlan ─┬─ DataSourceExec -> DataSource::benefits_from_output_partitioning
                   │
    DataSource    ─── FileScanConfig -> FileSource::benefits_from_output_partitioning
                   │
    FileSource    ─── ParquetSource  -> predicate.is_some() && !pushdown_filters()

With `pushdown_filters = true` parquet evaluates conjuncts via `RowFilter` during decode (so the round-robin wouldn't help and would also defeat limit pushdown), hence the gate.

Restores the parallelism a sibling `FilterExec` used to provide. On TPC-DS SF1 (12 cores, with `enable_join_dynamic_filter_pushdown=false` + `repartition_file_min_size=1 MiB` applied via the companion PRs) the slower-than-main query count drops from 18 → 2 (and the residuals are ~3-5% noise around the post-scan filter's fixed per-batch cost).
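The delegation chain in this commit message can be modelled with three toy traits. This is a sketch under stated assumptions: the traits and structs below borrow DataFusion's names but are heavily simplified stand-ins (the real types carry many more methods and fields), and `has_predicate`, `OtherExec` and `parquet_scan` are hypothetical.

```rust
// Simplified stand-ins for the real traits; each defaults to `false`.
trait FileSource {
    fn benefits_from_output_partitioning(&self) -> bool { false }
}
trait DataSource {
    fn benefits_from_output_partitioning(&self) -> bool { false }
}
trait ExecutionPlan {
    fn benefits_from_output_partitioning(&self) -> bool { false }
}

struct ParquetSource { has_predicate: bool, pushdown_filters: bool }
impl FileSource for ParquetSource {
    // Opt in only when the scan runs the predicate post-decode.
    fn benefits_from_output_partitioning(&self) -> bool {
        self.has_predicate && !self.pushdown_filters
    }
}

struct FileScanConfig { file_source: Box<dyn FileSource> }
impl DataSource for FileScanConfig {
    fn benefits_from_output_partitioning(&self) -> bool {
        self.file_source.benefits_from_output_partitioning()
    }
}

struct DataSourceExec { data_source: Box<dyn DataSource> }
impl ExecutionPlan for DataSourceExec {
    fn benefits_from_output_partitioning(&self) -> bool {
        self.data_source.benefits_from_output_partitioning()
    }
}

/// A plan that does not override the method keeps the `false` default.
struct OtherExec;
impl ExecutionPlan for OtherExec {}

/// Hypothetical helper that assembles the three layers.
fn parquet_scan(has_predicate: bool, pushdown_filters: bool) -> DataSourceExec {
    let source = ParquetSource { has_predicate, pushdown_filters };
    let config = FileScanConfig { file_source: Box::new(source) };
    DataSourceExec { data_source: Box::new(config) }
}
```

Because every layer defaults to `false`, operators and sources that do not opt in behave as before; only a parquet scan with a predicate and `pushdown_filters` disabled reports `true`.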
run benchmarks

🤖 Benchmark running (GKE): comparing benefits-from-output-partitioning (6e5c241) to c8b784a (merge-base) using tpcds
🤖 Benchmark running (GKE): comparing benefits-from-output-partitioning (6e5c241) to c8b784a (merge-base) using tpch
🤖 Benchmark running (GKE): comparing benefits-from-output-partitioning (6e5c241) to c8b784a (merge-base) using clickbench_partitioned
🤖 Benchmark completed (GKE): tpch, base (merge-base) vs. branch
🤖 Benchmark completed (GKE): tpcds, base (merge-base) vs. branch
🤖 Benchmark completed (GKE): clickbench_partitioned, base (merge-base) vs. branch
Stacks on top of #22384 — both branches need to land for the diff to be coherent.
Summary
Add `ExecutionPlan::benefits_from_output_partitioning() -> bool` (default `false`) as the symmetric counterpart of the existing `benefits_from_input_partitioning`. The optimizer's `EnforceDistribution` already inserts a `RepartitionExec(RoundRobinBatch(target_partitions))` when a parent's `benefits_from_input_partitioning` is `true`. With this addition it also fires when the child itself opts in via `benefits_from_output_partitioning`, with no special handling in `repartitioned()` or `DistributionContext` bookkeeping.
Why
When a parquet scan owns a filter and #22384 runs it post-decode inside the scan thread (the `pushdown_filters = false` path), there is no sibling `FilterExec` above the scan. Single-partition consumers (`SortExec`, `CoalescePartitionsExec`, a `CollectLeft` hash-join build) therefore inherit a single-thread scan + filter, even when the cluster has plenty of idle cores. The companion PRs (#22438 disabling join dynamic filter pushdown by default, #22439 lowering `repartition_file_min_size` to 1 MiB) close most of the regression but leave TPC-DS with ~18 queries still slower than main on small dim-table joins where byte-range splitting alone can't reach `target_partitions`. This PR closes the rest.
Wiring
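How the flag feeds the optimizer's decision can be sketched as a single boolean rule. This is a simplified model, not the actual `EnforceDistribution` code: `should_add_round_robin` is a hypothetical function, and the partition-count comparison is an assumption about when a round-robin is worth inserting.

```rust
/// Simplified model of the round-robin decision (hypothetical function).
/// A repartition is considered when either the parent asks for partitioned
/// input or the child advertises that it benefits from partitioned output.
fn should_add_round_robin(
    parent_benefits_from_input: bool,
    child_benefits_from_output: bool,
    child_partitions: usize,
    target_partitions: usize,
) -> bool {
    let wanted = parent_benefits_from_input || child_benefits_from_output;
    // Assumption: no repartition when the child is already wide enough.
    wanted && child_partitions < target_partitions
}
```

With a single-partition filtered scan under a `SortExec` and `target_partitions = 12`, the rule fires only through the child's opt-in, which is the case this PR targets.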
The `pushdown_filters = true` gate is important: with `RowFilter` doing the work during decode, the round-robin wouldn't help and would also defeat limit-pushdown for ordered scans.
Benchmark numbers (12 cores, SF1)
Run with the companion PRs (#22438 + #22439) applied so the dynamic-filter and split-size doors are open:
The remaining residuals (TPC-H Q5 ~3%, TPC-DS Q41 ~4% on a 15 ms query, ClickBench Q13 ~5%) look like fixed per-batch overhead in the post-scan filter path itself; the rest are within run-to-run variance.
Test plan
- `cargo test --test sqllogictests`: all 472 files pass after snapshot updates, which all show `RepartitionExec: partitioning=RoundRobinBatch(N)` inserted above filtered scans where a single-partition parent sits above.
- `cargo test -p datafusion --test core_integration`
run benchmarks