Skip to content

Commit 52147dd

Browse files
Subham Singhal
authored and committed
Lint fix
1 parent 38fa07a commit 52147dd

4 files changed

Lines changed: 76 additions & 63 deletions

File tree

datafusion/core/examples/h2o_window_topn_bench.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
118
// Standalone H2O groupby Q8 benchmark: PartitionedTopKExec enabled vs disabled
219
//
320
// Usage:
@@ -10,7 +27,7 @@ use datafusion::prelude::*;
1027
use datafusion_common::instant::Instant;
1128
use std::sync::Arc;
1229

13-
use arrow::array::{Int64Array, Float64Array, RecordBatch};
30+
use arrow::array::{Float64Array, Int64Array, RecordBatch};
1431
use arrow::datatypes::{DataType, Field, Schema};
1532
use datafusion::datasource::MemTable;
1633
use rand::rngs::StdRng;
@@ -49,10 +66,7 @@ fn generate_data(num_partitions: i64) -> Arc<MemTable> {
4966
.collect();
5067

5168
batches.push(
52-
RecordBatch::try_new(Arc::clone(&schema), vec![
53-
Arc::new(id6),
54-
Arc::new(v3),
55-
])
69+
RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(id6), Arc::new(v3)])
5670
.unwrap(),
5771
);
5872
}
@@ -116,4 +130,4 @@ async fn main() {
116130
}
117131
println!();
118132
}
119-
}
133+
}

datafusion/core/tests/physical_optimizer/window_topn.rs

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,24 @@
2020
use std::sync::Arc;
2121

2222
use arrow::datatypes::{DataType, Field, Schema};
23-
use datafusion_common::config::ConfigOptions;
2423
use datafusion_common::Result;
2524
use datafusion_common::ScalarValue;
25+
use datafusion_common::config::ConfigOptions;
26+
use datafusion_expr::Operator;
2627
use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
2728
use datafusion_functions_window::row_number::row_number_udwf;
2829
use datafusion_physical_expr::expressions::{BinaryExpr, Column, col, lit};
2930
use datafusion_physical_expr::window::StandardWindowExpr;
3031
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
31-
use datafusion_physical_optimizer::window_topn::WindowTopN;
3232
use datafusion_physical_optimizer::PhysicalOptimizerRule;
33+
use datafusion_physical_optimizer::window_topn::WindowTopN;
3334
use datafusion_physical_plan::displayable;
3435
use datafusion_physical_plan::filter::FilterExec;
3536
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
3637
use datafusion_physical_plan::projection::ProjectionExec;
3738
use datafusion_physical_plan::sorts::sort::SortExec;
3839
use datafusion_physical_plan::windows::{BoundedWindowAggExec, create_udwf_window_expr};
3940
use datafusion_physical_plan::{ExecutionPlan, InputOrderMode};
40-
use datafusion_expr::Operator;
4141
use insta::assert_snapshot;
4242

4343
fn schema() -> Arc<Schema> {
@@ -68,18 +68,17 @@ fn build_window_topn_plan(
6868
op: Operator,
6969
) -> Result<Arc<dyn ExecutionPlan>> {
7070
let s = schema();
71-
let input: Arc<dyn ExecutionPlan> =
72-
Arc::new(PlaceholderRowExec::new(Arc::clone(&s)));
71+
let input: Arc<dyn ExecutionPlan> = Arc::new(PlaceholderRowExec::new(Arc::clone(&s)));
7372

7473
// Sort by pk ASC, val ASC
7574
let ordering = LexOrdering::new(vec![
7675
PhysicalSortExpr::new_default(col("pk", &s)?).asc(),
7776
PhysicalSortExpr::new_default(col("val", &s)?).asc(),
78-
]).unwrap();
77+
])
78+
.unwrap();
7979

80-
let sort: Arc<dyn ExecutionPlan> = Arc::new(
81-
SortExec::new(ordering.clone(), input).with_preserve_partitioning(true),
82-
);
80+
let sort: Arc<dyn ExecutionPlan> =
81+
Arc::new(SortExec::new(ordering.clone(), input).with_preserve_partitioning(true));
8382

