Skip to content
Merged
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ These settings can be used to determine which parts of the plan are accelerated
| `spark.comet.expression.UnaryMinus.enabled` | Enable Comet acceleration for `UnaryMinus` | true |
| `spark.comet.expression.Unhex.enabled` | Enable Comet acceleration for `Unhex` | true |
| `spark.comet.expression.UnixDate.enabled` | Enable Comet acceleration for `UnixDate` | true |
| `spark.comet.expression.UnixTimestamp.enabled` | Enable Comet acceleration for `UnixTimestamp` | true |
| `spark.comet.expression.UnscaledValue.enabled` | Enable Comet acceleration for `UnscaledValue` | true |
| `spark.comet.expression.Upper.enabled` | Enable Comet acceleration for `Upper` | true |
| `spark.comet.expression.WeekDay.enabled` | Enable Comet acceleration for `WeekDay` | true |
Expand Down
2 changes: 1 addition & 1 deletion docs/spark_expressions_support.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@
- [ ] unix_micros
- [ ] unix_millis
- [ ] unix_seconds
- [ ] unix_timestamp
- [x] unix_timestamp
- [ ] weekday
- [ ] weekofyear
- [ ] year
Expand Down
1 change: 1 addition & 0 deletions native/core/src/execution/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub mod logical;
pub mod nullcheck;
pub mod strings;
pub mod subquery;
pub mod temporal;

pub use datafusion_comet_spark_expr::EvalMode;

Expand Down
162 changes: 162 additions & 0 deletions native/core/src/execution/expressions/temporal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Temporal expression builders

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, SchemaRef};
use datafusion::config::ConfigOptions;
use datafusion::logical_expr::ScalarUDF;
use datafusion::physical_expr::{PhysicalExpr, ScalarFunctionExpr};
use datafusion_comet_proto::spark_expression::Expr;
use datafusion_comet_spark_expr::{
SparkHour, SparkMinute, SparkSecond, SparkUnixTimestamp, TimestampTruncExpr,
};

use crate::execution::{
expressions::extract_expr,
operators::ExecutionError,
planner::{expression_registry::ExpressionBuilder, PhysicalPlanner},
};

pub struct HourBuilder;

impl ExpressionBuilder for HourBuilder {
fn build(
&self,
spark_expr: &Expr,
input_schema: SchemaRef,
planner: &PhysicalPlanner,
) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
let expr = extract_expr!(spark_expr, Hour);
let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
let timezone = expr.timezone.clone();
let args = vec![child];
let comet_hour = Arc::new(ScalarUDF::new_from_impl(SparkHour::new(timezone)));
let field_ref = Arc::new(Field::new("hour", DataType::Int32, true));
let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
"hour",
comet_hour,
args,
field_ref,
Arc::new(ConfigOptions::default()),
);

Ok(Arc::new(expr))
}
}

pub struct MinuteBuilder;

impl ExpressionBuilder for MinuteBuilder {
fn build(
&self,
spark_expr: &Expr,
input_schema: SchemaRef,
planner: &PhysicalPlanner,
) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
let expr = extract_expr!(spark_expr, Minute);
let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
let timezone = expr.timezone.clone();
let args = vec![child];
let comet_minute = Arc::new(ScalarUDF::new_from_impl(SparkMinute::new(timezone)));
let field_ref = Arc::new(Field::new("minute", DataType::Int32, true));
let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
"minute",
comet_minute,
args,
field_ref,
Arc::new(ConfigOptions::default()),
);

Ok(Arc::new(expr))
}
}

pub struct SecondBuilder;

impl ExpressionBuilder for SecondBuilder {
fn build(
&self,
spark_expr: &Expr,
input_schema: SchemaRef,
planner: &PhysicalPlanner,
) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
let expr = extract_expr!(spark_expr, Second);
let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
let timezone = expr.timezone.clone();
let args = vec![child];
let comet_second = Arc::new(ScalarUDF::new_from_impl(SparkSecond::new(timezone)));
let field_ref = Arc::new(Field::new("second", DataType::Int32, true));
let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
"second",
comet_second,
args,
field_ref,
Arc::new(ConfigOptions::default()),
);

