Skip to content

Commit a60ad5f

Browse files
authored
perf: Improve performance of native row-to-columnar transition used by JVM shuffle (apache#3289)
1 parent 9909535 commit a60ad5f

6 files changed

Lines changed: 1264 additions & 185 deletions

File tree

native/core/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,3 +134,7 @@ harness = false
134134
[[bench]]
135135
name = "parquet_decode"
136136
harness = false
137+
138+
[[bench]]
139+
name = "array_element_append"
140+
harness = false
Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Micro-benchmarks for SparkUnsafeArray element iteration.
19+
//!
20+
//! This benchmarks the low-level `append_to_builder` function, which converts
21+
//! SparkUnsafeArray elements to Arrow array builders. This is the inner loop
22+
//! used when processing List/Array columns in JVM shuffle.
23+
24+
use arrow::array::builder::{
25+
Date32Builder, Float64Builder, Int32Builder, Int64Builder, TimestampMicrosecondBuilder,
26+
};
27+
use arrow::datatypes::{DataType, TimeUnit};
28+
use comet::execution::shuffle::spark_unsafe::list::{append_to_builder, SparkUnsafeArray};
29+
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
30+
31+
const NUM_ELEMENTS: usize = 10000;
32+
33+
/// Build the raw bytes of a SparkUnsafeArray holding fixed-width elements.
///
/// Layout (every multi-byte value is written little-endian):
/// - 8 bytes: num_elements (i64)
/// - null bitset: 8 bytes per 64 elements
/// - element data: `element_size` bytes per element
///
/// When `with_nulls` is true, every 10th element (indices 0, 10, 20, ...) is
/// flagged in the null bitset. Element slots are written for every index,
/// including the ones flagged as null.
///
/// `write_element(i, slot)` is called once per element, in index order, and
/// must fill `slot` (exactly `element_size` bytes) with element `i`.
///
/// # Panics
///
/// Panics if `element_size` is zero.
fn create_spark_unsafe_array(
    num_elements: usize,
    with_nulls: bool,
    element_size: usize,
    write_element: impl Fn(usize, &mut [u8]),
) -> Vec<u8> {
    // Width of the `num_elements` field and of each null-bitset word.
    const WORD_SIZE: usize = 8;

    // Header size: 8 (num_elements) + ceil(num_elements/64) * 8 (null bitset)
    let null_bitset_words = num_elements.div_ceil(64);
    let header_size = WORD_SIZE + null_bitset_words * WORD_SIZE;

    let mut buffer = vec![0u8; header_size + num_elements * element_size];

    // Write num_elements
    buffer[..WORD_SIZE].copy_from_slice(&(num_elements as i64).to_le_bytes());

    // Write null bitset (set every 10th element as null if with_nulls)
    if with_nulls {
        for i in (0..num_elements).step_by(10) {
            // The bitset words are contiguous and little-endian, so bit
            // `i % 64` of word `i / 64` is bit `i % 8` of bitset byte `i / 8`.
            buffer[WORD_SIZE + i / 8] |= 1u8 << (i % 8);
        }
    }

    // Write element data
    for (i, slot) in buffer[header_size..]
        .chunks_exact_mut(element_size)
        .enumerate()
    {
        write_element(i, slot);
    }

    buffer
}

/// Create a SparkUnsafeArray in memory with i32 elements.
///
/// Element `i` holds the value `i as i32` (4 bytes per element). See
/// `create_spark_unsafe_array` for the full buffer layout.
fn create_spark_unsafe_array_i32(num_elements: usize, with_nulls: bool) -> Vec<u8> {
    create_spark_unsafe_array(num_elements, with_nulls, 4, |i, slot| {
        slot.copy_from_slice(&(i as i32).to_le_bytes())
    })
}

/// Create a SparkUnsafeArray in memory with i64 elements.
///
/// Element `i` holds the value `i as i64` (8 bytes per element). See
/// `create_spark_unsafe_array` for the full buffer layout.
fn create_spark_unsafe_array_i64(num_elements: usize, with_nulls: bool) -> Vec<u8> {
    create_spark_unsafe_array(num_elements, with_nulls, 8, |i, slot| {
        slot.copy_from_slice(&(i as i64).to_le_bytes())
    })
}

/// Create a SparkUnsafeArray in memory with f64 elements.
///
/// Element `i` holds the value `i as f64` (8 bytes per element). See
/// `create_spark_unsafe_array` for the full buffer layout.
fn create_spark_unsafe_array_f64(num_elements: usize, with_nulls: bool) -> Vec<u8> {
    create_spark_unsafe_array(num_elements, with_nulls, 8, |i, slot| {
        slot.copy_from_slice(&(i as f64).to_le_bytes())
    })
}
139+
140+
fn benchmark_array_conversion(c: &mut Criterion) {
141+
let mut group = c.benchmark_group("spark_unsafe_array_to_arrow");
142+
143+
// Benchmark i32 array conversion
144+
for with_nulls in [false, true] {
145+
let buffer = create_spark_unsafe_array_i32(NUM_ELEMENTS, with_nulls);
146+
let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
147+
let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };
148+
149+
group.bench_with_input(
150+
BenchmarkId::new("i32", null_str),
151+
&(&array, &buffer),
152+
|b, (array, _buffer)| {
153+
b.iter(|| {
154+
let mut builder = Int32Builder::with_capacity(NUM_ELEMENTS);
155+
if with_nulls {
156+
append_to_builder::<true>(&DataType::Int32, &mut builder, array).unwrap();
157+
} else {
158+
append_to_builder::<false>(&DataType::Int32, &mut builder, array).unwrap();
159+
}
160+
builder.finish()
161+
});
162+
},
163+
);
164+
}
165+
166+
// Benchmark i64 array conversion
167+
for with_nulls in [false, true] {
168+
let buffer = create_spark_unsafe_array_i64(NUM_ELEMENTS, with_nulls);
169+
let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
170+
let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };
171+
172+
group.bench_with_input(
173+
BenchmarkId::new("i64", null_str),
174+
&(&array, &buffer),
175+
|b, (array, _buffer)| {
176+
b.iter(|| {
177+
let mut builder = Int64Builder::with_capacity(NUM_ELEMENTS);
178+
if with_nulls {
179+
append_to_builder::<true>(&DataType::Int64, &mut builder, array).unwrap();
180+
} else {
181+
append_to_builder::<false>(&DataType::Int64, &mut builder, array).unwrap();
182+
}
183+
builder.finish()
184+
});
185+
},
186+
);
187+
}
188+
189+
// Benchmark f64 array conversion
190+
for with_nulls in [false, true] {
191+
let buffer = create_spark_unsafe_array_f64(NUM_ELEMENTS, with_nulls);
192+
let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
193+
let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };
194+
195+
group.bench_with_input(
196+
BenchmarkId::new("f64", null_str),
197+
&(&array, &buffer),
198+
|b, (array, _buffer)| {
199+
b.iter(|| {
200+
let mut builder = Float64Builder::with_capacity(NUM_ELEMENTS);
201+
if with_nulls {
202+
append_to_builder::<true>(&DataType::Float64, &mut builder, array).unwrap();
203+
} else {
204+
append_to_builder::<false>(&DataType::Float64, &mut builder, array)
205+
.unwrap();
206+
}
207+
builder.finish()
208+
});
209+
},
210+
);
211+
}
212+
213+
// Benchmark date32 array conversion (same memory layout as i32)
214+
for with_nulls in [false, true] {
215+
let buffer = create_spark_unsafe_array_i32(NUM_ELEMENTS, with_nulls);
216+
let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
217+
let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };
218+
219+
group.bench_with_input(
220+
BenchmarkId::new("date32", null_str),
221+
&(&array, &buffer),
222+
|b, (array, _buffer)| {
223+
b.iter(|| {
224+
let mut builder = Date32Builder::with_capacity(NUM_ELEMENTS);
225+
if with_nulls {
226+
append_to_builder::<true>(&DataType::Date32, &mut builder, array).unwrap();
227+
} else {
228+
append_to_builder::<false>(&DataType::Date32, &mut builder, array).unwrap();
229+
}
230+
builder.finish()
231+
});
232+
},
233+
);
234+
}
235+
236+
// Benchmark timestamp array conversion (same memory layout as i64)
237+
for with_nulls in [false, true] {
238+
let buffer = create_spark_unsafe_array_i64(NUM_ELEMENTS, with_nulls);
239+
let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
240+
let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };
241+
242+
group.bench_with_input(
243+
BenchmarkId::new("timestamp", null_str),
244+
&(&array, &buffer),
245+
|b, (array, _buffer)| {
246+
b.iter(|| {
247+
let mut builder = TimestampMicrosecondBuilder::with_capacity(NUM_ELEMENTS);
248+
let dt = DataType::Timestamp(TimeUnit::Microsecond, None);
249+
if with_nulls {
250+
append_to_builder::<true>(&dt, &mut builder, array).unwrap();
251+
} else {
252+
append_to_builder::<false>(&dt, &mut builder, array).unwrap();
253+
}
254+
builder.finish()
255+
});
256+
},
257+
);
258+
}
259+
260+
group.finish();
261+
}
262+
263+
fn config() -> Criterion {
264+
Criterion::default()
265+
}
266+
267+
criterion_group! {
268+
name = benches;
269+
config = config();
270+
targets = benchmark_array_conversion
271+
}
272+
criterion_main!(benches);

native/core/src/execution/jni_api.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
547547
let physical_plan_time = start.elapsed();
548548

549549
exec_context.plan_creation_time += physical_plan_time;
550-
exec_context.root_op = Some(Arc::clone(&root_op));
551550
exec_context.scans = scans;
552551

553552
if exec_context.explain_native {
@@ -602,6 +601,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
602601
} else {
603602
exec_context.stream = Some(stream);
604603
}
604+
exec_context.root_op = Some(root_op);
605605
} else {
606606
// Pull input batches
607607
pull_input_batches(exec_context)?;

0 commit comments

Comments
 (0)