Skip to content

Commit 06cd380

Browse files
authored
Merge branch 'main' into speed-up-stats-aggregation
2 parents 1ad9ca7 + 92078d9 commit 06cd380

26 files changed

+1409
-14
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benchmarks/src/imdb/run.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ pub struct RunOpt {
9292
/// True by default.
9393
#[arg(short = 'j', long = "prefer_hash_join", default_value = "true")]
9494
prefer_hash_join: BoolDefaultTrue,
95+
96+
/// How many bytes to buffer on the probe side of hash joins.
97+
#[arg(long, default_value = "0")]
98+
hash_join_buffering_capacity: usize,
9599
}
96100

97101
fn map_query_id_to_str(query_id: usize) -> &'static str {
@@ -306,6 +310,8 @@ impl RunOpt {
306310
.config()?
307311
.with_collect_statistics(!self.disable_statistics);
308312
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
313+
config.options_mut().execution.hash_join_buffering_capacity =
314+
self.hash_join_buffering_capacity;
309315
let rt_builder = self.common.runtime_env_builder()?;
310316
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
311317

@@ -527,6 +533,7 @@ mod tests {
527533
output_path: None,
528534
disable_statistics: false,
529535
prefer_hash_join: true,
536+
hash_join_buffering_capacity: 0,
530537
};
531538
opt.register_tables(&ctx).await?;
532539
let queries = get_query_sql(map_query_id_to_str(query))?;
@@ -563,6 +570,7 @@ mod tests {
563570
output_path: None,
564571
disable_statistics: false,
565572
prefer_hash_join: true,
573+
hash_join_buffering_capacity: 0,
566574
};
567575
opt.register_tables(&ctx).await?;
568576
let queries = get_query_sql(map_query_id_to_str(query))?;

benchmarks/src/tpcds/run.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,10 @@ pub struct RunOpt {
144144
/// The tables should have been created with the `--sort` option for this to have any effect.
145145
#[arg(short = 't', long = "sorted")]
146146
sorted: bool,
147+
148+
/// How many bytes to buffer on the probe side of hash joins.
149+
#[arg(long, default_value = "0")]
150+
hash_join_buffering_capacity: usize,
147151
}
148152

149153
impl RunOpt {
@@ -162,6 +166,8 @@ impl RunOpt {
162166
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
163167
config.options_mut().optimizer.enable_piecewise_merge_join =
164168
self.enable_piecewise_merge_join;
169+
config.options_mut().execution.hash_join_buffering_capacity =
170+
self.hash_join_buffering_capacity;
165171
let rt_builder = self.common.runtime_env_builder()?;
166172
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
167173
// register tables

benchmarks/src/tpch/run.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,10 @@ pub struct RunOpt {
105105
/// The tables should have been created with the `--sort` option for this to have any effect.
106106
#[arg(short = 't', long = "sorted")]
107107
sorted: bool,
108+
109+
/// How many bytes to buffer on the probe side of hash joins.
110+
#[arg(long, default_value = "0")]
111+
hash_join_buffering_capacity: usize,
108112
}
109113

110114
impl RunOpt {
@@ -123,6 +127,8 @@ impl RunOpt {
123127
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
124128
config.options_mut().optimizer.enable_piecewise_merge_join =
125129
self.enable_piecewise_merge_join;
130+
config.options_mut().execution.hash_join_buffering_capacity =
131+
self.hash_join_buffering_capacity;
126132
let rt_builder = self.common.runtime_env_builder()?;
127133
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
128134
// register tables
@@ -392,6 +398,7 @@ mod tests {
392398
prefer_hash_join: true,
393399
enable_piecewise_merge_join: false,
394400
sorted: false,
401+
hash_join_buffering_capacity: 0,
395402
};
396403
opt.register_tables(&ctx).await?;
397404
let queries = get_query_sql(query)?;
@@ -430,6 +437,7 @@ mod tests {
430437
prefer_hash_join: true,
431438
enable_piecewise_merge_join: false,
432439
sorted: false,
440+
hash_join_buffering_capacity: 0,
433441
};
434442
opt.register_tables(&ctx).await?;
435443
let queries = get_query_sql(query)?;

datafusion-cli/tests/cli_integration.rs

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::process::Command;
2020
use rstest::rstest;
2121

2222
use async_trait::async_trait;
23+
use insta::internals::SettingsBindDropGuard;
2324
use insta::{Settings, glob};
2425
use insta_cmd::{assert_cmd_snapshot, get_cargo_bin};
2526
use std::path::PathBuf;
@@ -215,6 +216,42 @@ fn test_cli_top_memory_consumers<'a>(
215216
#[case] snapshot_name: &str,
216217
#[case] top_memory_consumers: impl IntoIterator<Item = &'a str>,
217218
) {
219+
let _bound = bind_to_settings(snapshot_name);
220+
221+
let mut cmd = cli();
222+
let sql = "select * from generate_series(1,500000) as t1(v1) order by v1;";
223+
cmd.args(["--memory-limit", "10M", "--command", sql]);
224+
cmd.args(top_memory_consumers);
225+
226+
assert_cmd_snapshot!(cmd);
227+
}
228+
229+
#[rstest]
230+
#[case("no_track", ["--top-memory-consumers", "0"])]
231+
#[case("top2", ["--top-memory-consumers", "2"])]
232+
#[test]
233+
fn test_cli_top_memory_consumers_with_mem_pool_type<'a>(
234+
#[case] snapshot_name: &str,
235+
#[case] top_memory_consumers: impl IntoIterator<Item = &'a str>,
236+
) {
237+
let _bound = bind_to_settings(snapshot_name);
238+
239+
let mut cmd = cli();
240+
let sql = "select * from generate_series(1,500000) as t1(v1) order by v1;";
241+
cmd.args([
242+
"--memory-limit",
243+
"10M",
244+
"--mem-pool-type",
245+
"fair",
246+
"--command",
247+
sql,
248+
]);
249+
cmd.args(top_memory_consumers);
250+
251+
assert_cmd_snapshot!(cmd);
252+
}
253+
254+
fn bind_to_settings(snapshot_name: &str) -> SettingsBindDropGuard {
218255
let mut settings = make_settings();
219256

220257
settings.set_snapshot_suffix(snapshot_name);
@@ -232,12 +269,20 @@ fn test_cli_top_memory_consumers<'a>(
232269
"Resources exhausted: Failed to allocate",
233270
);
234271

272+
settings.bind_to_scope()
273+
}
274+
275+
#[test]
276+
fn test_cli_with_unbounded_memory_pool() {
277+
let mut settings = make_settings();
278+
279+
settings.set_snapshot_suffix("default");
280+
235281
let _bound = settings.bind_to_scope();
236282

237283
let mut cmd = cli();
238284
let sql = "select * from generate_series(1,500000) as t1(v1) order by v1;";
239-
cmd.args(["--memory-limit", "10M", "--command", sql]);
240-
cmd.args(top_memory_consumers);
285+
cmd.args(["--maxrows", "10", "--command", sql]);
241286

242287
assert_cmd_snapshot!(cmd);
243288
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
---
2+
source: datafusion-cli/tests/cli_integration.rs
3+
info:
4+
program: datafusion-cli
5+
args:
6+
- "--memory-limit"
7+
- 10M
8+
- "--mem-pool-type"
9+
- fair
10+
- "--command"
11+
- "select * from generate_series(1,500000) as t1(v1) order by v1;"
12+
- "--top-memory-consumers"
13+
- "0"
14+
---
15+
success: false
16+
exit_code: 1
17+
----- stdout -----
18+
[CLI_VERSION]
19+
Error: Not enough memory to continue external sort. Consider increasing the memory limit config: 'datafusion.runtime.memory_limit', or decreasing the config: 'datafusion.execution.sort_spill_reservation_bytes'.
20+
caused by
21+
Resources exhausted: Failed to allocate
22+
23+
----- stderr -----
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
---
2+
source: datafusion-cli/tests/cli_integration.rs
3+
info:
4+
program: datafusion-cli
5+
args:
6+
- "--memory-limit"
7+
- 10M
8+
- "--mem-pool-type"
9+
- fair
10+
- "--command"
11+
- "select * from generate_series(1,500000) as t1(v1) order by v1;"
12+
- "--top-memory-consumers"
13+
- "2"
14+
---
15+
success: false
16+
exit_code: 1
17+
----- stdout -----
18+
[CLI_VERSION]
19+
Error: Not enough memory to continue external sort. Consider increasing the memory limit config: 'datafusion.runtime.memory_limit', or decreasing the config: 'datafusion.execution.sort_spill_reservation_bytes'.
20+
caused by
21+
Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as:
22+
Consumer(can spill: bool) consumed XB, peak XB,
23+
Consumer(can spill: bool) consumed XB, peak XB.
24+
Error: Failed to allocate
25+
26+
----- stderr -----
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
---
2+
source: datafusion-cli/tests/cli_integration.rs
3+
info:
4+
program: datafusion-cli
5+
args:
6+
- "--maxrows"
7+
- "10"
8+
- "--command"
9+
- "select * from generate_series(1,500000) as t1(v1) order by v1;"
10+
---
11+
success: true
12+
exit_code: 0
13+
----- stdout -----
14+
[CLI_VERSION]
15+
+----+
16+
| v1 |
17+
+----+
18+
| 1 |
19+
| 2 |
20+
| 3 |
21+
| 4 |
22+
| 5 |
23+
| 6 |
24+
| 7 |
25+
| 8 |
26+
| 9 |
27+
| 10 |
28+
| . |
29+
| . |
30+
| . |
31+
+----+
32+
500000 row(s) fetched. (First 10 displayed. Use --maxrows to adjust)
33+
[ELAPSED]
34+
35+
36+
----- stderr -----

datafusion/common/src/config.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,21 @@ config_namespace! {
669669
/// # Default
670670
/// `false` — ANSI SQL mode is disabled by default.
671671
pub enable_ansi_mode: bool, default = false
672+
673+
/// How many bytes to buffer in the probe side of hash joins while the build side is
674+
/// concurrently being built.
675+
///
676+
/// Without this, hash joins will wait until the full materialization of the build side
677+
/// before polling the probe side. This is useful in scenarios where the query is not
678+
/// completely CPU-bound, allowing some early work to be done concurrently and reducing the
679+
/// latency of the query.
680+
///
681+
/// Note that when hash join buffering is enabled, the probe side will start eagerly
682+
/// polling data, not giving time for the producer side of dynamic filters to produce any
683+
/// meaningful predicate. Queries with dynamic filters might see performance degradation.
684+
///
685+
/// Disabled by default; set to a value greater than 0 to enable it.
686+
pub hash_join_buffering_capacity: usize, default = 0
672687
}
673688
}
674689

@@ -922,6 +937,11 @@ config_namespace! {
922937
/// past window functions, if possible
923938
pub enable_window_limits: bool, default = true
924939

940+
/// When set to true, the optimizer will push TopK (Sort with fetch)
941+
/// below hash repartition when the partition key is a prefix of the
942+
/// sort key, reducing data volume before the shuffle.
943+
pub enable_topk_repartition: bool, default = true
944+
925945
/// When set to true, the optimizer will attempt to push down TopK dynamic filters
926946
/// into the file scan phase.
927947
pub enable_topk_dynamic_filter_pushdown: bool, default = true

datafusion/core/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,10 @@ name = "struct_query_sql"
228228
harness = false
229229
name = "window_query_sql"
230230

231+
[[bench]]
232+
harness = false
233+
name = "topk_repartition"
234+
231235
[[bench]]
232236
harness = false
233237
name = "scalar"

0 commit comments

Comments
 (0)