Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, DateFormatter, DateTimeUti
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.types._
import org.apache.spark.types.variant.VariantUtil
import org.apache.spark.unsafe.types.{UTF8String, VariantVal}
import org.apache.spark.unsafe.types.{TimestampNanosVal, UTF8String, VariantVal}

class StaxXmlGenerator(
schema: DataType,
Expand Down Expand Up @@ -200,6 +200,10 @@ class StaxXmlGenerator(
gen.writeCharacters(timestampFormatter.format(v))
case (TimestampNTZType, v: Long) =>
gen.writeCharacters(timestampNTZFormatter.format(DateTimeUtils.microsToLocalDateTime(v)))
case (t: TimestampLTZNanosType, v: TimestampNanosVal) =>
gen.writeCharacters(timestampFormatter.formatNanos(v, t.precision))
case (t: TimestampNTZNanosType, v: TimestampNanosVal) =>
gen.writeCharacters(timestampNTZFormatter.formatWithoutTimeZoneNanos(v, t.precision))
case (DateType, v: Int) =>
gen.writeCharacters(dateFormatter.format(v))
case (_: TimeType, v: Long) => gen.writeCharacters(timeFormatter.format(v))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,9 @@ class StaxXmlParser(
Decimal(decimalParser(datum), dt.precision, dt.scale)
case _: TimestampType => parseXmlTimestamp(datum, options)
case _: TimestampNTZType => timestampNTZFormatter.parseWithoutTimeZone(datum, false)
case t: TimestampLTZNanosType => timestampFormatter.parseNanos(datum, t.precision)
case t: TimestampNTZNanosType =>
timestampNTZFormatter.parseWithoutTimeZoneNanos(datum, t.precision, false)
case _: DateType => parseXmlDate(datum, options)
case _: TimeType => timeFormatter.parse(datum)
case _: StringType => UTF8String.fromString(datum)
Expand Down Expand Up @@ -652,6 +655,8 @@ class StaxXmlParser(
case DateType => castTo(value, DateType)
case TimestampType => castTo(value, TimestampType)
case TimestampNTZType => castTo(value, TimestampNTZType)
case t: TimestampLTZNanosType => castTo(value, t)
case t: TimestampNTZNanosType => castTo(value, t)
case _: TimeType => castTo(value, TimeType())
case FloatType => signSafeToFloat(value)
case ByteType => castTo(value, ByteType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,15 @@ class XmlInferSchema(private val options: XmlOptions, private val caseSensitive:
if ((SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY ||
timestampType == TimestampNTZType) &&
timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
if (SQLConf.get.timestampNanosTypesEnabled) {
// Prefer nanosecond type when the fractional seconds part has more than 6 digits,
// indicating sub-microsecond precision that cannot be represented by TimestampNTZType.
val nanosOpt =
timestampNTZFormatter.parseWithoutTimeZoneNanosOptional(field, 9, false)
nanosOpt.filter(_.nanosWithinMicro != 0).foreach { _ =>
return Some(TimestampNTZNanosType(9))
}
}
return Some(timestampType)
}
} catch {
Expand Down Expand Up @@ -641,8 +650,45 @@ object XmlInferSchema {
(t1: DataType, t2: DataType): DataType = {

// TODO: Optimise this logic.
// AnyTimestampNanoType extends DatetimeType but is not covered by findWiderDateTimeType;
// handle it first to avoid a MatchError inside TypeCoercion.findTightestCommonType.
// StructType and ArrayType are also handled here so that compatibleType is used recursively
// for nested field types, preserving the nano-timestamp downgrade logic at all nesting levels.
// (TypeCoercion.findTightestCommonType handles same-structure StructType/ArrayType via
// findTypeForComplex, which calls findWiderDateTimeType and would bypass the custom logic.)
(t1, t2) match {
case (n1: TimestampNTZNanosType, n2: TimestampNTZNanosType) =>
return TimestampNTZNanosType(math.max(n1.precision, n2.precision))
case (n1: TimestampLTZNanosType, n2: TimestampLTZNanosType) =>
return TimestampLTZNanosType(math.max(n1.precision, n2.precision))
case (_: TimestampNTZNanosType, TimestampNTZType) |
(TimestampNTZType, _: TimestampNTZNanosType) =>
return TimestampNTZType
case (_: AnyTimestampNanoType, _: DatetimeType) |
(_: DatetimeType, _: AnyTimestampNanoType) =>
return TimestampType
case (StructType(fields1), StructType(fields2)) =>
val newFields = (fields1 ++ fields2)
// normalize field name and pair it with original field
.map(field => (normalize(field.name, caseSensitive), field))
.groupBy(_._1) // group by normalized field name
.map { case (_: String, fields: Array[(String, StructField)]) =>
val fieldTypes = fields.map(_._2)
val dataType = fieldTypes.map(_.dataType)
.reduce(compatibleType(caseSensitive, valueTag))
// we pick up the first field name that we've encountered for the field
StructField(fields.head._2.name, dataType)
}
return StructType(newFields.toArray.sortBy(_.name))
case (ArrayType(elementType1, containsNull1), ArrayType(elementType2, containsNull2)) =>
return ArrayType(
compatibleType(caseSensitive, valueTag)(elementType1, elementType2),
containsNull1 || containsNull2)
case _ =>
}

TypeCoercion.findTightestCommonType(t1, t2).getOrElse {
// t1 or t2 is a StructType, ArrayType, or an unexpected type.
// t1 or t2 is an unexpected type combination (DecimalType variants, valueTag structs, etc.)
(t1, t2) match {
// Double support larger range than fixed decimal, DecimalType.Maximum should be enough
// in most case, also have better precision.
Expand All @@ -661,25 +707,6 @@ object XmlInferSchema {
case (TimestampNTZType, TimestampType) | (TimestampType, TimestampNTZType) =>
TimestampType

case (StructType(fields1), StructType(fields2)) =>
val newFields = (fields1 ++ fields2)
// normalize field name and pair it with original field
.map(field => (normalize(field.name, caseSensitive), field))
.groupBy(_._1) // group by normalized field name
.map { case (_: String, fields: Array[(String, StructField)]) =>
val fieldTypes = fields.map(_._2)
val dataType = fieldTypes.map(_.dataType)
.reduce(compatibleType(caseSensitive, valueTag))
// we pick up the first field name that we've encountered for the field
StructField(fields.head._2.name, dataType)
}
StructType(newFields.toArray.sortBy(_.name))

case (ArrayType(elementType1, containsNull1), ArrayType(elementType2, containsNull2)) =>
ArrayType(
compatibleType(caseSensitive, valueTag)(
elementType1, elementType2), containsNull1 || containsNull2)

// In XML datasource, since StructType can be compared with ArrayType.
// In this case, ArrayType wraps the StructType.
case (ArrayType(ty1, _), ty2) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,6 @@ case class XmlFileFormat() extends TextBasedFileFormat with DataSourceRegister {

case _: GeometryType | _: GeographyType => false

// Nanosecond-capable timestamps are not yet supported by this datasource.
case _: AnyTimestampNanoType => false

case _: AtomicType => true

case st: StructType => st.forall { f => supportDataType(f.dataType) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1341,7 +1341,7 @@ class FileBasedDataSourceSuite extends SharedSparkSession

test("SPARK-57166: nanosecond timestamp types are not supported in selected file data sources") {
// Parquet and ORC support nanosecond-capable timestamps, while these formats still reject them.
val unsupportedDataSources = Seq("json", "xml")
val unsupportedDataSources = Seq("json")
val nanosTypes = Seq(TimestampNTZNanosType(9), TimestampLTZNanosType(9))
withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
// Test both v1 and v2 data sources.
Expand Down Expand Up @@ -1635,6 +1635,89 @@ class FileBasedDataSourceSuite extends SharedSparkSession
}
}
}

test("SPARK-57458: XML supports nanosecond timestamp types") {
withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
foreachNanosPrecision { precision =>
Seq(TimestampNTZNanosType(precision), TimestampLTZNanosType(precision)).foreach {
nanosType =>
withTempDir { dir =>
val wallClock = LocalDateTime.of(1970, 1, 1, 0, 20, 34, 567890123)
val (value, fmtKey, fmtVal) = nanosType match {
case _: TimestampNTZNanosType =>
(wallClock.asInstanceOf[Any],
"timestampNTZFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS")
case _: TimestampLTZNanosType =>
(wallClock.toInstant(ZoneOffset.UTC).asInstanceOf[Any],
"timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSSXXX")
}
val df = spark.createDataFrame(
spark.sparkContext.parallelize(Seq(Row(value))),
new StructType().add("ts", nanosType))
val path = new File(dir, s"xml_nanos_${nanosType.typeName}").getCanonicalPath
df.write.format("xml").option("rowTag", "row")
.option(fmtKey, fmtVal).mode("overwrite").save(path)
val readBack = spark.read.schema(new StructType().add("ts", nanosType))
.format("xml").option("rowTag", "row").option(fmtKey, fmtVal).load(path)
checkAnswer(readBack, df)
}
}
}
}
}

test("SPARK-57458: XML rejects nanosecond timestamps under the LEGACY time parser policy") {
// The legacy timestamp formatter cannot represent sub-microsecond digits, so the nanos
// formatter methods raise TIMESTAMP_NANOS_WITH_LEGACY_TIME_PARSER. Only the LTZ formatter
// is legacy under this policy (the NTZ formatter always uses the ISO-8601 path), so this
// covers TimestampLTZNanosType. XML is v1-only so there is no v1/v2 branching to test.
def rootNanosError(e: Throwable): SparkUnsupportedOperationException = {
var cause: Throwable = e
while (cause != null && !cause.isInstanceOf[SparkUnsupportedOperationException]) {
cause = cause.getCause
}
assert(cause != null,
s"Expected TIMESTAMP_NANOS_WITH_LEGACY_TIME_PARSER, but got: ${e.getMessage}")
cause.asInstanceOf[SparkUnsupportedOperationException]
}

withSQLConf(
SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true",
SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "LEGACY") {
val nanosType = TimestampLTZNanosType(9)
val schema = new StructType().add("ts", nanosType)
val expectedParameters =
Map("config" -> ("\"" + SQLConf.LEGACY_TIME_PARSER_POLICY.key + "\""))
withTempDir { dir =>
// Write path.
val df = spark.createDataFrame(
spark.sparkContext.parallelize(
Seq(Row(LocalDateTime.of(2020, 1, 1, 0, 0, 0, 1).toInstant(ZoneOffset.UTC)))),
schema)
val writeDir = new File(dir, "write").getCanonicalPath
checkError(
exception = rootNanosError(intercept[SparkException] {
df.write.format("xml").option("rowTag", "row").mode("overwrite").save(writeDir)
}),
condition = "UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_WITH_LEGACY_TIME_PARSER",
parameters = expectedParameters)

// Read path: write a benign file first so schema-driven parsing is what fails. Use
// FAILFAST so the unsupported-feature error surfaces instead of being swallowed as a
// null record by the permissive bad-record handling.
val readDir = new File(dir, "read").getCanonicalPath
Seq("a").toDF("ts").write.format("xml").option("rowTag", "row")
.mode("overwrite").save(readDir)
checkError(
exception = rootNanosError(intercept[SparkException] {
spark.read.schema(schema).option("mode", "FAILFAST").option("rowTag", "row")
.format("xml").load(readDir).collect()
}),
condition = "UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_WITH_LEGACY_TIME_PARSER",
parameters = expectedParameters)
}
}
}
}

object TestingUDT {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
package org.apache.spark.sql

import java.text.SimpleDateFormat
import java.time.{LocalDateTime, ZoneOffset}
import java.util.Locale

import scala.jdk.CollectionConverters._

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.util.TimestampNanosTestUtils
import org.apache.spark.sql.catalyst.util.TimestampNanosTestUtils.foreachNanosPrecision
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.functions._
Expand All @@ -45,25 +47,45 @@ class XmlFunctionsSuite extends SharedSparkSession {
test("SPARK-57164: from_xml with a nanos timestamp DDL schema string") {
val df = Seq("""<ROW><c>2020-01-01T00:00:00.123456789</c></ROW>""").toDF("value")
// FAILFAST so the value-converter rejection propagates instead of becoming a corrupt record.
// Pin the session timezone to UTC so LTZ values are predictable without a zone in the string.
val options = Map("mode" -> "FAILFAST").asJava
withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
withSQLConf(
SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true",
SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
foreachNanosPrecision { p =>
val truncator = TimestampNanosTestUtils.nanoOfSecTruncator(p)
val truncNanos = truncator(123456789)
val expectedNTZ = LocalDateTime.of(2020, 1, 1, 0, 0, 0, truncNanos)
val expectedLTZ = expectedNTZ.toInstant(ZoneOffset.UTC)
Seq(
s"TIMESTAMP_NTZ($p)" -> TimestampNTZNanosType(p),
s"TIMESTAMP_LTZ($p)" -> TimestampLTZNanosType(p),
s"TIMESTAMP($p) WITHOUT TIME ZONE" -> TimestampNTZNanosType(p),
s"TIMESTAMP($p) WITH LOCAL TIME ZONE" -> TimestampLTZNanosType(p)).foreach {
case (spelling, expected) =>
s"TIMESTAMP_NTZ($p)" -> (TimestampNTZNanosType(p), expectedNTZ.asInstanceOf[Any]),
s"TIMESTAMP_LTZ($p)" -> (TimestampLTZNanosType(p), expectedLTZ.asInstanceOf[Any]),
s"TIMESTAMP($p) WITHOUT TIME ZONE" ->
(TimestampNTZNanosType(p), expectedNTZ.asInstanceOf[Any]),
s"TIMESTAMP($p) WITH LOCAL TIME ZONE" ->
(TimestampLTZNanosType(p), expectedLTZ.asInstanceOf[Any])).foreach {
case (spelling, (expectedType, expectedVal)) =>
val parsed = df.select(from_xml($"value", s"c $spelling", options).as("v"))
// The schema string resolves to the nanos type ...
assert(parsed.schema("v").dataType.asInstanceOf[StructType]("c").dataType === expected)
// ... but the XML datasource does not support nanosecond timestamps yet, so the
// value converter rejects it at execution (surfaced as a malformed record in
// FAILFAST mode).
checkError(
exception = intercept[SparkException](parsed.collect()),
condition = "MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION",
parameters = Map("badRecord" -> "[null]", "failFastMode" -> "FAILFAST"))
val parsedType = parsed.schema("v").dataType.asInstanceOf[StructType]("c").dataType
assert(parsedType === expectedType)
// ... the XML datasource parses the value and round-trips to the expected value.
checkAnswer(parsed, Row(Row(expectedVal)) :: Nil)
}
}
}
}

test("SPARK-57458: from_xml rejects zoned input for NTZ nanos columns") {
// A string with an explicit zone offset must be rejected for TIMESTAMP_NTZ(p) because
// the NTZ parse path uses allowTimeZone=false, matching the micro NTZ and CSV behaviour.
val df = Seq("""<ROW><c>2020-01-01T00:00:00.123456789+05:00</c></ROW>""").toDF("value")
val options = Map("mode" -> "FAILFAST").asJava
withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
foreachNanosPrecision { p =>
val parsed = df.select(from_xml($"value", s"c TIMESTAMP_NTZ($p)", options).as("v"))
intercept[SparkException] {
parsed.collect()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3694,6 +3694,74 @@ class XmlSuite
assert(XmlOptions.isValidOption("encoding"))
assert(XmlOptions.isValidOption("charset"))
}

// Full-precision format pattern for nanosecond NTZ timestamp schema-inference tests.
private val ntzNanosFormat = "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS"

test("SPARK-57458: XML infers nanosecond NTZ timestamps from sub-microsecond fractional digits") {
withSQLConf(
SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true",
SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_NTZ") {
// Write a nanosecond DataFrame with a 9-digit format so the XML values carry 9 fractional
// digits; then read without a schema so the type is inferred from the string values.
val wallClock = LocalDateTime.of(2025, 6, 15, 12, 30, 45, 123456789)
val ntzType = TimestampNTZNanosType(9)
val inputDf = spark.createDataFrame(
spark.sparkContext.parallelize(Seq(Row(wallClock))),
new StructType().add("ts", ntzType))
withTempPath { dir =>
val path = dir.getCanonicalPath
inputDf.write.format("xml").option("rowTag", "ROW")
.option("timestampNTZFormat", ntzNanosFormat).mode("overwrite").save(path)
val df = spark.read.format("xml").option("rowTag", "ROW").load(path)
assert(df.schema("ts").dataType === TimestampNTZNanosType(9),
s"Expected TimestampNTZNanosType(9), got ${df.schema("ts").dataType}")
}
}
}

test("SPARK-57458: XML inferred type is TimestampNTZType for mixed micro/nano NTZ rows") {
// When some rows have >6 fractional digits (nano) and others have <=6 (micro), the inferred
// type must be TimestampNTZType (not TimestampType / LTZ), because all values are zone-free.
withSQLConf(
SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true",
SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_NTZ") {
val xmlContent =
"""<root><row><ts>2025-06-15T12:30:45.123456789</ts></row>
|<row><ts>2025-06-15T12:30:45.123456</ts></row></root>""".stripMargin
withTempDir { dir =>
val path = new File(dir, "mixed.xml").getCanonicalPath
Files.write(Paths.get(path),
xmlContent.getBytes(StandardCharsets.UTF_8))
val df = spark.read.format("xml").option("rowTag", "row")
.option("rootTag", "root").load(path)
assert(df.schema("ts").dataType === TimestampNTZType,
s"Expected TimestampNTZType, got ${df.schema("ts").dataType}")
}
}
}

test("SPARK-57458: nano timestamp + non-datetime field widens to StringType during inference") {
// When a field is a nano-precision timestamp in some rows and a non-datetime value in others,
// inference must fall back to StringType (matching the micro-precision path) rather than
// widening to TimestampType and then failing at read time.
withSQLConf(
SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true",
SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_NTZ") {
val xmlContent =
"""<root><row><ts>2025-06-15T12:30:45.123456789</ts></row>
|<row><ts>not-a-timestamp</ts></row></root>""".stripMargin
withTempDir { dir =>
val path = new File(dir, "nano_nondatetime.xml").getCanonicalPath
Files.write(Paths.get(path), xmlContent.getBytes(StandardCharsets.UTF_8))
val df = spark.read.format("xml").option("rowTag", "row")
.option("rootTag", "root").load(path)
assert(df.schema("ts").dataType === StringType,
s"Expected StringType for nano + non-datetime, got ${df.schema("ts").dataType}")
}
}
}

}

class XmlSuiteWithLegacyParser extends XmlSuite {
Expand Down