Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,19 @@ class JacksonGenerator(
timestampNTZFormatter.format(DateTimeUtils.microsToLocalDateTime(row.getLong(ordinal)))
gen.writeString(timestampString)

case t: TimestampLTZNanosType =>
(row: SpecializedGetters, ordinal: Int) =>
val timestampString =
timestampFormatter.formatNanos(row.getTimestampLTZNanos(ordinal), t.precision)
gen.writeString(timestampString)

case t: TimestampNTZNanosType =>
(row: SpecializedGetters, ordinal: Int) =>
val timestampString =
timestampNTZFormatter.formatWithoutTimeZoneNanos(
row.getTimestampNTZNanos(ordinal), t.precision)
gen.writeString(timestampString)

case DateType =>
(row: SpecializedGetters, ordinal: Int) =>
val dateString = dateFormatter.format(row.getInt(ordinal))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
import org.apache.spark.types.variant._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String, VariantVal}
import org.apache.spark.unsafe.types.{CalendarInterval, TimestampNanosVal, UTF8String, VariantVal}
import org.apache.spark.util.Utils

/**
Expand Down Expand Up @@ -380,6 +380,21 @@ class JacksonParser(
timestampNTZFormatter.parseWithoutTimeZone(parser.getText, false)
}

case t: TimestampLTZNanosType =>
(parser: JsonParser) => parseJsonToken[TimestampNanosVal](parser, dataType) {
// Unlike the microsecond TimestampType, the nanosecond types accept only string input.
// The numeric-epoch shorthand (a JSON integer read as epoch seconds) is legacy
// TimestampType behavior and is intentionally not carried over to the nanos types.
case VALUE_STRING if parser.getTextLength >= 1 =>
timestampFormatter.parseNanos(parser.getText, t.precision)
}

case t: TimestampNTZNanosType =>
(parser: JsonParser) => parseJsonToken[TimestampNanosVal](parser, dataType) {
case VALUE_STRING if parser.getTextLength >= 1 =>
timestampNTZFormatter.parseWithoutTimeZoneNanos(parser.getText, t.precision, false)
}

case DateType =>
(parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) {
case VALUE_STRING if parser.getTextLength >= 1 =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.expressions

import java.text.{DecimalFormat, DecimalFormatSymbols, SimpleDateFormat}
import java.time.{LocalDateTime, ZoneOffset}
import java.util.{Calendar, Locale, TimeZone}

import org.scalatest.exceptions.TestFailedException
Expand All @@ -30,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.Cast._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{PST, UTC, UTC_OPT}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

Expand Down Expand Up @@ -570,6 +572,33 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}
}

test("SPARK-57456: from_json with nanos timestamp") {
  // The input carries nine fractional-second digits and no timestamp format option is passed,
  // so the default formatter reads the whole fraction. The expected value below is that
  // fraction truncated toward zero to each declared precision.
  val jsonData = """{"t": "2016-01-01T00:00:00.123456789"}"""

  // Builds the from_json expression over `jsonData` for a one-field schema `t: fieldType`,
  // with no options and the UTC time zone.
  def fromJson(fieldType: DataType): JsonToStructs = {
    val schema = StructType(StructField("t", fieldType) :: Nil)
    JsonToStructs(schema, Map.empty[String, String], Literal(jsonData), UTC_OPT)
  }

  withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
    TimestampNanosTestUtils.foreachNanosPrecision { p =>
      val truncatedNanos = TimestampNanosTestUtils.nanoOfSecTruncator(p)(123456789)
      val expectedLocal = LocalDateTime.of(2016, 1, 1, 0, 0, 0, truncatedNanos)

      // NTZ: the wall-clock value is taken as written.
      checkEvaluation(
        fromJson(TimestampNTZNanosType(p)),
        InternalRow(TimestampNanosTestUtils.localDateTimeToNanosVal(expectedLocal)))

      // LTZ: the string has no zone, so it is interpreted in the given time zone (UTC here).
      checkEvaluation(
        fromJson(TimestampLTZNanosType(p)),
        InternalRow(
          TimestampNanosTestUtils.instantToNanosVal(expectedLocal.toInstant(ZoneOffset.UTC))))
    }
  }
}

test("SPARK-19543: from_json empty input column") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,6 @@ case class JsonFileFormat() extends TextBasedFileFormat with DataSourceRegister

case _: GeometryType | _: GeographyType => false

// Nanosecond-capable timestamps are not yet supported by this datasource.
case _: AnyTimestampNanoType => false

case _: AtomicType => true

case st: StructType => st.forall { f => supportDataType(f.dataType) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ case class JsonTable(
override def supportsDataType(dataType: DataType): Boolean = dataType match {
case _: GeometryType | _: GeographyType => false

// Nanosecond-capable timestamps are not yet supported by this datasource.
case _: AnyTimestampNanoType => false

case _: AtomicType => true

case st: StructType => st.forall { f => supportsDataType(f.dataType) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.TimestampNanosVal
import org.apache.spark.util.HadoopFSUtils

class FileBasedDataSourceSuite extends SharedSparkSession
Expand Down Expand Up @@ -1339,65 +1338,6 @@ class FileBasedDataSourceSuite extends SharedSparkSession
}
}

test("SPARK-57166: nanosecond timestamp types are not supported in selected file data sources") {
  // Parquet and ORC support nanosecond-capable timestamps; the formats listed here are the
  // ones expected to reject them on both the write and the read path.
  val unsupportedDataSources = Seq("json")
  val nanosTypes = Seq(TimestampNTZNanosType(9), TimestampLTZNanosType(9))
  withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
    // Exercise both v1 and v2: once with the formats on the v1 source list, once with the
    // list empty.
    for (useV1 <- Seq(true, false)) {
      val useV1List = if (useV1) unsupportedDataSources.mkString(",") else ""
      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) {
        for (format <- unsupportedDataSources; nanosType <- nanosTypes) {
          val expectedType = s""""${nanosType.sql}""""
          // Error parameters shared by the write and read assertions. Kept as a `def` so the
          // map is built at each use, after the intercepted action has run.
          def expectedParameters: Map[String, String] = Map(
            "columnName" -> "`ts`",
            "columnType" -> expectedType,
            "format" -> formatMapping(format))

          withTempDir { dir =>
            // Write path: a nanos-typed column cannot be written. The nanos literal is built
            // directly from its internal value to avoid relying on cast/parser support.
            val nanosLiteral = Literal.create(new TimestampNanosVal(0L, 0.toShort), nanosType)
            val df = spark.range(1).select(Column(nanosLiteral).as("ts"))
            val writeDir = new File(dir, "write").getCanonicalPath
            checkError(
              exception = intercept[AnalysisException] {
                df.write.format(format).mode("overwrite").save(writeDir)
              },
              condition = "UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE",
              parameters = expectedParameters)

            // Read path: a user-specified nanos schema is rejected. A benign file is written
            // first so that schema validation, not file listing, is what fails.
            val readDir = new File(dir, "read").getCanonicalPath
            // XML requires a `rowTag` option on both the read and write paths.
            val extraOptions =
              if (format == "xml") Map("rowTag" -> "row") else Map.empty[String, String]
            Seq("a").toDF("ts")
              .write
              .format(format)
              .options(extraOptions)
              .mode("overwrite")
              .save(readDir)
            checkError(
              exception = intercept[AnalysisException] {
                spark.read
                  .schema(new StructType().add("ts", nanosType))
                  .format(format)
                  .options(extraOptions)
                  .load(readDir)
                  .collect()
              },
              condition = "UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE",
              parameters = expectedParameters)
          }
        }
      }
    }
  }
}

test("SPARK-57166: ORC supports nanosecond timestamp types in v1 and v2") {
withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
// Validate both v1 and v2 ORC paths.
Expand Down Expand Up @@ -1473,6 +1413,148 @@ class FileBasedDataSourceSuite extends SharedSparkSession
}
}

test("SPARK-57456: JSON supports nanosecond timestamp types in v1 and v2") {
  // Round-trips a single nanos-typed column through the JSON source for every precision and
  // for both the NTZ and LTZ flavours, with "json" on and off the v1 source list.
  withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
    for (useV1 <- Seq(true, false)) {
      val v1Sources = if (useV1) "json" else ""
      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> v1Sources) {
        foreachNanosPrecision { precision =>
          // JSON is text-based: the format string must carry enough fractional-second digits
          // to represent the full precision, so use exactly `precision` S-characters.
          val fractionPattern = "S" * precision
          val typesUnderTest =
            Seq(TimestampNTZNanosType(precision), TimestampLTZNanosType(precision))
          typesUnderTest.foreach { nanosType =>
            withTempDir { dir =>
              val wallClock = LocalDateTime.of(1970, 1, 1, 0, 20, 34, 567890123)
              // External value and the matching format option, chosen together per flavour.
              val (value, fmtKey, fmtVal): (Any, String, String) = nanosType match {
                case _: TimestampNTZNanosType =>
                  (wallClock, "timestampNTZFormat", s"yyyy-MM-dd'T'HH:mm:ss.$fractionPattern")
                case _: TimestampLTZNanosType =>
                  (
                    wallClock.toInstant(ZoneOffset.UTC),
                    "timestampFormat",
                    s"yyyy-MM-dd'T'HH:mm:ss.${fractionPattern}XXX")
              }
              val schema = new StructType().add("ts", nanosType)
              val df = spark.createDataFrame(
                spark.sparkContext.parallelize(Seq(Row(value))),
                schema)
              val path = new File(dir, s"json_nanos_${nanosType.typeName}").getCanonicalPath

              // Write and read back with the same format option; the data must be unchanged.
              df.write.format("json").option(fmtKey, fmtVal).mode("overwrite").save(path)
              val readBack = spark.read
                .schema(schema)
                .option(fmtKey, fmtVal)
                .format("json")
                .load(path)
              checkAnswer(readBack, df)
            }
          }
        }
      }
    }
  }
}

test("SPARK-57456: JSON supports nested nanosecond timestamp types in v1 and v2") {
  // Same round trip as the flat-column test, but with the nanos value nested inside a struct,
  // an array and a map value.
  withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
    for (useV1 <- Seq(true, false)) {
      val v1Sources = if (useV1) "json" else ""
      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> v1Sources) {
        foreachNanosPrecision { precision =>
          // One fractional-second 'S' per digit of the declared precision.
          val fractionPattern = "S" * precision
          val typesUnderTest =
            Seq(TimestampNTZNanosType(precision), TimestampLTZNanosType(precision))
          typesUnderTest.foreach { nanosType =>
            withTempDir { dir =>
              val wallClock = LocalDateTime.of(1970, 1, 1, 0, 20, 34, 567890123)
              // Leaf value and the matching format option, chosen together per flavour.
              val (leaf, fmtKey, fmtVal): (Any, String, String) = nanosType match {
                case _: TimestampNTZNanosType =>
                  (wallClock, "timestampNTZFormat", s"yyyy-MM-dd'T'HH:mm:ss.$fractionPattern")
                case _: TimestampLTZNanosType =>
                  (
                    wallClock.toInstant(ZoneOffset.UTC),
                    "timestampFormat",
                    s"yyyy-MM-dd'T'HH:mm:ss.${fractionPattern}XXX")
              }

              // Embed the nanos leaf inside a struct, an array, and a map value. The
              // guardrails and Jackson read/write paths recurse into all three.
              val structOfNanos = new StructType().add("ts", nanosType)
              val schema = new StructType()
                .add("s", structOfNanos)
                .add("a", ArrayType(nanosType))
                .add("m", MapType(StringType, nanosType))
              val row = Row(Row(leaf), Seq(leaf), Map("k" -> leaf))
              val df = spark.createDataFrame(spark.sparkContext.parallelize(Seq(row)), schema)
              val path = new File(dir, s"json_nested_${nanosType.typeName}").getCanonicalPath

              // Write and read back with the same format option; the data must be unchanged.
              df.write.format("json").option(fmtKey, fmtVal).mode("overwrite").save(path)
              val readBack = spark.read
                .schema(schema)
                .option(fmtKey, fmtVal)
                .format("json")
                .load(path)
              checkAnswer(readBack, df)
            }
          }
        }
      }
    }
  }
}

test("SPARK-57456: JSON rejects nanosecond timestamps under the LEGACY time parser policy") {
  // The legacy timestamp formatter cannot represent sub-microsecond digits, so the nanos
  // formatter methods raise TIMESTAMP_NANOS_WITH_LEGACY_TIME_PARSER. Only the LTZ formatter is
  // legacy under this policy (the NTZ formatter always uses the ISO-8601 path), so this covers
  // TimestampLTZNanosType.

  // Walks the cause chain of `e` (starting at `e` itself) and returns the first
  // SparkUnsupportedOperationException found; fails the test when the chain has none.
  def rootNanosError(e: Throwable): SparkUnsupportedOperationException = {
    @scala.annotation.tailrec
    def firstUnsupported(current: Throwable): Throwable = current match {
      case null => null
      case hit: SparkUnsupportedOperationException => hit
      case other => firstUnsupported(other.getCause)
    }

    val cause = firstUnsupported(e)
    assert(cause != null,
      s"Expected TIMESTAMP_NANOS_WITH_LEGACY_TIME_PARSER, but got: ${e.getMessage}")
    cause.asInstanceOf[SparkUnsupportedOperationException]
  }

  withSQLConf(
    SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true",
    SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "LEGACY") {
    val schema = new StructType().add("ts", TimestampLTZNanosType(9))
    val expectedCondition = "UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_WITH_LEGACY_TIME_PARSER"
    val expectedParameters =
      Map("config" -> ("\"" + SQLConf.LEGACY_TIME_PARSER_POLICY.key + "\""))

    for (useV1 <- Seq(true, false)) {
      val v1Sources = if (useV1) "json" else ""
      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> v1Sources) {
        withTempDir { dir =>
          // Write path: a single LTZ nanos value with a non-zero nanosecond component.
          val instant = LocalDateTime.of(2020, 1, 1, 0, 0, 0, 1).toInstant(ZoneOffset.UTC)
          val df = spark.createDataFrame(
            spark.sparkContext.parallelize(Seq(Row(instant))),
            schema)
          val writeDir = new File(dir, "write").getCanonicalPath
          checkError(
            exception = rootNanosError(intercept[SparkException] {
              df.write.format("json").mode("overwrite").save(writeDir)
            }),
            condition = expectedCondition,
            parameters = expectedParameters)

          // Read path: write a benign file first so schema-driven parsing is what fails. Use
          // FAILFAST so the unsupported-feature error surfaces instead of being turned into a
          // null record by the permissive bad-record handling.
          val readDir = new File(dir, "read").getCanonicalPath
          Seq("a").toDF("ts").write.format("json").mode("overwrite").save(readDir)
          checkError(
            exception = rootNanosError(intercept[SparkException] {
              spark.read
                .schema(schema)
                .option("mode", "FAILFAST")
                .format("json")
                .load(readDir)
                .collect()
            }),
            condition = expectedCondition,
            parameters = expectedParameters)
        }
      }
    }
  }
}

// Asserts the ignoredPathSegmentRegex contract for `format`: the default regex hides the
// '_'-prefixed file; a never-matching per-read option or session conf each surface it; the
// option overrides the conf.
Expand Down
Loading