Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1483,7 +1483,10 @@ nonTrivialPrimitiveType
| INTERVAL
(fromYearMonth=(YEAR | MONTH) (TO to=MONTH)? |
fromDayTime=(DAY | HOUR | MINUTE | SECOND) (TO to=(HOUR | MINUTE | SECOND))?)?
| TIMESTAMP (withLocalTimeZone | withoutTimeZone)?
| TIMESTAMP (LEFT_PAREN precision=integerValue RIGHT_PAREN)?
(withLocalTimeZone | withoutTimeZone)?
| TIMESTAMP_LTZ (LEFT_PAREN precision=integerValue RIGHT_PAREN)?
| TIMESTAMP_NTZ (LEFT_PAREN precision=integerValue RIGHT_PAREN)?
| TIME (LEFT_PAREN precision=integerValue RIGHT_PAREN)? (withoutTimeZone)?
| GEOGRAPHY LEFT_PAREN (srid=integerValue | any=ANY) RIGHT_PAREN
| GEOMETRY LEFT_PAREN (srid=integerValue | any=ANY) RIGHT_PAREN
Expand All @@ -1498,7 +1501,6 @@ trivialPrimitiveType
| FLOAT | REAL
| DOUBLE
| DATE
| TIMESTAMP_LTZ | TIMESTAMP_NTZ
| BINARY
| VOID
| VARIANT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.util.CollationFactory
import org.apache.spark.sql.catalyst.util.SparkParserUtils.{string, withOrigin}
import org.apache.spark.sql.connector.catalog.IdentityColumnSpec
import org.apache.spark.sql.errors.{DataTypeErrorsBase, QueryParsingErrors}
import org.apache.spark.sql.errors.{DataTypeErrors, DataTypeErrorsBase, QueryParsingErrors}
import org.apache.spark.sql.internal.SqlApiConf
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, CharType, DataType, DateType, DayTimeIntervalType, DecimalType, DoubleType, FloatType, GeographyType, GeometryType, IntegerType, LongType, MapType, MetadataBuilder, NullType, ShortType, StringType, StructField, StructType, TimestampNTZType, TimestampType, TimeType, VarcharType, VariantType, YearMonthIntervalType}
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, CharType, DataType, DateType, DayTimeIntervalType, DecimalType, DoubleType, FloatType, GeographyType, GeometryType, IntegerType, LongType, MapType, MetadataBuilder, NullType, ShortType, StringType, StructField, StructType, TimestampLTZNanosType, TimestampNTZNanosType, TimestampNTZType, TimestampType, TimeType, VarcharType, VariantType, YearMonthIntervalType}

