diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala index 1a2a8548bd7..f6b6235dc30 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala @@ -317,12 +317,15 @@ class HiveTableCatalog(sparkSession: SparkSession) val (partitionColumns, maybeBucketSpec) = partitions.toSeq.convertTransforms val location = Option(properties.get(TableCatalog.PROP_LOCATION)) val maybeProvider = Option(properties.get(TableCatalog.PROP_PROVIDER)) + val tableProps = properties.asScala.toMap + val (optionsProps, serdeProps) = toOptionsAndSerdeProps(tableProps) val (storage, provider) = getStorageFormatAndProvider( maybeProvider, location, - properties.asScala.toMap) - val tableProperties = properties.asScala + tableProps, + optionsProps, + serdeProps) val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL) val tableType = if (isExternal || location.isDefined) { @@ -339,7 +342,7 @@ class HiveTableCatalog(sparkSession: SparkSession) provider = Some(provider), partitionColumnNames = partitionColumns, bucketSpec = maybeBucketSpec, - properties = tableProperties.toMap, + properties = tableProps, tracksPartitionsInCatalog = conf.manageFilesourcePartitions, comment = Option(properties.get(TableCatalog.PROP_COMMENT))) @@ -431,10 +434,25 @@ class HiveTableCatalog(sparkSession: SparkSession) catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier) } - private def toOptions(properties: Map[String, String]): Map[String, String] = { - properties.filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map { - case (key, value) => 
key.drop(TableCatalog.OPTION_PREFIX.length) -> value - }.toMap + /** + * Splits properties into optionsProps and serdeProps based on the `options.` prefix. + * + * - optionsProps: keys with "options." prefix whose stripped key ALREADY exists in properties, + * indicating they were originally specified via OPTIONS clause. + * - serdeProps: keys with "options." prefix whose stripped key does NOT exist in properties, + * indicating they were originally specified via SERDEPROPERTIES clause. + * + * @param properties the full properties map + * @return a tuple of (optionsProps, serdeProps), both with the "options." prefix stripped + */ + private[hive] def toOptionsAndSerdeProps( + properties: Map[String, String]): (Map[String, String], Map[String, String]) = { + val (optionsProps, serdeProps) = properties + .filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)) + .map { case (key, value) => key.drop(TableCatalog.OPTION_PREFIX.length) -> value } + .toMap + .partition { case (strippedKey, _) => properties.contains(strippedKey) } + (optionsProps, serdeProps) } override def listNamespaces(): Array[Array[String]] = @@ -583,23 +601,25 @@ private object HiveTableCatalog extends Logging { private def getStorageFormatAndProvider( provider: Option[String], location: Option[String], - options: Map[String, String]): (CatalogStorageFormat, String) = { + tableProps: Map[String, String], + optionsProps: Map[String, String], + serdeProps: Map[String, String]): (CatalogStorageFormat, String) = { val nonHiveStorageFormat = CatalogStorageFormat.empty.copy( locationUri = location.map(CatalogUtils.stringToURI), - properties = options) + properties = optionsProps) val conf = SQLConf.get val defaultHiveStorage = HiveSerDe.getDefaultStorage(conf).copy( locationUri = location.map(CatalogUtils.stringToURI), - properties = options) + properties = optionsProps) if (provider.isDefined) { (nonHiveStorageFormat, provider.get) - } else if (serdeIsDefined(options)) { - val maybeSerde = 
options.get("hive.serde") - val maybeStoredAs = options.get("hive.stored-as") - val maybeInputFormat = options.get("hive.input-format") - val maybeOutputFormat = options.get("hive.output-format") + } else if (serdeIsDefined(tableProps)) { + val maybeSerde = tableProps.get("hive.serde") + val maybeStoredAs = tableProps.get("hive.stored-as") + val maybeInputFormat = tableProps.get("hive.input-format") + val maybeOutputFormat = tableProps.get("hive.output-format") val storageFormat = if (maybeStoredAs.isDefined) { // If `STORED AS fileFormat` is used, infer inputFormat, outputFormat and serde from it. HiveSerDe.sourceToSerDe(maybeStoredAs.get) match { @@ -609,7 +629,7 @@ private object HiveTableCatalog extends Logging { outputFormat = hiveSerde.outputFormat.orElse(defaultHiveStorage.outputFormat), // User specified serde takes precedence over the one inferred from file format. serde = maybeSerde.orElse(hiveSerde.serde).orElse(defaultHiveStorage.serde), - properties = options ++ defaultHiveStorage.properties) + properties = serdeProps ++ defaultHiveStorage.properties) case _ => throw KyuubiHiveConnectorException(s"Unsupported serde ${maybeSerde.get}.") } } else { @@ -619,7 +639,7 @@ private object HiveTableCatalog extends Logging { outputFormat = maybeOutputFormat.orElse(defaultHiveStorage.outputFormat), serde = maybeSerde.orElse(defaultHiveStorage.serde), - properties = options ++ defaultHiveStorage.properties) + properties = serdeProps ++ defaultHiveStorage.properties) } (storageFormat, DDLUtils.HIVE_PROVIDER) } else { diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala index e4c8bb918a9..56a89bac2a3 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala +++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala @@ -287,6 +287,46 @@ class HiveCatalogSuite extends KyuubiHiveTest { catalog.dropTable(testIdent) } + test("toOptionsAndSerdeProps") { + val properties = Map( + "hive.serde" -> "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", + "owner" -> "hadoop", + "header" -> "false", + "delimiter" -> "#", + TableCatalog.OPTION_PREFIX + "header" -> "false", + TableCatalog.OPTION_PREFIX + "delimiter" -> "#", + TableCatalog.OPTION_PREFIX + "field.delim" -> ",", + TableCatalog.OPTION_PREFIX + "line.delim" -> "\n") + + val (optionsProps, serdeProps) = catalog.toOptionsAndSerdeProps(properties) + + assert(optionsProps == Map( + "header" -> "false", + "delimiter" -> "#")) + assert(serdeProps == Map( + "field.delim" -> ",", + "line.delim" -> "\n")) + } + + test("createTable: SERDEPROPERTIES") { + val properties = new util.HashMap[String, String]() + properties.put("hive.serde", "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") + properties.put(TableCatalog.OPTION_PREFIX + "field.delim", ",") + assert(!catalog.tableExists(testIdent)) + + val table = catalog.createTable( + testIdent, + schema, + Array.empty[Transform], + properties).asInstanceOf[HiveTable] + + assert(!table.catalogTable.storage.properties.keys.exists( + _.startsWith(TableCatalog.OPTION_PREFIX))) + assert(!table.catalogTable.storage.properties.contains("hive.serde")) + assert(table.catalogTable.storage.properties.contains("field.delim")) + catalog.dropTable(testIdent) + } + test("loadTable") { val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps) val loaded = catalog.loadTable(testIdent)