Skip to content

Commit 7859888

Browse files
committed
int_timestamp_cast_support
1 parent 0c4d1d4 commit 7859888

3 files changed

Lines changed: 25 additions & 35 deletions

File tree

native/spark-expr/src/conversion_funcs/cast.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -620,6 +620,9 @@ macro_rules! cast_int_to_timestamp_impl {
620620
if arr.is_null(i) {
621621
$builder.append_null();
622622
} else {
623+
// saturating_mul clamps the result to i64::MIN/MAX on overflow instead of panicking,
624+
// which could otherwise occur when converting extreme values (e.g., Long.MIN_VALUE).
625+
// This matches Spark's behavior (irrespective of EvalMode).
623626
let micros = (arr.value(i) as i64).saturating_mul(MICROS_PER_SECOND);
624627
$builder.append_value(micros);
625628
}

spark/src/test/scala/org/apache/comet/CometCastSuite.scala

Lines changed: 19 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,23 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
6565
lazy val usingParquetExecWithIncompatTypes: Boolean =
6666
hasUnsignedSmallIntSafetyCheck(conf)
6767

68+
// Timezone list to check temporal type casts
69+
private val compatibleTimezones = Seq(
70+
"UTC",
71+
"America/New_York",
72+
"America/Chicago",
73+
"America/Denver",
74+
"America/Los_Angeles",
75+
"Europe/London",
76+
"Europe/Paris",
77+
"Europe/Berlin",
78+
"Asia/Tokyo",
79+
"Asia/Shanghai",
80+
"Asia/Singapore",
81+
"Asia/Kolkata",
82+
"Australia/Sydney",
83+
"Pacific/Auckland")
84+
6885
test("all valid cast combinations covered") {
6986
val names = testNames
7087

@@ -224,13 +241,6 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
224241
}
225242

226243
test("cast ByteType to TimestampType") {
227-
val compatibleTimezones = Seq(
228-
"UTC",
229-
"America/New_York",
230-
"America/Los_Angeles",
231-
"Europe/London",
232-
"Asia/Tokyo",
233-
"Australia/Sydney")
234244
compatibleTimezones.foreach { tz =>
235245
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) {
236246
castTest(
@@ -311,13 +321,6 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
311321
}
312322

313323
test("cast ShortType to TimestampType") {
314-
val compatibleTimezones = Seq(
315-
"UTC",
316-
"America/New_York",
317-
"America/Los_Angeles",
318-
"Europe/London",
319-
"Asia/Tokyo",
320-
"Australia/Sydney")
321324
compatibleTimezones.foreach { tz =>
322325
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) {
323326
castTest(
@@ -384,13 +387,6 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
384387
}
385388

386389
test("cast IntegerType to TimestampType") {
387-
val compatibleTimezones = Seq(
388-
"UTC",
389-
"America/New_York",
390-
"America/Los_Angeles",
391-
"Europe/London",
392-
"Asia/Tokyo",
393-
"Australia/Sydney")
394390
compatibleTimezones.foreach { tz =>
395391
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) {
396392
castTest(generateInts(), DataTypes.TimestampType)
@@ -443,19 +439,10 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
443439
test("cast LongType to TimestampType") {
444440
// Use assertDataFrameEquals because extreme Long values (Long.MIN_VALUE, Long.MAX_VALUE)
445441
// overflow when converted to java.sql.Timestamp during collect(), but the cast itself works.
446-
val compatibleTimezones = Seq(
447-
"UTC",
448-
"America/New_York",
449-
"America/Los_Angeles",
450-
"Europe/London",
451-
"Asia/Tokyo",
452-
"Australia/Sydney")
453442
compatibleTimezones.foreach { tz =>
454443
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) {
455-
withTempPath { dir =>
456-
val input = generateLongs()
457-
val data = roundtripParquet(input, dir).coalesce(1)
458-
val df = data.withColumn("ts", col("a").cast(DataTypes.TimestampType))
444+
withParquetTable(generateLongs(), "t1") {
445+
val df = spark.table("t1").withColumn("ts", col("a").cast(DataTypes.TimestampType))
459446
assertDataFrameEquals(df)
460447
}
461448
}

spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ abstract class CometTestBase
332332
}
333333
}
334334

335-
// inspired from spark-testing-base
335+
// inspired by spark-testing-base
336336
protected def assertDataFrameEquals(
337337
df: => DataFrame,
338338
checkNativeOperators: Boolean = true): Unit = {
@@ -343,12 +343,12 @@ abstract class CometTestBase
343343
}
344344
val cometDf = datasetOfRows(spark, df.logicalPlan)
345345

346-
// schema match check
346+
// schema match check
347347
assert(
348348
sparkDf.schema == cometDf.schema,
349349
s"Schemas do not match.\nCorrect Answer: ${sparkDf.schema}\n Spark Answer: ${cometDf.schema}")
350350

351-
// diff check using except instead of collect (collect errors out on spark driver for long -> timestamp conv)
351+
// diff check using except instead of collect (collect errors out on the Spark driver for Long -> Timestamp conversion)
352352
val sparkMinusComet = sparkDf.except(cometDf)
353353
val cometMinusSpark = cometDf.except(sparkDf)
354354

0 commit comments

Comments
 (0)