From c0f8d2d49bb83b745562447dcbd5d6f952dfcb04 Mon Sep 17 00:00:00 2001 From: Matt Cuento Date: Sat, 7 Feb 2026 11:15:07 -0800 Subject: [PATCH 1/2] feat: Use push_batch_with_indices to track partitions by index for sort-based shuffle lint first round of optimizations second round lint lint --- .../execution_plans/sort_shuffle/buffer.rs | 514 +++++++++++++----- .../src/execution_plans/sort_shuffle/mod.rs | 2 +- .../execution_plans/sort_shuffle/writer.rs | 145 +++-- benchmarks/src/bin/shuffle_bench.rs | 26 + 4 files changed, 474 insertions(+), 213 deletions(-) diff --git a/ballista/core/src/execution_plans/sort_shuffle/buffer.rs b/ballista/core/src/execution_plans/sort_shuffle/buffer.rs index c2aa5e85fc..b797392ee1 100644 --- a/ballista/core/src/execution_plans/sort_shuffle/buffer.rs +++ b/ballista/core/src/execution_plans/sort_shuffle/buffer.rs @@ -15,157 +15,230 @@ // specific language governing permissions and limitations // under the License. -//! In-memory partition buffer for sort-based shuffle. +//! In-memory partition buffer and scratch space for sort-based shuffle. //! -//! Each output partition has a buffer that accumulates record batches -//! until the buffer is full or needs to be spilled to disk. +//! Provides: +//! - `ScratchSpace`: Reusable per-batch scratch buffer for computing partition +//! assignments using a prefix-sum algorithm (modeled on Apache DataFusion Comet). +//! - `InputBatchStore`: Centralized storage for input record batches. +//! - `materialize_partition`: Materializes partition data from indices using +//! `BatchCoalescer::push_batch_with_indices`. 
+use std::sync::Arc; + +use datafusion::arrow::array::UInt64Builder; +use datafusion::arrow::compute::BatchCoalescer; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::record_batch::RecordBatch; -use datafusion::physical_plan::coalesce::LimitedBatchCoalescer; +use datafusion::common::Result; +use datafusion::common::hash_utils::create_hashes; +use datafusion::physical_expr::PhysicalExpr; +use datafusion::physical_expr_common::utils::evaluate_expressions_to_arrays; +use datafusion::physical_plan::repartition::REPARTITION_RANDOM_STATE; -/// Buffer for accumulating record batches for a single output partition. +/// Reusable per-batch scratch buffer for computing partition assignments. /// -/// When the buffer exceeds its maximum size, it signals that it should be -/// spilled to disk. -#[derive(Debug)] -pub struct PartitionBuffer { - /// Partition ID this buffer is for - partition_id: usize, - /// Buffered record batches - batches: Vec, - /// Current memory usage in bytes - memory_used: usize, - /// Number of rows in the buffer - num_rows: usize, - /// Schema for this partition's data - schema: SchemaRef, +/// Uses a prefix-sum algorithm to efficiently map rows to partitions: +/// 1. Evaluate hash expressions and compute partition IDs +/// 2. Count rows per partition +/// 3. Compute cumulative offsets (prefix sum) +/// 4. Place row indices into contiguous slices per partition +/// +/// After `compute_partition_assignments`, use `partition_indices(id)` to get +/// the row indices for a specific partition. +pub struct ScratchSpace { + hash_buffer: Vec, + partition_ids: Vec, + partition_row_indices: Vec, + /// Length = num_partitions + 1. After computation, + /// partition k's rows are `partition_row_indices[starts[k]..starts[k+1]]`. + partition_starts: Vec, } -impl PartitionBuffer { - /// Creates a new partition buffer. 
- pub fn new(partition_id: usize, schema: SchemaRef) -> Self { +impl ScratchSpace { + /// Creates a new scratch space for the given number of partitions. + pub fn new(num_partitions: usize) -> Self { Self { - partition_id, - batches: Vec::new(), - memory_used: 0, - num_rows: 0, - schema, + hash_buffer: Vec::new(), + partition_ids: Vec::new(), + partition_row_indices: Vec::new(), + partition_starts: vec![0; num_partitions + 1], } } - /// Returns the partition ID for this buffer. - pub fn partition_id(&self) -> usize { - self.partition_id - } - - /// Returns the schema for this buffer's data. - pub fn schema(&self) -> &SchemaRef { - &self.schema - } - - /// Returns the current memory usage in bytes. - pub fn memory_used(&self) -> usize { - self.memory_used - } - - /// Returns the number of rows in the buffer. - pub fn num_rows(&self) -> usize { - self.num_rows - } - - /// Returns the number of batches in the buffer. - pub fn num_batches(&self) -> usize { - self.batches.len() - } - - /// Returns true if the buffer is empty. - pub fn is_empty(&self) -> bool { - self.batches.is_empty() - } - - /// Appends a record batch to the buffer. + /// Compute partition assignments for a batch using hash partitioning. /// - /// Returns the new total memory usage after appending. - pub fn append(&mut self, batch: RecordBatch) -> usize { - let batch_size = batch.get_array_memory_size(); - self.num_rows += batch.num_rows(); - self.memory_used += batch_size; - self.batches.push(batch); - self.memory_used - } + /// Uses the same hashing logic as DataFusion's `BatchPartitioner` to ensure + /// identical partition assignments. 
+ pub fn compute_partition_assignments( + &mut self, + exprs: &[Arc], + batch: &RecordBatch, + num_partitions: usize, + ) -> Result<()> { + let num_rows = batch.num_rows(); + + let arrays = evaluate_expressions_to_arrays(exprs, batch)?; + + self.hash_buffer.resize(num_rows, 0); + create_hashes( + &arrays, + REPARTITION_RANDOM_STATE.random_state(), + &mut self.hash_buffer, + )?; + + self.partition_ids.resize(num_rows, 0); + for (i, hash) in self.hash_buffer.iter().enumerate() { + self.partition_ids[i] = (*hash % num_partitions as u64) as usize; + } - /// Drains all batches from the buffer, resetting it to empty. - /// - /// Returns the drained batches. - pub fn drain(&mut self) -> Vec { - self.memory_used = 0; - self.num_rows = 0; - std::mem::take(&mut self.batches) + self.map_partition_ids_to_starts_and_indices(num_partitions, num_rows); + Ok(()) } - /// Takes all batches from the buffer without resetting memory tracking. - /// - /// This is useful when the caller wants to handle the batches but the - /// buffer will be discarded anyway. - pub fn take_batches(&mut self) -> Vec { - std::mem::take(&mut self.batches) - } + /// Prefix-sum algorithm to organize row indices by partition. + fn map_partition_ids_to_starts_and_indices( + &mut self, + num_partitions: usize, + num_rows: usize, + ) { + self.partition_starts.truncate(0); + self.partition_starts.resize(num_partitions + 1, 0); + self.partition_row_indices.resize(num_rows, 0); + for &pid in &self.partition_ids { + self.partition_starts[pid] += 1; + } + + // Cumulative sum converts counts to end offsets + let mut sum = 0; + for start in self.partition_starts.iter_mut() { + sum += *start; + *start = sum; + } - /// Drains batches from the buffer, coalescing small batches into - /// larger ones up to `target_batch_size` rows each. 
- pub fn drain_coalesced(&mut self, target_batch_size: usize) -> Vec { - self.memory_used = 0; - self.num_rows = 0; - let batches = std::mem::take(&mut self.batches); - coalesce_batches(batches, &self.schema, target_batch_size) + // Reverse iteration converts end offsets to start offsets + for row_idx in (0..num_rows).rev() { + let pid = self.partition_ids[row_idx]; + self.partition_starts[pid] -= 1; + self.partition_row_indices[self.partition_starts[pid]] = row_idx as u32; + } } - /// Takes all batches, coalescing small batches into larger ones - /// up to `target_batch_size` rows each. - pub fn take_batches_coalesced( - &mut self, - target_batch_size: usize, - ) -> Vec { - let batches = std::mem::take(&mut self.batches); - coalesce_batches(batches, &self.schema, target_batch_size) + /// Returns the row indices assigned to `partition_id` after calling + /// `compute_partition_assignments`. + pub fn partition_indices(&self, partition_id: usize) -> &[u32] { + let start = self.partition_starts[partition_id]; + let end = self.partition_starts[partition_id + 1]; + &self.partition_row_indices[start..end] } } -/// Coalesces small batches into larger ones up to `target_batch_size` -/// rows each using DataFusion's `LimitedBatchCoalescer`. -fn coalesce_batches( - batches: Vec, - schema: &SchemaRef, +/// Materializes a partition's data from `(batch_idx, row_idx)` pairs into +/// coalesced `RecordBatch`es using `BatchCoalescer::push_batch_with_indices`. +/// +/// Uses `scratch_builder` as a reusable builder to avoid allocations. 
+pub fn materialize_partition( + partition_indices: &[(u32, u32)], + input_batches: &InputBatchStore, target_batch_size: usize, -) -> Vec { - if batches.len() <= 1 { - return batches; + scratch_builder: &mut UInt64Builder, +) -> Result> { + if partition_indices.is_empty() { + return Ok(Vec::new()); } let mut coalescer = - LimitedBatchCoalescer::new(schema.clone(), target_batch_size, None); + BatchCoalescer::new(input_batches.schema().clone(), target_batch_size); let mut result = Vec::new(); - for batch in batches { - if batch.num_rows() == 0 { - continue; + let mut start = 0; + while start < partition_indices.len() { + let current_batch_idx = partition_indices[start].0; + let mut end = start + 1; + while end < partition_indices.len() + && partition_indices[end].0 == current_batch_idx + { + end += 1; + } + + let batch = input_batches.get_batch(current_batch_idx); + + for (_, r) in &partition_indices[start..end] { + scratch_builder.append_value(*r as u64); } - // push_batch can only fail on schema mismatch, which won't - // happen here since all batches share the same schema - let _ = coalescer.push_batch(batch); + let idx_array = scratch_builder.finish(); + + coalescer.push_batch_with_indices(batch.clone(), &idx_array)?; while let Some(completed) = coalescer.next_completed_batch() { result.push(completed); } + + start = end; } - // Flush remaining buffered rows - let _ = coalescer.finish(); + coalescer.finish_buffered_batch()?; while let Some(completed) = coalescer.next_completed_batch() { result.push(completed); } + Ok(result) +} - result +/// Stores all input batches across all partitions +pub struct InputBatchStore { + /// Vector of all incoming record batches + batches: Vec, + /// Schema of all batches + schema: SchemaRef, + /// Total memory of all record batches + total_memory: usize, +} + +impl InputBatchStore { + /// Creates new instance of InputBatchStore + pub fn new(schema: SchemaRef) -> Self { + Self { + batches: Vec::new(), + schema, + total_memory: 0, + 
} + } + + /// Appends RecordBatch to the store, returning the batch index. + pub fn push_batch(&mut self, batch: RecordBatch) -> usize { + let batch_idx = self.batches.len(); + self.total_memory += batch.get_array_memory_size(); + self.batches.push(batch); + batch_idx + } + + /// Return the batch at the given index + /// + /// # Panics + /// Can panic if idx >= batches.len() + pub fn get_batch(&self, idx: u32) -> &RecordBatch { + &self.batches[idx as usize] + } + + /// Returns total memory of all record batches + pub fn total_memory(&self) -> usize { + self.total_memory + } + + /// Drains record batches and memory stats. The same SchemaRef remains. + pub fn clear(&mut self) { + self.batches.clear(); + self.total_memory = 0; + } + + /// Returns the schema of the stored batches. + pub fn schema(&self) -> &SchemaRef { + &self.schema + } + + /// Returns a reference to all stored batches. + pub fn batches(&self) -> &[RecordBatch] { + &self.batches + } } #[cfg(test)] @@ -173,6 +246,7 @@ mod tests { use super::*; use datafusion::arrow::array::Int32Array; use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use datafusion::physical_plan::expressions::Column; use std::sync::Arc; fn create_test_schema() -> SchemaRef { @@ -185,47 +259,209 @@ mod tests { } #[test] - fn test_new_buffer() { + fn test_scratch_space_partition_assignments() { let schema = create_test_schema(); - let buffer = PartitionBuffer::new(0, schema); + let batch = create_test_batch(&schema, vec![1, 2, 3, 4, 5, 6]); + let exprs: Vec> = vec![Arc::new(Column::new("a", 0))]; + + let mut scratch = ScratchSpace::new(3); + scratch + .compute_partition_assignments(&exprs, &batch, 3) + .unwrap(); + + // Every row should be assigned to exactly one partition + let mut total_rows = 0; + for pid in 0..3 { + total_rows += scratch.partition_indices(pid).len(); + } + assert_eq!(total_rows, 6); - assert_eq!(buffer.partition_id(), 0); - assert!(buffer.is_empty()); - assert_eq!(buffer.memory_used(), 0); - 
assert_eq!(buffer.num_rows(), 0); - assert_eq!(buffer.num_batches(), 0); + // Row indices should be within range + for pid in 0..3 { + for &row_idx in scratch.partition_indices(pid) { + assert!(row_idx < 6); + } + } } #[test] - fn test_append() { + fn test_scratch_space_reuse() { let schema = create_test_schema(); - let mut buffer = PartitionBuffer::new(0, schema.clone()); + let exprs: Vec> = vec![Arc::new(Column::new("a", 0))]; + let mut scratch = ScratchSpace::new(2); + + // First batch + let batch1 = create_test_batch(&schema, vec![1, 2, 3]); + scratch + .compute_partition_assignments(&exprs, &batch1, 2) + .unwrap(); + let total1: usize = (0..2).map(|p| scratch.partition_indices(p).len()).sum(); + assert_eq!(total1, 3); + + // Second batch (reuses scratch space) + let batch2 = create_test_batch(&schema, vec![10, 20]); + scratch + .compute_partition_assignments(&exprs, &batch2, 2) + .unwrap(); + let total2: usize = (0..2).map(|p| scratch.partition_indices(p).len()).sum(); + assert_eq!(total2, 2); + } - let batch = create_test_batch(&schema, vec![1, 2, 3]); - buffer.append(batch); + #[test] + fn test_materialize_partition_empty() { + let schema = create_test_schema(); + let store = InputBatchStore::new(schema); + let mut scratch = UInt64Builder::new(); + let result = materialize_partition(&[], &store, 8192, &mut scratch).unwrap(); + assert!(result.is_empty()); + } - assert!(!buffer.is_empty()); - assert!(buffer.memory_used() > 0); - assert_eq!(buffer.num_rows(), 3); - assert_eq!(buffer.num_batches(), 1); + #[test] + fn test_materialize_partition_single_batch() { + let schema = create_test_schema(); + let mut store = InputBatchStore::new(schema.clone()); + store.push_batch(create_test_batch(&schema, vec![10, 20, 30, 40, 50])); + + // Select rows 0, 2, 4 from batch 0 + let indices = vec![(0u32, 0u32), (0, 2), (0, 4)]; + let mut scratch = UInt64Builder::new(); + let result = materialize_partition(&indices, &store, 8192, &mut scratch).unwrap(); + + let total_rows: 
usize = result.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 3); + + // Verify values + let col = result[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col.value(0), 10); + assert_eq!(col.value(1), 30); + assert_eq!(col.value(2), 50); + } + + #[test] + fn test_materialize_partition_multiple_batches() { + let schema = create_test_schema(); + let mut store = InputBatchStore::new(schema.clone()); + store.push_batch(create_test_batch(&schema, vec![10, 20, 30])); + store.push_batch(create_test_batch(&schema, vec![40, 50, 60])); + + // Select row 1 from batch 0, rows 0 and 2 from batch 1 + let indices = vec![(0u32, 1u32), (1, 0), (1, 2)]; + let mut scratch = UInt64Builder::new(); + let result = materialize_partition(&indices, &store, 8192, &mut scratch).unwrap(); + + let total_rows: usize = result.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 3); } #[test] - fn test_drain() { + fn test_materialize_partition_respects_batch_size() { let schema = create_test_schema(); - let mut buffer = PartitionBuffer::new(0, schema.clone()); + let mut store = InputBatchStore::new(schema.clone()); + + // Create a large input batch with 20,000 rows + let large_batch = create_test_batch(&schema, (0..20000).collect()); + store.push_batch(large_batch); + + // Select all rows from this partition + let indices: Vec<(u32, u32)> = (0..20000).map(|i| (0u32, i as u32)).collect(); + + // Use target_batch_size of 8192 + let mut scratch = UInt64Builder::new(); + let result = materialize_partition(&indices, &store, 8192, &mut scratch).unwrap(); + + // Should produce multiple batches + assert!(result.len() >= 2, "Expected multiple output batches"); + + // Verify total rows + let total_rows: usize = result.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 20000); + + // Most batches should be close to target_batch_size (8192) + // Last batch may be smaller + for (i, batch) in result.iter().enumerate() { + if i < result.len() - 1 { + // 
All but last batch should be close to target size + assert!( + batch.num_rows() >= 7000 && batch.num_rows() <= 9000, + "Batch {} has {} rows, expected ~8192", + i, + batch.num_rows() + ); + } + } + } - buffer.append(create_test_batch(&schema, vec![1, 2, 3])); - buffer.append(create_test_batch(&schema, vec![4, 5])); + #[test] + fn test_input_batch_store_memory_tracking() { + let schema = create_test_schema(); + let mut store = InputBatchStore::new(schema.clone()); - assert_eq!(buffer.num_batches(), 2); - assert_eq!(buffer.num_rows(), 5); + assert_eq!(store.total_memory(), 0); - let batches = buffer.drain(); + let batch1 = create_test_batch(&schema, vec![1, 2, 3]); + let memory1 = batch1.get_array_memory_size(); + store.push_batch(batch1); + assert_eq!(store.total_memory(), memory1); - assert_eq!(batches.len(), 2); - assert!(buffer.is_empty()); - assert_eq!(buffer.memory_used(), 0); - assert_eq!(buffer.num_rows(), 0); + let batch2 = create_test_batch(&schema, vec![4, 5, 6, 7]); + let memory2 = batch2.get_array_memory_size(); + store.push_batch(batch2); + assert_eq!(store.total_memory(), memory1 + memory2); + + store.clear(); + assert_eq!(store.total_memory(), 0); + assert_eq!(store.batches().len(), 0); + } + + #[test] + fn test_materialize_partition_coalesces_small_batches() { + let schema = create_test_schema(); + let mut store = InputBatchStore::new(schema.clone()); + + // Create many small input batches (100 batches with 100 rows each) + for i in 0..100 { + let start = i * 100; + let batch = create_test_batch(&schema, (start..start + 100).collect()); + store.push_batch(batch); + } + + // Select all rows from all batches + let mut indices = Vec::new(); + for batch_idx in 0..100u32 { + for row_idx in 0..100u32 { + indices.push((batch_idx, row_idx)); + } + } + + // Use target_batch_size of 8192 + let mut scratch = UInt64Builder::new(); + let result = materialize_partition(&indices, &store, 8192, &mut scratch).unwrap(); + + // Should coalesce into far fewer output 
batches than input batches + assert!( + result.len() < 10, + "Expected coalescing: got {} output batches from 100 input batches", + result.len() + ); + + // Verify total rows + let total_rows: usize = result.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 10000); + + // Most batches should be well-sized + for (i, batch) in result.iter().enumerate() { + if i < result.len() - 1 { + assert!( + batch.num_rows() >= 7000, + "Batch {} should be well-sized, got {} rows", + i, + batch.num_rows() + ); + } + } } } diff --git a/ballista/core/src/execution_plans/sort_shuffle/mod.rs b/ballista/core/src/execution_plans/sort_shuffle/mod.rs index fa5634b6ce..3924b95f6c 100644 --- a/ballista/core/src/execution_plans/sort_shuffle/mod.rs +++ b/ballista/core/src/execution_plans/sort_shuffle/mod.rs @@ -36,7 +36,7 @@ mod reader; mod spill; mod writer; -pub use buffer::PartitionBuffer; +pub use buffer::{InputBatchStore, ScratchSpace}; pub use config::SortShuffleConfig; pub use index::ShuffleIndex; pub use reader::{ diff --git a/ballista/core/src/execution_plans/sort_shuffle/writer.rs b/ballista/core/src/execution_plans/sort_shuffle/writer.rs index 91fb34742b..f92dcce760 100644 --- a/ballista/core/src/execution_plans/sort_shuffle/writer.rs +++ b/ballista/core/src/execution_plans/sort_shuffle/writer.rs @@ -30,7 +30,7 @@ use std::sync::Arc; use std::time::Instant; use super::super::shuffle_writer_trait::ShuffleWriter; -use super::buffer::PartitionBuffer; +use super::buffer::{InputBatchStore, ScratchSpace, materialize_partition}; use super::config::SortShuffleConfig; use super::index::ShuffleIndex; use super::spill::SpillManager; @@ -49,7 +49,6 @@ use datafusion::physical_plan::memory::MemoryStream; use datafusion::physical_plan::metrics::{ self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, }; -use datafusion::physical_plan::repartition::BatchPartitioner; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ DisplayAs, 
DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, @@ -211,12 +210,15 @@ impl SortShuffleWriterExec { )); }; - // Create partition buffers - let mut buffers: Vec = (0..num_output_partitions) - .map(|i| PartitionBuffer::new(i, schema.clone())) + // Pre-allocate capacity to reduce reallocations + let rows_per_partition_estimate = + (config.batch_size / num_output_partitions).max(64); + let mut partition_indices: Vec> = (0..num_output_partitions) + .map(|_| Vec::with_capacity(rows_per_partition_estimate)) .collect(); + let mut input_store = InputBatchStore::new(schema.clone()); + let mut scratch = ScratchSpace::new(num_output_partitions); - // Create spill manager let mut spill_manager = SpillManager::new( &work_dir, &job_id, @@ -226,62 +228,64 @@ impl SortShuffleWriterExec { ) .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; - // Create batch partitioner - let mut partitioner = BatchPartitioner::new_hash_partitioner( - exprs, - num_output_partitions, - metrics.repart_time.clone(), - ); - - // Process input stream while let Some(result) = stream.next().await { let input_batch = result?; metrics.input_rows.add(input_batch.num_rows()); - // Partition the batch - partitioner.partition( - input_batch, - |output_partition, output_batch| { - buffers[output_partition].append(output_batch); - Ok(()) - }, + let timer = metrics.repart_time.timer(); + scratch.compute_partition_assignments( + &exprs, + &input_batch, + num_output_partitions, )?; + timer.done(); + + let batch_idx = input_store.push_batch(input_batch) as u32; + for (partition_id, indices) in partition_indices.iter_mut().enumerate() { + let row_indices = scratch.partition_indices(partition_id); + if !row_indices.is_empty() { + for &row_idx in row_indices { + indices.push((batch_idx, row_idx)); + } + } + } - // Check if we need to spill - let total_memory: usize = buffers.iter().map(|b| b.memory_used()).sum(); + let index_memory: usize = partition_indices + .iter() + .map(|v| v.len() * 
std::mem::size_of::<(u32, u32)>()) + .sum(); + let total_memory = input_store.total_memory() + index_memory; if total_memory > config.spill_memory_threshold() { let timer = metrics.spill_time.timer(); - spill_largest_buffers( - &mut buffers, + spill_all_partitions( + &mut partition_indices, + &mut input_store, &mut spill_manager, &schema, - config.spill_memory_threshold() / 2, config.batch_size, )?; timer.done(); } } - // Finish spill writers before reading them back spill_manager .finish_writers() .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; - // Finalize: write consolidated output file let timer = metrics.write_time.timer(); let (data_path, index_path, partition_stats) = finalize_output( &work_dir, &job_id, stage_id, input_partition, - &mut buffers, + &partition_indices, + &input_store, &mut spill_manager, &schema, &config, )?; timer.done(); - // Update metrics metrics.spill_count.add(spill_manager.total_spills()); metrics .spill_bytes @@ -290,7 +294,6 @@ impl SortShuffleWriterExec { let total_rows: u64 = partition_stats.iter().map(|(_, _, r, _)| *r).sum(); metrics.output_rows.add(total_rows as usize); - // Cleanup spill files spill_manager .cleanup() .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; @@ -306,7 +309,6 @@ impl SortShuffleWriterExec { spill_manager.total_bytes_spilled() ); - // Build result - one entry per output partition that has data let mut results = Vec::new(); for (part_id, num_batches, num_rows, num_bytes) in partition_stats { if num_rows > 0 { @@ -325,38 +327,37 @@ impl SortShuffleWriterExec { } } -/// Spills the largest buffers until total memory is below the target. -fn spill_largest_buffers( - buffers: &mut [PartitionBuffer], +/// Spills all partitions to disk and clears the shared InputBatchStore. +/// +/// Since all partition indices reference the same InputBatchStore, we must +/// spill all partitions at once before clearing the store. 
+fn spill_all_partitions( + partition_indices: &mut [Vec<(u32, u32)>], + input_store: &mut InputBatchStore, spill_manager: &mut SpillManager, schema: &SchemaRef, - target_memory: usize, batch_size: usize, ) -> Result<()> { - loop { - let total_memory: usize = buffers.iter().map(|b| b.memory_used()).sum(); - if total_memory <= target_memory { - break; - } + let mut scratch_builder = UInt64Builder::new(); - // Find the largest buffer - let largest_idx = buffers - .iter() - .enumerate() - .max_by_key(|(_, b)| b.memory_used()) - .map(|(i, _)| i); - - match largest_idx { - Some(idx) if buffers[idx].memory_used() > 0 => { - let partition_id = buffers[idx].partition_id(); - let batches = buffers[idx].drain_coalesced(batch_size); - spill_manager - .spill(partition_id, batches, schema) - .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; - } - _ => break, // No more buffers to spill + for (partition_id, indices) in partition_indices.iter_mut().enumerate() { + if indices.is_empty() { + continue; } + let batches = materialize_partition( + indices, + input_store, + batch_size, + &mut scratch_builder, + )?; + if !batches.is_empty() { + spill_manager + .spill(partition_id, batches, schema) + .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; + } + indices.clear(); } + input_store.clear(); Ok(()) } @@ -370,12 +371,13 @@ fn finalize_output( job_id: &str, stage_id: usize, input_partition: usize, - buffers: &mut [PartitionBuffer], + partition_indices: &[Vec<(u32, u32)>], + input_store: &InputBatchStore, spill_manager: &mut SpillManager, schema: &SchemaRef, config: &SortShuffleConfig, ) -> Result { - let num_partitions = buffers.len(); + let num_partitions = partition_indices.len(); let mut index = ShuffleIndex::new(num_partitions); let mut partition_stats = Vec::with_capacity(num_partitions); @@ -391,7 +393,6 @@ fn finalize_output( debug!("Writing consolidated shuffle output to {:?}", data_path); - // Use FileWriter for random access support via FileReader let 
file = File::create(&data_path)?; let mut buffered = BufWriter::new(file); @@ -399,20 +400,18 @@ fn finalize_output( IpcWriteOptions::default().try_with_compression(Some(config.compression))?; let mut writer = FileWriter::try_new_with_options(&mut buffered, schema, options)?; - // Track cumulative batch counts - index stores the starting batch index for each partition - // FileReader supports random access to batches by index + // Index stores the starting batch index for each partition let mut cumulative_batch_count: i64 = 0; - // Write partitions in order - for (partition_id, buffer) in buffers.iter_mut().enumerate() { - // Set the starting batch index for this partition + let mut scratch_builder = UInt64Builder::new(); + + for (partition_id, indices) in partition_indices.iter().enumerate() { index.set_offset(partition_id, cumulative_batch_count); let mut partition_rows: u64 = 0; let mut partition_batches: u64 = 0; let mut partition_bytes: u64 = 0; - // First, stream any spill file for this partition if let Some(reader) = spill_manager .open_spill_reader(partition_id) .map_err(|e| DataFusionError::Execution(format!("{e:?}")))? 
@@ -426,9 +425,13 @@ fn finalize_output( } } - // Then write remaining buffered data (coalesced) - let buffered_batches = buffer.take_batches_coalesced(config.batch_size); - for batch in buffered_batches { + let materialized_batches = materialize_partition( + indices, + input_store, + config.batch_size, + &mut scratch_builder, + )?; + for batch in materialized_batches { partition_rows += batch.num_rows() as u64; partition_bytes += batch.get_array_memory_size() as u64; partition_batches += 1; @@ -445,13 +448,10 @@ fn finalize_output( cumulative_batch_count += partition_batches as i64; } - // Finish writing (this writes the IPC footer for random access) writer.finish()?; - // Store total batch count index.set_total_length(cumulative_batch_count); - // Write index file index .write_to_file(&index_path) .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; @@ -573,7 +573,6 @@ impl ExecutionPlan for SortShuffleWriterExec { } let stats = Arc::new(stats_builder.finish()); - // Build result batch containing metadata let batch = RecordBatch::try_new( schema_captured.clone(), vec![partition_num, path, stats], diff --git a/benchmarks/src/bin/shuffle_bench.rs b/benchmarks/src/bin/shuffle_bench.rs index 5ced6ee493..384e10dae4 100644 --- a/benchmarks/src/bin/shuffle_bench.rs +++ b/benchmarks/src/bin/shuffle_bench.rs @@ -333,6 +333,16 @@ async fn main() -> Result<(), Box> { let mut hash_file_count = 0; let mut hash_total_size = 0u64; + // Warmup iteration (discard result to avoid JIT/allocator initialization overhead) + { + let temp_dir = TempDir::new()?; + let work_dir = temp_dir.path().to_str().unwrap(); + let _ = + benchmark_hash_shuffle(&data, schema.clone(), opt.partitions, work_dir) + .await?; + println!(" Warmup iteration completed (not timed)"); + } + for i in 0..opt.iterations { let temp_dir = TempDir::new()?; let work_dir = temp_dir.path().to_str().unwrap(); @@ -380,6 +390,22 @@ async fn main() -> Result<(), Box> { let mut sort_file_count = 0; let mut 
sort_total_size = 0u64; + // Warmup iteration (discard result to avoid JIT/allocator initialization overhead) + { + let temp_dir = TempDir::new()?; + let work_dir = temp_dir.path().to_str().unwrap(); + let _ = benchmark_sort_shuffle( + &data, + schema.clone(), + opt.partitions, + work_dir, + buffer_size, + memory_limit, + ) + .await?; + println!(" Warmup iteration completed (not timed)"); + } + for i in 0..opt.iterations { let temp_dir = TempDir::new()?; let work_dir = temp_dir.path().to_str().unwrap(); From cacb7219698626ebd3c53ec1b2d5e3994fe7a4bc Mon Sep 17 00:00:00 2001 From: Matt Cuento Date: Thu, 12 Feb 2026 21:22:03 -0800 Subject: [PATCH 2/2] interleave_record_batch --- .../execution_plans/sort_shuffle/buffer.rs | 70 +++++++++++-------- 1 file changed, 39 insertions(+), 31 deletions(-) diff --git a/ballista/core/src/execution_plans/sort_shuffle/buffer.rs b/ballista/core/src/execution_plans/sort_shuffle/buffer.rs index b797392ee1..bbb9bb962d 100644 --- a/ballista/core/src/execution_plans/sort_shuffle/buffer.rs +++ b/ballista/core/src/execution_plans/sort_shuffle/buffer.rs @@ -22,12 +22,12 @@ //! assignments using a prefix-sum algorithm (modeled on Apache DataFusion Comet). //! - `InputBatchStore`: Centralized storage for input record batches. //! - `materialize_partition`: Materializes partition data from indices using -//! `BatchCoalescer::push_batch_with_indices`. +//! `interleave_record_batch` for efficient row selection. use std::sync::Arc; use datafusion::arrow::array::UInt64Builder; -use datafusion::arrow::compute::BatchCoalescer; +use datafusion::arrow::compute::interleave_record_batch; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::record_batch::RecordBatch; use datafusion::common::Result; @@ -134,52 +134,60 @@ impl ScratchSpace { } /// Materializes a partition's data from `(batch_idx, row_idx)` pairs into -/// coalesced `RecordBatch`es using `BatchCoalescer::push_batch_with_indices`. 
+/// coalesced `RecordBatch`es using `interleave_record_batch`. /// -/// Uses `scratch_builder` as a reusable builder to avoid allocations. +/// The `_scratch_builder` argument is unused; it is kept only for API compatibility. pub fn materialize_partition( partition_indices: &[(u32, u32)], input_batches: &InputBatchStore, target_batch_size: usize, - scratch_builder: &mut UInt64Builder, + _scratch_builder: &mut UInt64Builder, ) -> Result> { if partition_indices.is_empty() { return Ok(Vec::new()); } - let mut coalescer = - BatchCoalescer::new(input_batches.schema().clone(), target_batch_size); - let mut result = Vec::new(); - - let mut start = 0; - while start < partition_indices.len() { - let current_batch_idx = partition_indices[start].0; - let mut end = start + 1; - while end < partition_indices.len() - && partition_indices[end].0 == current_batch_idx - { - end += 1; - } + // Convert (u32, u32) to (usize, usize) for interleave API + let interleave_indices: Vec<(usize, usize)> = partition_indices + .iter() + .map(|&(batch_idx, row_idx)| (batch_idx as usize, row_idx as usize)) + .collect(); - let batch = input_batches.get_batch(current_batch_idx); + // Get references to all batches + let batches = input_batches.batches(); + let batch_refs: Vec<&RecordBatch> = batches.iter().collect(); - for (_, r) in &partition_indices[start..end] { - scratch_builder.append_value(*r as u64); - } - let idx_array = scratch_builder.finish(); + // Single interleave operation + let interleaved = interleave_record_batch(&batch_refs, &interleave_indices)?; - coalescer.push_batch_with_indices(batch.clone(), &idx_array)?; - while let Some(completed) = coalescer.next_completed_batch() { - result.push(completed); - } + // Split into target-sized batches using zero-copy slicing + split_into_batches(interleaved, target_batch_size) +} + +/// Splits a large RecordBatch into smaller batches of target size using zero-copy slicing. 
+/// +/// Returns a vector of RecordBatch, where all batches except possibly the last +/// have approximately `target_batch_size` rows. +fn split_into_batches( + batch: RecordBatch, + target_batch_size: usize, +) -> Result> { + let total_rows = batch.num_rows(); - start = end; + if total_rows <= target_batch_size { + return Ok(vec![batch]); } - coalescer.finish_buffered_batch()?; - while let Some(completed) = coalescer.next_completed_batch() { - result.push(completed); + let num_batches = total_rows.div_ceil(target_batch_size); + let mut result = Vec::with_capacity(num_batches); + + let mut offset = 0; + while offset < total_rows { + let length = (total_rows - offset).min(target_batch_size); + result.push(batch.slice(offset, length)); + offset += length; } + Ok(result) }