Skip to content

Commit 0d2f579

Browse files
committed
feat: Support Spark expression window time
1 parent d3b2007 commit 0d2f579

3 files changed

Lines changed: 51 additions & 2 deletions

File tree

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
215215
classOf[WeekDay] -> CometWeekDay,
216216
classOf[DayOfYear] -> CometDayOfYear,
217217
classOf[WeekOfYear] -> CometWeekOfYear,
218-
classOf[Quarter] -> CometQuarter)
218+
classOf[Quarter] -> CometQuarter,
219+
classOf[PreciseTimestampConversion] -> CometPreciseTimestampConversion)
219220

220221
private val conversionExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
221222
classOf[Cast] -> CometCast)

spark/src/main/scala/org/apache/comet/serde/datetime.scala

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ package org.apache.comet.serde
2121

2222
import java.util.Locale
2323

24-
import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
24+
import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, MakeDate, Minute, Month, NextDay, PreciseTimestampConversion, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
2525
import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType}
2626
import org.apache.spark.unsafe.types.UTF8String
2727

@@ -586,3 +586,28 @@ object CometDateFormat extends CometExpressionSerde[DateFormatClass] {
586586
}
587587
}
588588
}
589+
590+
object CometPreciseTimestampConversion extends CometExpressionSerde[PreciseTimestampConversion] {
591+
override def convert(
592+
expr: PreciseTimestampConversion,
593+
inputs: Seq[Attribute],
594+
binding: Boolean): Option[ExprOuterClass.Expr] = {
595+
// PreciseTimestampConversion reinterprets between LongType and TimestampType
596+
// without changing the underlying microsecond value, so a simple cast suffices.
597+
for {
598+
childExpr <- exprToProtoInternal(expr.child, inputs, binding)
599+
dt <- serializeDataType(expr.toType)
600+
} yield {
601+
ExprOuterClass.Expr
602+
.newBuilder()
603+
.setCast(
604+
ExprOuterClass.Cast
605+
.newBuilder()
606+
.setChild(childExpr)
607+
.setDatatype(dt)
608+
.setEvalMode(ExprOuterClass.EvalMode.LEGACY)
609+
.setAllowIncompat(false))
610+
.build()
611+
}
612+
}
613+
}

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2992,4 +2992,27 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
29922992
}
29932993
}
29942994

2995+
test("window_time") {
2996+
withTable("t1") {
2997+
sql("create table t1(time timestamp, value int) using parquet")
2998+
sql(
2999+
"insert into t1 values" +
3000+
"(cast('2023-01-01 12:00:00' as timestamp), 1)," +
3001+
"(cast('2023-01-01 12:05:00' as timestamp), 2)," +
3002+
"(cast('2023-01-01 12:15:00' as timestamp), 3)")
3003+
3004+
// basic window_time with aggregation
3005+
checkSparkAnswer(
3006+
"select max(window_time(window)), sum(value) " +
3007+
"from (select window(time, '10 minutes') as window, value from t1) " +
3008+
"group by window")
3009+
3010+
// window_time with sliding window
3011+
checkSparkAnswer(
3012+
"select max(window_time(window)), count(value) " +
3013+
"from (select window(time, '10 minutes', '5 minutes') as window, value from t1) " +
3014+
"group by window")
3015+
}
3016+
}
3017+
29953018
}

0 commit comments

Comments
 (0)