Skip to content

Commit be8bb4f

Browse files
committed
[SPARK-57773][SQL] Strip leaked internal metadata in DataSourceV2Relation.create while preserving column IDs
DataSourceV2Relation.create builds the relation output schema directly from table.columns without removing internal metadata. As a result, any internal metadata key that has leaked onto the table columns (the keys in INTERNAL_METADATA_KEYS, such as __metadata_col) surfaces on the relation output and on df.schema instead of being hidden as intended. This change strips internal metadata in create via removeInternalMetadata while preserving column IDs: FIELD_ID_METADATA_KEY is listed in INTERNAL_METADATA_KEYS so that other paths drop it, but the relation output deliberately surfaces it (SPARK-57544). To support this, removeInternalMetadata gains a keepFieldIds flag (default false, which preserves behavior for existing callers) that skips that single key during the same removal pass, so create keeps the IDs without a strip-then-re-add round-trip.
1 parent e28edaf commit be8bb4f

3 files changed

Lines changed: 47 additions & 5 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,11 +251,15 @@ package object util extends Logging {
251251
MetadataColumn.PRESERVE_ON_REINSERT
252252
)
253253

254-
def removeInternalMetadata(schema: StructType): StructType = {
254+
def removeInternalMetadata(schema: StructType, keepFieldIds: Boolean = false): StructType = {
255255
StructType(schema.map { field =>
256256
var builder = new MetadataBuilder().withMetadata(field.metadata)
257257
INTERNAL_METADATA_KEYS.foreach { key =>
258-
builder = builder.remove(key)
258+
// Column IDs are listed as internal so most paths drop them, but some paths (e.g. the v2
259+
// relation output) deliberately surface them, so allow callers to keep them.
260+
if (!(keepFieldIds && key == FIELD_ID_METADATA_KEY)) {
261+
builder = builder.remove(key)
262+
}
259263
}
260264
field.copy(metadata = builder.build())
261265
})

sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
2727
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, ExposesMetadataColumns, Histogram, HistogramBin, LeafNode, LogicalPlan, Statistics}
2828
import org.apache.spark.sql.catalyst.streaming.{StreamingSourceIdentifyingName, Unassigned}
2929
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
30-
import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils}
30+
import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, truncatedString, CharVarcharUtils}
3131
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, SupportsMetadataColumns, Table, TableCapability, TableCatalog, V2TableUtil}
3232
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
3333
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
@@ -329,7 +329,13 @@ object DataSourceV2Relation {
329329
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
330330
// The v2 source may return schema containing char/varchar type. We replace char/varchar
331331
// with "annotated" string type here as the query engine doesn't support char/varchar yet.
332-
val schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(table.columns.asSchema)
332+
// We also strip internal metadata that may have leaked onto the table columns, so it does
333+
// not surface on the relation's output. Column IDs (FIELD_ID_METADATA_KEY) are an exception:
334+
// although the key is listed in INTERNAL_METADATA_KEYS so that other paths drop it, the
335+
// column-ID feature deliberately surfaces field IDs on the relation's output, so we keep them.
336+
val schema = removeInternalMetadata(
337+
CharVarcharUtils.replaceCharVarcharWithStringInSchema(table.columns.asSchema),
338+
keepFieldIds = true)
333339
DataSourceV2Relation(table, toAttributes(schema), catalog, identifier, options, timeTravelSpec)
334340
}
335341

sql/catalyst/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2RelationSuite.scala

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,16 @@
1717

1818
package org.apache.spark.sql.execution.datasources.v2
1919

20+
import java.util
21+
2022
import org.apache.spark.SparkFunSuite
2123
import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatistics}
2224
import org.apache.spark.sql.catalyst.plans.logical.{Histogram, HistogramBin}
25+
import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
26+
import org.apache.spark.sql.connector.catalog.{Column, Table, TableCapability}
2327
import org.apache.spark.sql.connector.expressions.FieldReference
24-
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
28+
import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StringType, StructField, StructType}
29+
import org.apache.spark.sql.util.CaseInsensitiveStringMap
2530

2631
class DataSourceV2RelationSuite extends SparkFunSuite {
2732

@@ -107,4 +112,31 @@ class DataSourceV2RelationSuite extends SparkFunSuite {
107112
val idV2NoHist = colStats.get(FieldReference.column("id"))
108113
assert(!idV2NoHist.histogram().isPresent)
109114
}
115+
116+
test("create strips leaked internal metadata but preserves column IDs") {
117+
// A column carrying both a column ID (surfaced on purpose) and a leaked internal
118+
// metadata key (METADATA_COL_ATTR_KEY, listed in INTERNAL_METADATA_KEYS).
119+
val leakedMetadata = new MetadataBuilder()
120+
.putString(METADATA_COL_ATTR_KEY, "leaked")
121+
.build()
122+
val table = new Table {
123+
override def name(): String = "t"
124+
override def columns(): Array[Column] = Array(
125+
Column.builderFor("id", IntegerType)
126+
.id("1")
127+
.metadata(leakedMetadata.json)
128+
.build())
129+
override def capabilities(): util.Set[TableCapability] =
130+
util.Collections.emptySet[TableCapability]()
131+
}
132+
133+
val relation =
134+
DataSourceV2Relation.create(table, None, None, CaseInsensitiveStringMap.empty())
135+
val field = relation.schema.head
136+
137+
// The leaked internal metadata key is stripped from the relation output ...
138+
assert(!field.metadata.contains(METADATA_COL_ATTR_KEY))
139+
// ... but the column ID is preserved.
140+
assert(field.id.contains("1"))
141+
}
110142
}

0 commit comments

Comments
 (0)