Skip to content

Commit bc23c3a

Browse files
committed
chore: add CometGraceHashJoinSuite to CI, run prettier and cargo fmt
- Add CometGraceHashJoinSuite to the 'exec' suite list in both pr_build_linux.yml and pr_build_macos.yml. - Update the Grace Hash Join design doc to list the new maxConcurrentPartitions config and to describe fastPathThreshold as an executor-wide budget (divided by spark.executor.cores). - cargo fmt + prettier formatting.
1 parent 4401493 commit bc23c3a

6 files changed

Lines changed: 18 additions & 22 deletions

File tree

.github/workflows/pr_build_linux.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,7 @@ jobs:
354354
org.apache.comet.exec.CometGenerateExecSuite
355355
org.apache.comet.exec.CometWindowExecSuite
356356
org.apache.comet.exec.CometJoinSuite
357+
org.apache.comet.exec.CometGraceHashJoinSuite
357358
org.apache.comet.CometNativeSuite
358359
org.apache.comet.CometSparkSessionExtensionsSuite
359360
org.apache.spark.CometPluginsSuite

.github/workflows/pr_build_macos.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ jobs:
193193
org.apache.comet.exec.CometGenerateExecSuite
194194
org.apache.comet.exec.CometWindowExecSuite
195195
org.apache.comet.exec.CometJoinSuite
196+
org.apache.comet.exec.CometGraceHashJoinSuite
196197
org.apache.comet.CometNativeSuite
197198
org.apache.comet.CometSparkSessionExtensionsSuite
198199
org.apache.spark.CometPluginsSuite

docs/source/contributor-guide/grace-hash-join-design.md

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,13 @@ Supports all join types: Inner, Left, Right, Full, LeftSemi, LeftAnti, LeftMark,
2929

3030
## Configuration
3131

32-
| Config Key | Type | Default | Description |
33-
| ---------------------------------------------------- | ------- | ---------- | ---------------------------------------------------------- |
34-
| `spark.comet.exec.replaceSortMergeJoin` | boolean | `false` | Replace SortMergeJoin with ShuffledHashJoin (enables GHJ) |
35-
| `spark.comet.exec.replaceSortMergeJoin.maxBuildSize` | long | `104857600` | Max build-side bytes for SMJ replacement. `-1` = no limit |
36-
| `spark.comet.exec.graceHashJoin.numPartitions` | int | `16` | Number of hash partitions (buckets) |
37-
| `spark.comet.exec.graceHashJoin.fastPathThreshold` | int | `67108864` | Per-task fast-path budget in bytes |
32+
| Config Key | Type | Default | Description |
33+
| -------------------------------------------------------- | ------- | ----------- | ---------------------------------------------------------- |
34+
| `spark.comet.exec.replaceSortMergeJoin` | boolean | `false` | Replace SortMergeJoin with ShuffledHashJoin (enables GHJ) |
35+
| `spark.comet.exec.replaceSortMergeJoin.maxBuildSize` | long | `104857600` | Max build-side bytes for SMJ replacement. `-1` = no limit |
36+
| `spark.comet.exec.graceHashJoin.numPartitions` | int | `16` | Number of hash partitions (buckets) |
37+
| `spark.comet.exec.graceHashJoin.fastPathThreshold` | long | `67108864` | Executor-wide fast-path budget in bytes (divided by cores) |
38+
| `spark.comet.exec.graceHashJoin.maxConcurrentPartitions` | int | `2` | Max partitions joined in parallel during Phase 3 |
3839

3940
### SMJ Replacement Guard
4041

native/core/src/execution/operators/grace_hash_join/exec.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ use tokio::sync::mpsc;
4444

4545
use super::metrics::GraceHashJoinMetrics;
4646
use super::partition::{
47-
buffer_build_optimistic, finish_spill_writers, merge_finished_partitions,
48-
partition_build_side, partition_from_buffer, partition_probe_side, sub_partition_batches,
49-
BuildBufferResult, FinishedPartition, HashPartition, ScratchSpace,
47+
buffer_build_optimistic, finish_spill_writers, merge_finished_partitions, partition_build_side,
48+
partition_from_buffer, partition_probe_side, sub_partition_batches, BuildBufferResult,
49+
FinishedPartition, HashPartition, ScratchSpace,
5050
};
5151
use super::spill::{read_spilled_batches, SpillReaderExec, StreamSourceExec};
5252
use super::{JoinOnRef, DEFAULT_NUM_PARTITIONS, MAX_RECURSION_DEPTH, TARGET_PARTITION_BUILD_SIZE};

native/core/src/execution/operators/grace_hash_join/spill.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -253,10 +253,8 @@ impl ExecutionPlan for SpillReaderExec {
253253
) {
254254
Ok(r) => r,
255255
Err(e) => {
256-
let _ = tx.blocking_send(Err(DataFusionError::ArrowError(
257-
Box::new(e),
258-
None,
259-
)));
256+
let _ =
257+
tx.blocking_send(Err(DataFusionError::ArrowError(Box::new(e), None)));
260258
return;
261259
}
262260
};
@@ -265,10 +263,8 @@ impl ExecutionPlan for SpillReaderExec {
265263
let batch = match batch_result {
266264
Ok(b) => b,
267265
Err(e) => {
268-
let _ = tx.blocking_send(Err(DataFusionError::ArrowError(
269-
Box::new(e),
270-
None,
271-
)));
266+
let _ = tx
267+
.blocking_send(Err(DataFusionError::ArrowError(Box::new(e), None)));
272268
return;
273269
}
274270
};
@@ -394,9 +390,7 @@ impl ExecutionPlan for StreamSourceExec {
394390
// ---------------------------------------------------------------------------
395391

396392
/// Read record batches from a finished spill file.
397-
pub(super) fn read_spilled_batches(
398-
spill_file: &RefCountedTempFile,
399-
) -> DFResult<Vec<RecordBatch>> {
393+
pub(super) fn read_spilled_batches(spill_file: &RefCountedTempFile) -> DFResult<Vec<RecordBatch>> {
400394
let file = File::open(spill_file.path())
401395
.map_err(|e| DataFusionError::Execution(format!("Failed to open spill file: {e}")))?;
402396
let reader = BufReader::with_capacity(SPILL_IO_BUFFER_SIZE, file);

native/core/src/execution/planner.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1712,8 +1712,7 @@ impl PhysicalPlanner {
17121712
// concurrent tasks. Divide it by `spark.executor.cores` so each
17131713
// task's fast-path hash table stays within its fair share and N
17141714
// concurrent tasks don't collectively exceed the configured budget.
1715-
let executor_cores =
1716-
self.spark_conf.get_usize(SPARK_EXECUTOR_CORES, 1).max(1);
1715+
let executor_cores = self.spark_conf.get_usize(SPARK_EXECUTOR_CORES, 1).max(1);
17171716
let total_fast_path_threshold = self
17181717
.spark_conf
17191718
.get_usize(COMET_GRACE_HASH_JOIN_FAST_PATH_THRESHOLD, 64 * 1024 * 1024);

0 commit comments

Comments
 (0)