Skip to content

Commit a803c38

Browse files
committed
[SPARK-55716][SQL][FOLLOWUP] Simplify NOT NULL preservation by skipping asNullable in resolveRelation
### What changes were proposed in this pull request? Follow-up to #54517. Simplifies NOT NULL constraint preservation per [review feedback](#54517 (comment)). Instead of calling `dataSchema.asNullable` in `resolveRelation()` and then restoring nullability with recursive `restoreNullability`/`restoreDataTypeNullability` helpers in `CreateDataSourceTableCommand`, this PR: 1. Adds a `forceNullable` parameter to `DataSource.resolveRelation()` (default `true`, preserving existing behavior) 2. Passes `forceNullable = !enforceNotNull` from `CreateDataSourceTableCommand`, so `asNullable` is skipped only when the config is enabled 3. Removes `restoreNullability` and `restoreDataTypeNullability` helper methods entirely **Data flow:** - **Config OFF** (default): `forceNullable = true` → `asNullable` runs → same behavior as before SPARK-55716 - **Config ON**: `forceNullable = false` → `asNullable` skipped → catalog stores NOT NULL → `PreprocessTableInsertion` enforces at insert time ### Why are the changes needed? Addresses review feedback: "is it simpler to not do `dataSchema.asNullable` if the flag is on?" ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing SPARK-55716 tests (7 tests in InsertSuite) and ShowCreateTableSuite (30 tests) all pass. ### Was this patch authored or co-authored using generative AI tooling? Yes, co-authored with GitHub Copilot. Closes #54597 from yaooqinn/SPARK-55716. Authored-by: Kent Yao <kentyao@microsoft.com> Signed-off-by: Kent Yao <kentyao@microsoft.com>
1 parent 93ab196 commit a803c38

4 files changed

Lines changed: 14 additions & 51 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 6 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ import org.apache.spark.sql.classic.ClassicConversions.castToImpl
2828
import org.apache.spark.sql.errors.QueryCompilationErrors
2929
import org.apache.spark.sql.execution.CommandExecutionMode
3030
import org.apache.spark.sql.execution.datasources._
31+
import org.apache.spark.sql.internal.SQLConf
3132
import org.apache.spark.sql.sources.BaseRelation
32-
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
33+
import org.apache.spark.sql.types.StructType
3334
import org.apache.spark.util.ArrayImplicits._
3435

3536
/**
@@ -79,7 +80,9 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
7980
bucketSpec = table.bucketSpec,
8081
options = table.storage.properties ++ pathOption,
8182
// As discussed in SPARK-19583, we don't check if the location is existed
82-
catalogTable = Some(tableWithDefaultOptions)).resolveRelation(checkFilesExist = false)
83+
catalogTable = Some(tableWithDefaultOptions))
84+
.resolveRelation(checkFilesExist = false,
85+
forceNullable = !sessionState.conf.getConf(SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL))
8386

8487
val partitionColumnNames = if (table.schema.nonEmpty) {
8588
table.partitionColumnNames
@@ -107,17 +110,8 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
107110
table.copy(schema = new StructType(), partitionColumnNames = Nil)
108111

109112
case _ =>
110-
// Merge nullability from the user-specified schema into the resolved schema.
111-
// DataSource.resolveRelation() calls dataSchema.asNullable which strips NOT NULL
112-
// constraints. We restore nullability from the original user schema while keeping
113-
// the resolved data types (which may include CharVarchar normalization, metadata, etc.)
114-
val resolvedSchema = if (table.schema.nonEmpty) {
115-
restoreNullability(dataSource.schema, table.schema)
116-
} else {
117-
dataSource.schema
118-
}
119113
table.copy(
120-
schema = resolvedSchema,
114+
schema = dataSource.schema,
121115
partitionColumnNames = partitionColumnNames,
122116
// If metastore partition management for file source tables is enabled, we start off with
123117
// partition provider hive, but no partitions in the metastore. The user has to call
@@ -132,38 +126,6 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
132126
Seq.empty[Row]
133127
}
134128

135-
/**
136-
* Recursively restores nullability from the original user-specified schema into
137-
* the resolved schema. The resolved schema's data types are preserved (they may
138-
* contain CharVarchar normalization, metadata, etc.), but nullability flags
139-
* (top-level and nested) are taken from the original schema.
140-
*/
141-
private def restoreNullability(resolved: StructType, original: StructType): StructType = {
142-
val originalFields = original.fields.map(f => f.name -> f).toMap
143-
StructType(resolved.fields.map { resolvedField =>
144-
originalFields.get(resolvedField.name) match {
145-
case Some(origField) =>
146-
resolvedField.copy(
147-
nullable = origField.nullable,
148-
dataType = restoreDataTypeNullability(resolvedField.dataType, origField.dataType))
149-
case None => resolvedField
150-
}
151-
})
152-
}
153-
154-
private def restoreDataTypeNullability(resolved: DataType, original: DataType): DataType = {
155-
(resolved, original) match {
156-
case (r: StructType, o: StructType) => restoreNullability(r, o)
157-
case (ArrayType(rElem, _), ArrayType(oElem, oNull)) =>
158-
ArrayType(restoreDataTypeNullability(rElem, oElem), oNull)
159-
case (MapType(rKey, rVal, _), MapType(oKey, oVal, oValNull)) =>
160-
MapType(
161-
restoreDataTypeNullability(rKey, oKey),
162-
restoreDataTypeNullability(rVal, oVal),
163-
oValNull)
164-
case _ => resolved
165-
}
166-
}
167129
}
168130

