Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions native/core/src/execution/shuffle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod comet_partitioning;
mod metrics;
mod shuffle_writer;
pub mod spark_unsafe;
mod writers;

pub use codec::{read_ipc_compressed, CompressionCodec, ShuffleBlockWriter};
pub use comet_partitioning::CometPartitioning;
Expand Down
193 changes: 24 additions & 169 deletions native/core/src/execution/shuffle/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
//! Defines the External shuffle repartition plan.

use crate::execution::shuffle::metrics::ShufflePartitionerMetrics;
use crate::execution::shuffle::writers::{BufBatchWriter, PartitionWriter};
use crate::execution::shuffle::{CometPartitioning, CompressionCodec, ShuffleBlockWriter};
use crate::execution::tracing::{with_trace, with_trace_async};
use arrow::compute::interleave_record_batch;
use async_trait::async_trait;
use datafusion::common::exec_datafusion_err;
use datafusion::common::utils::proxy::VecAllocExt;
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
Expand All @@ -31,7 +33,6 @@ use datafusion::{
error::{DataFusionError, Result},
execution::{
context::TaskContext,
disk_manager::RefCountedTempFile,
memory_pool::{MemoryConsumer, MemoryReservation},
runtime_env::RuntimeEnv,
},
Expand All @@ -45,8 +46,6 @@ use datafusion::{
use datafusion_comet_spark_expr::hash_funcs::murmur3::create_murmur3_hashes;
use futures::{StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use std::borrow::Borrow;
use std::io::{Cursor, Error, SeekFrom};
use std::{
any::Any,
fmt,
Expand Down Expand Up @@ -256,10 +255,15 @@ async fn external_shuffle(
// into the corresponding partition buffer.
// Otherwise, pull the next batch from the input stream might overwrite the
// current batch in the repartitioner.
repartitioner.insert_batch(batch?).await?;
repartitioner
.insert_batch(batch?)
.await
.map_err(|err| exec_datafusion_err!("Error inserting batch: {err}"))?;
}

repartitioner.shuffle_write()?;
repartitioner
.shuffle_write()
.map_err(|err| exec_datafusion_err!("Error in shuffle write: {err}"))?;

// shuffle writer always has empty output
Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(&schema))) as SendableRecordBatchStream)
Expand Down Expand Up @@ -803,11 +807,10 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {

// if we wrote a spill file for this partition then copy the
// contents into the shuffle file
if let Some(spill_data) = self.partition_writers[i].spill_file.as_ref() {
let mut spill_file =
BufReader::new(File::open(spill_data.temp_file.path()).map_err(to_df_err)?);
if let Some(spill_path) = self.partition_writers[i].path() {
let mut spill_file = BufReader::new(File::open(spill_path)?);
let mut write_timer = self.metrics.write_time.timer();
std::io::copy(&mut spill_file, &mut output_data).map_err(to_df_err)?;
std::io::copy(&mut spill_file, &mut output_data)?;
write_timer.stop();
}

Expand All @@ -828,17 +831,15 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
write_timer.stop();

// add one extra offset at last to ease partition length computation
offsets[num_output_partitions] = output_data.stream_position().map_err(to_df_err)?;
offsets[num_output_partitions] = output_data.stream_position()?;

let mut write_timer = self.metrics.write_time.timer();
let mut output_index =
BufWriter::new(File::create(index_file).map_err(|e| {
DataFusionError::Execution(format!("shuffle write error: {e:?}"))
})?);
for offset in offsets {
output_index
.write_all(&(offset as i64).to_le_bytes()[..])
.map_err(to_df_err)?;
output_index.write_all(&(offset as i64).to_le_bytes()[..])?;
}
output_index.flush()?;
write_timer.stop();
Expand Down Expand Up @@ -895,8 +896,7 @@ impl SinglePartitionShufflePartitioner {
.write(true)
.create(true)
.truncate(true)
.open(output_data_path)
.map_err(to_df_err)?;
.open(output_data_path)?;

let output_data_writer =
BufBatchWriter::new(shuffle_block_writer, output_data_file, write_buffer_size);
Expand Down Expand Up @@ -1011,15 +1011,9 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner {
.open(self.output_index_path.clone())
.map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?;
let mut index_buf_writer = BufWriter::new(index_file);
let data_file_length = self
.output_data_writer
.writer
.stream_position()
.map_err(to_df_err)?;
let data_file_length = self.output_data_writer.writer_stream_position()?;
for offset in [0, data_file_length] {
index_buf_writer
.write_all(&(offset as i64).to_le_bytes()[..])
.map_err(to_df_err)?;
index_buf_writer.write_all(&(offset as i64).to_le_bytes()[..])?;
}
index_buf_writer.flush()?;

Expand All @@ -1031,21 +1025,17 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner {
}
}

fn to_df_err(e: Error) -> DataFusionError {
DataFusionError::Execution(format!("shuffle write error: {e:?}"))
}

/// A helper struct to produce shuffled batches.
/// This struct takes ownership of the buffered batches and partition indices from the
/// ShuffleRepartitioner, and provides an iterator over the batches in the specified partitions.
struct PartitionedBatchesProducer {
pub(crate) struct PartitionedBatchesProducer {
buffered_batches: Vec<RecordBatch>,
partition_indices: Vec<Vec<(u32, u32)>>,
batch_size: usize,
}

impl PartitionedBatchesProducer {
fn new(
pub(crate) fn new(
buffered_batches: Vec<RecordBatch>,
indices: Vec<Vec<(u32, u32)>>,
batch_size: usize,
Expand All @@ -1066,7 +1056,7 @@ impl PartitionedBatchesProducer {
}
}

struct PartitionedBatchIterator<'a> {
pub(crate) struct PartitionedBatchIterator<'a> {
record_batches: Vec<&'a RecordBatch>,
batch_size: usize,
indices: Vec<(usize, usize)>,
Expand Down Expand Up @@ -1125,141 +1115,6 @@ impl Iterator for PartitionedBatchIterator<'_> {
}
}

struct PartitionWriter {
/// Spill file for intermediate shuffle output for this partition. Each spill event
/// will append to this file and the contents will be copied to the shuffle file at
/// the end of processing.
spill_file: Option<SpillFile>,
/// Writer that performs encoding and compression
shuffle_block_writer: ShuffleBlockWriter,
}

struct SpillFile {
temp_file: RefCountedTempFile,
file: File,
}

impl PartitionWriter {
fn try_new(shuffle_block_writer: ShuffleBlockWriter) -> Result<Self> {
Ok(Self {
spill_file: None,
shuffle_block_writer,
})
}

fn spill(
&mut self,
iter: &mut PartitionedBatchIterator,
runtime: &RuntimeEnv,
metrics: &ShufflePartitionerMetrics,
write_buffer_size: usize,
) -> Result<usize> {
if let Some(batch) = iter.next() {
self.ensure_spill_file_created(runtime)?;

let total_bytes_written = {
let mut buf_batch_writer = BufBatchWriter::new(
&mut self.shuffle_block_writer,
&mut self.spill_file.as_mut().unwrap().file,
write_buffer_size,
);
let mut bytes_written =
buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?;
for batch in iter {
let batch = batch?;
bytes_written += buf_batch_writer.write(
&batch,
&metrics.encode_time,
&metrics.write_time,
)?;
}
buf_batch_writer.flush(&metrics.write_time)?;
bytes_written
};

Ok(total_bytes_written)
} else {
Ok(0)
}
}

fn ensure_spill_file_created(&mut self, runtime: &RuntimeEnv) -> Result<()> {
if self.spill_file.is_none() {
// Spill file is not yet created, create it
let spill_file = runtime
.disk_manager
.create_tmp_file("shuffle writer spill")?;
let spill_data = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(spill_file.path())
.map_err(|e| {
DataFusionError::Execution(format!("Error occurred while spilling {e}"))
})?;
self.spill_file = Some(SpillFile {
temp_file: spill_file,
file: spill_data,
});
}
Ok(())
}
}

/// Write batches to writer while using a buffer to avoid frequent system calls.
/// The record batches were first written by ShuffleBlockWriter into an internal buffer.
/// Once the buffer exceeds the max size, the buffer will be flushed to the writer.
struct BufBatchWriter<S: Borrow<ShuffleBlockWriter>, W: Write> {
shuffle_block_writer: S,
writer: W,
buffer: Vec<u8>,
buffer_max_size: usize,
}

impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {
fn new(shuffle_block_writer: S, writer: W, buffer_max_size: usize) -> Self {
Self {
shuffle_block_writer,
writer,
buffer: vec![],
buffer_max_size,
}
}

fn write(
&mut self,
batch: &RecordBatch,
encode_time: &Time,
write_time: &Time,
) -> Result<usize> {
let mut cursor = Cursor::new(&mut self.buffer);
cursor.seek(SeekFrom::End(0))?;
let bytes_written =
self.shuffle_block_writer
.borrow()
.write_batch(batch, &mut cursor, encode_time)?;
let pos = cursor.position();
if pos >= self.buffer_max_size as u64 {
let mut write_timer = write_time.timer();
self.writer.write_all(&self.buffer)?;
write_timer.stop();
self.buffer.clear();
}
Ok(bytes_written)
}

fn flush(&mut self, write_time: &Time) -> Result<()> {
let mut write_timer = write_time.timer();
if !self.buffer.is_empty() {
self.writer.write_all(&self.buffer)?;
}
self.writer.flush()?;
write_timer.stop();
self.buffer.clear();
Ok(())
}
}

fn pmod(hash: u32, n: usize) -> usize {
let hash = hash as i32;
let n = n as i32;
Expand Down Expand Up @@ -1371,14 +1226,14 @@ mod test {

assert_eq!(2, repartitioner.partition_writers.len());

assert!(repartitioner.partition_writers[0].spill_file.is_none());
assert!(repartitioner.partition_writers[1].spill_file.is_none());
assert!(!repartitioner.partition_writers[0].has_spill_file());
assert!(!repartitioner.partition_writers[1].has_spill_file());

repartitioner.spill().unwrap();

// after spill, there should be spill files
assert!(repartitioner.partition_writers[0].spill_file.is_some());
assert!(repartitioner.partition_writers[1].spill_file.is_some());
assert!(repartitioner.partition_writers[0].has_spill_file());
assert!(repartitioner.partition_writers[1].has_spill_file());

// insert another batch after spilling
repartitioner.insert_batch(batch.clone()).await.unwrap();
Expand Down
82 changes: 82 additions & 0 deletions native/core/src/execution/shuffle/writers/buf_batch_writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::execution::shuffle::ShuffleBlockWriter;
use arrow::array::RecordBatch;
use datafusion::physical_plan::metrics::Time;
use std::borrow::Borrow;
use std::io::{Cursor, Seek, SeekFrom, Write};

/// Write batches to writer while using a buffer to avoid frequent system calls.
/// The record batches were first written by ShuffleBlockWriter into an internal buffer.
/// Once the buffer exceeds the max size, the buffer will be flushed to the writer.
pub(crate) struct BufBatchWriter<S: Borrow<ShuffleBlockWriter>, W: Write> {
shuffle_block_writer: S,
writer: W,
buffer: Vec<u8>,
buffer_max_size: usize,
}

impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {
pub(crate) fn new(shuffle_block_writer: S, writer: W, buffer_max_size: usize) -> Self {
Self {
shuffle_block_writer,
writer,
buffer: vec![],
buffer_max_size,
}
}

pub(crate) fn write(
&mut self,
batch: &RecordBatch,
encode_time: &Time,
write_time: &Time,
) -> datafusion::common::Result<usize> {
let mut cursor = Cursor::new(&mut self.buffer);
cursor.seek(SeekFrom::End(0))?;
let bytes_written =
self.shuffle_block_writer
.borrow()
.write_batch(batch, &mut cursor, encode_time)?;
let pos = cursor.position();
if pos >= self.buffer_max_size as u64 {
let mut write_timer = write_time.timer();
self.writer.write_all(&self.buffer)?;
write_timer.stop();
self.buffer.clear();
}
Ok(bytes_written)
}

pub(crate) fn flush(&mut self, write_time: &Time) -> datafusion::common::Result<()> {
let mut write_timer = write_time.timer();
if !self.buffer.is_empty() {
self.writer.write_all(&self.buffer)?;
}
self.writer.flush()?;
write_timer.stop();
self.buffer.clear();
Ok(())
}
}

impl<S: Borrow<ShuffleBlockWriter>, W: Write + Seek> BufBatchWriter<S, W> {
pub(crate) fn writer_stream_position(&mut self) -> datafusion::common::Result<u64> {
self.writer.stream_position().map_err(Into::into)
}
}
Loading
Loading