Skip to content

Commit 64197c9

Browse files
aokolnychyi authored and gengliangwang committed
[SPARK-57544][SQL] Rework column ID validation for nested fields in DSv2
### What changes were proposed in this pull request?

This PR reworks column ID validation for nested fields in DSv2.

### Why are the changes needed?

The original implementation detected dropped-and-re-added columns by comparing top-level Column.id() strings in a dedicated validateColumnIds pass, but this approach had no visibility into nested struct fields, array elements, or map keys/values. To work around this limitation, connectors had to encode nested field IDs into the top-level ID string (as demonstrated by ComposedColumnIdTableCatalog), placing an unreasonable burden on connector authors and making the feature fragile by design. The new mechanism stores field IDs in `StructField` metadata and validates them within `validateSchemaCompatibility`.

### Does this PR introduce _any_ user-facing change?

Yes, but it targets unreleased functionality and must be cherry-picked to 4.2.

### How was this patch tested?

Existing and new tests.

### Was this patch authored or co-authored using generative AI tooling?

Claude Code v2.1.183

Closes #56619 from aokolnychyi/spark-57544.

Authored-by: Anton Okolnychyi <aokolnychyi@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
1 parent d52093a commit 64197c9

35 files changed

Lines changed: 1165 additions & 859 deletions

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3015,12 +3015,6 @@
30153015
"<errors>"
30163016
]
30173017
},
3018-
"COLUMN_ID_MISMATCH" : {
3019-
"message" : [
3020-
"Column IDs have changed:",
3021-
"<errors>"
3022-
]
3023-
},
30243018
"METADATA_COLUMNS_MISMATCH" : {
30253019
"message" : [
30263020
"Metadata columns have changed:",
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.util
19+
20+
object FieldMetadataUtils {
21+
// Metadata key for the field ID used to track column identity across schema evolution
22+
val FIELD_ID_METADATA_KEY = "__FIELD_ID"
23+
}

sql/api/src/main/scala/org/apache/spark/sql/types/StructField.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import org.json4s.JsonDSL._
2626
import org.apache.spark.SparkException
2727
import org.apache.spark.annotation.Stable
2828
import org.apache.spark.sql.catalyst.util.{CollationFactory, QuotingUtils, StringConcat}
29+
import org.apache.spark.sql.catalyst.util.FieldMetadataUtils.FIELD_ID_METADATA_KEY
2930
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils.{CURRENT_DEFAULT_COLUMN_METADATA_KEY, EXISTS_DEFAULT_COLUMN_METADATA_KEY}
3031
import org.apache.spark.util.SparkSchemaUtils
3132

@@ -243,6 +244,43 @@ case class StructField(
243244
metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY)
244245
}
245246

247+
/**
248+
* Returns a copy of this field with the given ID set for column identity tracking.
249+
*/
250+
def withId(id: String): StructField = {
251+
val newMetadata = new MetadataBuilder()
252+
.withMetadata(metadata)
253+
.putString(FIELD_ID_METADATA_KEY, id)
254+
.build()
255+
copy(metadata = newMetadata)
256+
}
257+
258+
/**
259+
* Returns the ID of this field, if set.
260+
*/
261+
def id: Option[String] = {
262+
if (metadata.contains(FIELD_ID_METADATA_KEY)) {
263+
Some(metadata.getString(FIELD_ID_METADATA_KEY))
264+
} else {
265+
None
266+
}
267+
}
268+
269+
/**
270+
* Returns a copy of this field with the field ID removed, or this field if no ID is set.
271+
*/
272+
def clearId(): StructField = {
273+
if (metadata.contains(FIELD_ID_METADATA_KEY)) {
274+
val newMetadata = new MetadataBuilder()
275+
.withMetadata(metadata)
276+
.remove(FIELD_ID_METADATA_KEY)
277+
.build()
278+
copy(metadata = newMetadata)
279+
} else {
280+
this
281+
}
282+
}
283+
246284
private def getDDLDefault = getDefault()
247285
.orElse(getCurrentDefaultValue())
248286
.map(" DEFAULT " + _)

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java

Lines changed: 154 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,17 @@
1818
package org.apache.spark.sql.connector.catalog;
1919

2020
import java.util.Map;
21+
import java.util.Objects;
22+
import java.util.stream.Stream;
2123
import javax.annotation.Nullable;
2224

25+
import org.apache.spark.SparkIllegalArgumentException;
2326
import org.apache.spark.annotation.Evolving;
2427
import org.apache.spark.sql.connector.expressions.Transform;
2528
import org.apache.spark.sql.internal.connector.ColumnImpl;
2629
import org.apache.spark.sql.types.DataType;
30+
import org.apache.spark.sql.types.StructField;
31+
import org.apache.spark.sql.util.SchemaUtils;
2732

2833
/**
2934
* An interface representing a column of a {@link Table}. It defines basic properties of a column,
@@ -40,11 +45,11 @@
4045
public interface Column {
4146

4247
static Column create(String name, DataType dataType) {
43-
return create(name, dataType, true);
48+
return builderFor(name, dataType).build();
4449
}
4550

4651
static Column create(String name, DataType dataType, boolean nullable) {
47-
return create(name, dataType, nullable, null, null);
52+
return builderFor(name, dataType).nullable(nullable).build();
4853
}
4954

5055
static Column create(
@@ -53,16 +58,11 @@ static Column create(
5358
boolean nullable,
5459
String comment,
5560
String metadataInJSON) {
56-
return new ColumnImpl(
57-
name,
58-
dataType,
59-
nullable,
60-
comment,
61-
/* defaultValue = */ null,
62-
/* generationExpression = */ null,
63-
/* identityColumnSpec = */ null,
64-
metadataInJSON,
65-
/* id = */ null);
61+
return builderFor(name, dataType)
62+
.nullable(nullable)
63+
.comment(comment)
64+
.metadata(metadataInJSON)
65+
.build();
6666
}
6767

