Skip to content

Commit af871e3

Browse files
andygroveclaude
andcommitted
Add input type checking and tests for unix_timestamp
- Add getSupportLevel() to validate input types before conversion - Only support TimestampType and DateType inputs - Fall back to Spark for StringType (with or without format argument) - Fall back to Spark for TimestampNTZType (timezone handling is incorrect) - Add tests to verify fallback behavior for unsupported types Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent d7ffc2e commit af871e3

2 files changed

Lines changed: 66 additions & 2 deletions

File tree

spark/src/main/scala/org/apache/comet/serde/datetime.scala

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ package org.apache.comet.serde
2222
import java.util.Locale
2323

2424
import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixTimestamp, WeekDay, WeekOfYear, Year}
25-
import org.apache.spark.sql.types.{DateType, IntegerType}
25+
import org.apache.spark.sql.types.{DateType, IntegerType, TimestampType}
2626
import org.apache.spark.unsafe.types.UTF8String
2727

2828
import org.apache.comet.CometSparkSessionExtensions.withInfo
@@ -255,10 +255,36 @@ object CometSecond extends CometExpressionSerde[Second] {
255255
}
256256

257257
object CometUnixTimestamp extends CometExpressionSerde[UnixTimestamp] {
258+
259+
private def isSupportedInputType(expr: UnixTimestamp): Boolean = {
260+
// Note: TimestampNTZType is not supported because Comet incorrectly applies
261+
// timezone conversion to TimestampNTZ values. TimestampNTZ stores local time
262+
// without timezone, so no conversion should be applied.
263+
expr.children.head.dataType match {
264+
case TimestampType | DateType => true
265+
case _ => false
266+
}
267+
}
268+
269+
override def getSupportLevel(expr: UnixTimestamp): SupportLevel = {
270+
if (isSupportedInputType(expr)) {
271+
Compatible()
272+
} else {
273+
val inputType = expr.children.head.dataType
274+
Unsupported(Some(s"unix_timestamp does not support input type: $inputType"))
275+
}
276+
}
277+
258278
override def convert(
259279
expr: UnixTimestamp,
260280
inputs: Seq[Attribute],
261281
binding: Boolean): Option[ExprOuterClass.Expr] = {
282+
if (!isSupportedInputType(expr)) {
283+
val inputType = expr.children.head.dataType
284+
withInfo(expr, s"unix_timestamp does not support input type: $inputType")
285+
return None
286+
}
287+
262288
val childExpr = exprToProtoInternal(expr.children.head, inputs, binding)
263289

264290
if (childExpr.isDefined) {

spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ package org.apache.comet
2121

2222
import scala.util.Random
2323

24-
import org.apache.spark.sql.{CometTestBase, SaveMode}
24+
import org.apache.spark.sql.{CometTestBase, Row, SaveMode}
2525
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
2626
import org.apache.spark.sql.internal.SQLConf
2727
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
@@ -135,6 +135,44 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH
135135
}
136136
}
137137

138+
test("unix_timestamp - timestamp_ntz input falls back to Spark") {
139+
// TimestampNTZ is not supported because Comet incorrectly applies timezone
140+
// conversion. TimestampNTZ stores local time without timezone, so the unix
141+
// timestamp should just be the value divided by microseconds per second.
142+
val r = new Random(42)
143+
val ntzSchema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true)))
144+
val ntzDF = FuzzDataGenerator.generateDataFrame(r, spark, ntzSchema, 100, DataGenOptions())
145+
ntzDF.createOrReplaceTempView("ntz_tbl")
146+
checkSparkAnswerAndFallbackReason(
147+
"SELECT ts_ntz, unix_timestamp(ts_ntz) from ntz_tbl order by ts_ntz",
148+
"unix_timestamp does not support input type: TimestampNTZType")
149+
}
150+
151+
test("unix_timestamp - string input falls back to Spark") {
152+
withTempView("string_tbl") {
153+
// Create test data with timestamp strings
154+
val schema = StructType(Seq(StructField("ts_str", DataTypes.StringType, true)))
155+
val data = Seq(
156+
Row("2020-01-01 00:00:00"),
157+
Row("2021-06-15 12:30:45"),
158+
Row("2022-12-31 23:59:59"),
159+
Row(null))
160+
spark
161+
.createDataFrame(spark.sparkContext.parallelize(data), schema)
162+
.createOrReplaceTempView("string_tbl")
163+
164+
// String input should fall back to Spark
165+
checkSparkAnswerAndFallbackReason(
166+
"SELECT ts_str, unix_timestamp(ts_str) from string_tbl order by ts_str",
167+
"unix_timestamp does not support input type: StringType")
168+
169+
// String input with custom format should also fall back
170+
checkSparkAnswerAndFallbackReason(
171+
"SELECT ts_str, unix_timestamp(ts_str, 'yyyy-MM-dd HH:mm:ss') from string_tbl",
172+
"unix_timestamp does not support input type: StringType")
173+
}
174+
}
175+
138176
private def createTimestampTestData = {
139177
val r = new Random(42)
140178
val schema = StructType(

0 commit comments

Comments
 (0)