Skip to content

Commit 4401493

Browse files
committed
fix: remaining Grace Hash Join correctness issues
- Divide spark.comet.exec.graceHashJoin.fastPathThreshold by spark.executor.cores in the planner so the configured value is an executor-wide budget rather than a per-task one. Without the division, N concurrent tasks could each independently take the fast path and cumulatively exceed the intended budget. Update the CometConf doc to match the design-doc semantics. - Mix recursion_level through the golden-ratio constant when deriving the per-level hash seed. Plain XOR only flipped a few low bits between adjacent levels, letting ahash produce correlated distributions and undermining the recursion depth limit for skewed data. - Generalize SpillReaderExec to accept both in-memory batches and multiple spill files, reading them sequentially into a single coalesced stream. Remove the eager-read fallback in join_with_spilled_probe so merged partitions always honor the streaming-probe design invariant.
1 parent 2b2d2b5 commit 4401493

5 files changed

Lines changed: 129 additions & 92 deletions

File tree

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -309,11 +309,12 @@ object CometConf extends ShimCometConf {
309309
conf(s"$COMET_EXEC_CONFIG_PREFIX.graceHashJoin.fastPathThreshold")
310310
.category(CATEGORY_EXEC)
311311
.doc(
312-
"Per-task memory budget in bytes for Grace Hash Join fast-path hash tables. " +
313-
"When a build side fits in memory and is smaller than this threshold, " +
314-
"the join executes as a single HashJoinExec without partitioning or spilling. " +
315-
"Set to 0 to disable the fast path. Larger values risk OOM because HashJoinExec " +
316-
"creates non-spillable hash tables.")
312+
"Executor-wide memory budget in bytes for Grace Hash Join fast-path hash tables. " +
313+
"The native planner divides this by spark.executor.cores so that each task's " +
314+
"fast-path hash table stays within its fair share. When a task's build side fits " +
315+
"in memory and is smaller than its share, the join executes as a single " +
316+
"HashJoinExec without partitioning or spilling. Set to 0 to disable the fast path. " +
317+
"Larger values risk OOM because HashJoinExec creates non-spillable hash tables.")
317318
.longConf
318319
.checkValue(v => v >= 0, "The fast path threshold must be non-negative.")
319320
.createWithDefault(64L * 1024 * 1024) // 64 MB

native/core/src/execution/operators/grace_hash_join/exec.rs

