Skip to content

Commit de0b455

Browse files
committed
first round of optimizations
1 parent 6682b17 commit de0b455

2 files changed

Lines changed: 54 additions & 24 deletions

File tree

ballista/core/src/execution_plans/sort_shuffle/buffer.rs

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -80,20 +80,19 @@ impl ScratchSpace {
8080

8181
let arrays = evaluate_expressions_to_arrays(exprs, batch)?;
8282

83-
self.hash_buffer.clear();
83+
// Resize preserves capacity, only grows when needed
8484
self.hash_buffer.resize(num_rows, 0);
8585
create_hashes(
8686
&arrays,
8787
REPARTITION_RANDOM_STATE.random_state(),
8888
&mut self.hash_buffer,
8989
)?;
9090

91-
self.partition_ids.clear();
92-
self.partition_ids.extend(
93-
self.hash_buffer
94-
.iter()
95-
.map(|h| (*h % num_partitions as u64) as usize),
96-
);
91+
// Resize and populate partition IDs directly
92+
self.partition_ids.resize(num_rows, 0);
93+
for (i, hash) in self.hash_buffer.iter().enumerate() {
94+
self.partition_ids[i] = (*hash % num_partitions as u64) as usize;
95+
}
9796

9897
self.map_partition_ids_to_starts_and_indices(num_partitions, num_rows);
9998
Ok(())
@@ -105,9 +104,10 @@ impl ScratchSpace {
105104
num_partitions: usize,
106105
num_rows: usize,
107106
) {
108-
self.partition_starts.clear();
107+
// Truncate then resize for partition_starts (needs zeroing)
108+
self.partition_starts.truncate(0);
109109
self.partition_starts.resize(num_partitions + 1, 0);
110-
self.partition_row_indices.clear();
110+
// Resize preserves capacity for partition_row_indices
111111
self.partition_row_indices.resize(num_rows, 0);
112112

113113
// Count rows per partition
@@ -141,10 +141,13 @@ impl ScratchSpace {
141141

142142
/// Materializes a partition's data from `(batch_idx, row_idx)` pairs into
143143
/// coalesced `RecordBatch`es using `BatchCoalescer::push_batch_with_indices`.
144+
///
145+
/// Uses `scratch_indices` as a reusable buffer to avoid allocations.
144146
pub fn materialize_partition(
145147
partition_indices: &[(u32, u32)],
146148
input_batches: &InputBatchStore,
147149
target_batch_size: usize,
150+
scratch_indices: &mut Vec<u64>,
148151
) -> Result<Vec<RecordBatch>> {
149152
if partition_indices.is_empty() {
150153
return Ok(Vec::new());
@@ -166,12 +169,16 @@ pub fn materialize_partition(
166169
}
167170

168171
let batch = input_batches.get_batch(current_batch_idx);
169-
let idx_array = UInt64Array::from(
172+
173+
// Reuse scratch buffer to reduce allocations
174+
scratch_indices.clear();
175+
scratch_indices.extend(
170176
partition_indices[start..end]
171177
.iter()
172-
.map(|(_, r)| *r as u64)
173-
.collect::<Vec<_>>(),
178+
.map(|(_, r)| *r as u64),
174179
);
180+
let idx_array = UInt64Array::from(scratch_indices.clone());
181+
175182
coalescer.push_batch_with_indices(batch.clone(), &idx_array)?;
176183
while let Some(completed) = coalescer.next_completed_batch() {
177184
result.push(completed);
@@ -315,7 +322,8 @@ mod tests {
315322
fn test_materialize_partition_empty() {
316323
let schema = create_test_schema();
317324
let store = InputBatchStore::new(schema);
318-
let result = materialize_partition(&[], &store, 8192).unwrap();
325+
let mut scratch = Vec::new();
326+
let result = materialize_partition(&[], &store, 8192, &mut scratch).unwrap();
319327
assert!(result.is_empty());
320328
}
321329

@@ -327,7 +335,8 @@ mod tests {
327335

328336
// Select rows 0, 2, 4 from batch 0
329337
let indices = vec![(0u32, 0u32), (0, 2), (0, 4)];
330-
let result = materialize_partition(&indices, &store, 8192).unwrap();
338+
let mut scratch = Vec::new();
339+
let result = materialize_partition(&indices, &store, 8192, &mut scratch).unwrap();
331340

332341
let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
333342
assert_eq!(total_rows, 3);
@@ -352,7 +361,8 @@ mod tests {
352361

353362
// Select row 1 from batch 0, rows 0 and 2 from batch 1
354363
let indices = vec![(0u32, 1u32), (1, 0), (1, 2)];
355-
let result = materialize_partition(&indices, &store, 8192).unwrap();
364+
let mut scratch = Vec::new();
365+
let result = materialize_partition(&indices, &store, 8192, &mut scratch).unwrap();
356366

357367
let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
358368
assert_eq!(total_rows, 3);
@@ -371,7 +381,8 @@ mod tests {
371381
let indices: Vec<(u32, u32)> = (0..20000).map(|i| (0u32, i as u32)).collect();
372382

373383
// Use target_batch_size of 8192
374-
let result = materialize_partition(&indices, &store, 8192).unwrap();
384+
let mut scratch = Vec::new();
385+
let result = materialize_partition(&indices, &store, 8192, &mut scratch).unwrap();
375386

376387
// Should produce multiple batches
377388
assert!(result.len() >= 2, "Expected multiple output batches");
@@ -438,7 +449,8 @@ mod tests {
438449
}
439450

440451
// Use target_batch_size of 8192
441-
let result = materialize_partition(&indices, &store, 8192).unwrap();
452+
let mut scratch = Vec::new();
453+
let result = materialize_partition(&indices, &store, 8192, &mut scratch).unwrap();
442454

443455
// Should coalesce into far fewer output batches than input batches
444456
assert!(

ballista/core/src/execution_plans/sort_shuffle/writer.rs

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -211,8 +211,12 @@ impl SortShuffleWriterExec {
211211
};
212212

213213
// Index-based partition tracking: store (batch_idx, row_idx) pairs per partition
214-
let mut partition_indices: Vec<Vec<(u32, u32)>> =
215-
(0..num_output_partitions).map(|_| Vec::new()).collect();
214+
// Pre-allocate capacity to reduce reallocations during index insertion
215+
let rows_per_partition_estimate =
216+
(config.batch_size / num_output_partitions).max(64);
217+
let mut partition_indices: Vec<Vec<(u32, u32)>> = (0..num_output_partitions)
218+
.map(|_| Vec::with_capacity(rows_per_partition_estimate))
219+
.collect();
216220
let mut input_store = InputBatchStore::new(schema.clone());
217221
let mut scratch = ScratchSpace::new(num_output_partitions);
218222

@@ -244,8 +248,11 @@ impl SortShuffleWriterExec {
244248
let batch_idx = input_store.push_batch(input_batch) as u32;
245249
for (partition_id, indices) in partition_indices.iter_mut().enumerate() {
246250
let row_indices = scratch.partition_indices(partition_id);
247-
for &row_idx in row_indices {
248-
indices.push((batch_idx, row_idx));
251+
// Skip empty partitions to reduce iterator overhead
252+
if !row_indices.is_empty() {
253+
for &row_idx in row_indices {
254+
indices.push((batch_idx, row_idx));
255+
}
249256
}
250257
}
251258

@@ -343,11 +350,15 @@ fn spill_all_partitions(
343350
schema: &SchemaRef,
344351
batch_size: usize,
345352
) -> Result<()> {
353+
// Reusable scratch buffer for materialize_partition
354+
let mut scratch_indices = Vec::new();
355+
346356
for (partition_id, indices) in partition_indices.iter_mut().enumerate() {
347357
if indices.is_empty() {
348358
continue;
349359
}
350-
let batches = materialize_partition(indices, input_store, batch_size)?;
360+
let batches =
361+
materialize_partition(indices, input_store, batch_size, &mut scratch_indices)?;
351362
if !batches.is_empty() {
352363
spill_manager
353364
.spill(partition_id, batches, schema)
@@ -403,6 +414,9 @@ fn finalize_output(
403414
// FileReader supports random access to batches by index
404415
let mut cumulative_batch_count: i64 = 0;
405416

417+
// Reusable scratch buffer for materialize_partition
418+
let mut scratch_indices = Vec::new();
419+
406420
// Write partitions in order
407421
for (partition_id, indices) in partition_indices.iter().enumerate() {
408422
// Set the starting batch index for this partition
@@ -427,8 +441,12 @@ fn finalize_output(
427441
}
428442

429443
// Then materialize remaining buffered data from indices
430-
let materialized_batches =
431-
materialize_partition(indices, input_store, config.batch_size)?;
444+
let materialized_batches = materialize_partition(
445+
indices,
446+
input_store,
447+
config.batch_size,
448+
&mut scratch_indices,
449+
)?;
432450
for batch in materialized_batches {
433451
partition_rows += batch.num_rows() as u64;
434452
partition_bytes += batch.get_array_memory_size() as u64;

0 commit comments

Comments
 (0)