8483
// ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val)
8584
let partition_by = vec![col("pk", &s)?];
@@ -123,17 +122,15 @@ fn build_window_topn_plan(
123122
/// Build a plan with no partition-by: ROW_NUMBER() OVER (ORDER BY val)
124123
fn build_window_topn_no_partition(limit_value: i64) -> Result<Arc<dyn ExecutionPlan>> {
125124
let s = schema();
126-
let input: Arc<dyn ExecutionPlan> =
127-
Arc::new(PlaceholderRowExec::new(Arc::clone(&s)));
125+
let input: Arc<dyn ExecutionPlan> = Arc::new(PlaceholderRowExec::new(Arc::clone(&s)));
128126

129127
// Sort by val ASC only (no partition key)
130-
let ordering = LexOrdering::new(vec![
131-
PhysicalSortExpr::new_default(col("val", &s)?).asc(),
132-
]).unwrap();
128+
let ordering =
129+
LexOrdering::new(vec![PhysicalSortExpr::new_default(col("val", &s)?).asc()])
130+
.unwrap();
133131

134-
let sort: Arc<dyn ExecutionPlan> = Arc::new(
135-
SortExec::new(ordering.clone(), input).with_preserve_partitioning(true),
136-
);
132+
let sort: Arc<dyn ExecutionPlan> =
133+
Arc::new(SortExec::new(ordering.clone(), input).with_preserve_partitioning(true));
137134

138135
// ROW_NUMBER() OVER (ORDER BY val) — no partition by
139136
let order_by = vec![PhysicalSortExpr::new_default(col("val", &s)?).asc()];
@@ -146,7 +143,7 @@ fn build_window_topn_no_partition(limit_value: i64) -> Result<Arc<dyn ExecutionP
146143
"row_number".to_string(),
147144
false,
148145
)?,
149-
&[], // empty partition_by
146+
&[], // empty partition_by
150147
&order_by,
151148
Arc::new(WindowFrame::new_bounds(
152149
WindowFrameUnits::Rows,
@@ -174,17 +171,16 @@ fn build_window_topn_no_partition(limit_value: i64) -> Result<Arc<dyn ExecutionP
174171
/// Build a plan where filter is on a data column (not window output)
175172
fn build_non_window_filter_plan() -> Result<Arc<dyn ExecutionPlan>> {
176173
let s = schema();
177-
let input: Arc<dyn ExecutionPlan> =
178-
Arc::new(PlaceholderRowExec::new(Arc::clone(&s)));
174+
let input: Arc<dyn ExecutionPlan> = Arc::new(PlaceholderRowExec::new(Arc::clone(&s)));
179175

180176
let ordering = LexOrdering::new(vec![
181177
PhysicalSortExpr::new_default(col("pk", &s)?).asc(),
182178
PhysicalSortExpr::new_default(col("val", &s)?).asc(),
183-
]).unwrap();
179+
])
180+
.unwrap();
184181

185-
let sort: Arc<dyn ExecutionPlan> = Arc::new(
186-
SortExec::new(ordering.clone(), input).with_preserve_partitioning(true),
187-
);
182+
let sort: Arc<dyn ExecutionPlan> =
183+
Arc::new(SortExec::new(ordering.clone(), input).with_preserve_partitioning(true));
188184

189185
let partition_by = vec![col("pk", &s)?];
190186
let order_by = vec![PhysicalSortExpr::new_default(col("val", &s)?).asc()];
@@ -257,7 +253,8 @@ fn flipped_3_gteq_rn() -> Result<()> {
257253
let ordering = LexOrdering::new(vec![
258254
PhysicalSortExpr::new_default(col("pk", &s)?).asc(),
259255
PhysicalSortExpr::new_default(col("val", &s)?).asc(),
260-
]).unwrap();
256+
])
257+
.unwrap();
261258

262259
let sort: Arc<dyn ExecutionPlan> = Arc::new(
263260
SortExec::new(ordering.clone(), input).with_preserve_partitioning(true),
@@ -292,7 +289,8 @@ fn flipped_3_gteq_rn() -> Result<()> {
292289

293290
// Flipped: 3 >= rn (Literal GtEq Column)
294291
let rn_col = Arc::new(Column::new("row_number", 2));
295-
let limit_lit = lit(ScalarValue::UInt64(Some(3))); let predicate = Arc::new(BinaryExpr::new(limit_lit, Operator::GtEq, rn_col));
292+
let limit_lit = lit(ScalarValue::UInt64(Some(3)));
293+
let predicate = Arc::new(BinaryExpr::new(limit_lit, Operator::GtEq, rn_col));
296294
let filter: Arc<dyn ExecutionPlan> =
297295
Arc::new(FilterExec::try_new(predicate, window)?);
298296
filter
@@ -313,7 +311,10 @@ fn non_window_column_filter_no_change() -> Result<()> {
313311
let before = plan_str(plan.as_ref());
314312
let optimized = optimize(plan)?;
315313
let after = plan_str(optimized.as_ref());
316-
assert_eq!(before, after, "Plan should not change when filter is on data column");
314+
assert_eq!(
315+
before, after,
316+
"Plan should not change when filter is on data column"
317+
);
317318
Ok(())
318319
}
319320

@@ -323,7 +324,10 @@ fn config_disabled_no_change() -> Result<()> {
323324
let before = plan_str(plan.as_ref());
324325
let optimized = optimize_disabled(plan)?;
325326
let after = plan_str(optimized.as_ref());
326-
assert_eq!(before, after, "Plan should not change when config is disabled");
327+
assert_eq!(
328+
before, after,
329+
"Plan should not change when config is disabled"
330+
);
327331
Ok(())
328332
}
329333

@@ -345,17 +349,16 @@ fn no_partition_by_no_change() -> Result<()> {
345349
#[test]
346350
fn with_projection_between() -> Result<()> {
347351
let s = schema();
348-
let input: Arc<dyn ExecutionPlan> =
349-
Arc::new(PlaceholderRowExec::new(Arc::clone(&s)));
352+
let input: Arc<dyn ExecutionPlan> = Arc::new(PlaceholderRowExec::new(Arc::clone(&s)));
350353

351354
let ordering = LexOrdering::new(vec![
352355
PhysicalSortExpr::new_default(col("pk", &s)?).asc(),
353356
PhysicalSortExpr::new_default(col("val", &s)?).asc(),
354-
]).unwrap();
357+
])
358+
.unwrap();
355359

356-
let sort: Arc<dyn ExecutionPlan> = Arc::new(
357-
SortExec::new(ordering.clone(), input).with_preserve_partitioning(true),
358-
);
360+
let sort: Arc<dyn ExecutionPlan> =
361+
Arc::new(SortExec::new(ordering.clone(), input).with_preserve_partitioning(true));
359362

360363
let partition_by = vec![col("pk", &s)?];
361364
let order_by = vec![PhysicalSortExpr::new_default(col("val", &s)?).asc()];
@@ -418,4 +421,4 @@ fn with_projection_between() -> Result<()> {
418421
PlaceholderRowExec
419422
"#);
420423
Ok(())
421-
}
424+
}

datafusion/physical-optimizer/src/window_topn.rs

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,25 +30,25 @@
3030
//! with `BoundedWindowAggExec → PartitionedTopKExec(fetch=K)`, removing both
3131
//! the `FilterExec` and `SortExec`.
3232
//!
33-
//! See [`PartitionedTopKExec`](datafusion_physical_plan::sorts::partitioned_topk::PartitionedTopKExec)
33+
//! See [`PartitionedTopKExec`]
3434
//! for details on the replacement operator.
3535
3636
use std::sync::Arc;
3737

3838
use crate::PhysicalOptimizerRule;
39+
use arrow::datatypes::DataType;
3940
use datafusion_common::config::ConfigOptions;
4041
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
4142
use datafusion_common::{Result, ScalarValue};
4243
use datafusion_expr::Operator;
43-
use arrow::datatypes::DataType;
4444
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
4545
use datafusion_physical_expr::window::StandardWindowExpr;
46+
use datafusion_physical_plan::ExecutionPlan;
4647
use datafusion_physical_plan::filter::FilterExec;
4748
use datafusion_physical_plan::projection::ProjectionExec;
4849
use datafusion_physical_plan::sorts::partitioned_topk::PartitionedTopKExec;
4950
use datafusion_physical_plan::sorts::sort::SortExec;
5051
use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowUDFExpr};
51-
use datafusion_physical_plan::{ExecutionPlan};
5252

5353
/// Physical optimizer rule that converts per-partition `ROW_NUMBER` top-K
5454
/// queries into a more efficient plan using [`PartitionedTopKExec`].
@@ -106,9 +106,7 @@ impl WindowTopN {
106106
/// `FilterExec → [ProjectionExec] → BoundedWindowAggExec → SortExec`
107107
/// pattern and can be rewritten, or `None` if the node should be
108108
/// left unchanged.
109-
fn try_transform(
110-
plan: &Arc<dyn ExecutionPlan>,
111-
) -> Option<Arc<dyn ExecutionPlan>> {
109+
fn try_transform(plan: &Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> {
112110
// Step 1: Match FilterExec at the top
113111
let filter = plan.downcast_ref::<FilterExec>()?;
114112

@@ -159,7 +157,7 @@ impl WindowTopN {
159157
partition_prefix_len,
160158
limit_n,
161159
)
162-
.ok()?;
160+
.ok()?;
163161

164162
// Step 8: Rebuild window with new child
165163
let new_window = Arc::clone(&child_as_arc(window_exec))
@@ -203,7 +201,7 @@ impl PhysicalOptimizerRule for WindowTopN {
203201
},
204202
)
205203
})
206-
.data()
204+
.data()
207205
}
208206

209207
fn name(&self) -> &str {
@@ -250,9 +248,9 @@ fn extract_window_limit(
250248
right.as_any().downcast_ref::<Literal>(),
251249
) {
252250
let n = scalar_to_usize(lit_val.value())?;
253-
return match op {
254-
&Operator::LtEq => Some((col.index(), n)),
255-
&Operator::Lt => Some((col.index(), n - 1)),
251+
return match *op {
252+
Operator::LtEq => Some((col.index(), n)),
253+
Operator::Lt => Some((col.index(), n - 1)),
256254
_ => None,
257255
};
258256
}
@@ -263,9 +261,9 @@ fn extract_window_limit(
263261
right.as_any().downcast_ref::<Column>(),
264262
) {
265263
let n = scalar_to_usize(lit_val.value())?;
266-
return match op {
267-
&Operator::GtEq => Some((col.index(), n)),
268-
&Operator::Gt => Some((col.index(), n - 1)),
264+
return match *op {
265+
Operator::GtEq => Some((col.index(), n)),
266+
Operator::Gt => Some((col.index(), n - 1)),
269267
_ => None,
270268
};
271269
}
@@ -293,9 +291,7 @@ fn scalar_to_usize(value: &ScalarValue) -> Option<usize> {
293291
/// Downcasts through `StandardWindowExpr` → `WindowUDFExpr` and checks
294292
/// that the UDF name is `"row_number"`. Returns `false` for all other
295293
/// window functions (e.g., `RANK`, `DENSE_RANK`, `SUM`).
296-
fn is_row_number(
297-
expr: &Arc<dyn datafusion_physical_expr::window::WindowExpr>,
298-
) -> bool {
294+
fn is_row_number(expr: &Arc<dyn datafusion_physical_expr::window::WindowExpr>) -> bool {
299295
let Some(swe) = expr.as_any().downcast_ref::<StandardWindowExpr>() else {
300296
return false;
301297
};
@@ -331,4 +327,4 @@ fn find_window_below(
331327
}
332328

333329
None
334-
}
330+
}

datafusion/physical-plan/src/sorts/partitioned_topk.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,6 @@ impl PartitionedTopKExec {
232232
/// The output is sorted by `sort_exprs` (partition keys then order keys),
233233
/// uses the same partitioning as the input, emits all output at once
234234
/// (`EmissionType::Final`), and is bounded.
235-
236235
fn compute_properties(
237236
input: &Arc<dyn ExecutionPlan>,
238237
sort_exprs: LexOrdering,
@@ -360,9 +359,9 @@ impl ExecutionPlan for PartitionedTopKExec {
360359
runtime,
361360
metrics_set,
362361
)
363-
.await
362+
.await
364363
})
365-
.try_flatten();
364+
.try_flatten();
366365

367366
Ok(Box::pin(RecordBatchStreamAdapter::new(
368367
self.input.schema(),
@@ -406,6 +405,7 @@ fn create_noop_dynamic_filter() -> Arc<RwLock<TopKDynamicFilters>> {
406405
///
407406
/// - Time: O(N log K) where N = total rows, K = fetch
408407
/// - Memory: O(K × P × row_size) where P = number of distinct partitions
408+
#[expect(clippy::too_many_arguments)]
409409
async fn do_partitioned_topk(
410410
mut input: SendableRecordBatchStream,
411411
schema: SchemaRef,
@@ -495,4 +495,4 @@ async fn do_partitioned_topk(
495495
schema,
496496
futures::stream::iter(output_batches.into_iter().map(Ok)),
497497
)))
498-
}
498+
}

0 commit comments

Comments
 (0)