Skip to content

Commit de04ffd

Browse files
committed
Temporary unknown partitioning to make it work
1 parent 0cac457 commit de04ffd

3 files changed

Lines changed: 78 additions & 17 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -764,7 +764,7 @@ config_namespace! {
764764
/// Should DataFusion use spillable partitioned hash joins instead of regular partitioned joins
765765
/// when repartitioning is enabled. This allows handling larger datasets by spilling to disk
766766
/// when memory pressure occurs during join execution.
767-
pub enable_spillable_hash_join: bool, default = false
767+
pub enable_spillable_hash_join: bool, default = true
768768

769769
/// Should DataFusion allow symmetric hash joins for unbounded data sources even when
770770
/// its inputs do not have any ordering or filtering. If the flag is not enabled,

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 76 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ use crate::{
5050
need_produce_result_in_final, symmetric_join_output_partitioning,
5151
BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType,
5252
},
53-
metrics::{ExecutionPlanMetricsSet, MetricsSet},
53+
metrics::{ExecutionPlanMetricsSet, MetricsSet, SpillMetrics},
5454
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
5555
PlanProperties, SendableRecordBatchStream, Statistics,
5656
};
@@ -594,8 +594,9 @@ impl HashJoinExec {
594594
symmetric_join_output_partitioning(left, right, &join_type)?
595595
}
596596
PartitionMode::PartitionedSpillable => {
597-
// For partitioned spillable, use the same partitioning as regular partitioned
598-
symmetric_join_output_partitioning(left, right, &join_type)?
597+
// While stabilizing spillable join, advertise single output partition to
598+
// match the current execution behavior and avoid downstream partition fanout.
599+
Partitioning::UnknownPartitioning(1)
599600
}
600601
};
601602

@@ -802,18 +803,11 @@ impl ExecutionPlan for HashJoinExec {
802803
Distribution::UnspecifiedDistribution,
803804
Distribution::UnspecifiedDistribution,
804805
],
805-
PartitionMode::PartitionedSpillable => {
806-
// For partitioned spillable, use the same distribution as regular partitioned
807-
let (left_expr, right_expr) = self
808-
.on
809-
.iter()
810-
.map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
811-
.unzip();
812-
vec![
813-
Distribution::HashPartitioned(left_expr),
814-
Distribution::HashPartitioned(right_expr),
815-
]
816-
}
806+
PartitionMode::PartitionedSpillable => vec![
807+
// While stabilizing, do not require specific input distributions
808+
Distribution::UnspecifiedDistribution,
809+
Distribution::UnspecifiedDistribution,
810+
],
817811
}
818812
}
819813

