diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 247d4124dae4a..2f5e83e81e4ea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -49,6 +49,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils} +import org.apache.spark.unsafe.types.TimestampNanosVal import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.Utils @@ -1034,6 +1035,20 @@ object CatalogColumnStat extends Logging { forTimestampNTZ = forTimestampNTZ) } + /** + * Returns a [[TimestampFormatter]] with nanosecond precision (9 fractional digits) for + * lossless serialization of nanosecond-precision timestamp column statistics. + */ + private def getTimestampNanosFormatter( + isParsing: Boolean, + forTimestampNTZ: Boolean = false): TimestampFormatter = { + TimestampFormatter( + format = "yyyy-MM-dd HH:mm:ss.SSSSSSSSS", + zoneId = ZoneOffset.UTC, + isParsing = isParsing, + forTimestampNTZ = forTimestampNTZ) + } + /** * Converts from string representation of data type to the corresponding Catalyst data type. 
*/ @@ -1047,6 +1062,11 @@ object CatalogColumnStat extends Logging { case TimestampType => getTimestampFormatter(isParsing = true).parse(s) case TimestampNTZType => getTimestampFormatter(isParsing = true, forTimestampNTZ = true).parse(s) + case t: TimestampLTZNanosType => + getTimestampNanosFormatter(isParsing = true).parseNanos(s, t.precision) + case t: TimestampNTZNanosType => + getTimestampNanosFormatter(isParsing = true, forTimestampNTZ = true) + .parseWithoutTimeZoneNanos(s, t.precision) case ByteType => s.toByte case ShortType => s.toShort case IntegerType => s.toInt @@ -1073,6 +1093,12 @@ object CatalogColumnStat extends Logging { case TimestampNTZType => getTimestampFormatter(isParsing = false, forTimestampNTZ = true) .format(v.asInstanceOf[Long]) + case t: TimestampLTZNanosType => + getTimestampNanosFormatter(isParsing = false) + .formatNanos(v.asInstanceOf[TimestampNanosVal], t.precision) + case t: TimestampNTZNanosType => + getTimestampNanosFormatter(isParsing = false, forTimestampNTZ = true) + .formatWithoutTimeZoneNanos(v.asInstanceOf[TimestampNanosVal], t.precision) case BooleanType | _: IntegralType | FloatType | DoubleType => v case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal // This version of Spark does not use min/max for binary/string types so we ignore it. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala index 7083014f1f38f..c231e8414cd05 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala @@ -23,6 +23,7 @@ import scala.math.BigDecimal.RoundingMode import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, EmptyRow, Expression} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types.{DecimalType, _} +import org.apache.spark.unsafe.types.TimestampNanosVal object EstimationUtils { @@ -136,6 +137,11 @@ object EstimationUtils { def toDouble(value: Any, dataType: DataType): Double = { dataType match { case _: NumericType | DateType | TimestampType => value.toString.toDouble + case _: AnyTimestampNanoType => + // Note: epochMicros*1000+nanosWithinMicro exceeds Double's 2^53 exact-integer range for + // real-world dates, so this conversion is slightly lossy. Acceptable for estimation only. + val v = value.asInstanceOf[TimestampNanosVal] + v.epochMicros * 1000.0 + v.nanosWithinMicro case BooleanType => if (value.asInstanceOf[Boolean]) 1 else 0 } } @@ -145,6 +151,11 @@ object EstimationUtils { case BooleanType => double.toInt == 1 case DateType => double.toInt case TimestampType => double.toLong + case _: AnyTimestampNanoType => + // Lossy: the input Double may have lost precision in toDouble (see above). + // Acceptable for CBO estimation only. 
+ val nanos = double.toLong + TimestampNanosVal.fromParts(Math.floorDiv(nanos, 1000L), Math.floorMod(nanos, 1000).toShort) case ByteType => double.toByte case ShortType => double.toShort case IntegerType => double.toInt diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala index 814c74d160156..a2fffb0dae629 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala @@ -291,7 +291,8 @@ case class FilterEstimation(plan: Filter) extends Logging { } attr.dataType match { - case _: NumericType | DateType | TimestampType | BooleanType => + case _: NumericType | DateType | TimestampType | BooleanType | + _: AnyTimestampNanoType => evaluateBinaryForNumeric(op, attr, literal, update) case StringType | BinaryType => // TODO: It is difficult to support other binary comparisons for String/Binary @@ -413,7 +414,8 @@ case class FilterEstimation(plan: Filter) extends Logging { // use [min, max] to filter the original hSet dataType match { - case _: NumericType | BooleanType | DateType | TimestampType => + case _: NumericType | BooleanType | DateType | TimestampType | + _: AnyTimestampNanoType => if (ndv.toDouble == 0) { return Some(0.0) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/UnionEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/UnionEstimation.scala index 7ad05ee3ad6b3..e451a21065daf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/UnionEstimation.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/UnionEstimation.scala @@ -38,7 +38,7 @@ object UnionEstimation { private def isTypeSupported(dt: DataType): Boolean = dt match { case ByteType | IntegerType | ShortType | FloatType | LongType | DoubleType | DateType | _: DecimalType | TimestampType | TimestampNTZType | - _: AnsiIntervalType => true + _: AnsiIntervalType | _: AnyTimestampNanoType => true case _ => false } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala index 6bd8f7d24ca2a..f1751f35e2879 100755 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.ColumnStatsMa import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.TimestampNanosVal /** * In this test suite, we test predicates containing the following operators: @@ -114,6 +115,16 @@ class FilterEstimationSuite extends StatsEstimationTestBase { val colStatIntSkewHgm = ColumnStat(distinctCount = Some(5), min = Some(1), max = Some(10), nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmIntSkew)) + // column ctsnanos has 10 nanosecond-precision timestamp values. + // Values span from epochMicros=1000 nanosWithinMicro=0 to epochMicros=1009 nanosWithinMicro=0 + // i.e. 
total epoch nanos 1000000..1009000 (range = 9000 nanos) + val tsNanosMin = TimestampNanosVal.fromParts(1000L, 0.toShort) + val tsNanosMax = TimestampNanosVal.fromParts(1009L, 0.toShort) + val attrTsNanos = AttributeReference("ctsnanos", TimestampNTZNanosType(9))() + val colStatTsNanos = ColumnStat(distinctCount = Some(10), + min = Some(tsNanosMin), max = Some(tsNanosMax), + nullCount = Some(0), avgLen = Some(10), maxLen = Some(10)) + val attributeMap = AttributeMap(Seq( attrInt -> colStatInt, attrBool -> colStatBool, @@ -125,7 +136,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase { attrInt3 -> colStatInt3, attrInt4 -> colStatInt4, attrIntHgm -> colStatIntHgm, - attrIntSkewHgm -> colStatIntSkewHgm + attrIntSkewHgm -> colStatIntSkewHgm, + attrTsNanos -> colStatTsNanos )) test("true") { @@ -1020,4 +1032,136 @@ class FilterEstimationSuite extends StatsEstimationTestBase { } } + // Tests for nanosecond-precision timestamp types (SPARK-57839) + test("ctsnanos = TimestampNanosVal(1003, 0)") { + val tsVal = TimestampNanosVal.fromParts(1003L, 0.toShort) + validateEstimatedStats( + Filter(EqualTo(attrTsNanos, Literal(tsVal, TimestampNTZNanosType(9))), + childStatsTestPlan(Seq(attrTsNanos), 10L)), + Seq(attrTsNanos -> ColumnStat(distinctCount = Some(1), + min = Some(tsVal), max = Some(tsVal), + nullCount = Some(0), avgLen = Some(10), maxLen = Some(10))), + expectedRowCount = 1) + } + + test("ctsnanos < TimestampNanosVal(1003, 0)") { + // Range [1000000, 1009000], value = 1003000, fraction = 3000/9000 = 1/3 + // Rows = ceil(10 * 3/9) = 4 (3.33 rounds up), ndv = ceil(10 * 3/9) = 4 + val tsVal = TimestampNanosVal.fromParts(1003L, 0.toShort) + validateEstimatedStats( + Filter(LessThan(attrTsNanos, Literal(tsVal, TimestampNTZNanosType(9))), + childStatsTestPlan(Seq(attrTsNanos), 10L)), + Seq(attrTsNanos -> ColumnStat(distinctCount = Some(4), + min = Some(tsNanosMin), max = Some(tsVal), + nullCount = Some(0), avgLen = Some(10), maxLen = Some(10))), + 
expectedRowCount = 4) + } + + test("ctsnanos IN (TimestampNanosVal(1003, 0), TimestampNanosVal(1005, 0))") { + val ts3 = TimestampNanosVal.fromParts(1003L, 0.toShort) + val ts5 = TimestampNanosVal.fromParts(1005L, 0.toShort) + validateEstimatedStats( + Filter(In(attrTsNanos, Seq( + Literal(ts3, TimestampNTZNanosType(9)), + Literal(ts5, TimestampNTZNanosType(9)))), + childStatsTestPlan(Seq(attrTsNanos), 10L)), + Seq(attrTsNanos -> ColumnStat(distinctCount = Some(2), + min = Some(ts3), max = Some(ts5), + nullCount = Some(0), avgLen = Some(10), maxLen = Some(10))), + expectedRowCount = 2) + } + + // column ctsLtzNanos: LTZ nanos type with sub-microsecond values (nanosWithinMicro != 0). + // Values span epochMicros=1000 nanos=500 to epochMicros=1009 nanos=500 + // i.e. total epoch nanos 1000500..1009500 (range = 9000 nanos) + val tsLtzNanosMin = TimestampNanosVal.fromParts(1000L, 500.toShort) + val tsLtzNanosMax = TimestampNanosVal.fromParts(1009L, 500.toShort) + val attrTsLtzNanos = AttributeReference("ctsltznanos", TimestampLTZNanosType(9))() + val colStatTsLtzNanos = ColumnStat(distinctCount = Some(10), + min = Some(tsLtzNanosMin), max = Some(tsLtzNanosMax), + nullCount = Some(0), avgLen = Some(10), maxLen = Some(10)) + + test("ctsltznanos < TimestampNanosVal(1003, 500) - LTZ type with sub-microsecond") { + // Range [1000500, 1009500], value = 1003500, fraction = 3000/9000 = 1/3 + // Rows = ceil(10 * 3/9) = 4, ndv = ceil(10 * 3/9) = 4 + val tsVal = TimestampNanosVal.fromParts(1003L, 500.toShort) + val ltzMap = AttributeMap(Seq(attrTsLtzNanos -> colStatTsLtzNanos)) + validateEstimatedStats( + Filter(LessThan(attrTsLtzNanos, Literal(tsVal, TimestampLTZNanosType(9))), + childStatsTestPlan(Seq(attrTsLtzNanos), 10L, ltzMap)), + Seq(attrTsLtzNanos -> ColumnStat(distinctCount = Some(4), + min = Some(tsLtzNanosMin), max = Some(tsVal), + nullCount = Some(0), avgLen = Some(10), maxLen = Some(10))), + expectedRowCount = 4) + } + + test("ctsnanos = TimestampNanosVal(1003, 456) 
- sub-microsecond nanosWithinMicro") { + // Test that sub-microsecond nanosWithinMicro term is exercised in toDouble estimation. + // The value 1003*1000+456 = 1003456 falls within range [1000000, 1009000]. + val subMicroMin = TimestampNanosVal.fromParts(1000L, 0.toShort) + val subMicroMax = TimestampNanosVal.fromParts(1009L, 0.toShort) + val attrSubMicro = AttributeReference("ctssubmicro", TimestampNTZNanosType(9))() + val colStatSubMicro = ColumnStat(distinctCount = Some(10), + min = Some(subMicroMin), max = Some(subMicroMax), + nullCount = Some(0), avgLen = Some(10), maxLen = Some(10)) + val subMicroMap = AttributeMap(Seq(attrSubMicro -> colStatSubMicro)) + val tsVal = TimestampNanosVal.fromParts(1003L, 456.toShort) + validateEstimatedStats( + Filter(EqualTo(attrSubMicro, Literal(tsVal, TimestampNTZNanosType(9))), + childStatsTestPlan(Seq(attrSubMicro), 10L, subMicroMap)), + Seq(attrSubMicro -> ColumnStat(distinctCount = Some(1), + min = Some(tsVal), max = Some(tsVal), + nullCount = Some(0), avgLen = Some(10), maxLen = Some(10))), + expectedRowCount = 1) + } + + // High-magnitude nanos tests: exercise the lossy toDouble/fromDouble path where + // epochMicros*1000 exceeds Double's 2^53 exact-integer range (real-world 2024 timestamps). + // epochMicros ~1.7e15 => epoch nanos ~1.7e18 >> 2^53 (~9e15). + // The point is to verify estimation remains sensible despite floating-point rounding. 
+ + test("NTZ nanos filter estimation at high magnitude (2024 timestamps)") { + // 2024-01-01T00:00:00Z => epochMicros = 1704067200000000L + // 2024-01-01T00:00:09Z => epochMicros = 1704067209000000L + // Range = 9 s = 9000000 micros = 9000000000 nanos (epochMicros * 1000) + val hiMin = TimestampNanosVal.fromParts(1704067200000000L, 0.toShort) + val hiMax = TimestampNanosVal.fromParts(1704067209000000L, 0.toShort) + val attrHiNtz = AttributeReference("ctsnanos_hi", TimestampNTZNanosType(9))() + val colStatHi = ColumnStat(distinctCount = Some(10), + min = Some(hiMin), max = Some(hiMax), + nullCount = Some(0), avgLen = Some(10), maxLen = Some(10)) + val hiMap = AttributeMap(Seq(attrHiNtz -> colStatHi)) + // Filter value at ~1/3 of the range + val tsVal = TimestampNanosVal.fromParts(1704067203000000L, 0.toShort) + // Expected: fraction ~ 3/9 = 1/3, rows = ceil(10 * 1/3) = 4, ndv = 4 + validateEstimatedStats( + Filter(LessThan(attrHiNtz, Literal(tsVal, TimestampNTZNanosType(9))), + childStatsTestPlan(Seq(attrHiNtz), 10L, hiMap)), + Seq(attrHiNtz -> ColumnStat(distinctCount = Some(4), + min = Some(hiMin), max = Some(tsVal), + nullCount = Some(0), avgLen = Some(10), maxLen = Some(10))), + expectedRowCount = 4) + } + + test("LTZ nanos filter estimation at high magnitude (2024 timestamps)") { + // Same magnitude as above but using TimestampLTZNanosType. 
+ val hiMin = TimestampNanosVal.fromParts(1704067200000000L, 0.toShort) + val hiMax = TimestampNanosVal.fromParts(1704067209000000L, 0.toShort) + val attrHiLtz = AttributeReference("ctsltznanos_hi", TimestampLTZNanosType(9))() + val colStatHi = ColumnStat(distinctCount = Some(10), + min = Some(hiMin), max = Some(hiMax), + nullCount = Some(0), avgLen = Some(10), maxLen = Some(10)) + val hiMap = AttributeMap(Seq(attrHiLtz -> colStatHi)) + // Filter value at ~1/3 of the range + val tsVal = TimestampNanosVal.fromParts(1704067203000000L, 0.toShort) + // Expected: fraction ~ 3/9 = 1/3, rows = ceil(10 * 1/3) = 4, ndv = 4 + validateEstimatedStats( + Filter(LessThan(attrHiLtz, Literal(tsVal, TimestampLTZNanosType(9))), + childStatsTestPlan(Seq(attrHiLtz), 10L, hiMap)), + Seq(attrHiLtz -> ColumnStat(distinctCount = Some(4), + min = Some(hiMin), max = Some(tsVal), + nullCount = Some(0), avgLen = Some(10), maxLen = Some(10))), + expectedRowCount = 4) + } + } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala index cdfc863cc0212..357ba48144580 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types.{DateType, TimestampType, _} +import org.apache.spark.unsafe.types.TimestampNanosVal class JoinEstimationSuite extends StatsEstimationTestBase { @@ -506,7 +507,12 @@ class JoinEstimationSuite extends StatsEstimationTestBase { nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)), AttributeReference("ctimestamp", 
TimestampType)() -> ColumnStat(distinctCount = Some(1), min = Some(timestamp), max = Some(timestamp), - nullCount = Some(0), avgLen = Some(8), maxLen = Some(8)) + nullCount = Some(0), avgLen = Some(8), maxLen = Some(8)), + AttributeReference("ctsnanos", TimestampNTZNanosType(9))() -> ColumnStat( + distinctCount = Some(1), + min = Some(TimestampNanosVal.fromParts(timestamp, 123.toShort)), + max = Some(TimestampNanosVal.fromParts(timestamp, 123.toShort)), + nullCount = Some(0), avgLen = Some(10), maxLen = Some(10)) ) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/UnionEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/UnionEstimationSuite.scala index c6857e5fea908..efb900dee33aa 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/UnionEstimationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/UnionEstimationSuite.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReferen import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Union} import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.TimestampNanosVal class UnionEstimationSuite extends StatsEstimationTestBase { @@ -60,7 +61,7 @@ class UnionEstimationSuite extends StatsEstimationTestBase { val attrTimestampNTZ = AttributeReference("ctimestamp_ntz", TimestampNTZType)() val attrYMInterval = AttributeReference("cyminterval", YearMonthIntervalType())() val attrDTInterval = AttributeReference("cdtinterval", DayTimeIntervalType())() - + val attrTsNanos = AttributeReference("ctsnanos", TimestampNTZNanosType(9))() val s1 = 1.toShort val s2 = 4.toShort val b1 = 1.toByte @@ -90,7 +91,10 @@ class UnionEstimationSuite extends StatsEstimationTestBase { attrTimestamp -> ColumnStat(min = Some(1L), max = Some(4L)), attrTimestampNTZ -> 
ColumnStat(min = Some(1L), max = Some(4L)), attrYMInterval -> ColumnStat(min = Some(2), max = Some(5)), - attrDTInterval -> ColumnStat(min = Some(2L), max = Some(5L)))) + attrDTInterval -> ColumnStat(min = Some(2L), max = Some(5L)), + attrTsNanos -> ColumnStat( + min = Some(TimestampNanosVal.fromParts(1L, 0.toShort)), + max = Some(TimestampNanosVal.fromParts(4L, 0.toShort))))) val s3 = 2.toShort val s4 = 6.toShort @@ -133,7 +137,10 @@ class UnionEstimationSuite extends StatsEstimationTestBase { max = Some(8)), AttributeReference("cdttimestamp1", DayTimeIntervalType())() -> ColumnStat( min = Some(4L), - max = Some(8L)))) + max = Some(8L)), + AttributeReference("ctsnanos1", TimestampNTZNanosType(9))() -> ColumnStat( + min = Some(TimestampNanosVal.fromParts(3L, 0.toShort)), + max = Some(TimestampNanosVal.fromParts(6L, 0.toShort))))) val child1 = StatsTestPlan( outputList = columnInfo.keys.toSeq.sortWith(_.exprId.id < _.exprId.id), @@ -167,7 +174,10 @@ class UnionEstimationSuite extends StatsEstimationTestBase { attrTimestamp -> ColumnStat(min = Some(1L), max = Some(6L)), attrTimestampNTZ -> ColumnStat(min = Some(1L), max = Some(6L)), attrYMInterval -> ColumnStat(min = Some(2), max = Some(8)), - attrDTInterval -> ColumnStat(min = Some(2L), max = Some(8L))))) + attrDTInterval -> ColumnStat(min = Some(2L), max = Some(8L)), + attrTsNanos -> ColumnStat( + min = Some(TimestampNanosVal.fromParts(1L, 0.toShort)), + max = Some(TimestampNanosVal.fromParts(6L, 0.toShort)))))) assert(union.stats === expectedStats) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index 3eb2b14539ffe..c8fcd3f3fb892 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -371,6 +371,9 @@ object CommandUtils extends Logging { case _: 
IntegralType => true case _: DecimalType => true case DoubleType | FloatType => true + // Exclude nanosecond timestamp types: ApproximatePercentile and + // ApproxCountDistinctForIntervals do not accept them yet (separate scope). + case _: AnyTimestampNanoType => false case _: DatetimeType => true case _ => false } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala index 5222d5ce26658..62f62c8d88691 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.test.SQLTestData.ArrayData import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.TimestampNanosVal import org.apache.spark.util.Utils @@ -205,6 +206,67 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared } } + test("SPARK-57839: nanosecond-precision timestamp column stats lossless round trip") { + // Use a sub-microsecond value (nanosWithinMicro != 0) to prove nanos survive serialization. 
+ val ntzNanosType = TimestampNTZNanosType(9) + val ltzNanosType = TimestampLTZNanosType(9) + val subMicroVal = TimestampNanosVal.fromParts(1640995201123456L, 789.toShort) + + // NTZ nanos round-trip + val ntzStr = CatalogColumnStat.toExternalString(subMicroVal, "col", ntzNanosType) + val ntzParsed = CatalogColumnStat.fromExternalString(ntzStr, "col", ntzNanosType, 2) + assert(ntzParsed === subMicroVal, + s"NTZ nanos round-trip failed: formatted='$ntzStr', parsed=$ntzParsed, expected=$subMicroVal") + + // LTZ nanos round-trip + val ltzStr = CatalogColumnStat.toExternalString(subMicroVal, "col", ltzNanosType) + val ltzParsed = CatalogColumnStat.fromExternalString(ltzStr, "col", ltzNanosType, 2) + assert(ltzParsed === subMicroVal, + s"LTZ nanos round-trip failed: formatted='$ltzStr', parsed=$ltzParsed, expected=$subMicroVal") + + // Full CatalogColumnStat map round-trip + val catalogStat = CatalogColumnStat( + distinctCount = Some(1), + min = Some(ntzStr), + max = Some(ntzStr), + nullCount = Some(0), + avgLen = Some(10), + maxLen = Some(10)) + val map = catalogStat.toMap("ctsnanos") + val roundtrip = CatalogColumnStat.fromMap("t", "ctsnanos", map) + assert(roundtrip === Some(catalogStat)) + } + + test("SPARK-57839: ANALYZE TABLE FOR COLUMNS on nanosecond timestamp collects basic stats") { + // Nanosecond timestamp columns should collect basic stats (min/max/ndv) but skip histogram, + // since ApproximatePercentile and ApproxCountDistinctForIntervals do not accept nanos types. 
+ val table = "nanos_analyze_test" + withTable(table) { + sql( + s"""CREATE TABLE $table (id INT, ts_ntz TIMESTAMP_NTZ(9), ts_ltz TIMESTAMP_LTZ(9)) + |USING PARQUET""".stripMargin) + sql(s"INSERT INTO $table VALUES (1, TIMESTAMP_NTZ'2024-01-01 00:00:00.123456789', " + + "TIMESTAMP_LTZ'2024-01-01 00:00:00.123456789')") + sql(s"INSERT INTO $table VALUES (2, TIMESTAMP_NTZ'2024-06-15 12:30:00.987654321', " + + "TIMESTAMP_LTZ'2024-06-15 12:30:00.987654321')") + + // Should succeed with histograms enabled - nanos cols get basic stats, no histogram error + withSQLConf(SQLConf.HISTOGRAM_ENABLED.key -> "true") { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS ts_ntz, ts_ltz") + } + + val stats = getCatalogTable(table).stats + assert(stats.isDefined) + assert(stats.get.colStats.contains("ts_ntz")) + assert(stats.get.colStats.contains("ts_ltz")) + // Verify basic stats collected (no histogram field for nanos columns) + val ntzStats = stats.get.colStats("ts_ntz") + assert(ntzStats.distinctCount.contains(2)) + assert(ntzStats.nullCount.contains(0)) + assert(ntzStats.histogram.isEmpty, "Nanos columns should not have histograms") + } + } + test("analyze column command - result verification") { // (data.head.productArity - 1) because the last column does not support stats collection. assert(stats.size == data.head.productArity - 1)