diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala index 6e381a2974c79..558cd8acf61cb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, DateFormatter, DateTimeUti import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.types._ import org.apache.spark.types.variant.VariantUtil -import org.apache.spark.unsafe.types.{UTF8String, VariantVal} +import org.apache.spark.unsafe.types.{TimestampNanosVal, UTF8String, VariantVal} class StaxXmlGenerator( schema: DataType, @@ -200,6 +200,10 @@ class StaxXmlGenerator( gen.writeCharacters(timestampFormatter.format(v)) case (TimestampNTZType, v: Long) => gen.writeCharacters(timestampNTZFormatter.format(DateTimeUtils.microsToLocalDateTime(v))) + case (t: TimestampLTZNanosType, v: TimestampNanosVal) => + gen.writeCharacters(timestampFormatter.formatNanos(v, t.precision)) + case (t: TimestampNTZNanosType, v: TimestampNanosVal) => + gen.writeCharacters(timestampNTZFormatter.formatWithoutTimeZoneNanos(v, t.precision)) case (DateType, v: Int) => gen.writeCharacters(dateFormatter.format(v)) case (_: TimeType, v: Long) => gen.writeCharacters(timeFormatter.format(v)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala index 593e6a3a97e1a..f4eb443d95f52 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala @@ -601,6 +601,9 @@ class StaxXmlParser( Decimal(decimalParser(datum), dt.precision, dt.scale) case _: TimestampType => parseXmlTimestamp(datum, options) case _: TimestampNTZType => timestampNTZFormatter.parseWithoutTimeZone(datum, false) + case t: TimestampLTZNanosType => timestampFormatter.parseNanos(datum, t.precision) + case t: TimestampNTZNanosType => + timestampNTZFormatter.parseWithoutTimeZoneNanos(datum, t.precision, false) case _: DateType => parseXmlDate(datum, options) case _: TimeType => timeFormatter.parse(datum) case _: StringType => UTF8String.fromString(datum) @@ -652,6 +655,8 @@ class StaxXmlParser( case DateType => castTo(value, DateType) case TimestampType => castTo(value, TimestampType) case TimestampNTZType => castTo(value, TimestampNTZType) + case t: TimestampLTZNanosType => castTo(value, t) + case t: TimestampNTZNanosType => castTo(value, t) case _: TimeType => castTo(value, TimeType()) case FloatType => signSafeToFloat(value) case ByteType => castTo(value, ByteType) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala index d6a6866595893..7fbce121e6245 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala @@ -542,6 +542,15 @@ class XmlInferSchema(private val options: XmlOptions, private val caseSensitive: if ((SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY || timestampType == TimestampNTZType) && timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) { + if (SQLConf.get.timestampNanosTypesEnabled) { + // Prefer nanosecond type when the fractional seconds part has more than 6 digits, + // indicating sub-microsecond precision that cannot be represented by TimestampNTZType. + val nanosOpt = + timestampNTZFormatter.parseWithoutTimeZoneNanosOptional(field, 9, false) + nanosOpt.filter(_.nanosWithinMicro != 0).foreach { _ => + return Some(TimestampNTZNanosType(9)) + } + } return Some(timestampType) } } catch { @@ -641,8 +650,45 @@ object XmlInferSchema { (t1: DataType, t2: DataType): DataType = { // TODO: Optimise this logic. + // AnyTimestampNanoType extends DatetimeType but is not covered by findWiderDateTimeType; + // handle it first to avoid a MatchError inside TypeCoercion.findTightestCommonType. + // StructType and ArrayType are also handled here so that compatibleType is used recursively + // for nested field types, preserving the nano-timestamp downgrade logic at all nesting levels. + // (TypeCoercion.findTightestCommonType handles same-structure StructType/ArrayType via + // findTypeForComplex, which calls findWiderDateTimeType and would bypass the custom logic.) + (t1, t2) match { + case (n1: TimestampNTZNanosType, n2: TimestampNTZNanosType) => + return TimestampNTZNanosType(math.max(n1.precision, n2.precision)) + case (n1: TimestampLTZNanosType, n2: TimestampLTZNanosType) => + return TimestampLTZNanosType(math.max(n1.precision, n2.precision)) + case (_: TimestampNTZNanosType, TimestampNTZType) | + (TimestampNTZType, _: TimestampNTZNanosType) => + return TimestampNTZType + case (_: AnyTimestampNanoType, _: DatetimeType) | + (_: DatetimeType, _: AnyTimestampNanoType) => + return TimestampType + case (StructType(fields1), StructType(fields2)) => + val newFields = (fields1 ++ fields2) + // normalize field name and pair it with original field + .map(field => (normalize(field.name, caseSensitive), field)) + .groupBy(_._1) // group by normalized field name + .map { case (_: String, fields: Array[(String, StructField)]) => + val fieldTypes = fields.map(_._2) + val dataType = fieldTypes.map(_.dataType) + .reduce(compatibleType(caseSensitive, valueTag)) + // we pick up the first field name that we've encountered for the field + StructField(fields.head._2.name, dataType) + } + return StructType(newFields.toArray.sortBy(_.name)) + case (ArrayType(elementType1, containsNull1), ArrayType(elementType2, containsNull2)) => + return ArrayType( + compatibleType(caseSensitive, valueTag)(elementType1, elementType2), + containsNull1 || containsNull2) + case _ => + } + TypeCoercion.findTightestCommonType(t1, t2).getOrElse { - // t1 or t2 is a StructType, ArrayType, or an unexpected type. + // t1 or t2 is an unexpected type combination (DecimalType variants, valueTag structs, etc.) (t1, t2) match { // Double support larger range than fixed decimal, DecimalType.Maximum should be enough // in most case, also have better precision. @@ -661,25 +707,6 @@ object XmlInferSchema { case (TimestampNTZType, TimestampType) | (TimestampType, TimestampNTZType) => TimestampType - case (StructType(fields1), StructType(fields2)) => - val newFields = (fields1 ++ fields2) - // normalize field name and pair it with original field - .map(field => (normalize(field.name, caseSensitive), field)) - .groupBy(_._1) // group by normalized field name - .map { case (_: String, fields: Array[(String, StructField)]) => - val fieldTypes = fields.map(_._2) - val dataType = fieldTypes.map(_.dataType) - .reduce(compatibleType(caseSensitive, valueTag)) - // we pick up the first field name that we've encountered for the field - StructField(fields.head._2.name, dataType) - } - StructType(newFields.toArray.sortBy(_.name)) - - case (ArrayType(elementType1, containsNull1), ArrayType(elementType2, containsNull2)) => - ArrayType( - compatibleType(caseSensitive, valueTag)( - elementType1, elementType2), containsNull1 || containsNull2) - // In XML datasource, since StructType can be compared with ArrayType. // In this case, ArrayType wraps the StructType. case (ArrayType(ty1, _), ty2) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlFileFormat.scala index ff4e57570a1d6..da81ff327c74a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlFileFormat.scala @@ -149,9 +149,6 @@ case class XmlFileFormat() extends TextBasedFileFormat with DataSourceRegister { case _: GeometryType | _: GeographyType => false - // Nanosecond-capable timestamps are not yet supported by this datasource. - case _: AnyTimestampNanoType => false - case _: AtomicType => true case st: StructType => st.forall { f => supportDataType(f.dataType) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 11f75d3d9a7a8..16d2bc0d3082f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -1341,7 +1341,7 @@ class FileBasedDataSourceSuite extends SharedSparkSession test("SPARK-57166: nanosecond timestamp types are not supported in selected file data sources") { // Parquet and ORC support nanosecond-capable timestamps, while these formats still reject them. - val unsupportedDataSources = Seq("json", "xml") + val unsupportedDataSources = Seq("json") val nanosTypes = Seq(TimestampNTZNanosType(9), TimestampLTZNanosType(9)) withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") { // Test both v1 and v2 data sources. @@ -1635,6 +1635,89 @@ class FileBasedDataSourceSuite extends SharedSparkSession } } } + + test("SPARK-57458: XML supports nanosecond timestamp types") { + withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") { + foreachNanosPrecision { precision => + Seq(TimestampNTZNanosType(precision), TimestampLTZNanosType(precision)).foreach { + nanosType => + withTempDir { dir => + val wallClock = LocalDateTime.of(1970, 1, 1, 0, 20, 34, 567890123) + val (value, fmtKey, fmtVal) = nanosType match { + case _: TimestampNTZNanosType => + (wallClock.asInstanceOf[Any], + "timestampNTZFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS") + case _: TimestampLTZNanosType => + (wallClock.toInstant(ZoneOffset.UTC).asInstanceOf[Any], + "timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSSXXX") + } + val df = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(value))), + new StructType().add("ts", nanosType)) + val path = new File(dir, s"xml_nanos_${nanosType.typeName}").getCanonicalPath + df.write.format("xml").option("rowTag", "row") + .option(fmtKey, fmtVal).mode("overwrite").save(path) + val readBack = spark.read.schema(new StructType().add("ts", nanosType)) + .format("xml").option("rowTag", "row").option(fmtKey, fmtVal).load(path) + checkAnswer(readBack, df) + } + } + } + } + } + + test("SPARK-57458: XML rejects nanosecond timestamps under the LEGACY time parser policy") { + // The legacy timestamp formatter cannot represent sub-microsecond digits, so the nanos + // formatter methods raise TIMESTAMP_NANOS_WITH_LEGACY_TIME_PARSER. Only the LTZ formatter + // is legacy under this policy (the NTZ formatter always uses the ISO-8601 path), so this + // covers TimestampLTZNanosType. XML is v1-only so there is no v1/v2 branching to test. + def rootNanosError(e: Throwable): SparkUnsupportedOperationException = { + var cause: Throwable = e + while (cause != null && !cause.isInstanceOf[SparkUnsupportedOperationException]) { + cause = cause.getCause + } + assert(cause != null, + s"Expected TIMESTAMP_NANOS_WITH_LEGACY_TIME_PARSER, but got: ${e.getMessage}") + cause.asInstanceOf[SparkUnsupportedOperationException] + } + + withSQLConf( + SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true", + SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "LEGACY") { + val nanosType = TimestampLTZNanosType(9) + val schema = new StructType().add("ts", nanosType) + val expectedParameters = + Map("config" -> ("\"" + SQLConf.LEGACY_TIME_PARSER_POLICY.key + "\"")) + withTempDir { dir => + // Write path. + val df = spark.createDataFrame( + spark.sparkContext.parallelize( + Seq(Row(LocalDateTime.of(2020, 1, 1, 0, 0, 0, 1).toInstant(ZoneOffset.UTC)))), + schema) + val writeDir = new File(dir, "write").getCanonicalPath + checkError( + exception = rootNanosError(intercept[SparkException] { + df.write.format("xml").option("rowTag", "row").mode("overwrite").save(writeDir) + }), + condition = "UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_WITH_LEGACY_TIME_PARSER", + parameters = expectedParameters) + + // Read path: write a benign file first so schema-driven parsing is what fails. Use + // FAILFAST so the unsupported-feature error surfaces instead of being swallowed as a + // null record by the permissive bad-record handling. + val readDir = new File(dir, "read").getCanonicalPath + Seq("a").toDF("ts").write.format("xml").option("rowTag", "row") + .mode("overwrite").save(readDir) + checkError( + exception = rootNanosError(intercept[SparkException] { + spark.read.schema(schema).option("mode", "FAILFAST").option("rowTag", "row") + .format("xml").load(readDir).collect() + }), + condition = "UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_WITH_LEGACY_TIME_PARSER", + parameters = expectedParameters) + } + } + } } object TestingUDT { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/XmlFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/XmlFunctionsSuite.scala index ef875064e0ff0..dd6bc024dd8e0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/XmlFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/XmlFunctionsSuite.scala @@ -18,11 +18,13 @@ package org.apache.spark.sql import java.text.SimpleDateFormat +import java.time.{LocalDateTime, ZoneOffset} import java.util.Locale import scala.jdk.CollectionConverters._ import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.util.TimestampNanosTestUtils import org.apache.spark.sql.catalyst.util.TimestampNanosTestUtils.foreachNanosPrecision import org.apache.spark.sql.execution.WholeStageCodegenExec import org.apache.spark.sql.functions._ @@ -45,25 +47,45 @@ class XmlFunctionsSuite extends SharedSparkSession { test("SPARK-57164: from_xml with a nanos timestamp DDL schema string") { val df = Seq("""2020-01-01T00:00:00.123456789""").toDF("value") // FAILFAST so the value-converter rejection propagates instead of becoming a corrupt record. + // Pin the session timezone to UTC so LTZ values are predictable without a zone in the string. val options = Map("mode" -> "FAILFAST").asJava - withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") { + withSQLConf( + SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true", + SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") { foreachNanosPrecision { p => + val truncator = TimestampNanosTestUtils.nanoOfSecTruncator(p) + val truncNanos = truncator(123456789) + val expectedNTZ = LocalDateTime.of(2020, 1, 1, 0, 0, 0, truncNanos) + val expectedLTZ = expectedNTZ.toInstant(ZoneOffset.UTC) Seq( - s"TIMESTAMP_NTZ($p)" -> TimestampNTZNanosType(p), - s"TIMESTAMP_LTZ($p)" -> TimestampLTZNanosType(p), - s"TIMESTAMP($p) WITHOUT TIME ZONE" -> TimestampNTZNanosType(p), - s"TIMESTAMP($p) WITH LOCAL TIME ZONE" -> TimestampLTZNanosType(p)).foreach { - case (spelling, expected) => + s"TIMESTAMP_NTZ($p)" -> (TimestampNTZNanosType(p), expectedNTZ.asInstanceOf[Any]), + s"TIMESTAMP_LTZ($p)" -> (TimestampLTZNanosType(p), expectedLTZ.asInstanceOf[Any]), + s"TIMESTAMP($p) WITHOUT TIME ZONE" -> + (TimestampNTZNanosType(p), expectedNTZ.asInstanceOf[Any]), + s"TIMESTAMP($p) WITH LOCAL TIME ZONE" -> + (TimestampLTZNanosType(p), expectedLTZ.asInstanceOf[Any])).foreach { + case (spelling, (expectedType, expectedVal)) => val parsed = df.select(from_xml($"value", s"c $spelling", options).as("v")) // The schema string resolves to the nanos type ... - assert(parsed.schema("v").dataType.asInstanceOf[StructType]("c").dataType === expected) - // ... but the XML datasource does not support nanosecond timestamps yet, so the - // value converter rejects it at execution (surfaced as a malformed record in - // FAILFAST mode). - checkError( - exception = intercept[SparkException](parsed.collect()), - condition = "MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION", - parameters = Map("badRecord" -> "[null]", "failFastMode" -> "FAILFAST")) + val parsedType = parsed.schema("v").dataType.asInstanceOf[StructType]("c").dataType + assert(parsedType === expectedType) + // ... the XML datasource parses the value and round-trips to the expected value. + checkAnswer(parsed, Row(Row(expectedVal)) :: Nil) + } + } + } + } + + test("SPARK-57458: from_xml rejects zoned input for NTZ nanos columns") { + // A string with an explicit zone offset must be rejected for TIMESTAMP_NTZ(p) because + // the NTZ parse path uses allowTimeZone=false, matching the micro NTZ and CSV behaviour. + val df = Seq("""2020-01-01T00:00:00.123456789+05:00""").toDF("value") + val options = Map("mode" -> "FAILFAST").asJava + withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") { + foreachNanosPrecision { p => + val parsed = df.select(from_xml($"value", s"c TIMESTAMP_NTZ($p)", options).as("v")) + intercept[SparkException] { + parsed.collect() } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala index a1aae1481f815..8a8ab4de3a8f8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala @@ -3694,6 +3694,74 @@ class XmlSuite assert(XmlOptions.isValidOption("encoding")) assert(XmlOptions.isValidOption("charset")) } + + // Full-precision format pattern for nanosecond NTZ timestamp schema-inference tests. + private val ntzNanosFormat = "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS" + + test("SPARK-57458: XML infers nanosecond NTZ timestamps from sub-microsecond fractional digits") { + withSQLConf( + SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true", + SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_NTZ") { + // Write a nanosecond DataFrame with a 9-digit format so the XML values carry 9 fractional + // digits; then read without a schema so the type is inferred from the string values. + val wallClock = LocalDateTime.of(2025, 6, 15, 12, 30, 45, 123456789) + val ntzType = TimestampNTZNanosType(9) + val inputDf = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(wallClock))), + new StructType().add("ts", ntzType)) + withTempPath { dir => + val path = dir.getCanonicalPath + inputDf.write.format("xml").option("rowTag", "ROW") + .option("timestampNTZFormat", ntzNanosFormat).mode("overwrite").save(path) + val df = spark.read.format("xml").option("rowTag", "ROW").load(path) + assert(df.schema("ts").dataType === TimestampNTZNanosType(9), + s"Expected TimestampNTZNanosType(9), got ${df.schema("ts").dataType}") + } + } + } + + test("SPARK-57458: XML inferred type is TimestampNTZType for mixed micro/nano NTZ rows") { + // When some rows have >6 fractional digits (nano) and others have <=6 (micro), the inferred + // type must be TimestampNTZType (not TimestampType / LTZ), because all values are zone-free. + withSQLConf( + SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true", + SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_NTZ") { + val xmlContent = + """2025-06-15T12:30:45.123456789 + |2025-06-15T12:30:45.123456""".stripMargin + withTempDir { dir => + val path = new File(dir, "mixed.xml").getCanonicalPath + Files.write(Paths.get(path), + xmlContent.getBytes(StandardCharsets.UTF_8)) + val df = spark.read.format("xml").option("rowTag", "row") + .option("rootTag", "root").load(path) + assert(df.schema("ts").dataType === TimestampNTZType, + s"Expected TimestampNTZType, got ${df.schema("ts").dataType}") + } + } + } + + test("SPARK-57458: nano timestamp + non-datetime field widens to StringType during inference") { + // When a field is a nano-precision timestamp in some rows and a non-datetime value in others, + // inference must fall back to StringType (matching the micro-precision path) rather than + // widening to TimestampType and then failing at read time. + withSQLConf( + SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true", + SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_NTZ") { + val xmlContent = + """2025-06-15T12:30:45.123456789 + |not-a-timestamp""".stripMargin + withTempDir { dir => + val path = new File(dir, "nano_nondatetime.xml").getCanonicalPath + Files.write(Paths.get(path), xmlContent.getBytes(StandardCharsets.UTF_8)) + val df = spark.read.format("xml").option("rowTag", "row") + .option("rootTag", "root").load(path) + assert(df.schema("ts").dataType === StringType, + s"Expected StringType for nano + non-datetime, got ${df.schema("ts").dataType}") + } + } + } + } class XmlSuiteWithLegacyParser extends XmlSuite {