169131
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,10 @@ case class DataSource(
362362
* is considered as a non-streaming file based data source. Since we know
363363
* that files already exist, we don't need to check them again.
364364
*/
365-
def resolveRelation(checkFilesExist: Boolean = true, readOnly: Boolean = false): BaseRelation = {
365+
def resolveRelation(
366+
checkFilesExist: Boolean = true,
367+
readOnly: Boolean = false,
368+
forceNullable: Boolean = true): BaseRelation = {
366369
val relation = (providingInstance(), userSpecifiedSchema) match {
367370
// TODO: Throw when too much is given.
368371
case (dataSource: SchemaRelationProvider, Some(schema)) =>
@@ -436,7 +439,7 @@ case class DataSource(
436439
HadoopFsRelation(
437440
fileCatalog,
438441
partitionSchema = partitionSchema,
439-
dataSchema = dataSchema.asNullable,
442+
dataSchema = if (forceNullable) dataSchema.asNullable else dataSchema,
440443
bucketSpec = bucketSpec,
441444
format,
442445
caseInsensitiveOptions)(sparkSession)

sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowCreateTableSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ trait ShowCreateTableSuiteBase extends command.ShowCreateTableSuiteBase
185185
val showDDL = getShowCreateDDL(t)
186186
assert(showDDL === Array(
187187
s"CREATE TABLE $fullName (",
188-
"a BIGINT NOT NULL,",
188+
"a BIGINT,",
189189
"b BIGINT DEFAULT 42,",
190190
"c STRING COLLATE UTF8_BINARY DEFAULT 'abc, \"def\"' COMMENT 'comment')",
191191
"USING parquet",

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -340,12 +340,10 @@ class SparkMetadataOperationSuite extends HiveThriftServer2TestBase {
340340
case _ => assert(radix === 0) // nulls
341341
}
342342

343-
val expectedNullable = if (schema(pos).nullable) 1 else 0
344-
assert(rowSet.getInt("NULLABLE") === expectedNullable)
343+
assert(rowSet.getInt("NULLABLE") === 1)
345344
assert(rowSet.getString("REMARKS") === pos.toString)
346345
assert(rowSet.getInt("ORDINAL_POSITION") === pos + 1)
347-
val expectedIsNullable = if (schema(pos).nullable) "YES" else "NO"
348-
assert(rowSet.getString("IS_NULLABLE") === expectedIsNullable)
346+
assert(rowSet.getString("IS_NULLABLE") === "YES")
349347
assert(rowSet.getString("IS_AUTO_INCREMENT") === "NO")
350348
pos += 1
351349
}

0 commit comments

Comments
 (0)