Skip to content

Commit 3aefbd6

Browse files
committed
second round
1 parent de0b455 commit 3aefbd6

3 files changed

Lines changed: 43 additions & 22 deletions

File tree

ballista/core/src/execution_plans/sort_shuffle/buffer.rs

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
2727
use std::sync::Arc;
2828

29-
use datafusion::arrow::array::UInt64Array;
29+
use datafusion::arrow::array::UInt64Builder;
3030
use datafusion::arrow::compute::BatchCoalescer;
3131
use datafusion::arrow::datatypes::SchemaRef;
3232
use datafusion::arrow::record_batch::RecordBatch;
@@ -142,12 +142,12 @@ impl ScratchSpace {
142142
/// Materializes a partition's data from `(batch_idx, row_idx)` pairs into
143143
/// coalesced `RecordBatch`es using `BatchCoalescer::push_batch_with_indices`.
144144
///
145-
/// Uses `scratch_indices` as a reusable buffer to avoid allocations.
145+
/// Collects row indices in the caller-supplied `scratch_builder`, avoiding an intermediate `Vec` and its clone.
146146
pub fn materialize_partition(
147147
partition_indices: &[(u32, u32)],
148148
input_batches: &InputBatchStore,
149149
target_batch_size: usize,
150-
scratch_indices: &mut Vec<u64>,
150+
scratch_builder: &mut UInt64Builder,
151151
) -> Result<Vec<RecordBatch>> {
152152
if partition_indices.is_empty() {
153153
return Ok(Vec::new());
@@ -170,14 +170,11 @@ pub fn materialize_partition(
170170

171171
let batch = input_batches.get_batch(current_batch_idx);
172172

173-
// Reuse scratch buffer to reduce allocations
174-
scratch_indices.clear();
175-
scratch_indices.extend(
176-
partition_indices[start..end]
177-
.iter()
178-
.map(|(_, r)| *r as u64),
179-
);
180-
let idx_array = UInt64Array::from(scratch_indices.clone());
173+
// Use builder pattern to avoid Vec allocation and clone
174+
for (_, r) in &partition_indices[start..end] {
175+
scratch_builder.append_value(*r as u64);
176+
}
177+
let idx_array = scratch_builder.finish();
181178

182179
coalescer.push_batch_with_indices(batch.clone(), &idx_array)?;
183180
while let Some(completed) = coalescer.next_completed_batch() {
@@ -322,7 +319,7 @@ mod tests {
322319
fn test_materialize_partition_empty() {
323320
let schema = create_test_schema();
324321
let store = InputBatchStore::new(schema);
325-
let mut scratch = Vec::new();
322+
let mut scratch = UInt64Builder::new();
326323
let result = materialize_partition(&[], &store, 8192, &mut scratch).unwrap();
327324
assert!(result.is_empty());
328325
}
@@ -335,7 +332,7 @@ mod tests {
335332

336333
// Select rows 0, 2, 4 from batch 0
337334
let indices = vec![(0u32, 0u32), (0, 2), (0, 4)];
338-
let mut scratch = Vec::new();
335+
let mut scratch = UInt64Builder::new();
339336
let result = materialize_partition(&indices, &store, 8192, &mut scratch).unwrap();
340337

341338
let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
@@ -361,7 +358,7 @@ mod tests {
361358

362359
// Select row 1 from batch 0, rows 0 and 2 from batch 1
363360
let indices = vec![(0u32, 1u32), (1, 0), (1, 2)];
364-
let mut scratch = Vec::new();
361+
let mut scratch = UInt64Builder::new();
365362
let result = materialize_partition(&indices, &store, 8192, &mut scratch).unwrap();
366363

367364
let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
@@ -381,7 +378,7 @@ mod tests {
381378
let indices: Vec<(u32, u32)> = (0..20000).map(|i| (0u32, i as u32)).collect();
382379

383380
// Use target_batch_size of 8192
384-
let mut scratch = Vec::new();
381+
let mut scratch = UInt64Builder::new();
385382
let result = materialize_partition(&indices, &store, 8192, &mut scratch).unwrap();
386383

387384
// Should produce multiple batches
@@ -449,7 +446,7 @@ mod tests {
449446
}
450447

451448
// Use target_batch_size of 8192
452-
let mut scratch = Vec::new();
449+
let mut scratch = UInt64Builder::new();
453450
let result = materialize_partition(&indices, &store, 8192, &mut scratch).unwrap();
454451

455452
// Should coalesce into far fewer output batches than input batches

ballista/core/src/execution_plans/sort_shuffle/writer.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -350,15 +350,15 @@ fn spill_all_partitions(
350350
schema: &SchemaRef,
351351
batch_size: usize,
352352
) -> Result<()> {
353-
// Reusable scratch buffer for materialize_partition
354-
let mut scratch_indices = Vec::new();
353+
// Reusable scratch builder for materialize_partition
354+
let mut scratch_builder = UInt64Builder::new();
355355

356356
for (partition_id, indices) in partition_indices.iter_mut().enumerate() {
357357
if indices.is_empty() {
358358
continue;
359359
}
360360
let batches =
361-
materialize_partition(indices, input_store, batch_size, &mut scratch_indices)?;
361+
materialize_partition(indices, input_store, batch_size, &mut scratch_builder)?;
362362
if !batches.is_empty() {
363363
spill_manager
364364
.spill(partition_id, batches, schema)
@@ -414,8 +414,8 @@ fn finalize_output(
414414
// FileReader supports random access to batches by index
415415
let mut cumulative_batch_count: i64 = 0;
416416

417-
// Reusable scratch buffer for materialize_partition
418-
let mut scratch_indices = Vec::new();
417+
// Reusable scratch builder for materialize_partition
418+
let mut scratch_builder = UInt64Builder::new();
419419

420420
// Write partitions in order
421421
for (partition_id, indices) in partition_indices.iter().enumerate() {
@@ -445,7 +445,7 @@ fn finalize_output(
445445
indices,
446446
input_store,
447447
config.batch_size,
448-
&mut scratch_indices,
448+
&mut scratch_builder,
449449
)?;
450450
for batch in materialized_batches {
451451
partition_rows += batch.num_rows() as u64;

benchmarks/src/bin/shuffle_bench.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
333333
let mut hash_file_count = 0;
334334
let mut hash_total_size = 0u64;
335335

336+
// Warmup iteration (result discarded) so one-time startup costs such as allocator initialization do not skew the timed runs
337+
{
338+
let temp_dir = TempDir::new()?;
339+
let work_dir = temp_dir.path().to_str().unwrap();
340+
let _ = benchmark_hash_shuffle(&data, schema.clone(), opt.partitions, work_dir).await?;
341+
println!(" Warmup iteration completed (not timed)");
342+
}
343+
336344
for i in 0..opt.iterations {
337345
let temp_dir = TempDir::new()?;
338346
let work_dir = temp_dir.path().to_str().unwrap();
@@ -380,6 +388,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
380388
let mut sort_file_count = 0;
381389
let mut sort_total_size = 0u64;
382390

391+
// Warmup iteration (result discarded) so one-time startup costs such as allocator initialization do not skew the timed runs
392+
{
393+
let temp_dir = TempDir::new()?;
394+
let work_dir = temp_dir.path().to_str().unwrap();
395+
let _ = benchmark_sort_shuffle(
396+
&data,
397+
schema.clone(),
398+
opt.partitions,
399+
work_dir,
400+
buffer_size,
401+
memory_limit,
402+
)
403+
.await?;
404+
println!(" Warmup iteration completed (not timed)");
405+
}
406+
383407
for i in 0..opt.iterations {
384408
let temp_dir = TempDir::new()?;
385409
let work_dir = temp_dir.path().to_str().unwrap();

0 commit comments

Comments
 (0)