From 38fa07a0b28ad80ea0728b48f7f0c2cc43f8a8e0 Mon Sep 17 00:00:00 2001 From: Subham Singhal Date: Wed, 8 Apr 2026 22:42:31 +0530 Subject: [PATCH 1/5] Benchmark window topn optimisation --- datafusion/common/src/config.rs | 6 + .../core/examples/h2o_window_topn_bench.rs | 119 +++++ .../core/tests/physical_optimizer/mod.rs | 1 + .../tests/physical_optimizer/window_topn.rs | 421 +++++++++++++++ datafusion/physical-optimizer/src/lib.rs | 1 + .../physical-optimizer/src/optimizer.rs | 6 + .../physical-optimizer/src/window_topn.rs | 334 ++++++++++++ datafusion/physical-plan/src/sorts/mod.rs | 1 + .../src/sorts/partitioned_topk.rs | 498 ++++++++++++++++++ datafusion/physical-plan/src/topk/mod.rs | 2 +- .../sqllogictest/test_files/window_topn.slt | 112 ++++ 11 files changed, 1500 insertions(+), 1 deletion(-) create mode 100644 datafusion/core/examples/h2o_window_topn_bench.rs create mode 100644 datafusion/core/tests/physical_optimizer/window_topn.rs create mode 100644 datafusion/physical-optimizer/src/window_topn.rs create mode 100644 datafusion/physical-plan/src/sorts/partitioned_topk.rs create mode 100644 datafusion/sqllogictest/test_files/window_topn.slt diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 53f2501d60752..ad6045847069f 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1087,6 +1087,12 @@ config_namespace! { /// past window functions, if possible pub enable_window_limits: bool, default = true + /// When set to true, the optimizer will replace + /// Filter(rn<=K) → Window(ROW_NUMBER) → Sort patterns with a + /// PartitionedTopKExec that maintains per-partition heaps, avoiding + /// a full sort of the input. + pub enable_window_topn: bool, default = true + /// When set to true, the optimizer will push TopK (Sort with fetch) /// below hash repartition when the partition key is a prefix of the /// sort key, reducing data volume before the shuffle. 
diff --git a/datafusion/core/examples/h2o_window_topn_bench.rs b/datafusion/core/examples/h2o_window_topn_bench.rs new file mode 100644 index 0000000000000..9992e88182045 --- /dev/null +++ b/datafusion/core/examples/h2o_window_topn_bench.rs @@ -0,0 +1,119 @@ +// Standalone H2O groupby Q8 benchmark: PartitionedTopKExec enabled vs disabled +// +// Usage: +// cargo run --release --example h2o_window_topn_bench +// +// Generates 10M rows in-memory (matching H2O SMALL), then runs Q8 +// (ROW_NUMBER top-2 per partition) with the optimization on and off. + +use datafusion::prelude::*; +use datafusion_common::instant::Instant; +use std::sync::Arc; + +use arrow::array::{Int64Array, Float64Array, RecordBatch}; +use arrow::datatypes::{DataType, Field, Schema}; +use datafusion::datasource::MemTable; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; + +const NUM_ROWS: usize = 10_000_000; // 10M rows (H2O SMALL) +const BATCH_SIZE: usize = 100_000; +const ITERATIONS: usize = 3; + +fn generate_data(num_partitions: i64) -> Arc { + let schema = Arc::new(Schema::new(vec![ + Field::new("id6", DataType::Int64, false), + Field::new("v3", DataType::Float64, true), + ])); + + let mut rng = StdRng::seed_from_u64(42); + let mut batches = Vec::new(); + let mut remaining = NUM_ROWS; + + while remaining > 0 { + let batch_len = remaining.min(BATCH_SIZE); + remaining -= batch_len; + + let id6: Int64Array = (0..batch_len) + .map(|_| rng.random_range(0..num_partitions)) + .collect(); + + let v3: Float64Array = (0..batch_len) + .map(|_| { + if rng.random_range(0..100) < 5 { + None // 5% nulls + } else { + Some(rng.random_range(0.0..1000.0)) + } + }) + .collect(); + + batches.push( + RecordBatch::try_new(Arc::clone(&schema), vec![ + Arc::new(id6), + Arc::new(v3), + ]) + .unwrap(), + ); + } + + // Split into 8 partitions + let partition_size = batches.len() / 8; + let mut partitions: Vec> = Vec::new(); + for chunk in batches.chunks(partition_size.max(1)) { + partitions.push(chunk.to_vec()); + } 
+ + Arc::new(MemTable::try_new(schema, partitions).unwrap()) +} + +const Q8: &str = "\ +SELECT id6, largest2_v3 FROM (\ + SELECT id6, v3 AS largest2_v3, \ + ROW_NUMBER() OVER (PARTITION BY id6 ORDER BY v3 DESC) AS order_v3 \ + FROM x WHERE v3 IS NOT NULL\ +) sub_query WHERE order_v3 <= 2"; + +#[tokio::main] +async fn main() { + // Test across different partition cardinalities + let scenarios = [ + (100, "100 partitions (100K rows/partition)"), + (1_000, "1K partitions (10K rows/partition)"), + (10_000, "10K partitions (1K rows/partition)"), + (100_000, "100K partitions (100 rows/partition, H2O-like)"), + ]; + + for (num_partitions, label) in scenarios { + println!("=== Scenario: {label} ==="); + println!("Generating {NUM_ROWS} rows with {num_partitions} partitions..."); + let table = generate_data(num_partitions); + + for (tag, enabled) in [("ENABLED ", true), ("DISABLED", false)] { + let mut config = SessionConfig::new(); + config.options_mut().optimizer.enable_window_topn = enabled; + let ctx = SessionContext::new_with_config(config); + ctx.register_table("x", Arc::clone(&table) as _).unwrap(); + + // Warmup + let df = ctx.sql(Q8).await.unwrap(); + let _ = df.collect().await.unwrap(); + + // Benchmark + let mut times = Vec::new(); + for _ in 0..ITERATIONS { + let start = Instant::now(); + let df = ctx.sql(Q8).await.unwrap(); + let batches = df.collect().await.unwrap(); + let elapsed = start.elapsed(); + let _row_count: usize = batches.iter().map(|b| b.num_rows()).sum(); + times.push(elapsed.as_millis()); + } + + let avg = times.iter().sum::() / times.len() as u128; + let min = *times.iter().min().unwrap(); + println!(" [{tag}] avg={avg} ms, min={min} ms"); + } + println!(); + } +} \ No newline at end of file diff --git a/datafusion/core/tests/physical_optimizer/mod.rs b/datafusion/core/tests/physical_optimizer/mod.rs index cf179cb727cf1..b7ba661d2343a 100644 --- a/datafusion/core/tests/physical_optimizer/mod.rs +++ 
b/datafusion/core/tests/physical_optimizer/mod.rs @@ -37,5 +37,6 @@ mod sanity_checker; #[expect(clippy::needless_pass_by_value)] mod test_utils; mod window_optimize; +mod window_topn; mod pushdown_utils; diff --git a/datafusion/core/tests/physical_optimizer/window_topn.rs b/datafusion/core/tests/physical_optimizer/window_topn.rs new file mode 100644 index 0000000000000..e08e1fff15034 --- /dev/null +++ b/datafusion/core/tests/physical_optimizer/window_topn.rs @@ -0,0 +1,421 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests for the WindowTopN physical optimizer rule. 
+ +use std::sync::Arc; + +use arrow::datatypes::{DataType, Field, Schema}; +use datafusion_common::config::ConfigOptions; +use datafusion_common::Result; +use datafusion_common::ScalarValue; +use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits}; +use datafusion_functions_window::row_number::row_number_udwf; +use datafusion_physical_expr::expressions::{BinaryExpr, Column, col, lit}; +use datafusion_physical_expr::window::StandardWindowExpr; +use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; +use datafusion_physical_optimizer::window_topn::WindowTopN; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::displayable; +use datafusion_physical_plan::filter::FilterExec; +use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; +use datafusion_physical_plan::projection::ProjectionExec; +use datafusion_physical_plan::sorts::sort::SortExec; +use datafusion_physical_plan::windows::{BoundedWindowAggExec, create_udwf_window_expr}; +use datafusion_physical_plan::{ExecutionPlan, InputOrderMode}; +use datafusion_expr::Operator; +use insta::assert_snapshot; + +fn schema() -> Arc { + Arc::new(Schema::new(vec![ + Field::new("pk", DataType::Int64, false), + Field::new("val", DataType::Int64, false), + ])) +} + +fn plan_str(plan: &dyn ExecutionPlan) -> String { + displayable(plan).indent(true).to_string() +} + +fn optimize(plan: Arc) -> Result> { + let config = ConfigOptions::new(); + WindowTopN::new().optimize(plan, &config) +} + +fn optimize_disabled(plan: Arc) -> Result> { + let mut config = ConfigOptions::new(); + config.optimizer.enable_window_topn = false; + WindowTopN::new().optimize(plan, &config) +} + +/// Build: FilterExec(rn <= limit) → BoundedWindowAggExec(ROW_NUMBER PBY pk OBY val) → SortExec(pk, val) +fn build_window_topn_plan( + limit_value: i64, + op: Operator, +) -> Result> { + let s = schema(); + let input: Arc = + Arc::new(PlaceholderRowExec::new(Arc::clone(&s))); + + // 
Sort by pk ASC, val ASC + let ordering = LexOrdering::new(vec![ + PhysicalSortExpr::new_default(col("pk", &s)?).asc(), + PhysicalSortExpr::new_default(col("val", &s)?).asc(), + ]).unwrap(); + + let sort: Arc = Arc::new( + SortExec::new(ordering.clone(), input).with_preserve_partitioning(true), + ); + + // ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) + let partition_by = vec![col("pk", &s)?]; + let order_by = vec![PhysicalSortExpr::new_default(col("val", &s)?).asc()]; + + let window_expr = Arc::new(StandardWindowExpr::new( + create_udwf_window_expr( + &row_number_udwf(), + &[], + &s, + "row_number".to_string(), + false, + )?, + &partition_by, + &order_by, + Arc::new(WindowFrame::new_bounds( + WindowFrameUnits::Rows, + WindowFrameBound::Preceding(ScalarValue::UInt64(None)), + WindowFrameBound::CurrentRow, + )), + )); + + let window: Arc = Arc::new(BoundedWindowAggExec::try_new( + vec![window_expr], + sort, + InputOrderMode::Sorted, + true, + )?); + + // FilterExec: rn op limit_value + // The ROW_NUMBER column is at index 2 (after pk=0, val=1) + let rn_col = Arc::new(Column::new("row_number", 2)); + let limit_lit = lit(ScalarValue::UInt64(Some(limit_value as u64))); + let predicate = Arc::new(BinaryExpr::new(rn_col, op, limit_lit)); + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, window)?); + + Ok(filter) +} + +/// Build a plan with no partition-by: ROW_NUMBER() OVER (ORDER BY val) +fn build_window_topn_no_partition(limit_value: i64) -> Result> { + let s = schema(); + let input: Arc = + Arc::new(PlaceholderRowExec::new(Arc::clone(&s))); + + // Sort by val ASC only (no partition key) + let ordering = LexOrdering::new(vec![ + PhysicalSortExpr::new_default(col("val", &s)?).asc(), + ]).unwrap(); + + let sort: Arc = Arc::new( + SortExec::new(ordering.clone(), input).with_preserve_partitioning(true), + ); + + // ROW_NUMBER() OVER (ORDER BY val) — no partition by + let order_by = vec![PhysicalSortExpr::new_default(col("val", &s)?).asc()]; + + let 
window_expr = Arc::new(StandardWindowExpr::new( + create_udwf_window_expr( + &row_number_udwf(), + &[], + &s, + "row_number".to_string(), + false, + )?, + &[], // empty partition_by + &order_by, + Arc::new(WindowFrame::new_bounds( + WindowFrameUnits::Rows, + WindowFrameBound::Preceding(ScalarValue::UInt64(None)), + WindowFrameBound::CurrentRow, + )), + )); + + let window: Arc = Arc::new(BoundedWindowAggExec::try_new( + vec![window_expr], + sort, + InputOrderMode::Sorted, + true, + )?); + + let rn_col = Arc::new(Column::new("row_number", 2)); + let limit_lit = lit(ScalarValue::UInt64(Some(limit_value as u64))); + let predicate = Arc::new(BinaryExpr::new(rn_col, Operator::LtEq, limit_lit)); + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, window)?); + + Ok(filter) +} + +/// Build a plan where filter is on a data column (not window output) +fn build_non_window_filter_plan() -> Result> { + let s = schema(); + let input: Arc = + Arc::new(PlaceholderRowExec::new(Arc::clone(&s))); + + let ordering = LexOrdering::new(vec![ + PhysicalSortExpr::new_default(col("pk", &s)?).asc(), + PhysicalSortExpr::new_default(col("val", &s)?).asc(), + ]).unwrap(); + + let sort: Arc = Arc::new( + SortExec::new(ordering.clone(), input).with_preserve_partitioning(true), + ); + + let partition_by = vec![col("pk", &s)?]; + let order_by = vec![PhysicalSortExpr::new_default(col("val", &s)?).asc()]; + + let window_expr = Arc::new(StandardWindowExpr::new( + create_udwf_window_expr( + &row_number_udwf(), + &[], + &s, + "row_number".to_string(), + false, + )?, + &partition_by, + &order_by, + Arc::new(WindowFrame::new_bounds( + WindowFrameUnits::Rows, + WindowFrameBound::Preceding(ScalarValue::UInt64(None)), + WindowFrameBound::CurrentRow, + )), + )); + + let window: Arc = Arc::new(BoundedWindowAggExec::try_new( + vec![window_expr], + sort, + InputOrderMode::Sorted, + true, + )?); + + // Filter on data column val (index 1), NOT on window output + let val_col = Arc::new(Column::new("val", 
1)); + let limit_lit = lit(ScalarValue::Int64(Some(3))); + let predicate = Arc::new(BinaryExpr::new(val_col, Operator::LtEq, limit_lit)); + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, window)?); + + Ok(filter) +} + +#[test] +fn basic_row_number_rn_lteq_3() -> Result<()> { + let plan = build_window_topn_plan(3, Operator::LtEq)?; + let optimized = optimize(plan)?; + assert_snapshot!(plan_str(optimized.as_ref()), @r#" + BoundedWindowAggExec: wdw=[row_number: Field { "row_number": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] + PartitionedTopKExec: fetch=3, partition=[pk@0], order=[val@1 ASC] + PlaceholderRowExec + "#); + Ok(()) +} + +#[test] +fn rn_lt_3_becomes_fetch_2() -> Result<()> { + let plan = build_window_topn_plan(3, Operator::Lt)?; + let optimized = optimize(plan)?; + assert_snapshot!(plan_str(optimized.as_ref()), @r#" + BoundedWindowAggExec: wdw=[row_number: Field { "row_number": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] + PartitionedTopKExec: fetch=2, partition=[pk@0], order=[val@1 ASC] + PlaceholderRowExec + "#); + Ok(()) +} + +#[test] +fn flipped_3_gteq_rn() -> Result<()> { + let plan = { + let s = schema(); + let input: Arc = + Arc::new(PlaceholderRowExec::new(Arc::clone(&s))); + + let ordering = LexOrdering::new(vec![ + PhysicalSortExpr::new_default(col("pk", &s)?).asc(), + PhysicalSortExpr::new_default(col("val", &s)?).asc(), + ]).unwrap(); + + let sort: Arc = Arc::new( + SortExec::new(ordering.clone(), input).with_preserve_partitioning(true), + ); + + let partition_by = vec![col("pk", &s)?]; + let order_by = vec![PhysicalSortExpr::new_default(col("val", &s)?).asc()]; + + let window_expr = Arc::new(StandardWindowExpr::new( + create_udwf_window_expr( + &row_number_udwf(), + &[], + &s, + "row_number".to_string(), + false, + )?, + &partition_by, + &order_by, + Arc::new(WindowFrame::new_bounds( + WindowFrameUnits::Rows, + 
WindowFrameBound::Preceding(ScalarValue::UInt64(None)), + WindowFrameBound::CurrentRow, + )), + )); + + let window: Arc = Arc::new(BoundedWindowAggExec::try_new( + vec![window_expr], + sort, + InputOrderMode::Sorted, + true, + )?); + + // Flipped: 3 >= rn (Literal GtEq Column) + let rn_col = Arc::new(Column::new("row_number", 2)); + let limit_lit = lit(ScalarValue::UInt64(Some(3))); let predicate = Arc::new(BinaryExpr::new(limit_lit, Operator::GtEq, rn_col)); + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, window)?); + filter + }; + + let optimized = optimize(plan)?; + assert_snapshot!(plan_str(optimized.as_ref()), @r#" + BoundedWindowAggExec: wdw=[row_number: Field { "row_number": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] + PartitionedTopKExec: fetch=3, partition=[pk@0], order=[val@1 ASC] + PlaceholderRowExec + "#); + Ok(()) +} + +#[test] +fn non_window_column_filter_no_change() -> Result<()> { + let plan = build_non_window_filter_plan()?; + let before = plan_str(plan.as_ref()); + let optimized = optimize(plan)?; + let after = plan_str(optimized.as_ref()); + assert_eq!(before, after, "Plan should not change when filter is on data column"); + Ok(()) +} + +#[test] +fn config_disabled_no_change() -> Result<()> { + let plan = build_window_topn_plan(3, Operator::LtEq)?; + let before = plan_str(plan.as_ref()); + let optimized = optimize_disabled(plan)?; + let after = plan_str(optimized.as_ref()); + assert_eq!(before, after, "Plan should not change when config is disabled"); + Ok(()) +} + +#[test] +fn no_partition_by_no_change() -> Result<()> { + // Without PARTITION BY, this is a global top-K which SortExec with + // fetch already handles — the rule should not fire. 
+ let plan = build_window_topn_no_partition(5)?; + let optimized = optimize(plan)?; + assert_snapshot!(plan_str(optimized.as_ref()), @r#" + FilterExec: row_number@2 <= 5 + BoundedWindowAggExec: wdw=[row_number: Field { "row_number": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] + SortExec: expr=[val@1 ASC], preserve_partitioning=[true] + PlaceholderRowExec + "#); + Ok(()) +} + +#[test] +fn with_projection_between() -> Result<()> { + let s = schema(); + let input: Arc = + Arc::new(PlaceholderRowExec::new(Arc::clone(&s))); + + let ordering = LexOrdering::new(vec![ + PhysicalSortExpr::new_default(col("pk", &s)?).asc(), + PhysicalSortExpr::new_default(col("val", &s)?).asc(), + ]).unwrap(); + + let sort: Arc = Arc::new( + SortExec::new(ordering.clone(), input).with_preserve_partitioning(true), + ); + + let partition_by = vec![col("pk", &s)?]; + let order_by = vec![PhysicalSortExpr::new_default(col("val", &s)?).asc()]; + + let window_expr = Arc::new(StandardWindowExpr::new( + create_udwf_window_expr( + &row_number_udwf(), + &[], + &s, + "row_number".to_string(), + false, + )?, + &partition_by, + &order_by, + Arc::new(WindowFrame::new_bounds( + WindowFrameUnits::Rows, + WindowFrameBound::Preceding(ScalarValue::UInt64(None)), + WindowFrameBound::CurrentRow, + )), + )); + + let window: Arc = Arc::new(BoundedWindowAggExec::try_new( + vec![window_expr], + sort, + InputOrderMode::Sorted, + true, + )?); + + // Add a ProjectionExec between Filter and Window + let window_schema = window.schema(); + let proj_exprs: Vec<(Arc, String)> = + window_schema + .fields() + .iter() + .enumerate() + .map(|(i, f)| { + ( + Arc::new(Column::new(f.name(), i)) + as Arc, + f.name().to_string(), + ) + }) + .collect(); + + let projection: Arc = + Arc::new(ProjectionExec::try_new(proj_exprs, window)?); + + // rn column is still at index 2 in the projected schema + let rn_col = Arc::new(Column::new("row_number", 2)); + let limit_lit = 
lit(ScalarValue::UInt64(Some(3))); + let predicate = Arc::new(BinaryExpr::new(rn_col, Operator::LtEq, limit_lit)); + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, projection)?); + + let optimized = optimize(filter)?; + assert_snapshot!(plan_str(optimized.as_ref()), @r#" + ProjectionExec: expr=[pk@0 as pk, val@1 as val, row_number@2 as row_number] + BoundedWindowAggExec: wdw=[row_number: Field { "row_number": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] + PartitionedTopKExec: fetch=3, partition=[pk@0], order=[val@1 ASC] + PlaceholderRowExec + "#); + Ok(()) +} \ No newline at end of file diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs index a328f43d22b2b..a86628a227552 100644 --- a/datafusion/physical-optimizer/src/lib.rs +++ b/datafusion/physical-optimizer/src/lib.rs @@ -46,5 +46,6 @@ pub mod topk_aggregation; pub mod topk_repartition; pub mod update_aggr_exprs; pub mod utils; +pub mod window_topn; pub use optimizer::PhysicalOptimizerRule; diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index 2151ded8d38e4..c78510ead52f3 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -39,6 +39,7 @@ use crate::update_aggr_exprs::OptimizeAggregateOrder; use crate::hash_join_buffering::HashJoinBuffering; use crate::limit_pushdown_past_window::LimitPushPastWindows; use crate::pushdown_sort::PushdownSort; +use crate::window_topn::WindowTopN; use datafusion_common::Result; use datafusion_common::config::ConfigOptions; use datafusion_physical_plan::ExecutionPlan; @@ -125,6 +126,11 @@ impl PhysicalOptimizer { Arc::new(EnforceSorting::new()), // Run once after the local sorting requirement is changed Arc::new(OptimizeAggregateOrder::new()), + // WindowTopN: replaces Filter(rn<=K) → Window(ROW_NUMBER) → Sort + // with Window(ROW_NUMBER) → 
PartitionedTopKExec(fetch=K). + // Must run after EnforceSorting (which inserts SortExec) and before + // ProjectionPushdown (which embeds projections into FilterExec). + Arc::new(WindowTopN::new()), // TODO: `try_embed_to_hash_join` in the ProjectionPushdown rule would be block by the CoalesceBatches, so add it before CoalesceBatches. Maybe optimize it in the future. Arc::new(ProjectionPushdown::new()), // Remove the ancillary output requirement operator since we are done with the planning diff --git a/datafusion/physical-optimizer/src/window_topn.rs b/datafusion/physical-optimizer/src/window_topn.rs new file mode 100644 index 0000000000000..abe4fc8d6ce31 --- /dev/null +++ b/datafusion/physical-optimizer/src/window_topn.rs @@ -0,0 +1,334 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`WindowTopN`] optimizer rule for per-partition top-K window queries. +//! +//! Detects queries of the form: +//! +//! ```sql +//! SELECT * FROM ( +//! SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn +//! FROM t +//! ) WHERE rn <= K; +//! ``` +//! +//! And replaces the `FilterExec → BoundedWindowAggExec → SortExec` pipeline +//! with `BoundedWindowAggExec → PartitionedTopKExec(fetch=K)`, removing both +//! 
the `FilterExec` and `SortExec`. +//! +//! See [`PartitionedTopKExec`](datafusion_physical_plan::sorts::partitioned_topk::PartitionedTopKExec) +//! for details on the replacement operator. + +use std::sync::Arc; + +use crate::PhysicalOptimizerRule; +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_common::{Result, ScalarValue}; +use datafusion_expr::Operator; +use arrow::datatypes::DataType; +use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal}; +use datafusion_physical_expr::window::StandardWindowExpr; +use datafusion_physical_plan::filter::FilterExec; +use datafusion_physical_plan::projection::ProjectionExec; +use datafusion_physical_plan::sorts::partitioned_topk::PartitionedTopKExec; +use datafusion_physical_plan::sorts::sort::SortExec; +use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowUDFExpr}; +use datafusion_physical_plan::{ExecutionPlan}; + +/// Physical optimizer rule that converts per-partition `ROW_NUMBER` top-K +/// queries into a more efficient plan using [`PartitionedTopKExec`]. +/// +/// # Pattern Detected +/// +/// ```text +/// FilterExec(rn <= K) +/// [optional ProjectionExec] +/// BoundedWindowAggExec(ROW_NUMBER PARTITION BY ... ORDER BY ...) +/// SortExec(partition_keys, order_keys) +/// ``` +/// +/// # Replacement +/// +/// ```text +/// [optional ProjectionExec] +/// BoundedWindowAggExec(ROW_NUMBER PARTITION BY ... ORDER BY ...) +/// PartitionedTopKExec(partition_keys, order_keys, fetch=K) +/// ``` +/// +/// The `FilterExec` is removed entirely (all output rows have `rn ∈ {1..K}`). +/// The `SortExec` is replaced by `PartitionedTopKExec` which maintains a +/// per-partition top-K heap instead of sorting the entire dataset. 
+/// +/// # Supported Predicates +/// +/// - `rn <= K` → fetch = K +/// - `rn < K` → fetch = K - 1 +/// - `K >= rn` (flipped) → fetch = K +/// - `K > rn` (flipped) → fetch = K - 1 +/// +/// # When the Rule Does NOT Fire +/// +/// - Window function is not `ROW_NUMBER` (e.g., `RANK`, `DENSE_RANK`) +/// - No `PARTITION BY` clause (global top-K is already handled by +/// `SortExec` with `fetch`) +/// - Filter predicate is on a data column, not the window output column +/// - `FilterExec` has an embedded projection +/// - Child of `BoundedWindowAggExec` is not a `SortExec` +/// - Config flag `enable_window_topn` is `false` +/// +/// [`PartitionedTopKExec`]: datafusion_physical_plan::sorts::partitioned_topk::PartitionedTopKExec +#[derive(Default, Clone, Debug)] +pub struct WindowTopN; + +impl WindowTopN { + pub fn new() -> Self { + Self + } + + /// Attempt to transform a single plan node. + /// + /// Returns `Some(new_plan)` if the node matches the + /// `FilterExec → [ProjectionExec] → BoundedWindowAggExec → SortExec` + /// pattern and can be rewritten, or `None` if the node should be + /// left unchanged. + fn try_transform( + plan: &Arc, + ) -> Option> { + // Step 1: Match FilterExec at the top + let filter = plan.downcast_ref::()?; + + // Don't handle filters with projections + if filter.projection().is_some() { + return None; + } + + // Step 2: Extract limit from predicate (rn <= K, rn < K, etc.) 
+ let (col_idx, limit_n) = extract_window_limit(filter.predicate())?; + + // Step 3: Walk through optional ProjectionExec to find BoundedWindowAggExec + let child = filter.input(); + let (window_exec, proj_between) = find_window_below(child)?; + + // Step 4: Verify col_idx references a ROW_NUMBER window output column + let input_field_count = window_exec.input().schema().fields().len(); + if col_idx < input_field_count { + return None; // Filter is on an input column, not a window column + } + let window_expr_idx = col_idx - input_field_count; + let window_exprs = window_exec.window_expr(); + if window_expr_idx >= window_exprs.len() { + return None; + } + if !is_row_number(&window_exprs[window_expr_idx]) { + return None; + } + + // Step 5: Verify child of window is SortExec + let sort_exec = window_exec.input().downcast_ref::()?; + let sort_child = sort_exec.input(); + + // Step 6: Determine partition_prefix_len from the window expression + let partition_by = window_exprs[window_expr_idx].partition_by(); + let partition_prefix_len = partition_by.len(); + + // Without PARTITION BY, this is just a global top-K which + // SortExec with fetch already handles efficiently. + if partition_prefix_len == 0 { + return None; + } + + // Step 7: Build PartitionedTopKExec using SortExec's expressions + let partitioned_topk = PartitionedTopKExec::try_new( + Arc::clone(sort_child), + sort_exec.expr().clone(), + partition_prefix_len, + limit_n, + ) + .ok()?; + + // Step 8: Rebuild window with new child + let new_window = Arc::clone(&child_as_arc(window_exec)) + .with_new_children(vec![Arc::new(partitioned_topk)]) + .ok()?; + + // Step 9: If ProjectionExec was between Filter and Window, rebuild it + let result = match proj_between { + Some(proj) => Arc::clone(&child_as_arc(proj)) + .with_new_children(vec![new_window]) + .ok()?, + None => new_window, + }; + + Some(result) + } +} + +/// Helper to get an `Arc` from a reference. +/// We need this because `with_new_children` takes `Arc`. 
+fn child_as_arc(plan: &T) -> Arc { + Arc::new(plan.clone()) +} + +impl PhysicalOptimizerRule for WindowTopN { + fn optimize( + &self, + plan: Arc, + config: &ConfigOptions, + ) -> Result> { + if !config.optimizer.enable_window_topn { + return Ok(plan); + } + + plan.transform_down(|node| { + Ok( + if let Some(transformed) = WindowTopN::try_transform(&node) { + Transformed::yes(transformed) + } else { + Transformed::no(node) + }, + ) + }) + .data() + } + + fn name(&self) -> &str { + "WindowTopN" + } + + fn schema_check(&self) -> bool { + true + } +} + +/// Extract a window limit from a predicate expression. +/// +/// Returns `(column_index, fetch)` if the predicate constrains a column +/// to at most N rows. +/// +/// # Supported Patterns +/// +/// | Predicate | Returns | +/// |-----------|---------| +/// | `Column(idx) <= Literal(N)` | `(idx, N)` | +/// | `Column(idx) < Literal(N)` | `(idx, N-1)` | +/// | `Literal(N) >= Column(idx)` | `(idx, N)` | +/// | `Literal(N) > Column(idx)` | `(idx, N-1)` | +/// +/// # Examples +/// +/// - `rn <= 5` → `Some((2, 5))` (assuming rn is column index 2) +/// - `rn < 3` → `Some((2, 2))` +/// - `10 >= rn` → `Some((2, 10))` +/// - `rn = 1` → `None` (equality not supported) +/// - `val <= 5` → `Some((1, 5))` (caller must verify it's a window column) +fn extract_window_limit( + predicate: &Arc, +) -> Option<(usize, usize)> { + let binary = predicate.as_any().downcast_ref::()?; + let op = binary.op(); + let left = binary.left(); + let right = binary.right(); + + // Try Column op Literal + if let (Some(col), Some(lit_val)) = ( + left.as_any().downcast_ref::(), + right.as_any().downcast_ref::(), + ) { + let n = scalar_to_usize(lit_val.value())?; + return match op { + &Operator::LtEq => Some((col.index(), n)), + &Operator::Lt => Some((col.index(), n - 1)), + _ => None, + }; + } + + // Try Literal op Column (flipped) + if let (Some(lit_val), Some(col)) = ( + left.as_any().downcast_ref::(), + right.as_any().downcast_ref::(), + ) { + let n = 
scalar_to_usize(lit_val.value())?; + return match op { + &Operator::GtEq => Some((col.index(), n)), + &Operator::Gt => Some((col.index(), n - 1)), + _ => None, + }; + } + + None +} + +/// Convert a [`ScalarValue`] to `usize` if it's a positive integer. +/// +/// Returns `None` for null values, zero, negative integers, and +/// non-integer types (floats, strings, decimals, etc.). +fn scalar_to_usize(value: &ScalarValue) -> Option { + if !value.data_type().is_integer() { + return None; + } + let casted = value.cast_to(&DataType::UInt64).ok()?; + match casted { + ScalarValue::UInt64(Some(v)) if v > 0 => usize::try_from(v).ok(), + _ => None, + } +} + +/// Check if a window expression is `ROW_NUMBER`. +/// +/// Downcasts through `StandardWindowExpr` → `WindowUDFExpr` and checks +/// that the UDF name is `"row_number"`. Returns `false` for all other +/// window functions (e.g., `RANK`, `DENSE_RANK`, `SUM`). +fn is_row_number( + expr: &Arc, +) -> bool { + let Some(swe) = expr.as_any().downcast_ref::() else { + return false; + }; + let swfe = swe.get_standard_func_expr(); + let Some(udf) = swfe.as_any().downcast_ref::() else { + return false; + }; + udf.fun().name() == "row_number" +} + +/// Walk below a plan node looking for a [`BoundedWindowAggExec`]. +/// +/// Handles two cases: +/// - Direct child: `FilterExec → BoundedWindowAggExec` +/// - With projection: `FilterExec → ProjectionExec → BoundedWindowAggExec` +/// +/// Returns the window exec and an optional `ProjectionExec` in between, +/// or `None` if no `BoundedWindowAggExec` is found within one or two levels. 
+fn find_window_below( + plan: &Arc, +) -> Option<(&BoundedWindowAggExec, Option<&ProjectionExec>)> { + // Direct child is BoundedWindowAggExec + if let Some(window) = plan.downcast_ref::() { + return Some((window, None)); + } + + // Child is ProjectionExec with BoundedWindowAggExec below + if let Some(proj) = plan.downcast_ref::() { + let proj_child = proj.input(); + if let Some(window) = proj_child.downcast_ref::() { + return Some((window, Some(proj))); + } + } + + None +} \ No newline at end of file diff --git a/datafusion/physical-plan/src/sorts/mod.rs b/datafusion/physical-plan/src/sorts/mod.rs index a73872a175b9b..ca8d4a4400c49 100644 --- a/datafusion/physical-plan/src/sorts/mod.rs +++ b/datafusion/physical-plan/src/sorts/mod.rs @@ -22,6 +22,7 @@ mod cursor; mod merge; mod multi_level_merge; pub mod partial_sort; +pub mod partitioned_topk; pub mod sort; pub mod sort_preserving_merge; mod stream; diff --git a/datafusion/physical-plan/src/sorts/partitioned_topk.rs b/datafusion/physical-plan/src/sorts/partitioned_topk.rs new file mode 100644 index 0000000000000..7b964e56e3cae --- /dev/null +++ b/datafusion/physical-plan/src/sorts/partitioned_topk.rs @@ -0,0 +1,498 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +//! [`PartitionedTopKExec`]: Top-K per partition operator +//! +//! For queries like: +//! ```sql +//! SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn +//! FROM t WHERE rn <= N +//! ``` +//! +//! Instead of sorting the entire dataset, this operator maintains a +//! [`TopK`] heap per partition (reusing the existing TopK implementation) +//! and emits only the top-K rows per partition in sorted order +//! `(partition_keys, order_keys)`. + +use std::fmt::{self, Formatter}; +use std::sync::Arc; + +use arrow::array::{RecordBatch, UInt32Array}; +use arrow::compute::take_record_batch; +use arrow::datatypes::SchemaRef; +use arrow::row::{OwnedRow, RowConverter}; +use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::{HashMap, Result}; +use datafusion_execution::TaskContext; +use datafusion_physical_expr::PhysicalExpr; +use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use futures::StreamExt; +use futures::TryStreamExt; +use parking_lot::RwLock; + +use crate::execution_plan::{Boundedness, EmissionType}; +use crate::metrics::ExecutionPlanMetricsSet; +use crate::topk::{TopK, TopKDynamicFilters, build_sort_fields}; +use crate::{ + DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, + PlanProperties, SendableRecordBatchStream, stream::RecordBatchStreamAdapter, +}; + +/// Per-partition Top-K operator for window function queries. +/// +/// # Background +/// +/// "Top K per partition" is a common analytics pattern used for queries such as +/// "find the top 3 products by revenue for each store". 
The (simplified) SQL +/// for such a query might be: +/// +/// ```sql +/// SELECT * FROM ( +/// SELECT *, ROW_NUMBER() OVER (PARTITION BY store ORDER BY revenue DESC) as rn +/// FROM sales +/// ) WHERE rn <= 3; +/// ``` +/// +/// The unoptimized physical plan would be: +/// +/// ```text +/// FilterExec: rn <= 3 +/// BoundedWindowAggExec: ROW_NUMBER() PARTITION BY [store] ORDER BY [revenue DESC] +/// SortExec: expr=[store ASC, revenue DESC] +/// DataSourceExec +/// ``` +/// +/// This plan sorts the **entire** dataset (O(N log N)), computes `ROW_NUMBER` +/// for **all** rows, and then filters to keep only the top K per partition. +/// With 10M rows, 1K partitions, and K=3, it sorts all 10M rows but only +/// keeps 3K. +/// +/// # Optimization +/// +/// `PartitionedTopKExec` replaces the `SortExec` and the `FilterExec` is +/// removed. The optimized plan becomes: +/// +/// ```text +/// BoundedWindowAggExec: ROW_NUMBER() PARTITION BY [store] ORDER BY [revenue DESC] +/// PartitionedTopKExec: fetch=3, partition=[store], order=[revenue DESC] +/// DataSourceExec +/// ``` +/// +/// Instead of sorting the entire dataset, this operator reads unsorted input, +/// maintains a [`TopK`] heap per distinct partition key, and emits only the +/// top-K rows per partition in sorted order `(partition_keys, order_keys)`. +/// +/// Cost: O(N log K) time instead of O(N log N), and O(K × P × row_size) +/// memory where K = fetch, P = number of distinct partitions. 
+///
+/// # Example
+///
+/// For the query above with `fetch=3` and input:
+///
+/// ```text
+/// store | revenue
+/// ------|--------
+/// A     | 100
+/// B     | 50
+/// A     | 200
+/// B     | 150
+/// A     | 300
+/// A     | 400
+/// ```
+///
+/// The operator maintains two heaps:
+/// - **store=A**: keeps top-3 by revenue DESC → {400, 300, 200}, evicts 100
+/// - **store=B**: keeps top-3 by revenue DESC → {150, 50} (only 2 rows)
+///
+/// Output (sorted by store ASC, revenue DESC):
+///
+/// ```text
+/// store | revenue
+/// ------|--------
+/// A     | 400
+/// A     | 300
+/// A     | 200
+/// B     | 150
+/// B     | 50
+/// ```
+///
+/// This is then passed to `BoundedWindowAggExec` which assigns
+/// `ROW_NUMBER` 1, 2, 3 to each partition — all of which satisfy `rn <= 3`.
+///
+/// # Limitations
+///
+/// - Only activated when the window function is `ROW_NUMBER` with a
+///   `PARTITION BY` clause. Global top-K (no `PARTITION BY`) is already
+///   handled efficiently by `SortExec` with `fetch`.
+/// - Memory usage is proportional to `K × P`. For very high cardinality
+///   partition keys (millions of distinct values), this may use significant
+///   memory.
+#[derive(Debug, Clone)]
+pub struct PartitionedTopKExec {
+    /// Input execution plan (reads unsorted data)
+    input: Arc<dyn ExecutionPlan>,
+    /// Full sort expressions: `[partition_keys..., order_keys...]`.
+    ///
+    /// For `PARTITION BY store ORDER BY revenue DESC` with sort
+    /// `[store ASC, revenue DESC]`, the first `partition_prefix_len`
+    /// expressions are the partition keys (`[store ASC]`) and the
+    /// remaining are the order-by keys (`[revenue DESC]`).
+    expr: LexOrdering,
+    /// Number of leading expressions in `expr` that define the partition
+    /// key. For example, `PARTITION BY a, b` → `partition_prefix_len = 2`.
+    partition_prefix_len: usize,
+    /// Maximum number of rows to keep per partition (the K in "top-K").
+    /// Derived from the filter predicate: `rn <= 3` → `fetch = 3`,
+    /// `rn < 3` → `fetch = 2`.
+    fetch: usize,
+    /// Execution metrics
+    metrics_set: ExecutionPlanMetricsSet,
+    /// Cached plan properties (output ordering, partitioning, etc.)
+    cache: Arc<PlanProperties>,
+}
+
+impl PartitionedTopKExec {
+    /// Create a new `PartitionedTopKExec`.
+    ///
+    /// # Arguments
+    ///
+    /// * `input` - The child execution plan providing unsorted input rows.
+    /// * `expr` - Full sort ordering `[partition_keys..., order_keys...]`.
+    ///   For `PARTITION BY pk ORDER BY val ASC`, this would be `[pk ASC, val ASC]`.
+    /// * `partition_prefix_len` - Number of leading expressions in `expr`
+    ///   that form the partition key. Must be >= 1.
+    /// * `fetch` - Maximum rows to retain per partition (the K in "top-K").
+    ///
+    /// # Example
+    ///
+    /// ```text
+    /// // For: ROW_NUMBER() OVER (PARTITION BY store ORDER BY revenue DESC) ... WHERE rn <= 5
+    /// PartitionedTopKExec::try_new(
+    ///     data_source,
+    ///     LexOrdering([store ASC, revenue DESC]),
+    ///     1, // partition_prefix_len: 1 partition column (store)
+    ///     5, // fetch: keep top 5 per partition
+    /// )
+    /// ```
+    pub fn try_new(
+        input: Arc<dyn ExecutionPlan>,
+        expr: LexOrdering,
+        partition_prefix_len: usize,
+        fetch: usize,
+    ) -> Result<Self> {
+        let cache = Self::compute_properties(&input, expr.clone())?;
+        Ok(Self {
+            input,
+            expr,
+            partition_prefix_len,
+            fetch,
+            metrics_set: ExecutionPlanMetricsSet::new(),
+            cache: Arc::new(cache),
+        })
+    }
+
+    /// Returns the child execution plan.
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+
+    /// Returns the full sort ordering `[partition_keys..., order_keys...]`.
+    pub fn expr(&self) -> &LexOrdering {
+        &self.expr
+    }
+
+    /// Returns the number of leading expressions in [`Self::expr`] that
+    /// define the partition key.
+    pub fn partition_prefix_len(&self) -> usize {
+        self.partition_prefix_len
+    }
+
+    /// Returns the maximum number of rows retained per partition.
+    pub fn fetch(&self) -> usize {
+        self.fetch
+    }
+
+    /// Compute [`PlanProperties`] for this operator.
+    ///
+    /// The output is sorted by `sort_exprs` (partition keys then order keys),
+    /// uses the same partitioning as the input, emits all output at once
+    /// (`EmissionType::Final`), and is bounded.
+
+    fn compute_properties(
+        input: &Arc<dyn ExecutionPlan>,
+        sort_exprs: LexOrdering,
+    ) -> Result<PlanProperties> {
+        let mut eq_properties = input.equivalence_properties().clone();
+        eq_properties.reorder(sort_exprs)?;
+
+        Ok(PlanProperties::new(
+            eq_properties,
+            input.output_partitioning().clone(),
+            EmissionType::Final,
+            Boundedness::Bounded,
+        ))
+    }
+}
+
+impl DisplayAs for PartitionedTopKExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                let partition_exprs: Vec<String> = self.expr[..self.partition_prefix_len]
+                    .iter()
+                    .map(|e| format!("{}", e.expr))
+                    .collect();
+                let order_exprs: Vec<String> = self.expr[self.partition_prefix_len..]
+                    .iter()
+                    .map(|e| format!("{e}"))
+                    .collect();
+                write!(
+                    f,
+                    "PartitionedTopKExec: fetch={}, partition=[{}], order=[{}]",
+                    self.fetch,
+                    partition_exprs.join(", "),
+                    order_exprs.join(", "),
+                )
+            }
+            DisplayFormatType::TreeRender => {
+                writeln!(f, "fetch={}", self.fetch)?;
+                writeln!(f, "{}", self.expr)
+            }
+        }
+    }
+}
+
+impl ExecutionPlan for PartitionedTopKExec {
+    fn name(&self) -> &'static str {
+        "PartitionedTopKExec"
+    }
+
+    fn properties(&self) -> &Arc<PlanProperties> {
+        &self.cache
+    }
+
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        vec![Distribution::UnspecifiedDistribution]
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        vec![false]
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        assert_eq!(children.len(), 1);
+        Ok(Arc::new(PartitionedTopKExec::try_new(
+            Arc::clone(&children[0]),
+            self.expr.clone(),
+            self.partition_prefix_len,
+            self.fetch,
+        )?))
+    }
+
+    fn apply_expressions(
+        &self,
+        f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
+    ) -> Result<TreeNodeRecursion> {
+        let mut tnr = TreeNodeRecursion::Continue;
+        for sort_expr in &self.expr {
+            tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?;
+        }
+        Ok(tnr)
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let input = self.input.execute(partition, Arc::clone(&context))?;
+        let schema = input.schema();
+
+        let partition_sort_fields =
+            build_sort_fields(&self.expr[..self.partition_prefix_len], &schema)?;
+
+        let partition_converter = RowConverter::new(partition_sort_fields)?;
+
+        let partition_exprs: Vec<Arc<dyn PhysicalExpr>> = self.expr
+            [..self.partition_prefix_len]
+            .iter()
+            .map(|e| Arc::clone(&e.expr))
+            .collect();
+        let order_expr: LexOrdering =
+            LexOrdering::new(self.expr[self.partition_prefix_len..].iter().cloned())
+                .expect("PartitionedTopKExec requires at least one order-by expression");
+        let fetch = self.fetch;
+        let batch_size = context.session_config().batch_size();
+        let runtime = Arc::clone(&context.runtime_env());
+        let metrics_set = self.metrics_set.clone();
+
+        let stream = futures::stream::once(async move {
+            do_partitioned_topk(
+                input,
+                schema,
+                partition_converter,
+                partition_exprs,
+                order_expr,
+                fetch,
+                batch_size,
+                runtime,
+                metrics_set,
+            )
+            .await
+        })
+        .try_flatten();
+
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            self.input.schema(),
+            stream,
+        )))
+    }
+}
+
+/// Create a no-op [`TopKDynamicFilters`] for a per-partition [`TopK`].
+///
+/// In normal `SortExec` top-K mode, dynamic filters push predicates down to
+/// the data source (e.g., telling Parquet to skip rows worse than the current
+/// K-th best). For per-partition heaps the data is already in memory and split
+/// by partition key, so there is no data source to push filters to. We pass
+/// `lit(true)` (accept everything) so the filter never rejects any row.
+fn create_noop_dynamic_filter() -> Arc<RwLock<TopKDynamicFilters>> {
+    Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
+        DynamicFilterPhysicalExpr::new(vec![], lit(true)),
+    ))))
+}
+
+/// Read all input, split batches by partition key, feed each sub-batch
+/// to a per-partition [`TopK`], then emit results in partition-key order.
+///
+/// # Phases
+///
+/// 1. **Accumulation** — For each input batch:
+///    - Evaluate partition key expressions to get partition column arrays
+///    - Convert partition columns to binary [`arrow::row::Row`] format
+///    - Group row indices by partition key
+///    - Extract sub-batches via [`take_record_batch`] and insert into
+///      the partition's [`TopK`] heap
+///
+/// 2. **Emission** — After all input is consumed:
+///    - Sort partition keys so output is ordered by partition key
+///    - For each partition in sorted order, call [`TopK::emit`] to get
+///      rows sorted by order-by key
+///    - Return all batches as a single stream
+///
+/// # Cost
+///
+/// - Time: O(N log K) where N = total rows, K = fetch
+/// - Memory: O(K × P × row_size) where P = number of distinct partitions
+async fn do_partitioned_topk(
+    mut input: SendableRecordBatchStream,
+    schema: SchemaRef,
+    partition_converter: RowConverter,
+    partition_exprs: Vec<Arc<dyn PhysicalExpr>>,
+    order_expr: LexOrdering,
+    fetch: usize,
+    batch_size: usize,
+    runtime: Arc<RuntimeEnv>,
+    metrics_set: ExecutionPlanMetricsSet,
+) -> Result<SendableRecordBatchStream> {
+    let mut partitions: HashMap<OwnedRow, TopK> = HashMap::new();
+    let mut partition_counter: usize = 0;
+
+    // Macro-like helper: create a new TopK for a partition
+    macro_rules! new_topk {
+        () => {{
+            let id = partition_counter;
+            partition_counter += 1;
+            TopK::try_new(
+                id,
+                Arc::clone(&schema),
+                vec![],
+                order_expr.clone(),
+                fetch,
+                batch_size,
+                Arc::clone(&runtime),
+                &metrics_set,
+                create_noop_dynamic_filter(),
+            )
+        }};
+    }
+
+    // ---------- Accumulation phase ----------
+    while let Some(batch) = input.next().await {
+        let batch = batch?;
+        let num_rows = batch.num_rows();
+        if num_rows == 0 {
+            continue;
+        }
+
+        // Evaluate partition key columns
+        let pk_arrays: Vec<_> = partition_exprs
+            .iter()
+            .map(|e| e.evaluate(&batch).and_then(|v| v.into_array(num_rows)))
+            .collect::<Result<Vec<_>>>()?;
+
+        let pk_rows = partition_converter.convert_columns(&pk_arrays)?;
+
+        // Group row indices by partition key
+        let mut groups: HashMap<OwnedRow, Vec<u32>> = HashMap::new();
+        for row_idx in 0..num_rows {
+            let pk = pk_rows.row(row_idx).owned();
+            groups.entry(pk).or_default().push(row_idx as u32);
+        }
+
+        // For each partition group, create a sub-batch and feed to TopK
+        for (pk, indices) in groups {
+            if !partitions.contains_key(&pk) {
+                partitions.insert(pk.clone(), new_topk!()?);
+            }
+            let topk = partitions.get_mut(&pk).unwrap();
+            let indices_array = UInt32Array::from(indices);
+            let sub_batch = take_record_batch(&batch, &indices_array)?;
+            topk.insert_batch(sub_batch)?;
+        }
+    }
+
+    // ---------- Emit phase ----------
+    // Sort partition keys so output is ordered by (partition_keys, order_keys).
+    let mut sorted_pks: Vec<OwnedRow> = partitions.keys().cloned().collect();
+    sorted_pks.sort();
+
+    let mut output_batches: Vec<RecordBatch> = Vec::new();
+
+    for pk in sorted_pks {
+        if let Some(topk) = partitions.remove(&pk) {
+            // TopK::emit() returns a stream of sorted batches
+            let mut stream = topk.emit()?;
+            while let Some(batch) = stream.next().await {
+                output_batches.push(batch?);
+            }
+        }
+    }
+
+    Ok(Box::pin(RecordBatchStreamAdapter::new(
+        schema,
+        futures::stream::iter(output_batches.into_iter().map(Ok)),
+    )))
+}
\ No newline at end of file
diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs
index ab9249985b863..2c6d3bcad1a65 100644
--- a/datafusion/physical-plan/src/topk/mod.rs
+++ b/datafusion/physical-plan/src/topk/mod.rs
@@ -163,7 +163,7 @@ impl TopKDynamicFilters {
 // Guesstimate for memory allocation: estimated number of bytes used per row in the RowConverter
 const ESTIMATED_BYTES_PER_ROW: usize = 20;
 
-fn build_sort_fields(
+pub(crate) fn build_sort_fields(
     ordering: &[PhysicalSortExpr],
     schema: &SchemaRef,
 ) -> Result<Vec<SortField>> {
diff --git a/datafusion/sqllogictest/test_files/window_topn.slt b/datafusion/sqllogictest/test_files/window_topn.slt
new file mode 100644
index 0000000000000..2685b05885331
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/window_topn.slt
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.
You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Tests for Window TopN optimization: PartitionedTopKExec + +statement ok +CREATE TABLE window_topn_t (id INT, pk INT, val INT) AS VALUES + (1, 1, 10), + (2, 1, 20), + (3, 1, 30), + (4, 1, 40), + (5, 2, 5), + (6, 2, 15), + (7, 2, 25), + (8, 3, 100), + (9, 3, 50), + (10, 3, 75); + +# Test 1: Correct results for top-2 per partition +query III rowsort +SELECT id, pk, val FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn FROM window_topn_t +) WHERE rn <= 2; +---- +1 1 10 +10 3 75 +2 1 20 +5 2 5 +6 2 15 +9 3 50 + +# Test 2: Verify plan contains PartitionedTopKExec +query TT +EXPLAIN SELECT * FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn FROM window_topn_t +) WHERE rn <= 2; +---- +logical_plan +01)Projection: window_topn_t.id, window_topn_t.pk, window_topn_t.val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn +02)--Filter: row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= UInt64(2) +03)----WindowAggr: windowExpr=[[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +04)------TableScan: window_topn_t projection=[id, pk, val] +physical_plan +01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW@3 as rn] +02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +03)----PartitionedTopKExec: fetch=2, partition=[pk@1], order=[val@2 ASC NULLS LAST] +04)------DataSourceExec: partitions=1, partition_sizes=[1] + +# Test 3: rn < 3 should give fetch=2 +query III rowsort +SELECT id, pk, val FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn FROM window_topn_t +) WHERE rn < 3; +---- +1 1 10 +10 3 75 +2 1 20 +5 2 5 +6 2 15 +9 3 50 + +# Test 4: Without PARTITION BY (single global partition) +query II rowsort +SELECT id, val FROM ( + SELECT *, ROW_NUMBER() OVER (ORDER BY val) as rn FROM window_topn_t +) WHERE rn <= 3; +---- +1 10 +5 5 +6 15 + +# Test 5: Disabled config falls back to normal plan +statement ok +SET datafusion.optimizer.enable_window_topn = false; + +# When disabled, the plan should use FilterExec + SortExec (no PartitionedTopKExec) +query TT +EXPLAIN SELECT * FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn FROM window_topn_t +) WHERE rn <= 2; +---- +logical_plan +01)Projection: window_topn_t.id, window_topn_t.pk, window_topn_t.val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn +02)--Filter: row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= UInt64(2) +03)----WindowAggr: windowExpr=[[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +04)------TableScan: window_topn_t projection=[id, pk, 
val] +physical_plan +01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] +02)--FilterExec: row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 <= 2 +03)----BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +04)------SortExec: expr=[pk@1 ASC NULLS LAST, val@2 ASC NULLS LAST], preserve_partitioning=[false] +05)--------DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +SET datafusion.optimizer.enable_window_topn = true; + +statement ok +DROP TABLE window_topn_t; \ No newline at end of file From 52147dd85b847b900316b601a80354c5d27073f8 Mon Sep 17 00:00:00 2001 From: Subham Singhal Date: Wed, 8 Apr 2026 23:18:18 +0530 Subject: [PATCH 2/5] Lint fix --- .../core/examples/h2o_window_topn_bench.rs | 26 +++++-- .../tests/physical_optimizer/window_topn.rs | 73 ++++++++++--------- .../physical-optimizer/src/window_topn.rs | 32 ++++---- .../src/sorts/partitioned_topk.rs | 8 +- 4 files changed, 76 insertions(+), 63 deletions(-) diff --git a/datafusion/core/examples/h2o_window_topn_bench.rs b/datafusion/core/examples/h2o_window_topn_bench.rs index 9992e88182045..323f890288f74 100644 --- a/datafusion/core/examples/h2o_window_topn_bench.rs +++ b/datafusion/core/examples/h2o_window_topn_bench.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + // Standalone H2O groupby Q8 benchmark: PartitionedTopKExec enabled vs disabled // // Usage: @@ -10,7 +27,7 @@ use datafusion::prelude::*; use datafusion_common::instant::Instant; use std::sync::Arc; -use arrow::array::{Int64Array, Float64Array, RecordBatch}; +use arrow::array::{Float64Array, Int64Array, RecordBatch}; use arrow::datatypes::{DataType, Field, Schema}; use datafusion::datasource::MemTable; use rand::rngs::StdRng; @@ -49,10 +66,7 @@ fn generate_data(num_partitions: i64) -> Arc { .collect(); batches.push( - RecordBatch::try_new(Arc::clone(&schema), vec![ - Arc::new(id6), - Arc::new(v3), - ]) + RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(id6), Arc::new(v3)]) .unwrap(), ); } @@ -116,4 +130,4 @@ async fn main() { } println!(); } -} \ No newline at end of file +} diff --git a/datafusion/core/tests/physical_optimizer/window_topn.rs b/datafusion/core/tests/physical_optimizer/window_topn.rs index e08e1fff15034..6ad09973aeeeb 100644 --- a/datafusion/core/tests/physical_optimizer/window_topn.rs +++ b/datafusion/core/tests/physical_optimizer/window_topn.rs @@ -20,16 +20,17 @@ use std::sync::Arc; use arrow::datatypes::{DataType, Field, Schema}; -use datafusion_common::config::ConfigOptions; use datafusion_common::Result; use datafusion_common::ScalarValue; 
+use datafusion_common::config::ConfigOptions; +use datafusion_expr::Operator; use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits}; use datafusion_functions_window::row_number::row_number_udwf; use datafusion_physical_expr::expressions::{BinaryExpr, Column, col, lit}; use datafusion_physical_expr::window::StandardWindowExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; -use datafusion_physical_optimizer::window_topn::WindowTopN; use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_optimizer::window_topn::WindowTopN; use datafusion_physical_plan::displayable; use datafusion_physical_plan::filter::FilterExec; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; @@ -37,7 +38,6 @@ use datafusion_physical_plan::projection::ProjectionExec; use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::windows::{BoundedWindowAggExec, create_udwf_window_expr}; use datafusion_physical_plan::{ExecutionPlan, InputOrderMode}; -use datafusion_expr::Operator; use insta::assert_snapshot; fn schema() -> Arc { @@ -68,18 +68,17 @@ fn build_window_topn_plan( op: Operator, ) -> Result> { let s = schema(); - let input: Arc = - Arc::new(PlaceholderRowExec::new(Arc::clone(&s))); + let input: Arc = Arc::new(PlaceholderRowExec::new(Arc::clone(&s))); // Sort by pk ASC, val ASC let ordering = LexOrdering::new(vec![ PhysicalSortExpr::new_default(col("pk", &s)?).asc(), PhysicalSortExpr::new_default(col("val", &s)?).asc(), - ]).unwrap(); + ]) + .unwrap(); - let sort: Arc = Arc::new( - SortExec::new(ordering.clone(), input).with_preserve_partitioning(true), - ); + let sort: Arc = + Arc::new(SortExec::new(ordering.clone(), input).with_preserve_partitioning(true)); // ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) let partition_by = vec![col("pk", &s)?]; @@ -123,17 +122,15 @@ fn build_window_topn_plan( /// Build a plan with no partition-by: ROW_NUMBER() OVER (ORDER BY val) fn 
build_window_topn_no_partition(limit_value: i64) -> Result> { let s = schema(); - let input: Arc = - Arc::new(PlaceholderRowExec::new(Arc::clone(&s))); + let input: Arc = Arc::new(PlaceholderRowExec::new(Arc::clone(&s))); // Sort by val ASC only (no partition key) - let ordering = LexOrdering::new(vec![ - PhysicalSortExpr::new_default(col("val", &s)?).asc(), - ]).unwrap(); + let ordering = + LexOrdering::new(vec![PhysicalSortExpr::new_default(col("val", &s)?).asc()]) + .unwrap(); - let sort: Arc = Arc::new( - SortExec::new(ordering.clone(), input).with_preserve_partitioning(true), - ); + let sort: Arc = + Arc::new(SortExec::new(ordering.clone(), input).with_preserve_partitioning(true)); // ROW_NUMBER() OVER (ORDER BY val) — no partition by let order_by = vec![PhysicalSortExpr::new_default(col("val", &s)?).asc()]; @@ -146,7 +143,7 @@ fn build_window_topn_no_partition(limit_value: i64) -> Result Result Result> { let s = schema(); - let input: Arc = - Arc::new(PlaceholderRowExec::new(Arc::clone(&s))); + let input: Arc = Arc::new(PlaceholderRowExec::new(Arc::clone(&s))); let ordering = LexOrdering::new(vec![ PhysicalSortExpr::new_default(col("pk", &s)?).asc(), PhysicalSortExpr::new_default(col("val", &s)?).asc(), - ]).unwrap(); + ]) + .unwrap(); - let sort: Arc = Arc::new( - SortExec::new(ordering.clone(), input).with_preserve_partitioning(true), - ); + let sort: Arc = + Arc::new(SortExec::new(ordering.clone(), input).with_preserve_partitioning(true)); let partition_by = vec![col("pk", &s)?]; let order_by = vec![PhysicalSortExpr::new_default(col("val", &s)?).asc()]; @@ -257,7 +253,8 @@ fn flipped_3_gteq_rn() -> Result<()> { let ordering = LexOrdering::new(vec![ PhysicalSortExpr::new_default(col("pk", &s)?).asc(), PhysicalSortExpr::new_default(col("val", &s)?).asc(), - ]).unwrap(); + ]) + .unwrap(); let sort: Arc = Arc::new( SortExec::new(ordering.clone(), input).with_preserve_partitioning(true), @@ -292,7 +289,8 @@ fn flipped_3_gteq_rn() -> Result<()> { // Flipped: 3 
>= rn (Literal GtEq Column) let rn_col = Arc::new(Column::new("row_number", 2)); - let limit_lit = lit(ScalarValue::UInt64(Some(3))); let predicate = Arc::new(BinaryExpr::new(limit_lit, Operator::GtEq, rn_col)); + let limit_lit = lit(ScalarValue::UInt64(Some(3))); + let predicate = Arc::new(BinaryExpr::new(limit_lit, Operator::GtEq, rn_col)); let filter: Arc = Arc::new(FilterExec::try_new(predicate, window)?); filter @@ -313,7 +311,10 @@ fn non_window_column_filter_no_change() -> Result<()> { let before = plan_str(plan.as_ref()); let optimized = optimize(plan)?; let after = plan_str(optimized.as_ref()); - assert_eq!(before, after, "Plan should not change when filter is on data column"); + assert_eq!( + before, after, + "Plan should not change when filter is on data column" + ); Ok(()) } @@ -323,7 +324,10 @@ fn config_disabled_no_change() -> Result<()> { let before = plan_str(plan.as_ref()); let optimized = optimize_disabled(plan)?; let after = plan_str(optimized.as_ref()); - assert_eq!(before, after, "Plan should not change when config is disabled"); + assert_eq!( + before, after, + "Plan should not change when config is disabled" + ); Ok(()) } @@ -345,17 +349,16 @@ fn no_partition_by_no_change() -> Result<()> { #[test] fn with_projection_between() -> Result<()> { let s = schema(); - let input: Arc = - Arc::new(PlaceholderRowExec::new(Arc::clone(&s))); + let input: Arc = Arc::new(PlaceholderRowExec::new(Arc::clone(&s))); let ordering = LexOrdering::new(vec![ PhysicalSortExpr::new_default(col("pk", &s)?).asc(), PhysicalSortExpr::new_default(col("val", &s)?).asc(), - ]).unwrap(); + ]) + .unwrap(); - let sort: Arc = Arc::new( - SortExec::new(ordering.clone(), input).with_preserve_partitioning(true), - ); + let sort: Arc = + Arc::new(SortExec::new(ordering.clone(), input).with_preserve_partitioning(true)); let partition_by = vec![col("pk", &s)?]; let order_by = vec![PhysicalSortExpr::new_default(col("val", &s)?).asc()]; @@ -418,4 +421,4 @@ fn with_projection_between() 
-> Result<()> { PlaceholderRowExec "#); Ok(()) -} \ No newline at end of file +} diff --git a/datafusion/physical-optimizer/src/window_topn.rs b/datafusion/physical-optimizer/src/window_topn.rs index abe4fc8d6ce31..e5f81ad1e9ff8 100644 --- a/datafusion/physical-optimizer/src/window_topn.rs +++ b/datafusion/physical-optimizer/src/window_topn.rs @@ -30,25 +30,25 @@ //! with `BoundedWindowAggExec → PartitionedTopKExec(fetch=K)`, removing both //! the `FilterExec` and `SortExec`. //! -//! See [`PartitionedTopKExec`](datafusion_physical_plan::sorts::partitioned_topk::PartitionedTopKExec) +//! See [`PartitionedTopKExec`] //! for details on the replacement operator. use std::sync::Arc; use crate::PhysicalOptimizerRule; +use arrow::datatypes::DataType; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::Operator; -use arrow::datatypes::DataType; use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal}; use datafusion_physical_expr::window::StandardWindowExpr; +use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::filter::FilterExec; use datafusion_physical_plan::projection::ProjectionExec; use datafusion_physical_plan::sorts::partitioned_topk::PartitionedTopKExec; use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowUDFExpr}; -use datafusion_physical_plan::{ExecutionPlan}; /// Physical optimizer rule that converts per-partition `ROW_NUMBER` top-K /// queries into a more efficient plan using [`PartitionedTopKExec`]. @@ -106,9 +106,7 @@ impl WindowTopN { /// `FilterExec → [ProjectionExec] → BoundedWindowAggExec → SortExec` /// pattern and can be rewritten, or `None` if the node should be /// left unchanged. 
- fn try_transform( - plan: &Arc, - ) -> Option> { + fn try_transform(plan: &Arc) -> Option> { // Step 1: Match FilterExec at the top let filter = plan.downcast_ref::()?; @@ -159,7 +157,7 @@ impl WindowTopN { partition_prefix_len, limit_n, ) - .ok()?; + .ok()?; // Step 8: Rebuild window with new child let new_window = Arc::clone(&child_as_arc(window_exec)) @@ -203,7 +201,7 @@ impl PhysicalOptimizerRule for WindowTopN { }, ) }) - .data() + .data() } fn name(&self) -> &str { @@ -250,9 +248,9 @@ fn extract_window_limit( right.as_any().downcast_ref::(), ) { let n = scalar_to_usize(lit_val.value())?; - return match op { - &Operator::LtEq => Some((col.index(), n)), - &Operator::Lt => Some((col.index(), n - 1)), + return match *op { + Operator::LtEq => Some((col.index(), n)), + Operator::Lt => Some((col.index(), n - 1)), _ => None, }; } @@ -263,9 +261,9 @@ fn extract_window_limit( right.as_any().downcast_ref::(), ) { let n = scalar_to_usize(lit_val.value())?; - return match op { - &Operator::GtEq => Some((col.index(), n)), - &Operator::Gt => Some((col.index(), n - 1)), + return match *op { + Operator::GtEq => Some((col.index(), n)), + Operator::Gt => Some((col.index(), n - 1)), _ => None, }; } @@ -293,9 +291,7 @@ fn scalar_to_usize(value: &ScalarValue) -> Option { /// Downcasts through `StandardWindowExpr` → `WindowUDFExpr` and checks /// that the UDF name is `"row_number"`. Returns `false` for all other /// window functions (e.g., `RANK`, `DENSE_RANK`, `SUM`). 
-fn is_row_number( - expr: &Arc, -) -> bool { +fn is_row_number(expr: &Arc) -> bool { let Some(swe) = expr.as_any().downcast_ref::() else { return false; }; @@ -331,4 +327,4 @@ fn find_window_below( } None -} \ No newline at end of file +} diff --git a/datafusion/physical-plan/src/sorts/partitioned_topk.rs b/datafusion/physical-plan/src/sorts/partitioned_topk.rs index 7b964e56e3cae..c0e5134bceda5 100644 --- a/datafusion/physical-plan/src/sorts/partitioned_topk.rs +++ b/datafusion/physical-plan/src/sorts/partitioned_topk.rs @@ -232,7 +232,6 @@ impl PartitionedTopKExec { /// The output is sorted by `sort_exprs` (partition keys then order keys), /// uses the same partitioning as the input, emits all output at once /// (`EmissionType::Final`), and is bounded. - fn compute_properties( input: &Arc, sort_exprs: LexOrdering, @@ -360,9 +359,9 @@ impl ExecutionPlan for PartitionedTopKExec { runtime, metrics_set, ) - .await + .await }) - .try_flatten(); + .try_flatten(); Ok(Box::pin(RecordBatchStreamAdapter::new( self.input.schema(), @@ -406,6 +405,7 @@ fn create_noop_dynamic_filter() -> Arc> { /// /// - Time: O(N log K) where N = total rows, K = fetch /// - Memory: O(K × P × row_size) where P = number of distinct partitions +#[expect(clippy::too_many_arguments)] async fn do_partitioned_topk( mut input: SendableRecordBatchStream, schema: SchemaRef, @@ -495,4 +495,4 @@ async fn do_partitioned_topk( schema, futures::stream::iter(output_batches.into_iter().map(Ok)), ))) -} \ No newline at end of file +} From 48fd17871740309d355b4258ceaf88c9feadd165 Mon Sep 17 00:00:00 2001 From: Subham Singhal Date: Thu, 9 Apr 2026 10:43:18 +0530 Subject: [PATCH 3/5] Resolve comment --- datafusion/common/src/config.rs | 2 +- .../physical-plan/src/sorts/partitioned_topk.rs | 12 +++++++++++- .../sqllogictest/test_files/information_schema.slt | 2 ++ datafusion/sqllogictest/test_files/window_topn.slt | 6 +++++- docs/source/user-guide/configs.md | 1 + 5 files changed, 20 insertions(+), 3 deletions(-) 
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index ad6045847069f..32c1de17d43b6 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1091,7 +1091,7 @@ config_namespace! { /// Filter(rn<=K) → Window(ROW_NUMBER) → Sort patterns with a /// PartitionedTopKExec that maintains per-partition heaps, avoiding /// a full sort of the input. - pub enable_window_topn: bool, default = true + pub enable_window_topn: bool, default = false /// When set to true, the optimizer will push TopK (Sort with fetch) /// below hash repartition when the partition key is a prefix of the diff --git a/datafusion/physical-plan/src/sorts/partitioned_topk.rs b/datafusion/physical-plan/src/sorts/partitioned_topk.rs index c0e5134bceda5..4211de5a32ef9 100644 --- a/datafusion/physical-plan/src/sorts/partitioned_topk.rs +++ b/datafusion/physical-plan/src/sorts/partitioned_topk.rs @@ -232,6 +232,7 @@ impl PartitionedTopKExec { /// The output is sorted by `sort_exprs` (partition keys then order keys), /// uses the same partitioning as the input, emits all output at once /// (`EmissionType::Final`), and is bounded. + fn compute_properties( input: &Arc, sort_exprs: LexOrdering, @@ -269,8 +270,17 @@ impl DisplayAs for PartitionedTopKExec { ) } DisplayFormatType::TreeRender => { + let partition_exprs: Vec = self.expr[..self.partition_prefix_len] + .iter() + .map(|e| format!("{}", e.expr)) + .collect(); + let order_exprs: Vec = self.expr[self.partition_prefix_len..] 
+ .iter() + .map(|e| format!("{e}")) + .collect(); writeln!(f, "fetch={}", self.fetch)?; - writeln!(f, "{}", self.expr) + writeln!(f, "partition=[{}]", partition_exprs.join(", "))?; + writeln!(f, "order=[{}]", order_exprs.join(", ")) } } } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 0b34f381cbc59..f3706b5da7e0d 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -309,6 +309,7 @@ datafusion.optimizer.enable_topk_aggregation true datafusion.optimizer.enable_topk_dynamic_filter_pushdown true datafusion.optimizer.enable_topk_repartition true datafusion.optimizer.enable_window_limits true +datafusion.optimizer.enable_window_topn false datafusion.optimizer.expand_views_at_output false datafusion.optimizer.filter_null_join_keys false datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values 150 @@ -453,6 +454,7 @@ datafusion.optimizer.enable_topk_aggregation true When set to true, the optimize datafusion.optimizer.enable_topk_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down TopK dynamic filters into the file scan phase. datafusion.optimizer.enable_topk_repartition true When set to true, the optimizer will push TopK (Sort with fetch) below hash repartition when the partition key is a prefix of the sort key, reducing data volume before the shuffle. datafusion.optimizer.enable_window_limits true When set to true, the optimizer will attempt to push limit operations past window functions, if possible +datafusion.optimizer.enable_window_topn false When set to true, the optimizer will replace Filter(rn<=K) → Window(ROW_NUMBER) → Sort patterns with a PartitionedTopKExec that maintains per-partition heaps, avoiding a full sort of the input. 
datafusion.optimizer.expand_views_at_output false When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values 150 Maximum number of distinct values (rows) in the build side of a hash join to be pushed down as an InList expression for dynamic filtering. Build sides with more rows than this will use hash table lookups instead. Set to 0 to always use hash table lookups. This provides an additional limit beyond `hash_join_inlist_pushdown_max_size` to prevent very large IN lists that might not provide much benefit over hash table lookups. This uses the deduplicated row count once the build side has been evaluated. The default is 150 values per partition. This is inspired by Trino's `max-filter-keys-per-column` setting. 
See: diff --git a/datafusion/sqllogictest/test_files/window_topn.slt b/datafusion/sqllogictest/test_files/window_topn.slt index 2685b05885331..259f5be92d045 100644 --- a/datafusion/sqllogictest/test_files/window_topn.slt +++ b/datafusion/sqllogictest/test_files/window_topn.slt @@ -30,6 +30,10 @@ CREATE TABLE window_topn_t (id INT, pk INT, val INT) AS VALUES (9, 3, 50), (10, 3, 75); +# Enable the optimization for all tests +statement ok +SET datafusion.optimizer.enable_window_topn = true; + # Test 1: Correct results for top-2 per partition query III rowsort SELECT id, pk, val FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn FROM window_topn_t ) WHERE rn <= 2; ---- 1 1 10 10 3 75 2 1 20 5 2 5 6 2 15 9 3 50 @@ -109,4 +113,4 @@ statement ok SET datafusion.optimizer.enable_window_topn = true; statement ok -DROP TABLE window_topn_t; \ No newline at end of file +DROP TABLE window_topn_t; diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index b88727e7e3b52..fc351053f22f3 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -140,6 +140,7 @@ The following configuration settings are available: | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible | | datafusion.optimizer.enable_window_limits | true | When set to true, the optimizer will attempt to push limit operations past window functions, if possible | +| datafusion.optimizer.enable_window_topn | false | When set to true, the optimizer will replace Filter(rn<=K) → Window(ROW_NUMBER) → Sort patterns with a PartitionedTopKExec that maintains per-partition heaps, avoiding a full sort of the input. 
| | datafusion.optimizer.enable_topk_repartition | true | When set to true, the optimizer will push TopK (Sort with fetch) below hash repartition when the partition key is a prefix of the sort key, reducing data volume before the shuffle. | | datafusion.optimizer.enable_topk_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down TopK dynamic filters into the file scan phase. | | datafusion.optimizer.enable_join_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. | From 5c2c0fb0d47e07871daa3517934b1f6ba03d912e Mon Sep 17 00:00:00 2001 From: Subham Singhal Date: Thu, 9 Apr 2026 19:59:56 +0530 Subject: [PATCH 4/5] Adds UT --- .../src/sorts/partitioned_topk.rs | 1 - .../sqllogictest/test_files/window_topn.slt | 536 +++++++++++++++++- 2 files changed, 523 insertions(+), 14 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/partitioned_topk.rs b/datafusion/physical-plan/src/sorts/partitioned_topk.rs index 4211de5a32ef9..0e68b6a75c721 100644 --- a/datafusion/physical-plan/src/sorts/partitioned_topk.rs +++ b/datafusion/physical-plan/src/sorts/partitioned_topk.rs @@ -232,7 +232,6 @@ impl PartitionedTopKExec { /// The output is sorted by `sort_exprs` (partition keys then order keys), /// uses the same partitioning as the input, emits all output at once /// (`EmissionType::Final`), and is bounded. 
- fn compute_properties( input: &Arc, sort_exprs: LexOrdering, diff --git a/datafusion/sqllogictest/test_files/window_topn.slt b/datafusion/sqllogictest/test_files/window_topn.slt index 259f5be92d045..bf9ce26b35537 100644 --- a/datafusion/sqllogictest/test_files/window_topn.slt +++ b/datafusion/sqllogictest/test_files/window_topn.slt @@ -34,50 +34,56 @@ CREATE TABLE window_topn_t (id INT, pk INT, val INT) AS VALUES statement ok SET datafusion.optimizer.enable_window_topn = true; -# Test 1: Correct results for top-2 per partition +# Test 1: Correct results for top-3 per partition query III rowsort SELECT id, pk, val FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn FROM window_topn_t -) WHERE rn <= 2; +) WHERE rn <= 3; ---- 1 1 10 10 3 75 2 1 20 +3 1 30 5 2 5 6 2 15 +7 2 25 +8 3 100 9 3 50 -# Test 2: Verify plan contains PartitionedTopKExec +# Test 2: Verify plan contains PartitionedTopKExec with fetch=3 query TT EXPLAIN SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn FROM window_topn_t -) WHERE rn <= 2; +) WHERE rn <= 3; ---- logical_plan 01)Projection: window_topn_t.id, window_topn_t.pk, window_topn_t.val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn -02)--Filter: row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= UInt64(2) +02)--Filter: row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= UInt64(3) 03)----WindowAggr: windowExpr=[[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] 04)------TableScan: window_topn_t projection=[id, pk, val] physical_plan 01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY 
[window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] 02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -03)----PartitionedTopKExec: fetch=2, partition=[pk@1], order=[val@2 ASC NULLS LAST] +03)----PartitionedTopKExec: fetch=3, partition=[pk@1], order=[val@2 ASC NULLS LAST] 04)------DataSourceExec: partitions=1, partition_sizes=[1] -# Test 3: rn < 3 should give fetch=2 +# Test 3: rn < 4 should give same results (fetch=3) query III rowsort SELECT id, pk, val FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn FROM window_topn_t -) WHERE rn < 3; +) WHERE rn < 4; ---- 1 1 10 10 3 75 2 1 20 +3 1 30 5 2 5 6 2 15 +7 2 25 +8 3 100 9 3 50 -# Test 4: Without PARTITION BY (single global partition) +# Test 4: Without PARTITION BY — should NOT optimize query II rowsort SELECT id, val FROM ( SELECT *, ROW_NUMBER() OVER (ORDER BY val) as rn FROM window_topn_t @@ -91,20 +97,19 @@ SELECT id, val FROM ( statement ok SET datafusion.optimizer.enable_window_topn = false; -# When disabled, the plan should use FilterExec + SortExec (no PartitionedTopKExec) query TT EXPLAIN SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn FROM window_topn_t -) WHERE rn <= 2; +) WHERE rn <= 3; ---- logical_plan 01)Projection: window_topn_t.id, window_topn_t.pk, window_topn_t.val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn -02)--Filter: row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW <= UInt64(2) +02)--Filter: row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= UInt64(3) 03)----WindowAggr: windowExpr=[[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] 04)------TableScan: window_topn_t projection=[id, pk, val] physical_plan 01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] -02)--FilterExec: row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 <= 2 +02)--FilterExec: row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 <= 3 03)----BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 04)------SortExec: expr=[pk@1 ASC NULLS LAST, val@2 ASC NULLS LAST], preserve_partitioning=[false] 05)--------DataSourceExec: partitions=1, partition_sizes=[1] @@ -112,5 +117,510 @@ physical_plan statement ok SET datafusion.optimizer.enable_window_topn = true; +# Test 6: Flipped predicate: 3 >= rn should also trigger optimization +query TT +EXPLAIN SELECT * FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn FROM window_topn_t +) WHERE 3 >= rn; +---- +logical_plan +01)Projection: window_topn_t.id, window_topn_t.pk, window_topn_t.val, row_number() PARTITION BY 
[window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn +02)--Filter: row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= UInt64(3) +03)----WindowAggr: windowExpr=[[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +04)------TableScan: window_topn_t projection=[id, pk, val] +physical_plan +01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] +02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +03)----PartitionedTopKExec: fetch=3, partition=[pk@1], order=[val@2 ASC NULLS LAST] +04)------DataSourceExec: partitions=1, partition_sizes=[1] + +# Test 7: Filter on data column (not window output) — should NOT optimize +query TT +EXPLAIN SELECT * FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn FROM window_topn_t +) WHERE val <= 3; +---- +logical_plan +01)Projection: window_topn_t.id, window_topn_t.pk, window_topn_t.val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn +02)--Filter: window_topn_t.val <= Int32(3) +03)----WindowAggr: windowExpr=[[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +04)------TableScan: 
window_topn_t projection=[id, pk, val] +physical_plan +01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] +02)--FilterExec: val@2 <= 3 +03)----BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +04)------SortExec: expr=[pk@1 ASC NULLS LAST, val@2 ASC NULLS LAST], preserve_partitioning=[false] +05)--------DataSourceExec: partitions=1, partition_sizes=[1] + +# Test 8: rn < 4 plan shows PartitionedTopKExec with fetch=3 +statement ok +SET datafusion.explain.physical_plan_only = true; + +query TT +EXPLAIN SELECT * FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn FROM window_topn_t +) WHERE rn < 4; +---- +physical_plan +01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] +02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +03)----PartitionedTopKExec: fetch=3, partition=[pk@1], order=[val@2 ASC NULLS LAST] +04)------DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +SET datafusion.explain.physical_plan_only = false; + +# Test 9: 
3 < rn is NOT a top-K pattern — should NOT optimize +query TT +EXPLAIN SELECT * FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn FROM window_topn_t +) WHERE 3 < rn; +---- +logical_plan +01)Projection: window_topn_t.id, window_topn_t.pk, window_topn_t.val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn +02)--Filter: row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW > UInt64(3) +03)----WindowAggr: windowExpr=[[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +04)------TableScan: window_topn_t projection=[id, pk, val] +physical_plan +01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] +02)--FilterExec: row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 > 3 +03)----BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +04)------SortExec: expr=[pk@1 ASC NULLS LAST, val@2 ASC NULLS LAST], preserve_partitioning=[false] +05)--------DataSourceExec: partitions=1, partition_sizes=[1] + +# Test 10: Tree format EXPLAIN shows partition and order expressions +statement ok +SET datafusion.explain.format = tree; + +statement ok +SET datafusion.explain.physical_plan_only = true; + +query TT +EXPLAIN 
SELECT * FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn FROM window_topn_t +) WHERE rn <= 3; +---- +physical_plan +01)┌───────────────────────────┐ +02)│ ProjectionExec │ +03)│ -------------------- │ +04)│ id: id │ +05)│ pk: pk │ +06)│ │ +07)│ rn: │ +08)│ row_number() PARTITION BY │ +09)│ [window_topn_t.pk] ORDER │ +10)│ BY [window_topn_t.val │ +11)│ ASC NULLS LAST] RANGE │ +12)│ BETWEEN UNBOUNDED │ +13)│ PRECEDING AND │ +14)│ CURRENT ROW │ +15)│ │ +16)│ val: val │ +17)└─────────────┬─────────────┘ +18)┌─────────────┴─────────────┐ +19)│ BoundedWindowAggExec │ +20)│ -------------------- │ +21)│ mode: Sorted │ +22)│ │ +23)│ select_list: │ +24)│ row_number() PARTITION BY │ +25)│ [window_topn_t.pk] ORDER │ +26)│ BY [window_topn_t.val │ +27)│ ASC NULLS LAST] RANGE │ +28)│ BETWEEN UNBOUNDED │ +29)│ PRECEDING AND │ +30)│ CURRENT ROW │ +31)└─────────────┬─────────────┘ +32)┌─────────────┴─────────────┐ +33)│ PartitionedTopKExec │ +34)│ -------------------- │ +35)│ fetch: 3 │ +36)│ │ +37)│ order: │ +38)│ [val@2 ASC NULLS LAST] │ +39)│ │ +40)│ partition: [pk@1] │ +41)└─────────────┬─────────────┘ +42)┌─────────────┴─────────────┐ +43)│ DataSourceExec │ +44)│ -------------------- │ +45)│ bytes: 480 │ +46)│ format: memory │ +47)│ rows: 1 │ +48)└───────────────────────────┘ + +statement ok +SET datafusion.explain.format = indent; + +statement ok +SET datafusion.explain.physical_plan_only = false; + +# Test 11: ROW_NUMBER + RANK together — filter on rn should still optimize +query IIIII rowsort +SELECT id, pk, val, rn, rnk FROM ( + SELECT *, + ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn, + RANK() OVER (PARTITION BY pk ORDER BY val) as rnk + FROM window_topn_t +) WHERE rn <= 3; +---- +1 1 10 1 1 +10 3 75 2 2 +2 1 20 2 2 +3 1 30 3 3 +5 2 5 1 1 +6 2 15 2 2 +7 2 25 3 3 +8 3 100 3 3 +9 3 50 1 1 + +# Test 12: ROW_NUMBER + SUM together — filter on rn, correctness check +query IIIII rowsort +SELECT id, pk, val, rn, running_sum FROM ( + SELECT *, + 
ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn, + SUM(val) OVER (PARTITION BY pk ORDER BY val) as running_sum + FROM window_topn_t +) WHERE rn <= 3; +---- +1 1 10 1 10 +10 3 75 2 125 +2 1 20 2 30 +3 1 30 3 60 +5 2 5 1 5 +6 2 15 2 20 +7 2 25 3 45 +8 3 100 3 225 +9 3 50 1 50 + +# Test 13: Filter on RANK (not ROW_NUMBER) — should NOT optimize +statement ok +SET datafusion.explain.physical_plan_only = true; + +query TT +EXPLAIN SELECT * FROM ( + SELECT *, + ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn, + RANK() OVER (PARTITION BY pk ORDER BY val) as rnk + FROM window_topn_t +) WHERE rnk <= 3; +---- +physical_plan +01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn, rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as rnk] +02)--FilterExec: rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 <= 3 +03)----BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +04)------SortExec: expr=[pk@1 ASC NULLS LAST, val@2 ASC NULLS LAST], 
preserve_partitioning=[false] +05)--------DataSourceExec: partitions=1, partition_sizes=[1] + +# Test 14: Filter on rn AND rnk — compound predicate should NOT optimize +query TT +EXPLAIN SELECT * FROM ( + SELECT *, + ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn, + RANK() OVER (PARTITION BY pk ORDER BY val) as rnk + FROM window_topn_t +) WHERE rn <= 3 AND rnk <= 3; +---- +physical_plan +01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn, rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as rnk] +02)--FilterExec: row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 <= 3 AND rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 <= 3 +03)----BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +04)------SortExec: expr=[pk@1 ASC NULLS LAST, val@2 ASC NULLS LAST], preserve_partitioning=[false] +05)--------DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +SET 
datafusion.explain.physical_plan_only = false; + +# Test 15: ROW_NUMBER with DESC ordering — correctness +query III rowsort +SELECT id, pk, val FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val DESC) as rn FROM window_topn_t +) WHERE rn <= 3; +---- +10 3 75 +2 1 20 +3 1 30 +4 1 40 +5 2 5 +6 2 15 +7 2 25 +8 3 100 +9 3 50 + +# Test 16: Multiple partition keys — should optimize +statement ok +SET datafusion.explain.physical_plan_only = true; + +query TT +EXPLAIN SELECT * FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY pk, id ORDER BY val) as rn FROM window_topn_t +) WHERE rn <= 3; +---- +physical_plan +01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_t.pk, window_topn_t.id] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] +02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.pk, window_topn_t.id] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.pk, window_topn_t.id] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +03)----PartitionedTopKExec: fetch=3, partition=[pk@1, id@0], order=[val@2 ASC NULLS LAST] +04)------DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +SET datafusion.explain.physical_plan_only = false; + +# Test 17: No PARTITION BY (only ORDER BY) — should NOT optimize +statement ok +SET datafusion.explain.physical_plan_only = true; + +query TT +EXPLAIN SELECT * FROM ( + SELECT *, ROW_NUMBER() OVER (ORDER BY id) as rn FROM window_topn_t +) WHERE rn <= 3; +---- +physical_plan +01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() ORDER BY [window_topn_t.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] +02)--FilterExec: 
row_number() ORDER BY [window_topn_t.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 <= 3 +03)----BoundedWindowAggExec: wdw=[row_number() ORDER BY [window_topn_t.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [window_topn_t.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +04)------SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] +05)--------DataSourceExec: partitions=1, partition_sizes=[1] + +# Test 18: Overlapping partition and order keys (PARTITION BY id ORDER BY id, val) — should optimize +query TT +EXPLAIN SELECT * FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY id, val) as rn FROM window_topn_t +) WHERE rn <= 3; +---- +physical_plan +01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_t.id] ORDER BY [window_topn_t.id ASC NULLS LAST, window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] +02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.id] ORDER BY [window_topn_t.id ASC NULLS LAST, window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.id] ORDER BY [window_topn_t.id ASC NULLS LAST, window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +03)----PartitionedTopKExec: fetch=3, partition=[id@0], order=[val@2 ASC NULLS LAST] +04)------DataSourceExec: partitions=1, partition_sizes=[1] + +# Test 19: Overlapping keys correctness (each id is unique, so rn=1 for all) +statement ok +SET datafusion.explain.physical_plan_only = false; + +query IIII rowsort +SELECT id, pk, val, rn FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY id, val) as rn FROM 
window_topn_t +) WHERE rn <= 3; +---- +1 1 10 1 +10 3 75 1 +2 1 20 1 +3 1 30 1 +4 1 40 1 +5 2 5 1 +6 2 15 1 +7 2 25 1 +8 3 100 1 +9 3 50 1 + +# Test 20: PARTITION BY same column used in ORDER BY with different direction — should optimize +statement ok +SET datafusion.explain.physical_plan_only = true; + +query TT +EXPLAIN SELECT * FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY pk, val DESC) as rn FROM window_topn_t +) WHERE rn <= 3; +---- +physical_plan +01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.pk ASC NULLS LAST, window_topn_t.val DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] +02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.pk ASC NULLS LAST, window_topn_t.val DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.pk ASC NULLS LAST, window_topn_t.val DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +03)----PartitionedTopKExec: fetch=3, partition=[pk@1], order=[val@2 DESC] +04)------DataSourceExec: partitions=1, partition_sizes=[1] + +# Test 21: Correctness for PARTITION BY pk ORDER BY pk, val DESC +statement ok +SET datafusion.explain.physical_plan_only = false; + +query IIII rowsort +SELECT id, pk, val, rn FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY pk, val DESC) as rn FROM window_topn_t +) WHERE rn <= 3; +---- +10 3 75 2 +2 1 20 3 +3 1 30 2 +4 1 40 1 +5 2 5 3 +6 2 15 2 +7 2 25 1 +8 3 100 1 +9 3 50 3 + +# Test 22: PARTITION BY pk ORDER BY pk DESC, val DESC — pk direction conflicts with partition sort +statement ok +SET datafusion.explain.physical_plan_only = true; + +query TT +EXPLAIN SELECT * FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY pk DESC, 
val DESC) as rn FROM window_topn_t +) WHERE rn <= 3; +---- +physical_plan +01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.pk DESC NULLS FIRST, window_topn_t.val DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] +02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.pk DESC NULLS FIRST, window_topn_t.val DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.pk DESC NULLS FIRST, window_topn_t.val DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +03)----PartitionedTopKExec: fetch=3, partition=[pk@1], order=[val@2 DESC] +04)------DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +SET datafusion.explain.physical_plan_only = false; + +# ---- Tests with QUALIFY clause ---- + +# Test 28: QUALIFY with ROW_NUMBER — correctness +query IIII rowsort +SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn FROM window_topn_t +QUALIFY rn <= 3; +---- +1 1 10 1 +10 3 75 2 +2 1 20 2 +3 1 30 3 +5 2 5 1 +6 2 15 2 +7 2 25 3 +8 3 100 3 +9 3 50 1 + +# Test 29: QUALIFY plan should use PartitionedTopKExec +statement ok +SET datafusion.explain.physical_plan_only = true; + +query TT +EXPLAIN SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn FROM window_topn_t +QUALIFY rn <= 3; +---- +physical_plan +01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] +02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() 
PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +03)----PartitionedTopKExec: fetch=3, partition=[pk@1], order=[val@2 ASC NULLS LAST] +04)------DataSourceExec: partitions=1, partition_sizes=[1] + +# Test 30: QUALIFY with < operator +statement ok +SET datafusion.explain.physical_plan_only = false; + +query IIII rowsort +SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn FROM window_topn_t +QUALIFY rn < 3; +---- +1 1 10 1 +10 3 75 2 +2 1 20 2 +5 2 5 1 +6 2 15 2 +9 3 50 1 + +# Test 31: QUALIFY on RANK — should NOT optimize +statement ok +SET datafusion.explain.physical_plan_only = true; + +query TT +EXPLAIN SELECT *, RANK() OVER (PARTITION BY pk ORDER BY val) as rnk FROM window_topn_t +QUALIFY rnk <= 3; +---- +physical_plan +01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rnk] +02)--FilterExec: rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 <= 3 +03)----BoundedWindowAggExec: wdw=[rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "rank() PARTITION BY [window_topn_t.pk] ORDER BY [window_topn_t.val ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +04)------SortExec: expr=[pk@1 ASC NULLS LAST, val@2 ASC NULLS LAST], preserve_partitioning=[false] +05)--------DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +SET datafusion.explain.physical_plan_only = false; + statement ok DROP TABLE window_topn_t; + +# ---- Tests with NULL values for sort option coverage 
---- + +statement ok +CREATE TABLE window_topn_nulls (id INT, pk INT, val INT) AS VALUES + (1, 1, 10), + (2, 1, NULL), + (3, 1, 30), + (4, 2, NULL), + (5, 2, 5), + (6, 2, 15), + (7, 2, NULL); + +# Test 23: ORDER BY val ASC NULLS LAST (default) — NULLs sorted last within partition +query IIII rowsort +SELECT id, pk, val, rn FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val ASC NULLS LAST) as rn FROM window_topn_nulls +) WHERE rn <= 2; +---- +1 1 10 1 +3 1 30 2 +5 2 5 1 +6 2 15 2 + +# Test 24: ORDER BY val ASC NULLS FIRST — NULLs sorted first within partition +query IIII rowsort +SELECT id, pk, val, rn FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val ASC NULLS FIRST) as rn FROM window_topn_nulls +) WHERE rn <= 2; +---- +1 1 10 2 +2 1 NULL 1 +4 2 NULL 1 +7 2 NULL 2 + +# Test 25: ORDER BY val DESC NULLS FIRST (default for DESC) — NULLs first +query IIII rowsort +SELECT id, pk, val, rn FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val DESC NULLS FIRST) as rn FROM window_topn_nulls +) WHERE rn <= 2; +---- +2 1 NULL 1 +3 1 30 2 +4 2 NULL 1 +7 2 NULL 2 + +# Test 26: ORDER BY val DESC NULLS LAST — NULLs last +query IIII rowsort +SELECT id, pk, val, rn FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val DESC NULLS LAST) as rn FROM window_topn_nulls +) WHERE rn <= 2; +---- +1 1 10 2 +3 1 30 1 +5 2 5 2 +6 2 15 1 + +# Test 27: Verify plans show correct sort options +statement ok +SET datafusion.explain.physical_plan_only = true; + +query TT +EXPLAIN SELECT * FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val ASC NULLS FIRST) as rn FROM window_topn_nulls +) WHERE rn <= 2; +---- +physical_plan +01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_nulls.pk] ORDER BY [window_topn_nulls.val ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] +02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_nulls.pk] ORDER 
BY [window_topn_nulls.val ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_nulls.pk] ORDER BY [window_topn_nulls.val ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +03)----PartitionedTopKExec: fetch=2, partition=[pk@1], order=[val@2 ASC] +04)------DataSourceExec: partitions=1, partition_sizes=[1] + +query TT +EXPLAIN SELECT * FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val DESC NULLS LAST) as rn FROM window_topn_nulls +) WHERE rn <= 2; +---- +physical_plan +01)ProjectionExec: expr=[id@0 as id, pk@1 as pk, val@2 as val, row_number() PARTITION BY [window_topn_nulls.pk] ORDER BY [window_topn_nulls.val DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as rn] +02)--BoundedWindowAggExec: wdw=[row_number() PARTITION BY [window_topn_nulls.pk] ORDER BY [window_topn_nulls.val DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() PARTITION BY [window_topn_nulls.pk] ORDER BY [window_topn_nulls.val DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +03)----PartitionedTopKExec: fetch=2, partition=[pk@1], order=[val@2 DESC NULLS LAST] +04)------DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +SET datafusion.explain.physical_plan_only = false; + +statement ok +DROP TABLE window_topn_nulls; + +# Reset config to default (false) +statement ok +SET datafusion.optimizer.enable_window_topn = false; From ca5a1aeaadae98d9af18a2193989f9fb70c9d360 Mon Sep 17 00:00:00 2001 From: Subham Singhal Date: Thu, 9 Apr 2026 21:20:30 +0530 Subject: [PATCH 5/5] Fix build failure --- .../tests/physical_optimizer/window_topn.rs | 3 ++- .../physical-optimizer/src/window_topn.rs | 17 +++++++++-------- .../physical-plan/src/sorts/partitioned_topk.rs 
| 7 ++++++- datafusion/sqllogictest/test_files/explain.slt | 4 ++++ docs/source/user-guide/configs.md | 2 +- 5 files changed, 22 insertions(+), 11 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/window_topn.rs b/datafusion/core/tests/physical_optimizer/window_topn.rs index 6ad09973aeeeb..e3f73a85353cc 100644 --- a/datafusion/core/tests/physical_optimizer/window_topn.rs +++ b/datafusion/core/tests/physical_optimizer/window_topn.rs @@ -52,7 +52,8 @@ fn plan_str(plan: &dyn ExecutionPlan) -> String { } fn optimize(plan: Arc) -> Result> { - let config = ConfigOptions::new(); + let mut config = ConfigOptions::new(); + config.optimizer.enable_window_topn = true; WindowTopN::new().optimize(plan, &config) } diff --git a/datafusion/physical-optimizer/src/window_topn.rs b/datafusion/physical-optimizer/src/window_topn.rs index e5f81ad1e9ff8..e138a69652e75 100644 --- a/datafusion/physical-optimizer/src/window_topn.rs +++ b/datafusion/physical-optimizer/src/window_topn.rs @@ -81,15 +81,16 @@ use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowUDFExpr}; /// - `K >= rn` (flipped) → fetch = K /// - `K > rn` (flipped) → fetch = K - 1 /// -/// # When the Rule Does NOT Fire +/// # When the Rule Fires /// -/// - Window function is not `ROW_NUMBER` (e.g., `RANK`, `DENSE_RANK`) -/// - No `PARTITION BY` clause (global top-K is already handled by -/// `SortExec` with `fetch`) -/// - Filter predicate is on a data column, not the window output column -/// - `FilterExec` has an embedded projection -/// - Child of `BoundedWindowAggExec` is not a `SortExec` -/// - Config flag `enable_window_topn` is `false` +/// All of the following must be true: +/// - Config flag `enable_window_topn` is `true` +/// - The plan matches `FilterExec → [ProjectionExec] → BoundedWindowAggExec → SortExec` +/// - The window function is `ROW_NUMBER` (not `RANK`, `DENSE_RANK`, etc.) 
+/// - `ROW_NUMBER` has a `PARTITION BY` clause (global top-K is already +/// handled by `SortExec` with `fetch`) +/// - The filter predicate compares the window output column to an integer +/// literal using `<=`, `<`, `>=`, or `>` /// /// [`PartitionedTopKExec`]: datafusion_physical_plan::sorts::partitioned_topk::PartitionedTopKExec #[derive(Default, Clone, Debug)] diff --git a/datafusion/physical-plan/src/sorts/partitioned_topk.rs b/datafusion/physical-plan/src/sorts/partitioned_topk.rs index 0e68b6a75c721..fafec7f379cd2 100644 --- a/datafusion/physical-plan/src/sorts/partitioned_topk.rs +++ b/datafusion/physical-plan/src/sorts/partitioned_topk.rs @@ -295,7 +295,12 @@ impl ExecutionPlan for PartitionedTopKExec { } fn required_input_distribution(&self) -> Vec { - vec![Distribution::UnspecifiedDistribution] + let partition_exprs: Vec> = self.expr + [..self.partition_prefix_len] + .iter() + .map(|e| Arc::clone(&e.expr)) + .collect(); + vec![Distribution::HashPartitioned(partition_exprs)] } fn maintains_input_order(&self) -> Vec { diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 467afe7b6c2ba..2e8a65385541e 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -239,6 +239,7 @@ physical_plan after EnforceDistribution SAME TEXT AS ABOVE physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE physical_plan after EnforceSorting SAME TEXT AS ABOVE physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE +physical_plan after WindowTopN SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true physical_plan after LimitAggregation SAME TEXT AS ABOVE @@ -319,6 +320,7 @@ physical_plan after EnforceDistribution SAME TEXT AS ABOVE 
physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE physical_plan after EnforceSorting SAME TEXT AS ABOVE physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE +physical_plan after WindowTopN SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after OutputRequirements 01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]: ScanBytes=Exact(32)),(Col[1]: ScanBytes=Inexact(24)),(Col[2]: ScanBytes=Exact(32)),(Col[3]: ScanBytes=Exact(32)),(Col[4]: ScanBytes=Exact(32)),(Col[5]: ScanBytes=Exact(64)),(Col[6]: ScanBytes=Exact(32)),(Col[7]: ScanBytes=Exact(64)),(Col[8]: ScanBytes=Inexact(88)),(Col[9]: ScanBytes=Inexact(49)),(Col[10]: ScanBytes=Exact(64))]] @@ -365,6 +367,7 @@ physical_plan after EnforceDistribution SAME TEXT AS ABOVE physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE physical_plan after EnforceSorting SAME TEXT AS ABOVE physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE +physical_plan after WindowTopN SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after OutputRequirements 01)GlobalLimitExec: skip=0, fetch=10 @@ -611,6 +614,7 @@ physical_plan after EnforceDistribution SAME TEXT AS ABOVE physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE physical_plan after EnforceSorting SAME TEXT AS ABOVE physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE +physical_plan after WindowTopN SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true physical_plan after LimitAggregation SAME TEXT AS ABOVE diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index fc351053f22f3..4f9887376ba3e 100644 --- a/docs/source/user-guide/configs.md +++ 
b/docs/source/user-guide/configs.md @@ -140,7 +140,7 @@ The following configuration settings are available: | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible | | datafusion.optimizer.enable_window_limits | true | When set to true, the optimizer will attempt to push limit operations past window functions, if possible | -| datafusion.optimizer.enable_window_topn | true | When set to true, the optimizer will replace Filter(rn<=K) → Window(ROW_NUMBER) → Sort patterns with a PartitionedTopKExec that maintains per-partition heaps, avoiding a full sort of the input. | +| datafusion.optimizer.enable_window_topn | false | When set to true, the optimizer will replace Filter(rn<=K) → Window(ROW_NUMBER) → Sort patterns with a PartitionedTopKExec that maintains per-partition heaps, avoiding a full sort of the input. | | datafusion.optimizer.enable_topk_repartition | true | When set to true, the optimizer will push TopK (Sort with fetch) below hash repartition when the partition key is a prefix of the sort key, reducing data volume before the shuffle. | | datafusion.optimizer.enable_topk_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down TopK dynamic filters into the file scan phase. | | datafusion.optimizer.enable_join_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. |