From be8bb4f21055f6183334f625ce5ea6ecd56ed570 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 30 Jun 2026 04:56:46 +0000 Subject: [PATCH 1/2] [SPARK-57773][SQL] Strip leaked internal metadata in DataSourceV2Relation.create while preserving column IDs DataSourceV2Relation.create builds the relation output schema directly from table.columns without removing internal metadata. Any internal metadata key that leaked onto the table columns (the keys in INTERNAL_METADATA_KEYS, such as __metadata_col) therefore surfaces on the relation output and on df.schema, instead of being hidden as intended. This strips internal metadata in create via removeInternalMetadata, but preserves column IDs: FIELD_ID_METADATA_KEY is listed in INTERNAL_METADATA_KEYS so that other paths drop it, while it is also deliberately surfaced onto the relation output (SPARK-57544). removeInternalMetadata gains a keepFieldIds flag (default false, preserving behavior for existing callers) that skips that single key in the same removal pass, so create keeps the IDs without a strip-then-re-add round-trip. --- .../spark/sql/catalyst/util/package.scala | 8 +++-- .../datasources/v2/DataSourceV2Relation.scala | 10 ++++-- .../v2/DataSourceV2RelationSuite.scala | 34 ++++++++++++++++++- 3 files changed, 47 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala index 5dc3962821a03..4fd85ef4923d2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala @@ -251,11 +251,15 @@ package object util extends Logging { MetadataColumn.PRESERVE_ON_REINSERT ) - def removeInternalMetadata(schema: StructType): StructType = { + def removeInternalMetadata(schema: StructType, keepFieldIds: Boolean = false): StructType = { StructType(schema.map { field => var builder = new MetadataBuilder().withMetadata(field.metadata) INTERNAL_METADATA_KEYS.foreach { key => - builder = builder.remove(key) + // Column IDs are listed as internal so most paths drop them, but some paths (e.g. the v2 + // relation output) deliberately surface them, so allow callers to keep them. + if (!(keepFieldIds && key == FIELD_ID_METADATA_KEY)) { + builder = builder.remove(key) + } } field.copy(metadata = builder.build()) }) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 19371dcb94dec..0b7c829939ff7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, ExposesMetadataColumns, Histogram, HistogramBin, LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.streaming.{StreamingSourceIdentifyingName, Unassigned} import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes -import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils} +import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, truncatedString, CharVarcharUtils} import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, SupportsMetadataColumns, Table, TableCapability, TableCatalog, V2TableUtil} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference} @@ -329,7 +329,13 @@ object DataSourceV2Relation { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ // The v2 source may return schema containing char/varchar type. We replace char/varchar // with "annotated" string type here as the query engine doesn't support char/varchar yet. - val schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(table.columns.asSchema) + // We also strip internal metadata that may have leaked onto the table columns, so it does + // not surface on the relation's output. Column IDs (FIELD_ID_METADATA_KEY) are an exception: + // although the key is listed in INTERNAL_METADATA_KEYS so that other paths drop it, the + // column-ID feature deliberately surfaces field IDs on the relation's output, so we keep them. + val schema = removeInternalMetadata( + CharVarcharUtils.replaceCharVarcharWithStringInSchema(table.columns.asSchema), + keepFieldIds = true) DataSourceV2Relation(table, toAttributes(schema), catalog, identifier, options, timeTravelSpec) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2RelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2RelationSuite.scala index a1f1c85fe4d5a..898bb347b0d20 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2RelationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2RelationSuite.scala @@ -17,11 +17,16 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatistics} import org.apache.spark.sql.catalyst.plans.logical.{Histogram, HistogramBin} +import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY +import org.apache.spark.sql.connector.catalog.{Column, Table, TableCapability} import org.apache.spark.sql.connector.expressions.FieldReference -import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StringType, StructField, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap class DataSourceV2RelationSuite extends SparkFunSuite { @@ -107,4 +112,31 @@ class DataSourceV2RelationSuite extends SparkFunSuite { val idV2NoHist = colStats.get(FieldReference.column("id")) assert(!idV2NoHist.histogram().isPresent) } + + test("create strips leaked internal metadata but preserves column IDs") { + // A column carrying both a column ID (surfaced on purpose) and a leaked internal + // metadata key (METADATA_COL_ATTR_KEY, listed in INTERNAL_METADATA_KEYS). + val leakedMetadata = new MetadataBuilder() + .putString(METADATA_COL_ATTR_KEY, "leaked") + .build() + val table = new Table { + override def name(): String = "t" + override def columns(): Array[Column] = Array( + Column.builderFor("id", IntegerType) + .id("1") + .metadata(leakedMetadata.json) + .build()) + override def capabilities(): util.Set[TableCapability] = + util.Collections.emptySet[TableCapability]() + } + + val relation = + DataSourceV2Relation.create(table, None, None, CaseInsensitiveStringMap.empty()) + val field = relation.schema.head + + // The leaked internal metadata key is stripped from the relation output ... + assert(!field.metadata.contains(METADATA_COL_ATTR_KEY)) + // ... but the column ID is preserved. + assert(field.id.contains("1")) + } } From 6834b782ecab48b3b4dbe34134e116729d023a44 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 30 Jun 2026 09:38:55 +0000 Subject: [PATCH 2/2] [SPARK-57773][SQL][FOLLOWUP] Fix scalastyle and broaden internal-metadata test Use util.Set.of() instead of Collections.emptySet() to satisfy scalastyle, and assert that create() strips every key in INTERNAL_METADATA_KEYS (except the deliberately-surfaced column ID) rather than only METADATA_COL_ATTR_KEY. Co-authored-by: Isaac --- .../v2/DataSourceV2RelationSuite.scala | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2RelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2RelationSuite.scala index 898bb347b0d20..46ed870cb3221 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2RelationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2RelationSuite.scala @@ -22,7 +22,8 @@ import java.util import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatistics} import org.apache.spark.sql.catalyst.plans.logical.{Histogram, HistogramBin} -import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY +import org.apache.spark.sql.catalyst.util.FieldMetadataUtils.FIELD_ID_METADATA_KEY +import org.apache.spark.sql.catalyst.util.INTERNAL_METADATA_KEYS import org.apache.spark.sql.connector.catalog.{Column, Table, TableCapability} import org.apache.spark.sql.connector.expressions.FieldReference import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StringType, StructField, StructType} @@ -114,11 +115,14 @@ class DataSourceV2RelationSuite extends SparkFunSuite { } test("create strips leaked internal metadata but preserves column IDs") { - // A column carrying both a column ID (surfaced on purpose) and a leaked internal - // metadata key (METADATA_COL_ATTR_KEY, listed in INTERNAL_METADATA_KEYS). - val leakedMetadata = new MetadataBuilder() - .putString(METADATA_COL_ATTR_KEY, "leaked") - .build() + // A column carrying both a column ID (surfaced on purpose) and every internal metadata key + // (listed in INTERNAL_METADATA_KEYS), simulating a v2 source that leaks internal metadata. + // The column ID key is excluded here since it is set via `.id(...)` below. + val leakedBuilder = new MetadataBuilder() + INTERNAL_METADATA_KEYS.filter(_ != FIELD_ID_METADATA_KEY).foreach { key => + leakedBuilder.putString(key, "leaked") + } + val leakedMetadata = leakedBuilder.build() val table = new Table { override def name(): String = "t" override def columns(): Array[Column] = Array( @@ -127,16 +131,19 @@ class DataSourceV2RelationSuite extends SparkFunSuite { .metadata(leakedMetadata.json) .build()) override def capabilities(): util.Set[TableCapability] = - util.Collections.emptySet[TableCapability]() + util.Set.of[TableCapability]() } val relation = DataSourceV2Relation.create(table, None, None, CaseInsensitiveStringMap.empty()) val field = relation.schema.head - // The leaked internal metadata key is stripped from the relation output ... - assert(!field.metadata.contains(METADATA_COL_ATTR_KEY)) + // All leaked internal metadata keys are stripped from the relation output ... + INTERNAL_METADATA_KEYS.filter(_ != FIELD_ID_METADATA_KEY).foreach { key => + assert(!field.metadata.contains(key), s"internal metadata key $key should be stripped") + } // ... but the column ID is preserved. assert(field.id.contains("1")) + assert(field.metadata.contains(FIELD_ID_METADATA_KEY)) } }