Skip to content

Commit 9a76677

Browse files
authored
chore: audit date/time expressions (#4448)
1 parent a049750 commit 9a76677

1 file changed

Lines changed: 88 additions & 75 deletions

File tree

spark/src/main/scala/org/apache/comet/serde/datetime.scala

Lines changed: 88 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import java.util.Locale
2323

2424
import org.apache.spark.sql.catalyst.expressions.{AddMonths, Attribute, ConvertTimezone, DateAdd, DateDiff, DateFormatClass, DateFromUnixDate, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, FromUTCTimestamp, GetDateField, GetTimestamp, Hour, Hours, LastDay, Literal, MakeDate, MakeTimestamp, MicrosToTimestamp, MillisToTimestamp, Minute, Month, MonthsBetween, NextDay, Quarter, Second, SecondsToTimestamp, ToUnixTimestamp, ToUTCTimestamp, TruncDate, TruncTimestamp, UnixDate, UnixMicros, UnixMillis, UnixSeconds, UnixTimestamp, WeekDay, WeekOfYear, Year}
2525
import org.apache.spark.sql.internal.SQLConf
26-
import org.apache.spark.sql.types.{DateType, DoubleType, FloatType, IntegerType, LongType, StringType, TimestampNTZType, TimestampType}
26+
import org.apache.spark.sql.types.{DataType, DateType, DoubleType, FloatType, IntegerType, LongType, StringType, TimestampNTZType, TimestampType}
2727
import org.apache.spark.unsafe.types.UTF8String
2828

2929
import org.apache.comet.CometConf
@@ -179,23 +179,24 @@ object CometQuarter extends CometExpressionSerde[Quarter] with CometExprGetDateF
179179
}
180180
}
181181