6868
static Column create(
@@ -72,88 +72,72 @@ static Column create(
7272
String comment,
7373
ColumnDefaultValue defaultValue,
7474
String metadataInJSON) {
75-
return new ColumnImpl(
76-
name,
77-
dataType,
78-
nullable,
79-
comment,
80-
defaultValue,
81-
/* generationExpression = */ null,
82-
/* identityColumnSpec = */ null,
83-
metadataInJSON,
84-
/* id = */ null);
75+
return builderFor(name, dataType)
76+
.nullable(nullable)
77+
.comment(comment)
78+
.defaultValue(defaultValue)
79+
.metadata(metadataInJSON)
80+
.build();
8581
}
8682

87-
/**
88-
* Creates a column with a generation expression in SQL string form.
89-
*
90-
* @since 4.3.0
91-
* @deprecated Use
92-
* {@link #create(String, DataType, boolean, String, GenerationExpression, String)} instead.
93-
*/
94-
@Deprecated
9583
static Column create(
9684
String name,
9785
DataType dataType,
9886
boolean nullable,
9987
String comment,
10088
String generationExpression,
10189
String metadataInJSON) {
102-
GenerationExpression genExpr = generationExpression != null
103-
? new GenerationExpression(generationExpression) : null;
104-
return new ColumnImpl(
105-
name,
106-
dataType,
107-
nullable,
108-
comment,
109-
/* defaultValue = */ null,
110-
genExpr,
111-
/* identityColumnSpec = */ null,
112-
metadataInJSON,
113-
/* id = */ null);
90+
return builderFor(name, dataType)
91+
.nullable(nullable)
92+
.comment(comment)
93+
.generationExpression(generationExpression)
94+
.metadata(metadataInJSON)
95+
.build();
11496
}
11597

116-
/**
117-
* Creates a column with a generation expression object.
118-
*
119-
* @since 4.3.0
120-
*/
12198
static Column create(
12299
String name,
123100
DataType dataType,
124101
boolean nullable,
125102
String comment,
126-
GenerationExpression generationExpression,
103+
IdentityColumnSpec identityColumnSpec,
127104
String metadataInJSON) {
128-
return new ColumnImpl(
129-
name,
130-
dataType,
131-
nullable,
132-
comment,
133-
/* defaultValue = */ null,
134-
generationExpression,
135-
/* identityColumnSpec = */ null,
136-
metadataInJSON,
137-
/* id = */ null);
105+
return builderFor(name, dataType)
106+
.nullable(nullable)
107+
.comment(comment)
108+
.identityColumnSpec(identityColumnSpec)
109+
.metadata(metadataInJSON)
110+
.build();
138111
}
139112

140-
static Column create(
141-
String name,
142-
DataType dataType,
143-
boolean nullable,
144-
String comment,
145-
IdentityColumnSpec identityColumnSpec,
146-
String metadataInJSON) {
147-
return new ColumnImpl(
148-
name,
149-
dataType,
150-
nullable,
151-
comment,
152-
/* defaultValue = */ null,
153-
/* generationExpression = */ null,
154-
identityColumnSpec,
155-
metadataInJSON,
156-
/* id = */ null);
113+
/**
114+
* Creates a builder for a new column with the given name and data type.
115+
*
116+
* @param name the name of the column
117+
* @param dataType the data type of the column
118+
* @return a new builder
119+
* @since 4.2.0
120+
*/
121+
static Builder builderFor(String name, DataType dataType) {
122+
return new Builder(name, dataType);
123+
}
124+
125+
/**
126+
* Creates a builder with pre-populated info from an existing column.
127+
*
128+
* @param column the source column
129+
* @return a new builder seeded with the column's current state
130+
* @since 4.2.0
131+
*/
132+
static Builder builderFrom(Column column) {
133+
return new Builder(column.name(), column.dataType())
134+
.nullable(column.nullable())
135+
.comment(column.comment())
136+
.defaultValue(column.defaultValue())
137+
.generationExpression(column.columnGenerationExpression())
138+
.identityColumnSpec(column.identityColumnSpec())
139+
.metadata(column.metadataInJSON())
140+
.id(column.id());
157141
}
158142

159143
/**
@@ -243,12 +227,104 @@ default GenerationExpression columnGenerationExpression() {
243227
* others.
244228
* <p>
245229
* This API covers top-level columns only. Nested struct fields, array elements, and map
246-
* keys/values do not have separate IDs. Connectors that track nested field IDs can encode
247-
* them into the returned top-level Column ID string to detect nested changes, since Spark
248-
* only compares string equality.
230+
* keys/values carry their own IDs in struct field metadata. Spark validates both top-level and
231+
* nested struct field IDs as part of schema compatibility checks (validation of array elements
232+
* and map keys/values is not supported yet). See {@link StructField#id()}.
249233
*/
250234
@Nullable
251235
default String id() {
252236
return null;
253237
}
238+
239+
/**
240+
* A builder for {@link Column}.
241+
*
242+
* @since 4.2.0
243+
*/
244+
class Builder {
245+
private final String name;
246+
private DataType dataType;
247+
private boolean nullable = true;
248+
private String comment = null;
249+
private ColumnDefaultValue defaultValue = null;
250+
private GenerationExpression genExpr = null;
251+
private IdentityColumnSpec identityColumnSpec = null;
252+
private String metadataInJSON = null;
253+
private String id = null;
254+
255+
private Builder(String name, DataType dataType) {
256+
this.name = Objects.requireNonNull(name, "name must not be null");
257+
this.dataType = Objects.requireNonNull(dataType, "dataType must not be null");
258+
}
259+
260+
public Builder nullable(boolean nullable) {
261+
this.nullable = nullable;
262+
return this;
263+
}
264+
265+
public Builder comment(String comment) {
266+
this.comment = comment;
267+
return this;
268+
}
269+
270+
public Builder defaultValue(ColumnDefaultValue defaultValue) {
271+
this.defaultValue = defaultValue;
272+
return this;
273+
}
274+
275+
public Builder generationExpression(String sql) {
276+
this.genExpr = sql != null ? new GenerationExpression(sql) : null;
277+
return this;
278+
}
279+
280+
public Builder generationExpression(GenerationExpression generationExpr) {
281+
this.genExpr = generationExpr;
282+
return this;
283+
}
284+
285+
public Builder identityColumnSpec(IdentityColumnSpec identityColumnSpec) {
286+
this.identityColumnSpec = identityColumnSpec;
287+
return this;
288+
}
289+
290+
public Builder metadata(String metadataInJSON) {
291+
this.metadataInJSON = metadataInJSON;
292+
return this;
293+
}
294+
295+
public Builder id(String id) {
296+
this.id = id;
297+
return this;
298+
}
299+
300+
public Builder clearIds() {
301+
this.id = null;
302+
this.dataType = SchemaUtils.clearFieldIds(dataType);
303+
return this;
304+
}
305+
306+
public Column build() {
307+
validateState();
308+
return new ColumnImpl(
309+
name, dataType, nullable, comment, defaultValue,
310+
genExpr, identityColumnSpec, metadataInJSON, id);
311+
}
312+
313+
private void validateState() {
314+
if (hasConflictingDefinitions()) {
315+
throw new SparkIllegalArgumentException(
316+
"INTERNAL_ERROR",
317+
Map.of("message",
318+
"Column '" + name + "' cannot have more than one definition of: " +
319+
"default value, generation expression, identity column spec"));
320+
}
321+
}
322+
323+
private boolean hasConflictingDefinitions() {
324+
long definitionCount = Stream.of(defaultValue, genExpr, identityColumnSpec)
325+
.filter(Objects::nonNull)
326+
.count();
327+
return definitionCount > 1;
328+
}
329+
}
254330
}

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/RelationBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public B withColumns(Column[] columns) {
4444
}
4545

4646
public B withSchema(StructType schema) {
47-
this.columns = CatalogV2Util.structTypeToV2Columns(schema);
47+
this.columns = CatalogV2Util.structTypeToV2Columns(schema, true /* keep IDs */);
4848
return self();
4949
}
5050

0 commit comments

Comments
 (0)