Skip to content

Commit fd34134

Browse files
committed
int_timestamp_add_benchmarks
1 parent 11ef1b6 commit fd34134

3 files changed

Lines changed: 161 additions & 11 deletions

File tree

native/spark-expr/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ harness = false
9292
name = "to_csv"
9393
harness = false
9494

95+
[[bench]]
96+
name = "cast_int_to_timestamp"
97+
harness = false
98+
9599
[[test]]
96100
name = "test_udf_registration"
97101
path = "tests/spark_expr_reg.rs"
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::builder::{Int16Builder, Int32Builder, Int64Builder, Int8Builder};
19+
use arrow::array::RecordBatch;
20+
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
21+
use criterion::{criterion_group, criterion_main, Criterion};
22+
use datafusion::physical_expr::{expressions::Column, PhysicalExpr};
23+
use datafusion_comet_spark_expr::{Cast, EvalMode, SparkCastOptions};
24+
use std::sync::Arc;
25+
26+
const BATCH_SIZE: usize = 8192;
27+
28+
fn criterion_benchmark(c: &mut Criterion) {
29+
// Test with UTC timezone
30+
let spark_cast_options = SparkCastOptions::new(EvalMode::Legacy, "UTC", false);
31+
let timestamp_type = DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()));
32+
33+
let mut group = c.benchmark_group("cast_int_to_timestamp");
34+
35+
// Int8 -> Timestamp
36+
let batch_i8 = create_int8_batch();
37+
let expr_i8 = Arc::new(Column::new("a", 0));
38+
let cast_i8_to_ts = Cast::new(expr_i8, timestamp_type.clone(), spark_cast_options.clone());
39+
group.bench_function("cast_i8_to_timestamp", |b| {
40+
b.iter(|| cast_i8_to_ts.evaluate(&batch_i8).unwrap());
41+
});
42+
43+
// Int16 -> Timestamp
44+
let batch_i16 = create_int16_batch();
45+
let expr_i16 = Arc::new(Column::new("a", 0));
46+
let cast_i16_to_ts = Cast::new(expr_i16, timestamp_type.clone(), spark_cast_options.clone());
47+
group.bench_function("cast_i16_to_timestamp", |b| {
48+
b.iter(|| cast_i16_to_ts.evaluate(&batch_i16).unwrap());
49+
});
50+
51+
// Int32 -> Timestamp
52+
let batch_i32 = create_int32_batch();
53+
let expr_i32 = Arc::new(Column::new("a", 0));
54+
let cast_i32_to_ts = Cast::new(expr_i32, timestamp_type.clone(), spark_cast_options.clone());
55+
group.bench_function("cast_i32_to_timestamp", |b| {
56+
b.iter(|| cast_i32_to_ts.evaluate(&batch_i32).unwrap());
57+
});
58+
59+
// Int64 -> Timestamp
60+
let batch_i64 = create_int64_batch();
61+
let expr_i64 = Arc::new(Column::new("a", 0));
62+
let cast_i64_to_ts = Cast::new(expr_i64, timestamp_type.clone(), spark_cast_options.clone());
63+
group.bench_function("cast_i64_to_timestamp", |b| {
64+
b.iter(|| cast_i64_to_ts.evaluate(&batch_i64).unwrap());
65+
});
66+
67+
group.finish();
68+
}
69+
70+
fn create_int8_batch() -> RecordBatch {
71+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int8, true)]));
72+
let mut b = Int8Builder::with_capacity(BATCH_SIZE);
73+
for i in 0..BATCH_SIZE {
74+
if i % 10 == 0 {
75+
b.append_null();
76+
} else {
77+
b.append_value(rand::random::<i8>());
78+
}
79+
}
80+
RecordBatch::try_new(schema, vec![Arc::new(b.finish())]).unwrap()
81+
}
82+
83+
fn create_int16_batch() -> RecordBatch {
84+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int16, true)]));
85+
let mut b = Int16Builder::with_capacity(BATCH_SIZE);
86+
for i in 0..BATCH_SIZE {
87+
if i % 10 == 0 {
88+
b.append_null();
89+
} else {
90+
b.append_value(rand::random::<i16>());
91+
}
92+
}
93+
RecordBatch::try_new(schema, vec![Arc::new(b.finish())]).unwrap()
94+
}
95+
96+
fn create_int32_batch() -> RecordBatch {
97+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
98+
let mut b = Int32Builder::with_capacity(BATCH_SIZE);
99+
for i in 0..BATCH_SIZE {
100+
if i % 10 == 0 {
101+
b.append_null();
102+
} else {
103+
b.append_value(rand::random::<i32>());
104+
}
105+
}
106+
RecordBatch::try_new(schema, vec![Arc::new(b.finish())]).unwrap()
107+
}
108+
109+
fn create_int64_batch() -> RecordBatch {
110+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
111+
let mut b = Int64Builder::with_capacity(BATCH_SIZE);
112+
for i in 0..BATCH_SIZE {
113+
if i % 10 == 0 {
114+
b.append_null();
115+
} else {
116+
b.append_value(rand::random::<i64>());
117+
}
118+
}
119+
RecordBatch::try_new(schema, vec![Arc::new(b.finish())]).unwrap()
120+
}
121+
122+
fn config() -> Criterion {
123+
Criterion::default()
124+
}
125+
126+
criterion_group! {
127+
name = benches;
128+
config = config();
129+
targets = criterion_benchmark
130+
}
131+
criterion_main!(benches);

