Skip to content

Commit 80b72c5

Browse files
committed
more cleanup
1 parent b96aea6 commit 80b72c5

10 files changed

Lines changed: 354 additions & 309 deletions

File tree

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
use datafusion::physical_plan::metrics::{
2+
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
3+
};
4+
5+
/// Metrics recorded by the shuffle partitioners for a single partition.
pub(super) struct ShuffleRepartitionerMetrics {
6+
/// DataFusion `BaselineMetrics` for the operator
7+
pub(super) baseline: BaselineMetrics,
8+
9+
/// Time spent performing repartitioning
10+
pub(super) repart_time: Time,
11+
12+
/// Time spent encoding batches to IPC format
13+
pub(super) encode_time: Time,
14+
15+
/// Time spent writing to disk. Maps to "shuffleWriteTime" in Spark SQL Metrics.
16+
pub(super) write_time: Time,
17+
18+
/// Number of input batches
19+
pub(super) input_batches: Count,
20+
21+
/// Number of spills during the execution of the operator
22+
pub(super) spill_count: Count,
23+
24+
/// Total spilled bytes during the execution of the operator
25+
pub(super) spilled_bytes: Count,
26+
27+
/// The original size of spilled data. Differs from `spilled_bytes` because of compression.
28+
pub(super) data_size: Count,
29+
}
30+
31+
impl ShuffleRepartitionerMetrics {
32+
pub(super) fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
33+
Self {
34+
baseline: BaselineMetrics::new(metrics, partition),
35+
repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition),
36+
encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition),
37+
write_time: MetricBuilder::new(metrics).subset_time("write_time", partition),
38+
input_batches: MetricBuilder::new(metrics).counter("input_batches", partition),
39+
spill_count: MetricBuilder::new(metrics).spill_count(partition),
40+
spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
41+
data_size: MetricBuilder::new(metrics).counter("data_size", partition),
42+
}
43+
}
44+
}

native/core/src/execution/shuffle/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717

1818
pub(crate) mod codec;
1919
mod comet_partitioning;
20-
mod repartitioners;
20+
mod metrics;
21+
mod partitioners;
2122
mod shuffle_writer;
2223
pub mod spark_unsafe;
24+
mod writers;
2325

2426
pub use codec::{read_ipc_compressed, CompressionCodec, ShuffleBlockWriter};
2527
pub use comet_partitioning::CometPartitioning;

native/core/src/execution/shuffle/repartitioners/mod.rs renamed to native/core/src/execution/shuffle/partitioners/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
use arrow::array::RecordBatch;
22

3-
pub(super) use multi_partition::MultiPartitionShuffleRepartitioner;
3+
pub(super) use multi_partition::MultiPartitionShufflePartitioner;
4+
pub(super) use partitioned_batches_producer::{
5+
PartitionedBatchIterator, PartitionedBatchesProducer,
6+
};
47
pub(super) use single_partition::SinglePartitionShufflePartitioner;
58

69
mod multi_partition;
10+
mod partitioned_batches_producer;
711
mod single_partition;
812

913
#[async_trait::async_trait]

