Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils}
import org.apache.spark.unsafe.types.TimestampNanosVal
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -1034,6 +1035,20 @@ object CatalogColumnStat extends Logging {
forTimestampNTZ = forTimestampNTZ)
}

/**
* Returns a [[TimestampFormatter]] with nanosecond precision (9 fractional digits) for
* lossless serialization of nanosecond-precision timestamp column statistics.
*/
private def getTimestampNanosFormatter(
isParsing: Boolean,
forTimestampNTZ: Boolean = false): TimestampFormatter = {
TimestampFormatter(
format = "yyyy-MM-dd HH:mm:ss.SSSSSSSSS",
zoneId = ZoneOffset.UTC,
isParsing = isParsing,
forTimestampNTZ = forTimestampNTZ)
}

/**
* Converts from string representation of data type to the corresponding Catalyst data type.
*/
Expand All @@ -1047,6 +1062,11 @@ object CatalogColumnStat extends Logging {
case TimestampType => getTimestampFormatter(isParsing = true).parse(s)
case TimestampNTZType =>
getTimestampFormatter(isParsing = true, forTimestampNTZ = true).parse(s)
case t: TimestampLTZNanosType =>
getTimestampNanosFormatter(isParsing = true).parseNanos(s, t.precision)
case t: TimestampNTZNanosType =>
getTimestampNanosFormatter(isParsing = true, forTimestampNTZ = true)
.parseWithoutTimeZoneNanos(s, t.precision)
case ByteType => s.toByte
case ShortType => s.toShort
case IntegerType => s.toInt
Expand All @@ -1073,6 +1093,12 @@ object CatalogColumnStat extends Logging {
case TimestampNTZType =>
getTimestampFormatter(isParsing = false, forTimestampNTZ = true)
.format(v.asInstanceOf[Long])
case t: TimestampLTZNanosType =>
getTimestampNanosFormatter(isParsing = false)
.formatNanos(v.asInstanceOf[TimestampNanosVal], t.precision)
case t: TimestampNTZNanosType =>
getTimestampNanosFormatter(isParsing = false, forTimestampNTZ = true)
.formatWithoutTimeZoneNanos(v.asInstanceOf[TimestampNanosVal], t.precision)
case BooleanType | _: IntegralType | FloatType | DoubleType => v
case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
// This version of Spark does not use min/max for binary/string types so we ignore it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import scala.math.BigDecimal.RoundingMode
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, EmptyRow, Expression}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types.{DecimalType, _}
import org.apache.spark.unsafe.types.TimestampNanosVal

object EstimationUtils {

Expand Down Expand Up @@ -136,6 +137,11 @@ object EstimationUtils {
def toDouble(value: Any, dataType: DataType): Double = {
dataType match {
case _: NumericType | DateType | TimestampType => value.toString.toDouble
case _: AnyTimestampNanoType =>
// Note: epochMicros*1000+nanosWithinMicro exceeds Double's 2^53 exact-integer range for
// real-world dates, so this conversion is slightly lossy. Acceptable for estimation only.
val v = value.asInstanceOf[TimestampNanosVal]
v.epochMicros * 1000.0 + v.nanosWithinMicro
case BooleanType => if (value.asInstanceOf[Boolean]) 1 else 0
}
}
Expand All @@ -145,6 +151,9 @@ object EstimationUtils {
case BooleanType => double.toInt == 1
case DateType => double.toInt
case TimestampType => double.toLong
case _: AnyTimestampNanoType =>
val nanos = double.toLong
TimestampNanosVal.fromParts(Math.floorDiv(nanos, 1000L), Math.floorMod(nanos, 1000).toShort)
case ByteType => double.toByte
case ShortType => double.toShort
case IntegerType => double.toInt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ case class FilterEstimation(plan: Filter) extends Logging {
}

attr.dataType match {
case _: NumericType | DateType | TimestampType | BooleanType =>
case _: NumericType | DateType | TimestampType | BooleanType |
_: AnyTimestampNanoType =>
evaluateBinaryForNumeric(op, attr, literal, update)
case StringType | BinaryType =>
// TODO: It is difficult to support other binary comparisons for String/Binary
Expand Down Expand Up @@ -413,7 +414,8 @@ case class FilterEstimation(plan: Filter) extends Logging {

// use [min, max] to filter the original hSet
dataType match {
case _: NumericType | BooleanType | DateType | TimestampType =>
case _: NumericType | BooleanType | DateType | TimestampType |
_: AnyTimestampNanoType =>
if (ndv.toDouble == 0) {
return Some(0.0)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ object UnionEstimation {
private def isTypeSupported(dt: DataType): Boolean = dt match {
case ByteType | IntegerType | ShortType | FloatType | LongType |
DoubleType | DateType | _: DecimalType | TimestampType | TimestampNTZType |
_: AnsiIntervalType => true
_: AnsiIntervalType | _: AnyTimestampNanoType => true
case _ => false
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.ColumnStatsMa
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.TimestampNanosVal

/**
* In this test suite, we test predicates containing the following operators:
Expand Down Expand Up @@ -114,6 +115,16 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val colStatIntSkewHgm = ColumnStat(distinctCount = Some(5), min = Some(1), max = Some(10),
nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmIntSkew))

// column ctsnanos has 10 nanosecond-precision timestamp values.
// Values span from epochMicros=1000 nanosWithinMicro=0 to epochMicros=1009 nanosWithinMicro=0
// i.e. total epoch nanos 1000000..1009000 (range = 9000 nanos)
val tsNanosMin = TimestampNanosVal.fromParts(1000L, 0.toShort)
val tsNanosMax = TimestampNanosVal.fromParts(1009L, 0.toShort)
val attrTsNanos = AttributeReference("ctsnanos", TimestampNTZNanosType(9))()
val colStatTsNanos = ColumnStat(distinctCount = Some(10),
min = Some(tsNanosMin), max = Some(tsNanosMax),
nullCount = Some(0), avgLen = Some(10), maxLen = Some(10))

val attributeMap = AttributeMap(Seq(
attrInt -> colStatInt,
attrBool -> colStatBool,
Expand All @@ -125,7 +136,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
attrInt3 -> colStatInt3,
attrInt4 -> colStatInt4,
attrIntHgm -> colStatIntHgm,
attrIntSkewHgm -> colStatIntSkewHgm
attrIntSkewHgm -> colStatIntSkewHgm,
attrTsNanos -> colStatTsNanos
))

test("true") {
Expand Down Expand Up @@ -1020,4 +1032,87 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
}
}

// Tests for nanosecond-precision timestamp types (SPARK-57839)
test("ctsnanos = TimestampNanosVal(1003, 0)") {
val tsVal = TimestampNanosVal.fromParts(1003L, 0.toShort)
validateEstimatedStats(
Filter(EqualTo(attrTsNanos, Literal(tsVal, TimestampNTZNanosType(9))),
childStatsTestPlan(Seq(attrTsNanos), 10L)),
Seq(attrTsNanos -> ColumnStat(distinctCount = Some(1),
min = Some(tsVal), max = Some(tsVal),
nullCount = Some(0), avgLen = Some(10), maxLen = Some(10))),
expectedRowCount = 1)
}

test("ctsnanos < TimestampNanosVal(1003, 0)") {
// Range [1000000, 1009000], value = 1003000, fraction = 3000/9000 = 1/3
// Rows = ceil(10 * 3/9) = 4 (3.33 rounds up), ndv = ceil(10 * 3/9) = 4
val tsVal = TimestampNanosVal.fromParts(1003L, 0.toShort)
validateEstimatedStats(
Filter(LessThan(attrTsNanos, Literal(tsVal, TimestampNTZNanosType(9))),
childStatsTestPlan(Seq(attrTsNanos), 10L)),
Seq(attrTsNanos -> ColumnStat(distinctCount = Some(4),
min = Some(tsNanosMin), max = Some(tsVal),
nullCount = Some(0), avgLen = Some(10), maxLen = Some(10))),
expectedRowCount = 4)
}

test("ctsnanos IN (TimestampNanosVal(1003, 0), TimestampNanosVal(1005, 0))") {
val ts3 = TimestampNanosVal.fromParts(1003L, 0.toShort)
val ts5 = TimestampNanosVal.fromParts(1005L, 0.toShort)
validateEstimatedStats(
Filter(In(attrTsNanos, Seq(
Literal(ts3, TimestampNTZNanosType(9)),
Literal(ts5, TimestampNTZNanosType(9)))),
childStatsTestPlan(Seq(attrTsNanos), 10L)),
Seq(attrTsNanos -> ColumnStat(distinctCount = Some(2),
min = Some(ts3), max = Some(ts5),
nullCount = Some(0), avgLen = Some(10), maxLen = Some(10))),
expectedRowCount = 2)
}

// column ctsLtzNanos: LTZ nanos type with sub-microsecond values (nanosWithinMicro != 0).
// Values span epochMicros=1000 nanos=500 to epochMicros=1009 nanos=500
// i.e. total epoch nanos 1000500..1009500 (range = 9000 nanos)
val tsLtzNanosMin = TimestampNanosVal.fromParts(1000L, 500.toShort)
val tsLtzNanosMax = TimestampNanosVal.fromParts(1009L, 500.toShort)
val attrTsLtzNanos = AttributeReference("ctsltznanos", TimestampLTZNanosType(9))()
val colStatTsLtzNanos = ColumnStat(distinctCount = Some(10),
min = Some(tsLtzNanosMin), max = Some(tsLtzNanosMax),
nullCount = Some(0), avgLen = Some(10), maxLen = Some(10))

test("ctsltznanos < TimestampNanosVal(1003, 500) - LTZ type with sub-microsecond") {
// Range [1000500, 1009500], value = 1003500, fraction = 3000/9000 = 1/3
// Rows = ceil(10 * 3/9) = 4, ndv = ceil(10 * 3/9) = 4
val tsVal = TimestampNanosVal.fromParts(1003L, 500.toShort)
val ltzMap = AttributeMap(Seq(attrTsLtzNanos -> colStatTsLtzNanos))
validateEstimatedStats(
Filter(LessThan(attrTsLtzNanos, Literal(tsVal, TimestampLTZNanosType(9))),
childStatsTestPlan(Seq(attrTsLtzNanos), 10L, ltzMap)),
Seq(attrTsLtzNanos -> ColumnStat(distinctCount = Some(4),
min = Some(tsLtzNanosMin), max = Some(tsVal),
nullCount = Some(0), avgLen = Some(10), maxLen = Some(10))),
expectedRowCount = 4)
}

test("ctsnanos = TimestampNanosVal(1003, 456) - sub-microsecond nanosWithinMicro") {
// Test that sub-microsecond nanosWithinMicro term is exercised in toDouble estimation.
// The value 1003*1000+456 = 1003456 falls within range [1000000, 1009000].
val subMicroMin = TimestampNanosVal.fromParts(1000L, 0.toShort)
val subMicroMax = TimestampNanosVal.fromParts(1009L, 0.toShort)
val attrSubMicro = AttributeReference("ctssubmicro", TimestampNTZNanosType(9))()
val colStatSubMicro = ColumnStat(distinctCount = Some(10),
min = Some(subMicroMin), max = Some(subMicroMax),
nullCount = Some(0), avgLen = Some(10), maxLen = Some(10))
val subMicroMap = AttributeMap(Seq(attrSubMicro -> colStatSubMicro))
val tsVal = TimestampNanosVal.fromParts(1003L, 456.toShort)
validateEstimatedStats(
Filter(EqualTo(attrSubMicro, Literal(tsVal, TimestampNTZNanosType(9))),
childStatsTestPlan(Seq(attrSubMicro), 10L, subMicroMap)),
Seq(attrSubMicro -> ColumnStat(distinctCount = Some(1),
min = Some(tsVal), max = Some(tsVal),
nullCount = Some(0), avgLen = Some(10), maxLen = Some(10))),
expectedRowCount = 1)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types.{DateType, TimestampType, _}
import org.apache.spark.unsafe.types.TimestampNanosVal


class JoinEstimationSuite extends StatsEstimationTestBase {
Expand Down Expand Up @@ -506,7 +507,12 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
AttributeReference("ctimestamp", TimestampType)() -> ColumnStat(distinctCount = Some(1),
min = Some(timestamp), max = Some(timestamp),
nullCount = Some(0), avgLen = Some(8), maxLen = Some(8))
nullCount = Some(0), avgLen = Some(8), maxLen = Some(8)),
AttributeReference("ctsnanos", TimestampNTZNanosType(9))() -> ColumnStat(
distinctCount = Some(1),
min = Some(TimestampNanosVal.fromParts(timestamp, 123.toShort)),
max = Some(TimestampNanosVal.fromParts(timestamp, 123.toShort)),
nullCount = Some(0), avgLen = Some(10), maxLen = Some(10))
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReferen
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Union}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.TimestampNanosVal

class UnionEstimationSuite extends StatsEstimationTestBase {

Expand Down Expand Up @@ -60,7 +61,7 @@ class UnionEstimationSuite extends StatsEstimationTestBase {
val attrTimestampNTZ = AttributeReference("ctimestamp_ntz", TimestampNTZType)()
val attrYMInterval = AttributeReference("cyminterval", YearMonthIntervalType())()
val attrDTInterval = AttributeReference("cdtinterval", DayTimeIntervalType())()

val attrTsNanos = AttributeReference("ctsnanos", TimestampNTZNanosType(9))()
val s1 = 1.toShort
val s2 = 4.toShort
val b1 = 1.toByte
Expand Down Expand Up @@ -90,7 +91,10 @@ class UnionEstimationSuite extends StatsEstimationTestBase {
attrTimestamp -> ColumnStat(min = Some(1L), max = Some(4L)),
attrTimestampNTZ -> ColumnStat(min = Some(1L), max = Some(4L)),
attrYMInterval -> ColumnStat(min = Some(2), max = Some(5)),
attrDTInterval -> ColumnStat(min = Some(2L), max = Some(5L))))
attrDTInterval -> ColumnStat(min = Some(2L), max = Some(5L)),
attrTsNanos -> ColumnStat(
min = Some(TimestampNanosVal.fromParts(1L, 0.toShort)),
max = Some(TimestampNanosVal.fromParts(4L, 0.toShort)))))

val s3 = 2.toShort
val s4 = 6.toShort
Expand Down Expand Up @@ -133,7 +137,10 @@ class UnionEstimationSuite extends StatsEstimationTestBase {
max = Some(8)),
AttributeReference("cdttimestamp1", DayTimeIntervalType())() -> ColumnStat(
min = Some(4L),
max = Some(8L))))
max = Some(8L)),
AttributeReference("ctsnanos1", TimestampNTZNanosType(9))() -> ColumnStat(
min = Some(TimestampNanosVal.fromParts(3L, 0.toShort)),
max = Some(TimestampNanosVal.fromParts(6L, 0.toShort)))))

val child1 = StatsTestPlan(
outputList = columnInfo.keys.toSeq.sortWith(_.exprId.id < _.exprId.id),
Expand Down Expand Up @@ -167,7 +174,10 @@ class UnionEstimationSuite extends StatsEstimationTestBase {
attrTimestamp -> ColumnStat(min = Some(1L), max = Some(6L)),
attrTimestampNTZ -> ColumnStat(min = Some(1L), max = Some(6L)),
attrYMInterval -> ColumnStat(min = Some(2), max = Some(8)),
attrDTInterval -> ColumnStat(min = Some(2L), max = Some(8L)))))
attrDTInterval -> ColumnStat(min = Some(2L), max = Some(8L)),
attrTsNanos -> ColumnStat(
min = Some(TimestampNanosVal.fromParts(1L, 0.toShort)),
max = Some(TimestampNanosVal.fromParts(6L, 0.toShort))))))
assert(union.stats === expectedStats)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,9 @@ object CommandUtils extends Logging {
case _: IntegralType => true
case _: DecimalType => true
case DoubleType | FloatType => true
// Exclude nanosecond timestamp types: ApproximatePercentile and
// ApproxCountDistinctForIntervals do not accept them yet (separate scope).
case _: AnyTimestampNanoType => false
case _: DatetimeType => true
case _ => false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.test.SQLTestData.ArrayData
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.TimestampNanosVal
import org.apache.spark.util.Utils


Expand Down Expand Up @@ -205,6 +206,67 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
}
}

test("SPARK-57839: nanosecond-precision timestamp column stats lossless round trip") {
// Use a sub-microsecond value (nanosWithinMicro != 0) to prove nanos survive serialization.
val ntzNanosType = TimestampNTZNanosType(9)
val ltzNanosType = TimestampLTZNanosType(9)
val subMicroVal = TimestampNanosVal.fromParts(1640995201123456L, 789.toShort)

// NTZ nanos round-trip
val ntzStr = CatalogColumnStat.toExternalString(subMicroVal, "col", ntzNanosType)
val ntzParsed = CatalogColumnStat.fromExternalString(ntzStr, "col", ntzNanosType, 2)
assert(ntzParsed === subMicroVal,
s"NTZ nanos round-trip failed: formatted='$ntzStr', parsed=$ntzParsed, expected=$subMicroVal")

// LTZ nanos round-trip
val ltzStr = CatalogColumnStat.toExternalString(subMicroVal, "col", ltzNanosType)
val ltzParsed = CatalogColumnStat.fromExternalString(ltzStr, "col", ltzNanosType, 2)
assert(ltzParsed === subMicroVal,
s"LTZ nanos round-trip failed: formatted='$ltzStr', parsed=$ltzParsed, expected=$subMicroVal")

// Full CatalogColumnStat map round-trip
val catalogStat = CatalogColumnStat(
distinctCount = Some(1),
min = Some(ntzStr),
max = Some(ntzStr),
nullCount = Some(0),
avgLen = Some(10),
maxLen = Some(10))
val map = catalogStat.toMap("ctsnanos")
val roundtrip = CatalogColumnStat.fromMap("t", "ctsnanos", map)
assert(roundtrip === Some(catalogStat))
}

test("SPARK-57839: ANALYZE TABLE FOR COLUMNS on nanosecond timestamp collects basic stats") {
// Nanosecond timestamp columns should collect basic stats (min/max/ndv) but skip histogram,
// since ApproximatePercentile and ApproxCountDistinctForIntervals do not accept nanos types.
val table = "nanos_analyze_test"
withTable(table) {
sql(
s"""CREATE TABLE $table (id INT, ts_ntz TIMESTAMP_NTZ(9), ts_ltz TIMESTAMP_LTZ(9))
|USING PARQUET""".stripMargin)
sql(s"INSERT INTO $table VALUES (1, TIMESTAMP_NTZ'2024-01-01 00:00:00.123456789', " +
"TIMESTAMP_LTZ'2024-01-01 00:00:00.123456789')")
sql(s"INSERT INTO $table VALUES (2, TIMESTAMP_NTZ'2024-06-15 12:30:00.987654321', " +
"TIMESTAMP_LTZ'2024-06-15 12:30:00.987654321')")

// Should succeed with histograms enabled - nanos cols get basic stats, no histogram error
withSQLConf(SQLConf.HISTOGRAM_ENABLED.key -> "true") {
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS ts_ntz, ts_ltz")
}

val stats = getCatalogTable(table).stats
assert(stats.isDefined)
assert(stats.get.colStats.contains("ts_ntz"))
assert(stats.get.colStats.contains("ts_ltz"))
// Verify basic stats collected (no histogram field for nanos columns)
val ntzStats = stats.get.colStats("ts_ntz")
assert(ntzStats.distinctCount.contains(2))
assert(ntzStats.nullCount.contains(0))
assert(ntzStats.histogram.isEmpty, "Nanos columns should not have histograms")
}
}

test("analyze column command - result verification") {
// (data.head.productArity - 1) because the last column does not support stats collection.
assert(stats.size == data.head.productArity - 1)
Expand Down