Skip to content

Commit 8724b76

Browse files
authored
Add batch coalescing in BufBatchWriter to reduce IPC schema overhead (#3441)
1 parent 7a07db2 commit 8724b76

5 files changed

Lines changed: 191 additions & 9 deletions

File tree

native/core/src/execution/shuffle/partitioners/multi_partition.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -442,14 +442,19 @@ impl MultiPartitionShuffleRepartitioner {
442442
encode_time: &Time,
443443
write_time: &Time,
444444
write_buffer_size: usize,
445+
batch_size: usize,
445446
) -> datafusion::common::Result<()> {
446-
let mut buf_batch_writer =
447-
BufBatchWriter::new(shuffle_block_writer, output_data, write_buffer_size);
447+
let mut buf_batch_writer = BufBatchWriter::new(
448+
shuffle_block_writer,
449+
output_data,
450+
write_buffer_size,
451+
batch_size,
452+
);
448453
for batch in partition_iter {
449454
let batch = batch?;
450455
buf_batch_writer.write(&batch, encode_time, write_time)?;
451456
}
452-
buf_batch_writer.flush(write_time)?;
457+
buf_batch_writer.flush(encode_time, write_time)?;
453458
Ok(())
454459
}
455460

@@ -508,6 +513,7 @@ impl MultiPartitionShuffleRepartitioner {
508513
&self.runtime,
509514
&self.metrics,
510515
self.write_buffer_size,
516+
self.batch_size,
511517
)?;
512518
}
513519

@@ -592,6 +598,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
592598
&self.metrics.encode_time,
593599
&self.metrics.write_time,
594600
self.write_buffer_size,
601+
self.batch_size,
595602
)?;
596603
}
597604