native/core/src/execution/shuffle/repartitioners/multi_partition.rs renamed to native/core/src/execution/shuffle/partitioners/multi_partition.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
use crate::execution::shuffle::repartitioners::ShufflePartitioner;
2-
use crate::execution::shuffle::shuffle_writer::{
3-
BufBatchWriter, PartitionWriter, PartitionedBatchIterator, PartitionedBatchesProducer,
4-
ShuffleRepartitionerMetrics,
1+
use crate::execution::shuffle::metrics::ShuffleRepartitionerMetrics;
2+
use crate::execution::shuffle::partitioners::{
3+
PartitionedBatchIterator, PartitionedBatchesProducer, ShufflePartitioner,
54
};
5+
use crate::execution::shuffle::writers::{BufBatchWriter, PartitionWriter};
66
use crate::execution::shuffle::{
77
shuffle_writer, CometPartitioning, CompressionCodec, ShuffleBlockWriter,
88
};
@@ -24,7 +24,7 @@ use std::sync::Arc;
2424
use tokio::time::Instant;
2525

2626
/// A partitioner that uses a hash function to partition data into multiple partitions
27-
pub(crate) struct MultiPartitionShuffleRepartitioner {
27+
pub(crate) struct MultiPartitionShufflePartitioner {
2828
output_data_file: String,
2929
output_index_file: String,
3030
buffered_batches: Vec<RecordBatch>,
@@ -62,7 +62,7 @@ struct ScratchSpace {
6262
partition_starts: Vec<u32>,
6363
}
6464

65-
impl MultiPartitionShuffleRepartitioner {
65+
impl MultiPartitionShufflePartitioner {
6666
#[allow(clippy::too_many_arguments)]
6767
pub(crate) fn try_new(
6868
partition: usize,
@@ -509,14 +509,14 @@ impl MultiPartitionShuffleRepartitioner {
509509
}
510510

511511
#[cfg(test)]
512-
impl MultiPartitionShuffleRepartitioner {
512+
impl MultiPartitionShufflePartitioner {
513513
pub(crate) fn partition_writers(&self) -> &[PartitionWriter] {
514514
&self.partition_writers
515515
}
516516
}
517517

518518
#[async_trait::async_trait]
519-
impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
519+
impl ShufflePartitioner for MultiPartitionShufflePartitioner {
520520
/// Shuffles rows in input batch into corresponding partition buffer.
521521
/// This function will slice input batch according to configured batch size and then
522522
/// shuffle rows into corresponding partition buffer.
@@ -567,10 +567,9 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
567567

568568
// if we wrote a spill file for this partition then copy the
569569
// contents into the shuffle file
570-
if let Some(spill_data) = self.partition_writers[i].spill_file.as_ref() {
570+
if let Some(spill_data) = self.partition_writers[i].spill_file() {
571571
let mut spill_file = BufReader::new(
572-
File::open(spill_data.temp_file.path())
573-
.map_err(shuffle_writer::to_df_err)?,
572+
File::open(spill_data.path()).map_err(shuffle_writer::to_df_err)?,
574573
);
575574
let mut write_timer = self.metrics.write_time.timer();
576575
std::io::copy(&mut spill_file, &mut output_data)
@@ -622,7 +621,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
622621
}
623622
}
624623

625-
impl Debug for MultiPartitionShuffleRepartitioner {
624+
impl Debug for MultiPartitionShufflePartitioner {
626625
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
627626
f.debug_struct("ShuffleRepartitioner")
628627
.field("memory_used", &self.used())
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
use arrow::array::RecordBatch;
2+
use arrow::compute::interleave_record_batch;
3+
use datafusion::common::DataFusionError;
4+
5+
/// A helper struct to produce shuffled batches.
6+
/// This struct takes ownership of the buffered batches and partition indices from the
7+
/// multi-partition shuffle partitioner, and provides an iterator over the batches in the specified partitions.
8+
pub(crate) struct PartitionedBatchesProducer {
9+
/// Buffered input batches that the index pairs below refer to
buffered_batches: Vec<RecordBatch>,
10+
/// One entry per partition: the `(batch index, row index)` pairs belonging to that partition
partition_indices: Vec<Vec<(u32, u32)>>,
11+
/// Maximum number of rows in each batch yielded by the produced iterators
batch_size: usize,
12+
}
13+
14+
impl PartitionedBatchesProducer {
15+
pub(crate) fn new(
16+
buffered_batches: Vec<RecordBatch>,
17+
indices: Vec<Vec<(u32, u32)>>,
18+
batch_size: usize,
19+
) -> Self {
20+
Self {
21+
partition_indices: indices,
22+
buffered_batches,
23+
batch_size,
24+
}
25+
}
26+
27+
pub(crate) fn produce(&mut self, partition_id: usize) -> PartitionedBatchIterator<'_> {
28+
PartitionedBatchIterator::new(
29+
&self.partition_indices[partition_id],
30+
&self.buffered_batches,
31+
self.batch_size,
32+
)
33+
}
34+
}
35+
36+
/// Iterator that yields the rows of one partition as `RecordBatch`es of at most
/// `batch_size` rows, built by interleaving rows from the borrowed buffered batches.
pub(crate) struct PartitionedBatchIterator<'a> {
37+
/// References to the buffered batches; left empty when the partition has no rows
record_batches: Vec<&'a RecordBatch>,
38+
/// Maximum number of rows per yielded batch
batch_size: usize,
39+
/// `(batch index, row index)` pairs for this partition, widened to `usize`
indices: Vec<(usize, usize)>,
40+
/// Position in `indices` of the first row not yet yielded
pos: usize,
41+
}
42+
43+
impl<'a> PartitionedBatchIterator<'a> {
44+
fn new(
45+
indices: &'a [(u32, u32)],
46+
buffered_batches: &'a [RecordBatch],
47+
batch_size: usize,
48+
) -> Self {
49+
if indices.is_empty() {
50+
// Avoid unnecessary allocations when the partition is empty
51+
return Self {
52+
record_batches: vec![],
53+
batch_size,
54+
indices: vec![],
55+
pos: 0,
56+
};
57+
}
58+
let record_batches = buffered_batches.iter().collect::<Vec<_>>();
59+
let current_indices = indices
60+
.iter()
61+
.map(|(i_batch, i_row)| (*i_batch as usize, *i_row as usize))
62+
.collect::<Vec<_>>();
63+
Self {
64+
record_batches,
65+
batch_size,
66+
indices: current_indices,
67+
pos: 0,
68+
}
69+
}
70+
}
71+
72+
impl Iterator for PartitionedBatchIterator<'_> {
73+
type Item = datafusion::common::Result<RecordBatch>;
74+
75+
fn next(&mut self) -> Option<Self::Item> {
76+
if self.pos >= self.indices.len() {
77+
return None;
78+
}
79+
80+
let indices_end = std::cmp::min(self.pos + self.batch_size, self.indices.len());
81+
let indices = &self.indices[self.pos..indices_end];
82+
match interleave_record_batch(&self.record_batches, indices) {
83+
Ok(batch) => {
84+
self.pos = indices_end;
85+
Some(Ok(batch))
86+
}
87+
Err(e) => Some(Err(DataFusionError::ArrowError(
88+
Box::from(e),
89+
Some(DataFusionError::get_back_trace()),
90+
))),
91+
}
92+
}
93+
}

native/core/src/execution/shuffle/repartitioners/single_partition.rs renamed to native/core/src/execution/shuffle/partitioners/single_partition.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
use crate::execution::shuffle::repartitioners::ShufflePartitioner;
2-
use crate::execution::shuffle::shuffle_writer::{BufBatchWriter, ShuffleRepartitionerMetrics};
1+
use crate::execution::shuffle::metrics::ShuffleRepartitionerMetrics;
2+
use crate::execution::shuffle::partitioners::ShufflePartitioner;
3+
use crate::execution::shuffle::writers::BufBatchWriter;
34
use crate::execution::shuffle::{shuffle_writer, CompressionCodec, ShuffleBlockWriter};
45
use arrow::array::RecordBatch;
56
use arrow::datatypes::SchemaRef;
67
use datafusion::common::DataFusionError;
78
use std::fs::{File, OpenOptions};
8-
use std::io::{BufWriter, Seek, Write};
9+
use std::io::{BufWriter, Write};
910
use tokio::time::Instant;
1011

1112
/// A partitioner that writes all shuffle data to a single file and a single index file
@@ -155,11 +156,7 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner {
155156
.open(self.output_index_path.clone())
156157
.map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?;
157158
let mut index_buf_writer = BufWriter::new(index_file);
158-
let data_file_length = self
159-
.output_data_writer
160-
.writer
161-
.stream_position()
162-
.map_err(shuffle_writer::to_df_err)?;
159+
let data_file_length = self.output_data_writer.writer_position()?;
163160
for offset in [0, data_file_length] {
164161
index_buf_writer
165162
.write_all(&(offset as i64).to_le_bytes()[..])

0 commit comments

Comments
 (0)