Skip to content

Commit 1929d71

Browse files
authored
perf: Optimize NULL handling in some datetime functions (#21477)
## Which issue does this PR close?

- Closes #21476.

## Rationale for this change

`to_time`, `to_local_time`, and `make_date` build NULL arrays incrementally. It is faster and more idiomatic to build them in bulk via `NullBuffer` instead. This doesn't make a huge performance difference in practice, but it shouldn't be slower, and the result is more concise and idiomatic.

Benchmarks (ARM64):

```
- make_date_col_col_col_8192: 97.0µs -> 97.3µs, +0.3%
- make_date_scalar_col_col_8192: 97.3µs -> 97.8µs, +0.5%
- make_date_scalar_scalar_col_8192: 98.1µs -> 98.7µs, +0.6%
- make_date_scalar_scalar_scalar: 162.0ns -> 162.1ns, +0.1%
- to_time_no_nulls_100k: 17.1ms -> 17.7ms, +3.5%
- to_time_10pct_nulls_100k: 16.0ms -> 16.2ms, +1.3%
- to_local_time_no_nulls_100k: 8.0ms -> 7.8ms, -2.5%
- to_local_time_10pct_nulls_100k: 7.6ms -> 7.0ms, -7.9%
```

## What changes are included in this PR?

* Add benchmarks for `to_time` and `to_local_time`
* Use 8192 batch size in `make_date` benchmark, for consistency with default DF batch size
* Implement NULL handling optimization/cleanup

## Are these changes tested?

Yes.

## Are there any user-facing changes?

No.
1 parent beed4f0 commit 1929d71

7 files changed

Lines changed: 223 additions & 37 deletions

File tree

datafusion/functions/Cargo.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,16 @@ harness = false
186186
name = "to_char"
187187
required-features = ["datetime_expressions"]
188188

189+
[[bench]]
190+
harness = false
191+
name = "to_local_time"
192+
required-features = ["datetime_expressions"]
193+
194+
[[bench]]
195+
harness = false
196+
name = "to_time"
197+
required-features = ["datetime_expressions"]
198+
189199
[[bench]]
190200
harness = false
191201
name = "isnan"

datafusion/functions/benches/make_date.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use rand::rngs::ThreadRng;
3030

3131
fn years(rng: &mut ThreadRng) -> Int32Array {
3232
let mut years = vec![];
33-
for _ in 0..1000 {
33+
for _ in 0..8192 {
3434
years.push(rng.random_range(1900..2050));
3535
}
3636

@@ -39,7 +39,7 @@ fn years(rng: &mut ThreadRng) -> Int32Array {
3939

4040
fn months(rng: &mut ThreadRng) -> Int32Array {
4141
let mut months = vec![];
42-
for _ in 0..1000 {
42+
for _ in 0..8192 {
4343
months.push(rng.random_range(1..13));
4444
}
4545

@@ -48,14 +48,14 @@ fn months(rng: &mut ThreadRng) -> Int32Array {
4848

4949
fn days(rng: &mut ThreadRng) -> Int32Array {
5050
let mut days = vec![];
51-
for _ in 0..1000 {
51+
for _ in 0..8192 {
5252
days.push(rng.random_range(1..29));
5353
}
5454

5555
Int32Array::from(days)
5656
}
5757
fn criterion_benchmark(c: &mut Criterion) {
58-
c.bench_function("make_date_col_col_col_1000", |b| {
58+
c.bench_function("make_date_col_col_col_8192", |b| {
5959
let mut rng = rand::rng();
6060
let years_array = Arc::new(years(&mut rng)) as ArrayRef;
6161
let batch_len = years_array.len();
@@ -85,7 +85,7 @@ fn criterion_benchmark(c: &mut Criterion) {
8585
})
8686
});
8787

88-
c.bench_function("make_date_scalar_col_col_1000", |b| {
88+
c.bench_function("make_date_scalar_col_col_8192", |b| {
8989
let mut rng = rand::rng();
9090
let year = ColumnarValue::Scalar(ScalarValue::Int32(Some(2025)));
9191
let months_arr = Arc::new(months(&mut rng)) as ArrayRef;
@@ -115,7 +115,7 @@ fn criterion_benchmark(c: &mut Criterion) {
115115
})
116116
});
117117

118-
c.bench_function("make_date_scalar_scalar_col_1000", |b| {
118+
c.bench_function("make_date_scalar_scalar_col_8192", |b| {
119119
let mut rng = rand::rng();
120120
let year = ColumnarValue::Scalar(ScalarValue::Int32(Some(2025)));
121121
let month = ColumnarValue::Scalar(ScalarValue::Int32(Some(11)));

datafusion/functions/benches/to_local_time.rs

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::hint::black_box;
19+
use std::sync::Arc;
20+
21+
use arrow::array::{Array, ArrayRef, TimestampNanosecondArray};
22+
use arrow::datatypes::Field;
23+
use criterion::{Criterion, criterion_group, criterion_main};
24+
use datafusion_common::config::ConfigOptions;
25+
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs};
26+
use datafusion_functions::datetime::to_local_time;
27+
use rand::Rng;
28+
use rand::rngs::ThreadRng;
29+
30+
fn timestamps(rng: &mut ThreadRng) -> TimestampNanosecondArray {
31+
let nanos: Vec<i64> = (0..100_000)
32+
.map(|_| rng.random_range(0..1_000_000_000_000_000_000i64))
33+
.collect();
34+
TimestampNanosecondArray::from(nanos).with_timezone("America/New_York")
35+
}
36+
37+
fn timestamps_with_nulls(rng: &mut ThreadRng) -> TimestampNanosecondArray {
38+
let values: Vec<Option<i64>> = (0..100_000)
39+
.map(|_| {
40+
if rng.random_range(0..10u32) == 0 {
41+
None
42+
} else {
43+
Some(rng.random_range(0..1_000_000_000_000_000_000i64))
44+
}
45+
})
46+
.collect();
47+
TimestampNanosecondArray::from(values).with_timezone("America/New_York")
48+
}
49+
50+
fn bench_to_local_time(c: &mut Criterion, name: &str, array: ArrayRef) {
51+
let batch_len = array.len();
52+
let input = ColumnarValue::Array(array);
53+
let udf = to_local_time();
54+
let return_type = udf.return_type(&[input.data_type()]).unwrap();
55+
let return_field = Arc::new(Field::new("f", return_type, true));
56+
let arg_fields = vec![Field::new("a", input.data_type(), true).into()];
57+
let config_options = Arc::new(ConfigOptions::default());
58+
59+
c.bench_function(name, |b| {
60+
b.iter(|| {
61+
black_box(
62+
udf.invoke_with_args(ScalarFunctionArgs {
63+
args: vec![input.clone()],
64+
arg_fields: arg_fields.clone(),
65+
number_rows: batch_len,
66+
return_field: Arc::clone(&return_field),
67+
config_options: Arc::clone(&config_options),
68+
})
69+
.expect("to_local_time should work on valid values"),
70+
)
71+
})
72+
});
73+
}
74+
75+
fn criterion_benchmark(c: &mut Criterion) {
76+
let mut rng = rand::rng();
77+
bench_to_local_time(
78+
c,
79+
"to_local_time_no_nulls_100k",
80+
Arc::new(timestamps(&mut rng)),
81+
);
82+
bench_to_local_time(
83+
c,
84+
"to_local_time_10pct_nulls_100k",
85+
Arc::new(timestamps_with_nulls(&mut rng)),
86+
);
87+
}
88+
89+
criterion_group!(benches, criterion_benchmark);
90+
criterion_main!(benches);

datafusion/functions/benches/to_time.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::hint::black_box;
19+
use std::sync::Arc;
20+
21+
use arrow::array::{Array, ArrayRef, StringArray};
22+
use arrow::datatypes::Field;
23+
use criterion::{Criterion, criterion_group, criterion_main};
24+
use datafusion_common::config::ConfigOptions;
25+
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs};
26+
use datafusion_functions::datetime::to_time;
27+
use rand::Rng;
28+
use rand::rngs::ThreadRng;
29+
30+
fn random_time_string(rng: &mut ThreadRng) -> String {
31+
format!(
32+
"{:02}:{:02}:{:02}.{:06}",
33+
rng.random_range(0..24u32),
34+
rng.random_range(0..60u32),
35+
rng.random_range(0..60u32),
36+
rng.random_range(0..1_000_000u32),
37+
)
38+
}
39+
40+
fn time_strings(rng: &mut ThreadRng) -> StringArray {
41+
let strings: Vec<String> = (0..100_000).map(|_| random_time_string(rng)).collect();
42+
StringArray::from(strings)
43+
}
44+
45+
fn time_strings_with_nulls(rng: &mut ThreadRng) -> StringArray {
46+
let values: Vec<Option<String>> = (0..100_000)
47+
.map(|_| {
48+
if rng.random_range(0..10u32) == 0 {
49+
None
50+
} else {
51+
Some(random_time_string(rng))
52+
}
53+
})
54+
.collect();
55+
StringArray::from(values)
56+
}
57+
58+
fn bench_to_time(c: &mut Criterion, name: &str, array: ArrayRef) {
59+
let batch_len = array.len();
60+
let input = ColumnarValue::Array(array);
61+
let udf = to_time();
62+
let return_type = udf.return_type(&[input.data_type()]).unwrap();
63+
let return_field = Arc::new(Field::new("f", return_type, true));
64+
let arg_fields = vec![Field::new("a", input.data_type(), true).into()];
65+
let config_options = Arc::new(ConfigOptions::default());
66+
67+
c.bench_function(name, |b| {
68+
b.iter(|| {
69+
black_box(
70+
udf.invoke_with_args(ScalarFunctionArgs {
71+
args: vec![input.clone()],
72+
arg_fields: arg_fields.clone(),
73+
number_rows: batch_len,
74+
return_field: Arc::clone(&return_field),
75+
config_options: Arc::clone(&config_options),
76+
})
77+
.expect("to_time should work on valid values"),
78+
)
79+
})
80+
});
81+
}
82+
83+
fn criterion_benchmark(c: &mut Criterion) {
84+
let mut rng = rand::rng();
85+
bench_to_time(c, "to_time_no_nulls_100k", Arc::new(time_strings(&mut rng)));
86+
bench_to_time(
87+
c,
88+
"to_time_10pct_nulls_100k",
89+
Arc::new(time_strings_with_nulls(&mut rng)),
90+
);
91+
}
92+
93+
criterion_group!(benches, criterion_benchmark);
94+
criterion_main!(benches);

datafusion/functions/src/datetime/make_time.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717

1818
use std::sync::Arc;
1919

20-
use arrow::array::builder::PrimitiveBuilder;
2120
use arrow::array::cast::AsArray;
2221
use arrow::array::types::Int32Type;
2322
use arrow::array::{Array, PrimitiveArray};
23+
use arrow::buffer::NullBuffer;
2424
use arrow::datatypes::DataType::Time32;
2525
use arrow::datatypes::{DataType, Time32SecondType, TimeUnit};
2626
use chrono::prelude::*;
@@ -142,24 +142,31 @@ impl ScalarUDFImpl for MakeTimeFunc {
142142
let minutes = minutes.as_primitive::<Int32Type>();
143143
let seconds = seconds.as_primitive::<Int32Type>();
144144

145-
let mut builder: PrimitiveBuilder<Time32SecondType> =
146-
PrimitiveArray::builder(len);
145+
let nulls = NullBuffer::union(
146+
NullBuffer::union(hours.nulls(), minutes.nulls()).as_ref(),
147+
seconds.nulls(),
148+
);
147149

150+
let mut values = Vec::with_capacity(len);
148151
for i in 0..len {
149-
// match postgresql behaviour which returns null for any null input
150-
if hours.is_null(i) || minutes.is_null(i) || seconds.is_null(i) {
151-
builder.append_null();
152+
// Match Postgres behaviour which returns null for any null input
153+
if nulls.as_ref().is_some_and(|n| n.is_null(i)) {
154+
values.push(0);
152155
} else {
153156
make_time_inner(
154157
hours.value(i),
155158
minutes.value(i),
156159
seconds.value(i),
157-
|seconds: i32| builder.append_value(seconds),
160+
|seconds: i32| values.push(seconds),
158161
)?;
159162
}
160163
}
161164

162-
Ok(ColumnarValue::Array(Arc::new(builder.finish())))
165+
Ok(ColumnarValue::Array(Arc::new(PrimitiveArray::<
166+
Time32SecondType,
167+
>::new(
168+
values.into(), nulls
169+
))))
163170
}
164171
}
165172
}

datafusion/functions/src/datetime/to_local_time.rs

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::ops::Add;
1919
use std::sync::Arc;
2020

2121
use arrow::array::timezone::Tz;
22-
use arrow::array::{ArrayRef, PrimitiveBuilder};
22+
use arrow::array::{ArrayRef, PrimitiveArray};
2323
use arrow::datatypes::DataType::Timestamp;
2424
use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
2525
use arrow::datatypes::{
@@ -153,18 +153,9 @@ fn transform_array<T: ArrowTimestampType>(
153153
tz: Tz,
154154
) -> Result<ColumnarValue> {
155155
let primitive_array = as_primitive_array::<T>(array)?;
156-
let mut builder = PrimitiveBuilder::<T>::with_capacity(primitive_array.len());
157-
for ts_opt in primitive_array.iter() {
158-
match ts_opt {
159-
None => builder.append_null(),
160-
Some(ts) => {
161-
let adjusted_ts: i64 = adjust_to_local_time::<T>(ts, tz)?;
162-
builder.append_value(adjusted_ts)
163-
}
164-
}
165-
}
166-
167-
Ok(ColumnarValue::Array(Arc::new(builder.finish())))
156+
let result: PrimitiveArray<T> =
157+
primitive_array.try_unary(|ts| adjust_to_local_time::<T>(ts, tz))?;
158+
Ok(ColumnarValue::Array(Arc::new(result)))
168159
}
169160

170161
fn to_local_time(time_value: &ColumnarValue) -> Result<ColumnarValue> {

datafusion/functions/src/datetime/to_time.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
// under the License.
1717

1818
use crate::datetime::common::*;
19-
use arrow::array::builder::PrimitiveBuilder;
2019
use arrow::array::cast::AsArray;
2120
use arrow::array::temporal_conversions::time_to_time64ns;
2221
use arrow::array::types::Time64NanosecondType;
@@ -213,20 +212,15 @@ fn parse_time_array<'a, A: StringArrayType<'a>>(
213212
array: &A,
214213
formats: &[&str],
215214
) -> Result<PrimitiveArray<Time64NanosecondType>> {
216-
let mut builder: PrimitiveBuilder<Time64NanosecondType> =
217-
PrimitiveArray::builder(array.len());
218-
215+
let mut values = Vec::with_capacity(array.len());
219216
for i in 0..array.len() {
220217
if array.is_null(i) {
221-
builder.append_null();
218+
values.push(0);
222219
} else {
223-
let s = array.value(i);
224-
let nanos = parse_time_with_formats(s, formats)?;
225-
builder.append_value(nanos);
220+
values.push(parse_time_with_formats(array.value(i), formats)?);
226221
}
227222
}
228-
229-
Ok(builder.finish())
223+
Ok(PrimitiveArray::new(values.into(), array.nulls().cloned()))
230224
}
231225

232226
/// Parse time string using provided formats

0 commit comments

Comments
 (0)