Skip to content

Commit 623ae65

Browse files
committed
New guard and additional logging
1 parent d8d091b commit 623ae65

1 file changed

Lines changed: 35 additions & 0 deletions

File tree

  • datafusion/physical-plan/src/joins/grace_hash_join

datafusion/physical-plan/src/joins/grace_hash_join/stream.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -832,6 +832,24 @@ impl GraceHashJoinStream {
832832
}
833833
}
834834

835+
if need_repartition {
836+
// If repartitioning would not increase fan-out, skip it to avoid a useless extra pass.
837+
let prospective =
838+
compute_repartition_count(
839+
work.partition_count,
840+
work.total_bytes(),
841+
self.adaptive_budget.current_limit(),
842+
MAX_REPARTITION_PARTITIONS,
843+
);
844+
if prospective <= work.partition_count {
845+
debug!(
846+
"Grace hash join partition {} already at fan-out {}, skipping repartition (prospective {})",
847+
work.partition_id, work.partition_count, prospective
848+
);
849+
need_repartition = false;
850+
}
851+
}
852+
835853
if need_repartition {
836854
// Free loaded bytes before repartitioning
837855
let bytes_to_free = {
@@ -851,6 +869,23 @@ impl GraceHashJoinStream {
851869

852870
if repartition_fut.is_none() {
853871
let to_split = current_work.take().unwrap();
872+
let planned_fanout = compute_repartition_count(
873+
to_split.partition_count,
874+
to_split.total_bytes(),
875+
self.adaptive_budget.current_limit(),
876+
MAX_REPARTITION_PARTITIONS,
877+
);
878+
if planned_fanout <= to_split.partition_count {
879+
debug!(
880+
"Grace hash join partition {} would not increase fan-out ({} -> {}), skipping repartition",
881+
to_split.partition_id,
882+
to_split.partition_count,
883+
planned_fanout
884+
);
885+
*current_work = Some(to_split);
886+
need_repartition = false;
887+
continue;
888+
}
854889
let future = build_repartition_future(
855890
to_split,
856891
self.random_state.clone(),

0 commit comments

Comments
 (0)