diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml index 7503c337517e..f2bb48fc2618 100644 --- a/datafusion/functions/Cargo.toml +++ b/datafusion/functions/Cargo.toml @@ -182,6 +182,16 @@ harness = false name = "to_char" required-features = ["datetime_expressions"] +[[bench]] +harness = false +name = "to_local_time" +required-features = ["datetime_expressions"] + +[[bench]] +harness = false +name = "to_time" +required-features = ["datetime_expressions"] + [[bench]] harness = false name = "isnan" diff --git a/datafusion/functions/benches/make_date.rs b/datafusion/functions/benches/make_date.rs index 42b5b1019538..1c7b61ec6049 100644 --- a/datafusion/functions/benches/make_date.rs +++ b/datafusion/functions/benches/make_date.rs @@ -30,7 +30,7 @@ use rand::rngs::ThreadRng; fn years(rng: &mut ThreadRng) -> Int32Array { let mut years = vec![]; - for _ in 0..1000 { + for _ in 0..8192 { years.push(rng.random_range(1900..2050)); } @@ -39,7 +39,7 @@ fn years(rng: &mut ThreadRng) -> Int32Array { fn months(rng: &mut ThreadRng) -> Int32Array { let mut months = vec![]; - for _ in 0..1000 { + for _ in 0..8192 { months.push(rng.random_range(1..13)); } @@ -48,14 +48,14 @@ fn months(rng: &mut ThreadRng) -> Int32Array { fn days(rng: &mut ThreadRng) -> Int32Array { let mut days = vec![]; - for _ in 0..1000 { + for _ in 0..8192 { days.push(rng.random_range(1..29)); } Int32Array::from(days) } fn criterion_benchmark(c: &mut Criterion) { - c.bench_function("make_date_col_col_col_1000", |b| { + c.bench_function("make_date_col_col_col_8192", |b| { let mut rng = rand::rng(); let years_array = Arc::new(years(&mut rng)) as ArrayRef; let batch_len = years_array.len(); @@ -85,7 +85,7 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); - c.bench_function("make_date_scalar_col_col_1000", |b| { + c.bench_function("make_date_scalar_col_col_8192", |b| { let mut rng = rand::rng(); let year = ColumnarValue::Scalar(ScalarValue::Int32(Some(2025))); let months_arr = Arc::new(months(&mut rng)) as ArrayRef; @@ -115,7 +115,7 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); - c.bench_function("make_date_scalar_scalar_col_1000", |b| { + c.bench_function("make_date_scalar_scalar_col_8192", |b| { let mut rng = rand::rng(); let year = ColumnarValue::Scalar(ScalarValue::Int32(Some(2025))); let month = ColumnarValue::Scalar(ScalarValue::Int32(Some(11))); diff --git a/datafusion/functions/benches/to_local_time.rs b/datafusion/functions/benches/to_local_time.rs new file mode 100644 index 000000000000..42d1e271980e --- /dev/null +++ b/datafusion/functions/benches/to_local_time.rs @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::hint::black_box; +use std::sync::Arc; + +use arrow::array::{Array, ArrayRef, TimestampNanosecondArray}; +use arrow::datatypes::Field; +use criterion::{Criterion, criterion_group, criterion_main}; +use datafusion_common::config::ConfigOptions; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs}; +use datafusion_functions::datetime::to_local_time; +use rand::Rng; +use rand::rngs::ThreadRng; + +fn timestamps(rng: &mut ThreadRng) -> TimestampNanosecondArray { + let nanos: Vec = (0..100_000) + .map(|_| rng.random_range(0..1_000_000_000_000_000_000i64)) + .collect(); + TimestampNanosecondArray::from(nanos).with_timezone("America/New_York") +} + +fn timestamps_with_nulls(rng: &mut ThreadRng) -> TimestampNanosecondArray { + let values: Vec> = (0..100_000) + .map(|_| { + if rng.random_range(0..10u32) == 0 { + None + } else { + Some(rng.random_range(0..1_000_000_000_000_000_000i64)) + } + }) + .collect(); + TimestampNanosecondArray::from(values).with_timezone("America/New_York") +} + +fn bench_to_local_time(c: &mut Criterion, name: &str, array: ArrayRef) { + let batch_len = array.len(); + let input = ColumnarValue::Array(array); + let udf = to_local_time(); + let return_type = udf.return_type(&[input.data_type()]).unwrap(); + let return_field = Arc::new(Field::new("f", return_type, true)); + let arg_fields = vec![Field::new("a", input.data_type(), true).into()]; + let config_options = Arc::new(ConfigOptions::default()); + + c.bench_function(name, |b| { + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: vec![input.clone()], + arg_fields: arg_fields.clone(), + number_rows: batch_len, + return_field: Arc::clone(&return_field), + config_options: Arc::clone(&config_options), + }) + .expect("to_local_time should work on valid values"), + ) + }) + }); +} + +fn criterion_benchmark(c: &mut Criterion) { + let mut rng = rand::rng(); + bench_to_local_time( + c, + "to_local_time_no_nulls_100k", + Arc::new(timestamps(&mut rng)), + ); + bench_to_local_time( + c, + "to_local_time_10pct_nulls_100k", + Arc::new(timestamps_with_nulls(&mut rng)), + ); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/functions/benches/to_time.rs b/datafusion/functions/benches/to_time.rs new file mode 100644 index 000000000000..6b3aa192415a --- /dev/null +++ b/datafusion/functions/benches/to_time.rs @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::hint::black_box; +use std::sync::Arc; + +use arrow::array::{Array, ArrayRef, StringArray}; +use arrow::datatypes::Field; +use criterion::{Criterion, criterion_group, criterion_main}; +use datafusion_common::config::ConfigOptions; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs}; +use datafusion_functions::datetime::to_time; +use rand::Rng; +use rand::rngs::ThreadRng; + +fn random_time_string(rng: &mut ThreadRng) -> String { + format!( + "{:02}:{:02}:{:02}.{:06}", + rng.random_range(0..24u32), + rng.random_range(0..60u32), + rng.random_range(0..60u32), + rng.random_range(0..1_000_000u32), + ) +} + +fn time_strings(rng: &mut ThreadRng) -> StringArray { + let strings: Vec = (0..100_000).map(|_| random_time_string(rng)).collect(); + StringArray::from(strings) +} + +fn time_strings_with_nulls(rng: &mut ThreadRng) -> StringArray { + let values: Vec> = (0..100_000) + .map(|_| { + if rng.random_range(0..10u32) == 0 { + None + } else { + Some(random_time_string(rng)) + } + }) + .collect(); + StringArray::from(values) +} + +fn bench_to_time(c: &mut Criterion, name: &str, array: ArrayRef) { + let batch_len = array.len(); + let input = ColumnarValue::Array(array); + let udf = to_time(); + let return_type = udf.return_type(&[input.data_type()]).unwrap(); + let return_field = Arc::new(Field::new("f", return_type, true)); + let arg_fields = vec![Field::new("a", input.data_type(), true).into()]; + let config_options = Arc::new(ConfigOptions::default()); + + c.bench_function(name, |b| { + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: vec![input.clone()], + arg_fields: arg_fields.clone(), + number_rows: batch_len, + return_field: Arc::clone(&return_field), + config_options: Arc::clone(&config_options), + }) + .expect("to_time should work on valid values"), + ) + }) + }); +} + +fn criterion_benchmark(c: &mut Criterion) { + let mut rng = rand::rng(); + bench_to_time(c, "to_time_no_nulls_100k", Arc::new(time_strings(&mut rng))); + bench_to_time( + c, + "to_time_10pct_nulls_100k", + Arc::new(time_strings_with_nulls(&mut rng)), + ); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/functions/src/datetime/make_time.rs b/datafusion/functions/src/datetime/make_time.rs index 690acafa8dec..718612fcaddb 100644 --- a/datafusion/functions/src/datetime/make_time.rs +++ b/datafusion/functions/src/datetime/make_time.rs @@ -17,10 +17,10 @@ use std::sync::Arc; -use arrow::array::builder::PrimitiveBuilder; use arrow::array::cast::AsArray; use arrow::array::types::Int32Type; use arrow::array::{Array, PrimitiveArray}; +use arrow::buffer::NullBuffer; use arrow::datatypes::DataType::Time32; use arrow::datatypes::{DataType, Time32SecondType, TimeUnit}; use chrono::prelude::*; @@ -142,24 +142,31 @@ impl ScalarUDFImpl for MakeTimeFunc { let minutes = minutes.as_primitive::(); let seconds = seconds.as_primitive::(); - let mut builder: PrimitiveBuilder = - PrimitiveArray::builder(len); + let nulls = NullBuffer::union( + NullBuffer::union(hours.nulls(), minutes.nulls()).as_ref(), + seconds.nulls(), + ); + let mut values = Vec::with_capacity(len); for i in 0..len { - // match postgresql behaviour which returns null for any null input - if hours.is_null(i) || minutes.is_null(i) || seconds.is_null(i) { - builder.append_null(); + // Match Postgres behaviour which returns null for any null input + if nulls.as_ref().is_some_and(|n| n.is_null(i)) { + values.push(0); } else { make_time_inner( hours.value(i), minutes.value(i), seconds.value(i), - |seconds: i32| builder.append_value(seconds), + |seconds: i32| values.push(seconds), )?; } } - Ok(ColumnarValue::Array(Arc::new(builder.finish()))) + Ok(ColumnarValue::Array(Arc::new(PrimitiveArray::< + Time32SecondType, + >::new( + values.into(), nulls + )))) } } } diff --git a/datafusion/functions/src/datetime/to_local_time.rs b/datafusion/functions/src/datetime/to_local_time.rs index f0d489d1b145..5bd1978893d5 100644 --- a/datafusion/functions/src/datetime/to_local_time.rs +++ b/datafusion/functions/src/datetime/to_local_time.rs @@ -19,7 +19,7 @@ use std::ops::Add; use std::sync::Arc; use arrow::array::timezone::Tz; -use arrow::array::{ArrayRef, PrimitiveBuilder}; +use arrow::array::{ArrayRef, PrimitiveArray}; use arrow::datatypes::DataType::Timestamp; use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second}; use arrow::datatypes::{ @@ -153,18 +153,9 @@ fn transform_array( tz: Tz, ) -> Result { let primitive_array = as_primitive_array::(array)?; - let mut builder = PrimitiveBuilder::::with_capacity(primitive_array.len()); - for ts_opt in primitive_array.iter() { - match ts_opt { - None => builder.append_null(), - Some(ts) => { - let adjusted_ts: i64 = adjust_to_local_time::(ts, tz)?; - builder.append_value(adjusted_ts) - } - } - } - - Ok(ColumnarValue::Array(Arc::new(builder.finish()))) + let result: PrimitiveArray = + primitive_array.try_unary(|ts| adjust_to_local_time::(ts, tz))?; + Ok(ColumnarValue::Array(Arc::new(result))) } fn to_local_time(time_value: &ColumnarValue) -> Result { diff --git a/datafusion/functions/src/datetime/to_time.rs b/datafusion/functions/src/datetime/to_time.rs index 815bf6a328b7..94aa49fbbad2 100644 --- a/datafusion/functions/src/datetime/to_time.rs +++ b/datafusion/functions/src/datetime/to_time.rs @@ -16,7 +16,6 @@ // under the License. use crate::datetime::common::*; -use arrow::array::builder::PrimitiveBuilder; use arrow::array::cast::AsArray; use arrow::array::temporal_conversions::time_to_time64ns; use arrow::array::types::Time64NanosecondType; @@ -213,20 +212,15 @@ fn parse_time_array<'a, A: StringArrayType<'a>>( array: &A, formats: &[&str], ) -> Result> { - let mut builder: PrimitiveBuilder = - PrimitiveArray::builder(array.len()); - + let mut values = Vec::with_capacity(array.len()); for i in 0..array.len() { if array.is_null(i) { - builder.append_null(); + values.push(0); } else { - let s = array.value(i); - let nanos = parse_time_with_formats(s, formats)?; - builder.append_value(nanos); + values.push(parse_time_with_formats(array.value(i), formats)?); } } - - Ok(builder.finish()) + Ok(PrimitiveArray::new(values.into(), array.nulls().cloned())) } /// Parse time string using provided formats