Skip to content

Commit cacb721

Browse files
committed
interleave_record_batch
1 parent c0f8d2d commit cacb721

1 file changed

Lines changed: 39 additions & 31 deletions

File tree

  • ballista/core/src/execution_plans/sort_shuffle

ballista/core/src/execution_plans/sort_shuffle/buffer.rs

Lines changed: 39 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@
2222
//! assignments using a prefix-sum algorithm (modeled on Apache DataFusion Comet).
2323
//! - `InputBatchStore`: Centralized storage for input record batches.
2424
//! - `materialize_partition`: Materializes partition data from indices using
25-
//! `BatchCoalescer::push_batch_with_indices`.
25+
//! `interleave_record_batch` for efficient row selection.
2626
2727
use std::sync::Arc;
2828

2929
use datafusion::arrow::array::UInt64Builder;
30-
use datafusion::arrow::compute::BatchCoalescer;
30+
use datafusion::arrow::compute::interleave_record_batch;
3131
use datafusion::arrow::datatypes::SchemaRef;
3232
use datafusion::arrow::record_batch::RecordBatch;
3333
use datafusion::common::Result;
@@ -134,52 +134,60 @@ impl ScratchSpace {
134134
}
135135

136136
/// Materializes a partition's data from `(batch_idx, row_idx)` pairs into
137-
/// coalesced `RecordBatch`es using `BatchCoalescer::push_batch_with_indices`.
137+
/// coalesced `RecordBatch`es using `interleave_record_batch`.
138138
///
139-
/// Uses `scratch_builder` as a reusable builder to avoid allocations.
139+
/// Uses `scratch_builder` as a reusable builder (kept for API compatibility).
140140
pub fn materialize_partition(
141141
partition_indices: &[(u32, u32)],
142142
input_batches: &InputBatchStore,
143143
target_batch_size: usize,
144-
scratch_builder: &mut UInt64Builder,
144+
_scratch_builder: &mut UInt64Builder,
145145
) -> Result<Vec<RecordBatch>> {
146146
if partition_indices.is_empty() {
147147
return Ok(Vec::new());
148148
}
149149

150-
let mut coalescer =
151-
BatchCoalescer::new(input_batches.schema().clone(), target_batch_size);
152-
let mut result = Vec::new();
153-
154-
let mut start = 0;
155-
while start < partition_indices.len() {
156-
let current_batch_idx = partition_indices[start].0;
157-
let mut end = start + 1;
158-
while end < partition_indices.len()
159-
&& partition_indices[end].0 == current_batch_idx
160-
{
161-
end += 1;
162-
}
150+
// Convert (u32, u32) to (usize, usize) for interleave API
151+
let interleave_indices: Vec<(usize, usize)> = partition_indices
152+
.iter()
153+
.map(|&(batch_idx, row_idx)| (batch_idx as usize, row_idx as usize))
154+
.collect();
163155

164-
let batch = input_batches.get_batch(current_batch_idx);
156+
// Get references to all batches
157+
let batches = input_batches.batches();
158+
let batch_refs: Vec<&RecordBatch> = batches.iter().collect();
165159

166-
for (_, r) in &partition_indices[start..end] {
167-
scratch_builder.append_value(*r as u64);
168-
}
169-
let idx_array = scratch_builder.finish();
160+
// Single interleave operation
161+
let interleaved = interleave_record_batch(&batch_refs, &interleave_indices)?;
170162

171-
coalescer.push_batch_with_indices(batch.clone(), &idx_array)?;
172-
while let Some(completed) = coalescer.next_completed_batch() {
173-
result.push(completed);
174-
}
163+
// Split into target-sized batches using zero-copy slicing
164+
split_into_batches(interleaved, target_batch_size)
165+
}
166+
167+
/// Splits a large RecordBatch into smaller batches of target size using zero-copy slicing.
168+
///
169+
/// Returns a vector of RecordBatch, where all batches except possibly the last
170+
/// have approximately `target_batch_size` rows.
171+
fn split_into_batches(
172+
batch: RecordBatch,
173+
target_batch_size: usize,
174+
) -> Result<Vec<RecordBatch>> {
175+
let total_rows = batch.num_rows();
175176

176-
start = end;
177+
if total_rows <= target_batch_size {
178+
return Ok(vec![batch]);
177179
}
178180

179-
coalescer.finish_buffered_batch()?;
180-
while let Some(completed) = coalescer.next_completed_batch() {
181-
result.push(completed);
181+
let num_batches = total_rows.div_ceil(target_batch_size);
182+
let mut result = Vec::with_capacity(num_batches);
183+
184+
let mut offset = 0;
185+
while offset < total_rows {
186+
let length = (total_rows - offset).min(target_batch_size);
187+
result.push(batch.slice(offset, length));
188+
offset += length;
182189
}
190+
183191
Ok(result)
184192
}
185193

0 commit comments

Comments
 (0)