Skip to content

Commit 442d3fb

Browse files
authored
feat: add support for timestamp_seconds expression (#3146)
1 parent 6cd6cf3 commit 442d3fb

8 files changed

Lines changed: 291 additions & 5 deletions

File tree

docs/source/contributor-guide/spark_expressions_support.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@
210210
- [ ] second
211211
- [ ] timestamp_micros
212212
- [ ] timestamp_millis
213-
- [ ] timestamp_seconds
213+
- [x] timestamp_seconds
214214
- [ ] to_date
215215
- [ ] to_timestamp
216216
- [ ] to_timestamp_ltz

native/spark-expr/src/comet_scalar_funcs.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::{
2626
spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad, spark_unhex,
2727
spark_unscaled_value, EvalMode, SparkArrayCompact, SparkArrayPositionFunc, SparkArraysOverlap,
2828
SparkContains, SparkDateDiff, SparkDateFromUnixDate, SparkDateTrunc, SparkMakeDate,
29-
SparkSizeFunc,
29+
SparkSecondsToTimestamp, SparkSizeFunc,
3030
};
3131
use arrow::datatypes::DataType;
3232
use datafusion::common::{DataFusionError, Result as DataFusionResult};
@@ -214,6 +214,7 @@ fn all_scalar_functions() -> Vec<Arc<ScalarUDF>> {
214214
Arc::new(ScalarUDF::new_from_impl(SparkDateFromUnixDate::default())),
215215
Arc::new(ScalarUDF::new_from_impl(SparkDateTrunc::default())),
216216
Arc::new(ScalarUDF::new_from_impl(SparkMakeDate::default())),
217+
Arc::new(ScalarUDF::new_from_impl(SparkSecondsToTimestamp::default())),
217218
Arc::new(ScalarUDF::new_from_impl(SparkSizeFunc::default())),
218219
]
219220
}

native/spark-expr/src/datetime_funcs/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ mod date_trunc;
2121
mod extract_date_part;
2222
mod hours;
2323
mod make_date;
24+
mod seconds_to_timestamp;
2425
mod timestamp_trunc;
2526
mod unix_timestamp;
2627

