Skip to content

Commit 3e8f5b5

Browse files
andygroveclaude
andcommitted
perf: batch processing for List and Map columns
Add batched processing for List and Map columns that moves type dispatch outside the row loop, similar to the struct field-major optimization. Changes: - Add `append_list_column_batch` that dispatches on element type once, then processes all rows with the typed builder - Add `append_map_column_batch` that dispatches on key/value types once, with optimized paths for common combinations (Int64/Int64, Int32/Int32, etc.) - Update `append_columns` to use the new batch functions - Add benchmark for List<Int64> column conversion The optimization: - List columns: Type dispatch goes from O(rows) to O(1) for primitive elements - Map columns: Type dispatch goes from O(rows × 2) to O(2) for primitive key/values - Complex element types fall back to per-row dispatch Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent f3da0dc commit 3e8f5b5

2 files changed

Lines changed: 351 additions & 39 deletions

File tree

native/core/benches/struct_conversion.rs

Lines changed: 128 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use comet::execution::shuffle::row::{
2626
};
2727
use comet::execution::shuffle::CompressionCodec;
2828
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
29+
use std::sync::Arc;
2930
use tempfile::Builder;
3031

3132
const BATCH_SIZE: usize = 5000;
@@ -459,13 +460,139 @@ fn benchmark_deeply_nested_struct_conversion(c: &mut Criterion) {
459460
group.finish();
460461
}
461462

463+
/// Create a schema with a list column: List<Int64>
464+
fn make_list_schema(element_type: DataType) -> DataType {
465+
DataType::List(Arc::new(Field::new("item", element_type, true)))
466+
}
467+
468+
/// Calculate row size for a list with the given number of elements.
469+
/// UnsafeRow layout for list: [null bits] [list pointer (offset, size)]
470+
/// List data: [num_elements (8 bytes)] [null bits] [element data]
471+
fn get_list_row_size(num_elements: usize, element_size: usize) -> usize {
472+
// Top-level row has 1 column (the list)
473+
let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1);
474+
let list_pointer_size = 8;
475+
476+
// List header: num_elements (8 bytes) + null bitset
477+
let list_null_bitset = ((num_elements + 63) / 64) * 8;
478+
let list_header = 8 + list_null_bitset;
479+
let list_data_size = num_elements * element_size;
480+
481+
top_level_bitset_width + list_pointer_size + list_header + list_data_size
482+
}
483+
484+
struct ListRowData {
485+
data: Vec<u8>,
486+
}
487+
488+
impl ListRowData {
489+
fn new_int64_list(num_elements: usize) -> Self {
490+
let row_size = get_list_row_size(num_elements, 8);
491+
let mut data = vec![0u8; row_size];
492+
493+
let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1);
494+
let list_null_bitset = ((num_elements + 63) / 64) * 8;
495+
496+
// List starts after top-level header + pointer
497+
let list_offset = top_level_bitset_width + 8;
498+
let list_size = 8 + list_null_bitset + num_elements * 8;
499+
500+
// Write list pointer (offset in upper 32 bits, size in lower 32 bits)
501+
let offset_and_size = ((list_offset as i64) << 32) | (list_size as i64);
502+
data[top_level_bitset_width..top_level_bitset_width + 8]
503+
.copy_from_slice(&offset_and_size.to_le_bytes());
504+
505+
// Write number of elements at list start
506+
data[list_offset..list_offset + 8].copy_from_slice(&(num_elements as i64).to_le_bytes());
507+
508+
// Fill list with data (after header)
509+
let data_start = list_offset + 8 + list_null_bitset;
510+
for i in 0..num_elements {
511+
let value_offset = data_start + i * 8;
512+
let value = (i as i64) * 100;
513+
data[value_offset..value_offset + 8].copy_from_slice(&value.to_le_bytes());
514+
}
515+
516+
ListRowData { data }
517+
}
518+
519+
fn to_spark_row(&self, spark_row: &mut SparkUnsafeRow) {
520+
spark_row.point_to_slice(&self.data);
521+
}
522+
}
523+
524+
fn benchmark_list_conversion(c: &mut Criterion) {
525+
let mut group = c.benchmark_group("list_conversion");
526+
527+
// Test with different list sizes and row counts
528+
for num_elements in [10, 100] {
529+
for num_rows in [1000, 10000] {
530+
let schema = vec![make_list_schema(DataType::Int64)];
531+
532+
// Create row data - each row has a list with num_elements items
533+
let rows: Vec<ListRowData> = (0..num_rows)
534+
.map(|_| ListRowData::new_int64_list(num_elements))
535+
.collect();
536+
537+
let spark_rows: Vec<SparkUnsafeRow> = rows
538+
.iter()
539+
.map(|row_data| {
540+
let mut spark_row = SparkUnsafeRow::new_with_num_fields(1);
541+
row_data.to_spark_row(&mut spark_row);
542+
spark_row.set_not_null_at(0);
543+
spark_row
544+
})
545+
.collect();
546+
547+
let mut row_addresses: Vec<i64> =
548+
spark_rows.iter().map(|row| row.get_row_addr()).collect();
549+
let mut row_sizes: Vec<i32> = spark_rows.iter().map(|row| row.get_row_size()).collect();
550+
551+
let row_address_ptr = row_addresses.as_mut_ptr();
552+
let row_size_ptr = row_sizes.as_mut_ptr();
553+
554+
group.bench_with_input(
555+
BenchmarkId::new(
556+
format!("elements_{}", num_elements),
557+
format!("rows_{}", num_rows),
558+
),
559+
&(num_rows, &schema),
560+
|b, (num_rows, schema)| {
561+
b.iter(|| {
562+
let tempfile = Builder::new().tempfile().unwrap();
563+
564+
process_sorted_row_partition(
565+
*num_rows,
566+
BATCH_SIZE,
567+
row_address_ptr,
568+
row_size_ptr,
569+
schema,
570+
tempfile.path().to_str().unwrap().to_string(),
571+
1.0,
572+
false,
573+
0,
574+
None,
575+
&CompressionCodec::Zstd(1),
576+
)
577+
.unwrap();
578+
});
579+
},
580+
);
581+
582+
std::mem::drop(spark_rows);
583+
}
584+
}
585+
586+
group.finish();
587+
}
588+
462589
fn config() -> Criterion {
463590
Criterion::default()
464591
}
465592

466593
criterion_group! {
467594
name = benches;
468595
config = config();
469-
targets = benchmark_struct_conversion, benchmark_nested_struct_conversion, benchmark_deeply_nested_struct_conversion
596+
targets = benchmark_struct_conversion, benchmark_nested_struct_conversion, benchmark_deeply_nested_struct_conversion, benchmark_list_conversion
470597
}
471598
criterion_main!(benches);

0 commit comments

Comments
 (0)