Ok(Arc::new(expr))
}
}

pub struct UnixTimestampBuilder;

impl ExpressionBuilder for UnixTimestampBuilder {
fn build(
&self,
spark_expr: &Expr,
input_schema: SchemaRef,
planner: &PhysicalPlanner,
) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
let expr = extract_expr!(spark_expr, UnixTimestamp);
let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
let timezone = expr.timezone.clone();
let args = vec![child];
let comet_unix_timestamp =
Arc::new(ScalarUDF::new_from_impl(SparkUnixTimestamp::new(timezone)));
let field_ref = Arc::new(Field::new("unix_timestamp", DataType::Int64, true));
let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
"unix_timestamp",
comet_unix_timestamp,
args,
field_ref,
Arc::new(ConfigOptions::default()),
);

Ok(Arc::new(expr))
}
}

pub struct TruncTimestampBuilder;

impl ExpressionBuilder for TruncTimestampBuilder {
fn build(
&self,
spark_expr: &Expr,
input_schema: SchemaRef,
planner: &PhysicalPlanner,
) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
let expr = extract_expr!(spark_expr, TruncTimestamp);
let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
let format = planner.create_expr(expr.format.as_ref().unwrap(), input_schema)?;
let timezone = expr.timezone.clone();

Ok(Arc::new(TimestampTruncExpr::new(child, format, timezone)))
}
}
65 changes: 2 additions & 63 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ use datafusion::{
};
use datafusion_comet_spark_expr::{
create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, BinaryOutputStyle,
BloomFilterAgg, BloomFilterMightContain, EvalMode, SparkHour, SparkMinute, SparkSecond,
SumInteger,
BloomFilterAgg, BloomFilterMightContain, EvalMode, SumInteger,
};
use iceberg::expr::Bind;

Expand Down Expand Up @@ -126,8 +125,7 @@ use datafusion_comet_spark_expr::monotonically_increasing_id::MonotonicallyIncre
use datafusion_comet_spark_expr::{
ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Correlation, Covariance, CreateNamedStruct,
GetArrayStructFields, GetStructField, IfExpr, ListExtract, NormalizeNaNAndZero, RandExpr,
RandnExpr, SparkCastOptions, Stddev, SumDecimal, TimestampTruncExpr, ToJson, UnboundColumn,
Variance,
RandnExpr, SparkCastOptions, Stddev, SumDecimal, ToJson, UnboundColumn, Variance,
};
use itertools::Itertools;
use jni::objects::GlobalRef;
Expand Down Expand Up @@ -375,65 +373,6 @@ impl PhysicalPlanner {
SparkCastOptions::new(eval_mode, &expr.timezone, expr.allow_incompat),
)))
}
ExprStruct::Hour(expr) => {
let child =
self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
let timezone = expr.timezone.clone();
let args = vec![child];
let comet_hour = Arc::new(ScalarUDF::new_from_impl(SparkHour::new(timezone)));
let field_ref = Arc::new(Field::new("hour", DataType::Int32, true));
let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
"hour",
comet_hour,
args,
field_ref,
Arc::new(ConfigOptions::default()),
);

Ok(Arc::new(expr))
}
ExprStruct::Minute(expr) => {
let child =
self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
let timezone = expr.timezone.clone();
let args = vec![child];
let comet_minute = Arc::new(ScalarUDF::new_from_impl(SparkMinute::new(timezone)));
let field_ref = Arc::new(Field::new("minute", DataType::Int32, true));
let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
"minute",
comet_minute,
args,
field_ref,
Arc::new(ConfigOptions::default()),
);

Ok(Arc::new(expr))
}
ExprStruct::Second(expr) => {
let child =
self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
let timezone = expr.timezone.clone();
let args = vec![child];
let comet_second = Arc::new(ScalarUDF::new_from_impl(SparkSecond::new(timezone)));
let field_ref = Arc::new(Field::new("second", DataType::Int32, true));
let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
"second",
comet_second,
args,
field_ref,
Arc::new(ConfigOptions::default()),
);

