Commit af76215

fix: improve grace hash join config defaults
- Change fastPathThreshold to per-task (64 MB) instead of per-executor divided by cores (was 10 MB total)
- Change maxBuildSize default from -1 (no limit) to 100 MB to keep SMJ for large build sides where streaming merge outperforms hash join
- Remove benchmark config overrides that are now covered by defaults
1 parent b21275f commit af76215

4 files changed

Lines changed: 10 additions & 18 deletions

benchmarks/tpc/engines/comet-hashjoin.toml

Lines changed: 0 additions & 2 deletions
@@ -32,6 +32,4 @@ driver_class_path = ["$COMET_JAR"]
 "spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
 "spark.comet.scan.impl" = "native_datafusion"
 "spark.comet.exec.replaceSortMergeJoin" = "true"
-"spark.comet.exec.replaceSortMergeJoin.maxBuildSize" = "104857600"
-"spark.comet.exec.graceHashJoin.fastPathThreshold" = "34359738368"
 "spark.comet.expression.Cast.allowIncompatible" = "true"

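The raw byte values in the removed overrides are easier to compare in binary units. The following is a minimal sketch that decodes them; the helper `mib` is hypothetical and exists only for this illustration, it is not part of Comet.

```rust
/// Convert a byte count to whole mebibytes.
/// Hypothetical helper for illustration; not part of Comet.
fn mib(bytes: u64) -> u64 {
    bytes / (1024 * 1024)
}

fn main() {
    // Values copied from the two removed benchmark overrides.
    let max_build_size: u64 = 104_857_600;
    let fast_path_threshold: u64 = 34_359_738_368;

    println!("maxBuildSize override: {} MiB", mib(max_build_size));
    println!("fastPathThreshold override: {} MiB", mib(fast_path_threshold));
}
```

The first override equals the new 100 MB default for maxBuildSize. The second works out to 32 GiB, which under the old semantics was a total budget that was then divided by executor cores.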
common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 5 additions & 6 deletions
@@ -318,15 +318,14 @@ object CometConf extends ShimCometConf {
     conf(s"$COMET_EXEC_CONFIG_PREFIX.graceHashJoin.fastPathThreshold")
       .category(CATEGORY_EXEC)
       .doc(
-        "Total memory budget in bytes for Grace Hash Join fast-path hash tables across " +
-          "all concurrent tasks. This is divided by spark.executor.cores to get the per-task " +
-          "threshold. When a build side fits in memory and is smaller than the per-task " +
-          "threshold, the join executes as a single HashJoinExec without spilling. " +
+        "Per-task memory budget in bytes for Grace Hash Join fast-path hash tables. " +
+          "When a build side fits in memory and is smaller than this threshold, " +
+          "the join executes as a single HashJoinExec without partitioning or spilling. " +
           "Set to 0 to disable the fast path. Larger values risk OOM because HashJoinExec " +
           "creates non-spillable hash tables.")
       .longConf
       .checkValue(v => v >= 0, "The fast path threshold must be non-negative.")
-      .createWithDefault(10L * 1024 * 1024) // 10 MB
+      .createWithDefault(64L * 1024 * 1024) // 64 MB

   val COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED: ConfigEntry[Boolean] =
     conf(s"$COMET_EXEC_CONFIG_PREFIX.columnarToRow.native.enabled")
@@ -414,7 +413,7 @@ object CometConf extends ShimCometConf {
           "on pre-sorted data outperforms hash join's per-task hash table construction " +
           "for large build sides. Set to -1 to disable this check and always replace.")
       .longConf
-      .createWithDefault(-1L)
+      .createWithDefault(100L * 1024 * 1024) // 100 MB

   val COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.native.shuffle.partitioning.hash.enabled")
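The maxBuildSize doc string describes a guard: SortMergeJoin is replaced only when the build side is small enough, and -1 turns the check off. A minimal sketch of that decision follows. The function name, its signature, the inclusive comparison, and the treatment of every negative value as "no limit" are assumptions made for illustration; the actual check in Comet is not shown in this diff and may differ.

```rust
/// Decide whether a SortMergeJoin should be replaced with a hash join.
/// Hypothetical helper: the name, signature, and `<=` comparison are
/// assumptions, not Comet's actual implementation.
fn should_replace_smj(build_size_bytes: u64, max_build_size: i64) -> bool {
    // Assumption: any negative limit (such as the old default of -1)
    // disables the size check, so the join is always replaced.
    if max_build_size < 0 {
        return true;
    }
    build_size_bytes <= max_build_size as u64
}

fn main() {
    const MIB: u64 = 1024 * 1024;
    let new_default: i64 = 100 * 1024 * 1024; // 100 MB, as set in this commit

    // A 50 MiB build side is under the limit, so the join is replaced.
    println!("50 MiB:  {}", should_replace_smj(50 * MIB, new_default));
    // A 500 MiB build side exceeds the limit, so SortMergeJoin is kept.
    println!("500 MiB: {}", should_replace_smj(500 * MIB, new_default));
    // With the old default of -1 the same join would have been replaced.
    println!("500 MiB, no limit: {}", should_replace_smj(500 * MIB, -1));
}
```

With the new default, users who relied on unconditional replacement would need to set the limit back to -1 explicitly.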

docs/source/contributor-guide/grace-hash-join-design.md

Lines changed: 3 additions & 3 deletions
@@ -32,9 +32,9 @@ Supports all join types: Inner, Left, Right, Full, LeftSemi, LeftAnti, LeftMark,
 | Config Key | Type | Default | Description |
 | ---------------------------------------------------- | ------- | ---------- | ---------------------------------------------------------- |
 | `spark.comet.exec.replaceSortMergeJoin` | boolean | `false` | Replace SortMergeJoin with ShuffledHashJoin (enables GHJ) |
-| `spark.comet.exec.replaceSortMergeJoin.maxBuildSize` | long | `-1` | Max build-side bytes for SMJ replacement. `-1` = no limit |
-| `spark.comet.exec.graceHashJoin.numPartitions` | int | `16` | Number of hash partitions (buckets) |
-| `spark.comet.exec.graceHashJoin.fastPathThreshold` | int | `10485760` | Total fast-path budget in bytes, divided by executor cores |
+| `spark.comet.exec.replaceSortMergeJoin.maxBuildSize` | long | `104857600` | Max build-side bytes for SMJ replacement. `-1` = no limit |
+| `spark.comet.exec.graceHashJoin.numPartitions` | int | `16` | Number of hash partitions (buckets) |
+| `spark.comet.exec.graceHashJoin.fastPathThreshold` | int | `67108864` | Per-task fast-path budget in bytes |

 ### SMJ Replacement Guard

native/core/src/execution/planner.rs

Lines changed: 2 additions & 7 deletions
@@ -1576,20 +1576,15 @@ impl PhysicalPlanner {

 use crate::execution::spark_config::{
     SparkConfig, COMET_GRACE_HASH_JOIN_FAST_PATH_THRESHOLD,
-    COMET_GRACE_HASH_JOIN_NUM_PARTITIONS, SPARK_EXECUTOR_CORES,
+    COMET_GRACE_HASH_JOIN_NUM_PARTITIONS,
 };

 let num_partitions = self
     .spark_conf
     .get_usize(COMET_GRACE_HASH_JOIN_NUM_PARTITIONS, 16);
-let executor_cores = self.spark_conf.get_usize(SPARK_EXECUTOR_CORES, 1).max(1);
-// The configured threshold is the total budget across all
-// concurrent tasks. Divide by executor cores so each task's
-// fast-path hash table stays within its fair share.
 let fast_path_threshold = self
     .spark_conf
-    .get_usize(COMET_GRACE_HASH_JOIN_FAST_PATH_THRESHOLD, 10 * 1024 * 1024)
-    / executor_cores;
+    .get_usize(COMET_GRACE_HASH_JOIN_FAST_PATH_THRESHOLD, 64 * 1024 * 1024);

 let build_left = join.build_side == BuildSide::BuildLeft as i32;

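To make the effect of the planner change concrete, the sketch below compares the per-task threshold under the old and new semantics. It mirrors the arithmetic in the hunk above, but the function names and the core count of 8 are assumptions chosen for illustration.

```rust
const MIB: usize = 1024 * 1024;

/// Per-task threshold before this commit: the configured total budget
/// divided by executor cores (clamped to at least 1, as in the removed code).
fn old_per_task_threshold(configured: usize, executor_cores: usize) -> usize {
    configured / executor_cores.max(1)
}

/// Per-task threshold after this commit: the configured value is used as-is.
fn new_per_task_threshold(configured: usize) -> usize {
    configured
}

fn main() {
    let cores = 8; // assumed executor core count, for illustration only

    let old = old_per_task_threshold(10 * MIB, cores);
    let new = new_per_task_threshold(64 * MIB);

    println!("old default: {} bytes per task", old);
    println!("new default: {} bytes per task", new);
}
```

Under this assumed core count the old default left each task only 1.25 MiB for the fast path, while the new default gives each task 64 MiB regardless of how many cores the executor has. The flip side is that the worst-case total across concurrent tasks now scales with the core count.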