diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala index 5dc3962821a03..4fd85ef4923d2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala @@ -251,11 +251,15 @@ package object util extends Logging { MetadataColumn.PRESERVE_ON_REINSERT ) - def removeInternalMetadata(schema: StructType): StructType = { + def removeInternalMetadata(schema: StructType, keepFieldIds: Boolean = false): StructType = { StructType(schema.map { field => var builder = new MetadataBuilder().withMetadata(field.metadata) INTERNAL_METADATA_KEYS.foreach { key => - builder = builder.remove(key) + // Column IDs are listed as internal so most paths drop them, but some paths (e.g. the v2 + // relation output) deliberately surface them, so allow callers to keep them. + if (!(keepFieldIds && key == FIELD_ID_METADATA_KEY)) { + builder = builder.remove(key) + } } field.copy(metadata = builder.build()) }) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 19371dcb94dec..0b7c829939ff7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, ExposesMetadataColumns, Histogram, HistogramBin, LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.streaming.{StreamingSourceIdentifyingName, Unassigned} import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes -import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils} +import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, truncatedString, CharVarcharUtils} import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, SupportsMetadataColumns, Table, TableCapability, TableCatalog, V2TableUtil} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference} @@ -329,7 +329,13 @@ object DataSourceV2Relation { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ // The v2 source may return schema containing char/varchar type. We replace char/varchar // with "annotated" string type here as the query engine doesn't support char/varchar yet. - val schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(table.columns.asSchema) + // We also strip internal metadata that may have leaked onto the table columns, so it does + // not surface on the relation's output. Column IDs (FIELD_ID_METADATA_KEY) are an exception: + // although the key is listed in INTERNAL_METADATA_KEYS so that other paths drop it, the + // column-ID feature deliberately surfaces field IDs on the relation's output, so we keep them. + val schema = removeInternalMetadata( + CharVarcharUtils.replaceCharVarcharWithStringInSchema(table.columns.asSchema), + keepFieldIds = true) DataSourceV2Relation(table, toAttributes(schema), catalog, identifier, options, timeTravelSpec) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2RelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2RelationSuite.scala index a1f1c85fe4d5a..46ed870cb3221 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2RelationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2RelationSuite.scala @@ -17,11 +17,17 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatistics} import org.apache.spark.sql.catalyst.plans.logical.{Histogram, HistogramBin} +import org.apache.spark.sql.catalyst.util.FieldMetadataUtils.FIELD_ID_METADATA_KEY +import org.apache.spark.sql.catalyst.util.INTERNAL_METADATA_KEYS +import org.apache.spark.sql.connector.catalog.{Column, Table, TableCapability} import org.apache.spark.sql.connector.expressions.FieldReference -import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StringType, StructField, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap class DataSourceV2RelationSuite extends SparkFunSuite { @@ -107,4 +113,37 @@ class DataSourceV2RelationSuite extends SparkFunSuite { val idV2NoHist = colStats.get(FieldReference.column("id")) assert(!idV2NoHist.histogram().isPresent) } + + test("create strips leaked internal metadata but preserves column IDs") { + // A column carrying both a column ID (surfaced on purpose) and every internal metadata key + // (listed in INTERNAL_METADATA_KEYS), simulating a v2 source that leaks internal metadata. + // The column ID key is excluded here since it is set via `.id(...)` below. + val leakedBuilder = new MetadataBuilder() + INTERNAL_METADATA_KEYS.filter(_ != FIELD_ID_METADATA_KEY).foreach { key => + leakedBuilder.putString(key, "leaked") + } + val leakedMetadata = leakedBuilder.build() + val table = new Table { + override def name(): String = "t" + override def columns(): Array[Column] = Array( + Column.builderFor("id", IntegerType) + .id("1") + .metadata(leakedMetadata.json) + .build()) + override def capabilities(): util.Set[TableCapability] = + util.Set.of[TableCapability]() + } + + val relation = + DataSourceV2Relation.create(table, None, None, CaseInsensitiveStringMap.empty()) + val field = relation.schema.head + + // All leaked internal metadata keys are stripped from the relation output ... + INTERNAL_METADATA_KEYS.filter(_ != FIELD_ID_METADATA_KEY).foreach { key => + assert(!field.metadata.contains(key), s"internal metadata key $key should be stripped") + } + // ... but the column ID is preserved. + assert(field.id.contains("1")) + assert(field.metadata.contains(FIELD_ID_METADATA_KEY)) + } }