
Commit da09de9

[SPARK-57456][SQL] Support nanosecond-precision timestamp types in the JSON datasource (v1 and v2)
### What changes were proposed in this pull request?

Umbrella: [SPARK-56822](https://issues.apache.org/jira/browse/SPARK-56822) (Timestamps with nanosecond precision).

This PR adds read and write support for the nanosecond-capable timestamp types `TIMESTAMP_NTZ(p)` and `TIMESTAMP_LTZ(p)` (`p` in 7-9) to the JSON datasource, for both the v1 (`JsonFileFormat`) and v2 (`JsonTable`) paths, reaching parity with the microsecond `TimestampType` / `TimestampNTZType`, and removes the [SPARK-57166](https://issues.apache.org/jira/browse/SPARK-57166) rejection guardrail.

Specifically:
- `JacksonParser`: adds `TimestampLTZNanosType` / `TimestampNTZNanosType` read cases that delegate to the existing `parseNanos` / `parseWithoutTimeZoneNanos` formatter methods with the column precision.
- `JacksonGenerator`: adds the corresponding write cases that delegate to `formatNanos` / `formatWithoutTimeZoneNanos`.
- `JsonFileFormat` (v1) and `JsonTable` (v2): drop the `AnyTimestampNanoType` rejection in `supportDataType` / `supportsDataType`.

Notes:
- Schema inference (`JsonInferSchema`) keeps inferring microsecond `TimestampType` / `TimestampNTZType` by default; nanosecond types are reached only via an explicit user schema.
- No new options: the existing `timestampFormat` / `timestampNTZFormat` options drive the nanos path. The column type carries the precision, and the count of `S` letters in the pattern controls how many fractional-second digits are emitted on write (text output needs up to 9 `S` for full precision; reads with the default formatter parse the full fraction and truncate to the declared precision).
- The legacy time parser policy rejects nanos: the legacy LTZ formatter cannot represent sub-microsecond digits, so it raises `UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_WITH_LEGACY_TIME_PARSER` (the NTZ formatter always uses the ISO-8601 path).

### Why are the changes needed?

JSON rejected nanos timestamp types in its datasource capability checks and lacked the conversions to round-trip them, so these columns could not be written or read through JSON. This extends nanosecond-precision timestamp support (umbrella SPARK-56822) to the JSON datasource, matching the existing microsecond timestamp behavior and the Parquet/ORC/Avro/CSV nanosecond support.

### Does this PR introduce _any_ user-facing change?

Yes. With `spark.sql.timestampNanosTypes.enabled=true`, columns of type `TIMESTAMP_NTZ(7-9)` / `TIMESTAMP_LTZ(7-9)` can now be written to and read from JSON files, and parsed/generated by `from_json` / `to_json`. Previously such columns were rejected with `UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE`. This is a change within the unreleased master/branch only.

### How was this patch tested?

- `JsonExpressionsSuite`: `JsonToStructs` nanosecond parsing at the catalyst expression level.
- `JsonFunctionsSuite`: flipped the existing `from_json` nanosecond test to assert successful parsing and the truncated value (instead of an unsupported-type error); added `to_json` and `to_json` / `from_json` round-trip tests.
- `FileBasedDataSourceSuite`: removed JSON from the SPARK-57166 rejection list; added end-to-end round-trip (precisions 7-9, NTZ and LTZ, v1 and v2), a nested struct/array/map round-trip, and a LEGACY time-parser-policy rejection test (write and read).
- `JsonSuite`: `DataFrameReader.json(Dataset[String])` read, a custom-schema file round-trip, and a mixed microsecond/nanosecond schema round-trip; these run under the `JsonV1Suite`, `JsonV2Suite`, `JsonLegacyTimeParserSuite`, and `JsonUnsafeRowSuite` variants.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Cursor 2.1, Claude Opus 4.8

Closes #56865 from MaxGekk/nanos-json-ds.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 59fdb3e)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
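
The note about the count of `S` letters can be illustrated with plain `java.time`, without Spark. This is only a sketch of the pattern-letter behavior, assuming the nanos formatters follow the same `DateTimeFormatter` convention; `FractionDigits` and its methods are hypothetical names, not part of Spark.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical helper (not a Spark API): shows how the number of 'S' pattern
// letters fixes the number of fractional-second digits that get written.
object FractionDigits {
  // Builds a pattern such as "yyyy-MM-dd'T'HH:mm:ss.SSSSSSS" for digits = 7.
  def pattern(digits: Int): String =
    "yyyy-MM-dd'T'HH:mm:ss." + ("S" * digits)

  // java.time truncates (does not round) the fraction to the requested width.
  def format(ts: LocalDateTime, digits: Int): String =
    DateTimeFormatter.ofPattern(pattern(digits)).format(ts)
}
```

With the wall-clock value used in the round-trip tests, `LocalDateTime.of(1970, 1, 1, 0, 20, 34, 567890123)`, seven letters keep `.5678901` and nine letters keep the full `.567890123`, which is why a pattern shorter than the column precision loses digits in the text output.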
1 parent a885e27 commit da09de9

8 files changed

Lines changed: 371 additions & 78 deletions


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala

Lines changed: 13 additions & 0 deletions
@@ -159,6 +159,19 @@ class JacksonGenerator(
           timestampNTZFormatter.format(DateTimeUtils.microsToLocalDateTime(row.getLong(ordinal)))
         gen.writeString(timestampString)

+    case t: TimestampLTZNanosType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        val timestampString =
+          timestampFormatter.formatNanos(row.getTimestampLTZNanos(ordinal), t.precision)
+        gen.writeString(timestampString)
+
+    case t: TimestampNTZNanosType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        val timestampString =
+          timestampNTZFormatter.formatWithoutTimeZoneNanos(
+            row.getTimestampNTZNanos(ordinal), t.precision)
+        gen.writeString(timestampString)
+
     case DateType =>
       (row: SpecializedGetters, ordinal: Int) =>
         val dateString = dateFormatter.format(row.getInt(ordinal))

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala

Lines changed: 16 additions & 1 deletion
@@ -39,7 +39,7 @@ import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
 import org.apache.spark.types.variant._
-import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String, VariantVal}
+import org.apache.spark.unsafe.types.{CalendarInterval, TimestampNanosVal, UTF8String, VariantVal}
 import org.apache.spark.util.Utils

 /**
@@ -380,6 +380,21 @@ class JacksonParser(
           timestampNTZFormatter.parseWithoutTimeZone(parser.getText, false)
       }

+    case t: TimestampLTZNanosType =>
+      (parser: JsonParser) => parseJsonToken[TimestampNanosVal](parser, dataType) {
+        // Unlike the microsecond TimestampType, the nanosecond types accept only string input.
+        // The numeric-epoch shorthand (a JSON integer read as epoch seconds) is legacy
+        // TimestampType behavior and is intentionally not carried over to the nanos types.
+        case VALUE_STRING if parser.getTextLength >= 1 =>
+          timestampFormatter.parseNanos(parser.getText, t.precision)
+      }
+
+    case t: TimestampNTZNanosType =>
+      (parser: JsonParser) => parseJsonToken[TimestampNanosVal](parser, dataType) {
+        case VALUE_STRING if parser.getTextLength >= 1 =>
+          timestampNTZFormatter.parseWithoutTimeZoneNanos(parser.getText, t.precision, false)
+      }
+
     case DateType =>
       (parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) {
         case VALUE_STRING if parser.getTextLength >= 1 =>

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala

Lines changed: 29 additions & 0 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions

 import java.text.{DecimalFormat, DecimalFormatSymbols, SimpleDateFormat}
+import java.time.{LocalDateTime, ZoneOffset}
 import java.util.{Calendar, Locale, TimeZone}

 import org.scalatest.exceptions.TestFailedException
@@ -30,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.Cast._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{PST, UTC, UTC_OPT}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

@@ -570,6 +572,33 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     }
   }

+  test("SPARK-57456: from_json with nanos timestamp") {
+    val jsonData = """{"t": "2016-01-01T00:00:00.123456789"}"""
+    // No timestamp format option: the default formatter parses the full sub-second fraction and
+    // truncates the sub-precision digits toward zero to the declared precision.
+    withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
+      TimestampNanosTestUtils.foreachNanosPrecision { p =>
+        val nano = TimestampNanosTestUtils.nanoOfSecTruncator(p)(123456789)
+        val ldt = LocalDateTime.of(2016, 1, 1, 0, 0, 0, nano)
+        checkEvaluation(
+          JsonToStructs(
+            StructType(StructField("t", TimestampNTZNanosType(p)) :: Nil),
+            Map.empty[String, String],
+            Literal(jsonData),
+            UTC_OPT),
+          InternalRow(TimestampNanosTestUtils.localDateTimeToNanosVal(ldt)))
+        // LTZ: the string has no zone, so it is interpreted in the given time zone (UTC here).
+        checkEvaluation(
+          JsonToStructs(
+            StructType(StructField("t", TimestampLTZNanosType(p)) :: Nil),
+            Map.empty[String, String],
+            Literal(jsonData),
+            UTC_OPT),
+          InternalRow(TimestampNanosTestUtils.instantToNanosVal(ldt.toInstant(ZoneOffset.UTC))))
+      }
+    }
+  }
+
   test("SPARK-19543: from_json empty input column") {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
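
The test above expects the parsed fraction to be truncated toward zero to the declared precision. A minimal sketch of that arithmetic follows; `NanosTruncation.truncate` is a hypothetical stand-in written for illustration and is not the `TimestampNanosTestUtils.nanoOfSecTruncator` helper, whose implementation is not shown in this commit.

```scala
// Hypothetical stand-in (not Spark code): keeps the first `precision`
// fractional-second digits of a nanosecond-of-second value and zeroes the rest.
object NanosTruncation {
  def truncate(nanoOfSecond: Int, precision: Int): Int = {
    require(precision >= 7 && precision <= 9, "nanos precisions are 7 to 9")
    require(nanoOfSecond >= 0 && nanoOfSecond <= 999999999, "nano-of-second out of range")
    // 10^(9 - precision): 100 for p = 7, 10 for p = 8, 1 for p = 9.
    val step = Seq.fill(9 - precision)(10).product
    // The input is non-negative, so dropping the remainder truncates toward zero.
    nanoOfSecond - nanoOfSecond % step
  }
}
```

For the test input `.123456789`, this gives `123456700` at precision 7, `123456780` at precision 8, and the unchanged `123456789` at precision 9.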

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala

Lines changed: 0 additions & 3 deletions
@@ -127,9 +127,6 @@ case class JsonFileFormat() extends TextBasedFileFormat with DataSourceRegister

     case _: GeometryType | _: GeographyType => false

-    // Nanosecond-capable timestamps are not yet supported by this datasource.
-    case _: AnyTimestampNanoType => false
-
     case _: AtomicType => true

     case st: StructType => st.forall { f => supportDataType(f.dataType) }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala

Lines changed: 0 additions & 3 deletions
@@ -61,9 +61,6 @@ case class JsonTable(
   override def supportsDataType(dataType: DataType): Boolean = dataType match {
     case _: GeometryType | _: GeographyType => false

-    // Nanosecond-capable timestamps are not yet supported by this datasource.
-    case _: AnyTimestampNanoType => false
-
     case _: AtomicType => true

     case st: StructType => st.forall { f => supportsDataType(f.dataType) }
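
The effect of deleting the guard in `supportDataType` / `supportsDataType` can be modeled with a toy type hierarchy. The sketch below is not Spark code: `DType`, `Capability`, and the `rejectNanos` flag are hypothetical, and it assumes the nanos types are atomic types, which is what lets them fall through to the atomic case once the earlier rejection is gone. The array and map arms are modeled on the nested round-trip test's remark that the guardrails recurse into structs, arrays, and map values.

```scala
// Toy model (hypothetical names, not Spark's type system).
sealed trait DType
sealed trait Atomic extends DType
case object StringT extends Atomic
case object GeometryT extends Atomic
final case class TimestampNanosT(precision: Int) extends Atomic
final case class StructT(fields: Seq[(String, DType)]) extends DType
final case class ArrayT(element: DType) extends DType
final case class MapT(key: DType, value: DType) extends DType

object Capability {
  // rejectNanos = true models the check before this commit, false the check after it.
  def supports(dt: DType, rejectNanos: Boolean): Boolean = dt match {
    case GeometryT => false
    // Case order matters: this guard must precede the atomic catch-all.
    case _: TimestampNanosT if rejectNanos => false
    case _: Atomic => true
    case StructT(fields) => fields.forall { case (_, t) => supports(t, rejectNanos) }
    case ArrayT(element) => supports(element, rejectNanos)
    case MapT(key, value) => supports(key, rejectNanos) && supports(value, rejectNanos)
  }
}
```

Because the check recurses, one nanos leaf anywhere inside a nested schema was enough to reject the whole schema before the change, and the same schema is accepted after it.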

sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala

Lines changed: 142 additions & 60 deletions
@@ -46,7 +46,6 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.TimestampNanosVal
 import org.apache.spark.util.HadoopFSUtils

 class FileBasedDataSourceSuite extends SharedSparkSession
@@ -1339,65 +1338,6 @@ class FileBasedDataSourceSuite extends SharedSparkSession
     }
   }

-  test("SPARK-57166: nanosecond timestamp types are not supported in selected file data sources") {
-    // Parquet and ORC support nanosecond-capable timestamps, while these formats still reject them.
-    val unsupportedDataSources = Seq("json")
-    val nanosTypes = Seq(TimestampNTZNanosType(9), TimestampLTZNanosType(9))
-    withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
-      // Test both v1 and v2 data sources.
-      Seq(true, false).foreach { useV1 =>
-        val useV1List = if (useV1) {
-          unsupportedDataSources.mkString(",")
-        } else {
-          ""
-        }
-        withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) {
-          unsupportedDataSources.foreach { format =>
-            nanosTypes.foreach { nanosType =>
-              val expectedType = s""""${nanosType.sql}""""
-              withTempDir { dir =>
-                // Write path: a nanos-typed column cannot be written. The nanos literal is built
-                // directly from its internal value to avoid relying on cast/parser support.
-                val nanosLiteral =
-                  Literal.create(new TimestampNanosVal(0L, 0.toShort), nanosType)
-                val df = spark.range(1).select(Column(nanosLiteral).as("ts"))
-                val writeDir = new File(dir, "write").getCanonicalPath
-                checkError(
-                  exception = intercept[AnalysisException] {
-                    df.write.format(format).mode("overwrite").save(writeDir)
-                  },
-                  condition = "UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE",
-                  parameters = Map(
-                    "columnName" -> "`ts`",
-                    "columnType" -> expectedType,
-                    "format" -> formatMapping(format)))
-
-                // Read path: a user-specified nanos schema is rejected. Write a benign file first
-                // so schema validation (not file listing) is what fails.
-                val readDir = new File(dir, "read").getCanonicalPath
-                // XML requires a `rowTag` option on both the read and write paths.
-                val extraOptions =
-                  if (format == "xml") Map("rowTag" -> "row") else Map.empty[String, String]
-                Seq("a").toDF("ts").write.format(format).options(extraOptions)
-                  .mode("overwrite").save(readDir)
-                checkError(
-                  exception = intercept[AnalysisException] {
-                    spark.read.schema(new StructType().add("ts", nanosType))
-                      .format(format).options(extraOptions).load(readDir).collect()
-                  },
-                  condition = "UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE",
-                  parameters = Map(
-                    "columnName" -> "`ts`",
-                    "columnType" -> expectedType,
-                    "format" -> formatMapping(format)))
-              }
-            }
-          }
-        }
-      }
-    }
-  }
-
   test("SPARK-57166: ORC supports nanosecond timestamp types in v1 and v2") {
     withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
       // Validate both v1 and v2 ORC paths.
@@ -1473,6 +1413,148 @@ class FileBasedDataSourceSuite extends SharedSparkSession
     }
   }

+  test("SPARK-57456: JSON supports nanosecond timestamp types in v1 and v2") {
+    withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
+      Seq(true, false).foreach { useV1 =>
+        val useV1List = if (useV1) "json" else ""
+        withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) {
+          foreachNanosPrecision { precision =>
+            // JSON is text-based: the format string must carry enough fractional-second digits to
+            // represent the full precision. Use exactly `precision` S-characters.
+            val fracPat = "S" * precision
+            Seq(TimestampNTZNanosType(precision), TimestampLTZNanosType(precision)).foreach {
+              nanosType =>
+                withTempDir { dir =>
+                  val wallClock = LocalDateTime.of(1970, 1, 1, 0, 20, 34, 567890123)
+                  val value: Any = nanosType match {
+                    case _: TimestampNTZNanosType => wallClock
+                    case _: TimestampLTZNanosType => wallClock.toInstant(ZoneOffset.UTC)
+                  }
+                  val df = spark.createDataFrame(
+                    spark.sparkContext.parallelize(Seq(Row(value))),
+                    new StructType().add("ts", nanosType))
+                  val path = new File(dir, s"json_nanos_${nanosType.typeName}").getCanonicalPath
+                  val (fmtKey, fmtVal) = nanosType match {
+                    case _: TimestampNTZNanosType =>
+                      ("timestampNTZFormat", s"yyyy-MM-dd'T'HH:mm:ss.$fracPat")
+                    case _: TimestampLTZNanosType =>
+                      ("timestampFormat", s"yyyy-MM-dd'T'HH:mm:ss.${fracPat}XXX")
+                  }
+                  df.write.format("json").option(fmtKey, fmtVal).mode("overwrite").save(path)
+                  val readBack = spark.read
+                    .schema(new StructType().add("ts", nanosType))
+                    .option(fmtKey, fmtVal)
+                    .format("json").load(path)
+                  checkAnswer(readBack, df)
+                }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  test("SPARK-57456: JSON supports nested nanosecond timestamp types in v1 and v2") {
+    withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
+      Seq(true, false).foreach { useV1 =>
+        val useV1List = if (useV1) "json" else ""
+        withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) {
+          foreachNanosPrecision { precision =>
+            val fracPat = "S" * precision
+            Seq(TimestampNTZNanosType(precision), TimestampLTZNanosType(precision)).foreach {
+              nanosType =>
+                withTempDir { dir =>
+                  val wallClock = LocalDateTime.of(1970, 1, 1, 0, 20, 34, 567890123)
+                  val leaf: Any = nanosType match {
+                    case _: TimestampNTZNanosType => wallClock
+                    case _: TimestampLTZNanosType => wallClock.toInstant(ZoneOffset.UTC)
+                  }
+                  // Embed the nanos leaf inside a struct, an array, and a map value. The guardrails
+                  // and Jackson read/write paths recurse into all three.
+                  val schema = new StructType()
+                    .add("s", new StructType().add("ts", nanosType))
+                    .add("a", ArrayType(nanosType))
+                    .add("m", MapType(StringType, nanosType))
+                  val row = Row(Row(leaf), Seq(leaf), Map("k" -> leaf))
+                  val df = spark.createDataFrame(
+                    spark.sparkContext.parallelize(Seq(row)), schema)
+                  val (fmtKey, fmtVal) = nanosType match {
+                    case _: TimestampNTZNanosType =>
+                      ("timestampNTZFormat", s"yyyy-MM-dd'T'HH:mm:ss.$fracPat")
+                    case _: TimestampLTZNanosType =>
+                      ("timestampFormat", s"yyyy-MM-dd'T'HH:mm:ss.${fracPat}XXX")
+                  }
+                  val path =
+                    new File(dir, s"json_nested_${nanosType.typeName}").getCanonicalPath
+                  df.write.format("json").option(fmtKey, fmtVal).mode("overwrite").save(path)
+                  val readBack = spark.read.schema(schema).option(fmtKey, fmtVal)
+                    .format("json").load(path)
+                  checkAnswer(readBack, df)
+                }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  test("SPARK-57456: JSON rejects nanosecond timestamps under the LEGACY time parser policy") {
+    // The legacy timestamp formatter cannot represent sub-microsecond digits, so the nanos
+    // formatter methods raise TIMESTAMP_NANOS_WITH_LEGACY_TIME_PARSER. Only the LTZ formatter is
+    // legacy under this policy (the NTZ formatter always uses the ISO-8601 path), so this covers
+    // TimestampLTZNanosType.
+    def rootNanosError(e: Throwable): SparkUnsupportedOperationException = {
+      var cause: Throwable = e
+      while (cause != null && !cause.isInstanceOf[SparkUnsupportedOperationException]) {
+        cause = cause.getCause
+      }
+      assert(cause != null,
+        s"Expected TIMESTAMP_NANOS_WITH_LEGACY_TIME_PARSER, but got: ${e.getMessage}")
+      cause.asInstanceOf[SparkUnsupportedOperationException]
+    }
+
+    withSQLConf(
+      SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true",
+      SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "LEGACY") {
+      val nanosType = TimestampLTZNanosType(9)
+      val schema = new StructType().add("ts", nanosType)
+      val expectedParameters =
+        Map("config" -> ("\"" + SQLConf.LEGACY_TIME_PARSER_POLICY.key + "\""))
+      Seq(true, false).foreach { useV1 =>
+        val useV1List = if (useV1) "json" else ""
+        withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) {
+          withTempDir { dir =>
+            // Write path.
+            val df = spark.createDataFrame(
+              spark.sparkContext.parallelize(
+                Seq(Row(LocalDateTime.of(2020, 1, 1, 0, 0, 0, 1).toInstant(ZoneOffset.UTC)))),
+              schema)
+            val writeDir = new File(dir, "write").getCanonicalPath
+            checkError(
+              exception = rootNanosError(intercept[SparkException] {
+                df.write.format("json").mode("overwrite").save(writeDir)
+              }),
+              condition = "UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_WITH_LEGACY_TIME_PARSER",
+              parameters = expectedParameters)
+
+            // Read path: write a benign file first so schema-driven parsing is what fails. Use
+            // FAILFAST so the unsupported-feature error surfaces instead of being turned into a
+            // null record by the permissive bad-record handling.
+            val readDir = new File(dir, "read").getCanonicalPath
+            Seq("a").toDF("ts").write.format("json").mode("overwrite").save(readDir)
+            checkError(
+              exception = rootNanosError(intercept[SparkException] {
+                spark.read.schema(schema).option("mode", "FAILFAST")
+                  .format("json").load(readDir).collect()
+              }),
+              condition = "UNSUPPORTED_FEATURE.TIMESTAMP_NANOS_WITH_LEGACY_TIME_PARSER",
+              parameters = expectedParameters)
+          }
+        }
+      }
+    }
+  }
+
   // Asserts the ignoredPathSegmentRegex contract for `format`: the default regex hides the
   // '_'-prefixed file; a never-matching per-read option or session conf each surface it; the
   // option overrides the conf.