Lines changed: 10 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -823,28 +823,14 @@ fn join_with_spilled_probe(
823823
// Build side: StreamSourceExec to avoid BatchSplitStream splitting
824824
let build_source = memory_source_exec(build_data, build_schema)?;
825825

826-
// Probe side: streaming from spill file(s).
827-
// With a single spill file and no in-memory batches, use the streaming
828-
// SpillReaderExec. Otherwise read eagerly since the merged group sizes
829-
// are bounded by TARGET_PARTITION_BUILD_SIZE.
830-
let probe_source: Arc<dyn ExecutionPlan> =
831-
if probe_spill_files.len() == 1 && probe_in_memory.is_empty() {
832-
Arc::new(SpillReaderExec::new(
833-
probe_spill_files.into_iter().next().unwrap(),
834-
Arc::clone(probe_schema),
835-
))
836-
} else {
837-
let mut probe_batches = probe_in_memory;
838-
for spill_file in &probe_spill_files {
839-
probe_batches.extend(read_spilled_batches(spill_file)?);
840-
}
841-
let probe_data = if probe_batches.is_empty() {
842-
vec![RecordBatch::new_empty(Arc::clone(probe_schema))]
843-
} else {
844-
vec![concat_batches(probe_schema, &probe_batches)?]
845-
};
846-
memory_source_exec(probe_data, probe_schema)?
847-
};
826+
// Probe side: streamed from spill files with any in-memory batches
827+
// prepended. `SpillReaderExec` handles both sources uniformly so we
828+
// never fall back to eagerly materializing a merged group's probe data.
829+
let probe_source: Arc<dyn ExecutionPlan> = Arc::new(SpillReaderExec::new(
830+
probe_in_memory,
831+
probe_spill_files,
832+
Arc::clone(probe_schema),
833+
));
848834

849835
// HashJoinExec expects left=build in CollectLeft mode
850836
let (left_source, right_source) = if build_left {
@@ -855,14 +841,8 @@ fn join_with_spilled_probe(
855841

856842
info!(
857843
"GraceHashJoin: SPILLED PROBE PATH creating HashJoinExec, \
858-
build_left={}, build_size={}, probe_source={}",
859-
build_left,
860-
build_size,
861-
if probe_spill_files_count == 1 {
862-
"SpillReaderExec"
863-
} else {
864-
"StreamSourceExec"
865-
},
844+
build_left={}, build_size={}, probe_spill_files={}",
845+
build_left, build_size, probe_spill_files_count,
866846
);
867847

868848
let stream = execute_hash_join(

native/core/src/execution/operators/grace_hash_join/partition.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,23 @@ use super::PROBE_PROGRESS_MILESTONE_ROWS;
4545

4646
/// Random state for hashing join keys into partitions. Uses fixed seeds
4747
/// different from DataFusion's HashJoinExec to avoid correlation.
48-
/// The `recursion_level` is XORed into the seed so that recursive
49-
/// repartitioning uses different hash functions at each level.
48+
///
49+
/// Each recursion level must produce a well-separated distribution so that
50+
/// rows which collided at level `N` scatter across sub-partitions at level
51+
/// `N+1`. A naïve `seed ^ recursion_level` only flips a few low bits between
52+
/// adjacent levels — ahash's multiply-rotate mixer can produce correlated
53+
/// outputs from such similar seeds, which would undermine the recursion
54+
/// depth limit for skewed data.
55+
///
56+
/// We mix the level through the golden-ratio constant (the FxHash seed,
57+
/// 2^64 / phi) so successive levels produce seeds that differ in roughly
58+
/// half their bits.
5059
fn partition_random_state(recursion_level: usize) -> RandomState {
60+
const PHI64: u64 = 0x9E3779B97F4A7C15;
61+
let mix = (recursion_level as u64).wrapping_mul(PHI64);
5162
RandomState::with_seeds(
52-
0x517cc1b727220a95 ^ (recursion_level as u64),
53-
0x3a8b7c9d1e2f4056,
63+
0x517cc1b727220a95 ^ mix,
64+
0x3a8b7c9d1e2f4056 ^ mix.rotate_left(32),
5465
0,
5566
0,
5667
)

native/core/src/execution/operators/grace_hash_join/spill.rs

Lines changed: 89 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -111,26 +111,38 @@ impl SpillWriter {
111111
// SpillReaderExec: streaming ExecutionPlan for reading spill files
112112
// ---------------------------------------------------------------------------
113113

114-
/// An ExecutionPlan that streams record batches from an Arrow IPC spill file.
115-
/// Used during the join phase so that spilled probe data is read on-demand
116-
/// instead of loaded entirely into memory.
114+
/// An `ExecutionPlan` that streams record batches from zero or more in-memory
115+
/// batches followed by zero or more Arrow IPC spill files. Used during the
116+
/// join phase so that spilled probe data is read on-demand instead of loaded
117+
/// entirely into memory.
118+
///
119+
/// All sources are concatenated into a single output stream with the same
120+
/// sub-batch coalescing applied uniformly.
117121
#[derive(Debug)]
118122
pub(super) struct SpillReaderExec {
119-
spill_file: RefCountedTempFile,
123+
/// Batches held in memory, emitted first.
124+
initial_batches: Vec<RecordBatch>,
125+
/// Spill files read sequentially after `initial_batches`.
126+
spill_files: Vec<RefCountedTempFile>,
120127
schema: SchemaRef,
121128
cache: Arc<PlanProperties>,
122129
}
123130

124131
impl SpillReaderExec {
125-
pub(super) fn new(spill_file: RefCountedTempFile, schema: SchemaRef) -> Self {
132+
pub(super) fn new(
133+
initial_batches: Vec<RecordBatch>,
134+
spill_files: Vec<RefCountedTempFile>,
135+
schema: SchemaRef,
136+
) -> Self {
126137
let cache = Arc::new(PlanProperties::new(
127138
EquivalenceProperties::new(Arc::clone(&schema)),
128139
Partitioning::UnknownPartitioning(1),
129140
datafusion::physical_plan::execution_plan::EmissionType::Incremental,
130141
datafusion::physical_plan::execution_plan::Boundedness::Bounded,
131142
));
132143
Self {
133-
spill_file,
144+
initial_batches,
145+
spill_files,
134146
schema,
135147
cache,
136148
}
@@ -178,74 +190,100 @@ impl ExecutionPlan for SpillReaderExec {
178190
) -> DFResult<SendableRecordBatchStream> {
179191
let stream_schema = Arc::clone(&self.schema);
180192
let coalesce_schema = Arc::clone(&self.schema);
181-
let path = self.spill_file.path().to_path_buf();
182-
// Move the spill file handle into the blocking closure to keep
183-
// the temp file alive until the reader is done.
184-
let spill_file_handle = self.spill_file.clone();
193+
let initial_batches = self.initial_batches.clone();
194+
// Clone the file handles so the blocking task owns references that
195+
// keep the temp files alive until reading completes.
196+
let spill_files: Vec<RefCountedTempFile> = self.spill_files.to_vec();
185197

186198
// Use a channel so file I/O runs on a blocking thread and doesn't
187199
// block the async executor. This lets select_all interleave multiple
188200
// partition streams effectively.
189201
let (tx, rx) = mpsc::channel::<DFResult<RecordBatch>>(4);
190202

191203
tokio::task::spawn_blocking(move || {
192-
let _keep_alive = spill_file_handle;
193-
let file = match File::open(&path) {
194-
Ok(f) => f,
195-
Err(e) => {
196-
let _ = tx.blocking_send(Err(DataFusionError::Execution(format!(
197-
"Failed to open spill file: {e}"
198-
))));
199-
return;
200-
}
201-
};
202-
let reader = match StreamReader::try_new(
203-
BufReader::with_capacity(SPILL_IO_BUFFER_SIZE, file),
204-
None,
205-
) {
206-
Ok(r) => r,
207-
Err(e) => {
208-
let _ = tx.blocking_send(Err(DataFusionError::ArrowError(Box::new(e), None)));
209-
return;
210-
}
211-
};
212-
213-
// Coalesce small sub-batches into larger ones to reduce per-batch
214-
// overhead in the downstream hash join.
204+
// Small sub-batches (~1000-row inputs split N ways produce ~1000/N
205+
// row sub-batches) are coalesced into ~SPILL_READ_COALESCE_TARGET
206+
// row batches to reduce per-batch overhead in the downstream join.
215207
let mut pending: Vec<RecordBatch> = Vec::new();
216208
let mut pending_rows = 0usize;
217209

218-
for batch_result in reader {
219-
let batch = match batch_result {
220-
Ok(b) => b,
221-
Err(e) => {
222-
let _ =
223-
tx.blocking_send(Err(DataFusionError::ArrowError(Box::new(e), None)));
224-
return;
210+
// Closure-free helper would complicate borrowing; inline the flush.
211+
macro_rules! flush_if_ready {
212+
() => {
213+
if pending_rows >= SPILL_READ_COALESCE_TARGET {
214+
let merged = if pending.len() == 1 {
215+
Ok(pending.pop().unwrap())
216+
} else {
217+
concat_batches(&coalesce_schema, &pending)
218+
.map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
219+
};
220+
pending.clear();
221+
pending_rows = 0;
222+
if tx.blocking_send(merged).is_err() {
223+
return;
224+
}
225225
}
226226
};
227+
}
228+
229+
// Emit any in-memory batches first, applying the same coalescing.
230+
for batch in initial_batches {
227231
if batch.num_rows() == 0 {
228232
continue;
229233
}
230234
pending_rows += batch.num_rows();
231235
pending.push(batch);
236+
flush_if_ready!();
237+
}
232238

233-
if pending_rows >= SPILL_READ_COALESCE_TARGET {
234-
let merged = if pending.len() == 1 {
235-
Ok(pending.pop().unwrap())
236-
} else {
237-
concat_batches(&coalesce_schema, &pending)
238-
.map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
239-
};
240-
pending.clear();
241-
pending_rows = 0;
242-
if tx.blocking_send(merged).is_err() {
239+
// Then read each spill file sequentially.
240+
for spill_file in &spill_files {
241+
let file = match File::open(spill_file.path()) {
242+
Ok(f) => f,
243+
Err(e) => {
244+
let _ = tx.blocking_send(Err(DataFusionError::Execution(format!(
245+
"Failed to open spill file: {e}"
246+
))));
247+
return;
248+
}
249+
};
250+
let reader = match StreamReader::try_new(
251+
BufReader::with_capacity(SPILL_IO_BUFFER_SIZE, file),
252+
None,
253+
) {
254+
Ok(r) => r,
255+
Err(e) => {
256+
let _ = tx.blocking_send(Err(DataFusionError::ArrowError(
257+
Box::new(e),
258+
None,
259+
)));
243260
return;
244261
}
262+
};
263+
264+
for batch_result in reader {
265+
let batch = match batch_result {
266+
Ok(b) => b,
267+
Err(e) => {
268+
let _ = tx.blocking_send(Err(DataFusionError::ArrowError(
269+
Box::new(e),
270+
None,
271+
)));
272+
return;
273+
}
274+
};
275+
if batch.num_rows() == 0 {
276+
continue;
277+
}
278+
pending_rows += batch.num_rows();
279+
pending.push(batch);
280+
flush_if_ready!();
245281
}
246282
}
283+
// Keep the temp files alive until the reader is done.
284+
drop(spill_files);
247285

248-
// Flush remaining
286+
// Flush any remaining buffered batches.
249287
if !pending.is_empty() {
250288
let merged = if pending.len() == 1 {
251289
Ok(pending.pop().unwrap())

native/core/src/execution/planner.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1702,15 +1702,22 @@ impl PhysicalPlanner {
17021702
use crate::execution::spark_config::{
17031703
SparkConfig, COMET_GRACE_HASH_JOIN_FAST_PATH_THRESHOLD,
17041704
COMET_GRACE_HASH_JOIN_MAX_CONCURRENT_PARTITIONS,
1705-
COMET_GRACE_HASH_JOIN_NUM_PARTITIONS,
1705+
COMET_GRACE_HASH_JOIN_NUM_PARTITIONS, SPARK_EXECUTOR_CORES,
17061706
};
17071707

17081708
let num_partitions = self
17091709
.spark_conf
17101710
.get_usize(COMET_GRACE_HASH_JOIN_NUM_PARTITIONS, 16);
1711-
let fast_path_threshold = self
1711+
// The fast-path threshold is the *executor-wide* budget across all
1712+
// concurrent tasks. Divide it by `spark.executor.cores` so each
1713+
// task's fast-path hash table stays within its fair share and N
1714+
// concurrent tasks don't collectively exceed the configured budget.
1715+
let executor_cores =
1716+
self.spark_conf.get_usize(SPARK_EXECUTOR_CORES, 1).max(1);
1717+
let total_fast_path_threshold = self
17121718
.spark_conf
17131719
.get_usize(COMET_GRACE_HASH_JOIN_FAST_PATH_THRESHOLD, 64 * 1024 * 1024);
1720+
let fast_path_threshold = total_fast_path_threshold / executor_cores;
17141721
let max_concurrent_partitions = self
17151722
.spark_conf
17161723
.get_usize(COMET_GRACE_HASH_JOIN_MAX_CONCURRENT_PARTITIONS, 2);

0 commit comments

Comments
 (0)