Skip to content

Commit 17fcf2c

Browse files
andygrove and claude committed
feat: Add TimestampNTZType support for casts and unix_timestamp
Add comprehensive support for TimestampNTZType (timestamp without time zone) wherever TimestampType is currently supported.

Changes:
- Add TimestampNTZType to CometCast.supportedTypes
- Support casting TimestampNTZ to Long, String, Date, and Timestamp
- Add TimestampNTZ support to the unix_timestamp function (no timezone conversion)
- Add tests for TimestampNTZ casts and temporal expressions

TimestampNTZ stores local time without timezone context, so:
- unix_timestamp simply divides microseconds by 1,000,000
- Casts to Date use simple truncation (no timezone adjustment)
- Casts to String format as local datetime without timezone suffix

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent b139c45 commit 17fcf2c

7 files changed

Lines changed: 176 additions & 29 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,4 @@ spark/benchmarks
2121
.DS_Store
2222
comet-event-trace.json
2323
__pycache__
24+
CLAUDE.md

native/spark-expr/src/conversion_funcs/cast.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -268,17 +268,15 @@ fn can_cast_to_string(from_type: &DataType, _options: &SparkCastOptions) -> bool
268268
}
269269
}
270270

271-
fn can_cast_from_timestamp_ntz(to_type: &DataType, options: &SparkCastOptions) -> bool {
271+
fn can_cast_from_timestamp_ntz(to_type: &DataType, _options: &SparkCastOptions) -> bool {
272272
use DataType::*;
273273
match to_type {
274-
Timestamp(_, _) | Date32 | Date64 | Utf8 => {
275-
// incompatible
276-
options.allow_incompat
277-
}
278-
_ => {
279-
// unsupported
280-
false
281-
}
274+
// TimestampNTZ -> Timestamp with timezone (interpret as UTC)
275+
// TimestampNTZ -> Date (simple truncation, no timezone adjustment)
276+
// TimestampNTZ -> String (format as local datetime)
277+
// TimestampNTZ -> Long (extract microseconds directly)
278+
Timestamp(_, Some(_)) | Date32 | Date64 | Utf8 | Int64 => true,
279+
_ => false,
282280
}
283281
}
284282

native/spark-expr/src/datetime_funcs/unix_timestamp.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,27 @@ impl ScalarUDFImpl for SparkUnixTimestamp {
7878

7979
match args {
8080
[ColumnarValue::Array(array)] => match array.data_type() {
81+
DataType::Timestamp(Microsecond, None) => {
    // TimestampNTZ: the value is a local wall-clock time with no zone attached,
    // so no timezone conversion is applied; the result is the microsecond value
    // scaled down to whole seconds.
    let timestamp_array =
        array.as_primitive::<arrow::datatypes::TimestampMicrosecondType>();

    // Every row goes through the same division. Previously rows were divided
    // with `/` (truncating toward zero) when the batch had no nulls, but with
    // `div_floor` (rounding toward negative infinity) when it had at least one,
    // so a pre-epoch value with a fractional second produced a result that
    // depended on whether some other row in the batch was null.
    // `unary` applies the closure to the values and carries the null bitmap
    // over unchanged, so null rows stay null.
    // NOTE(review): truncating division is chosen to mirror Spark's
    // `micros / MICROS_PER_SECOND` (JVM long division) for unix_timestamp —
    // confirm against the targeted Spark versions.
    let result: PrimitiveArray<Int64Type> =
        timestamp_array.unary(|micros| micros / MICROS_PER_SECOND);

    Ok(ColumnarValue::Array(Arc::new(result)))
}
81102
DataType::Timestamp(_, _) => {
82103
let is_utc = self.timezone == "UTC";
83104
let array = if is_utc

spark/src/main/scala/org/apache/comet/expressions/CometCast.scala

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,8 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
4545
DataTypes.StringType,
4646
DataTypes.BinaryType,
4747
DataTypes.DateType,
48-
DataTypes.TimestampType)
49-
// TODO add DataTypes.TimestampNTZType for Spark 3.4 and later
50-
// https://github.com/apache/datafusion-comet/issues/378
48+
DataTypes.TimestampType,
49+
DataTypes.TimestampNTZType)
5150

5251
override def getSupportLevel(cast: Cast): SupportLevel = {
5352
if (cast.child.isInstanceOf[Literal]) {
@@ -127,13 +126,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
127126
case (dt: ArrayType, dt1: ArrayType) =>
128127
isSupported(dt.elementType, dt1.elementType, timeZoneId, evalMode)
129128
case (dt: DataType, _) if dt.typeName == "timestamp_ntz" =>
130-
// https://github.com/apache/datafusion-comet/issues/378
131-
toType match {
132-
case DataTypes.TimestampType | DataTypes.DateType | DataTypes.StringType =>
133-
Incompatible()
134-
case _ =>
135-
unsupported(fromType, toType)
136-
}
129+
canCastFromTimestampNTZ(toType)
137130
case (_: DecimalType, _: DecimalType) =>
138131
Compatible()
139132
case (DataTypes.StringType, _) =>
@@ -261,6 +254,16 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
261254
}
262255
}
263256

257+
/**
 * Support level for casting a TimestampNTZ value to `toType`.
 *
 * Long, String, Date and Timestamp targets are reported as compatible; every other target is
 * reported through `unsupported`.
 */
private def canCastFromTimestampNTZ(toType: DataType): SupportLevel = toType match {
  case DataTypes.LongType | DataTypes.StringType | DataTypes.DateType |
      DataTypes.TimestampType =>
    Compatible()
  case _ =>
    unsupported(DataTypes.TimestampNTZType, toType)
}
266+
264267
private def canCastFromBoolean(toType: DataType): SupportLevel = toType match {
265268
case DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType | DataTypes.LongType |
266269
DataTypes.FloatType | DataTypes.DoubleType =>

spark/src/main/scala/org/apache/comet/serde/datetime.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -257,11 +257,9 @@ object CometSecond extends CometExpressionSerde[Second] {
257257
object CometUnixTimestamp extends CometExpressionSerde[UnixTimestamp] {
258258

259259
private def isSupportedInputType(expr: UnixTimestamp): Boolean = {
260-
// Note: TimestampNTZType is not supported because Comet incorrectly applies
261-
// timezone conversion to TimestampNTZ values. TimestampNTZ stores local time
262-
// without timezone, so no conversion should be applied.
263260
expr.children.head.dataType match {
264261
case TimestampType | DateType => true
262+
case dt if dt.typeName == "timestamp_ntz" => true
265263
case _ => false
266264
}
267265
}

spark/src/test/scala/org/apache/comet/CometCastSuite.scala

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,6 +1036,88 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
10361036
castTest(generateTimestamps(), DataTypes.DateType)
10371037
}
10381038

1039+
// CAST from TimestampNTZType
1040+
1041+
test("cast TimestampNTZType to LongType") {
1042+
castTest(generateTimestampNTZs(), DataTypes.LongType)
1043+
}
1044+
1045+
test("cast TimestampNTZType to StringType") {
1046+
castTest(generateTimestampNTZs(), DataTypes.StringType)
1047+
}
1048+
1049+
test("cast TimestampNTZType to DateType") {
1050+
castTest(generateTimestampNTZs(), DataTypes.DateType)
1051+
}
1052+
1053+
test("cast TimestampNTZType to TimestampType") {
1054+
castTest(generateTimestampNTZs(), DataTypes.TimestampType)
1055+
}
1056+
1057+
// CAST to TimestampNTZType
1058+
1059+
ignore("cast BooleanType to TimestampNTZType") {
1060+
// Spark does not support this cast
1061+
castTest(generateBools(), DataTypes.TimestampNTZType)
1062+
}
1063+
1064+
ignore("cast ByteType to TimestampNTZType") {
1065+
// Not yet implemented
1066+
castTest(generateBytes(), DataTypes.TimestampNTZType)
1067+
}
1068+
1069+
ignore("cast ShortType to TimestampNTZType") {
1070+
// Not yet implemented
1071+
castTest(generateShorts(), DataTypes.TimestampNTZType)
1072+
}
1073+
1074+
ignore("cast IntegerType to TimestampNTZType") {
1075+
// Not yet implemented
1076+
castTest(generateInts(), DataTypes.TimestampNTZType)
1077+
}
1078+
1079+
ignore("cast LongType to TimestampNTZType") {
1080+
// Not yet implemented
1081+
castTest(generateLongs(), DataTypes.TimestampNTZType)
1082+
}
1083+
1084+
ignore("cast FloatType to TimestampNTZType") {
1085+
// Not yet implemented
1086+
castTest(generateFloats(), DataTypes.TimestampNTZType)
1087+
}
1088+
1089+
ignore("cast DoubleType to TimestampNTZType") {
1090+
// Not yet implemented
1091+
castTest(generateDoubles(), DataTypes.TimestampNTZType)
1092+
}
1093+
1094+
ignore("cast DecimalType(10,2) to TimestampNTZType") {
1095+
// Not yet implemented
1096+
castTest(generateDecimalsPrecision10Scale2(), DataTypes.TimestampNTZType)
1097+
}
1098+
1099+
ignore("cast StringType to TimestampNTZType") {
1100+
// Not yet implemented
1101+
castTest(
1102+
gen.generateStrings(dataSize, timestampPattern, 8).toDF("a"),
1103+
DataTypes.TimestampNTZType)
1104+
}
1105+
1106+
ignore("cast BinaryType to TimestampNTZType") {
1107+
// Spark does not support this cast
1108+
castTest(generateBinary(), DataTypes.TimestampNTZType)
1109+
}
1110+
1111+
ignore("cast DateType to TimestampNTZType") {
1112+
// Not yet implemented
1113+
castTest(generateDates(), DataTypes.TimestampNTZType)
1114+
}
1115+
1116+
ignore("cast TimestampType to TimestampNTZType") {
1117+
// Not yet implemented
1118+
castTest(generateTimestamps(), DataTypes.TimestampNTZType)
1119+
}
1120+
10391121
// Complex Types
10401122

10411123
test("cast StructType to StringType") {
@@ -1276,6 +1358,20 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
12761358
.drop("str")
12771359
}
12781360

1361+
/**
 * Builds a one-column DataFrame (column "a") of TimestampNTZ values by casting a fixed set of
 * ISO-8601 local date-time strings, passed through `withNulls` first.
 */
private def generateTimestampNTZs(): DataFrame = {
  val literals = Seq(
    "2024-01-01T12:34:56.123456",
    "2024-01-01T01:00:00",
    "9999-12-31T23:59:59.999999",
    "1970-01-01T00:00:00",
    "2024-12-31T01:00:00")
  val asStrings = withNulls(literals).toDF("str")
  val withNtzColumn = asStrings.withColumn("a", col("str").cast(DataTypes.TimestampNTZType))
  withNtzColumn.drop("str")
}
1374+
12791375
private def generateBinary(): DataFrame = {
12801376
val r = new Random(0)
12811377
val bytes = new Array[Byte](8)

spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -135,17 +135,47 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH
135135
}
136136
}
137137

138-
test("unix_timestamp - timestamp_ntz input falls back to Spark") {
139-
// TimestampNTZ is not supported because Comet incorrectly applies timezone
140-
// conversion. TimestampNTZ stores local time without timezone, so the unix
141-
// timestamp should just be the value divided by microseconds per second.
138+
test("unix_timestamp - timestamp_ntz input") {
139+
// TimestampNTZ stores local time without timezone, so the unix
140+
// timestamp is the value divided by microseconds per second (no timezone conversion).
142141
val r = new Random(42)
143142
val ntzSchema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true)))
144143
val ntzDF = FuzzDataGenerator.generateDataFrame(r, spark, ntzSchema, 100, DataGenOptions())
145144
ntzDF.createOrReplaceTempView("ntz_tbl")
146-
checkSparkAnswerAndFallbackReason(
147-
"SELECT ts_ntz, unix_timestamp(ts_ntz) from ntz_tbl order by ts_ntz",
148-
"unix_timestamp does not support input type: TimestampNTZType")
145+
checkSparkAnswerAndOperator(
146+
"SELECT ts_ntz, unix_timestamp(ts_ntz) from ntz_tbl order by ts_ntz")
147+
}
148+
149+
test("hour/minute/second - timestamp_ntz input") {
150+
val r = new Random(42)
151+
val ntzSchema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true)))
152+
val ntzDF = FuzzDataGenerator.generateDataFrame(r, spark, ntzSchema, 100, DataGenOptions())
153+
ntzDF.createOrReplaceTempView("ntz_tbl")
154+
checkSparkAnswerAndOperator(
155+
"SELECT ts_ntz, hour(ts_ntz), minute(ts_ntz), second(ts_ntz) from ntz_tbl order by ts_ntz")
156+
}
157+
158+
test("date_trunc - timestamp_ntz input") {
159+
val r = new Random(42)
160+
val ntzSchema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true)))
161+
val ntzDF = FuzzDataGenerator.generateDataFrame(r, spark, ntzSchema, 100, DataGenOptions())
162+
ntzDF.createOrReplaceTempView("ntz_tbl")
163+
for (format <- CometTruncTimestamp.supportedFormats) {
164+
checkSparkAnswerAndOperator(
165+
s"SELECT ts_ntz, date_trunc('$format', ts_ntz) from ntz_tbl order by ts_ntz")
166+
}
167+
}
168+
169+
test("date_format - timestamp_ntz input") {
170+
val r = new Random(42)
171+
val ntzSchema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true)))
172+
val ntzDF = FuzzDataGenerator.generateDataFrame(r, spark, ntzSchema, 100, DataGenOptions())
173+
ntzDF.createOrReplaceTempView("ntz_tbl")
174+
val supportedFormats = CometDateFormat.supportedFormats.keys.toSeq.filterNot(_.contains("'"))
175+
for (format <- supportedFormats) {
176+
checkSparkAnswerAndOperator(
177+
s"SELECT ts_ntz, date_format(ts_ntz, '$format') from ntz_tbl order by ts_ntz")
178+
}
149179
}
150180

151181
test("unix_timestamp - string input falls back to Spark") {

0 commit comments

Comments
 (0)