Skip to content

Commit 11a7bca

Browse files
andygroveclaude
andcommitted
test: add benchmark for map column processing
Adds criterion benchmark for Map<Int64, Int64> conversion to ensure the batched map column processing optimization is covered by benchmarks. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 3e8f5b5 commit 11a7bca

1 file changed

Lines changed: 164 additions & 1 deletion

File tree

native/core/benches/struct_conversion.rs

Lines changed: 164 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -586,13 +586,176 @@ fn benchmark_list_conversion(c: &mut Criterion) {
586586
group.finish();
587587
}
588588

589+
/// Create a schema with a map column: Map<Int64, Int64>
590+
fn make_map_schema() -> DataType {
591+
// Map is represented as List<Struct<key, value>> in Arrow
592+
let key_field = Field::new("key", DataType::Int64, false);
593+
let value_field = Field::new("value", DataType::Int64, true);
594+
let entries_field = Field::new(
595+
"entries",
596+
DataType::Struct(Fields::from(vec![key_field, value_field])),
597+
false,
598+
);
599+
DataType::Map(Arc::new(entries_field), false)
600+
}
601+
602+
/// Calculate row size for a map with the given number of entries.
603+
/// UnsafeRow layout for map: [null bits] [map pointer (offset, size)]
604+
/// Map data: [key_array_size (8 bytes)] [key_array] [value_array]
605+
/// Array format: [num_elements (8 bytes)] [null bits] [element data]
606+
fn get_map_row_size(num_entries: usize) -> usize {
607+
// Top-level row has 1 column (the map)
608+
let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1);
609+
let map_pointer_size = 8;
610+
611+
// Key array: num_elements (8) + null bitset + data
612+
let key_null_bitset = ((num_entries + 63) / 64) * 8;
613+
let key_array_size = 8 + key_null_bitset + num_entries * 8;
614+
615+
// Value array: num_elements (8) + null bitset + data
616+
let value_null_bitset = ((num_entries + 63) / 64) * 8;
617+
let value_array_size = 8 + value_null_bitset + num_entries * 8;
618+
619+
// Map header (key array size) + key array + value array
620+
let map_size = 8 + key_array_size + value_array_size;
621+
622+
top_level_bitset_width + map_pointer_size + map_size
623+
}
624+
625+
struct MapRowData {
626+
data: Vec<u8>,
627+
}
628+
629+
impl MapRowData {
630+
fn new_int64_map(num_entries: usize) -> Self {
631+
let row_size = get_map_row_size(num_entries);
632+
let mut data = vec![0u8; row_size];
633+
634+
let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1);
635+
let key_null_bitset = ((num_entries + 63) / 64) * 8;
636+
let value_null_bitset = ((num_entries + 63) / 64) * 8;
637+
638+
let key_array_size = 8 + key_null_bitset + num_entries * 8;
639+
let value_array_size = 8 + value_null_bitset + num_entries * 8;
640+
let map_size = 8 + key_array_size + value_array_size;
641+
642+
// Map starts after top-level header + pointer
643+
let map_offset = top_level_bitset_width + 8;
644+
645+
// Write map pointer (offset in upper 32 bits, size in lower 32 bits)
646+
let offset_and_size = ((map_offset as i64) << 32) | (map_size as i64);
647+
data[top_level_bitset_width..top_level_bitset_width + 8]
648+
.copy_from_slice(&offset_and_size.to_le_bytes());
649+
650+
// Write key array size at map start
651+
data[map_offset..map_offset + 8].copy_from_slice(&(key_array_size as i64).to_le_bytes());
652+
653+
// Key array starts after map header
654+
let key_array_offset = map_offset + 8;
655+
// Write number of elements
656+
data[key_array_offset..key_array_offset + 8]
657+
.copy_from_slice(&(num_entries as i64).to_le_bytes());
658+
// Fill key data (after header)
659+
let key_data_start = key_array_offset + 8 + key_null_bitset;
660+
for i in 0..num_entries {
661+
let value_offset = key_data_start + i * 8;
662+
let value = i as i64;
663+
data[value_offset..value_offset + 8].copy_from_slice(&value.to_le_bytes());
664+
}
665+
666+
// Value array starts after key array
667+
let value_array_offset = key_array_offset + key_array_size;
668+
// Write number of elements
669+
data[value_array_offset..value_array_offset + 8]
670+
.copy_from_slice(&(num_entries as i64).to_le_bytes());
671+
// Fill value data (after header)
672+
let value_data_start = value_array_offset + 8 + value_null_bitset;
673+
for i in 0..num_entries {
674+
let value_offset = value_data_start + i * 8;
675+
let value = (i as i64) * 100;
676+
data[value_offset..value_offset + 8].copy_from_slice(&value.to_le_bytes());
677+
}
678+
679+
MapRowData { data }
680+
}
681+
682+
fn to_spark_row(&self, spark_row: &mut SparkUnsafeRow) {
683+
spark_row.point_to_slice(&self.data);
684+
}
685+
}
686+
687+
fn benchmark_map_conversion(c: &mut Criterion) {
688+
let mut group = c.benchmark_group("map_conversion");
689+
690+
// Test with different map sizes and row counts
691+
for num_entries in [10, 100] {
692+
for num_rows in [1000, 10000] {
693+
let schema = vec![make_map_schema()];
694+
695+
// Create row data - each row has a map with num_entries items
696+
let rows: Vec<MapRowData> = (0..num_rows)
697+
.map(|_| MapRowData::new_int64_map(num_entries))
698+
.collect();
699+
700+
let spark_rows: Vec<SparkUnsafeRow> = rows
701+
.iter()
702+
.map(|row_data| {
703+
let mut spark_row = SparkUnsafeRow::new_with_num_fields(1);
704+
row_data.to_spark_row(&mut spark_row);
705+
spark_row.set_not_null_at(0);
706+
spark_row
707+
})
708+
.collect();
709+
710+
let mut row_addresses: Vec<i64> =
711+
spark_rows.iter().map(|row| row.get_row_addr()).collect();
712+
let mut row_sizes: Vec<i32> = spark_rows.iter().map(|row| row.get_row_size()).collect();
713+
714+
let row_address_ptr = row_addresses.as_mut_ptr();
715+
let row_size_ptr = row_sizes.as_mut_ptr();
716+
717+
group.bench_with_input(
718+
BenchmarkId::new(
719+
format!("entries_{}", num_entries),
720+
format!("rows_{}", num_rows),
721+
),
722+
&(num_rows, &schema),
723+
|b, (num_rows, schema)| {
724+
b.iter(|| {
725+
let tempfile = Builder::new().tempfile().unwrap();
726+
727+
process_sorted_row_partition(
728+
*num_rows,
729+
BATCH_SIZE,
730+
row_address_ptr,
731+
row_size_ptr,
732+
schema,
733+
tempfile.path().to_str().unwrap().to_string(),
734+
1.0,
735+
false,
736+
0,
737+
None,
738+
&CompressionCodec::Zstd(1),
739+
)
740+
.unwrap();
741+
});
742+
},
743+
);
744+
745+
std::mem::drop(spark_rows);
746+
}
747+
}
748+
749+
group.finish();
750+
}
751+
589752
fn config() -> Criterion {
590753
Criterion::default()
591754
}
592755

593756
criterion_group! {
594757
name = benches;
595758
config = config();
596-
targets = benchmark_struct_conversion, benchmark_nested_struct_conversion, benchmark_deeply_nested_struct_conversion, benchmark_list_conversion
759+
targets = benchmark_struct_conversion, benchmark_nested_struct_conversion, benchmark_deeply_nested_struct_conversion, benchmark_list_conversion, benchmark_map_conversion
597760
}
598761
criterion_main!(benches);

0 commit comments

Comments
 (0)