/**
* AST builder for parsing data type definitions and table schemas.
Expand Down Expand Up @@ -350,11 +350,42 @@ class DataTypeAstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with DataTypeE
CalendarIntervalType
}
case TIMESTAMP if currentCtx.withLocalTimeZone() != null =>
TimestampType
if (currentCtx.precision == null) {
TimestampType
} else {
parseTimestampLtzNanosPrecision(currentCtx.precision.getText)
}
case TIMESTAMP if currentCtx.withoutTimeZone() != null =>
TimestampNTZType
if (currentCtx.precision == null) {
TimestampNTZType
} else {
parseTimestampNtzNanosPrecision(currentCtx.precision.getText)
}
case TIMESTAMP =>
SqlApiConf.get.timestampType
if (currentCtx.precision == null) {
SqlApiConf.get.timestampType
} else {
SqlApiConf.get.timestampType match {
case TimestampType =>
parseTimestampLtzNanosPrecision(currentCtx.precision.getText)
case TimestampNTZType =>
parseTimestampNtzNanosPrecision(currentCtx.precision.getText)
case other =>
throw SparkException.internalError(s"Unexpected default timestamp type: $other")
}
}
case TIMESTAMP_LTZ =>
if (currentCtx.precision == null) {
TimestampType
} else {
parseTimestampLtzNanosPrecision(currentCtx.precision.getText)
}
case TIMESTAMP_NTZ =>
if (currentCtx.precision == null) {
TimestampNTZType
} else {
parseTimestampNtzNanosPrecision(currentCtx.precision.getText)
}
case TIME =>
val precision = if (currentCtx.precision == null) {
TimeType.DEFAULT_PRECISION
Expand Down Expand Up @@ -398,8 +429,6 @@ class DataTypeAstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with DataTypeE
case FLOAT | REAL => FloatType
case DOUBLE => DoubleType
case DATE => DateType
case TIMESTAMP_LTZ => TimestampType
case TIMESTAMP_NTZ => TimestampNTZType
case BINARY => BinaryType
case VOID => NullType
case VARIANT => VariantType
Expand Down Expand Up @@ -448,6 +477,24 @@ class DataTypeAstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with DataTypeE
}
}

private def parseTimestampLtzNanosPrecision(precision: String): TimestampLTZNanosType = {
DataTypeErrors.checkTimestampNanosTypesEnabled()
try TimestampLTZNanosType(precision.toInt)
catch {
case _: NumberFormatException =>
throw DataTypeErrors.invalidTimestampPrecisionError(precision, "TIMESTAMP_LTZ")
}
}

private def parseTimestampNtzNanosPrecision(precision: String): TimestampNTZNanosType = {
DataTypeErrors.checkTimestampNanosTypesEnabled()
try TimestampNTZNanosType(precision.toInt)
catch {
case _: NumberFormatException =>
throw DataTypeErrors.invalidTimestampPrecisionError(precision, "TIMESTAMP_NTZ")
}
}

/**
* Create a complex DataType. Arrays, Maps and Structures are supported.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.sql.catalyst.util.QuotingUtils
import org.apache.spark.sql.catalyst.util.QuotingUtils.toSQLSchema
import org.apache.spark.sql.internal.SqlApiConf
import org.apache.spark.sql.types.{DataType, Decimal, StringType}
import org.apache.spark.unsafe.types.UTF8String

Expand Down Expand Up @@ -282,4 +283,16 @@ private[sql] object DataTypeErrors extends DataTypeErrorsBase {
messageParameters = Map("precision" -> precision, "type" -> typeName),
cause = null)
}

def checkTimestampNanosTypesEnabled(): Unit = {
if (!SqlApiConf.get.timestampNanosTypesEnabled) {
throw new SparkException(
errorClass = "FEATURE_NOT_ENABLED",
messageParameters = Map(
"featureName" -> "Nanosecond-precision timestamp types",
"configKey" -> "spark.sql.timestampNanosTypes.enabled",
"configValue" -> "true"),
cause = null)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ private[sql] trait SqlApiConf {
def legacyParameterSubstitutionConstantsOnly: Boolean
def legacyIdentifierClauseOnly: Boolean
def typesFrameworkEnabled: Boolean
def timestampNanosTypesEnabled: Boolean
}

private[sql] object SqlApiConf {
Expand Down Expand Up @@ -112,4 +113,5 @@ private[sql] object DefaultSqlApiConf extends SqlApiConf {
override def legacyParameterSubstitutionConstantsOnly: Boolean = false
override def legacyIdentifierClauseOnly: Boolean = false
override def typesFrameworkEnabled: Boolean = false
override def timestampNanosTypesEnabled: Boolean = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,14 @@ object DataType {
// For backwards compatibility, previously the type name of NullType is "null"
case "null" => NullType
case TIMESTAMP_LTZ_NANOS_TYPE(precision) =>
DataTypeErrors.checkTimestampNanosTypesEnabled()
try TimestampLTZNanosType(precision.toInt)
catch {
case _: NumberFormatException =>
throw DataTypeErrors.invalidTimestampPrecisionError(precision, "TIMESTAMP_LTZ")
}
case TIMESTAMP_NTZ_NANOS_TYPE(precision) =>
DataTypeErrors.checkTimestampNanosTypesEnabled()
try TimestampNTZNanosType(precision.toInt)
catch {
case _: NumberFormatException =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,19 @@ object SQLConf {
.booleanConf
.createWithDefaultFunction(() => Utils.isTesting)

val TIMESTAMP_NANOS_TYPES_ENABLED =
buildConf("spark.sql.timestampNanosTypes.enabled")
.internal()
.doc("When true, the parameterized nanosecond-precision timestamp types " +
"TIMESTAMP_NTZ(p) / TIMESTAMP_LTZ(p) for p in [7, 9] are recognized as " +
"Spark SQL data types at user-facing entry points. Default is false because " +
"downstream execution paths (Cast, PhysicalDataType, AnyTimestampType, encoders, " +
"Connect proto) are not yet wired for these types. See SPARK-56822.")
.version("4.2.0")
.withBindingPolicy(ConfigBindingPolicy.SESSION)
.booleanConf
.createWithDefault(false)

val EXTENDED_EXPLAIN_PROVIDERS = buildConf("spark.sql.extendedExplainProviders")
.doc("A comma-separated list of classes that implement the" +
" org.apache.spark.sql.ExtendedExplainGenerator trait. If provided, Spark will print" +
Expand Down Expand Up @@ -7512,6 +7525,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def typesFrameworkEnabled: Boolean = getConf(TYPES_FRAMEWORK_ENABLED)

def timestampNanosTypesEnabled: Boolean = getConf(TIMESTAMP_NANOS_TYPES_ENABLED)

def dataSourceV2JoinPushdown: Boolean = getConf(DATA_SOURCE_V2_JOIN_PUSHDOWN)

def dynamicPartitionPruningEnabled: Boolean = getConf(DYNAMIC_PARTITION_PRUNING_ENABLED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,81 @@ class DataTypeParserSuite extends SparkFunSuite with SQLHelper {
assert(parse("timestamp") === TimestampNTZType)
assert(parse("timestamp with local time zone") === TimestampType)
assert(parse("timestamp without time zone") === TimestampNTZType)
withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
assert(parse("timestamp(9)") === TimestampNTZNanosType(9))
// Bare TIMESTAMP(p) routes through SqlApiConf.get.timestampType, so an
// out-of-range precision must surface as the NTZ error here.
Seq("6", "10").foreach { p =>
checkError(
exception = intercept[SparkException] {
CatalystSqlParser.parseDataType(s"timestamp($p)")
},
condition = "INVALID_TIMESTAMP_PRECISION",
parameters = Map("precision" -> p, "type" -> "TIMESTAMP_NTZ"))
}
}
}
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> TimestampTypes.TIMESTAMP_LTZ.toString) {
assert(parse("timestamp") === TimestampType)
assert(parse("timestamp with local time zone") === TimestampType)
assert(parse("timestamp without time zone") === TimestampNTZType)
withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
assert(parse("timestamp(9)") === TimestampLTZNanosType(9))
// Bare TIMESTAMP(p) under LTZ default must surface as the LTZ error.
Seq("6", "10").foreach { p =>
checkError(
exception = intercept[SparkException] {
CatalystSqlParser.parseDataType(s"timestamp($p)")
},
condition = "INVALID_TIMESTAMP_PRECISION",
parameters = Map("precision" -> p, "type" -> "TIMESTAMP_LTZ"))
}
}
}
}

test("parse nanos timestamp types when the preview flag is enabled") {
withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
assert(parse("TIMESTAMP_NTZ(7)") === TimestampNTZNanosType(7))
assert(parse("TIMESTAMP_NTZ(8)") === TimestampNTZNanosType(8))
assert(parse("TIMESTAMP_NTZ(9)") === TimestampNTZNanosType(9))
assert(parse("TIMESTAMP_LTZ(7)") === TimestampLTZNanosType(7))
assert(parse("TIMESTAMP_LTZ(8)") === TimestampLTZNanosType(8))
assert(parse("TIMESTAMP_LTZ(9)") === TimestampLTZNanosType(9))
assert(parse("Timestamp_Ntz(9)") === TimestampNTZNanosType(9))
assert(parse("timestamp_ltz(7)") === TimestampLTZNanosType(7))
assert(parse("TIMESTAMP(9) WITHOUT TIME ZONE") === TimestampNTZNanosType(9))
assert(parse("TIMESTAMP(7) WITH LOCAL TIME ZONE") === TimestampLTZNanosType(7))
assert(parse("timestamp(8) without time zone") === TimestampNTZNanosType(8))
assert(parse("timestamp(8) with local time zone") === TimestampLTZNanosType(8))
}
}

test("nanos timestamp parser surface is gated by SQL conf, disabled by default") {
val gatedSpellings = Seq(
"TIMESTAMP_NTZ(7)",
"TIMESTAMP_LTZ(9)",
"TIMESTAMP(9) WITHOUT TIME ZONE",
"TIMESTAMP(9) WITH LOCAL TIME ZONE",
"TIMESTAMP(9)")
gatedSpellings.foreach { spelling =>
checkError(
exception = intercept[SparkException] {
CatalystSqlParser.parseDataType(spelling)
},
condition = "FEATURE_NOT_ENABLED",
parameters = Map(
"featureName" -> "Nanosecond-precision timestamp types",
"configKey" -> "spark.sql.timestampNanosTypes.enabled",
"configValue" -> "true"))
}
// Bare unparameterized forms remain accepted even with the gate off.
assert(parse("TIMESTAMP_NTZ") === TimestampNTZType)
assert(parse("TIMESTAMP_LTZ") === TimestampType)
assert(parse("TIMESTAMP WITHOUT TIME ZONE") === TimestampNTZType)
assert(parse("TIMESTAMP WITH LOCAL TIME ZONE") === TimestampType)
}

// DataType parser accepts certain reserved keywords.
checkDataType(
"Struct<TABLE: string, DATE:boolean>",
Expand Down Expand Up @@ -241,4 +308,53 @@ class DataTypeParserSuite extends SparkFunSuite with SQLHelper {
condition = "PARSE_SYNTAX_ERROR",
parameters = Map("error" -> "'WITH'", "hint" -> ""))
}

test("invalid precision of the nanos timestamp data type") {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test covers the two zone-suffixed forms - TIMESTAMP(6) WITHOUT TIME ZONE -> TIMESTAMP_NTZ error and TIMESTAMP(10) WITH LOCAL TIME ZONE -> TIMESTAMP_LTZ error - but does not exercise the bare TIMESTAMP(6) / TIMESTAMP(10) form under each SQLConf.TIMESTAMP_TYPE setting, which is the path that routes through SqlApiConf.get.timestampType in DataTypeAstBuilder.scala.

Alternatively, the assertions could be added to the existing test("Set default timestamp type") block at line 159, since that test already wraps both values of SQLConf.TIMESTAMP_TYPE and was modified by this PR to add the success-case parse("timestamp(9)") assertions — the invalid-precision counterpart would sit naturally next to them.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added both 6 and 10 precision checks.

withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
Seq("TIMESTAMP_NTZ" -> "TIMESTAMP_NTZ", "TIMESTAMP_LTZ" -> "TIMESTAMP_LTZ").foreach {
case (spelling, errorType) =>
Seq(0, 1, 6, 10, 99).foreach { p =>
checkError(
exception = intercept[SparkException] {
CatalystSqlParser.parseDataType(s"$spelling($p)")
},
condition = "INVALID_TIMESTAMP_PRECISION",
parameters = Map("precision" -> p.toString, "type" -> errorType))
}
}
// Integer overflow: regex matches but Int.parseInt fails. Original digits are preserved.
checkError(
exception = intercept[SparkException] {
CatalystSqlParser.parseDataType("TIMESTAMP_NTZ(99999999999)")
},
condition = "INVALID_TIMESTAMP_PRECISION",
parameters = Map("precision" -> "99999999999", "type" -> "TIMESTAMP_NTZ"))
// TIMESTAMP(p) with zone aliases route to the corresponding nanos type's error.
checkError(
exception = intercept[SparkException] {
CatalystSqlParser.parseDataType("TIMESTAMP(6) WITHOUT TIME ZONE")
},
condition = "INVALID_TIMESTAMP_PRECISION",
parameters = Map("precision" -> "6", "type" -> "TIMESTAMP_NTZ"))
checkError(
exception = intercept[SparkException] {
CatalystSqlParser.parseDataType("TIMESTAMP(10) WITH LOCAL TIME ZONE")
},
condition = "INVALID_TIMESTAMP_PRECISION",
parameters = Map("precision" -> "10", "type" -> "TIMESTAMP_LTZ"))
// Negative precision is rejected by the parser, not by the type constructor.
checkError(
exception = intercept[ParseException] {
CatalystSqlParser.parseDataType("TIMESTAMP_NTZ(-1)")
},
condition = "PARSE_SYNTAX_ERROR",
parameters = Map("error" -> "'-'", "hint" -> ""))
checkError(
exception = intercept[ParseException] {
CatalystSqlParser.parseDataType("TIMESTAMP_LTZ(-100)")
},
condition = "PARSE_SYNTAX_ERROR",
parameters = Map("error" -> "'-'", "hint" -> ""))
}
}
}
Loading