Conversation
2010YOUY01
left a comment
There was a problem hiding this comment.
Thank you — this PR looks really nice.
I took a quick look and left a few suggestions. I’ll review the optimizer rewrite and execution side more carefully later.
| /// Filter(rn<=K) → Window(ROW_NUMBER) → Sort patterns with a | ||
| /// PartitionedTopKExec that maintains per-partition heaps, avoiding | ||
| /// a full sort of the input. | ||
| pub enable_window_topn: bool, default = true |
There was a problem hiding this comment.
I suggest to default it to false, for large partition counts, the regression seems significant.
As a follow-up, we could detect the input cardinality and automatically choose the right plan.
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| // Standalone H2O groupby Q8 benchmark: PartitionedTopKExec enabled vs disabled |
There was a problem hiding this comment.
We could keep this benchmark in this PR, but it would be great to clean it up later.
To make benchmark maintenance easier, we could directly add queries representing this workload to h2o window benchmark, so that similar benchmarks won't get scattered to multiple places.
datafusion/benchmarks/bench.sh
Line 123 in e1ad871
Though the issue is now the h2o benchmark counts the dataset loading time, so we can't isolate the target executor's processing time, so we could add an option to eliminate the data loading time later 🤔
| /// - `K >= rn` (flipped) → fetch = K | ||
| /// - `K > rn` (flipped) → fetch = K - 1 | ||
| /// | ||
| /// # When the Rule Does NOT Fire |
There was a problem hiding this comment.
It would be great to describe when this rule does apply, rather than focusing on when it does not. This optimization should only trigger for a fairly small set of cases.
| // Step 1: Match FilterExec at the top | ||
| let filter = plan.downcast_ref::<FilterExec>()?; | ||
|
|
||
| // Don't handle filters with projections |
There was a problem hiding this comment.
I'm curious why skipping this
There was a problem hiding this comment.
The filter's column indices would point to the projected schema, not the window exec's output schema, so our index-based matching for the ROW_NUMBER column would be wrong without resolving the projection mapping. Skipping this case for simplicity right now.
| } | ||
| DisplayFormatType::TreeRender => { | ||
| writeln!(f, "fetch={}", self.fetch)?; | ||
| writeln!(f, "{}", self.expr) |
There was a problem hiding this comment.
Tree format should also display partition/order expr, and we could also add simple tests for it in sqllogictests like
set datafusion.explain.format = tree;
explain ...
| } | ||
|
|
||
| fn required_input_distribution(&self) -> Vec<Distribution> { | ||
| vec![Distribution::UnspecifiedDistribution] |
There was a problem hiding this comment.
I think this should be requiring a Hash partition scheme for the window partition key, the optimizer would use this API for sanity check during optimization.
| )?)) | ||
| } | ||
|
|
||
| fn apply_expressions( |
There was a problem hiding this comment.
Not related to this PR, but I’m curious why this is a required ExecutionPlan API and when it is used, given that different operators can hold expressions for very different purposes 🤔
| # Tests for Window TopN optimization: PartitionedTopKExec | ||
|
|
||
| statement ok | ||
| CREATE TABLE window_topn_t (id INT, pk INT, val INT) AS VALUES |
There was a problem hiding this comment.
I suggest moving the main test coverage here, instead of keeping it in unit tests across different layers such as optimizer tests. Once we have solid coverage here, it is less likely to get lost during local refactors.
We can also extend the coverage with more edge cases, for example:
- predicates such as
rn < 2,2 > rn, etc. - mixing other window expressions with
row_number() - empty or overlapping partition / order keys, such as
... OVER (ORDER BY id)or... OVER (PARTITION BY id ORDER BY id, customer) - different sort options such as
ASC,DESC, andNULLS FIRST - the
QUALIFYclause https://datafusion.apache.org/user-guide/sql/select.html#qualify-clause - and more
Which issue does this PR close?
ROW_NUMBER < 5/ TopK #6899.Rationale for this change
Queries like
SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn FROM t WHERE rn <= Kare extremely common in analytics ("top N per group"). The current plan sorts the entire dataset O(N log N), computes ROW_NUMBER for all rows, then filters. With 10M rows, 1K partitions, and K=3, we sort all 10M rows but only keep 3K.This PR introduces a
PartitionedTopKExecoperator that replaces theSortExec, maintaining a per-partitionTopKheap (reusing DataFusion's existingTopKimplementation). Cost drops to O(N log K) time and O(K × P × row_size) memory.What changes are included in this PR?
New physical operator:
PartitionedTopKExec(physical-plan/src/sorts/partitioned_topk.rs)RowConverter, feeds sub-batches to a per-partitionTopKheap(partition_keys, order_keys)orderTopKimplementation for heap management, sort key comparison, eviction, and batch compactionNew optimizer rule:
WindowTopN(physical-optimizer/src/window_topn.rs)Detects the pattern:
And replaces it with:
Both
FilterExecandSortExecare removed.Supported predicates:
rn <= K,rn < K,K >= rn,K > rn.The rule only fires for
ROW_NUMBERwith aPARTITION BYclause. Global top-K (noPARTITION BY) is already handled bySortExecwithfetch.Config flag:
datafusion.optimizer.enable_window_topn(default:true)Benchmark results (H2O groupby Q8, 10M rows, top-2 per partition):
cargo run --release --example h2o_window_topn_bench
The 100K-partition regression is expected: per-partition
TopKoverhead (RowConverter, MemoryReservation per instance)dominates when partitions are very numerous with few rows each. For the common case (moderate partition cardinality), the
optimization provides 2-3x speedup.
Are these changes tested?
Yes:
core/tests/physical_optimizer/window_topn.rs): basic ROW_NUMBER,rn < K, flipped predicates, non-window column filter, config disabled, no partition by, projection between filter and windowsqllogictest/test_files/window_topn.slt): correctness verification, EXPLAIN plan validation,rn < K, no-partition-by case, config disabled fallbackAre there any user-facing changes?
No breaking API changes. The optimization is enabled by default and transparent to users. It can be disabled via: