From 40fb9f32b26e70c7eea0b2365c9f6d83dfcb528f Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Wed, 4 Feb 2026 08:42:12 +0200 Subject: [PATCH 1/3] chore: Move writer-related logic to "writers" module --- native/core/src/execution/shuffle/mod.rs | 1 + .../src/execution/shuffle/shuffle_writer.rs | 165 ++---------------- .../shuffle/writers/buf_batch_writer.rs | 65 +++++++ .../core/src/execution/shuffle/writers/mod.rs | 5 + .../shuffle/writers/partition_writer.rs | 105 +++++++++++ 5 files changed, 188 insertions(+), 153 deletions(-) create mode 100644 native/core/src/execution/shuffle/writers/buf_batch_writer.rs create mode 100644 native/core/src/execution/shuffle/writers/mod.rs create mode 100644 native/core/src/execution/shuffle/writers/partition_writer.rs diff --git a/native/core/src/execution/shuffle/mod.rs b/native/core/src/execution/shuffle/mod.rs index a72258322a..a41d269d80 100644 --- a/native/core/src/execution/shuffle/mod.rs +++ b/native/core/src/execution/shuffle/mod.rs @@ -20,6 +20,7 @@ mod comet_partitioning; mod metrics; mod shuffle_writer; pub mod spark_unsafe; +mod writers; pub use codec::{read_ipc_compressed, CompressionCodec, ShuffleBlockWriter}; pub use comet_partitioning::CometPartitioning; diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 5c68940b98..2d2167b86b 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -18,6 +18,7 @@ //! Defines the External shuffle repartition plan. use crate::execution::shuffle::metrics::ShufflePartitionerMetrics; +use crate::execution::shuffle::writers::{BufBatchWriter, PartitionWriter}; use crate::execution::shuffle::{CometPartitioning, CompressionCodec, ShuffleBlockWriter}; use crate::execution::tracing::{with_trace, with_trace_async}; use arrow::compute::interleave_record_batch; @@ -31,7 +32,6 @@ use datafusion::{ error::{DataFusionError, Result}, execution::{ context::TaskContext, - disk_manager::RefCountedTempFile, memory_pool::{MemoryConsumer, MemoryReservation}, runtime_env::RuntimeEnv, }, @@ -45,8 +45,7 @@ use datafusion::{ use datafusion_comet_spark_expr::hash_funcs::murmur3::create_murmur3_hashes; use futures::{StreamExt, TryFutureExt, TryStreamExt}; use itertools::Itertools; -use std::borrow::Borrow; -use std::io::{Cursor, Error, SeekFrom}; +use std::io::Error; use std::{ any::Any, fmt, @@ -803,9 +802,8 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { // if we wrote a spill file for this partition then copy the // contents into the shuffle file - if let Some(spill_data) = self.partition_writers[i].spill_file.as_ref() { - let mut spill_file = - BufReader::new(File::open(spill_data.temp_file.path()).map_err(to_df_err)?); + if let Some(spill_path) = self.partition_writers[i].path() { + let mut spill_file = BufReader::new(File::open(spill_path).map_err(to_df_err)?); let mut write_timer = self.metrics.write_time.timer(); std::io::copy(&mut spill_file, &mut output_data).map_err(to_df_err)?; write_timer.stop(); @@ -1011,11 +1009,7 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner { .open(self.output_index_path.clone()) .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; let mut index_buf_writer = BufWriter::new(index_file); - let data_file_length = self - .output_data_writer - .writer - .stream_position() - .map_err(to_df_err)?; + let data_file_length = self.output_data_writer.writer_stream_position()?; for offset in [0, data_file_length] { index_buf_writer .write_all(&(offset as i64).to_le_bytes()[..]) @@ -1038,14 +1032,14 @@ fn to_df_err(e: Error) -> DataFusionError { /// A helper struct to produce shuffled batches. /// This struct takes ownership of the buffered batches and partition indices from the /// ShuffleRepartitioner, and provides an iterator over the batches in the specified partitions. -struct PartitionedBatchesProducer { +pub(crate) struct PartitionedBatchesProducer { buffered_batches: Vec, partition_indices: Vec>, batch_size: usize, } impl PartitionedBatchesProducer { - fn new( + pub(crate) fn new( buffered_batches: Vec, indices: Vec>, batch_size: usize, @@ -1066,7 +1060,7 @@ impl PartitionedBatchesProducer { } } -struct PartitionedBatchIterator<'a> { +pub(crate) struct PartitionedBatchIterator<'a> { record_batches: Vec<&'a RecordBatch>, batch_size: usize, indices: Vec<(usize, usize)>, @@ -1125,141 +1119,6 @@ impl Iterator for PartitionedBatchIterator<'_> { } } -struct PartitionWriter { - /// Spill file for intermediate shuffle output for this partition. Each spill event - /// will append to this file and the contents will be copied to the shuffle file at - /// the end of processing. - spill_file: Option, - /// Writer that performs encoding and compression - shuffle_block_writer: ShuffleBlockWriter, -} - -struct SpillFile { - temp_file: RefCountedTempFile, - file: File, -} - -impl PartitionWriter { - fn try_new(shuffle_block_writer: ShuffleBlockWriter) -> Result { - Ok(Self { - spill_file: None, - shuffle_block_writer, - }) - } - - fn spill( - &mut self, - iter: &mut PartitionedBatchIterator, - runtime: &RuntimeEnv, - metrics: &ShufflePartitionerMetrics, - write_buffer_size: usize, - ) -> Result { - if let Some(batch) = iter.next() { - self.ensure_spill_file_created(runtime)?; - - let total_bytes_written = { - let mut buf_batch_writer = BufBatchWriter::new( - &mut self.shuffle_block_writer, - &mut self.spill_file.as_mut().unwrap().file, - write_buffer_size, - ); - let mut bytes_written = - buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?; - for batch in iter { - let batch = batch?; - bytes_written += buf_batch_writer.write( - &batch, - &metrics.encode_time, - &metrics.write_time, - )?; - } - buf_batch_writer.flush(&metrics.write_time)?; - bytes_written - }; - - Ok(total_bytes_written) - } else { - Ok(0) - } - } - - fn ensure_spill_file_created(&mut self, runtime: &RuntimeEnv) -> Result<()> { - if self.spill_file.is_none() { - // Spill file is not yet created, create it - let spill_file = runtime - .disk_manager - .create_tmp_file("shuffle writer spill")?; - let spill_data = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(spill_file.path()) - .map_err(|e| { - DataFusionError::Execution(format!("Error occurred while spilling {e}")) - })?; - self.spill_file = Some(SpillFile { - temp_file: spill_file, - file: spill_data, - }); - } - Ok(()) - } -} - -/// Write batches to writer while using a buffer to avoid frequent system calls. -/// The record batches were first written by ShuffleBlockWriter into an internal buffer. -/// Once the buffer exceeds the max size, the buffer will be flushed to the writer. -struct BufBatchWriter, W: Write> { - shuffle_block_writer: S, - writer: W, - buffer: Vec, - buffer_max_size: usize, -} - -impl, W: Write> BufBatchWriter { - fn new(shuffle_block_writer: S, writer: W, buffer_max_size: usize) -> Self { - Self { - shuffle_block_writer, - writer, - buffer: vec![], - buffer_max_size, - } - } - - fn write( - &mut self, - batch: &RecordBatch, - encode_time: &Time, - write_time: &Time, - ) -> Result { - let mut cursor = Cursor::new(&mut self.buffer); - cursor.seek(SeekFrom::End(0))?; - let bytes_written = - self.shuffle_block_writer - .borrow() - .write_batch(batch, &mut cursor, encode_time)?; - let pos = cursor.position(); - if pos >= self.buffer_max_size as u64 { - let mut write_timer = write_time.timer(); - self.writer.write_all(&self.buffer)?; - write_timer.stop(); - self.buffer.clear(); - } - Ok(bytes_written) - } - - fn flush(&mut self, write_time: &Time) -> Result<()> { - let mut write_timer = write_time.timer(); - if !self.buffer.is_empty() { - self.writer.write_all(&self.buffer)?; - } - self.writer.flush()?; - write_timer.stop(); - self.buffer.clear(); - Ok(()) - } -} - fn pmod(hash: u32, n: usize) -> usize { let hash = hash as i32; let n = n as i32; @@ -1371,14 +1230,14 @@ mod test { assert_eq!(2, repartitioner.partition_writers.len()); - assert!(repartitioner.partition_writers[0].spill_file.is_none()); - assert!(repartitioner.partition_writers[1].spill_file.is_none()); + assert!(!repartitioner.partition_writers[0].has_spill_file()); + assert!(!repartitioner.partition_writers[1].has_spill_file()); repartitioner.spill().unwrap(); // after spill, there should be spill files - assert!(repartitioner.partition_writers[0].spill_file.is_some()); - assert!(repartitioner.partition_writers[1].spill_file.is_some()); + assert!(repartitioner.partition_writers[0].has_spill_file()); + assert!(repartitioner.partition_writers[1].has_spill_file()); // insert another batch after spilling repartitioner.insert_batch(batch.clone()).await.unwrap(); diff --git a/native/core/src/execution/shuffle/writers/buf_batch_writer.rs b/native/core/src/execution/shuffle/writers/buf_batch_writer.rs new file mode 100644 index 0000000000..ad582a5066 --- /dev/null +++ b/native/core/src/execution/shuffle/writers/buf_batch_writer.rs @@ -0,0 +1,65 @@ +use crate::execution::shuffle::ShuffleBlockWriter; +use arrow::array::RecordBatch; +use datafusion::physical_plan::metrics::Time; +use std::borrow::Borrow; +use std::io::{Cursor, Seek, SeekFrom, Write}; + +/// Write batches to writer while using a buffer to avoid frequent system calls. +/// The record batches were first written by ShuffleBlockWriter into an internal buffer. +/// Once the buffer exceeds the max size, the buffer will be flushed to the writer. +pub(crate) struct BufBatchWriter, W: Write> { + shuffle_block_writer: S, + writer: W, + buffer: Vec, + buffer_max_size: usize, +} + +impl, W: Write> BufBatchWriter { + pub(crate) fn new(shuffle_block_writer: S, writer: W, buffer_max_size: usize) -> Self { + Self { + shuffle_block_writer, + writer, + buffer: vec![], + buffer_max_size, + } + } + + pub(crate) fn write( + &mut self, + batch: &RecordBatch, + encode_time: &Time, + write_time: &Time, + ) -> datafusion::common::Result { + let mut cursor = Cursor::new(&mut self.buffer); + cursor.seek(SeekFrom::End(0))?; + let bytes_written = + self.shuffle_block_writer + .borrow() + .write_batch(batch, &mut cursor, encode_time)?; + let pos = cursor.position(); + if pos >= self.buffer_max_size as u64 { + let mut write_timer = write_time.timer(); + self.writer.write_all(&self.buffer)?; + write_timer.stop(); + self.buffer.clear(); + } + Ok(bytes_written) + } + + pub(crate) fn flush(&mut self, write_time: &Time) -> datafusion::common::Result<()> { + let mut write_timer = write_time.timer(); + if !self.buffer.is_empty() { + self.writer.write_all(&self.buffer)?; + } + self.writer.flush()?; + write_timer.stop(); + self.buffer.clear(); + Ok(()) + } +} + +impl, W: Write + Seek> BufBatchWriter { + pub(crate) fn writer_stream_position(&mut self) -> datafusion::common::Result { + self.writer.stream_position().map_err(Into::into) + } +} diff --git a/native/core/src/execution/shuffle/writers/mod.rs b/native/core/src/execution/shuffle/writers/mod.rs new file mode 100644 index 0000000000..ec5fb8bf2d --- /dev/null +++ b/native/core/src/execution/shuffle/writers/mod.rs @@ -0,0 +1,5 @@ +mod buf_batch_writer; +mod partition_writer; + +pub(super) use buf_batch_writer::BufBatchWriter; +pub(super) use partition_writer::PartitionWriter; diff --git a/native/core/src/execution/shuffle/writers/partition_writer.rs b/native/core/src/execution/shuffle/writers/partition_writer.rs new file mode 100644 index 0000000000..2447fcb482 --- /dev/null +++ b/native/core/src/execution/shuffle/writers/partition_writer.rs @@ -0,0 +1,105 @@ +use crate::execution::shuffle::metrics::ShufflePartitionerMetrics; +use crate::execution::shuffle::shuffle_writer::PartitionedBatchIterator; +use crate::execution::shuffle::writers::buf_batch_writer::BufBatchWriter; +use crate::execution::shuffle::ShuffleBlockWriter; +use datafusion::common::DataFusionError; +use datafusion::execution::disk_manager::RefCountedTempFile; +use datafusion::execution::runtime_env::RuntimeEnv; +use std::fs::{File, OpenOptions}; + +struct SpillFile { + temp_file: RefCountedTempFile, + file: File, +} + +pub(crate) struct PartitionWriter { + /// Spill file for intermediate shuffle output for this partition. Each spill event + /// will append to this file and the contents will be copied to the shuffle file at + /// the end of processing. + spill_file: Option, + /// Writer that performs encoding and compression + shuffle_block_writer: ShuffleBlockWriter, +} + +impl PartitionWriter { + pub(crate) fn try_new( + shuffle_block_writer: ShuffleBlockWriter, + ) -> datafusion::common::Result { + Ok(Self { + spill_file: None, + shuffle_block_writer, + }) + } + + fn ensure_spill_file_created( + &mut self, + runtime: &RuntimeEnv, + ) -> datafusion::common::Result<()> { + if self.spill_file.is_none() { + // Spill file is not yet created, create it + let spill_file = runtime + .disk_manager + .create_tmp_file("shuffle writer spill")?; + let spill_data = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(spill_file.path()) + .map_err(|e| { + DataFusionError::Execution(format!("Error occurred while spilling {e}")) + })?; + self.spill_file = Some(SpillFile { + temp_file: spill_file, + file: spill_data, + }); + } + Ok(()) + } + + pub(crate) fn spill( + &mut self, + iter: &mut PartitionedBatchIterator, + runtime: &RuntimeEnv, + metrics: &ShufflePartitionerMetrics, + write_buffer_size: usize, + ) -> datafusion::common::Result { + if let Some(batch) = iter.next() { + self.ensure_spill_file_created(runtime)?; + + let total_bytes_written = { + let mut buf_batch_writer = BufBatchWriter::new( + &mut self.shuffle_block_writer, + &mut self.spill_file.as_mut().unwrap().file, + write_buffer_size, + ); + let mut bytes_written = + buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?; + for batch in iter { + let batch = batch?; + bytes_written += buf_batch_writer.write( + &batch, + &metrics.encode_time, + &metrics.write_time, + )?; + } + buf_batch_writer.flush(&metrics.write_time)?; + bytes_written + }; + + Ok(total_bytes_written) + } else { + Ok(0) + } + } + + pub(crate) fn path(&self) -> Option<&std::path::Path> { + self.spill_file + .as_ref() + .map(|spill_file| spill_file.temp_file.path()) + } + + #[cfg(test)] + pub(crate) fn has_spill_file(&self) -> bool { + self.spill_file.is_some() + } +} From dd5369a21e30f5f01b78bd0b6d03e84b5512eb46 Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Wed, 4 Feb 2026 08:45:25 +0200 Subject: [PATCH 2/3] add licenses --- .../shuffle/writers/buf_batch_writer.rs | 17 +++++++++++++++++ .../core/src/execution/shuffle/writers/mod.rs | 17 +++++++++++++++++ .../shuffle/writers/partition_writer.rs | 17 +++++++++++++++++ 3 files changed, 51 insertions(+) diff --git a/native/core/src/execution/shuffle/writers/buf_batch_writer.rs b/native/core/src/execution/shuffle/writers/buf_batch_writer.rs index ad582a5066..8428151dda 100644 --- a/native/core/src/execution/shuffle/writers/buf_batch_writer.rs +++ b/native/core/src/execution/shuffle/writers/buf_batch_writer.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use crate::execution::shuffle::ShuffleBlockWriter; use arrow::array::RecordBatch; use datafusion::physical_plan::metrics::Time; diff --git a/native/core/src/execution/shuffle/writers/mod.rs b/native/core/src/execution/shuffle/writers/mod.rs index ec5fb8bf2d..d41363b7fb 100644 --- a/native/core/src/execution/shuffle/writers/mod.rs +++ b/native/core/src/execution/shuffle/writers/mod.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + mod buf_batch_writer; mod partition_writer; diff --git a/native/core/src/execution/shuffle/writers/partition_writer.rs b/native/core/src/execution/shuffle/writers/partition_writer.rs index 2447fcb482..8b2555e09c 100644 --- a/native/core/src/execution/shuffle/writers/partition_writer.rs +++ b/native/core/src/execution/shuffle/writers/partition_writer.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use crate::execution::shuffle::metrics::ShufflePartitionerMetrics; use crate::execution::shuffle::shuffle_writer::PartitionedBatchIterator; use crate::execution::shuffle::writers::buf_batch_writer::BufBatchWriter; From 11a5ebec9be031a9c82065f0f720ad31a8cdad97 Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Wed, 4 Feb 2026 10:17:49 +0200 Subject: [PATCH 3/3] remove to_df_err --- .../src/execution/shuffle/shuffle_writer.rs | 32 ++++++++----------- 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 2d2167b86b..669a6df976 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -23,6 +23,7 @@ use crate::execution::shuffle::{CometPartitioning, CompressionCodec, ShuffleBloc use crate::execution::tracing::{with_trace, with_trace_async}; use arrow::compute::interleave_record_batch; use async_trait::async_trait; +use datafusion::common::exec_datafusion_err; use datafusion::common::utils::proxy::VecAllocExt; use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; @@ -45,7 +46,6 @@ use datafusion::{ use datafusion_comet_spark_expr::hash_funcs::murmur3::create_murmur3_hashes; use futures::{StreamExt, TryFutureExt, TryStreamExt}; use itertools::Itertools; -use std::io::Error; use std::{ any::Any, fmt, @@ -255,10 +255,15 @@ async fn external_shuffle( // into the corresponding partition buffer. // Otherwise, pull the next batch from the input stream might overwrite the // current batch in the repartitioner. - repartitioner.insert_batch(batch?).await?; + repartitioner + .insert_batch(batch?) + .await + .map_err(|err| exec_datafusion_err!("Error inserting batch: {err}"))?; } - repartitioner.shuffle_write()?; + repartitioner + .shuffle_write() + .map_err(|err| exec_datafusion_err!("Error in shuffle write: {err}"))?; // shuffle writer always has empty output Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(&schema))) as SendableRecordBatchStream) @@ -803,9 +808,9 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { // if we wrote a spill file for this partition then copy the // contents into the shuffle file if let Some(spill_path) = self.partition_writers[i].path() { - let mut spill_file = BufReader::new(File::open(spill_path).map_err(to_df_err)?); + let mut spill_file = BufReader::new(File::open(spill_path)?); let mut write_timer = self.metrics.write_time.timer(); - std::io::copy(&mut spill_file, &mut output_data).map_err(to_df_err)?; + std::io::copy(&mut spill_file, &mut output_data)?; write_timer.stop(); } @@ -826,7 +831,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { write_timer.stop(); // add one extra offset at last to ease partition length computation - offsets[num_output_partitions] = output_data.stream_position().map_err(to_df_err)?; + offsets[num_output_partitions] = output_data.stream_position()?; let mut write_timer = self.metrics.write_time.timer(); let mut output_index = @@ -834,9 +839,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { DataFusionError::Execution(format!("shuffle write error: {e:?}")) })?); for offset in offsets { - output_index - .write_all(&(offset as i64).to_le_bytes()[..]) - .map_err(to_df_err)?; + output_index.write_all(&(offset as i64).to_le_bytes()[..])?; } output_index.flush()?; write_timer.stop(); @@ -893,8 +896,7 @@ impl SinglePartitionShufflePartitioner { .write(true) .create(true) .truncate(true) - .open(output_data_path) - .map_err(to_df_err)?; + .open(output_data_path)?; let output_data_writer = BufBatchWriter::new(shuffle_block_writer, output_data_file, write_buffer_size); @@ -1011,9 +1013,7 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner { let mut index_buf_writer = BufWriter::new(index_file); let data_file_length = self.output_data_writer.writer_stream_position()?; for offset in [0, data_file_length] { - index_buf_writer - .write_all(&(offset as i64).to_le_bytes()[..]) - .map_err(to_df_err)?; + index_buf_writer.write_all(&(offset as i64).to_le_bytes()[..])?; } index_buf_writer.flush()?; @@ -1025,10 +1025,6 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner { } } -fn to_df_err(e: Error) -> DataFusionError { - DataFusionError::Execution(format!("shuffle write error: {e:?}")) -} - /// A helper struct to produce shuffled batches. /// This struct takes ownership of the buffered batches and partition indices from the /// ShuffleRepartitioner, and provides an iterator over the batches in the specified partitions.