Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,15 @@ package object util extends Logging {
MetadataColumn.PRESERVE_ON_REINSERT
)

def removeInternalMetadata(schema: StructType): StructType = {
def removeInternalMetadata(schema: StructType, keepFieldIds: Boolean = false): StructType = {
StructType(schema.map { field =>
var builder = new MetadataBuilder().withMetadata(field.metadata)
INTERNAL_METADATA_KEYS.foreach { key =>
builder = builder.remove(key)
// Column IDs are listed as internal so most paths drop them, but some paths (e.g. the v2
// relation output) deliberately surface them, so allow callers to keep them.
if (!(keepFieldIds && key == FIELD_ID_METADATA_KEY)) {
builder = builder.remove(key)
}
}
field.copy(metadata = builder.build())
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, ExposesMetadataColumns, Histogram, HistogramBin, LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.streaming.{StreamingSourceIdentifyingName, Unassigned}
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils}
import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, truncatedString, CharVarcharUtils}
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, SupportsMetadataColumns, Table, TableCapability, TableCatalog, V2TableUtil}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
Expand Down Expand Up @@ -329,7 +329,13 @@ object DataSourceV2Relation {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
// The v2 source may return schema containing char/varchar type. We replace char/varchar
// with "annotated" string type here as the query engine doesn't support char/varchar yet.
val schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(table.columns.asSchema)
// We also strip internal metadata that may have leaked onto the table columns, so it does
// not surface on the relation's output. Column IDs (FIELD_ID_METADATA_KEY) are an exception:
// although the key is listed in INTERNAL_METADATA_KEYS so that other paths drop it, the
// column-ID feature deliberately surfaces field IDs on the relation's output, so we keep them.
val schema = removeInternalMetadata(
CharVarcharUtils.replaceCharVarcharWithStringInSchema(table.columns.asSchema),
keepFieldIds = true)
DataSourceV2Relation(table, toAttributes(schema), catalog, identifier, options, timeTravelSpec)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@

package org.apache.spark.sql.execution.datasources.v2

import java.util

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatistics}
import org.apache.spark.sql.catalyst.plans.logical.{Histogram, HistogramBin}
import org.apache.spark.sql.catalyst.util.FieldMetadataUtils.FIELD_ID_METADATA_KEY
import org.apache.spark.sql.catalyst.util.INTERNAL_METADATA_KEYS
import org.apache.spark.sql.connector.catalog.{Column, Table, TableCapability}
import org.apache.spark.sql.connector.expressions.FieldReference
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StringType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class DataSourceV2RelationSuite extends SparkFunSuite {

Expand Down Expand Up @@ -107,4 +113,37 @@ class DataSourceV2RelationSuite extends SparkFunSuite {
val idV2NoHist = colStats.get(FieldReference.column("id"))
assert(!idV2NoHist.histogram().isPresent)
}

test("create strips leaked internal metadata but preserves column IDs") {
// A column carrying both a column ID (surfaced on purpose) and every internal metadata key
// (listed in INTERNAL_METADATA_KEYS), simulating a v2 source that leaks internal metadata.
// The column ID key is excluded here since it is set via `.id(...)` below.
val leakedBuilder = new MetadataBuilder()
INTERNAL_METADATA_KEYS.filter(_ != FIELD_ID_METADATA_KEY).foreach { key =>
leakedBuilder.putString(key, "leaked")
}
val leakedMetadata = leakedBuilder.build()
val table = new Table {
override def name(): String = "t"
override def columns(): Array[Column] = Array(
Column.builderFor("id", IntegerType)
.id("1")
.metadata(leakedMetadata.json)
.build())
override def capabilities(): util.Set[TableCapability] =
util.Set.of[TableCapability]()
}

val relation =
DataSourceV2Relation.create(table, None, None, CaseInsensitiveStringMap.empty())
val field = relation.schema.head

// All leaked internal metadata keys are stripped from the relation output ...
INTERNAL_METADATA_KEYS.filter(_ != FIELD_ID_METADATA_KEY).foreach { key =>
assert(!field.metadata.contains(key), s"internal metadata key $key should be stripped")
}
// ... but the column ID is preserved.
assert(field.id.contains("1"))
assert(field.metadata.contains(FIELD_ID_METADATA_KEY))
}
}