Skip to content

Commit 4f8791e

Browse files
authored
chore(shuffle): add interleave_time metric and specify buffer size for output_data buffer writer (#4599)
1 parent ceecae7 commit 4f8791e

5 files changed

Lines changed: 24 additions & 10 deletions

File tree

native/shuffle/src/bin/shuffle_bench.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,9 @@ fn print_shuffle_metrics(metrics: &MetricsSet, total_wall_time_secs: f64) {
277277
if let Some(nanos) = get_metric("repart_time") {
278278
println!(" repart time: {}", fmt_time(nanos));
279279
}
280+
if let Some(nanos) = get_metric("interleave_time") {
281+
println!(" interleave time: {}", fmt_time(nanos));
282+
}
280283
if let Some(nanos) = get_metric("encode_time") {
281284
println!(" encode time: {}", fmt_time(nanos));
282285
}

native/shuffle/src/metrics.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ pub(crate) struct ShufflePartitionerMetrics {
2727
/// Time to perform repartitioning
2828
pub(crate) repart_time: Time,
2929

30+
/// Time spent in `interleave_record_batch` gathering shuffled batches
31+
pub(crate) interleave_time: Time,
32+
3033
/// Time encoding batches to IPC format
3134
pub(crate) encode_time: Time,
3235

@@ -51,6 +54,7 @@ impl ShufflePartitionerMetrics {
5154
Self {
5255
baseline: BaselineMetrics::new(metrics, partition),
5356
repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition),
57+
interleave_time: MetricBuilder::new(metrics).subset_time("interleave_time", partition),
5458
encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition),
5559
write_time: MetricBuilder::new(metrics).subset_time("write_time", partition),
5660
input_batches: MetricBuilder::new(metrics).counter("input_batches", partition),

native/shuffle/src/partitioners/multi_partition.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -434,10 +434,12 @@ impl MultiPartitionShuffleRepartitioner {
434434
Ok(())
435435
}
436436

437+
#[allow(clippy::too_many_arguments)]
437438
fn shuffle_write_partition(
438439
partition_iter: &mut PartitionedBatchIterator,
439440
shuffle_block_writer: &mut ShuffleBlockWriter,
440441
output_data: &mut BufWriter<File>,
442+
interleave_time: &Time,
441443
encode_time: &Time,
442444
write_time: &Time,
443445
write_buffer_size: usize,
@@ -449,7 +451,7 @@ impl MultiPartitionShuffleRepartitioner {
449451
write_buffer_size,
450452
batch_size,
451453
);
452-
for batch in partition_iter {
454+
while let Some(batch) = partition_iter.next(interleave_time) {
453455
let batch = batch?;
454456
buf_batch_writer.write(&batch, encode_time, write_time)?;
455457
}
@@ -573,7 +575,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
573575
.open(data_file)
574576
.map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?;
575577

576-
let mut output_data = BufWriter::new(output_data);
578+
let mut output_data = BufWriter::with_capacity(self.write_buffer_size, output_data);
577579

578580
#[allow(clippy::needless_range_loop)]
579581
for i in 0..num_output_partitions {
@@ -596,6 +598,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
596598
&mut partition_iter,
597599
&mut self.shuffle_block_writer,
598600
&mut output_data,
601+
&self.metrics.interleave_time,
599602
&self.metrics.encode_time,
600603
&self.metrics.write_time,
601604
self.write_buffer_size,

native/shuffle/src/partitioners/partitioned_batch_iterator.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use arrow::array::RecordBatch;
1919
use arrow::compute::interleave_record_batch;
2020
use datafusion::common::DataFusionError;
21+
use datafusion::physical_plan::metrics::Time;
2122

2223
/// A helper struct to produce shuffled batches.
2324
/// This struct takes ownership of the buffered batches and partition indices from the
@@ -85,19 +86,22 @@ impl<'a> PartitionedBatchIterator<'a> {
8586
pos: 0,
8687
}
8788
}
88-
}
89-
90-
impl Iterator for PartitionedBatchIterator<'_> {
91-
type Item = datafusion::common::Result<RecordBatch>;
9289

93-
fn next(&mut self) -> Option<Self::Item> {
90+
/// Returns the next shuffled batch, recording the gather cost into `interleave_time`.
91+
pub(crate) fn next(
92+
&mut self,
93+
interleave_time: &Time,
94+
) -> Option<datafusion::common::Result<RecordBatch>> {
9495
if self.pos >= self.indices.len() {
9596
return None;
9697
}
9798

9899
let indices_end = std::cmp::min(self.pos + self.batch_size, self.indices.len());
99100
let indices = &self.indices[self.pos..indices_end];
100-
match interleave_record_batch(&self.record_batches, indices) {
101+
let mut timer = interleave_time.timer();
102+
let result = interleave_record_batch(&self.record_batches, indices);
103+
timer.stop();
104+
match result {
101105
Ok(batch) => {
102106
self.pos = indices_end;
103107
Some(Ok(batch))

native/shuffle/src/writers/spill.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ impl PartitionWriter {
8383
write_buffer_size: usize,
8484
batch_size: usize,
8585
) -> datafusion::common::Result<usize> {
86-
if let Some(batch) = iter.next() {
86+
if let Some(batch) = iter.next(&metrics.interleave_time) {
8787
self.ensure_spill_file_created(runtime)?;
8888

8989
let total_bytes_written = {
@@ -95,7 +95,7 @@ impl PartitionWriter {
9595
);
9696
let mut bytes_written =
9797
buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?;
98-
for batch in iter {
98+
while let Some(batch) = iter.next(&metrics.interleave_time) {
9999
let batch = batch?;
100100
bytes_written += buf_batch_writer.write(
101101
&batch,

0 commit comments

Comments
 (0)