Skip to content

Commit 58fb6e1

Browse files
authored
feat(spark): implement from/to_utc_timestamp functions (#19880)
## Which issue does this PR close? - Closes #19879 - Part of #15914 ## Rationale for this change Implement the following Spark functions: - https://spark.apache.org/docs/latest/api/sql/index.html#from_utc_timestamp - https://spark.apache.org/docs/latest/api/sql/index.html#to_utc_timestamp ## What changes are included in this PR? Implementations for `from_utc_timestamp` and `to_utc_timestamp` functions in the datafusion-spark crate ## Are these changes tested? Yes, in SLT ## Are there any user-facing changes? Yes
1 parent 8653851 commit 58fb6e1

6 files changed

Lines changed: 736 additions & 11 deletions

File tree

datafusion/functions/src/datetime/to_local_time.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ fn to_local_time(time_value: &ColumnarValue) -> Result<ColumnarValue> {
324324
/// ```
325325
///
326326
/// See `test_adjust_to_local_time()` for example
327-
fn adjust_to_local_time<T: ArrowTimestampType>(ts: i64, tz: Tz) -> Result<i64> {
327+
pub fn adjust_to_local_time<T: ArrowTimestampType>(ts: i64, tz: Tz) -> Result<i64> {
328328
fn convert_timestamp<F>(ts: i64, converter: F) -> Result<DateTime<Utc>>
329329
where
330330
F: Fn(i64) -> MappedLocalTime<DateTime<Utc>>,
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::any::Any;
19+
use std::sync::Arc;
20+
21+
use arrow::array::timezone::Tz;
22+
use arrow::array::{Array, ArrayRef, AsArray, PrimitiveBuilder, StringArrayType};
23+
use arrow::datatypes::TimeUnit;
24+
use arrow::datatypes::{
25+
ArrowTimestampType, DataType, Field, FieldRef, TimestampMicrosecondType,
26+
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
27+
};
28+
use datafusion_common::types::{NativeType, logical_string};
29+
use datafusion_common::utils::take_function_args;
30+
use datafusion_common::{Result, exec_datafusion_err, exec_err, internal_err};
31+
use datafusion_expr::{
32+
Coercion, ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl,
33+
Signature, TypeSignatureClass, Volatility,
34+
};
35+
use datafusion_functions::datetime::to_local_time::adjust_to_local_time;
36+
use datafusion_functions::utils::make_scalar_function;
37+
38+
/// Apache Spark `from_utc_timestamp` function.
///
/// Interprets the given timestamp as UTC and converts it to the given timezone.
///
/// Timestamp in Apache Spark represents number of microseconds from the Unix epoch, which is not
/// timezone-agnostic. So in Apache Spark this function just shifts the timestamp value from UTC timezone to
/// the given timezone.
///
/// See <https://spark.apache.org/docs/latest/api/sql/index.html#from_utc_timestamp>
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkFromUtcTimestamp {
    // Coercible two-argument signature (timestamp, timezone string); built in `new()`.
    signature: Signature,
}
51+
52+
impl Default for SparkFromUtcTimestamp {
53+
fn default() -> Self {
54+
Self::new()
55+
}
56+
}
57+
58+
impl SparkFromUtcTimestamp {
59+
pub fn new() -> Self {
60+
Self {
61+
signature: Signature::coercible(
62+
vec![
63+
Coercion::new_implicit(
64+
TypeSignatureClass::Timestamp,
65+
vec![TypeSignatureClass::Native(logical_string())],
66+
NativeType::Timestamp(TimeUnit::Microsecond, None),
67+
),
68+
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
69+
],
70+
Volatility::Immutable,
71+
),
72+
}
73+
}
74+
}
75+
76+
impl ScalarUDFImpl for SparkFromUtcTimestamp {
77+
fn as_any(&self) -> &dyn Any {
78+
self
79+
}
80+
81+
fn name(&self) -> &str {
82+
"from_utc_timestamp"
83+
}
84+
85+
fn signature(&self) -> &Signature {
86+
&self.signature
87+
}
88+
89+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
90+
internal_err!("return_field_from_args should be used instead")
91+
}
92+
93+
fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
94+
let nullable = args.arg_fields.iter().any(|f| f.is_nullable());
95+
96+
Ok(Arc::new(Field::new(
97+
self.name(),
98+
args.arg_fields[0].data_type().clone(),
99+
nullable,
100+
)))
101+
}
102+
103+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
104+
make_scalar_function(spark_from_utc_timestamp, vec![])(&args.args)
105+
}
106+
}
107+
108+
fn spark_from_utc_timestamp(args: &[ArrayRef]) -> Result<ArrayRef> {
109+
let [timestamp, timezone] = take_function_args("from_utc_timestamp", args)?;
110+
111+
match timestamp.data_type() {
112+
DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => {
113+
process_timestamp_with_tz_array::<TimestampNanosecondType>(
114+
timestamp,
115+
timezone,
116+
tz_opt.clone(),
117+
)
118+
}
119+
DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => {
120+
process_timestamp_with_tz_array::<TimestampMicrosecondType>(
121+
timestamp,
122+
timezone,
123+
tz_opt.clone(),
124+
)
125+
}
126+
DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => {
127+
process_timestamp_with_tz_array::<TimestampMillisecondType>(
128+
timestamp,
129+
timezone,
130+
tz_opt.clone(),
131+
)
132+
}
133+
DataType::Timestamp(TimeUnit::Second, tz_opt) => {
134+
process_timestamp_with_tz_array::<TimestampSecondType>(
135+
timestamp,
136+
timezone,
137+
tz_opt.clone(),
138+
)
139+
}
140+
ts_type => {
141+
exec_err!("`from_utc_timestamp`: unsupported argument types: {ts_type}")
142+
}
143+
}
144+
}
145+
146+
fn process_timestamp_with_tz_array<T: ArrowTimestampType>(
147+
ts_array: &ArrayRef,
148+
tz_array: &ArrayRef,
149+
tz_opt: Option<Arc<str>>,
150+
) -> Result<ArrayRef> {
151+
match tz_array.data_type() {
152+
DataType::Utf8 => {
153+
process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string::<i32>())
154+
}
155+
DataType::LargeUtf8 => {
156+
process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string::<i64>())
157+
}
158+
DataType::Utf8View => {
159+
process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string_view())
160+
}
161+
other => {
162+
exec_err!("`from_utc_timestamp`: timezone must be a string type, got {other}")
163+
}
164+
}
165+
}
166+
167+
fn process_arrays<'a, T: ArrowTimestampType, S>(
168+
return_tz_opt: Option<Arc<str>>,
169+
ts_array: &ArrayRef,
170+
tz_array: &'a S,
171+
) -> Result<ArrayRef>
172+
where
173+
&'a S: StringArrayType<'a>,
174+
{
175+
let ts_primitive = ts_array.as_primitive::<T>();
176+
let mut builder = PrimitiveBuilder::<T>::with_capacity(ts_array.len());
177+
178+
for (ts_opt, tz_opt) in ts_primitive.iter().zip(tz_array.iter()) {
179+
match (ts_opt, tz_opt) {
180+
(Some(ts), Some(tz_str)) => {
181+
let tz: Tz = tz_str.parse().map_err(|e| {
182+
exec_datafusion_err!(
183+
"`from_utc_timestamp`: invalid timezone '{tz_str}': {e}"
184+
)
185+
})?;
186+
let val = adjust_to_local_time::<T>(ts, tz)?;
187+
builder.append_value(val);
188+
}
189+
_ => builder.append_null(),
190+
}
191+
}
192+
193+
builder = builder.with_timezone_opt(return_tz_opt);
194+
Ok(Arc::new(builder.finish()))
195+
}

