Skip to content

Commit 92078d9

Browse files
Copy limits before repartitions (#20736)
## Which issue does this PR close? - Closes #20735. ## Rationale for this change Described in issue. ## What changes are included in this PR? 1. optimizer rule + unit tests 2. slt tests showing behavior is the same, plus demonstrating plan changes 3. feature flag 4. criterion benchmark 5. fixes to existing slt plan assertions ## Are these changes tested? With unit tests & slts. ## Are there any user-facing changes? Some queries should go faster **Note: AI assistance was used in this PR**
1 parent 8fe926d commit 92078d9

File tree

11 files changed

+626
-3
lines changed

11 files changed

+626
-3
lines changed

datafusion/common/src/config.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -937,6 +937,11 @@ config_namespace! {
937937
/// past window functions, if possible
938938
pub enable_window_limits: bool, default = true
939939

940+
/// When set to true, the optimizer will push TopK (Sort with fetch)
941+
/// below hash repartition when the partition key is a prefix of the
942+
/// sort key, reducing data volume before the shuffle.
943+
pub enable_topk_repartition: bool, default = true
944+
940945
/// When set to true, the optimizer will attempt to push down TopK dynamic filters
941946
/// into the file scan phase.
942947
pub enable_topk_dynamic_filter_pushdown: bool, default = true

datafusion/core/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,10 @@ name = "struct_query_sql"
228228
harness = false
229229
name = "window_query_sql"
230230

231+
[[bench]]
232+
harness = false
233+
name = "topk_repartition"
234+
231235
[[bench]]
232236
harness = false
233237
name = "scalar"
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Benchmark for the TopKRepartition optimizer rule.
19+
//!
20+
//! Measures the benefit of pushing TopK (Sort with fetch) below hash
21+
//! repartition when running partitioned window functions with LIMIT.
22+
23+
mod data_utils;
24+
25+
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
26+
use data_utils::create_table_provider;
27+
use datafusion::prelude::{SessionConfig, SessionContext};
28+
use parking_lot::Mutex;
29+
use std::hint::black_box;
30+
use std::sync::Arc;
31+
use tokio::runtime::Runtime;
32+
33+
#[expect(clippy::needless_pass_by_value)]
34+
fn query(ctx: Arc<Mutex<SessionContext>>, rt: &Runtime, sql: &str) {
35+
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
36+
black_box(rt.block_on(df.collect()).unwrap());
37+
}
38+
39+
fn create_context(
40+
partitions_len: usize,
41+
target_partitions: usize,
42+
enable_topk_repartition: bool,
43+
) -> Arc<Mutex<SessionContext>> {
44+
let array_len = 1024 * 1024;
45+
let batch_size = 8 * 1024;
46+
let mut config = SessionConfig::new().with_target_partitions(target_partitions);
47+
config.options_mut().optimizer.enable_topk_repartition = enable_topk_repartition;
48+
let ctx = SessionContext::new_with_config(config);
49+
let rt = Runtime::new().unwrap();
50+
rt.block_on(async {
51+
let provider =
52+
create_table_provider(partitions_len, array_len, batch_size).unwrap();
53+
ctx.register_table("t", provider).unwrap();
54+
});
55+
Arc::new(Mutex::new(ctx))
56+
}
57+
58+
fn criterion_benchmark(c: &mut Criterion) {
59+
let rt = Runtime::new().unwrap();
60+
61+
let limits = [10, 1_000, 10_000, 100_000];
62+
let scans = 16;
63+
let target_partitions = 4;
64+
65+
let group = format!("topk_repartition_{scans}_to_{target_partitions}");
66+
let mut group = c.benchmark_group(group);
67+
for limit in limits {
68+
let sql = format!(
69+
"SELECT \
70+
SUM(f64) OVER (PARTITION BY u64_narrow ORDER BY u64_wide ROWS UNBOUNDED PRECEDING) \
71+
FROM t \
72+
ORDER BY u64_narrow, u64_wide \
73+
LIMIT {limit}"
74+
);
75+
76+
let ctx_disabled = create_context(scans, target_partitions, false);
77+
group.bench_function(BenchmarkId::new("disabled", limit), |b| {
78+
b.iter(|| query(ctx_disabled.clone(), &rt, &sql))
79+
});
80+
81+
let ctx_enabled = create_context(scans, target_partitions, true);
82+
group.bench_function(BenchmarkId::new("enabled", limit), |b| {
83+
b.iter(|| query(ctx_enabled.clone(), &rt, &sql))
84+
});
85+
}
86+
group.finish();
87+
}
88+
89+
criterion_group!(benches, criterion_benchmark);
90+
criterion_main!(benches);

datafusion/physical-optimizer/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub mod hash_join_buffering;
4343
pub mod pushdown_sort;
4444
pub mod sanity_checker;
4545
pub mod topk_aggregation;
46+
pub mod topk_repartition;
4647
pub mod update_aggr_exprs;
4748
pub mod utils;
4849

datafusion/physical-optimizer/src/optimizer.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use crate::output_requirements::OutputRequirements;
3333
use crate::projection_pushdown::ProjectionPushdown;
3434
use crate::sanity_checker::SanityCheckPlan;
3535
use crate::topk_aggregation::TopKAggregation;
36+
use crate::topk_repartition::TopKRepartition;
3637
use crate::update_aggr_exprs::OptimizeAggregateOrder;
3738

3839
use crate::hash_join_buffering::HashJoinBuffering;
@@ -146,6 +147,11 @@ impl PhysicalOptimizer {
146147
// replacing operators with fetching variants, or adding limits
147148
// past operators that support limit pushdown.
148149
Arc::new(LimitPushdown::new()),
150+
// TopKRepartition pushes TopK (Sort with fetch) below Hash
151+
// repartition when the partition key is a prefix of the sort key.
152+
// This reduces data volume before a hash shuffle. It must run
153+
// after LimitPushdown so that the TopK already exists on the SortExec.
154+
Arc::new(TopKRepartition::new()),
149155
// The ProjectionPushdown rule tries to push projections towards
150156
// the sources in the execution plan. As a result of this process,
151157
// a projection can disappear if it reaches the source providers, and

0 commit comments

Comments
 (0)