Skip to content

Commit 687885f

Browse files
committed
Add prefetch
1 parent b56b75b commit 687885f

2 files changed

Lines changed: 240 additions & 109 deletions

File tree

datafusion/physical-plan/src/joins/grace_hash_join/exec.rs

Lines changed: 55 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,10 @@ use crate::spill::get_record_batch_memory_size;
6767
use crate::spill::spill_manager::SpillLocation;
6868
use ahash::RandomState;
6969
use datafusion_common::hash_utils::create_hashes;
70+
use datafusion_execution::memory_pool::human_readable_size;
7071
use datafusion_physical_expr_common::physical_expr::fmt_sql;
7172
use futures::StreamExt;
73+
use log::debug;
7274

7375
/// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions.
7476
const HASH_JOIN_SEED: RandomState =
@@ -952,6 +954,20 @@ pub async fn partition_and_spill(
952954
partition_batch_size,
953955
)
954956
.await?;
957+
958+
// Log spill stats per side for visibility
959+
let left_files: usize = left_index.iter().map(|p| p.chunk_count()).sum();
960+
let right_files: usize = right_index.iter().map(|p| p.chunk_count()).sum();
961+
let left_bytes: usize = left_index.iter().map(|p| p.total_bytes()).sum();
962+
let right_bytes: usize = right_index.iter().map(|p| p.total_bytes()).sum();
963+
debug!(
964+
"Grace hash join partitioning: left files={}, right files={}, left bytes={}, right bytes={}",
965+
left_files,
966+
right_files,
967+
human_readable_size(left_bytes),
968+
human_readable_size(right_bytes)
969+
);
970+
955971
Ok((left_index, right_index))
956972
}
957973

@@ -974,8 +990,10 @@ async fn partition_and_spill_one_side(
974990
// Calculate dynamic buffer size threshold to keep total overhead under control.
975991
// Target total write buffer memory around 16MB per task.
976992
// At least 512KB per partition to ensure some coalescing.
977-
let total_target_buffer_mem = 64 * 1024 * 1024; // 64MB
978-
let batch_size_threshold = (total_target_buffer_mem / partition_count).max(512 * 1024);
993+
// Prefer smaller flush thresholds to keep spill files bounded and reduce stall time.
994+
let total_target_buffer_mem = 16 * 1024 * 1024; // 16MB
995+
let batch_size_threshold =
996+
(total_target_buffer_mem / partition_count).max(512 * 1024);
979997

980998
PartitionWriter::new(
981999
Arc::clone(&spill_manager),
@@ -1107,7 +1125,7 @@ pub struct PartitionWriter {
11071125
spill_manager: Arc<SpillManager>,
11081126
chunks: Vec<SpillChunk>,
11091127
buffer: Vec<RecordBatch>,
1110-
current_buffered_size: usize,
1128+
buffered_bytes: usize,
11111129
schema: SchemaRef,
11121130
batch_size_threshold: usize,
11131131
}
@@ -1122,7 +1140,7 @@ impl PartitionWriter {
11221140
spill_manager,
11231141
chunks: vec![],
11241142
buffer: vec![],
1125-
current_buffered_size: 0,
1143+
buffered_bytes: 0,
11261144
schema,
11271145
batch_size_threshold,
11281146
}
@@ -1137,9 +1155,9 @@ impl PartitionWriter {
11371155
let size = get_record_batch_memory_size(batch);
11381156
reservation.try_grow(size)?;
11391157
self.buffer.push(batch.clone());
1140-
self.current_buffered_size += size;
1158+
self.buffered_bytes += size;
11411159

1142-
if self.current_buffered_size >= self.batch_size_threshold {
1160+
if self.buffered_bytes >= self.batch_size_threshold {
11431161
self.flush(reservation, request_msg)?;
11441162
}
11451163
Ok(())
@@ -1154,35 +1172,19 @@ impl PartitionWriter {
11541172
return Ok(());
11551173
}
11561174

1157-
let large_batch = concat_batches(&self.schema, &self.buffer)?;
1158-
// Clear buffer and release memory tracking for the buffered pieces
1159-
// Note: concat_batches allocates new memory, which is what we are about to spill.
1160-
// The original buffered batches are dropped here.
1175+
// Coalesce buffered batches to reduce file count, but only within the configured threshold.
1176+
let total_accounted = self.buffered_bytes;
1177+
let coalesced = concat_batches(&self.schema, &self.buffer)?;
11611178
self.buffer.clear();
1162-
reservation.shrink(self.current_buffered_size);
1163-
self.current_buffered_size = 0;
1164-
1165-
// Compact StringViewArrays to avoid writing huge buffers
1166-
let new_columns: Vec<Arc<dyn Array>> = large_batch
1167-
.columns()
1168-
.iter()
1169-
.map(|arr| {
1170-
if let Some(sv) = arr.as_any().downcast_ref::<StringViewArray>() {
1171-
Arc::new(sv.gc()) as Arc<dyn Array>
1172-
} else {
1173-
Arc::clone(arr)
1174-
}
1175-
})
1176-
.collect();
1177-
let large_batch = RecordBatch::try_new(Arc::clone(&self.schema), new_columns)?;
1179+
reservation.shrink(total_accounted);
1180+
self.buffered_bytes = 0;
11781181

1179-
// Now spill the coalesced batch.
1180-
// Note: SpillManager might keep it in memory (untracked by us) or spill to disk.
1181-
let size = get_record_batch_memory_size(&large_batch);
1182-
let loc = self.spill_manager.spill_batch_auto(&large_batch, request_msg)?;
1182+
let batch = maybe_compact_string_views(&self.schema, &coalesced)?;
1183+
let spilled_size = get_record_batch_memory_size(&batch);
1184+
let loc = self.spill_manager.spill_batch_auto(&batch, request_msg)?;
11831185
self.chunks.push(SpillChunk {
11841186
location: loc,
1185-
size,
1187+
size: spilled_size,
11861188
});
11871189
Ok(())
11881190
}
@@ -1217,6 +1219,10 @@ impl PartitionIndex {
12171219
pub fn total_bytes(&self) -> usize {
12181220
self.total_bytes
12191221
}
1222+
1223+
pub fn chunk_count(&self) -> usize {
1224+
self.chunks.len()
1225+
}
12201226
}
12211227

12221228
#[derive(Debug, Clone)]
@@ -1225,6 +1231,24 @@ pub struct SpillChunk {
12251231
pub size: usize,
12261232
}
12271233

1234+
fn maybe_compact_string_views(
1235+
schema: &SchemaRef,
1236+
batch: &RecordBatch,
1237+
) -> Result<RecordBatch> {
1238+
let new_columns: Vec<Arc<dyn Array>> = batch
1239+
.columns()
1240+
.iter()
1241+
.map(|arr| {
1242+
if let Some(sv) = arr.as_any().downcast_ref::<StringViewArray>() {
1243+
Arc::new(sv.gc()) as Arc<dyn Array>
1244+
} else {
1245+
Arc::clone(arr)
1246+
}
1247+
})
1248+
.collect();
1249+
RecordBatch::try_new(Arc::clone(schema), new_columns).map_err(Into::into)
1250+
}
1251+
12281252
#[cfg(test)]
12291253
mod tests {
12301254
use super::*;

0 commit comments

Comments
 (0)