diff --git a/native/core/src/execution/shuffle/comet_partitioning.rs b/native/core/src/execution/shuffle/comet_partitioning.rs index b7ad158790..b8d68cd21e 100644 --- a/native/core/src/execution/shuffle/comet_partitioning.rs +++ b/native/core/src/execution/shuffle/comet_partitioning.rs @@ -46,3 +46,26 @@ impl CometPartitioning { } } } + +pub(super) fn pmod(hash: u32, n: usize) -> usize { + let hash = hash as i32; + let n = n as i32; + let r = hash % n; + let result = if r < 0 { (r + n) % n } else { r }; + result as usize +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_pmod() { + let i: Vec = vec![0x99f0149d, 0x9c67b85d, 0xc8008529, 0xa05b5d7b, 0xcd1e64fb]; + let result = i.into_iter().map(|i| pmod(i, 200)).collect::>(); + + // expected partition from Spark with n=200 + let expected = vec![69, 5, 193, 171, 115]; + assert_eq!(result, expected); + } +} diff --git a/native/core/src/execution/shuffle/mod.rs b/native/core/src/execution/shuffle/mod.rs index a41d269d80..6018cff50f 100644 --- a/native/core/src/execution/shuffle/mod.rs +++ b/native/core/src/execution/shuffle/mod.rs @@ -18,6 +18,7 @@ pub(crate) mod codec; mod comet_partitioning; mod metrics; +mod partitioners; mod shuffle_writer; pub mod spark_unsafe; mod writers; diff --git a/native/core/src/execution/shuffle/partitioners/mod.rs b/native/core/src/execution/shuffle/partitioners/mod.rs new file mode 100644 index 0000000000..b9058f66f4 --- /dev/null +++ b/native/core/src/execution/shuffle/partitioners/mod.rs @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod multi_partition; +mod partitioned_batch_iterator; +mod single_partition; + +use arrow::record_batch::RecordBatch; +use datafusion::common::Result; + +pub(super) use multi_partition::MultiPartitionShuffleRepartitioner; +pub(super) use partitioned_batch_iterator::PartitionedBatchIterator; +pub(super) use single_partition::SinglePartitionShufflePartitioner; + +#[async_trait::async_trait] +pub(super) trait ShufflePartitioner: Send + Sync { + /// Insert a batch into the partitioner + async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()>; + /// Write shuffle data and shuffle index file to disk + fn shuffle_write(&mut self) -> Result<()>; +} diff --git a/native/core/src/execution/shuffle/partitioners/multi_partition.rs b/native/core/src/execution/shuffle/partitioners/multi_partition.rs new file mode 100644 index 0000000000..35f754695c --- /dev/null +++ b/native/core/src/execution/shuffle/partitioners/multi_partition.rs @@ -0,0 +1,635 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::execution::shuffle::metrics::ShufflePartitionerMetrics; +use crate::execution::shuffle::partitioners::partitioned_batch_iterator::{ + PartitionedBatchIterator, PartitionedBatchesProducer, +}; +use crate::execution::shuffle::partitioners::ShufflePartitioner; +use crate::execution::shuffle::writers::{BufBatchWriter, PartitionWriter}; +use crate::execution::shuffle::{ + comet_partitioning, CometPartitioning, CompressionCodec, ShuffleBlockWriter, +}; +use crate::execution::tracing::{with_trace, with_trace_async}; +use arrow::array::{ArrayRef, RecordBatch}; +use arrow::datatypes::SchemaRef; +use datafusion::common::utils::proxy::VecAllocExt; +use datafusion::common::DataFusionError; +use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation}; +use datafusion::execution::runtime_env::RuntimeEnv; +use datafusion::physical_plan::metrics::Time; +use datafusion_comet_spark_expr::murmur3::create_murmur3_hashes; +use itertools::Itertools; +use std::fmt; +use std::fmt::{Debug, Formatter}; +use std::fs::{File, OpenOptions}; +use std::io::{BufReader, BufWriter, Seek, Write}; +use std::sync::Arc; +use tokio::time::Instant; + +#[derive(Default)] +struct ScratchSpace { + /// Hashes for each row in the current batch. + hashes_buf: Vec, + /// Partition ids for each row in the current batch. + partition_ids: Vec, + /// The row indices of the rows in each partition. This array is conceptually divided into + /// partitions, where each partition contains the row indices of the rows in that partition. 
+ /// The length of this array is the same as the number of rows in the batch. + partition_row_indices: Vec, + /// The start indices of partitions in partition_row_indices. partition_starts[K] and + /// partition_starts[K + 1] are the start and end indices of partition K in partition_row_indices. + /// The length of this array is 1 + the number of partitions. + partition_starts: Vec, +} + +impl ScratchSpace { + fn map_partition_ids_to_starts_and_indices( + &mut self, + num_output_partitions: usize, + num_rows: usize, + ) { + let partition_ids = &mut self.partition_ids[..num_rows]; + + // count each partition size, while leaving the last extra element as 0 + let partition_counters = &mut self.partition_starts; + partition_counters.resize(num_output_partitions + 1, 0); + partition_counters.fill(0); + partition_ids + .iter() + .for_each(|partition_id| partition_counters[*partition_id as usize] += 1); + + // accumulate partition counters into partition ends + // e.g. partition counter: [1, 3, 2, 1, 0] => [1, 4, 6, 7, 7] + let partition_ends = partition_counters; + let mut accum = 0; + partition_ends.iter_mut().for_each(|v| { + *v += accum; + accum = *v; + }); + + // calculate partition row indices and partition starts + // e.g. partition ids: [3, 1, 1, 1, 2, 2, 0] will produce the following partition_row_indices + // and partition_starts arrays: + // + // partition_row_indices: [6, 1, 2, 3, 4, 5, 0] + // partition_starts: [0, 1, 4, 6, 7] + // + // partition_starts conceptually splits partition_row_indices into smaller slices. + // Each slice partition_row_indices[partition_starts[K]..partition_starts[K + 1]] contains the + // row indices of the input batch that are partitioned into partition K. For example, + // first partition 0 has one row index [6], partition 1 has row indices [1, 2, 3], etc. 
+ let partition_row_indices = &mut self.partition_row_indices; + partition_row_indices.resize(num_rows, 0); + for (index, partition_id) in partition_ids.iter().enumerate().rev() { + partition_ends[*partition_id as usize] -= 1; + let end = partition_ends[*partition_id as usize]; + partition_row_indices[end as usize] = index as u32; + } + + // after calculating, partition ends become partition starts + } +} + +/// A partitioner that uses a hash function to partition data into multiple partitions +pub(crate) struct MultiPartitionShuffleRepartitioner { + output_data_file: String, + output_index_file: String, + buffered_batches: Vec, + partition_indices: Vec>, + partition_writers: Vec, + shuffle_block_writer: ShuffleBlockWriter, + /// Partitioning scheme to use + partitioning: CometPartitioning, + runtime: Arc, + metrics: ShufflePartitionerMetrics, + /// Reused scratch space for computing partition indices + scratch: ScratchSpace, + /// The configured batch size + batch_size: usize, + /// Reservation for repartitioning + reservation: MemoryReservation, + tracing_enabled: bool, + /// Size of the write buffer in bytes + write_buffer_size: usize, +} + +impl MultiPartitionShuffleRepartitioner { + #[allow(clippy::too_many_arguments)] + pub(crate) fn try_new( + partition: usize, + output_data_file: String, + output_index_file: String, + schema: SchemaRef, + partitioning: CometPartitioning, + metrics: ShufflePartitionerMetrics, + runtime: Arc, + batch_size: usize, + codec: CompressionCodec, + tracing_enabled: bool, + write_buffer_size: usize, + ) -> datafusion::common::Result { + let num_output_partitions = partitioning.partition_count(); + assert_ne!( + num_output_partitions, 1, + "Use SinglePartitionShufflePartitioner for 1 output partition." + ); + + // Vectors in the scratch space will be filled with valid values before being used, this + // initialization code is simply initializing the vectors to the desired size. + // The initial values are not used. 
+ let scratch = ScratchSpace { + hashes_buf: match partitioning { + // Allocate hashes_buf for hash and round robin partitioning. + // Round robin hashes all columns to achieve even, deterministic distribution. + CometPartitioning::Hash(_, _) | CometPartitioning::RoundRobin(_, _) => { + vec![0; batch_size] + } + _ => vec![], + }, + partition_ids: vec![0; batch_size], + partition_row_indices: vec![0; batch_size], + partition_starts: vec![0; num_output_partitions + 1], + }; + + let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?; + + let partition_writers = (0..num_output_partitions) + .map(|_| PartitionWriter::try_new(shuffle_block_writer.clone())) + .collect::>>()?; + + let reservation = MemoryConsumer::new(format!("ShuffleRepartitioner[{partition}]")) + .with_can_spill(true) + .register(&runtime.memory_pool); + + Ok(Self { + output_data_file, + output_index_file, + buffered_batches: vec![], + partition_indices: vec![vec![]; num_output_partitions], + partition_writers, + shuffle_block_writer, + partitioning, + runtime, + metrics, + scratch, + batch_size, + reservation, + tracing_enabled, + write_buffer_size, + }) + } + + /// Shuffles rows in input batch into corresponding partition buffer. + /// This function first calculates hashes for rows and then takes rows in same + /// partition as a record batch which is appended into partition buffer. + /// This should not be called directly. Use `insert_batch` instead. + async fn partitioning_batch(&mut self, input: RecordBatch) -> datafusion::common::Result<()> { + if input.num_rows() == 0 { + // skip empty batch + return Ok(()); + } + + if input.num_rows() > self.batch_size { + return Err(DataFusionError::Internal( + "Input batch size exceeds configured batch size. Call `insert_batch` instead." 
+ .to_string(), + )); + } + + // Update data size metric + self.metrics.data_size.add(input.get_array_memory_size()); + + // NOTE: in shuffle writer exec, the output_rows metric represents the + // number of rows that are written to the output data file. + self.metrics.baseline.record_output(input.num_rows()); + + match &self.partitioning { + CometPartitioning::Hash(exprs, num_output_partitions) => { + let mut scratch = std::mem::take(&mut self.scratch); + let (partition_starts, partition_row_indices): (&Vec, &Vec) = { + let mut timer = self.metrics.repart_time.timer(); + + // Evaluate partition expressions to get rows to apply partitioning scheme. + let arrays = exprs + .iter() + .map(|expr| expr.evaluate(&input)?.into_array(input.num_rows())) + .collect::>>()?; + + let num_rows = arrays[0].len(); + + // Use identical seed as Spark hash partitioning. + let hashes_buf = &mut scratch.hashes_buf[..num_rows]; + hashes_buf.fill(42_u32); + + // Generate partition ids for every row. + { + // Hash arrays and compute partition ids based on number of partitions. + let partition_ids = &mut scratch.partition_ids[..num_rows]; + create_murmur3_hashes(&arrays, hashes_buf)? + .iter() + .enumerate() + .for_each(|(idx, hash)| { + partition_ids[idx] = + comet_partitioning::pmod(*hash, *num_output_partitions) as u32; + }); + } + + // We now have partition ids for every input row, map that to partition starts + // and partition indices to eventually write these rows to partition buffers.
+ scratch + .map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows); + + timer.stop(); + Ok::<(&Vec, &Vec), DataFusionError>(( + &scratch.partition_starts, + &scratch.partition_row_indices, + )) + }?; + + self.buffer_partitioned_batch_may_spill( + input, + partition_row_indices, + partition_starts, + ) + .await?; + self.scratch = scratch; + } + CometPartitioning::RangePartitioning( + lex_ordering, + num_output_partitions, + row_converter, + bounds, + ) => { + let mut scratch = std::mem::take(&mut self.scratch); + let (partition_starts, partition_row_indices): (&Vec, &Vec) = { + let mut timer = self.metrics.repart_time.timer(); + + // Evaluate partition expressions for values to apply partitioning scheme on. + let arrays = lex_ordering + .iter() + .map(|expr| expr.expr.evaluate(&input)?.into_array(input.num_rows())) + .collect::>>()?; + + let num_rows = arrays[0].len(); + + // Generate partition ids for every row, first by converting the partition + // arrays to Rows, and then doing binary search for each Row against the + // bounds Rows. + { + let row_batch = row_converter.convert_columns(arrays.as_slice())?; + let partition_ids = &mut scratch.partition_ids[..num_rows]; + + row_batch.iter().enumerate().for_each(|(row_idx, row)| { + partition_ids[row_idx] = bounds + .as_slice() + .partition_point(|bound| bound.row() <= row) + as u32 + }); + } + + // We now have partition ids for every input row, map that to partition starts + // and partition indices to eventually write these rows to partition buffers.
+ scratch + .map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows); + + timer.stop(); + Ok::<(&Vec, &Vec), DataFusionError>(( + &scratch.partition_starts, + &scratch.partition_row_indices, + )) + }?; + + self.buffer_partitioned_batch_may_spill( + input, + partition_row_indices, + partition_starts, + ) + .await?; + self.scratch = scratch; + } + CometPartitioning::RoundRobin(num_output_partitions, max_hash_columns) => { + // Comet implements "round robin" as hash partitioning on columns. + // This achieves the same goal as Spark's round robin (even distribution + // without semantic grouping) while being deterministic for fault tolerance. + // + // Note: This produces different partition assignments than Spark's round robin, + // which sorts by UnsafeRow binary representation before assigning partitions. + // However, both approaches provide even distribution and determinism. + let mut scratch = std::mem::take(&mut self.scratch); + let (partition_starts, partition_row_indices): (&Vec, &Vec) = { + let mut timer = self.metrics.repart_time.timer(); + + let num_rows = input.num_rows(); + + // Collect columns for hashing, respecting max_hash_columns limit + // max_hash_columns of 0 means no limit (hash all columns) + // Negative values are normalized to 0 in the planner + let num_columns_to_hash = if *max_hash_columns == 0 { + input.num_columns() + } else { + (*max_hash_columns).min(input.num_columns()) + }; + let columns_to_hash: Vec = (0..num_columns_to_hash) + .map(|i| Arc::clone(input.column(i))) + .collect(); + + // Use identical seed as Spark hash partitioning. 
+ let hashes_buf = &mut scratch.hashes_buf[..num_rows]; + hashes_buf.fill(42_u32); + + // Compute hash for selected columns + create_murmur3_hashes(&columns_to_hash, hashes_buf)?; + + // Assign partition IDs based on hash (same as hash partitioning) + let partition_ids = &mut scratch.partition_ids[..num_rows]; + hashes_buf.iter().enumerate().for_each(|(idx, hash)| { + partition_ids[idx] = + comet_partitioning::pmod(*hash, *num_output_partitions) as u32; + }); + + // We now have partition ids for every input row, map that to partition starts + // and partition indices to eventually write these rows to partition buffers. + scratch + .map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows); + + timer.stop(); + Ok::<(&Vec, &Vec), DataFusionError>(( + &scratch.partition_starts, + &scratch.partition_row_indices, + )) + }?; + + self.buffer_partitioned_batch_may_spill( + input, + partition_row_indices, + partition_starts, + ) + .await?; + self.scratch = scratch; + } + other => { + // this should be unreachable as long as the validation logic + // in the constructor is kept up-to-date + return Err(DataFusionError::NotImplemented(format!( + "Unsupported shuffle partitioning scheme {other:?}" + ))); + } + } + Ok(()) + } + + async fn buffer_partitioned_batch_may_spill( + &mut self, + input: RecordBatch, + partition_row_indices: &[u32], + partition_starts: &[u32], + ) -> datafusion::common::Result<()> { + let mut mem_growth: usize = input.get_array_memory_size(); + let buffered_partition_idx = self.buffered_batches.len() as u32; + self.buffered_batches.push(input); + + // partition_starts conceptually slices partition_row_indices into smaller slices, + // each slice contains the indices of rows in input that will go into the corresponding + // partition. The following loop iterates over the slices and put the row indices into + // the indices array of the corresponding partition. 
+ for (partition_id, (&start, &end)) in partition_starts + .iter() + .tuple_windows() + .enumerate() + .filter(|(_, (start, end))| start < end) + { + let row_indices = &partition_row_indices[start as usize..end as usize]; + + // Put row indices for the current partition into the indices array of that partition. + // This indices array will be used for calling interleave_record_batch to produce + // shuffled batches. + let indices = &mut self.partition_indices[partition_id]; + let before_size = indices.allocated_size(); + indices.reserve(row_indices.len()); + for row_idx in row_indices { + indices.push((buffered_partition_idx, *row_idx)); + } + let after_size = indices.allocated_size(); + mem_growth += after_size.saturating_sub(before_size); + } + + if self.reservation.try_grow(mem_growth).is_err() { + self.spill()?; + } + + Ok(()) + } + + fn shuffle_write_partition( + partition_iter: &mut PartitionedBatchIterator, + shuffle_block_writer: &mut ShuffleBlockWriter, + output_data: &mut BufWriter, + encode_time: &Time, + write_time: &Time, + write_buffer_size: usize, + ) -> datafusion::common::Result<()> { + let mut buf_batch_writer = + BufBatchWriter::new(shuffle_block_writer, output_data, write_buffer_size); + for batch in partition_iter { + let batch = batch?; + buf_batch_writer.write(&batch, encode_time, write_time)?; + } + buf_batch_writer.flush(write_time)?; + Ok(()) + } + + fn used(&self) -> usize { + self.reservation.size() + } + + fn spilled_bytes(&self) -> usize { + self.metrics.spilled_bytes.value() + } + + fn spill_count(&self) -> usize { + self.metrics.spill_count.value() + } + + fn data_size(&self) -> usize { + self.metrics.data_size.value() + } + + /// This function transfers the ownership of the buffered batches and partition indices from the + /// ShuffleRepartitioner to a new PartitionedBatches struct. The returned PartitionedBatches struct + /// can be used to produce shuffled batches. 
+ fn partitioned_batches(&mut self) -> PartitionedBatchesProducer { + let num_output_partitions = self.partition_indices.len(); + let buffered_batches = std::mem::take(&mut self.buffered_batches); + // let indices = std::mem::take(&mut self.partition_indices); + let indices = std::mem::replace( + &mut self.partition_indices, + vec![vec![]; num_output_partitions], + ); + PartitionedBatchesProducer::new(buffered_batches, indices, self.batch_size) + } + + pub(crate) fn spill(&mut self) -> datafusion::common::Result<()> { + log::info!( + "ShuffleRepartitioner spilling shuffle data of {} to disk while inserting ({} time(s) so far)", + self.used(), + self.spill_count() + ); + + // we could always get a chance to free some memory as long as we are holding some + if self.buffered_batches.is_empty() { + return Ok(()); + } + + with_trace("shuffle_spill", self.tracing_enabled, || { + let num_output_partitions = self.partition_writers.len(); + let mut partitioned_batches = self.partitioned_batches(); + let mut spilled_bytes = 0; + + for partition_id in 0..num_output_partitions { + let partition_writer = &mut self.partition_writers[partition_id]; + let mut iter = partitioned_batches.produce(partition_id); + spilled_bytes += partition_writer.spill( + &mut iter, + &self.runtime, + &self.metrics, + self.write_buffer_size, + )?; + } + + self.reservation.free(); + self.metrics.spill_count.add(1); + self.metrics.spilled_bytes.add(spilled_bytes); + Ok(()) + }) + } + + #[cfg(test)] + pub(crate) fn partition_writers(&self) -> &[PartitionWriter] { + &self.partition_writers + } +} + +#[async_trait::async_trait] +impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { + /// Shuffles rows in input batch into corresponding partition buffer. + /// This function will slice input batch according to configured batch size and then + /// shuffle rows into corresponding partition buffer. 
+ async fn insert_batch(&mut self, batch: RecordBatch) -> datafusion::common::Result<()> { + with_trace_async("shuffle_insert_batch", self.tracing_enabled, || async { + let start_time = Instant::now(); + let mut start = 0; + while start < batch.num_rows() { + let end = (start + self.batch_size).min(batch.num_rows()); + let batch = batch.slice(start, end - start); + self.partitioning_batch(batch).await?; + start = end; + } + self.metrics.input_batches.add(1); + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + Ok(()) + }) + .await + } + + /// Writes buffered shuffled record batches into Arrow IPC bytes. + fn shuffle_write(&mut self) -> datafusion::common::Result<()> { + with_trace("shuffle_write", self.tracing_enabled, || { + let start_time = Instant::now(); + + let mut partitioned_batches = self.partitioned_batches(); + let num_output_partitions = self.partition_indices.len(); + let mut offsets = vec![0; num_output_partitions + 1]; + + let data_file = self.output_data_file.clone(); + let index_file = self.output_index_file.clone(); + + let output_data = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(data_file) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; + + let mut output_data = BufWriter::new(output_data); + + #[allow(clippy::needless_range_loop)] + for i in 0..num_output_partitions { + offsets[i] = output_data.stream_position()?; + + // if we wrote a spill file for this partition then copy the + // contents into the shuffle file + if let Some(spill_path) = self.partition_writers[i].path() { + let mut spill_file = BufReader::new(File::open(spill_path)?); + let mut write_timer = self.metrics.write_time.timer(); + std::io::copy(&mut spill_file, &mut output_data)?; + write_timer.stop(); + } + + // Write in memory batches to output data file + let mut partition_iter = partitioned_batches.produce(i); + Self::shuffle_write_partition( + &mut partition_iter, + &mut 
self.shuffle_block_writer, + &mut output_data, + &self.metrics.encode_time, + &self.metrics.write_time, + self.write_buffer_size, + )?; + } + + let mut write_timer = self.metrics.write_time.timer(); + output_data.flush()?; + write_timer.stop(); + + // add one extra offset at last to ease partition length computation + offsets[num_output_partitions] = output_data.stream_position()?; + + let mut write_timer = self.metrics.write_time.timer(); + let mut output_index = + BufWriter::new(File::create(index_file).map_err(|e| { + DataFusionError::Execution(format!("shuffle write error: {e:?}")) + })?); + for offset in offsets { + output_index.write_all(&(offset as i64).to_le_bytes()[..])?; + } + output_index.flush()?; + write_timer.stop(); + + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + + Ok(()) + }) + } +} + +impl Debug for MultiPartitionShuffleRepartitioner { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("ShuffleRepartitioner") + .field("memory_used", &self.used()) + .field("spilled_bytes", &self.spilled_bytes()) + .field("spilled_count", &self.spill_count()) + .field("data_size", &self.data_size()) + .finish() + } +} diff --git a/native/core/src/execution/shuffle/partitioners/partitioned_batch_iterator.rs b/native/core/src/execution/shuffle/partitioners/partitioned_batch_iterator.rs new file mode 100644 index 0000000000..77010938cd --- /dev/null +++ b/native/core/src/execution/shuffle/partitioners/partitioned_batch_iterator.rs @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::RecordBatch; +use arrow::compute::interleave_record_batch; +use datafusion::common::DataFusionError; + +/// A helper struct to produce shuffled batches. +/// This struct takes ownership of the buffered batches and partition indices from the +/// ShuffleRepartitioner, and provides an iterator over the batches in the specified partitions. +pub(super) struct PartitionedBatchesProducer { + buffered_batches: Vec, + partition_indices: Vec>, + batch_size: usize, +} + +impl PartitionedBatchesProducer { + pub(super) fn new( + buffered_batches: Vec, + indices: Vec>, + batch_size: usize, + ) -> Self { + Self { + partition_indices: indices, + buffered_batches, + batch_size, + } + } + + pub(super) fn produce(&mut self, partition_id: usize) -> PartitionedBatchIterator<'_> { + PartitionedBatchIterator::new( + &self.partition_indices[partition_id], + &self.buffered_batches, + self.batch_size, + ) + } +} + +pub(crate) struct PartitionedBatchIterator<'a> { + record_batches: Vec<&'a RecordBatch>, + batch_size: usize, + indices: Vec<(usize, usize)>, + pos: usize, +} + +impl<'a> PartitionedBatchIterator<'a> { + fn new( + indices: &'a [(u32, u32)], + buffered_batches: &'a [RecordBatch], + batch_size: usize, + ) -> Self { + if indices.is_empty() { + // Avoid unnecessary allocations when the partition is empty + return Self { + record_batches: vec![], + batch_size, + indices: vec![], + pos: 0, + }; + } + let record_batches = buffered_batches.iter().collect::>(); + let current_indices = indices + .iter() + .map(|(i_batch, i_row)| (*i_batch as usize, 
*i_row as usize)) + .collect::>(); + Self { + record_batches, + batch_size, + indices: current_indices, + pos: 0, + } + } +} + +impl Iterator for PartitionedBatchIterator<'_> { + type Item = datafusion::common::Result; + + fn next(&mut self) -> Option { + if self.pos >= self.indices.len() { + return None; + } + + let indices_end = std::cmp::min(self.pos + self.batch_size, self.indices.len()); + let indices = &self.indices[self.pos..indices_end]; + match interleave_record_batch(&self.record_batches, indices) { + Ok(batch) => { + self.pos = indices_end; + Some(Ok(batch)) + } + Err(e) => Some(Err(DataFusionError::ArrowError( + Box::from(e), + Some(DataFusionError::get_back_trace()), + ))), + } + } +} diff --git a/native/core/src/execution/shuffle/partitioners/single_partition.rs b/native/core/src/execution/shuffle/partitioners/single_partition.rs new file mode 100644 index 0000000000..4ee5bd2bf6 --- /dev/null +++ b/native/core/src/execution/shuffle/partitioners/single_partition.rs @@ -0,0 +1,187 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::execution::shuffle::metrics::ShufflePartitionerMetrics; +use crate::execution::shuffle::partitioners::ShufflePartitioner; +use crate::execution::shuffle::writers::BufBatchWriter; +use crate::execution::shuffle::{CompressionCodec, ShuffleBlockWriter}; +use arrow::array::RecordBatch; +use arrow::datatypes::SchemaRef; +use datafusion::common::DataFusionError; +use std::fs::{File, OpenOptions}; +use std::io::{BufWriter, Write}; +use tokio::time::Instant; + +/// A partitioner that writes all shuffle data to a single file and a single index file +pub(crate) struct SinglePartitionShufflePartitioner { + // output_data_file: File, + output_data_writer: BufBatchWriter, + output_index_path: String, + /// Batches that are smaller than the batch size and to be concatenated + buffered_batches: Vec, + /// Number of rows in the concatenating batches + num_buffered_rows: usize, + /// Metrics for the repartitioner + metrics: ShufflePartitionerMetrics, + /// The configured batch size + batch_size: usize, +} + +impl SinglePartitionShufflePartitioner { + pub(crate) fn try_new( + output_data_path: String, + output_index_path: String, + schema: SchemaRef, + metrics: ShufflePartitionerMetrics, + batch_size: usize, + codec: CompressionCodec, + write_buffer_size: usize, + ) -> datafusion::common::Result { + let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?; + + let output_data_file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(output_data_path)?; + + let output_data_writer = + BufBatchWriter::new(shuffle_block_writer, output_data_file, write_buffer_size); + + Ok(Self { + output_data_writer, + output_index_path, + buffered_batches: vec![], + num_buffered_rows: 0, + metrics, + batch_size, + }) + } + + /// Add a batch to the buffer of the partitioner, these buffered batches will be concatenated + /// and written to the output data file when the number of rows in the buffer reaches the batch size. 
+ fn add_buffered_batch(&mut self, batch: RecordBatch) { + self.num_buffered_rows += batch.num_rows(); + self.buffered_batches.push(batch); + } + + /// Consumes buffered batches and return a concatenated batch if successful + fn concat_buffered_batches(&mut self) -> datafusion::common::Result> { + if self.buffered_batches.is_empty() { + Ok(None) + } else if self.buffered_batches.len() == 1 { + let batch = self.buffered_batches.remove(0); + self.num_buffered_rows = 0; + Ok(Some(batch)) + } else { + let schema = &self.buffered_batches[0].schema(); + match arrow::compute::concat_batches(schema, self.buffered_batches.iter()) { + Ok(concatenated) => { + self.buffered_batches.clear(); + self.num_buffered_rows = 0; + Ok(Some(concatenated)) + } + Err(e) => Err(DataFusionError::ArrowError( + Box::from(e), + Some(DataFusionError::get_back_trace()), + )), + } + } + } +} + +#[async_trait::async_trait] +impl ShufflePartitioner for SinglePartitionShufflePartitioner { + async fn insert_batch(&mut self, batch: RecordBatch) -> datafusion::common::Result<()> { + let start_time = Instant::now(); + let num_rows = batch.num_rows(); + + if num_rows > 0 { + self.metrics.data_size.add(batch.get_array_memory_size()); + self.metrics.baseline.record_output(num_rows); + + if num_rows >= self.batch_size || num_rows + self.num_buffered_rows > self.batch_size { + let concatenated_batch = self.concat_buffered_batches()?; + + // Write the concatenated buffered batch + if let Some(batch) = concatenated_batch { + self.output_data_writer.write( + &batch, + &self.metrics.encode_time, + &self.metrics.write_time, + )?; + } + + if num_rows >= self.batch_size { + // Write the new batch + self.output_data_writer.write( + &batch, + &self.metrics.encode_time, + &self.metrics.write_time, + )?; + } else { + // Add the new batch to the buffer + self.add_buffered_batch(batch); + } + } else { + self.add_buffered_batch(batch); + } + } + + self.metrics.input_batches.add(1); + self.metrics + .baseline + 
.elapsed_compute() + .add_duration(start_time.elapsed()); + Ok(()) + } + + fn shuffle_write(&mut self) -> datafusion::common::Result<()> { + let start_time = Instant::now(); + let concatenated_batch = self.concat_buffered_batches()?; + + // Write the concatenated buffered batch + if let Some(batch) = concatenated_batch { + self.output_data_writer.write( + &batch, + &self.metrics.encode_time, + &self.metrics.write_time, + )?; + } + self.output_data_writer.flush(&self.metrics.write_time)?; + + // Write index file. It should only contain 2 entries: 0 and the total number of bytes written + let index_file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(self.output_index_path.clone()) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; + let mut index_buf_writer = BufWriter::new(index_file); + let data_file_length = self.output_data_writer.writer_stream_position()?; + for offset in [0, data_file_length] { + index_buf_writer.write_all(&(offset as i64).to_le_bytes()[..])?; + } + index_buf_writer.flush()?; + + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + Ok(()) + } +} diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 669a6df976..a7e689a69c 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -18,43 +18,34 @@ //! Defines the External shuffle repartition plan. 
use crate::execution::shuffle::metrics::ShufflePartitionerMetrics; -use crate::execution::shuffle::writers::{BufBatchWriter, PartitionWriter}; -use crate::execution::shuffle::{CometPartitioning, CompressionCodec, ShuffleBlockWriter}; -use crate::execution::tracing::{with_trace, with_trace_async}; -use arrow::compute::interleave_record_batch; +use crate::execution::shuffle::partitioners::{ + MultiPartitionShuffleRepartitioner, ShufflePartitioner, SinglePartitionShufflePartitioner, +}; +use crate::execution::shuffle::{CometPartitioning, CompressionCodec}; +use crate::execution::tracing::with_trace_async; use async_trait::async_trait; use datafusion::common::exec_datafusion_err; -use datafusion::common::utils::proxy::VecAllocExt; use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::EmptyRecordBatchStream; use datafusion::{ - arrow::{array::*, datatypes::SchemaRef, error::ArrowError, record_batch::RecordBatch}, - error::{DataFusionError, Result}, - execution::{ - context::TaskContext, - memory_pool::{MemoryConsumer, MemoryReservation}, - runtime_env::RuntimeEnv, - }, + arrow::{datatypes::SchemaRef, error::ArrowError}, + error::Result, + execution::context::TaskContext, physical_plan::{ - metrics::{ExecutionPlanMetricsSet, MetricsSet, Time}, + metrics::{ExecutionPlanMetricsSet, MetricsSet}, stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, Statistics, }, }; -use datafusion_comet_spark_expr::hash_funcs::murmur3::create_murmur3_hashes; use futures::{StreamExt, TryFutureExt, TryStreamExt}; -use itertools::Itertools; use std::{ any::Any, fmt, fmt::{Debug, Formatter}, - fs::{File, OpenOptions}, - io::{BufReader, BufWriter, Seek, Write}, sync::Arc, }; -use tokio::time::Instant; /// The shuffle writer operator maps each input partition to M output partitions based on a /// 
partitioning scheme. No guarantees are made about the order of the resulting partitions. @@ -271,872 +262,24 @@ async fn external_shuffle( .await } -#[async_trait::async_trait] -trait ShufflePartitioner: Send + Sync { - /// Insert a batch into the partitioner - async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()>; - /// Write shuffle data and shuffle index file to disk - fn shuffle_write(&mut self) -> Result<()>; -} - -/// A partitioner that uses a hash function to partition data into multiple partitions -struct MultiPartitionShuffleRepartitioner { - output_data_file: String, - output_index_file: String, - buffered_batches: Vec, - partition_indices: Vec>, - partition_writers: Vec, - shuffle_block_writer: ShuffleBlockWriter, - /// Partitioning scheme to use - partitioning: CometPartitioning, - runtime: Arc, - metrics: ShufflePartitionerMetrics, - /// Reused scratch space for computing partition indices - scratch: ScratchSpace, - /// The configured batch size - batch_size: usize, - /// Reservation for repartitioning - reservation: MemoryReservation, - tracing_enabled: bool, - /// Size of the write buffer in bytes - write_buffer_size: usize, -} - -#[derive(Default)] -struct ScratchSpace { - /// Hashes for each row in the current batch. - hashes_buf: Vec, - /// Partition ids for each row in the current batch. - partition_ids: Vec, - /// The row indices of the rows in each partition. This array is conceptually divided into - /// partitions, where each partition contains the row indices of the rows in that partition. - /// The length of this array is the same as the number of rows in the batch. - partition_row_indices: Vec, - /// The start indices of partitions in partition_row_indices. partition_starts[K] and - /// partition_starts[K + 1] are the start and end indices of partition K in partition_row_indices. - /// The length of this array is 1 + the number of partitions. 
- partition_starts: Vec, -} - -impl ScratchSpace { - fn map_partition_ids_to_starts_and_indices( - &mut self, - num_output_partitions: usize, - num_rows: usize, - ) { - let partition_ids = &mut self.partition_ids[..num_rows]; - - // count each partition size, while leaving the last extra element as 0 - let partition_counters = &mut self.partition_starts; - partition_counters.resize(num_output_partitions + 1, 0); - partition_counters.fill(0); - partition_ids - .iter() - .for_each(|partition_id| partition_counters[*partition_id as usize] += 1); - - // accumulate partition counters into partition ends - // e.g. partition counter: [1, 3, 2, 1, 0] => [1, 4, 6, 7, 7] - let partition_ends = partition_counters; - let mut accum = 0; - partition_ends.iter_mut().for_each(|v| { - *v += accum; - accum = *v; - }); - - // calculate partition row indices and partition starts - // e.g. partition ids: [3, 1, 1, 1, 2, 2, 0] will produce the following partition_row_indices - // and partition_starts arrays: - // - // partition_row_indices: [6, 1, 2, 3, 4, 5, 0] - // partition_starts: [0, 1, 4, 6, 7] - // - // partition_starts conceptually splits partition_row_indices into smaller slices. - // Each slice partition_row_indices[partition_starts[K]..partition_starts[K + 1]] contains the - // row indices of the input batch that are partitioned into partition K. For example, - // first partition 0 has one row index [6], partition 1 has row indices [1, 2, 3], etc. 
- let partition_row_indices = &mut self.partition_row_indices; - partition_row_indices.resize(num_rows, 0); - for (index, partition_id) in partition_ids.iter().enumerate().rev() { - partition_ends[*partition_id as usize] -= 1; - let end = partition_ends[*partition_id as usize]; - partition_row_indices[end as usize] = index as u32; - } - - // after calculating, partition ends become partition starts - } -} - -impl MultiPartitionShuffleRepartitioner { - #[allow(clippy::too_many_arguments)] - pub fn try_new( - partition: usize, - output_data_file: String, - output_index_file: String, - schema: SchemaRef, - partitioning: CometPartitioning, - metrics: ShufflePartitionerMetrics, - runtime: Arc, - batch_size: usize, - codec: CompressionCodec, - tracing_enabled: bool, - write_buffer_size: usize, - ) -> Result { - let num_output_partitions = partitioning.partition_count(); - assert_ne!( - num_output_partitions, 1, - "Use SinglePartitionShufflePartitioner for 1 output partition." - ); - - // Vectors in the scratch space will be filled with valid values before being used, this - // initialization code is simply initializing the vectors to the desired size. - // The initial values are not used. - let scratch = ScratchSpace { - hashes_buf: match partitioning { - // Allocate hashes_buf for hash and round robin partitioning. - // Round robin hashes all columns to achieve even, deterministic distribution. 
- CometPartitioning::Hash(_, _) | CometPartitioning::RoundRobin(_, _) => { - vec![0; batch_size] - } - _ => vec![], - }, - partition_ids: vec![0; batch_size], - partition_row_indices: vec![0; batch_size], - partition_starts: vec![0; num_output_partitions + 1], - }; - - let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?; - - let partition_writers = (0..num_output_partitions) - .map(|_| PartitionWriter::try_new(shuffle_block_writer.clone())) - .collect::>>()?; - - let reservation = MemoryConsumer::new(format!("ShuffleRepartitioner[{partition}]")) - .with_can_spill(true) - .register(&runtime.memory_pool); - - Ok(Self { - output_data_file, - output_index_file, - buffered_batches: vec![], - partition_indices: vec![vec![]; num_output_partitions], - partition_writers, - shuffle_block_writer, - partitioning, - runtime, - metrics, - scratch, - batch_size, - reservation, - tracing_enabled, - write_buffer_size, - }) - } - - /// Shuffles rows in input batch into corresponding partition buffer. - /// This function first calculates hashes for rows and then takes rows in same - /// partition as a record batch which is appended into partition buffer. - /// This should not be called directly. Use `insert_batch` instead. - async fn partitioning_batch(&mut self, input: RecordBatch) -> Result<()> { - if input.num_rows() == 0 { - // skip empty batch - return Ok(()); - } - - if input.num_rows() > self.batch_size { - return Err(DataFusionError::Internal( - "Input batch size exceeds configured batch size. Call `insert_batch` instead." - .to_string(), - )); - } - - // Update data size metric - self.metrics.data_size.add(input.get_array_memory_size()); - - // NOTE: in shuffle writer exec, the output_rows metrics represents the - // number of rows those are written to output data file. 
- self.metrics.baseline.record_output(input.num_rows()); - - match &self.partitioning { - CometPartitioning::Hash(exprs, num_output_partitions) => { - let mut scratch = std::mem::take(&mut self.scratch); - let (partition_starts, partition_row_indices): (&Vec, &Vec) = { - let mut timer = self.metrics.repart_time.timer(); - - // Evaluate partition expressions to get rows to apply partitioning scheme. - let arrays = exprs - .iter() - .map(|expr| expr.evaluate(&input)?.into_array(input.num_rows())) - .collect::>>()?; - - let num_rows = arrays[0].len(); - - // Use identical seed as Spark hash partitioning. - let hashes_buf = &mut scratch.hashes_buf[..num_rows]; - hashes_buf.fill(42_u32); - - // Generate partition ids for every row. - { - // Hash arrays and compute partition ids based on number of partitions. - let partition_ids = &mut scratch.partition_ids[..num_rows]; - create_murmur3_hashes(&arrays, hashes_buf)? - .iter() - .enumerate() - .for_each(|(idx, hash)| { - partition_ids[idx] = pmod(*hash, *num_output_partitions) as u32; - }); - } - - // We now have partition ids for every input row, map that to partition starts - // and partition indices to eventually right these rows to partition buffers. - scratch - .map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows); - - timer.stop(); - Ok::<(&Vec, &Vec), DataFusionError>(( - &scratch.partition_starts, - &scratch.partition_row_indices, - )) - }?; - - self.buffer_partitioned_batch_may_spill( - input, - partition_row_indices, - partition_starts, - ) - .await?; - self.scratch = scratch; - } - CometPartitioning::RangePartitioning( - lex_ordering, - num_output_partitions, - row_converter, - bounds, - ) => { - let mut scratch = std::mem::take(&mut self.scratch); - let (partition_starts, partition_row_indices): (&Vec, &Vec) = { - let mut timer = self.metrics.repart_time.timer(); - - // Evaluate partition expressions for values to apply partitioning scheme on. 
- let arrays = lex_ordering - .iter() - .map(|expr| expr.expr.evaluate(&input)?.into_array(input.num_rows())) - .collect::>>()?; - - let num_rows = arrays[0].len(); - - // Generate partition ids for every row, first by converting the partition - // arrays to Rows, and then doing binary search for each Row against the - // bounds Rows. - { - let row_batch = row_converter.convert_columns(arrays.as_slice())?; - let partition_ids = &mut scratch.partition_ids[..num_rows]; - - row_batch.iter().enumerate().for_each(|(row_idx, row)| { - partition_ids[row_idx] = bounds - .as_slice() - .partition_point(|bound| bound.row() <= row) - as u32 - }); - } - - // We now have partition ids for every input row, map that to partition starts - // and partition indices to eventually right these rows to partition buffers. - scratch - .map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows); - - timer.stop(); - Ok::<(&Vec, &Vec), DataFusionError>(( - &scratch.partition_starts, - &scratch.partition_row_indices, - )) - }?; - - self.buffer_partitioned_batch_may_spill( - input, - partition_row_indices, - partition_starts, - ) - .await?; - self.scratch = scratch; - } - CometPartitioning::RoundRobin(num_output_partitions, max_hash_columns) => { - // Comet implements "round robin" as hash partitioning on columns. - // This achieves the same goal as Spark's round robin (even distribution - // without semantic grouping) while being deterministic for fault tolerance. - // - // Note: This produces different partition assignments than Spark's round robin, - // which sorts by UnsafeRow binary representation before assigning partitions. - // However, both approaches provide even distribution and determinism. 
- let mut scratch = std::mem::take(&mut self.scratch); - let (partition_starts, partition_row_indices): (&Vec, &Vec) = { - let mut timer = self.metrics.repart_time.timer(); - - let num_rows = input.num_rows(); - - // Collect columns for hashing, respecting max_hash_columns limit - // max_hash_columns of 0 means no limit (hash all columns) - // Negative values are normalized to 0 in the planner - let num_columns_to_hash = if *max_hash_columns == 0 { - input.num_columns() - } else { - (*max_hash_columns).min(input.num_columns()) - }; - let columns_to_hash: Vec = (0..num_columns_to_hash) - .map(|i| Arc::clone(input.column(i))) - .collect(); - - // Use identical seed as Spark hash partitioning. - let hashes_buf = &mut scratch.hashes_buf[..num_rows]; - hashes_buf.fill(42_u32); - - // Compute hash for selected columns - create_murmur3_hashes(&columns_to_hash, hashes_buf)?; - - // Assign partition IDs based on hash (same as hash partitioning) - let partition_ids = &mut scratch.partition_ids[..num_rows]; - hashes_buf.iter().enumerate().for_each(|(idx, hash)| { - partition_ids[idx] = pmod(*hash, *num_output_partitions) as u32; - }); - - // We now have partition ids for every input row, map that to partition starts - // and partition indices to eventually write these rows to partition buffers. 
- scratch - .map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows); - - timer.stop(); - Ok::<(&Vec, &Vec), DataFusionError>(( - &scratch.partition_starts, - &scratch.partition_row_indices, - )) - }?; - - self.buffer_partitioned_batch_may_spill( - input, - partition_row_indices, - partition_starts, - ) - .await?; - self.scratch = scratch; - } - other => { - // this should be unreachable as long as the validation logic - // in the constructor is kept up-to-date - return Err(DataFusionError::NotImplemented(format!( - "Unsupported shuffle partitioning scheme {other:?}" - ))); - } - } - Ok(()) - } - - async fn buffer_partitioned_batch_may_spill( - &mut self, - input: RecordBatch, - partition_row_indices: &[u32], - partition_starts: &[u32], - ) -> Result<()> { - let mut mem_growth: usize = input.get_array_memory_size(); - let buffered_partition_idx = self.buffered_batches.len() as u32; - self.buffered_batches.push(input); - - // partition_starts conceptually slices partition_row_indices into smaller slices, - // each slice contains the indices of rows in input that will go into the corresponding - // partition. The following loop iterates over the slices and put the row indices into - // the indices array of the corresponding partition. - for (partition_id, (&start, &end)) in partition_starts - .iter() - .tuple_windows() - .enumerate() - .filter(|(_, (start, end))| start < end) - { - let row_indices = &partition_row_indices[start as usize..end as usize]; - - // Put row indices for the current partition into the indices array of that partition. - // This indices array will be used for calling interleave_record_batch to produce - // shuffled batches. 
- let indices = &mut self.partition_indices[partition_id]; - let before_size = indices.allocated_size(); - indices.reserve(row_indices.len()); - for row_idx in row_indices { - indices.push((buffered_partition_idx, *row_idx)); - } - let after_size = indices.allocated_size(); - mem_growth += after_size.saturating_sub(before_size); - } - - if self.reservation.try_grow(mem_growth).is_err() { - self.spill()?; - } - - Ok(()) - } - - fn shuffle_write_partition( - partition_iter: &mut PartitionedBatchIterator, - shuffle_block_writer: &mut ShuffleBlockWriter, - output_data: &mut BufWriter, - encode_time: &Time, - write_time: &Time, - write_buffer_size: usize, - ) -> Result<()> { - let mut buf_batch_writer = - BufBatchWriter::new(shuffle_block_writer, output_data, write_buffer_size); - for batch in partition_iter { - let batch = batch?; - buf_batch_writer.write(&batch, encode_time, write_time)?; - } - buf_batch_writer.flush(write_time)?; - Ok(()) - } - - fn used(&self) -> usize { - self.reservation.size() - } - - fn spilled_bytes(&self) -> usize { - self.metrics.spilled_bytes.value() - } - - fn spill_count(&self) -> usize { - self.metrics.spill_count.value() - } - - fn data_size(&self) -> usize { - self.metrics.data_size.value() - } - - /// This function transfers the ownership of the buffered batches and partition indices from the - /// ShuffleRepartitioner to a new PartitionedBatches struct. The returned PartitionedBatches struct - /// can be used to produce shuffled batches. 
- fn partitioned_batches(&mut self) -> PartitionedBatchesProducer { - let num_output_partitions = self.partition_indices.len(); - let buffered_batches = std::mem::take(&mut self.buffered_batches); - // let indices = std::mem::take(&mut self.partition_indices); - let indices = std::mem::replace( - &mut self.partition_indices, - vec![vec![]; num_output_partitions], - ); - PartitionedBatchesProducer::new(buffered_batches, indices, self.batch_size) - } - - fn spill(&mut self) -> Result<()> { - log::info!( - "ShuffleRepartitioner spilling shuffle data of {} to disk while inserting ({} time(s) so far)", - self.used(), - self.spill_count() - ); - - // we could always get a chance to free some memory as long as we are holding some - if self.buffered_batches.is_empty() { - return Ok(()); - } - - with_trace("shuffle_spill", self.tracing_enabled, || { - let num_output_partitions = self.partition_writers.len(); - let mut partitioned_batches = self.partitioned_batches(); - let mut spilled_bytes = 0; - - for partition_id in 0..num_output_partitions { - let partition_writer = &mut self.partition_writers[partition_id]; - let mut iter = partitioned_batches.produce(partition_id); - spilled_bytes += partition_writer.spill( - &mut iter, - &self.runtime, - &self.metrics, - self.write_buffer_size, - )?; - } - - self.reservation.free(); - self.metrics.spill_count.add(1); - self.metrics.spilled_bytes.add(spilled_bytes); - Ok(()) - }) - } -} - -#[async_trait::async_trait] -impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { - /// Shuffles rows in input batch into corresponding partition buffer. - /// This function will slice input batch according to configured batch size and then - /// shuffle rows into corresponding partition buffer. 
- async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> { - with_trace_async("shuffle_insert_batch", self.tracing_enabled, || async { - let start_time = Instant::now(); - let mut start = 0; - while start < batch.num_rows() { - let end = (start + self.batch_size).min(batch.num_rows()); - let batch = batch.slice(start, end - start); - self.partitioning_batch(batch).await?; - start = end; - } - self.metrics.input_batches.add(1); - self.metrics - .baseline - .elapsed_compute() - .add_duration(start_time.elapsed()); - Ok(()) - }) - .await - } - - /// Writes buffered shuffled record batches into Arrow IPC bytes. - fn shuffle_write(&mut self) -> Result<()> { - with_trace("shuffle_write", self.tracing_enabled, || { - let start_time = Instant::now(); - - let mut partitioned_batches = self.partitioned_batches(); - let num_output_partitions = self.partition_indices.len(); - let mut offsets = vec![0; num_output_partitions + 1]; - - let data_file = self.output_data_file.clone(); - let index_file = self.output_index_file.clone(); - - let output_data = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(data_file) - .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; - - let mut output_data = BufWriter::new(output_data); - - #[allow(clippy::needless_range_loop)] - for i in 0..num_output_partitions { - offsets[i] = output_data.stream_position()?; - - // if we wrote a spill file for this partition then copy the - // contents into the shuffle file - if let Some(spill_path) = self.partition_writers[i].path() { - let mut spill_file = BufReader::new(File::open(spill_path)?); - let mut write_timer = self.metrics.write_time.timer(); - std::io::copy(&mut spill_file, &mut output_data)?; - write_timer.stop(); - } - - // Write in memory batches to output data file - let mut partition_iter = partitioned_batches.produce(i); - Self::shuffle_write_partition( - &mut partition_iter, - &mut self.shuffle_block_writer, - &mut 
output_data, - &self.metrics.encode_time, - &self.metrics.write_time, - self.write_buffer_size, - )?; - } - - let mut write_timer = self.metrics.write_time.timer(); - output_data.flush()?; - write_timer.stop(); - - // add one extra offset at last to ease partition length computation - offsets[num_output_partitions] = output_data.stream_position()?; - - let mut write_timer = self.metrics.write_time.timer(); - let mut output_index = - BufWriter::new(File::create(index_file).map_err(|e| { - DataFusionError::Execution(format!("shuffle write error: {e:?}")) - })?); - for offset in offsets { - output_index.write_all(&(offset as i64).to_le_bytes()[..])?; - } - output_index.flush()?; - write_timer.stop(); - - self.metrics - .baseline - .elapsed_compute() - .add_duration(start_time.elapsed()); - - Ok(()) - }) - } -} - -impl Debug for MultiPartitionShuffleRepartitioner { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("ShuffleRepartitioner") - .field("memory_used", &self.used()) - .field("spilled_bytes", &self.spilled_bytes()) - .field("spilled_count", &self.spill_count()) - .field("data_size", &self.data_size()) - .finish() - } -} - -/// A partitioner that writes all shuffle data to a single file and a single index file -struct SinglePartitionShufflePartitioner { - // output_data_file: File, - output_data_writer: BufBatchWriter, - output_index_path: String, - /// Batches that are smaller than the batch size and to be concatenated - buffered_batches: Vec, - /// Number of rows in the concatenating batches - num_buffered_rows: usize, - /// Metrics for the repartitioner - metrics: ShufflePartitionerMetrics, - /// The configured batch size - batch_size: usize, -} - -impl SinglePartitionShufflePartitioner { - fn try_new( - output_data_path: String, - output_index_path: String, - schema: SchemaRef, - metrics: ShufflePartitionerMetrics, - batch_size: usize, - codec: CompressionCodec, - write_buffer_size: usize, - ) -> Result { - let shuffle_block_writer = 
ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?; - - let output_data_file = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(output_data_path)?; - - let output_data_writer = - BufBatchWriter::new(shuffle_block_writer, output_data_file, write_buffer_size); - - Ok(Self { - output_data_writer, - output_index_path, - buffered_batches: vec![], - num_buffered_rows: 0, - metrics, - batch_size, - }) - } - - /// Add a batch to the buffer of the partitioner, these buffered batches will be concatenated - /// and written to the output data file when the number of rows in the buffer reaches the batch size. - fn add_buffered_batch(&mut self, batch: RecordBatch) { - self.num_buffered_rows += batch.num_rows(); - self.buffered_batches.push(batch); - } - - /// Consumes buffered batches and return a concatenated batch if successful - fn concat_buffered_batches(&mut self) -> Result> { - if self.buffered_batches.is_empty() { - Ok(None) - } else if self.buffered_batches.len() == 1 { - let batch = self.buffered_batches.remove(0); - self.num_buffered_rows = 0; - Ok(Some(batch)) - } else { - let schema = &self.buffered_batches[0].schema(); - match arrow::compute::concat_batches(schema, self.buffered_batches.iter()) { - Ok(concatenated) => { - self.buffered_batches.clear(); - self.num_buffered_rows = 0; - Ok(Some(concatenated)) - } - Err(e) => Err(DataFusionError::ArrowError( - Box::from(e), - Some(DataFusionError::get_back_trace()), - )), - } - } - } -} - -#[async_trait::async_trait] -impl ShufflePartitioner for SinglePartitionShufflePartitioner { - async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> { - let start_time = Instant::now(); - let num_rows = batch.num_rows(); - - if num_rows > 0 { - self.metrics.data_size.add(batch.get_array_memory_size()); - self.metrics.baseline.record_output(num_rows); - - if num_rows >= self.batch_size || num_rows + self.num_buffered_rows > self.batch_size { - let concatenated_batch = 
self.concat_buffered_batches()?; - - // Write the concatenated buffered batch - if let Some(batch) = concatenated_batch { - self.output_data_writer.write( - &batch, - &self.metrics.encode_time, - &self.metrics.write_time, - )?; - } - - if num_rows >= self.batch_size { - // Write the new batch - self.output_data_writer.write( - &batch, - &self.metrics.encode_time, - &self.metrics.write_time, - )?; - } else { - // Add the new batch to the buffer - self.add_buffered_batch(batch); - } - } else { - self.add_buffered_batch(batch); - } - } - - self.metrics.input_batches.add(1); - self.metrics - .baseline - .elapsed_compute() - .add_duration(start_time.elapsed()); - Ok(()) - } - - fn shuffle_write(&mut self) -> Result<()> { - let start_time = Instant::now(); - let concatenated_batch = self.concat_buffered_batches()?; - - // Write the concatenated buffered batch - if let Some(batch) = concatenated_batch { - self.output_data_writer.write( - &batch, - &self.metrics.encode_time, - &self.metrics.write_time, - )?; - } - self.output_data_writer.flush(&self.metrics.write_time)?; - - // Write index file. It should only contain 2 entries: 0 and the total number of bytes written - let index_file = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(self.output_index_path.clone()) - .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; - let mut index_buf_writer = BufWriter::new(index_file); - let data_file_length = self.output_data_writer.writer_stream_position()?; - for offset in [0, data_file_length] { - index_buf_writer.write_all(&(offset as i64).to_le_bytes()[..])?; - } - index_buf_writer.flush()?; - - self.metrics - .baseline - .elapsed_compute() - .add_duration(start_time.elapsed()); - Ok(()) - } -} - -/// A helper struct to produce shuffled batches. 
-/// This struct takes ownership of the buffered batches and partition indices from the -/// ShuffleRepartitioner, and provides an iterator over the batches in the specified partitions. -pub(crate) struct PartitionedBatchesProducer { - buffered_batches: Vec, - partition_indices: Vec>, - batch_size: usize, -} - -impl PartitionedBatchesProducer { - pub(crate) fn new( - buffered_batches: Vec, - indices: Vec>, - batch_size: usize, - ) -> Self { - Self { - partition_indices: indices, - buffered_batches, - batch_size, - } - } - - fn produce(&mut self, partition_id: usize) -> PartitionedBatchIterator<'_> { - PartitionedBatchIterator::new( - &self.partition_indices[partition_id], - &self.buffered_batches, - self.batch_size, - ) - } -} - -pub(crate) struct PartitionedBatchIterator<'a> { - record_batches: Vec<&'a RecordBatch>, - batch_size: usize, - indices: Vec<(usize, usize)>, - pos: usize, -} - -impl<'a> PartitionedBatchIterator<'a> { - fn new( - indices: &'a [(u32, u32)], - buffered_batches: &'a [RecordBatch], - batch_size: usize, - ) -> Self { - if indices.is_empty() { - // Avoid unnecessary allocations when the partition is empty - return Self { - record_batches: vec![], - batch_size, - indices: vec![], - pos: 0, - }; - } - let record_batches = buffered_batches.iter().collect::>(); - let current_indices = indices - .iter() - .map(|(i_batch, i_row)| (*i_batch as usize, *i_row as usize)) - .collect::>(); - Self { - record_batches, - batch_size, - indices: current_indices, - pos: 0, - } - } -} - -impl Iterator for PartitionedBatchIterator<'_> { - type Item = Result; - - fn next(&mut self) -> Option { - if self.pos >= self.indices.len() { - return None; - } - - let indices_end = std::cmp::min(self.pos + self.batch_size, self.indices.len()); - let indices = &self.indices[self.pos..indices_end]; - match interleave_record_batch(&self.record_batches, indices) { - Ok(batch) => { - self.pos = indices_end; - Some(Ok(batch)) - } - Err(e) => Some(Err(DataFusionError::ArrowError( - 
Box::from(e), - Some(DataFusionError::get_back_trace()), - ))), - } - } -} - -fn pmod(hash: u32, n: usize) -> usize { - let hash = hash as i32; - let n = n as i32; - let r = hash % n; - let result = if r < 0 { (r + n) % n } else { r }; - result as usize -} - #[cfg(test)] mod test { use super::*; - use crate::execution::shuffle::read_ipc_compressed; + use crate::execution::shuffle::{read_ipc_compressed, ShuffleBlockWriter}; + use arrow::array::{Array, StringArray, StringBuilder}; use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::RecordBatch; use arrow::row::{RowConverter, SortField}; use datafusion::datasource::memory::MemorySourceConfig; use datafusion::datasource::source::DataSourceExec; use datafusion::execution::config::SessionConfig; - use datafusion::execution::runtime_env::RuntimeEnvBuilder; + use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder}; use datafusion::physical_expr::expressions::{col, Column}; use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr}; use datafusion::physical_plan::common::collect; + use datafusion::physical_plan::metrics::Time; use datafusion::prelude::SessionContext; + use itertools::Itertools; use std::io::Cursor; use tokio::runtime::Runtime; @@ -1224,16 +367,22 @@ mod test { repartitioner.insert_batch(batch.clone()).await.unwrap(); - assert_eq!(2, repartitioner.partition_writers.len()); + { + let partition_writers = repartitioner.partition_writers(); + assert_eq!(partition_writers.len(), 2); - assert!(!repartitioner.partition_writers[0].has_spill_file()); - assert!(!repartitioner.partition_writers[1].has_spill_file()); + assert!(!partition_writers[0].has_spill_file()); + assert!(!partition_writers[1].has_spill_file()); + } repartitioner.spill().unwrap(); // after spill, there should be spill files - assert!(repartitioner.partition_writers[0].has_spill_file()); - assert!(repartitioner.partition_writers[1].has_spill_file()); + { + let partition_writers = 
repartitioner.partition_writers(); + assert!(partition_writers[0].has_spill_file()); + assert!(partition_writers[1].has_spill_file()); + } // insert another batch after spilling repartitioner.insert_batch(batch.clone()).await.unwrap(); @@ -1346,16 +495,6 @@ mod test { RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap() } - #[test] - fn test_pmod() { - let i: Vec = vec![0x99f0149d, 0x9c67b85d, 0xc8008529, 0xa05b5d7b, 0xcd1e64fb]; - let result = i.into_iter().map(|i| pmod(i, 200)).collect::>(); - - // expected partition from Spark with n=200 - let expected = vec![69, 5, 193, 171, 115]; - assert_eq!(result, expected); - } - #[test] #[cfg_attr(miri, ignore)] fn test_round_robin_deterministic() { diff --git a/native/core/src/execution/shuffle/writers/partition_writer.rs b/native/core/src/execution/shuffle/writers/partition_writer.rs index 8b2555e09c..8b5163d44d 100644 --- a/native/core/src/execution/shuffle/writers/partition_writer.rs +++ b/native/core/src/execution/shuffle/writers/partition_writer.rs @@ -16,7 +16,7 @@ // under the License. use crate::execution::shuffle::metrics::ShufflePartitionerMetrics; -use crate::execution::shuffle::shuffle_writer::PartitionedBatchIterator; +use crate::execution::shuffle::partitioners::PartitionedBatchIterator; use crate::execution::shuffle::writers::buf_batch_writer::BufBatchWriter; use crate::execution::shuffle::ShuffleBlockWriter; use datafusion::common::DataFusionError;