From ecacba732164e7c25211924eff06274c78b83fdc Mon Sep 17 00:00:00 2001 From: lifumao Date: Sat, 11 Apr 2026 20:28:25 +0800 Subject: [PATCH 1/7] SERDEPROPERTIES are missing when KSHC create table --- .../connector/hive/HiveTableCatalog.scala | 10 +++++----- .../connector/hive/HiveCatalogSuite.scala | 19 +++++++++++++++++++ 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala index 1a2a8548bd7..c3395767f12 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala @@ -317,12 +317,12 @@ class HiveTableCatalog(sparkSession: SparkSession) val (partitionColumns, maybeBucketSpec) = partitions.toSeq.convertTransforms val location = Option(properties.get(TableCatalog.PROP_LOCATION)) val maybeProvider = Option(properties.get(TableCatalog.PROP_PROVIDER)) + val tableProperties = properties.asScala.toMap val (storage, provider) = getStorageFormatAndProvider( maybeProvider, location, - properties.asScala.toMap) - val tableProperties = properties.asScala + toOptions(tableProperties)) val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL) val tableType = if (isExternal || location.isDefined) { @@ -339,7 +339,7 @@ class HiveTableCatalog(sparkSession: SparkSession) provider = Some(provider), partitionColumnNames = partitionColumns, bucketSpec = maybeBucketSpec, - properties = tableProperties.toMap, + properties = tableProperties, tracksPartitionsInCatalog = conf.manageFilesourcePartitions, comment = Option(properties.get(TableCatalog.PROP_COMMENT))) @@ -609,7 +609,7 @@ private object HiveTableCatalog extends Logging { outputFormat = hiveSerde.outputFormat.orElse(defaultHiveStorage.outputFormat), // User specified serde takes precedence over the one inferred from file format. serde = maybeSerde.orElse(hiveSerde.serde).orElse(defaultHiveStorage.serde), - properties = options ++ defaultHiveStorage.properties) + properties = defaultHiveStorage.properties) case _ => throw KyuubiHiveConnectorException(s"Unsupported serde ${maybeSerde.get}.") } } else { @@ -619,7 +619,7 @@ private object HiveTableCatalog extends Logging { outputFormat = maybeOutputFormat.orElse(defaultHiveStorage.outputFormat), serde = maybeSerde.orElse(defaultHiveStorage.serde), - properties = options ++ defaultHiveStorage.properties) + properties = defaultHiveStorage.properties) } (storageFormat, DDLUtils.HIVE_PROVIDER) } else { diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala index e4c8bb918a9..9a63ef8ea8b 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala @@ -287,6 +287,25 @@ class HiveCatalogSuite extends KyuubiHiveTest { catalog.dropTable(testIdent) } + test("createTable: SERDEPROPERTIES") { + val properties = new util.HashMap[String, String]() + properties.put("hive.serde", "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") + properties.put(TableCatalog.OPTION_PREFIX + "field.delim", ",") + assert(!catalog.tableExists(testIdent)) + + val table = catalog.createTable( + testIdent, + schema, + Array.empty[Transform], + properties).asInstanceOf[HiveTable] + + assert(!table.catalogTable.storage.properties.keys.exists( + _.startsWith(TableCatalog.OPTION_PREFIX))) + assert(!table.catalogTable.storage.properties.contains("hive.serde")) + assert(table.catalogTable.storage.properties.contains("field.delim")) + catalog.dropTable(testIdent) + } + test("loadTable") { val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps) val loaded = catalog.loadTable(testIdent) From 00fce8e3f71fb07bafc93f6f3c0c63bcb927c2d3 Mon Sep 17 00:00:00 2001 From: lifumao Date: Sat, 11 Apr 2026 21:22:04 +0800 Subject: [PATCH 2/7] SERDEPROPERTIES are missing when KSHC create table --- .../kyuubi/spark/connector/hive/HiveTableCatalog.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala index c3395767f12..ed803a7b3bd 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala @@ -322,6 +322,7 @@ class HiveTableCatalog(sparkSession: SparkSession) getStorageFormatAndProvider( maybeProvider, location, + tableProperties, toOptions(tableProperties)) val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL) val tableType = @@ -583,15 +584,16 @@ private object HiveTableCatalog extends Logging { private def getStorageFormatAndProvider( provider: Option[String], location: Option[String], - options: Map[String, String]): (CatalogStorageFormat, String) = { + options: Map[String, String], + serdeProperties: Map[String, String]): (CatalogStorageFormat, String) = { val nonHiveStorageFormat = CatalogStorageFormat.empty.copy( locationUri = location.map(CatalogUtils.stringToURI), - properties = options) + properties = serdeProperties) val conf = SQLConf.get val defaultHiveStorage = HiveSerDe.getDefaultStorage(conf).copy( locationUri = location.map(CatalogUtils.stringToURI), - properties = options) + properties = serdeProperties) if (provider.isDefined) { (nonHiveStorageFormat, provider.get) From 3d2060c4ba7eaf9e9d78c304e6e00b83ab19e8e6 Mon Sep 17 00:00:00 2001 From: lifumao Date: Sun, 12 Apr 2026 11:34:03 +0800 Subject: [PATCH 3/7] SERDEPROPERTIES are missing when KSHC create table --- .../connector/hive/HiveTableCatalog.scala | 55 ++++++++++++------- 1 file changed, 36 insertions(+), 19 deletions(-) diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala index ed803a7b3bd..dfc643c5b9c 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala @@ -317,13 +317,15 @@ class HiveTableCatalog(sparkSession: SparkSession) val (partitionColumns, maybeBucketSpec) = partitions.toSeq.convertTransforms val location = Option(properties.get(TableCatalog.PROP_LOCATION)) val maybeProvider = Option(properties.get(TableCatalog.PROP_PROVIDER)) - val tableProperties = properties.asScala.toMap + val tableProps = properties.asScala.toMap + val (optionsProps, serdeProps) = toOptionsAndSerdeProps(tableProps) val (storage, provider) = getStorageFormatAndProvider( maybeProvider, location, - tableProperties, - toOptions(tableProperties)) + tableProps, + optionsProps, + serdeProps) val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL) val tableType = if (isExternal || location.isDefined) { @@ -340,7 +342,7 @@ class HiveTableCatalog(sparkSession: SparkSession) provider = Some(provider), partitionColumnNames = partitionColumns, bucketSpec = maybeBucketSpec, - properties = tableProperties, + properties = tableProps, tracksPartitionsInCatalog = conf.manageFilesourcePartitions, comment = Option(properties.get(TableCatalog.PROP_COMMENT))) @@ -432,10 +434,24 @@ class HiveTableCatalog(sparkSession: SparkSession) catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier) } - private def toOptions(properties: Map[String, String]): Map[String, String] = { - properties.filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map { - case (key, value) => key.drop(TableCatalog.OPTION_PREFIX.length) -> value - }.toMap + /** + * Splits properties into optionsProps and serdeProps based on the `options.` prefix. + * + * - optionsProps: keys with "options." prefix whose stripped key does NOT exist in properties, + * indicating they were originally specified via OPTIONS clause. + * - serdeProps: keys with "options." prefix whose stripped key ALREADY exists in properties, + * indicating they were originally specified via SERDEPROPERTIES clause + * + * @param properties the full properties map + * @return a tuple of (optionsProps, serdeProps), both with the "options." prefix stripped + */ + private def toOptionsAndSerdeProps( + properties: Map[String, String]): (Map[String, String], Map[String, String]) = { + val (serdeProps, optionsProps) = properties + .filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)) + .map { case (key, value) => key.drop(TableCatalog.OPTION_PREFIX.length) -> value } + .partition { case (strippedKey, _) => properties.contains(strippedKey) } + (optionsProps, serdeProps) } override def listNamespaces(): Array[Array[String]] = @@ -584,24 +600,25 @@ private object HiveTableCatalog extends Logging { private def getStorageFormatAndProvider( provider: Option[String], location: Option[String], - options: Map[String, String], - serdeProperties: Map[String, String]): (CatalogStorageFormat, String) = { + tableProps: Map[String, String], + optionsProps: Map[String, String], + serdeProps: Map[String, String]): (CatalogStorageFormat, String) = { val nonHiveStorageFormat = CatalogStorageFormat.empty.copy( locationUri = location.map(CatalogUtils.stringToURI), - properties = serdeProperties) + properties = optionsProps) val conf = SQLConf.get val defaultHiveStorage = HiveSerDe.getDefaultStorage(conf).copy( locationUri = location.map(CatalogUtils.stringToURI), - properties = serdeProperties) + properties = optionsProps) if (provider.isDefined) { (nonHiveStorageFormat, provider.get) - } else if (serdeIsDefined(options)) { - val maybeSerde = options.get("hive.serde") - val maybeStoredAs = options.get("hive.stored-as") - val maybeInputFormat = options.get("hive.input-format") - val maybeOutputFormat = options.get("hive.output-format") + } else if (serdeIsDefined(tableProps)) { + val maybeSerde = tableProps.get("hive.serde") + val maybeStoredAs = tableProps.get("hive.stored-as") + val maybeInputFormat = tableProps.get("hive.input-format") + val maybeOutputFormat = tableProps.get("hive.output-format") val storageFormat = if (maybeStoredAs.isDefined) { // If `STORED AS fileFormat` is used, infer inputFormat, outputFormat and serde from it. HiveSerDe.sourceToSerDe(maybeStoredAs.get) match { @@ -611,7 +628,7 @@ private object HiveTableCatalog extends Logging { outputFormat = hiveSerde.outputFormat.orElse(defaultHiveStorage.outputFormat), // User specified serde takes precedence over the one inferred from file format. serde = maybeSerde.orElse(hiveSerde.serde).orElse(defaultHiveStorage.serde), - properties = defaultHiveStorage.properties) + properties = serdeProps ++ defaultHiveStorage.properties) case _ => throw KyuubiHiveConnectorException(s"Unsupported serde ${maybeSerde.get}.") } } else { @@ -621,7 +638,7 @@ private object HiveTableCatalog extends Logging { outputFormat = maybeOutputFormat.orElse(defaultHiveStorage.outputFormat), serde = maybeSerde.orElse(defaultHiveStorage.serde), - properties = defaultHiveStorage.properties) + properties = serdeProps ++ defaultHiveStorage.properties) } (storageFormat, DDLUtils.HIVE_PROVIDER) } else { From 490009dfb30e7f3440bfbe01657e18b9ac13d9ce Mon Sep 17 00:00:00 2001 From: lifumao Date: Sun, 12 Apr 2026 12:13:52 +0800 Subject: [PATCH 4/7] SERDEPROPERTIES are missing when KSHC create table --- .../apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala index dfc643c5b9c..04701674a69 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala @@ -450,6 +450,7 @@ class HiveTableCatalog(sparkSession: SparkSession) val (serdeProps, optionsProps) = properties .filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)) .map { case (key, value) => key.drop(TableCatalog.OPTION_PREFIX.length) -> value } + .toMap .partition { case (strippedKey, _) => properties.contains(strippedKey) } (optionsProps, serdeProps) } From 11fc7b18517446c56ab1882d81df20b5dff465d4 Mon Sep 17 00:00:00 2001 From: lifumao Date: Sun, 12 Apr 2026 20:19:57 +0800 Subject: [PATCH 5/7] SERDEPROPERTIES are missing when KSHC create table --- .../kyuubi/spark/connector/hive/HiveTableCatalog.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala index 04701674a69..5f9d735f173 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala @@ -437,9 +437,9 @@ class HiveTableCatalog(sparkSession: SparkSession) /** * Splits properties into optionsProps and serdeProps based on the `options.` prefix. * - * - optionsProps: keys with "options." prefix whose stripped key does NOT exist in properties, + * - optionsProps: keys with "options." prefix whose stripped key ALREADY exist in properties, * indicating they were originally specified via OPTIONS clause. - * - serdeProps: keys with "options." prefix whose stripped key ALREADY exists in properties, + * - serdeProps: keys with "options." prefix whose stripped key does NOT exists in properties, * indicating they were originally specified via SERDEPROPERTIES clause * * @param properties the full properties map @@ -447,7 +447,7 @@ class HiveTableCatalog(sparkSession: SparkSession) */ private def toOptionsAndSerdeProps( properties: Map[String, String]): (Map[String, String], Map[String, String]) = { - val (serdeProps, optionsProps) = properties + val (optionsProps, serdeProps) = properties .filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)) .map { case (key, value) => key.drop(TableCatalog.OPTION_PREFIX.length) -> value } .toMap From 66b8fcd8865fb90345d5db3905373d6322c0b62b Mon Sep 17 00:00:00 2001 From: lifumao Date: Mon, 13 Apr 2026 11:12:56 +0800 Subject: [PATCH 6/7] add toOptionsAndSerdeProps ut --- .../connector/hive/HiveCatalogSuite.scala | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala index 9a63ef8ea8b..956db2ffc1c 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala @@ -287,6 +287,32 @@ class HiveCatalogSuite extends KyuubiHiveTest { catalog.dropTable(testIdent) } + test("toOptionsAndSerdeProps") { + val properties = Map( + "hive.serde" -> "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", + "owner" -> "hadoop", + "header" -> "false", + "delimiter" -> "#", + TableCatalog.OPTION_PREFIX + "header" -> "false", + TableCatalog.OPTION_PREFIX + "delimiter" -> "#", + TableCatalog.OPTION_PREFIX + "field.delim" -> ",", + TableCatalog.OPTION_PREFIX + "line.delim" -> "\n") + + val method = classOf[HiveTableCatalog].getDeclaredMethod( + "toOptionsAndSerdeProps", + classOf[scala.collection.immutable.Map[_, _]]) + method.setAccessible(true) + val (optionsProps, serdeProps) = method.invoke(catalog, properties) + .asInstanceOf[(Map[String, String], Map[String, String])] + + assert(optionsProps == Map( + "header" -> "false", + "delimiter" -> "#")) + assert(serdeProps == Map( + "field.delim" -> ",", + "line.delim" -> "\n")) + } + test("createTable: SERDEPROPERTIES") { val properties = new util.HashMap[String, String]() properties.put("hive.serde", "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") From 815f60a083deb2117ddd0397b69d01676a3b36d5 Mon Sep 17 00:00:00 2001 From: lifumao Date: Mon, 13 Apr 2026 12:06:50 +0800 Subject: [PATCH 7/7] add toOptionsAndSerdeProps ut --- .../kyuubi/spark/connector/hive/HiveTableCatalog.scala | 4 ++-- .../kyuubi/spark/connector/hive/HiveCatalogSuite.scala | 7 +------ 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala index 5f9d735f173..f6b6235dc30 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala @@ -440,12 +440,12 @@ class HiveTableCatalog(sparkSession: SparkSession) * - optionsProps: keys with "options." prefix whose stripped key ALREADY exist in properties, * indicating they were originally specified via OPTIONS clause. * - serdeProps: keys with "options." prefix whose stripped key does NOT exists in properties, - * indicating they were originally specified via SERDEPROPERTIES clause + * indicating they were originally specified via SERDEPROPERTIES clause. * * @param properties the full properties map * @return a tuple of (optionsProps, serdeProps), both with the "options." prefix stripped */ - private def toOptionsAndSerdeProps( + private[hive] def toOptionsAndSerdeProps( properties: Map[String, String]): (Map[String, String], Map[String, String]) = { val (optionsProps, serdeProps) = properties .filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)) diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala index 956db2ffc1c..56a89bac2a3 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala @@ -298,12 +298,7 @@ class HiveCatalogSuite extends KyuubiHiveTest { TableCatalog.OPTION_PREFIX + "field.delim" -> ",", TableCatalog.OPTION_PREFIX + "line.delim" -> "\n") - val method = classOf[HiveTableCatalog].getDeclaredMethod( - "toOptionsAndSerdeProps", - classOf[scala.collection.immutable.Map[_, _]]) - method.setAccessible(true) - val (optionsProps, serdeProps) = method.invoke(catalog, properties) - .asInstanceOf[(Map[String, String], Map[String, String])] + val (optionsProps, serdeProps) = catalog.toOptionsAndSerdeProps(properties) assert(optionsProps == Map( "header" -> "false",