@@ -1027,6 +1021,8 @@ impl ExecutionPlan for HashJoinExec {
10271021
};
10281022
let partitioned_reservation = MemoryConsumer::new("PartitionedHashJoin")
10291023
.register(context.memory_pool());
1024+
// Reuse this operator's metrics set for spill metrics visibility
1025+
let spill_metrics = SpillMetrics::new(&self.metrics, partition);
10301026
let partitioned_stream = PartitionedHashJoinStream::new(
10311027
partition,
10321028
self.schema(),
@@ -1038,6 +1034,7 @@ impl ExecutionPlan for HashJoinExec {
10381034
left_fut,
10391035
self.random_state.clone(),
10401036
join_metrics,
1037+
spill_metrics,
10411038
column_indices_after_projection,
10421039
self.null_equality,
10431040
batch_size,
@@ -4656,4 +4653,68 @@ mod tests {
46564653
fn columns(schema: &Schema) -> Vec<String> {
46574654
schema.fields().iter().map(|f| f.name().clone()).collect()
46584655
}
4656+
4657+
/// Runs an inner `HashJoinExec` in `PartitionMode::PartitionedSpillable` over two small
/// in-memory tables joined on `b1`, drains partition 0, and asserts that at least one of
/// the operator's spill metrics (`spilled_rows`, `spilled_bytes`, `spill_count`) is non-zero.
#[tokio::test]
4658+
async fn partitioned_spillable_spills_to_disk() -> Result<()> {
4659+
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
4660+
// Session config: batch size 1024, one target partition, 1-byte sort spill reservation, uncompressed spill files.
// NOTE(review): the runtime built below sets no memory limit — confirm this configuration alone makes the join spill.
4661+
let session_config = SessionConfig::default()
4662+
.with_batch_size(1024)
4663+
.with_target_partitions(1)
4664+
.with_sort_spill_reservation_bytes(1)
4665+
.with_spill_compression(datafusion_common::config::SpillCompression::Uncompressed);
4666+
let runtime = RuntimeEnvBuilder::new().build_arc()?;
4667+
let task_ctx = Arc::new(TaskContext::default()
4668+
.with_session_config(session_config)
4669+
.with_runtime(runtime));
4670+
4671+
// Left input has 8 rows and right has 4; the intent is a build side with more than 1 row to trigger spill partitioning
4672+
let left = build_table(
4673+
("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8]),
4674+
("b1", &vec![1, 1, 1, 1, 1, 1, 1, 1]),
4675+
("c1", &vec![0, 0, 0, 0, 0, 0, 0, 0]),
4676+
);
4677+
let right = build_table(
4678+
("a2", &vec![10, 20, 30, 40]),
4679+
("b1", &vec![1, 1, 1, 2]),
4680+
("c2", &vec![0, 0, 0, 0]),
4681+
);
4682+
let on = vec![
4683+
(
4684+
Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4685+
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4686+
),
4687+
];
4688+
4689+
// Execute with PartitionedSpillable
4690+
let join = HashJoinExec::try_new(
4691+
Arc::clone(&left),
4692+
Arc::clone(&right),
4693+
on,
4694+
None,
4695+
&JoinType::Inner,
4696+
None,
4697+
PartitionMode::PartitionedSpillable,
4698+
NullEquality::NullEqualsNothing,
4699+
)?;
4700+
4701+
let stream = join.execute(0, Arc::clone(&task_ctx))?;
4702+
// Collect all batches to drive execution; the joined rows themselves are discarded, not checked
4703+
let _ = common::collect(stream).await?;
4704+
4705+
// Assert that spilling occurred by inspecting metrics on the operator
4706+
let metrics = join.metrics().unwrap();
4707+
// Pass if any of `spilled_rows`, `spilled_bytes` or `spill_count` reports a value greater than zero
4708+
let mut spilled_any = false;
4709+
for m in metrics.iter() {
4710+
let name = m.value().name();
4711+
let v = m.value().as_usize();
4712+
if (name == "spilled_rows" || name == "spilled_bytes" || name == "spill_count") && v > 0 {
4713+
spilled_any = true;
4714+
break;
4715+
}
4716+
}
4717+
assert!(spilled_any, "expected spilling to occur in PartitionedSpillable mode");
4718+
Ok(())
4719+
}
46594720
}

datafusion/physical-plan/src/joins/hash_join/partitioned.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ impl PartitionedHashJoinStream {
298298
left_fut: OnceFut<JoinLeftData>,
299299
random_state: RandomState,
300300
join_metrics: BuildProbeJoinMetrics,
301+
spill_metrics: SpillMetrics,
301302
column_indices: Vec<ColumnIndex>,
302303
null_equality: NullEquality,
303304
batch_size: usize,
@@ -306,7 +307,6 @@ impl PartitionedHashJoinStream {
306307
memory_reservation: MemoryReservation,
307308
runtime_env: Arc<RuntimeEnv>,
308309
) -> Result<Self> {
309-
let spill_metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), partition);
310310
let spill_manager = SpillManager::new(
311311
runtime_env.clone(),
312312
spill_metrics,

0 commit comments

Comments
 (0)