datafusion/spark/src/function/datetime/mod.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@ pub mod date_part;
2222
pub mod date_sub;
2323
pub mod date_trunc;
2424
pub mod extract;
25+
pub mod from_utc_timestamp;
2526
pub mod last_day;
2627
pub mod make_dt_interval;
2728
pub mod make_interval;
2829
pub mod next_day;
2930
pub mod time_trunc;
31+
pub mod to_utc_timestamp;
3032
pub mod trunc;
3133

3234
use datafusion_expr::ScalarUDF;
@@ -39,6 +41,10 @@ make_udf_function!(date_diff::SparkDateDiff, date_diff);
3941
make_udf_function!(date_part::SparkDatePart, date_part);
4042
make_udf_function!(date_sub::SparkDateSub, date_sub);
4143
make_udf_function!(date_trunc::SparkDateTrunc, date_trunc);
44+
make_udf_function!(
45+
from_utc_timestamp::SparkFromUtcTimestamp,
46+
from_utc_timestamp
47+
);
4248
make_udf_function!(extract::SparkHour, hour);
4349
make_udf_function!(extract::SparkMinute, minute);
4450
make_udf_function!(extract::SparkSecond, second);
@@ -47,6 +53,7 @@ make_udf_function!(make_dt_interval::SparkMakeDtInterval, make_dt_interval);
4753
make_udf_function!(make_interval::SparkMakeInterval, make_interval);
4854
make_udf_function!(next_day::SparkNextDay, next_day);
4955
make_udf_function!(time_trunc::SparkTimeTrunc, time_trunc);
56+
make_udf_function!(to_utc_timestamp::SparkToUtcTimestamp, to_utc_timestamp);
5057
make_udf_function!(trunc::SparkTrunc, trunc);
5158

5259
pub mod expr_fn {
@@ -125,6 +132,16 @@ pub mod expr_fn {
125132
"Extracts a part of the date or time from a date, time, or timestamp expression.",
126133
arg1 arg2
127134
));
135+
export_functions!((
136+
from_utc_timestamp,
137+
"Interpret a given timestamp `ts` in UTC timezone and then convert it to timezone `tz`.",
138+
ts tz
139+
));
140+
export_functions!((
141+
to_utc_timestamp,
142+
"Interpret a given timestamp `ts` in timezone `tz` and then convert it to UTC timezone.",
143+
ts tz
144+
));
128145
}
129146

130147
pub fn functions() -> Vec<Arc<ScalarUDF>> {
@@ -135,6 +152,7 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
135152
date_part(),
136153
date_sub(),
137154
date_trunc(),
155+
from_utc_timestamp(),
138156
hour(),
139157
last_day(),
140158
make_dt_interval(),
@@ -143,6 +161,7 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
143161
next_day(),
144162
second(),
145163
time_trunc(),
164+
to_utc_timestamp(),
146165
trunc(),
147166
]
148167
}

0 commit comments

Comments
 (0)