Skip to content

Commit b96aea6

Browse files
committed
chore: Cleanup shuffle structure
1 parent d0b154b commit b96aea6

11 files changed

Lines changed: 898 additions & 838 deletions

File tree

native/core/benches/row_columnar.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
use arrow::datatypes::DataType as ArrowDataType;
19-
use comet::execution::shuffle::row::{
19+
use comet::execution::shuffle::spark_unsafe::row::{
2020
process_sorted_row_partition, SparkUnsafeObject, SparkUnsafeRow,
2121
};
2222
use comet::execution::shuffle::CompressionCodec;

native/core/src/execution/jni_api.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::{
2222
errors::{try_unwrap_or_throw, CometError, CometResult},
2323
execution::{
2424
metrics::utils::update_comet_metric, planner::PhysicalPlanner, serde::to_arrow_datatype,
25-
shuffle::row::process_sorted_row_partition, sort::RdxSort,
25+
shuffle::spark_unsafe::row::process_sorted_row_partition, sort::RdxSort,
2626
},
2727
jvm_bridge::{jni_new_global_ref, JVMClasses},
2828
};

native/core/src/execution/shuffle/mod.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@
1717

1818
pub(crate) mod codec;
1919
mod comet_partitioning;
20-
mod list;
21-
mod map;
22-
pub mod row;
20+
mod repartitioners;
2321
mod shuffle_writer;
22+
pub mod spark_unsafe;
2423

2524
pub use codec::{read_ipc_compressed, CompressionCodec, ShuffleBlockWriter};
2625
pub use comet_partitioning::CometPartitioning;
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
use arrow::array::RecordBatch;
2+
3+
pub(super) use multi_partition::MultiPartitionShuffleRepartitioner;
4+
pub(super) use single_partition::SinglePartitionShufflePartitioner;
5+
6+
mod multi_partition;
7+
mod single_partition;
8+
9+
#[async_trait::async_trait]
10+
pub(super) trait ShufflePartitioner: Send + Sync {
11+
/// Insert a batch into the partitioner
12+
async fn insert_batch(&mut self, batch: RecordBatch) -> datafusion::common::Result<()>;
13+
/// Write shuffle data and shuffle index file to disk
14+
fn shuffle_write(&mut self) -> datafusion::common::Result<()>;
15+
}

native/core/src/execution/shuffle/repartitioners/multi_partition.rs

Lines changed: 634 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
use crate::execution::shuffle::repartitioners::ShufflePartitioner;
2+
use crate::execution::shuffle::shuffle_writer::{BufBatchWriter, ShuffleRepartitionerMetrics};
3+
use crate::execution::shuffle::{shuffle_writer, CompressionCodec, ShuffleBlockWriter};
4+
use arrow::array::RecordBatch;
5+
use arrow::datatypes::SchemaRef;
6+
use datafusion::common::DataFusionError;
7+
use std::fs::{File, OpenOptions};
8+
use std::io::{BufWriter, Seek, Write};
9+
use tokio::time::Instant;
10+
11+
/// A partitioner that writes all shuffle data to a single file and a single index file
12+
pub(crate) struct SinglePartitionShufflePartitioner {
13+
// output_data_file: File,
14+
output_data_writer: BufBatchWriter<ShuffleBlockWriter, File>,
15+
output_index_path: String,
16+
/// Batches that are smaller than the batch size and to be concatenated
17+
buffered_batches: Vec<RecordBatch>,
18+
/// Number of rows in the concatenating batches
19+
num_buffered_rows: usize,
20+
/// Metrics for the repartitioner
21+
metrics: ShuffleRepartitionerMetrics,
22+
/// The configured batch size
23+
batch_size: usize,
24+
}
25+
26+
impl SinglePartitionShufflePartitioner {
27+
pub(crate) fn try_new(
28+
output_data_path: String,
29+
output_index_path: String,
30+
schema: SchemaRef,
31+
metrics: ShuffleRepartitionerMetrics,
32+
batch_size: usize,
33+
codec: CompressionCodec,
34+
write_buffer_size: usize,
35+
) -> datafusion::common::Result<Self> {
36+
let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?;
37+
38+
let output_data_file = OpenOptions::new()
39+
.write(true)
40+
.create(true)
41+
.truncate(true)
42+
.open(output_data_path)
43+
.map_err(shuffle_writer::to_df_err)?;
44+
45+
let output_data_writer =
46+
BufBatchWriter::new(shuffle_block_writer, output_data_file, write_buffer_size);
47+
48+
Ok(Self {
49+
output_data_writer,
50+
output_index_path,
51+
buffered_batches: vec![],
52+
num_buffered_rows: 0,
53+
metrics,
54+
batch_size,
55+
})
56+
}
57+
58+
/// Add a batch to the buffer of the partitioner, these buffered batches will be concatenated
59+
/// and written to the output data file when the number of rows in the buffer reaches the batch size.
60+
fn add_buffered_batch(&mut self, batch: RecordBatch) {
61+
self.num_buffered_rows += batch.num_rows();
62+
self.buffered_batches.push(batch);
63+
}
64+
65+
/// Consumes buffered batches and return a concatenated batch if successful
66+
fn concat_buffered_batches(&mut self) -> datafusion::common::Result<Option<RecordBatch>> {
67+
if self.buffered_batches.is_empty() {
68+
Ok(None)
69+
} else if self.buffered_batches.len() == 1 {
70+
let batch = self.buffered_batches.remove(0);
71+
self.num_buffered_rows = 0;
72+
Ok(Some(batch))
73+
} else {
74+
let schema = &self.buffered_batches[0].schema();
75+
match arrow::compute::concat_batches(schema, self.buffered_batches.iter()) {
76+
Ok(concatenated) => {
77+
self.buffered_batches.clear();
78+
self.num_buffered_rows = 0;
79+
Ok(Some(concatenated))
80+
}
81+
Err(e) => Err(DataFusionError::ArrowError(
82+
Box::from(e),
83+
Some(DataFusionError::get_back_trace()),
84+
)),
85+
}
86+
}
87+
}
88+
}
89+
90+
#[async_trait::async_trait]
91+
impl ShufflePartitioner for SinglePartitionShufflePartitioner {
92+
async fn insert_batch(&mut self, batch: RecordBatch) -> datafusion::common::Result<()> {
93+
let start_time = Instant::now();
94+
let num_rows = batch.num_rows();
95+
96+
if num_rows > 0 {
97+
self.metrics.data_size.add(batch.get_array_memory_size());
98+
self.metrics.baseline.record_output(num_rows);
99+
100+
if num_rows >= self.batch_size || num_rows + self.num_buffered_rows > self.batch_size {
101+
let concatenated_batch = self.concat_buffered_batches()?;
102+
103+
// Write the concatenated buffered batch
104+
if let Some(batch) = concatenated_batch {
105+
self.output_data_writer.write(
106+
&batch,
107+
&self.metrics.encode_time,
108+
&self.metrics.write_time,
109+
)?;
110+
}
111+
112+
if num_rows >= self.batch_size {
113+
// Write the new batch
114+
self.output_data_writer.write(
115+
&batch,
116+
&self.metrics.encode_time,
117+
&self.metrics.write_time,
118+
)?;
119+
} else {
120+
// Add the new batch to the buffer
121+
self.add_buffered_batch(batch);
122+
}
123+
} else {
124+
self.add_buffered_batch(batch);
125+
}
126+
}
127+
128+
self.metrics.input_batches.add(1);
129+
self.metrics
130+
.baseline
131+
.elapsed_compute()
132+
.add_duration(start_time.elapsed());
133+
Ok(())
134+
}
135+
136+
fn shuffle_write(&mut self) -> datafusion::common::Result<()> {
137+
let start_time = Instant::now();
138+
let concatenated_batch = self.concat_buffered_batches()?;
139+
140+
// Write the concatenated buffered batch
141+
if let Some(batch) = concatenated_batch {
142+
self.output_data_writer.write(
143+
&batch,
144+
&self.metrics.encode_time,
145+
&self.metrics.write_time,
146+
)?;
147+
}
148+
self.output_data_writer.flush(&self.metrics.write_time)?;
149+
150+
// Write index file. It should only contain 2 entries: 0 and the total number of bytes written
151+
let index_file = OpenOptions::new()
152+
.write(true)
153+
.create(true)
154+
.truncate(true)
155+
.open(self.output_index_path.clone())
156+
.map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?;
157+
let mut index_buf_writer = BufWriter::new(index_file);
158+
let data_file_length = self
159+
.output_data_writer
160+
.writer
161+
.stream_position()
162+
.map_err(shuffle_writer::to_df_err)?;
163+
for offset in [0, data_file_length] {
164+
index_buf_writer
165+
.write_all(&(offset as i64).to_le_bytes()[..])
166+
.map_err(shuffle_writer::to_df_err)?;
167+
}
168+
index_buf_writer.flush()?;
169+
170+
self.metrics
171+
.baseline
172+
.elapsed_compute()
173+
.add_duration(start_time.elapsed());
174+
Ok(())
175+
}
176+
}

0 commit comments

Comments
 (0)