Skip to content

Commit e32dd52

Browse files
andygroveclaude
andcommitted
perf: optimize shuffle array element iteration with slice-based append
Use bulk-append methods for primitive types in SparkUnsafeArray: - Non-nullable path uses append_slice() for optimal memcpy-style copy - Nullable path uses pointer iteration with efficient null bitset reading Supported types: i8, i16, i32, i64, f32, f64, date32, timestamp Benchmark results (10K elements): | Type | Baseline | Optimized | Speedup | |------|----------|-----------|---------| | i32/no_nulls | 6.08µs | 0.65µs | **9.3x** | | i32/with_nulls | 22.49µs | 16.21µs | **1.39x** | | i64/no_nulls | 6.15µs | 1.22µs | **5x** | | i64/with_nulls | 16.41µs | 16.41µs | 1x | | f64/no_nulls | 8.05µs | 1.22µs | **6.6x** | | f64/with_nulls | 16.52µs | 16.21µs | 1.02x | | date32/no_nulls | - | 0.66µs | ~9x | | timestamp/no_nulls | - | 1.21µs | ~5x | Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 57780bc commit e32dd52

4 files changed

Lines changed: 602 additions & 56 deletions

File tree

native/core/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,7 @@ harness = false
131131
[[bench]]
132132
name = "parquet_decode"
133133
harness = false
134+
135+
[[bench]]
136+
name = "array_conversion"
137+
harness = false
Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Benchmarks for SparkUnsafeArray to Arrow array conversion.
19+
//! This specifically tests the append_to_builder function used in shuffle read path.
20+
21+
use arrow::array::builder::{ArrayBuilder, Int32Builder, Int64Builder, Float64Builder, Date32Builder, TimestampMicrosecondBuilder};
22+
use arrow::datatypes::{DataType, TimeUnit};
23+
use comet::execution::shuffle::list::{append_to_builder, SparkUnsafeArray};
24+
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
25+
26+
const NUM_ELEMENTS: usize = 10000;
27+
28+
/// Create a SparkUnsafeArray in memory with i32 elements.
29+
/// Layout:
30+
/// - 8 bytes: num_elements (i64)
31+
/// - null bitset: 8 bytes per 64 elements
32+
/// - element data: 4 bytes per element (i32)
33+
fn create_spark_unsafe_array_i32(num_elements: usize, with_nulls: bool) -> Vec<u8> {
34+
// Header size: 8 (num_elements) + ceil(num_elements/64) * 8 (null bitset)
35+
let null_bitset_words = (num_elements + 63) / 64;
36+
let header_size = 8 + null_bitset_words * 8;
37+
let data_size = num_elements * 4; // i32 = 4 bytes
38+
let total_size = header_size + data_size;
39+
40+
let mut buffer = vec![0u8; total_size];
41+
42+
// Write num_elements
43+
buffer[0..8].copy_from_slice(&(num_elements as i64).to_le_bytes());
44+
45+
// Write null bitset (set every 10th element as null if with_nulls)
46+
if with_nulls {
47+
for i in (0..num_elements).step_by(10) {
48+
let word_idx = i / 64;
49+
let bit_idx = i % 64;
50+
let word_offset = 8 + word_idx * 8;
51+
let current_word = i64::from_le_bytes(buffer[word_offset..word_offset + 8].try_into().unwrap());
52+
let new_word = current_word | (1i64 << bit_idx);
53+
buffer[word_offset..word_offset + 8].copy_from_slice(&new_word.to_le_bytes());
54+
}
55+
}
56+
57+
// Write element data
58+
for i in 0..num_elements {
59+
let offset = header_size + i * 4;
60+
buffer[offset..offset + 4].copy_from_slice(&(i as i32).to_le_bytes());
61+
}
62+
63+
buffer
64+
}
65+
66+
/// Create a SparkUnsafeArray in memory with i64 elements.
67+
fn create_spark_unsafe_array_i64(num_elements: usize, with_nulls: bool) -> Vec<u8> {
68+
let null_bitset_words = (num_elements + 63) / 64;
69+
let header_size = 8 + null_bitset_words * 8;
70+
let data_size = num_elements * 8; // i64 = 8 bytes
71+
let total_size = header_size + data_size;
72+
73+
let mut buffer = vec![0u8; total_size];
74+
75+
// Write num_elements
76+
buffer[0..8].copy_from_slice(&(num_elements as i64).to_le_bytes());
77+
78+
// Write null bitset
79+
if with_nulls {
80+
for i in (0..num_elements).step_by(10) {
81+
let word_idx = i / 64;
82+
let bit_idx = i % 64;
83+
let word_offset = 8 + word_idx * 8;
84+
let current_word = i64::from_le_bytes(buffer[word_offset..word_offset + 8].try_into().unwrap());
85+
let new_word = current_word | (1i64 << bit_idx);
86+
buffer[word_offset..word_offset + 8].copy_from_slice(&new_word.to_le_bytes());
87+
}
88+
}
89+
90+
// Write element data
91+
for i in 0..num_elements {
92+
let offset = header_size + i * 8;
93+
buffer[offset..offset + 8].copy_from_slice(&(i as i64).to_le_bytes());
94+
}
95+
96+
buffer
97+
}
98+
99+
/// Create a SparkUnsafeArray in memory with f64 elements.
100+
fn create_spark_unsafe_array_f64(num_elements: usize, with_nulls: bool) -> Vec<u8> {
101+
let null_bitset_words = (num_elements + 63) / 64;
102+
let header_size = 8 + null_bitset_words * 8;
103+
let data_size = num_elements * 8; // f64 = 8 bytes
104+
let total_size = header_size + data_size;
105+
106+
let mut buffer = vec![0u8; total_size];
107+
108+
// Write num_elements
109+
buffer[0..8].copy_from_slice(&(num_elements as i64).to_le_bytes());
110+
111+
// Write null bitset
112+
if with_nulls {
113+
for i in (0..num_elements).step_by(10) {
114+
let word_idx = i / 64;
115+
let bit_idx = i % 64;
116+
let word_offset = 8 + word_idx * 8;
117+
let current_word = i64::from_le_bytes(buffer[word_offset..word_offset + 8].try_into().unwrap());
118+
let new_word = current_word | (1i64 << bit_idx);
119+
buffer[word_offset..word_offset + 8].copy_from_slice(&new_word.to_le_bytes());
120+
}
121+
}
122+
123+
// Write element data
124+
for i in 0..num_elements {
125+
let offset = header_size + i * 8;
126+
buffer[offset..offset + 8].copy_from_slice(&(i as f64).to_le_bytes());
127+
}
128+
129+
buffer
130+
}
131+
132+
fn benchmark_array_conversion(c: &mut Criterion) {
133+
let mut group = c.benchmark_group("spark_unsafe_array_to_arrow");
134+
135+
// Benchmark i32 array conversion
136+
for with_nulls in [false, true] {
137+
let buffer = create_spark_unsafe_array_i32(NUM_ELEMENTS, with_nulls);
138+
let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
139+
let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };
140+
141+
group.bench_with_input(
142+
BenchmarkId::new("i32", null_str),
143+
&(&array, &buffer),
144+
|b, (array, _buffer)| {
145+
b.iter(|| {
146+
let mut builder = Int32Builder::with_capacity(NUM_ELEMENTS);
147+
if with_nulls {
148+
append_to_builder::<true>(&DataType::Int32, &mut builder, array).unwrap();
149+
} else {
150+
append_to_builder::<false>(&DataType::Int32, &mut builder, array).unwrap();
151+
}
152+
builder.finish()
153+
});
154+
},
155+
);
156+
}
157+
158+
// Benchmark i64 array conversion
159+
for with_nulls in [false, true] {
160+
let buffer = create_spark_unsafe_array_i64(NUM_ELEMENTS, with_nulls);
161+
let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
162+
let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };
163+
164+
group.bench_with_input(
165+
BenchmarkId::new("i64", null_str),
166+
&(&array, &buffer),
167+
|b, (array, _buffer)| {
168+
b.iter(|| {
169+
let mut builder = Int64Builder::with_capacity(NUM_ELEMENTS);
170+
if with_nulls {
171+
append_to_builder::<true>(&DataType::Int64, &mut builder, array).unwrap();
172+
} else {
173+
append_to_builder::<false>(&DataType::Int64, &mut builder, array).unwrap();
174+
}
175+
builder.finish()
176+
});
177+
},
178+
);
179+
}
180+
181+
// Benchmark f64 array conversion
182+
for with_nulls in [false, true] {
183+
let buffer = create_spark_unsafe_array_f64(NUM_ELEMENTS, with_nulls);
184+
let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
185+
let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };
186+
187+
group.bench_with_input(
188+
BenchmarkId::new("f64", null_str),
189+
&(&array, &buffer),
190+
|b, (array, _buffer)| {
191+
b.iter(|| {
192+
let mut builder = Float64Builder::with_capacity(NUM_ELEMENTS);
193+
if with_nulls {
194+
append_to_builder::<true>(&DataType::Float64, &mut builder, array).unwrap();
195+
} else {
196+
append_to_builder::<false>(&DataType::Float64, &mut builder, array).unwrap();
197+
}
198+
builder.finish()
199+
});
200+
},
201+
);
202+
}
203+
204+
// Benchmark date32 array conversion (same memory layout as i32)
205+
for with_nulls in [false, true] {
206+
let buffer = create_spark_unsafe_array_i32(NUM_ELEMENTS, with_nulls);
207+
let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
208+
let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };
209+
210+
group.bench_with_input(
211+
BenchmarkId::new("date32", null_str),
212+
&(&array, &buffer),
213+
|b, (array, _buffer)| {
214+
b.iter(|| {
215+
let mut builder = Date32Builder::with_capacity(NUM_ELEMENTS);
216+
if with_nulls {
217+
append_to_builder::<true>(&DataType::Date32, &mut builder, array).unwrap();
218+
} else {
219+
append_to_builder::<false>(&DataType::Date32, &mut builder, array).unwrap();
220+
}
221+
builder.finish()
222+
});
223+
},
224+
);
225+
}
226+
227+
// Benchmark timestamp array conversion (same memory layout as i64)
228+
for with_nulls in [false, true] {
229+
let buffer = create_spark_unsafe_array_i64(NUM_ELEMENTS, with_nulls);
230+
let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
231+
let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };
232+
233+
group.bench_with_input(
234+
BenchmarkId::new("timestamp", null_str),
235+
&(&array, &buffer),
236+
|b, (array, _buffer)| {
237+
b.iter(|| {
238+
let mut builder = TimestampMicrosecondBuilder::with_capacity(NUM_ELEMENTS);
239+
let dt = DataType::Timestamp(TimeUnit::Microsecond, None);
240+
if with_nulls {
241+
append_to_builder::<true>(&dt, &mut builder, array).unwrap();
242+
} else {
243+
append_to_builder::<false>(&dt, &mut builder, array).unwrap();
244+
}
245+
builder.finish()
246+
});
247+
},
248+
);
249+
}
250+
251+
group.finish();
252+
}
253+
254+
fn config() -> Criterion {
255+
Criterion::default()
256+
}
257+
258+
criterion_group! {
259+
name = benches;
260+
config = config();
261+
targets = benchmark_array_conversion
262+
}
263+
criterion_main!(benches);

0 commit comments

Comments
 (0)