diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 6776f88ed1ef8..91635d346605c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -6289,6 +6289,19 @@ object SQLConf { .booleanConf .createWithDefault(false) + val LEGACY_JDBC_TIME_MAPPING_ENABLED = + buildConf("spark.sql.legacy.jdbc.timeMapping.enabled") + .internal() + .doc("When true, JDBC TIME columns are read as TimestampType (the legacy behavior), even " + + "when spark.sql.timeType.enabled is true. This is an escape hatch for workloads that " + + "rely on the old TIME-to-timestamp mapping. When false, JDBC TIME columns are read as " + + "TimeType if spark.sql.timeType.enabled is true. Has no effect when " + + "spark.sql.timeType.enabled is false.") + .version("4.3.0") + .withBindingPolicy(ConfigBindingPolicy.SESSION) + .booleanConf + .createWithDefault(false) + val PYTHON_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.python.filterPushdown.enabled") .internal() .doc("When true, enable filter pushdown to Python datasource, at the cost of running " + @@ -8156,6 +8169,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def legacyPostgresDatetimeMappingEnabled: Boolean = getConf(LEGACY_POSTGRES_DATETIME_MAPPING_ENABLED) + def legacyJdbcTimeMappingEnabled: Boolean = + getConf(LEGACY_JDBC_TIME_MAPPING_ENABLED) + override def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = getConf(SQLConf.LEGACY_TIME_PARSER_POLICY) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 1ad38e7712080..4d67061e0a0cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -229,7 +229,7 @@ object JdbcUtils extends Logging with SQLConfHelper { case java.sql.Types.SQLXML => StringType case java.sql.Types.STRUCT => StringType case java.sql.Types.TIME => - if (conf.isTimeTypeEnabled) { + if (conf.isTimeTypeEnabled && !conf.legacyJdbcTimeMappingEnabled) { // Use reported scale (fractional digits) as precision; TIME(0) is valid val timePrecision = if (scale >= 0 && scale <= TimeType.MAX_PRECISION) scale else TimeType.DEFAULT_PRECISION diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 4e75e1807cf4c..f353f6f2a6997 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -768,6 +768,32 @@ class JDBCSuite extends SharedSparkSession { assert(rows(0).getAs[java.time.LocalTime](0) === java.time.LocalTime.of(12, 34, 56)) } + test("SPARK-57555: legacy.jdbc.timeMapping escape hatch keeps TIME as TimestampType") { + // The escape hatch forces the legacy TIME-to-timestamp mapping even when the TIME type is + // enabled, so workloads relying on the old behavior are not silently broken. + withSQLConf( + SQLConf.TIME_TYPE_ENABLED.key -> "true", + SQLConf.LEGACY_JDBC_TIME_MAPPING_ENABLED.key -> "true") { + val df = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties()) + assert(df.schema("A").dataType === TimestampType) + } + // Sanity check: without the escape hatch the column is read as TimeType. + withSQLConf( + SQLConf.TIME_TYPE_ENABLED.key -> "true", + SQLConf.LEGACY_JDBC_TIME_MAPPING_ENABLED.key -> "false") { + val df = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties()) + assert(df.schema("A").dataType.isInstanceOf[TimeType]) + } + // The escape hatch has no effect when the TIME type is disabled: the column is read as + // TimestampType regardless of the escape hatch. + withSQLConf( + SQLConf.TIME_TYPE_ENABLED.key -> "false", + SQLConf.LEGACY_JDBC_TIME_MAPPING_ENABLED.key -> "true") { + val df = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties()) + assert(df.schema("A").dataType === TimestampType) + } + } + test("SPARK-57555: JDBC TIME write round-trip") { val url = urlWithUserAndPass val tableName = "TEST.TIME_ROUNDTRIP"