Skip to content

Commit 6af833b

Browse files
committed
fix
1 parent e5c8d28 commit 6af833b

3 files changed

Lines changed: 25 additions & 84 deletions

File tree

native/core/src/execution/expressions/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub mod logical;
2424
pub mod nullcheck;
2525
pub mod strings;
2626
pub mod subquery;
27+
pub mod temporal;
2728

2829
pub use datafusion_comet_spark_expr::EvalMode;
2930

native/core/src/execution/planner.rs

Lines changed: 2 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,7 @@ use datafusion::{
7070
};
7171
use datafusion_comet_spark_expr::{
7272
create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, BinaryOutputStyle,
73-
BloomFilterAgg, BloomFilterMightContain, EvalMode, SparkHour, SparkMinute, SparkSecond,
74-
SparkUnixTimestamp,
73+
BloomFilterAgg, BloomFilterMightContain, EvalMode,
7574
};
7675
use iceberg::expr::Bind;
7776

@@ -126,8 +125,7 @@ use datafusion_comet_spark_expr::monotonically_increasing_id::MonotonicallyIncre
126125
use datafusion_comet_spark_expr::{
127126
ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Correlation, Covariance, CreateNamedStruct,
128127
GetArrayStructFields, GetStructField, IfExpr, ListExtract, NormalizeNaNAndZero, RandExpr,
129-
RandnExpr, SparkCastOptions, Stddev, SumDecimal, TimestampTruncExpr, ToJson, UnboundColumn,
130-
Variance,
128+
RandnExpr, SparkCastOptions, Stddev, SumDecimal, ToJson, UnboundColumn, Variance,
131129
};
132130
use itertools::Itertools;
133131
use jni::objects::GlobalRef;
@@ -375,83 +373,6 @@ impl PhysicalPlanner {
375373
SparkCastOptions::new(eval_mode, &expr.timezone, expr.allow_incompat),
376374
)))
377375
}
378-
ExprStruct::Hour(expr) => {
379-
let child =
380-
self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
381-
let timezone = expr.timezone.clone();
382-
let args = vec![child];
383-
let comet_hour = Arc::new(ScalarUDF::new_from_impl(SparkHour::new(timezone)));
384-
let field_ref = Arc::new(Field::new("hour", DataType::Int32, true));
385-
let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
386-
"hour",
387-
comet_hour,
388-
args,
389-
field_ref,
390-
Arc::new(ConfigOptions::default()),
391-
);
392-
393-
Ok(Arc::new(expr))
394-
}
395-
ExprStruct::Minute(expr) => {
396-
let child =
397-
self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
398-
let timezone = expr.timezone.clone();
399-
let args = vec![child];
400-
let comet_minute = Arc::new(ScalarUDF::new_from_impl(SparkMinute::new(timezone)));
401-
let field_ref = Arc::new(Field::new("minute", DataType::Int32, true));
402-
let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
403-
"minute",
404-
comet_minute,
405-
args,
406-
field_ref,
407-
Arc::new(ConfigOptions::default()),
408-
);
409-
410-
Ok(Arc::new(expr))
411-
}
412-
ExprStruct::Second(expr) => {
413-
let child =
414-
self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
415-
let timezone = expr.timezone.clone();
416-
let args = vec![child];
417-
let comet_second = Arc::new(ScalarUDF::new_from_impl(SparkSecond::new(timezone)));
418-
let field_ref = Arc::new(Field::new("second", DataType::Int32, true));
419-
let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
420-
"second",
421-
comet_second,
422-
args,
423-
field_ref,
424-
Arc::new(ConfigOptions::default()),
425-
);
426-
427-
Ok(Arc::new(expr))
428-
}
429-
ExprStruct::UnixTimestamp(expr) => {
430-
let child =
431-
self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
432-
let timezone = expr.timezone.clone();
433-
let args = vec![child];
434-
let comet_unix_timestamp =
435-
Arc::new(ScalarUDF::new_from_impl(SparkUnixTimestamp::new(timezone)));
436-
let field_ref = Arc::new(Field::new("unix_timestamp", DataType::Int64, true));
437-
let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
438-
"unix_timestamp",
439-
comet_unix_timestamp,
440-
args,
441-
field_ref,
442-
Arc::new(ConfigOptions::default()),
443-
);
444-
445-
Ok(Arc::new(expr))
446-
}
447-
ExprStruct::TruncTimestamp(expr) => {
448-
let child =
449-
self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
450-
let format = self.create_expr(expr.format.as_ref().unwrap(), input_schema)?;
451-
let timezone = expr.timezone.clone();
452-
453-
Ok(Arc::new(TimestampTruncExpr::new(child, format, timezone)))
454-
}
455376
ExprStruct::CheckOverflow(expr) => {
456377
let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?;
457378
let data_type = to_arrow_datatype(expr.datatype.as_ref().unwrap());

native/core/src/execution/planner/expression_registry.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,9 +182,8 @@ impl ExpressionRegistry {
182182
// Register string expressions
183183
self.register_string_expressions();
184184

185-
// TODO: Register other expression categories in future phases
186-
// self.register_temporal_expressions();
187-
// etc.
185+
// Register temporal expressions
186+
self.register_temporal_expressions();
188187
}
189188

190189
/// Register arithmetic expression builders
@@ -287,6 +286,26 @@ impl ExpressionRegistry {
287286
.insert(ExpressionType::FromJson, Box::new(FromJsonBuilder));
288287
}
289288

289+
/// Register temporal expression builders
290+
fn register_temporal_expressions(&mut self) {
291+
use crate::execution::expressions::temporal::*;
292+
293+
self.builders
294+
.insert(ExpressionType::Hour, Box::new(HourBuilder));
295+
self.builders
296+
.insert(ExpressionType::Minute, Box::new(MinuteBuilder));
297+
self.builders
298+
.insert(ExpressionType::Second, Box::new(SecondBuilder));
299+
self.builders.insert(
300+
ExpressionType::UnixTimestamp,
301+
Box::new(UnixTimestampBuilder),
302+
);
303+
self.builders.insert(
304+
ExpressionType::TruncTimestamp,
305+
Box::new(TruncTimestampBuilder),
306+
);
307+
}
308+
290309
/// Extract expression type from Spark protobuf expression
291310
fn get_expression_type(spark_expr: &Expr) -> Result<ExpressionType, ExecutionError> {
292311
match spark_expr.expr_struct.as_ref() {

0 commit comments

Comments
 (0)