Skip to content

Commit b87de09

Browse files
committed
merge
2 parents f8fe7ba + fe54548 commit b87de09

4 files changed

Lines changed: 590 additions & 56 deletions

File tree

native/core/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,3 +135,7 @@ harness = false
135135
[[bench]]
136136
name = "struct_conversion"
137137
harness = false
138+
139+
[[bench]]
140+
name = "array_conversion"
141+
harness = false
Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Benchmarks for SparkUnsafeArray to Arrow array conversion.
19+
//! This specifically tests the append_to_builder function used in shuffle read path.
20+
21+
use arrow::array::builder::{
22+
Date32Builder, Float64Builder, Int32Builder, Int64Builder, TimestampMicrosecondBuilder,
23+
};
24+
use arrow::datatypes::{DataType, TimeUnit};
25+
use comet::execution::shuffle::list::{append_to_builder, SparkUnsafeArray};
26+
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
27+
28+
const NUM_ELEMENTS: usize = 10000;
29+
30+
/// Create a SparkUnsafeArray in memory with i32 elements.
31+
/// Layout:
32+
/// - 8 bytes: num_elements (i64)
33+
/// - null bitset: 8 bytes per 64 elements
34+
/// - element data: 4 bytes per element (i32)
35+
fn create_spark_unsafe_array_i32(num_elements: usize, with_nulls: bool) -> Vec<u8> {
36+
// Header size: 8 (num_elements) + ceil(num_elements/64) * 8 (null bitset)
37+
let null_bitset_words = num_elements.div_ceil(64);
38+
let header_size = 8 + null_bitset_words * 8;
39+
let data_size = num_elements * 4; // i32 = 4 bytes
40+
let total_size = header_size + data_size;
41+
42+
let mut buffer = vec![0u8; total_size];
43+
44+
// Write num_elements
45+
buffer[0..8].copy_from_slice(&(num_elements as i64).to_le_bytes());
46+
47+
// Write null bitset (set every 10th element as null if with_nulls)
48+
if with_nulls {
49+
for i in (0..num_elements).step_by(10) {
50+
let word_idx = i / 64;
51+
let bit_idx = i % 64;
52+
let word_offset = 8 + word_idx * 8;
53+
let current_word =
54+
i64::from_le_bytes(buffer[word_offset..word_offset + 8].try_into().unwrap());
55+
let new_word = current_word | (1i64 << bit_idx);
56+
buffer[word_offset..word_offset + 8].copy_from_slice(&new_word.to_le_bytes());
57+
}
58+
}
59+
60+
// Write element data
61+
for i in 0..num_elements {
62+
let offset = header_size + i * 4;
63+
buffer[offset..offset + 4].copy_from_slice(&(i as i32).to_le_bytes());
64+
}
65+
66+
buffer
67+
}
68+
69+
/// Create a SparkUnsafeArray in memory with i64 elements.
70+
fn create_spark_unsafe_array_i64(num_elements: usize, with_nulls: bool) -> Vec<u8> {
71+
let null_bitset_words = num_elements.div_ceil(64);
72+
let header_size = 8 + null_bitset_words * 8;
73+
let data_size = num_elements * 8; // i64 = 8 bytes
74+
let total_size = header_size + data_size;
75+
76+
let mut buffer = vec![0u8; total_size];
77+
78+
// Write num_elements
79+
buffer[0..8].copy_from_slice(&(num_elements as i64).to_le_bytes());
80+
81+
// Write null bitset
82+
if with_nulls {
83+
for i in (0..num_elements).step_by(10) {
84+
let word_idx = i / 64;
85+
let bit_idx = i % 64;
86+
let word_offset = 8 + word_idx * 8;
87+
let current_word =
88+
i64::from_le_bytes(buffer[word_offset..word_offset + 8].try_into().unwrap());
89+
let new_word = current_word | (1i64 << bit_idx);
90+
buffer[word_offset..word_offset + 8].copy_from_slice(&new_word.to_le_bytes());
91+
}
92+
}
93+
94+
// Write element data
95+
for i in 0..num_elements {
96+
let offset = header_size + i * 8;
97+
buffer[offset..offset + 8].copy_from_slice(&(i as i64).to_le_bytes());
98+
}
99+
100+
buffer
101+
}
102+
103+
/// Create a SparkUnsafeArray in memory with f64 elements.
104+
fn create_spark_unsafe_array_f64(num_elements: usize, with_nulls: bool) -> Vec<u8> {
105+
let null_bitset_words = num_elements.div_ceil(64);
106+
let header_size = 8 + null_bitset_words * 8;
107+
let data_size = num_elements * 8; // f64 = 8 bytes
108+
let total_size = header_size + data_size;
109+
110+
let mut buffer = vec![0u8; total_size];
111+
112+
// Write num_elements
113+
buffer[0..8].copy_from_slice(&(num_elements as i64).to_le_bytes());
114+
115+
// Write null bitset
116+
if with_nulls {
117+
for i in (0..num_elements).step_by(10) {
118+
let word_idx = i / 64;
119+
let bit_idx = i % 64;
120+
let word_offset = 8 + word_idx * 8;
121+
let current_word =
122+
i64::from_le_bytes(buffer[word_offset..word_offset + 8].try_into().unwrap());
123+
let new_word = current_word | (1i64 << bit_idx);
124+
buffer[word_offset..word_offset + 8].copy_from_slice(&new_word.to_le_bytes());
125+
}
126+
}
127+
128+
// Write element data
129+
for i in 0..num_elements {
130+
let offset = header_size + i * 8;
131+
buffer[offset..offset + 8].copy_from_slice(&(i as f64).to_le_bytes());
132+
}
133+
134+
buffer
135+
}
136+
137+
fn benchmark_array_conversion(c: &mut Criterion) {
138+
let mut group = c.benchmark_group("spark_unsafe_array_to_arrow");
139+
140+
// Benchmark i32 array conversion
141+
for with_nulls in [false, true] {
142+
let buffer = create_spark_unsafe_array_i32(NUM_ELEMENTS, with_nulls);
143+
let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
144+
let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };
145+
146+
group.bench_with_input(
147+
BenchmarkId::new("i32", null_str),
148+
&(&array, &buffer),
149+
|b, (array, _buffer)| {
150+
b.iter(|| {
151+
let mut builder = Int32Builder::with_capacity(NUM_ELEMENTS);
152+
if with_nulls {
153+
append_to_builder::<true>(&DataType::Int32, &mut builder, array).unwrap();
154+
} else {
155+
append_to_builder::<false>(&DataType::Int32, &mut builder, array).unwrap();
156+
}
157+
builder.finish()
158+
});
159+
},
160+
);
161+
}
162+
163+
// Benchmark i64 array conversion
164+
for with_nulls in [false, true] {
165+
let buffer = create_spark_unsafe_array_i64(NUM_ELEMENTS, with_nulls);
166+
let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
167+
let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };
168+
169+
group.bench_with_input(
170+
BenchmarkId::new("i64", null_str),
171+
&(&array, &buffer),
172+
|b, (array, _buffer)| {
173+
b.iter(|| {
174+
let mut builder = Int64Builder::with_capacity(NUM_ELEMENTS);
175+
if with_nulls {
176+
append_to_builder::<true>(&DataType::Int64, &mut builder, array).unwrap();
177+
} else {
178+
append_to_builder::<false>(&DataType::Int64, &mut builder, array).unwrap();
179+
}
180+
builder.finish()
181+
});
182+
},
183+
);
184+
}
185+
186+
// Benchmark f64 array conversion
187+
for with_nulls in [false, true] {
188+
let buffer = create_spark_unsafe_array_f64(NUM_ELEMENTS, with_nulls);
189+
let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
190+
let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };
191+
192+
group.bench_with_input(
193+
BenchmarkId::new("f64", null_str),
194+
&(&array, &buffer),
195+
|b, (array, _buffer)| {
196+
b.iter(|| {
197+
let mut builder = Float64Builder::with_capacity(NUM_ELEMENTS);
198+
if with_nulls {
199+
append_to_builder::<true>(&DataType::Float64, &mut builder, array).unwrap();
200+
} else {
201+
append_to_builder::<false>(&DataType::Float64, &mut builder, array)
202+
.unwrap();
203+
}
204+
builder.finish()
205+
});
206+
},
207+
);
208+
}
209+
210+
// Benchmark date32 array conversion (same memory layout as i32)
211+
for with_nulls in [false, true] {
212+
let buffer = create_spark_unsafe_array_i32(NUM_ELEMENTS, with_nulls);
213+
let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
214+
let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };
215+
216+
group.bench_with_input(
217+
BenchmarkId::new("date32", null_str),
218+
&(&array, &buffer),
219+
|b, (array, _buffer)| {
220+
b.iter(|| {
221+
let mut builder = Date32Builder::with_capacity(NUM_ELEMENTS);
222+
if with_nulls {
223+
append_to_builder::<true>(&DataType::Date32, &mut builder, array).unwrap();
224+
} else {
225+
append_to_builder::<false>(&DataType::Date32, &mut builder, array).unwrap();
226+
}
227+
builder.finish()
228+
});
229+
},
230+
);
231+
}
232+
233+
// Benchmark timestamp array conversion (same memory layout as i64)
234+
for with_nulls in [false, true] {
235+
let buffer = create_spark_unsafe_array_i64(NUM_ELEMENTS, with_nulls);
236+
let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
237+
let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };
238+
239+
group.bench_with_input(
240+
BenchmarkId::new("timestamp", null_str),
241+
&(&array, &buffer),
242+
|b, (array, _buffer)| {
243+
b.iter(|| {
244+
let mut builder = TimestampMicrosecondBuilder::with_capacity(NUM_ELEMENTS);
245+
let dt = DataType::Timestamp(TimeUnit::Microsecond, None);
246+
if with_nulls {
247+
append_to_builder::<true>(&dt, &mut builder, array).unwrap();
248+
} else {
249+
append_to_builder::<false>(&dt, &mut builder, array).unwrap();
250+
}
251+
builder.finish()
252+
});
253+
},
254+
);
255+
}
256+
257+
group.finish();
258+
}
259+
260+
fn config() -> Criterion {
261+
Criterion::default()
262+
}
263+
264+
criterion_group! {
265+
name = benches;
266+
config = config();
267+
targets = benchmark_array_conversion
268+
}
269+
criterion_main!(benches);

0 commit comments

Comments
 (0)