Ok(Arc::new(expr))
}
ExprStruct::TruncTimestamp(expr) => {
let child =
self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
let format = self.create_expr(expr.format.as_ref().unwrap(), input_schema)?;
let timezone = expr.timezone.clone();

Ok(Arc::new(TimestampTruncExpr::new(child, format, timezone)))
}
ExprStruct::CheckOverflow(expr) => {
let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?;
let data_type = to_arrow_datatype(expr.datatype.as_ref().unwrap());
Expand Down
27 changes: 24 additions & 3 deletions native/core/src/execution/planner/expression_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ pub enum ExpressionType {
Minute,
Second,
TruncTimestamp,
UnixTimestamp,
}

/// Registry for expression builders
Expand Down Expand Up @@ -181,9 +182,8 @@ impl ExpressionRegistry {
// Register string expressions
self.register_string_expressions();

// TODO: Register other expression categories in future phases
// self.register_temporal_expressions();
// etc.
// Register temporal expressions
self.register_temporal_expressions();
}

/// Register arithmetic expression builders
Expand Down Expand Up @@ -286,6 +286,26 @@ impl ExpressionRegistry {
.insert(ExpressionType::FromJson, Box::new(FromJsonBuilder));
}

/// Register temporal expression builders
fn register_temporal_expressions(&mut self) {
use crate::execution::expressions::temporal::*;

self.builders
.insert(ExpressionType::Hour, Box::new(HourBuilder));
self.builders
.insert(ExpressionType::Minute, Box::new(MinuteBuilder));
self.builders
.insert(ExpressionType::Second, Box::new(SecondBuilder));
self.builders.insert(
ExpressionType::UnixTimestamp,
Box::new(UnixTimestampBuilder),
);
self.builders.insert(
ExpressionType::TruncTimestamp,
Box::new(TruncTimestampBuilder),
);
}

/// Extract expression type from Spark protobuf expression
fn get_expression_type(spark_expr: &Expr) -> Result<ExpressionType, ExecutionError> {
match spark_expr.expr_struct.as_ref() {
Expand Down Expand Up @@ -355,6 +375,7 @@ impl ExpressionRegistry {
Some(ExprStruct::Minute(_)) => Ok(ExpressionType::Minute),
Some(ExprStruct::Second(_)) => Ok(ExpressionType::Second),
Some(ExprStruct::TruncTimestamp(_)) => Ok(ExpressionType::TruncTimestamp),
Some(ExprStruct::UnixTimestamp(_)) => Ok(ExpressionType::UnixTimestamp),

Some(other) => Err(ExecutionError::GeneralError(format!(
"Unsupported expression type: {:?}",
Expand Down
8 changes: 7 additions & 1 deletion native/proto/src/proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ message Expr {
Rand randn = 62;
EmptyExpr spark_partition_id = 63;
EmptyExpr monotonically_increasing_id = 64;
FromJson from_json = 89;
UnixTimestamp unix_timestamp = 65;
FromJson from_json = 66;
Comment thread
parthchandra marked this conversation as resolved.
}
}

Expand Down Expand Up @@ -304,6 +305,11 @@ message Second {
string timezone = 2;
}

// Serialized form of the Spark `UnixTimestamp` expression.
message UnixTimestamp {
// Input expression whose value is converted.
Expr child = 1;
// Timezone string handed to the native implementation.
// NOTE(review): presumably the Spark session timezone ID; confirm against the serializer.
string timezone = 2;
}

message CheckOverflow {
Expr child = 1;
DataType datatype = 2;
Expand Down
2 changes: 2 additions & 0 deletions native/spark-expr/src/datetime_funcs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ mod date_diff;
mod date_trunc;
mod extract_date_part;
mod timestamp_trunc;
mod unix_timestamp;

pub use date_diff::SparkDateDiff;
pub use date_trunc::SparkDateTrunc;
pub use extract_date_part::SparkHour;
pub use extract_date_part::SparkMinute;
pub use extract_date_part::SparkSecond;
pub use timestamp_trunc::TimestampTruncExpr;
pub use unix_timestamp::SparkUnixTimestamp;
Loading
Loading