Skip to content

Commit 5fa1e69

Browse files
authored
feat: add support for unix_date expression (#3141)
1 parent d19e1f2 commit 5fa1e69

5 files changed

Lines changed: 56 additions & 2 deletions

File tree

docs/source/user-guide/latest/configs.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -333,6 +333,7 @@ These settings can be used to determine which parts of the plan are accelerated
333333
| `spark.comet.expression.TruncTimestamp.enabled` | Enable Comet acceleration for `TruncTimestamp` | true |
334334
| `spark.comet.expression.UnaryMinus.enabled` | Enable Comet acceleration for `UnaryMinus` | true |
335335
| `spark.comet.expression.Unhex.enabled` | Enable Comet acceleration for `Unhex` | true |
336+
| `spark.comet.expression.UnixDate.enabled` | Enable Comet acceleration for `UnixDate` | true |
336337
| `spark.comet.expression.UnscaledValue.enabled` | Enable Comet acceleration for `UnscaledValue` | true |
337338
| `spark.comet.expression.Upper.enabled` | Enable Comet acceleration for `Upper` | true |
338339
| `spark.comet.expression.WeekDay.enabled` | Enable Comet acceleration for `WeekDay` | true |

native/spark-expr/src/conversion_funcs/cast.rs

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1025,6 +1025,10 @@ fn cast_array(
10251025
cast_string_to_timestamp(&array, to_type, eval_mode, &cast_options.timezone)
10261026
}
10271027
(Utf8, Date32) => cast_string_to_date(&array, to_type, eval_mode),
1028+
(Date32, Int32) => {
1029+
// Date32 is stored as days since epoch (i32), so this is a simple reinterpret cast
1030+
Ok(cast_with_options(&array, to_type, &CAST_OPTIONS)?)
1031+
}
10281032
(Utf8, Float32 | Float64) => cast_string_to_float(&array, to_type, eval_mode),
10291033
(Utf8 | LargeUtf8, Decimal128(precision, scale)) => {
10301034
cast_string_to_decimal(&array, to_type, precision, scale, eval_mode)
@@ -1318,7 +1322,7 @@ fn is_datafusion_spark_compatible(from_type: &DataType, to_type: &DataType) -> b
13181322
| DataType::Utf8 // note that there can be formatting differences
13191323
),
13201324
DataType::Utf8 => matches!(to_type, DataType::Binary),
1321-
DataType::Date32 => matches!(to_type, DataType::Utf8),
1325+
DataType::Date32 => matches!(to_type, DataType::Int32 | DataType::Utf8),
13221326
DataType::Timestamp(_, _) => {
13231327
matches!(
13241328
to_type,

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -186,6 +186,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
186186
private val temporalExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
187187
classOf[DateAdd] -> CometDateAdd,
188188
classOf[DateSub] -> CometDateSub,
189+
classOf[UnixDate] -> CometUnixDate,
189190
classOf[FromUnixTime] -> CometFromUnixTime,
190191
classOf[Hour] -> CometHour,
191192
classOf[Minute] -> CometMinute,

spark/src/main/scala/org/apache/comet/serde/datetime.scala

Lines changed: 28 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,7 @@ package org.apache.comet.serde
2121

2222
import java.util.Locale
2323

24-
import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, WeekDay, WeekOfYear, Year}
24+
import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, WeekDay, WeekOfYear, Year}
2525
import org.apache.spark.sql.types.{DateType, IntegerType}
2626
import org.apache.spark.unsafe.types.UTF8String
2727

@@ -258,6 +258,33 @@ object CometDateAdd extends CometScalarFunction[DateAdd]("date_add")
258258

259259
object CometDateSub extends CometScalarFunction[DateSub]("date_sub")
260260

261+
/**
 * Serde for Spark's `UnixDate` expression, which converts a date to the number of days since the
 * Unix epoch (1970-01-01). Since dates are internally stored as days since epoch, the expression
 * is serialized as a plain cast of the child to `IntegerType` rather than as a dedicated native
 * function.
 */
object CometUnixDate extends CometExpressionSerde[UnixDate] {

  /**
   * Serializes `expr` as a protobuf `Cast` of its child to `IntegerType`, using the `LEGACY` eval
   * mode with incompatible casts disallowed.
   *
   * @param expr
   *   the `UnixDate` expression to convert
   * @param inputs
   *   attributes forwarded to `exprToProtoInternal` when serializing the child
   * @param binding
   *   forwarded unchanged to `exprToProtoInternal`
   * @return
   *   the value produced by `optExprWithInfo`; the cast is only built when both the child
   *   expression and the target data type serialize successfully
   */
  override def convert(
      expr: UnixDate,
      inputs: Seq[Attribute],
      binding: Boolean): Option[ExprOuterClass.Expr] = {
    // Bind both Options together so that a failure to serialize either the child or the target
    // type yields None instead of throwing from Option.get.
    val optExpr = for {
      child <- exprToProtoInternal(expr.child, inputs, binding)
      intType <- serializeDataType(IntegerType)
    } yield {
      val cast = ExprOuterClass.Cast
        .newBuilder()
        .setChild(child)
        .setDatatype(intType)
        .setEvalMode(ExprOuterClass.EvalMode.LEGACY)
        .setAllowIncompat(false)
        .build()
      Expr.newBuilder().setCast(cast).build()
    }
    // NOTE(review): optExprWithInfo presumably records fallback info on `expr` when optExpr is
    // empty — confirm against its definition.
    optExprWithInfo(optExpr, expr, expr.child)
  }
}
287+
261288
object CometTruncDate extends CometExpressionSerde[TruncDate] {
262289

263290
val supportedFormats: Seq[String] =

spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala

Lines changed: 21 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -122,4 +122,25 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH
122122
StructField("fmt", DataTypes.StringType, true)))
123123
FuzzDataGenerator.generateDataFrame(r, spark, schema, 1000, DataGenOptions())
124124
}
125+
126+
test("unix_date") {
  // A fixed seed keeps the generated date column reproducible between runs.
  val rng = new Random(42)
  val dateSchema = StructType(Seq(StructField("c0", DataTypes.DateType, true)))
  FuzzDataGenerator
    .generateDataFrame(rng, spark, dateSchema, 1000, DataGenOptions())
    .createOrReplaceTempView("tbl")

  // Each query is checked in order against Spark's own answer.
  val queries = Seq(
    // Generated date column
    "SELECT c0, unix_date(c0) FROM tbl ORDER BY c0",
    // Literal dates on and after the epoch
    "SELECT unix_date(DATE('1970-01-01')), unix_date(DATE('1970-01-02')), unix_date(DATE('2024-01-01'))",
    // Dates before the Unix epoch (expected to produce negative values)
    "SELECT unix_date(DATE('1969-12-31')), unix_date(DATE('1960-01-01'))",
    // NULL input
    "SELECT unix_date(NULL)")

  queries.foreach { query =>
    checkSparkAnswerAndOperator(query)
  }
}
125146
}

0 commit comments

Comments
 (0)