Skip to content

Commit 9372a5e

Browse files
authored
feat: add support for date_from_unix_date expression (#3144)
1 parent 3861c4a commit 9372a5e

File tree

8 files changed

+143
-5
lines changed

8 files changed

+143
-5
lines changed

docs/spark_expressions_support.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@
171171
- [ ] date_add
172172
- [ ] date_diff
173173
- [ ] date_format
174-
- [ ] date_from_unix_date
174+
- [x] date_from_unix_date
175175
- [x] date_part
176176
- [ ] date_sub
177177
- [ ] date_trunc

native/spark-expr/src/comet_scalar_funcs.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::{
2424
spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor, spark_isnan,
2525
spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad, spark_unhex,
2626
spark_unscaled_value, EvalMode, SparkArrayCompact, SparkContains, SparkDateDiff,
27-
SparkDateTrunc, SparkMakeDate, SparkSizeFunc,
27+
SparkDateFromUnixDate, SparkDateTrunc, SparkMakeDate, SparkSizeFunc,
2828
};
2929
use arrow::datatypes::DataType;
3030
use datafusion::common::{DataFusionError, Result as DataFusionResult};
@@ -203,6 +203,7 @@ fn all_scalar_functions() -> Vec<Arc<ScalarUDF>> {
203203
Arc::new(ScalarUDF::new_from_impl(SparkArrayCompact::default())),
204204
Arc::new(ScalarUDF::new_from_impl(SparkContains::default())),
205205
Arc::new(ScalarUDF::new_from_impl(SparkDateDiff::default())),
206+
Arc::new(ScalarUDF::new_from_impl(SparkDateFromUnixDate::default())),
206207
Arc::new(ScalarUDF::new_from_impl(SparkDateTrunc::default())),
207208
Arc::new(ScalarUDF::new_from_impl(SparkMakeDate::default())),
208209
Arc::new(ScalarUDF::new_from_impl(SparkSizeFunc::default())),
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::{Array, Date32Array, Int32Array};
19+
use arrow::datatypes::DataType;
20+
use datafusion::common::{utils::take_function_args, DataFusionError, Result, ScalarValue};
21+
use datafusion::logical_expr::{
22+
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
23+
};
24+
use std::any::Any;
25+
use std::sync::Arc;
26+
27+
/// Spark-compatible `date_from_unix_date` function.
///
/// Converts an integer representing days since the Unix epoch (1970-01-01)
/// to a `Date32` value. Both types physically store days as `i32`, so the
/// conversion is a reinterpretation rather than any date arithmetic.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkDateFromUnixDate {
    // Exactly one Int32 argument; Immutable because the result depends only on the input.
    signature: Signature,
    // No alternative names are registered for this function.
    aliases: Vec<String>,
}
34+
35+
impl SparkDateFromUnixDate {
36+
pub fn new() -> Self {
37+
Self {
38+
signature: Signature::exact(vec![DataType::Int32], Volatility::Immutable),
39+
aliases: vec![],
40+
}
41+
}
42+
}
43+
44+
impl Default for SparkDateFromUnixDate {
45+
fn default() -> Self {
46+
Self::new()
47+
}
48+
}
49+
50+
impl ScalarUDFImpl for SparkDateFromUnixDate {
51+
fn as_any(&self) -> &dyn Any {
52+
self
53+
}
54+
55+
fn name(&self) -> &str {
56+
"date_from_unix_date"
57+
}
58+
59+
fn signature(&self) -> &Signature {
60+
&self.signature
61+
}
62+
63+
fn return_type(&self, _: &[DataType]) -> Result<DataType> {
64+
Ok(DataType::Date32)
65+
}
66+
67+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
68+
let [unix_date] = take_function_args(self.name(), args.args)?;
69+
match unix_date {
70+
ColumnarValue::Array(arr) => {
71+
let int_array = arr.as_any().downcast_ref::<Int32Array>().ok_or_else(|| {
72+
DataFusionError::Execution(
73+
"date_from_unix_date expects Int32Array input".to_string(),
74+
)
75+
})?;
76+
77+
// Date32 and Int32 both represent days since epoch, so we can directly
78+
// reinterpret the values. The only operation needed is creating a Date32Array
79+
// from the same underlying i32 values.
80+
let date_array =
81+
Date32Array::new(int_array.values().clone(), int_array.nulls().cloned());
82+
83+
Ok(ColumnarValue::Array(Arc::new(date_array)))
84+
}
85+
ColumnarValue::Scalar(scalar) => match scalar {
86+
ScalarValue::Int32(Some(days)) => {
87+
Ok(ColumnarValue::Scalar(ScalarValue::Date32(Some(days))))
88+
}
89+
ScalarValue::Int32(None) | ScalarValue::Null => {
90+
Ok(ColumnarValue::Scalar(ScalarValue::Date32(None)))
91+
}
92+
_ => Err(DataFusionError::Execution(
93+
"date_from_unix_date expects Int32 scalar input".to_string(),
94+
)),
95+
},
96+
}
97+
}
98+
99+
fn aliases(&self) -> &[String] {
100+
&self.aliases
101+
}
102+
}