native/spark-expr/src/conversion_funcs/cast.rs

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use arrow::array::{
2525
PrimitiveBuilder, StringArray, StructArray, TimestampMicrosecondBuilder,
2626
};
2727
use arrow::compute::can_cast_types;
28-
use arrow::datatypes::DataType::Int64;
2928
use arrow::datatypes::{
3029
i256, ArrowDictionaryKeyType, ArrowNativeType, DataType, Decimal256Type, GenericBinaryType,
3130
Schema,
@@ -614,6 +613,20 @@ macro_rules! cast_decimal_to_int32_up {
614613
}};
615614
}
616615

616+
macro_rules! cast_int_to_timestamp_impl {
617+
($array:expr, $builder:expr, $primitive_type:ty) => {{
618+
let arr = $array.as_primitive::<$primitive_type>();
619+
for i in 0..arr.len() {
620+
if arr.is_null(i) {
621+
$builder.append_null();
622+
} else {
623+
let micros = (arr.value(i) as i64).saturating_mul(MICROS_PER_SECOND);
624+
$builder.append_value(micros);
625+
}
626+
}
627+
}};
628+
}
629+
617630
// copied from arrow::dataTypes::Decimal128Type since Decimal128Type::format_decimal can't be called directly
618631
fn format_decimal_str(value_str: &str, precision: usize, scale: i8) -> String {
619632
let (sign, rest) = match value_str.strip_prefix('-') {
@@ -942,16 +955,18 @@ fn cast_int_to_timestamp(
942955
cast_options: &SparkCastOptions,
943956
) -> SparkResult<ArrayRef> {
944957
// Input is seconds since epoch, multiply by MICROS_PER_SECOND to get microseconds.
945-
let int64_array = cast_with_options(&array_ref, &Int64, &CAST_OPTIONS)?;
946-
let int64_arr = int64_array.as_primitive::<Int64Type>();
947-
948-
let mut builder = TimestampMicrosecondBuilder::with_capacity(int64_arr.len());
949-
for i in 0..int64_arr.len() {
950-
if int64_arr.is_null(i) {
951-
builder.append_null();
952-
} else {
953-
let micros = int64_arr.value(i).saturating_mul(MICROS_PER_SECOND);
954-
builder.append_value(micros);
958+
let mut builder = TimestampMicrosecondBuilder::with_capacity(array_ref.len());
959+
960+
match array_ref.data_type() {
961+
DataType::Int8 => cast_int_to_timestamp_impl!(array_ref, builder, Int8Type),
962+
DataType::Int16 => cast_int_to_timestamp_impl!(array_ref, builder, Int16Type),
963+
DataType::Int32 => cast_int_to_timestamp_impl!(array_ref, builder, Int32Type),
964+
DataType::Int64 => cast_int_to_timestamp_impl!(array_ref, builder, Int64Type),
965+
dt => {
966+
return Err(SparkError::Internal(format!(
967+
"Unsupported type for cast_int_to_timestamp: {:?}",
968+
dt
969+
)))
955970
}
956971
}
957972

0 commit comments

Comments
 (0)