Skip to content

Commit e4a0142

Browse files
authored
feat: Add support for unix_timestamp function (#2936)
1 parent 267ad4c commit e4a0142

14 files changed

Lines changed: 605 additions & 75 deletions

File tree

docs/source/user-guide/latest/configs.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@ These settings can be used to determine which parts of the plan are accelerated
336336
| `spark.comet.expression.UnaryMinus.enabled` | Enable Comet acceleration for `UnaryMinus` | true |
337337
| `spark.comet.expression.Unhex.enabled` | Enable Comet acceleration for `Unhex` | true |
338338
| `spark.comet.expression.UnixDate.enabled` | Enable Comet acceleration for `UnixDate` | true |
339+
| `spark.comet.expression.UnixTimestamp.enabled` | Enable Comet acceleration for `UnixTimestamp` | true |
339340
| `spark.comet.expression.UnscaledValue.enabled` | Enable Comet acceleration for `UnscaledValue` | true |
340341
| `spark.comet.expression.Upper.enabled` | Enable Comet acceleration for `Upper` | true |
341342
| `spark.comet.expression.WeekDay.enabled` | Enable Comet acceleration for `WeekDay` | true |

docs/spark_expressions_support.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@
217217
- [ ] unix_micros
218218
- [ ] unix_millis
219219
- [ ] unix_seconds
220-
- [ ] unix_timestamp
220+
- [x] unix_timestamp
221221
- [ ] weekday
222222
- [ ] weekofyear
223223
- [ ] year

native/core/src/execution/expressions/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub mod logical;
2424
pub mod nullcheck;
2525
pub mod strings;
2626
pub mod subquery;
27+
pub mod temporal;
2728

2829
pub use datafusion_comet_spark_expr::EvalMode;
2930

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Temporal expression builders
19+
20+
use std::sync::Arc;
21+
22+
use arrow::datatypes::{DataType, Field, SchemaRef};
23+
use datafusion::config::ConfigOptions;
24+
use datafusion::logical_expr::ScalarUDF;
25+
use datafusion::physical_expr::{PhysicalExpr, ScalarFunctionExpr};
26+
use datafusion_comet_proto::spark_expression::Expr;
27+
use datafusion_comet_spark_expr::{
28+
SparkHour, SparkMinute, SparkSecond, SparkUnixTimestamp, TimestampTruncExpr,
29+
};
30+
31+
use crate::execution::{
32+
expressions::extract_expr,
33+
operators::ExecutionError,
34+
planner::{expression_registry::ExpressionBuilder, PhysicalPlanner},
35+
};
36+
37+
pub struct HourBuilder;
38+
39+
impl ExpressionBuilder for HourBuilder {
40+
fn build(
41+
&self,
42+
spark_expr: &Expr,
43+
input_schema: SchemaRef,
44+
planner: &PhysicalPlanner,
45+
) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
46+
let expr = extract_expr!(spark_expr, Hour);
47+
let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
48+
let timezone = expr.timezone.clone();
49+
let args = vec![child];
50+
let comet_hour = Arc::new(ScalarUDF::new_from_impl(SparkHour::new(timezone)));
51+
let field_ref = Arc::new(Field::new("hour", DataType::Int32, true));
52+
let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
53+
"hour",
54+
comet_hour,
55+
args,
56+
field_ref,
57+
Arc::new(ConfigOptions::default()),
58+
);
59+
60+
Ok(Arc::new(expr))
61+
}
62+
}
63+
64+
pub struct MinuteBuilder;
65+
66+
impl ExpressionBuilder for MinuteBuilder {
67+
fn build(
68+
&self,
69+
spark_expr: &Expr,
70+
input_schema: SchemaRef,
71+
planner: &PhysicalPlanner,
72+
) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
73+
let expr = extract_expr!(spark_expr, Minute);
74+
let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
75+
let timezone = expr.timezone.clone();
76+
let args = vec![child];
77+
let comet_minute = Arc::new(ScalarUDF::new_from_impl(SparkMinute::new(timezone)));
78+
let field_ref = Arc::new(Field::new("minute", DataType::Int32, true));
79+
let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
80+
"minute",
81+
comet_minute,
82+
args,
83+
field_ref,
84+
Arc::new(ConfigOptions::default()),
85+
);
86+
87+
Ok(Arc::new(expr))
88+
}
89+
}
90+
91+
pub struct SecondBuilder;
92+
93+
impl ExpressionBuilder for SecondBuilder {
94+
fn build(
95+
&self,
96+
spark_expr: &Expr,
97+
input_schema: SchemaRef,
98+
planner: &PhysicalPlanner,
99+
) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
100+
let expr = extract_expr!(spark_expr, Second);
101+
let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
102+
let timezone = expr.timezone.clone();
103+
let args = vec![child];
104+
let comet_second = Arc::new(ScalarUDF::new_from_impl(SparkSecond::new(timezone)));
105+
let field_ref = Arc::new(Field::new("second", DataType::Int32, true));
106+
let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
107+
"second",
108+
comet_second,
109+
args,
110+
field_ref,
111+
Arc::new(ConfigOptions::default()),
112+
);
113+
114+
Ok(Arc::new(expr))
115+
}
116+
}
117+
118+
pub struct UnixTimestampBuilder;
119+
120+
impl ExpressionBuilder for UnixTimestampBuilder {
121+
fn build(
122+
&self,
123+
spark_expr: &Expr,
124+
input_schema: SchemaRef,
125+
planner: &PhysicalPlanner,
126+
) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
127+
let expr = extract_expr!(spark_expr, UnixTimestamp);
128+
let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
129+
let timezone = expr.timezone.clone();
130+
let args = vec![child];
131+
let comet_unix_timestamp =
132+
Arc::new(ScalarUDF::new_from_impl(SparkUnixTimestamp::new(timezone)));
133+
let field_ref = Arc::new(Field::new("unix_timestamp", DataType::Int64, true));
134+
let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
135+
"unix_timestamp",
136+
comet_unix_timestamp,
137+
args,
138+
field_ref,
139+
Arc::new(ConfigOptions::default()),
140+
);
141+
142+
Ok(Arc::new(expr))
143+
}
144+
}
145+
146+
pub struct TruncTimestampBuilder;
147+
148+
impl ExpressionBuilder for TruncTimestampBuilder {
149+
fn build(
150+
&self,
151+
spark_expr: &Expr,
152+
input_schema: SchemaRef,
153+
planner: &PhysicalPlanner,
154+
) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
155+
let expr = extract_expr!(spark_expr, TruncTimestamp);
156+
let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
157+
let format = planner.create_expr(expr.format.as_ref().unwrap(), input_schema)?;
158+
let timezone = expr.timezone.clone();
159+
160+
Ok(Arc::new(TimestampTruncExpr::new(child, format, timezone)))
161+
}
162+
}