native/spark-expr/src/datetime_funcs/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
mod date_diff;
19+
mod date_from_unix_date;
1920
mod date_trunc;
2021
mod extract_date_part;
2122
mod hours;
@@ -24,6 +25,7 @@ mod timestamp_trunc;
2425
mod unix_timestamp;
2526

2627
pub use date_diff::SparkDateDiff;
28+
pub use date_from_unix_date::SparkDateFromUnixDate;
2729
pub use date_trunc::SparkDateTrunc;
2830
pub use extract_date_part::SparkHour;
2931
pub use extract_date_part::SparkMinute;

native/spark-expr/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ pub use comet_scalar_funcs::{
7171
};
7272
pub use csv_funcs::*;
7373
pub use datetime_funcs::{
74-
SparkDateDiff, SparkDateTrunc, SparkHour, SparkHoursTransform, SparkMakeDate, SparkMinute,
75-
SparkSecond, SparkUnixTimestamp, TimestampTruncExpr,
74+
SparkDateDiff, SparkDateFromUnixDate, SparkDateTrunc, SparkHour, SparkHoursTransform,
75+
SparkMakeDate, SparkMinute, SparkSecond, SparkUnixTimestamp, TimestampTruncExpr,
7676
};
7777
pub use error::{decimal_overflow_error, SparkError, SparkErrorWithContext, SparkResult};
7878
pub use hash_funcs::*;

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
198198
classOf[DateAdd] -> CometDateAdd,
199199
classOf[DateDiff] -> CometDateDiff,
200200
classOf[DateFormatClass] -> CometDateFormat,
201+
classOf[DateFromUnixDate] -> CometDateFromUnixDate,
201202
classOf[Days] -> CometDays,
202203
classOf[Hours] -> CometHours,
203204
classOf[DateSub] -> CometDateSub,

spark/src/main/scala/org/apache/comet/serde/datetime.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ package org.apache.comet.serde
2121

2222
import java.util.Locale
2323

24-
import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, GetDateField, Hour, Hours, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
24+
import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateFromUnixDate, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, GetDateField, Hour, Hours, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
2525
import org.apache.spark.sql.internal.SQLConf
2626
import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampNTZType, TimestampType}
2727
import org.apache.spark.unsafe.types.UTF8String
@@ -354,6 +354,8 @@ object CometMakeDate extends CometScalarFunction[MakeDate]("make_date")
354354

355355
object CometLastDay extends CometScalarFunction[LastDay]("last_day")
356356

357+
object CometDateFromUnixDate extends CometScalarFunction[DateFromUnixDate]("date_from_unix_date")
358+
357359
object CometDateDiff extends CometScalarFunction[DateDiff]("date_diff")
358360

359361
/**
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
-- Licensed to the Apache Software Foundation (ASF) under one
2+
-- or more contributor license agreements. See the NOTICE file
3+
-- distributed with this work for additional information
4+
-- regarding copyright ownership. The ASF licenses this file
5+
-- to you under the Apache License, Version 2.0 (the
6+
-- "License"); you may not use this file except in compliance
7+
-- with the License. You may obtain a copy of the License at
8+
--
9+
-- http://www.apache.org/licenses/LICENSE-2.0
10+
--
11+
-- Unless required by applicable law or agreed to in writing,
12+
-- software distributed under the License is distributed on an
13+
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
-- KIND, either express or implied. See the License for the
15+
-- specific language governing permissions and limitations
16+
-- under the License.
17+
18+
statement
19+
CREATE TABLE test_date_from_unix_date(i int) USING parquet
20+
21+
-- -719162 = 0001-01-01 (Spark min date), 2932896 = 9999-12-31 (Spark max date)
22+
statement
23+
INSERT INTO test_date_from_unix_date VALUES (0), (1), (-1), (18993), (-25567), (-719162), (2932896), (NULL)
24+
25+
query
26+
SELECT date_from_unix_date(i) FROM test_date_from_unix_date
27+
28+
-- literal arguments
29+
query
30+
SELECT date_from_unix_date(0), date_from_unix_date(1), date_from_unix_date(-1), date_from_unix_date(18993), date_from_unix_date(NULL)

0 commit comments

Comments
 (0)