@@ -32,5 +33,6 @@ pub use extract_date_part::SparkMinute;
3233
pub use extract_date_part::SparkSecond;
3334
pub use hours::SparkHoursTransform;
3435
pub use make_date::SparkMakeDate;
36+
pub use seconds_to_timestamp::SparkSecondsToTimestamp;
3537
pub use timestamp_trunc::TimestampTruncExpr;
3638
pub use unix_timestamp::SparkUnixTimestamp;
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::{
19+
Array, Float32Array, Float64Array, Int32Array, Int64Array, TimestampMicrosecondArray,
20+
};
21+
use arrow::compute::try_unary;
22+
use arrow::datatypes::{DataType, TimeUnit};
23+
use datafusion::common::{utils::take_function_args, DataFusionError, Result, ScalarValue};
24+
use datafusion::logical_expr::{
25+
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
26+
};
27+
use std::any::Any;
28+
use std::sync::Arc;
29+
30+
const MICROS_PER_SECOND: i64 = 1_000_000;
31+
32+
/// Spark-compatible seconds_to_timestamp (timestamp_seconds) function.
33+
/// Converts seconds since Unix epoch to a timestamp.
34+
#[derive(Debug, PartialEq, Eq, Hash)]
35+
pub struct SparkSecondsToTimestamp {
36+
signature: Signature,
37+
aliases: Vec<String>,
38+
}
39+
40+
impl SparkSecondsToTimestamp {
41+
pub fn new() -> Self {
42+
Self {
43+
signature: Signature::one_of(
44+
vec![
45+
TypeSignature::Exact(vec![DataType::Int32]),
46+
TypeSignature::Exact(vec![DataType::Int64]),
47+
TypeSignature::Exact(vec![DataType::Float32]),
48+
TypeSignature::Exact(vec![DataType::Float64]),
49+
],
50+
Volatility::Immutable,
51+
),
52+
aliases: vec!["timestamp_seconds".to_string()],
53+
}
54+
}
55+
}
56+
57+
impl Default for SparkSecondsToTimestamp {
58+
fn default() -> Self {
59+
Self::new()
60+
}
61+
}
62+
63+
impl ScalarUDFImpl for SparkSecondsToTimestamp {
64+
fn as_any(&self) -> &dyn Any {
65+
self
66+
}
67+
68+
fn name(&self) -> &str {
69+
"seconds_to_timestamp"
70+
}
71+
72+
fn signature(&self) -> &Signature {
73+
&self.signature
74+
}
75+
76+
fn return_type(&self, _: &[DataType]) -> Result<DataType> {
77+
Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
78+
}
79+
80+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
81+
let [seconds] = take_function_args(self.name(), args.args)?;
82+
83+
match seconds {
84+
ColumnarValue::Array(arr) => {
85+
// Handle Int32 input — no overflow possible since i32 * 1_000_000 fits in i64
86+
if let Some(int_array) = arr.as_any().downcast_ref::<Int32Array>() {
87+
let result: TimestampMicrosecondArray =
88+
try_unary(int_array, |s| Ok((s as i64) * MICROS_PER_SECOND))?;
89+
return Ok(ColumnarValue::Array(Arc::new(result)));
90+
}
91+
92+
// Handle Int64 input — error on overflow to match Spark's Math.multiplyExact
93+
if let Some(int_array) = arr.as_any().downcast_ref::<Int64Array>() {
94+
let result: TimestampMicrosecondArray = try_unary(int_array, |s| {
95+
s.checked_mul(MICROS_PER_SECOND).ok_or_else(|| {
96+
arrow::error::ArrowError::ComputeError("long overflow".to_string())
97+
})
98+
})?;
99+
return Ok(ColumnarValue::Array(Arc::new(result)));
100+
}
101+
102+
// Handle Float32 input — cast to f64 and use Float64 path
103+
if let Some(float_array) = arr.as_any().downcast_ref::<Float32Array>() {
104+
let result: arrow::array::TimestampMicrosecondArray = float_array
105+
.iter()
106+
.map(|opt| {
107+
opt.and_then(|s| {
108+
let s = s as f64;
109+
if s.is_nan() || s.is_infinite() {
110+
None
111+
} else {
112+
Some((s * (MICROS_PER_SECOND as f64)) as i64)
113+
}
114+
})
115+
})
116+
.collect();
117+
return Ok(ColumnarValue::Array(Arc::new(result)));
118+
}
119+
120+
// Handle Float64 input — NaN and Infinity return null per Spark behavior
121+
if let Some(float_array) = arr.as_any().downcast_ref::<Float64Array>() {
122+
let result: arrow::array::TimestampMicrosecondArray = float_array
123+
.iter()
124+
.map(|opt| {
125+
opt.and_then(|s| {
126+
if s.is_nan() || s.is_infinite() {
127+
None
128+
} else {
129+
Some((s * (MICROS_PER_SECOND as f64)) as i64)
130+
}
131+
})
132+
})
133+
.collect();
134+
return Ok(ColumnarValue::Array(Arc::new(result)));
135+
}
136+
137+
Err(DataFusionError::Execution(format!(
138+
"seconds_to_timestamp expects Int32, Int64, Float32 or Float64 input, got {:?}",
139+
arr.data_type()
140+
)))
141+
}
142+
ColumnarValue::Scalar(scalar) => {
143+
let ts_micros = match &scalar {
144+
ScalarValue::Int32(Some(s)) => Some((*s as i64) * MICROS_PER_SECOND),
145+
ScalarValue::Int64(Some(s)) => {
146+
Some(s.checked_mul(MICROS_PER_SECOND).ok_or_else(|| {
147+
DataFusionError::ArrowError(
148+
Box::new(arrow::error::ArrowError::ComputeError(
149+
"long overflow".to_string(),
150+
)),
151+
None,
152+
)
153+
})?)
154+
}
155+
ScalarValue::Float32(Some(s)) => {
156+
let s = *s as f64;
157+
if s.is_nan() || s.is_infinite() {
158+
None
159+
} else {
160+
Some((s * (MICROS_PER_SECOND as f64)) as i64)
161+
}
162+
}
163+
ScalarValue::Float64(Some(s)) => {
164+
if s.is_nan() || s.is_infinite() {
165+
None
166+
} else {
167+
Some((s * (MICROS_PER_SECOND as f64)) as i64)
168+
}
169+
}
170+
ScalarValue::Int32(None)
171+
| ScalarValue::Int64(None)
172+
| ScalarValue::Float32(None)
173+
| ScalarValue::Float64(None)
174+
| ScalarValue::Null => None,
175+
_ => {
176+
return Err(DataFusionError::Execution(format!(
177+
"seconds_to_timestamp expects numeric scalar input, got {:?}",
178+
scalar.data_type()
179+
)))
180+
}
181+
};
182+
Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
183+
ts_micros, None,
184+
)))
185+
}
186+
}
187+
}
188+
189+
fn aliases(&self) -> &[String] {
190+
&self.aliases
191+
}
192+
}