native/core/src/execution/planner.rs

Lines changed: 2 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,7 @@ use datafusion::{
7070
};
7171
use datafusion_comet_spark_expr::{
7272
create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, BinaryOutputStyle,
73-
BloomFilterAgg, BloomFilterMightContain, EvalMode, SparkHour, SparkMinute, SparkSecond,
74-
SumInteger,
73+
BloomFilterAgg, BloomFilterMightContain, EvalMode, SumInteger,
7574
};
7675
use iceberg::expr::Bind;
7776

@@ -126,8 +125,7 @@ use datafusion_comet_spark_expr::monotonically_increasing_id::MonotonicallyIncre
126125
use datafusion_comet_spark_expr::{
127126
ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Correlation, Covariance, CreateNamedStruct,
128127
GetArrayStructFields, GetStructField, IfExpr, ListExtract, NormalizeNaNAndZero, RandExpr,
129-
RandnExpr, SparkCastOptions, Stddev, SumDecimal, TimestampTruncExpr, ToJson, UnboundColumn,
130-
Variance,
128+
RandnExpr, SparkCastOptions, Stddev, SumDecimal, ToJson, UnboundColumn, Variance,
131129
};
132130
use itertools::Itertools;
133131
use jni::objects::GlobalRef;
@@ -375,65 +373,6 @@ impl PhysicalPlanner {
375373
SparkCastOptions::new(eval_mode, &expr.timezone, expr.allow_incompat),
376374
)))
377375
}
378-
ExprStruct::Hour(expr) => {
379-
let child =
380-
self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
381-
let timezone = expr.timezone.clone();
382-
let args = vec![child];
383-
let comet_hour = Arc::new(ScalarUDF::new_from_impl(SparkHour::new(timezone)));
384-
let field_ref = Arc::new(Field::new("hour", DataType::Int32, true));
385-
let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
386-
"hour",
387-
comet_hour,
388-
args,
389-
field_ref,
390-
Arc::new(ConfigOptions::default()),
391-
);
392-
393-
Ok(Arc::new(expr))
394-
}
395-
ExprStruct::Minute(expr) => {
396-
let child =
397-
self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
398-
let timezone = expr.timezone.clone();
399-
let args = vec![child];
400-
let comet_minute = Arc::new(ScalarUDF::new_from_impl(SparkMinute::new(timezone)));
401-
let field_ref = Arc::new(Field::new("minute", DataType::Int32, true));
402-
let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
403-
"minute",
404-
comet_minute,
405-
args,
406-
field_ref,
407-
Arc::new(ConfigOptions::default()),
408-
);
409-
410-
Ok(Arc::new(expr))
411-
}
412-
ExprStruct::Second(expr) => {
413-
let child =
414-
self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
415-
let timezone = expr.timezone.clone();
416-
let args = vec![child];
417-
let comet_second = Arc::new(ScalarUDF::new_from_impl(SparkSecond::new(timezone)));
418-
let field_ref = Arc::new(Field::new("second", DataType::Int32, true));
419-
let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
420-
"second",
421-
comet_second,
422-
args,
423-
field_ref,
424-
Arc::new(ConfigOptions::default()),
425-
);
426-
427-
Ok(Arc::new(expr))
428-
}
429-
ExprStruct::TruncTimestamp(expr) => {
430-
let child =
431-
self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
432-
let format = self.create_expr(expr.format.as_ref().unwrap(), input_schema)?;
433-
let timezone = expr.timezone.clone();
434-
435-
Ok(Arc::new(TimestampTruncExpr::new(child, format, timezone)))
436-
}
437376
ExprStruct::CheckOverflow(expr) => {
438377
let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?;
439378
let data_type = to_arrow_datatype(expr.datatype.as_ref().unwrap());

native/core/src/execution/planner/expression_registry.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ pub enum ExpressionType {
109109
Minute,
110110
Second,
111111
TruncTimestamp,
112+
UnixTimestamp,
112113
}
113114

114115
/// Registry for expression builders
@@ -181,9 +182,8 @@ impl ExpressionRegistry {
181182
// Register string expressions
182183
self.register_string_expressions();
183184

184-
// TODO: Register other expression categories in future phases
185-
// self.register_temporal_expressions();
186-
// etc.
185+
// Register temporal expressions
186+
self.register_temporal_expressions();
187187
}
188188

189189
/// Register arithmetic expression builders
@@ -286,6 +286,26 @@ impl ExpressionRegistry {
286286
.insert(ExpressionType::FromJson, Box::new(FromJsonBuilder));
287287
}
288288

289+
/// Register temporal expression builders
290+
fn register_temporal_expressions(&mut self) {
291+
use crate::execution::expressions::temporal::*;
292+
293+
self.builders
294+
.insert(ExpressionType::Hour, Box::new(HourBuilder));
295+
self.builders
296+
.insert(ExpressionType::Minute, Box::new(MinuteBuilder));
297+
self.builders
298+
.insert(ExpressionType::Second, Box::new(SecondBuilder));
299+
self.builders.insert(
300+
ExpressionType::UnixTimestamp,
301+
Box::new(UnixTimestampBuilder),
302+
);
303+
self.builders.insert(
304+
ExpressionType::TruncTimestamp,
305+
Box::new(TruncTimestampBuilder),
306+
);
307+
}
308+
289309
/// Extract expression type from Spark protobuf expression
290310
fn get_expression_type(spark_expr: &Expr) -> Result<ExpressionType, ExecutionError> {
291311
match spark_expr.expr_struct.as_ref() {
@@ -355,6 +375,7 @@ impl ExpressionRegistry {
355375
Some(ExprStruct::Minute(_)) => Ok(ExpressionType::Minute),
356376
Some(ExprStruct::Second(_)) => Ok(ExpressionType::Second),
357377
Some(ExprStruct::TruncTimestamp(_)) => Ok(ExpressionType::TruncTimestamp),
378+
Some(ExprStruct::UnixTimestamp(_)) => Ok(ExpressionType::UnixTimestamp),
358379

359380
Some(other) => Err(ExecutionError::GeneralError(format!(
360381
"Unsupported expression type: {:?}",

native/proto/src/proto/expr.proto

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ message Expr {
8585
Rand randn = 62;
8686
EmptyExpr spark_partition_id = 63;
8787
EmptyExpr monotonically_increasing_id = 64;
88-
FromJson from_json = 89;
88+
UnixTimestamp unix_timestamp = 65;
89+
FromJson from_json = 66;
8990
}
9091
}
9192

@@ -304,6 +305,11 @@ message Second {
304305
string timezone = 2;
305306
}
306307

308+
message UnixTimestamp {
309+
Expr child = 1;
310+
string timezone = 2;
311+
}
312+
307313
message CheckOverflow {
308314
Expr child = 1;
309315
DataType datatype = 2;

native/spark-expr/src/datetime_funcs/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@ mod date_diff;
1919
mod date_trunc;
2020
mod extract_date_part;
2121
mod timestamp_trunc;
22+
mod unix_timestamp;
2223

2324
pub use date_diff::SparkDateDiff;
2425
pub use date_trunc::SparkDateTrunc;
2526
pub use extract_date_part::SparkHour;
2627
pub use extract_date_part::SparkMinute;
2728
pub use extract_date_part::SparkSecond;
2829
pub use timestamp_trunc::TimestampTruncExpr;
30+
pub use unix_timestamp::SparkUnixTimestamp;

0 commit comments

Comments
 (0)