Skip to content

Commit 5e803f1

Browse files
andygroveclaude
andcommitted
perf: optimistic fast path for grace hash join
Skip build-side hash partitioning when the fast path threshold is set. Instead of always computing hashes and splitting every build batch into N partitions (only to collect them back together for the fast path), buffer the build side directly. When the build fits in memory and is under the threshold, feed it straight to HashJoinExec with zero partitioning overhead. Falls back to the partitioned slow path on memory pressure or when the build exceeds the threshold. Also fix CometConf fastPathThreshold type from intConf to longConf to support values > 2 GB without integer overflow, and remove a duplicate config line in the benchmark TOML. ~4% improvement on both TPC-H and TPC-DS benchmarks. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 7b7d834 commit 5e803f1

3 files changed

Lines changed: 341 additions & 125 deletions

File tree

benchmarks/tpc/engines/comet-hashjoin.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ driver_class_path = ["$COMET_JAR"]
3232
"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
3333
"spark.comet.scan.impl" = "native_datafusion"
3434
"spark.comet.exec.replaceSortMergeJoin" = "true"
35-
"spark.comet.exec.replaceSortMergeJoin" = "true"
3635
"spark.comet.exec.replaceSortMergeJoin.maxBuildSize" = "104857600"
3736
"spark.comet.exec.graceHashJoin.fastPathThreshold" = "34359738368"
3837
"spark.comet.expression.Cast.allowIncompatible" = "true"

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ object CometConf extends ShimCometConf {
314314
.checkValue(v => v > 0, "The number of partitions must be positive.")
315315
.createWithDefault(16)
316316

317-
val COMET_EXEC_GRACE_HASH_JOIN_FAST_PATH_THRESHOLD: ConfigEntry[Int] =
317+
val COMET_EXEC_GRACE_HASH_JOIN_FAST_PATH_THRESHOLD: ConfigEntry[Long] =
318318
conf(s"$COMET_EXEC_CONFIG_PREFIX.graceHashJoin.fastPathThreshold")
319319
.category(CATEGORY_EXEC)
320320
.doc(
@@ -324,9 +324,9 @@ object CometConf extends ShimCometConf {
324324
"threshold, the join executes as a single HashJoinExec without spilling. " +
325325
"Set to 0 to disable the fast path. Larger values risk OOM because HashJoinExec " +
326326
"creates non-spillable hash tables.")
327-
.intConf
327+
.longConf
328328
.checkValue(v => v >= 0, "The fast path threshold must be non-negative.")
329-
.createWithDefault(10 * 1024 * 1024) // 10 MB
329+
.createWithDefault(10L * 1024 * 1024) // 10 MB
330330

331331
val COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED: ConfigEntry[Boolean] =
332332
conf(s"$COMET_EXEC_CONFIG_PREFIX.columnarToRow.native.enabled")

0 commit comments

Comments
 (0)