native/spark-expr/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ pub use comet_scalar_funcs::{
7474
pub use csv_funcs::*;
7575
pub use datetime_funcs::{
7676
SparkDateDiff, SparkDateFromUnixDate, SparkDateTrunc, SparkHour, SparkHoursTransform,
77-
SparkMakeDate, SparkMinute, SparkSecond, SparkUnixTimestamp, TimestampTruncExpr,
77+
SparkMakeDate, SparkMinute, SparkSecond, SparkSecondsToTimestamp, SparkUnixTimestamp,
78+
TimestampTruncExpr,
7879
};
7980
pub use error::{decimal_overflow_error, SparkError, SparkErrorWithContext, SparkResult};
8081
pub use hash_funcs::*;

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
218218
classOf[Minute] -> CometMinute,
219219
classOf[NextDay] -> CometNextDay,
220220
classOf[Second] -> CometSecond,
221+
classOf[SecondsToTimestamp] -> CometSecondsToTimestamp,
221222
classOf[TruncDate] -> CometTruncDate,
222223
classOf[TruncTimestamp] -> CometTruncTimestamp,
223224
classOf[UnixTimestamp] -> CometUnixTimestamp,

spark/src/main/scala/org/apache/comet/serde/datetime.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ package org.apache.comet.serde
2121

2222
import java.util.Locale
2323

24-
import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateFromUnixDate, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, GetDateField, Hour, Hours, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
24+
import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateFromUnixDate, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, GetDateField, Hour, Hours, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, SecondsToTimestamp, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
2525
import org.apache.spark.sql.internal.SQLConf
26-
import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampNTZType, TimestampType}
26+
import org.apache.spark.sql.types.{DateType, DoubleType, FloatType, IntegerType, LongType, StringType, TimestampNTZType, TimestampType}
2727
import org.apache.spark.unsafe.types.UTF8String
2828

2929
import org.apache.comet.CometSparkSessionExtensions.withInfo
@@ -368,6 +368,15 @@ object CometNextDay extends CometScalarFunction[NextDay]("next_day")
368368

369369
object CometMakeDate extends CometScalarFunction[MakeDate]("make_date")
370370

371+
object CometSecondsToTimestamp
372+
extends CometScalarFunction[SecondsToTimestamp]("seconds_to_timestamp") {
373+
override def getSupportLevel(expr: SecondsToTimestamp): SupportLevel =
374+
expr.child.dataType match {
375+
case IntegerType | LongType | FloatType | DoubleType => Compatible()
376+
case dt => Unsupported(Some(s"timestamp_seconds does not support input type $dt"))
377+
}
378+
}
379+
371380
object CometLastDay extends CometScalarFunction[LastDay]("last_day")
372381

373382
object CometDateFromUnixDate extends CometScalarFunction[DateFromUnixDate]("date_from_unix_date")
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
-- Licensed to the Apache Software Foundation (ASF) under one
2+
-- or more contributor license agreements. See the NOTICE file
3+
-- distributed with this work for additional information
4+
-- regarding copyright ownership. The ASF licenses this file
5+
-- to you under the Apache License, Version 2.0 (the
6+
-- "License"); you may not use this file except in compliance
7+
-- with the License. You may obtain a copy of the License at
8+
--
9+
-- http://www.apache.org/licenses/LICENSE-2.0
10+
--
11+
-- Unless required by applicable law or agreed to in writing,
12+
-- software distributed under the License is distributed on an
13+
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
-- KIND, either express or implied. See the License for the
15+
-- specific language governing permissions and limitations
16+
-- under the License.
17+
18+
-- Config: spark.sql.session.timeZone=UTC
19+
-- ConfigMatrix: parquet.enable.dictionary=false,true
20+
21+
-- bigint column
22+
statement
23+
CREATE TABLE test_ts_seconds_bigint(c0 bigint) USING parquet
24+
25+
statement
26+
INSERT INTO test_ts_seconds_bigint VALUES (0), (1640995200), (-86400), (4102444800), (-2208988800), (NULL)
27+
28+
query
29+
SELECT c0, timestamp_seconds(c0) FROM test_ts_seconds_bigint
30+
31+
-- int column
32+
statement
33+
CREATE TABLE test_ts_seconds_int(c0 int) USING parquet
34+
35+
statement
36+
INSERT INTO test_ts_seconds_int VALUES (0), (1640995200), (-86400), (NULL)
37+
38+
query
39+
SELECT c0, timestamp_seconds(c0) FROM test_ts_seconds_int
40+
41+
-- double column
42+
statement
43+
CREATE TABLE test_ts_seconds_double(c0 double) USING parquet
44+
45+
statement
46+
INSERT INTO test_ts_seconds_double VALUES (0.0), (1640995200.123), (-86400.5), (NULL)
47+
48+
query
49+
SELECT c0, timestamp_seconds(c0) FROM test_ts_seconds_double
50+
51+
-- literal arguments
52+
query
53+
SELECT timestamp_seconds(0)
54+
55+
query
56+
SELECT timestamp_seconds(1640995200)
57+
58+
-- negative value (before epoch)
59+
query
60+
SELECT timestamp_seconds(-86400)
61+
62+
-- decimal seconds (fractional)
63+
query
64+
SELECT timestamp_seconds(CAST(1640995200.123 AS DOUBLE))
65+
66+
-- null handling
67+
query
68+
SELECT timestamp_seconds(NULL)
69+
70+
-- NaN input (should return null)
71+
query
72+
SELECT timestamp_seconds(CAST('NaN' AS DOUBLE))
73+
74+
-- Infinity input (should return null)
75+
query
76+
SELECT timestamp_seconds(CAST('Infinity' AS DOUBLE))
77+
78+
-- Negative infinity input (should return null)
79+
query
80+
SELECT timestamp_seconds(CAST('-Infinity' AS DOUBLE))

0 commit comments

Comments
 (0)