native/core/src/execution/shuffle/partitioners/single_partition.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,12 @@ impl SinglePartitionShufflePartitioner {
5959
.truncate(true)
6060
.open(output_data_path)?;
6161

62-
let output_data_writer =
63-
BufBatchWriter::new(shuffle_block_writer, output_data_file, write_buffer_size);
62+
let output_data_writer = BufBatchWriter::new(
63+
shuffle_block_writer,
64+
output_data_file,
65+
write_buffer_size,
66+
batch_size,
67+
);
6468

6569
Ok(Self {
6670
output_data_writer,
@@ -162,7 +166,8 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner {
162166
&self.metrics.write_time,
163167
)?;
164168
}
165-
self.output_data_writer.flush(&self.metrics.write_time)?;
169+
self.output_data_writer
170+
.flush(&self.metrics.encode_time, &self.metrics.write_time)?;
166171

167172
// Write index file. It should only contain 2 entries: 0 and the total number of bytes written
168173
let index_file = OpenOptions::new()

native/core/src/execution/shuffle/shuffle_writer.rs

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,4 +585,112 @@ mod test {
585585
let _ = fs::remove_file("/tmp/rr_data_1.out");
586586
let _ = fs::remove_file("/tmp/rr_index_1.out");
587587
}
588+
589+
/// Test that batch coalescing in BufBatchWriter reduces output size by
590+
/// writing fewer, larger IPC blocks instead of many small ones.
591+
#[test]
592+
#[cfg_attr(miri, ignore)]
593+
fn test_batch_coalescing_reduces_size() {
594+
use crate::execution::shuffle::writers::BufBatchWriter;
595+
use arrow::array::Int32Array;
596+
597+
// Create a wide schema to amplify per-block schema overhead
598+
let fields: Vec<Field> = (0..20)
599+
.map(|i| Field::new(format!("col_{i}"), DataType::Int32, false))
600+
.collect();
601+
let schema = Arc::new(Schema::new(fields));
602+
603+
// Create many small batches (50 rows each)
604+
let small_batches: Vec<RecordBatch> = (0..100)
605+
.map(|batch_idx| {
606+
let columns: Vec<Arc<dyn Array>> = (0..20)
607+
.map(|col_idx| {
608+
let values: Vec<i32> = (0..50)
609+
.map(|row| batch_idx * 50 + row + col_idx * 1000)
610+
.collect();
611+
Arc::new(Int32Array::from(values)) as Arc<dyn Array>
612+
})
613+
.collect();
614+
RecordBatch::try_new(Arc::clone(&schema), columns).unwrap()
615+
})
616+
.collect();
617+
618+
let codec = CompressionCodec::Lz4Frame;
619+
let encode_time = Time::default();
620+
let write_time = Time::default();
621+
622+
// Write with coalescing (batch_size=8192)
623+
let mut coalesced_output = Vec::new();
624+
{
625+
let mut writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone()).unwrap();
626+
let mut buf_writer = BufBatchWriter::new(
627+
&mut writer,
628+
Cursor::new(&mut coalesced_output),
629+
1024 * 1024,
630+
8192,
631+
);
632+
for batch in &small_batches {
633+
buf_writer.write(batch, &encode_time, &write_time).unwrap();
634+
}
635+
buf_writer.flush(&encode_time, &write_time).unwrap();
636+
}
637+
638+
// Write without coalescing (batch_size=1)
639+
let mut uncoalesced_output = Vec::new();
640+
{
641+
let mut writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone()).unwrap();
642+
let mut buf_writer = BufBatchWriter::new(
643+
&mut writer,
644+
Cursor::new(&mut uncoalesced_output),
645+
1024 * 1024,
646+
1,
647+
);
648+
for batch in &small_batches {
649+
buf_writer.write(batch, &encode_time, &write_time).unwrap();
650+
}
651+
buf_writer.flush(&encode_time, &write_time).unwrap();
652+
}
653+
654+
// Coalesced output should be smaller due to fewer IPC schema blocks
655+
assert!(
656+
coalesced_output.len() < uncoalesced_output.len(),
657+
"Coalesced output ({} bytes) should be smaller than uncoalesced ({} bytes)",
658+
coalesced_output.len(),
659+
uncoalesced_output.len()
660+
);
661+
662+
// Verify both roundtrip correctly by reading all IPC blocks
663+
let coalesced_rows = read_all_ipc_blocks(&coalesced_output);
664+
let uncoalesced_rows = read_all_ipc_blocks(&uncoalesced_output);
665+
assert_eq!(
666+
coalesced_rows, 5000,
667+
"Coalesced should contain all 5000 rows"
668+
);
669+
assert_eq!(
670+
uncoalesced_rows, 5000,
671+
"Uncoalesced should contain all 5000 rows"
672+
);
673+
}
674+
675+
/// Read all IPC blocks from a byte buffer written by BufBatchWriter/ShuffleBlockWriter,
676+
/// returning the total number of rows.
677+
fn read_all_ipc_blocks(data: &[u8]) -> usize {
678+
let mut offset = 0;
679+
let mut total_rows = 0;
680+
while offset < data.len() {
681+
// First 8 bytes are the IPC length (little-endian u64)
682+
let ipc_length =
683+
u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()) as usize;
684+
// Skip the 8-byte length prefix; the next 8 bytes are field_count + codec header
685+
let block_start = offset + 8;
686+
let block_end = block_start + ipc_length;
687+
// read_ipc_compressed expects data starting after the 16-byte header
688+
// (i.e., after length + field_count), at the codec tag
689+
let ipc_data = &data[block_start + 8..block_end];
690+
let batch = read_ipc_compressed(ipc_data).unwrap();
691+
total_rows += batch.num_rows();
692+
offset = block_end;
693+
}
694+
total_rows
695+
}
588696
}

native/core/src/execution/shuffle/writers/buf_batch_writer.rs

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,44 @@
1717

1818
use crate::execution::shuffle::ShuffleBlockWriter;
1919
use arrow::array::RecordBatch;
20+
use arrow::compute::kernels::coalesce::BatchCoalescer;
2021
use datafusion::physical_plan::metrics::Time;
2122
use std::borrow::Borrow;
2223
use std::io::{Cursor, Seek, SeekFrom, Write};
2324

2425
/// Write batches to writer while using a buffer to avoid frequent system calls.
2526
/// The record batches were first written by ShuffleBlockWriter into an internal buffer.
2627
/// Once the buffer exceeds the max size, the buffer will be flushed to the writer.
28+
///
29+
/// Small batches are coalesced using Arrow's [`BatchCoalescer`] before serialization,
30+
/// producing exactly `batch_size`-row output batches to reduce per-block IPC schema overhead.
31+
/// The coalescer is lazily initialized on the first write.
2732
pub(crate) struct BufBatchWriter<S: Borrow<ShuffleBlockWriter>, W: Write> {
2833
shuffle_block_writer: S,
2934
writer: W,
3035
buffer: Vec<u8>,
3136
buffer_max_size: usize,
37+
/// Coalesces small batches into target_batch_size before serialization.
38+
/// Lazily initialized on first write to capture the schema.
39+
coalescer: Option<BatchCoalescer>,
40+
/// Target batch size for coalescing
41+
batch_size: usize,
3242
}
3343