182-
object CometHour extends CometExpressionSerde[Hour] with CodegenDispatchFallback {
182+
private object TimeFieldSerde {
183+
val timestampNtzIncompatReason: String =
184+
"Incorrectly applies timezone conversion to TimestampNTZ inputs" +
185+
" (https://github.com/apache/datafusion-comet/issues/3180)"
183186

184-
val incompatReason: String = "Incorrectly applies timezone conversion to TimestampNTZ inputs" +
185-
" (https://github.com/apache/datafusion-comet/issues/3180)"
187+
def supportLevelForChild(childType: DataType): SupportLevel = childType match {
188+
case TimestampNTZType => Incompatible(Some(timestampNtzIncompatReason))
189+
case _ => Compatible()
190+
}
191+
}
186192

187-
override def getIncompatibleReasons(): Seq[String] = Seq(incompatReason)
193+
object CometHour extends CometExpressionSerde[Hour] with CodegenDispatchFallback {
188194

189-
override def getSupportLevel(expr: Hour): SupportLevel = {
190-
if (expr.child.dataType == TimestampNTZType) {
191-
Incompatible(
192-
Some(
193-
"Incorrectly applies timezone conversion to TimestampNTZ inputs" +
194-
" (https://github.com/apache/datafusion-comet/issues/3180)"))
195-
} else {
196-
Compatible()
197-
}
198-
}
195+
override def getIncompatibleReasons(): Seq[String] =
196+
Seq(TimeFieldSerde.timestampNtzIncompatReason)
197+
198+
override def getSupportLevel(expr: Hour): SupportLevel =
199+
TimeFieldSerde.supportLevelForChild(expr.child.dataType)
199200

200201
override def convert(
201202
expr: Hour,
@@ -224,20 +225,11 @@ object CometHour extends CometExpressionSerde[Hour] with CodegenDispatchFallback
224225

225226
object CometMinute extends CometExpressionSerde[Minute] with CodegenDispatchFallback {
226227

227-
override def getIncompatibleReasons(): Seq[String] = Seq(
228-
"Incorrectly applies timezone conversion to TimestampNTZ inputs" +
229-
" (https://github.com/apache/datafusion-comet/issues/3180)")
230-
231-
override def getSupportLevel(expr: Minute): SupportLevel = {
232-
if (expr.child.dataType == TimestampNTZType) {
233-
Incompatible(
234-
Some(
235-
"Incorrectly applies timezone conversion to TimestampNTZ inputs" +
236-
" (https://github.com/apache/datafusion-comet/issues/3180)"))
237-
} else {
238-
Compatible()
239-
}
240-
}
228+
override def getIncompatibleReasons(): Seq[String] =
229+
Seq(TimeFieldSerde.timestampNtzIncompatReason)
230+
231+
override def getSupportLevel(expr: Minute): SupportLevel =
232+
TimeFieldSerde.supportLevelForChild(expr.child.dataType)
241233

242234
override def convert(
243235
expr: Minute,
@@ -266,20 +258,11 @@ object CometMinute extends CometExpressionSerde[Minute] with CodegenDispatchFall
266258

267259
object CometSecond extends CometExpressionSerde[Second] with CodegenDispatchFallback {
268260

269-
override def getIncompatibleReasons(): Seq[String] = Seq(
270-
"Incorrectly applies timezone conversion to TimestampNTZ inputs" +
271-
" (https://github.com/apache/datafusion-comet/issues/3180)")
272-
273-
override def getSupportLevel(expr: Second): SupportLevel = {
274-
if (expr.child.dataType == TimestampNTZType) {
275-
Incompatible(
276-
Some(
277-
"Incorrectly applies timezone conversion to TimestampNTZ inputs" +
278-
" (https://github.com/apache/datafusion-comet/issues/3180)"))
279-
} else {
280-
Compatible()
281-
}
282-
}
261+
override def getIncompatibleReasons(): Seq[String] =
262+
Seq(TimeFieldSerde.timestampNtzIncompatReason)
263+
264+
override def getSupportLevel(expr: Second): SupportLevel =
265+
TimeFieldSerde.supportLevelForChild(expr.child.dataType)
283266

284267
override def convert(
285268
expr: Second,
@@ -469,6 +452,11 @@ object CometMakeDate extends CometExpressionSerde[MakeDate] {
469452

470453
object CometSecondsToTimestamp
471454
extends CometScalarFunction[SecondsToTimestamp]("seconds_to_timestamp") {
455+
456+
override def getUnsupportedReasons(): Seq[String] = Seq(
457+
"Only `IntegerType`, `LongType`, `FloatType`, and `DoubleType` inputs are supported." +
458+
" `DecimalType`, `ByteType`, and `ShortType` fall back to Spark.")
459+
472460
override def getSupportLevel(expr: SecondsToTimestamp): SupportLevel =
473461
expr.child.dataType match {
474462
case IntegerType | LongType | FloatType | DoubleType => Compatible()
@@ -514,8 +502,14 @@ object CometTruncDate extends CometExpressionSerde[TruncDate] with CodegenDispat
514502
val supportedFormats: Seq[String] =
515503
Seq("year", "yyyy", "yy", "quarter", "mon", "month", "mm", "week")
516504

517-
override def getIncompatibleReasons(): Seq[String] = Seq(
518-
"Non-literal format strings will throw an exception instead of returning NULL")
505+
private val nonLiteralFormatIncompatReason: String =
506+
"Non-literal format strings will throw an exception instead of returning NULL"
507+
508+
private def unsupportedFormatReason(fmt: Any): String =
509+
s"Format $fmt is not supported. Only the following formats are supported: " +
510+
supportedFormats.mkString(", ")
511+
512+
override def getIncompatibleReasons(): Seq[String] = Seq(nonLiteralFormatIncompatReason)
519513

520514
override def getUnsupportedReasons(): Seq[String] = Seq(
521515
"Only the following formats are supported: " + supportedFormats.mkString(", "))
@@ -526,11 +520,10 @@ object CometTruncDate extends CometExpressionSerde[TruncDate] with CodegenDispat
526520
if (supportedFormats.contains(fmt.toString.toLowerCase(Locale.ROOT))) {
527521
Compatible()
528522
} else {
529-
Unsupported(Some(s"Format $fmt is not supported"))
523+
Unsupported(Some(unsupportedFormatReason(fmt)))
530524
}
531525
case _ =>
532-
Incompatible(
533-
Some("Invalid format strings will throw an exception instead of returning NULL"))
526+
Incompatible(Some(nonLiteralFormatIncompatReason))
534527
}
535528
}
536529

@@ -555,10 +548,6 @@ object CometTruncTimestamp
555548
extends CometExpressionSerde[TruncTimestamp]
556549
with CodegenDispatchFallback {
557550

558-
override def getIncompatibleReasons(): Seq[String] = Seq(
559-
"Produces incorrect results when used with non-UTC timezones. Compatible when timezone is" +
560-
" UTC. (https://github.com/apache/datafusion-comet/issues/2649)")
561-
562551
val supportedFormats: Seq[String] =
563552
Seq(
564553
"year",
@@ -577,6 +566,23 @@ object CometTruncTimestamp
577566
"millisecond",
578567
"microsecond")
579568

569+
private val nonUtcIncompatReason: String =
570+
"Produces incorrect results when used with non-UTC timezones. Compatible when timezone is" +
571+
" UTC. (https://github.com/apache/datafusion-comet/issues/2649)"
572+
573+
private val nonLiteralFormatIncompatReason: String =
574+
"Non-literal format strings will throw an exception instead of returning NULL"
575+
576+
private def unsupportedFormatReason(fmt: Any): String =
577+
s"Format $fmt is not supported. Only the following formats are supported: " +
578+
supportedFormats.mkString(", ")
579+
580+
override def getIncompatibleReasons(): Seq[String] =
581+
Seq(nonUtcIncompatReason, nonLiteralFormatIncompatReason)
582+
583+
override def getUnsupportedReasons(): Seq[String] = Seq(
584+
"Only the following formats are supported: " + supportedFormats.mkString(", "))
585+
580586
override def getSupportLevel(expr: TruncTimestamp): SupportLevel = {
581587
val timezone = expr.timeZoneId.getOrElse("UTC")
582588
val isUtc = timezone == "UTC" || timezone == "Etc/UTC"
@@ -586,17 +592,13 @@ object CometTruncTimestamp
586592
if (isUtc) {
587593
Compatible()
588594
} else {
589-
Incompatible(
590-
Some(
591-
s"Incorrect results in non-UTC timezone '$timezone'" +
592-
" (https://github.com/apache/datafusion-comet/issues/2649)"))
595+
Incompatible(Some(nonUtcIncompatReason))
593596
}
594597
} else {
595-
Unsupported(Some(s"Format $fmt is not supported"))
598+
Unsupported(Some(unsupportedFormatReason(fmt)))
596599
}
597600
case _ =>
598-
Incompatible(
599-
Some("Invalid format strings will throw an exception instead of returning NULL"))
601+
Incompatible(Some(nonLiteralFormatIncompatReason))
600602
}
601603
}
602604

@@ -734,24 +736,27 @@ object CometDateFormat extends CometExpressionSerde[DateFormatClass] {
734736
* without applying any session timezone offset.
735737
*/
736738
object CometHours extends CometExpressionSerde[Hours] {
739+
740+
override def getUnsupportedReasons(): Seq[String] = Seq(
741+
"Only `TimestampType` and `TimestampNTZType` inputs are supported.")
742+
743+
override def getSupportLevel(expr: Hours): SupportLevel = expr.child.dataType match {
744+
case TimestampType | TimestampNTZType => Compatible()
745+
case other => Unsupported(Some(s"Hours does not support input type: $other"))
746+
}
747+
737748
override def convert(
738749
expr: Hours,
739750
inputs: Seq[Attribute],
740751
binding: Boolean): Option[ExprOuterClass.Expr] = {
741-
val optExpr = expr.child.dataType match {
742-
case TimestampType | TimestampNTZType =>
743-
exprToProtoInternal(expr.child, inputs, binding).map { childExpr =>
744-
val builder = ExprOuterClass.HoursTransform.newBuilder()
745-
builder.setChild(childExpr)
752+
val optExpr = exprToProtoInternal(expr.child, inputs, binding).map { childExpr =>
753+
val builder = ExprOuterClass.HoursTransform.newBuilder()
754+
builder.setChild(childExpr)
746755

747-
ExprOuterClass.Expr
748-
.newBuilder()
749-
.setHoursTransform(builder)
750-
.build()
751-
}
752-
case other =>
753-
withFallbackReason(expr, s"Hours does not support input type: $other")
754-
None
756+
ExprOuterClass.Expr
757+
.newBuilder()
758+
.setHoursTransform(builder)
759+
.build()
755760
}
756761
optExprWithFallbackReason(optExpr, expr, expr.child)
757762
}
@@ -768,6 +773,16 @@ object CometHours extends CometExpressionSerde[Hours] {
768773
* The first cast respects the session timezone to correctly determine the date boundary.
769774
*/
770775
object CometDays extends CometExpressionSerde[Days] {
776+
777+
override def getUnsupportedReasons(): Seq[String] = Seq(
778+
"Only `DateType` and `TimestampType` inputs are supported." +
779+
" `TimestampNTZType` is not supported.")
780+
781+
override def getSupportLevel(expr: Days): SupportLevel = expr.child.dataType match {
782+
case DateType | TimestampType => Compatible()
783+
case other => Unsupported(Some(s"Days does not support input type: $other"))
784+
}
785+
771786
override def convert(
772787
expr: Days,
773788
inputs: Seq[Attribute],
@@ -782,9 +797,7 @@ object CometDays extends CometExpressionSerde[Days] {
782797
childExpr.flatMap { child =>
783798
CometCast.castToProto(expr, Some(timezone), DateType, child, CometEvalMode.LEGACY)
784799
}
785-
case other =>
786-
withFallbackReason(expr, s"Days does not support input type: $other")
787-
None
800+
case _ => None
788801
}
789802

790803
// Convert DateType to IntegerType (days since epoch)

0 commit comments

Comments
 (0)