Skip to content

Commit 8b0b12d

Browse files
committed
Add experimental Grace Hash Join operator
1 parent 425f9c9 commit 8b0b12d

16 files changed

Lines changed: 3788 additions & 92 deletions

File tree

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,28 @@ object CometConf extends ShimCometConf {
296296
val COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED: ConfigEntry[Boolean] =
297297
createExecEnabledConfig("localTableScan", defaultValue = false)
298298

299+
val COMET_EXEC_GRACE_HASH_JOIN_NUM_PARTITIONS: ConfigEntry[Int] = // Partition (bucket) count used by Grace Hash Join.
300+
conf(s"$COMET_EXEC_CONFIG_PREFIX.graceHashJoin.numPartitions") // key is built from the shared exec config prefix
301+
.category(CATEGORY_EXEC)
302+
.doc("The number of partitions (buckets) to use for Grace Hash Join. A higher number " +
303+
"reduces the size of each partition but increases overhead.")
304+
.intConf
305+
.checkValue(v => v > 0, "The number of partitions must be positive.") // zero and negative values are rejected
306+
.createWithDefault(16) // default: 16 partitions
307+
308+
val COMET_EXEC_GRACE_HASH_JOIN_FAST_PATH_THRESHOLD: ConfigEntry[Long] = // Byte threshold for the Grace Hash Join in-memory "fast path".
309+
conf(s"$COMET_EXEC_CONFIG_PREFIX.graceHashJoin.fastPathThreshold") // key is built from the shared exec config prefix
310+
.category(CATEGORY_EXEC)
311+
.doc(
312+
"Per-task memory budget in bytes for Grace Hash Join fast-path hash tables. " +
313+
"When a build side fits in memory and is smaller than this threshold, " +
314+
"the join executes as a single HashJoinExec without partitioning or spilling. " +
315+
"Set to 0 to disable the fast path. Larger values risk OOM because HashJoinExec " +
316+
"creates non-spillable hash tables.")
317+
.longConf
318+
.checkValue(v => v >= 0, "The fast path threshold must be non-negative.") // 0 is accepted: per the doc text it disables the fast path
319+
.createWithDefault(64L * 1024 * 1024) // 64 MB
320+
299321
val COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED: ConfigEntry[Boolean] =
300322
conf(s"$COMET_EXEC_CONFIG_PREFIX.columnarToRow.native.enabled")
301323
.category(CATEGORY_EXEC)
@@ -383,6 +405,18 @@ object CometConf extends ShimCometConf {
383405
.booleanConf
384406
.createWithDefault(false)
385407

408+
val COMET_REPLACE_SMJ_MAX_BUILD_SIZE: ConfigEntry[Long] = // Build-side size cap (bytes) for swapping SortMergeJoin for ShuffledHashJoin.
409+
conf(s"$COMET_EXEC_CONFIG_PREFIX.replaceSortMergeJoin.maxBuildSize") // key is built from the shared exec config prefix
410+
.category(CATEGORY_EXEC)
411+
.doc(
412+
"Maximum estimated size in bytes of the build side for replacing SortMergeJoin " +
413+
"with ShuffledHashJoin. When the build side's logical plan statistics exceed this " +
414+
"threshold, the SortMergeJoin is kept because sort-merge join's streaming merge " +
415+
"on pre-sorted data outperforms hash join's per-task hash table construction " +
416+
"for large build sides. Set to -1 to disable this check and always replace.")
417+
.longConf // NOTE(review): no checkValue here, unlike the Grace Hash Join entries; -1 is the documented "disable" value, behavior for other negatives is not shown here
418+
.createWithDefault(100L * 1024 * 1024) // 100 MB
419+
386420
val COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED: ConfigEntry[Boolean] =
387421
conf("spark.comet.native.shuffle.partitioning.hash.enabled")
388422
.category(CATEGORY_SHUFFLE)

docs/source/contributor-guide/grace-hash-join-design.md

Lines changed: 293 additions & 0 deletions
Large diffs are not rendered by default.

native/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

native/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ edition = "2021"
3434
rust-version = "1.88"
3535

3636
[workspace.dependencies]
37-
arrow = { version = "58.1.0", features = ["prettyprint", "ffi", "chrono-tz"] }
37+
arrow = { version = "58.1.0", features = ["prettyprint", "ffi", "chrono-tz", "ipc_compression"] }
3838
async-trait = { version = "0.1" }
3939
bytes = { version = "1.11.1" }
4040
parquet = { version = "58.1.0", default-features = false, features = ["experimental"] }

native/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ include = [
3535
publish = false
3636

3737
[dependencies]
38+
ahash = "0.8"
3839
arrow = { workspace = true }
3940
parquet = { workspace = true, default-features = false, features = ["experimental", "arrow"] }
4041
futures = { workspace = true }

native/core/src/execution/jni_api.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,8 @@ struct ExecutionContext {
297297
pub memory_pool_config: MemoryPoolConfig,
298298
/// Whether to log memory usage on each call to execute_plan
299299
pub tracing_enabled: bool,
300+
/// Spark configuration map for comet-specific settings
301+
pub spark_conf: HashMap<String, String>,
300302
/// Rust thread ID, used for aggregating tracing metrics per thread
301303
pub rust_thread_id: u64,
302304
/// Pre-computed metric name for tracing memory usage
@@ -471,6 +473,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
471473
explain_native,
472474
memory_pool_config,
473475
tracing_enabled,
476+
spark_conf: spark_config,
474477
rust_thread_id,
475478
tracing_memory_metric_name: format!(
476479
"thread_{rust_thread_id}_comet_memory_reserved"
@@ -697,7 +700,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
697700
let start = Instant::now();
698701
let planner =
699702
PhysicalPlanner::new(Arc::clone(&exec_context.session_ctx), partition)
700-
.with_exec_id(exec_context_id);
703+
.with_exec_id(exec_context_id)
704+
.with_spark_conf(exec_context.spark_conf.clone());
701705
let (scans, shuffle_scans, root_op) = planner.create_plan(
702706
&exec_context.spark_plan,
703707
&mut exec_context.input_sources.clone(),

0 commit comments

Comments
 (0)