From 7cbc8247e5bda7331d03b1a1ba6a62b94062f7f3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 14 Jan 2026 15:58:29 -0700 Subject: [PATCH 1/4] feat: add support for datediff expression Adds native Comet support for Spark's datediff function, which returns the number of days between two dates (endDate - startDate). Closes #3087 Co-Authored-By: Claude Opus 4.5 --- native/spark-expr/src/comet_scalar_funcs.rs | 5 +- .../src/datetime_funcs/date_diff.rs | 102 ++++++++++++++++++ native/spark-expr/src/datetime_funcs/mod.rs | 2 + native/spark-expr/src/lib.rs | 4 +- .../apache/comet/serde/QueryPlanSerde.scala | 1 + .../org/apache/comet/serde/datetime.scala | 4 +- .../comet/CometTemporalExpressionSuite.scala | 34 ++++++ 7 files changed, 148 insertions(+), 4 deletions(-) create mode 100644 native/spark-expr/src/datetime_funcs/date_diff.rs diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs index 8384a4646a..760dc3570f 100644 --- a/native/spark-expr/src/comet_scalar_funcs.rs +++ b/native/spark-expr/src/comet_scalar_funcs.rs @@ -22,8 +22,8 @@ use crate::math_funcs::modulo_expr::spark_modulo; use crate::{ spark_array_repeat, spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor, spark_isnan, spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad, - spark_unhex, spark_unscaled_value, EvalMode, SparkBitwiseCount, SparkDateTrunc, SparkSizeFunc, - SparkStringSpace, + spark_unhex, spark_unscaled_value, EvalMode, SparkBitwiseCount, SparkDateDiff, SparkDateTrunc, + SparkSizeFunc, SparkStringSpace, }; use arrow::datatypes::DataType; use datafusion::common::{DataFusionError, Result as DataFusionResult}; @@ -192,6 +192,7 @@ pub fn create_comet_physical_fun_with_eval_mode( fn all_scalar_functions() -> Vec> { vec![ Arc::new(ScalarUDF::new_from_impl(SparkBitwiseCount::default())), + Arc::new(ScalarUDF::new_from_impl(SparkDateDiff::default())), Arc::new(ScalarUDF::new_from_impl(SparkDateTrunc::default())), Arc::new(ScalarUDF::new_from_impl(SparkStringSpace::default())), Arc::new(ScalarUDF::new_from_impl(SparkSizeFunc::default())), diff --git a/native/spark-expr/src/datetime_funcs/date_diff.rs b/native/spark-expr/src/datetime_funcs/date_diff.rs new file mode 100644 index 0000000000..1bf1697ad5 --- /dev/null +++ b/native/spark-expr/src/datetime_funcs/date_diff.rs @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{Array, Date32Array, Int32Array}; +use arrow::compute::kernels::arity::binary; +use arrow::datatypes::DataType; +use datafusion::common::{utils::take_function_args, DataFusionError, Result}; +use datafusion::logical_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use std::any::Any; +use std::sync::Arc; + +/// Spark-compatible date_diff function. +/// Returns the number of days from startDate to endDate (endDate - startDate). +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkDateDiff { + signature: Signature, + aliases: Vec, +} + +impl SparkDateDiff { + pub fn new() -> Self { + Self { + signature: Signature::exact( + vec![DataType::Date32, DataType::Date32], + Volatility::Immutable, + ), + aliases: vec!["datediff".to_string()], + } + } +} + +impl Default for SparkDateDiff { + fn default() -> Self { + Self::new() + } +} + +impl ScalarUDFImpl for SparkDateDiff { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "date_diff" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _: &[DataType]) -> Result { + Ok(DataType::Int32) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let [end_date, start_date] = take_function_args(self.name(), args.args)?; + + // Convert scalars to arrays for uniform processing + let end_arr = end_date.into_array(1)?; + let start_arr = start_date.into_array(1)?; + + let end_date_array = end_arr.as_any().downcast_ref::().ok_or_else(|| { + DataFusionError::Execution("date_diff expects Date32Array for end_date".to_string()) + })?; + + let start_date_array = + start_arr + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Execution( + "date_diff expects Date32Array for start_date".to_string(), + ) + })?; + + // Date32 stores days since epoch, so difference is just subtraction + let result: Int32Array = + binary(end_date_array, start_date_array, |end, start| end - start)?; + + Ok(ColumnarValue::Array(Arc::new(result))) + } + + fn aliases(&self) -> &[String] { + &self.aliases + } +} diff --git a/native/spark-expr/src/datetime_funcs/mod.rs b/native/spark-expr/src/datetime_funcs/mod.rs index ef8041e5fe..c984e3a38f 100644 --- a/native/spark-expr/src/datetime_funcs/mod.rs +++ b/native/spark-expr/src/datetime_funcs/mod.rs @@ -15,10 +15,12 @@ // specific language governing permissions and limitations // under the License. +mod date_diff; mod date_trunc; mod extract_date_part; mod timestamp_trunc; +pub use date_diff::SparkDateDiff; pub use date_trunc::SparkDateTrunc; pub use extract_date_part::SparkHour; pub use extract_date_part::SparkMinute; diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs index f26fd911d8..7f6d0b08a5 100644 --- a/native/spark-expr/src/lib.rs +++ b/native/spark-expr/src/lib.rs @@ -69,7 +69,9 @@ pub use comet_scalar_funcs::{ create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, register_all_comet_functions, }; -pub use datetime_funcs::{SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, TimestampTruncExpr}; +pub use datetime_funcs::{ + SparkDateDiff, SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, TimestampTruncExpr, +}; pub use error::{SparkError, SparkResult}; pub use hash_funcs::*; pub use json_funcs::{FromJson, ToJson}; diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index e50b1d80e6..5b3cbba6cd 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -185,6 +185,7 @@ object QueryPlanSerde extends Logging with CometExprShim { private val temporalExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[DateAdd] -> CometDateAdd, + classOf[DateDiff] -> CometDateDiff, classOf[DateSub] -> CometDateSub, classOf[FromUnixTime] -> CometFromUnixTime, classOf[Hour] -> CometHour, diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index ef2b0f793c..ed3714f751 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -21,7 +21,7 @@ package org.apache.comet.serde import java.util.Locale -import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, WeekDay, WeekOfYear, Year} +import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, WeekDay, WeekOfYear, Year} import org.apache.spark.sql.types.{DateType, IntegerType} import org.apache.spark.unsafe.types.UTF8String @@ -258,6 +258,8 @@ object CometDateAdd extends CometScalarFunction[DateAdd]("date_add") object CometDateSub extends CometScalarFunction[DateSub]("date_sub") +object CometDateDiff extends CometScalarFunction[DateDiff]("date_diff") + object CometTruncDate extends CometExpressionSerde[TruncDate] { val supportedFormats: Seq[String] = diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index 9a23c76d82..6965c69c55 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -122,4 +122,38 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH StructField("fmt", DataTypes.StringType, true))) FuzzDataGenerator.generateDataFrame(r, spark, schema, 1000, DataGenOptions()) } + + test("datediff") { + val r = new Random(42) + val schema = StructType( + Seq( + StructField("c0", DataTypes.DateType, true), + StructField("c1", DataTypes.DateType, true))) + val df = FuzzDataGenerator.generateDataFrame(r, spark, schema, 1000, DataGenOptions()) + df.createOrReplaceTempView("tbl") + + // Basic test with random dates + checkSparkAnswerAndOperator("SELECT c0, c1, datediff(c0, c1) FROM tbl ORDER BY c0, c1") + + // Disable constant folding to ensure literal expressions are executed by Comet + withSQLConf( + SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> + "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { + // Test positive difference (end date > start date) + checkSparkAnswerAndOperator("SELECT datediff(DATE('2009-07-31'), DATE('2009-07-30'))") + + // Test negative difference (end date < start date) + checkSparkAnswerAndOperator("SELECT datediff(DATE('2009-07-30'), DATE('2009-07-31'))") + + // Test same dates (should be 0) + checkSparkAnswerAndOperator("SELECT datediff(DATE('2009-07-30'), DATE('2009-07-30'))") + + // Test larger date differences + checkSparkAnswerAndOperator("SELECT datediff(DATE('2024-01-01'), DATE('2020-01-01'))") + + // Test null handling + checkSparkAnswerAndOperator("SELECT datediff(NULL, DATE('2009-07-30'))") + checkSparkAnswerAndOperator("SELECT datediff(DATE('2009-07-30'), NULL)") + } + } } From 06e1b80d0dff18acd5267088d3ca99207e8d6099 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 14 Jan 2026 17:26:21 -0700 Subject: [PATCH 2/4] update docs --- docs/source/user-guide/latest/configs.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index 1a273ad033..b50325afa3 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -234,6 +234,7 @@ These settings can be used to determine which parts of the plan are accelerated | `spark.comet.expression.CreateArray.enabled` | Enable Comet acceleration for `CreateArray` | true | | `spark.comet.expression.CreateNamedStruct.enabled` | Enable Comet acceleration for `CreateNamedStruct` | true | | `spark.comet.expression.DateAdd.enabled` | Enable Comet acceleration for `DateAdd` | true | +| `spark.comet.expression.DateDiff.enabled` | Enable Comet acceleration for `DateDiff` | true | | `spark.comet.expression.DateSub.enabled` | Enable Comet acceleration for `DateSub` | true | | `spark.comet.expression.DayOfMonth.enabled` | Enable Comet acceleration for `DayOfMonth` | true | | `spark.comet.expression.DayOfWeek.enabled` | Enable Comet acceleration for `DayOfWeek` | true | From a91e73121cc1cecd44f3611ed415cf77a883ccef Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 14 Jan 2026 17:27:56 -0700 Subject: [PATCH 3/4] cargo fmt --- .../src/datetime_funcs/date_diff.rs | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/native/spark-expr/src/datetime_funcs/date_diff.rs b/native/spark-expr/src/datetime_funcs/date_diff.rs index 1bf1697ad5..6a593f0f87 100644 --- a/native/spark-expr/src/datetime_funcs/date_diff.rs +++ b/native/spark-expr/src/datetime_funcs/date_diff.rs @@ -75,19 +75,21 @@ impl ScalarUDFImpl for SparkDateDiff { let end_arr = end_date.into_array(1)?; let start_arr = start_date.into_array(1)?; - let end_date_array = end_arr.as_any().downcast_ref::().ok_or_else(|| { - DataFusionError::Execution("date_diff expects Date32Array for end_date".to_string()) - })?; + let end_date_array = end_arr + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Execution("date_diff expects Date32Array for end_date".to_string()) + })?; - let start_date_array = - start_arr - .as_any() - .downcast_ref::() - .ok_or_else(|| { - DataFusionError::Execution( - "date_diff expects Date32Array for start_date".to_string(), - ) - })?; + let start_date_array = start_arr + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Execution( + "date_diff expects Date32Array for start_date".to_string(), + ) + })?; // Date32 stores days since epoch, so difference is just subtraction let result: Int32Array = From 05c660f0cfde127a1d418c3efcd679e6030dcc78 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 14 Jan 2026 20:48:12 -0700 Subject: [PATCH 4/4] Add leap year edge case tests for datediff Add tests for leap year handling as suggested in review: - 1900 was NOT a leap year (divisible by 100 but not 400) - 2000 WAS a leap year (divisible by 400) - 2004 was a leap year (divisible by 4, not by 100) - 2100 will NOT be a leap year (divisible by 100 but not 400) Co-Authored-By: Claude Opus 4.5 --- .../comet/CometTemporalExpressionSuite.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index 6965c69c55..478013ad2e 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -154,6 +154,21 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH // Test null handling checkSparkAnswerAndOperator("SELECT datediff(NULL, DATE('2009-07-30'))") checkSparkAnswerAndOperator("SELECT datediff(DATE('2009-07-30'), NULL)") + + // Test leap year edge cases + // 1900 was NOT a leap year (divisible by 100 but not 400) + // 2000 WAS a leap year (divisible by 400) + // So Feb 27 to Mar 1 spans different number of days: + // 1900: 2 days (Feb 28, Mar 1) + // 2000: 3 days (Feb 28, Feb 29, Mar 1) + checkSparkAnswerAndOperator("SELECT datediff(DATE('1900-03-01'), DATE('1900-02-27'))") + checkSparkAnswerAndOperator("SELECT datediff(DATE('2000-03-01'), DATE('2000-02-27'))") + + // Additional leap year tests + // 2004 was a leap year (divisible by 4, not by 100) + checkSparkAnswerAndOperator("SELECT datediff(DATE('2004-03-01'), DATE('2004-02-28'))") + // 2100 will NOT be a leap year (divisible by 100 but not 400) + checkSparkAnswerAndOperator("SELECT datediff(DATE('2100-03-01'), DATE('2100-02-28'))") } } }