3444
impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {
35-
pub(crate) fn new(shuffle_block_writer: S, writer: W, buffer_max_size: usize) -> Self {
45+
pub(crate) fn new(
46+
shuffle_block_writer: S,
47+
writer: W,
48+
buffer_max_size: usize,
49+
batch_size: usize,
50+
) -> Self {
3651
Self {
3752
shuffle_block_writer,
3853
writer,
3954
buffer: vec![],
4055
buffer_max_size,
56+
coalescer: None,
57+
batch_size,
4158
}
4259
}
4360

@@ -46,6 +63,32 @@ impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {
4663
batch: &RecordBatch,
4764
encode_time: &Time,
4865
write_time: &Time,
66+
) -> datafusion::common::Result<usize> {
67+
let coalescer = self
68+
.coalescer
69+
.get_or_insert_with(|| BatchCoalescer::new(batch.schema(), self.batch_size));
70+
coalescer.push_batch(batch.clone())?;
71+
72+
// Drain completed batches into a local vec so the coalescer borrow ends
73+
// before we call write_batch_to_buffer (which borrows &mut self).
74+
let mut completed = Vec::new();
75+
while let Some(batch) = coalescer.next_completed_batch() {
76+
completed.push(batch);
77+
}
78+
79+
let mut bytes_written = 0;
80+
for batch in &completed {
81+
bytes_written += self.write_batch_to_buffer(batch, encode_time, write_time)?;
82+
}
83+
Ok(bytes_written)
84+
}
85+
86+
/// Serialize a single batch into the byte buffer, flushing to the writer if needed.
87+
fn write_batch_to_buffer(
88+
&mut self,
89+
batch: &RecordBatch,
90+
encode_time: &Time,
91+
write_time: &Time,
4992
) -> datafusion::common::Result<usize> {
5093
let mut cursor = Cursor::new(&mut self.buffer);
5194
cursor.seek(SeekFrom::End(0))?;
@@ -63,7 +106,24 @@ impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {
63106
Ok(bytes_written)
64107
}
65108

66-
pub(crate) fn flush(&mut self, write_time: &Time) -> datafusion::common::Result<()> {
109+
pub(crate) fn flush(
110+
&mut self,
111+
encode_time: &Time,
112+
write_time: &Time,
113+
) -> datafusion::common::Result<()> {
114+
// Finish any remaining buffered rows in the coalescer
115+
let mut remaining = Vec::new();
116+
if let Some(coalescer) = &mut self.coalescer {
117+
coalescer.finish_buffered_batch()?;
118+
while let Some(batch) = coalescer.next_completed_batch() {
119+
remaining.push(batch);
120+
}
121+
}
122+
for batch in &remaining {
123+
self.write_batch_to_buffer(batch, encode_time, write_time)?;
124+
}
125+
126+
// Flush the byte buffer to the underlying writer
67127
let mut write_timer = write_time.timer();
68128
if !self.buffer.is_empty() {
69129
self.writer.write_all(&self.buffer)?;

native/core/src/execution/shuffle/writers/partition_writer.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ impl PartitionWriter {
7979
runtime: &RuntimeEnv,
8080
metrics: &ShufflePartitionerMetrics,
8181
write_buffer_size: usize,
82+
batch_size: usize,
8283
) -> datafusion::common::Result<usize> {
8384
if let Some(batch) = iter.next() {
8485
self.ensure_spill_file_created(runtime)?;
@@ -88,6 +89,7 @@ impl PartitionWriter {
8889
&mut self.shuffle_block_writer,
8990
&mut self.spill_file.as_mut().unwrap().file,
9091
write_buffer_size,
92+
batch_size,
9193
);
9294
let mut bytes_written =
9395
buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?;
@@ -99,7 +101,7 @@ impl PartitionWriter {
99101
&metrics.write_time,
100102
)?;
101103
}
102-
buf_batch_writer.flush(&metrics.write_time)?;
104+
buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?;
103105
bytes_written
104106
};
105107

0 commit comments

Comments
 (0)