Skip to content

Commit d9631e2

Browse files
yaooqinn and Copilot committed
[SPARK-55716][SQL] Fix V1 file source NOT NULL constraint enforcement
V1 file-based DataSource writes (parquet/orc/json) silently accept null values into NOT NULL columns. The root cause: 1. `DataSource.resolveRelation()` calls `dataSchema.asNullable` (SPARK-13738) for read safety, stripping NOT NULL recursively. 2. `CreateDataSourceTableCommand` stores this all-nullable schema in the catalog, permanently losing NOT NULL info. 3. `PreprocessTableInsertion` never injects `AssertNotNull` because the schema is all-nullable. Fix: - `CreateDataSourceTableCommand`: preserve user-specified nullability via recursive merging into the resolved schema. - `PreprocessTableInsertion`: restore nullability flags from catalog schema before null checks. - Add legacy config `spark.sql.legacy.allowNullInsertForFileSourceTables` (default false) for backward compatibility. Covers top-level and nested types (array elements, struct fields, map values). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 361ece3 commit d9631e2

File tree

8 files changed

+299
-11
lines changed

8 files changed

+299
-11
lines changed

docs/sql-migration-guide.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ license: |
2525
## Upgrading from Spark SQL 4.1 to 4.2
2626

2727
- Since Spark 4.2, Spark enables order-independent checksums for shuffle outputs by default to detect data inconsistencies during indeterminate shuffle stage retries. If a checksum mismatch is detected, Spark rolls back and re-executes all succeeding stages that depend on the shuffle output. If rolling back is not possible for some succeeding stages, the job will fail. To restore the previous behavior, set `spark.sql.shuffle.orderIndependentChecksum.enabled` and `spark.sql.shuffle.orderIndependentChecksum.enableFullRetryOnMismatch` to `false`.
28+
- Since Spark 4.2, Spark can enforce NOT NULL constraints when inserting into V1 file-based data source tables (e.g., Parquet, ORC, JSON) by setting `spark.sql.legacy.allowNullInsertForFileSourceTables` to `false`. Previously (and still by default), null values are silently accepted into NOT NULL columns.
2829
- Since Spark 4.2, support for Derby JDBC datasource is deprecated.
2930

3031
## Upgrading from Spark SQL 4.0 to 4.1

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4743,6 +4743,17 @@ object SQLConf {
47434743
.enumConf(StoreAssignmentPolicy)
47444744
.createWithDefault(StoreAssignmentPolicy.ANSI)
47454745

4746+
val LEGACY_NO_NULL_CHECK_FOR_FILE_SOURCE_INSERT =
4747+
buildConf("spark.sql.legacy.allowNullInsertForFileSourceTables")
4748+
.internal()
4749+
.doc("When true (default, legacy behavior), Spark does not enforce NOT NULL constraints " +
4750+
"when inserting data into file-based data source tables (e.g., Parquet, ORC, JSON). " +
4751+
"When false, Spark properly enforces NOT NULL constraints for file-based tables, " +
4752+
"consistent with the behavior for other data sources and V2 catalog tables.")
4753+
.version("4.2.0")
4754+
.booleanConf
4755+
.createWithDefault(true)
4756+
47464757
val ANSI_ENABLED = buildConf(SqlApiConfHelper.ANSI_ENABLED_KEY)
47474758
.doc("When true, Spark SQL uses an ANSI compliant dialect instead of being Hive compliant. " +
47484759
"For example, Spark will throw an exception at runtime instead of returning null results " +

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
2929
import org.apache.spark.sql.execution.CommandExecutionMode
3030
import org.apache.spark.sql.execution.datasources._
3131
import org.apache.spark.sql.sources.BaseRelation
32-
import org.apache.spark.sql.types.StructType
32+
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
3333
import org.apache.spark.util.ArrayImplicits._
3434

3535
/**
@@ -107,8 +107,17 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
107107
table.copy(schema = new StructType(), partitionColumnNames = Nil)
108108

109109
case _ =>
110+
// Merge nullability from the user-specified schema into the resolved schema.
111+
// DataSource.resolveRelation() calls dataSchema.asNullable which strips NOT NULL
112+
// constraints. We restore nullability from the original user schema while keeping
113+
// the resolved data types (which may include CharVarchar normalization, metadata, etc.)
114+
val resolvedSchema = if (table.schema.nonEmpty) {
115+
restoreNullability(dataSource.schema, table.schema)
116+
} else {
117+
dataSource.schema
118+
}
110119
table.copy(
111-
schema = dataSource.schema,
120+
schema = resolvedSchema,
112121
partitionColumnNames = partitionColumnNames,
113122
// If metastore partition management for file source tables is enabled, we start off with
114123
// partition provider hive, but no partitions in the metastore. The user has to call
@@ -122,6 +131,39 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
122131

123132
Seq.empty[Row]
124133
}
134+
135+
/**
136+
* Recursively restores nullability from the original user-specified schema into
137+
* the resolved schema. The resolved schema's data types are preserved (they may
138+
* contain CharVarchar normalization, metadata, etc.), but nullability flags
139+
* (top-level and nested) are taken from the original schema.
140+
*/
141+
private def restoreNullability(resolved: StructType, original: StructType): StructType = {
142+
val originalFields = original.fields.map(f => f.name -> f).toMap
143+
StructType(resolved.fields.map { resolvedField =>
144+
originalFields.get(resolvedField.name) match {
145+
case Some(origField) =>
146+
resolvedField.copy(
147+
nullable = origField.nullable,
148+
dataType = restoreDataTypeNullability(resolvedField.dataType, origField.dataType))
149+
case None => resolvedField
150+
}
151+
})
152+
}
153+
154+
private def restoreDataTypeNullability(resolved: DataType, original: DataType): DataType = {
155+
(resolved, original) match {
156+
case (r: StructType, o: StructType) => restoreNullability(r, o)
157+
case (ArrayType(rElem, _), ArrayType(oElem, oNull)) =>
158+
ArrayType(restoreDataTypeNullability(rElem, oElem), oNull)
159+
case (MapType(rKey, rVal, _), MapType(oKey, oVal, oValNull)) =>
160+
MapType(
161+
restoreDataTypeNullability(rKey, oKey),
162+
restoreDataTypeNullability(rVal, oVal),
163+
oValNull)
164+
case _ => resolved
165+
}
166+
}
125167
}
126168

127169
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
3939
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
4040
import org.apache.spark.sql.internal.SQLConf
4141
import org.apache.spark.sql.sources.InsertableRelation
42-
import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
42+
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, MetadataBuilder, StructField, StructType}
4343
import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec
4444
import org.apache.spark.sql.util.SchemaUtils
4545
import org.apache.spark.util.ArrayImplicits._
@@ -470,7 +470,29 @@ object PreprocessTableInsertion extends ResolveInsertionBase {
470470
insert.partitionSpec, partColNames, tblName, conf.resolver)
471471

472472
val staticPartCols = normalizedPartSpec.filter(_._2.isDefined).keySet
473-
val expectedColumns = insert.table.output.filterNot(a => staticPartCols.contains(a.name))
473+
val expectedColumns = {
474+
val cols = insert.table.output.filterNot(a => staticPartCols.contains(a.name))
475+
// When the legacy config is disabled, restore the original nullability from the
476+
// catalog table schema. HadoopFsRelation forces dataSchema.asNullable for safe reads,
477+
// which strips NOT NULL constraints (both top-level and nested) from the
478+
// LogicalRelation output. We restore nullability so that AssertNotNull checks are
479+
// properly injected.
480+
if (!conf.getConf(SQLConf.LEGACY_NO_NULL_CHECK_FOR_FILE_SOURCE_INSERT)) {
481+
catalogTable.map { ct =>
482+
val catalogFields = ct.schema.fields.map(f => f.name -> f).toMap
483+
cols.map { col =>
484+
catalogFields.get(col.name) match {
485+
case Some(field) =>
486+
col.withNullability(field.nullable)
487+
.withDataType(restoreDataTypeNullability(col.dataType, field.dataType))
488+
case None => col
489+
}
490+
}
491+
}.getOrElse(cols)
492+
} else {
493+
cols
494+
}
495+
}
474496

475497
val partitionsTrackedByCatalog = catalogTable.isDefined &&
476498
catalogTable.get.partitionColumnNames.nonEmpty &&
@@ -546,6 +568,34 @@ object PreprocessTableInsertion extends ResolveInsertionBase {
546568
case _ => i
547569
}
548570
}
571+
572+
/**
573+
* Recursively restores nullability flags from the original data type into the resolved
574+
* data type, keeping the resolved type structure intact.
575+
*/
576+
private def restoreDataTypeNullability(resolved: DataType, original: DataType): DataType = {
577+
(resolved, original) match {
578+
case (r: StructType, o: StructType) =>
579+
val origFields = o.fields.map(f => f.name -> f).toMap
580+
StructType(r.fields.map { rf =>
581+
origFields.get(rf.name) match {
582+
case Some(of) =>
583+
rf.copy(
584+
nullable = of.nullable,
585+
dataType = restoreDataTypeNullability(rf.dataType, of.dataType))
586+
case None => rf
587+
}
588+
})
589+
case (ArrayType(rElem, _), ArrayType(oElem, oNull)) =>
590+
ArrayType(restoreDataTypeNullability(rElem, oElem), oNull)
591+
case (MapType(rKey, rVal, _), MapType(oKey, oVal, oValNull)) =>
592+
MapType(
593+
restoreDataTypeNullability(rKey, oKey),
594+
restoreDataTypeNullability(rVal, oVal),
595+
oValNull)
596+
case _ => resolved
597+
}
598+
}
549599
}
550600

551601
/**

sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowCreateTableSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ trait ShowCreateTableSuiteBase extends command.ShowCreateTableSuiteBase
185185
val showDDL = getShowCreateDDL(t)
186186
assert(showDDL === Array(
187187
s"CREATE TABLE $fullName (",
188-
"a BIGINT,",
188+
"a BIGINT NOT NULL,",
189189
"b BIGINT DEFAULT 42,",
190190
"c STRING COLLATE UTF8_BINARY DEFAULT 'abc, \"def\"' COMMENT 'comment')",
191191
"USING parquet",

sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2851,6 +2851,188 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
28512851
}
28522852
}
28532853

2854+
test("SPARK-55716: V1 INSERT rejects null into NOT NULL column for file sources") {
2855+
Seq("parquet", "orc", "json").foreach { format =>
2856+
withSQLConf(SQLConf.LEGACY_NO_NULL_CHECK_FOR_FILE_SOURCE_INSERT.key -> "false") {
2857+
withTable("t") {
2858+
sql(s"CREATE TABLE t(i INT NOT NULL, s STRING NOT NULL) USING $format")
2859+
// V1 DataSource writes now enforce NOT NULL constraints via AssertNotNull
2860+
val e1 = intercept[SparkRuntimeException] {
2861+
sql("INSERT INTO t VALUES(null, 'a')")
2862+
}
2863+
assert(e1.getCondition === "NOT_NULL_ASSERT_VIOLATION")
2864+
val e2 = intercept[SparkRuntimeException] {
2865+
sql("INSERT INTO t VALUES(1, null)")
2866+
}
2867+
assert(e2.getCondition === "NOT_NULL_ASSERT_VIOLATION")
2868+
// Valid insert should succeed
2869+
sql("INSERT INTO t VALUES(1, 'a')")
2870+
checkAnswer(spark.table("t"), Seq(Row(1, "a")))
2871+
}
2872+
}
2873+
}
2874+
}
2875+
2876+
test("SPARK-55716: V1 INSERT NOT NULL enforcement respects storeAssignmentPolicy") {
2877+
Seq("parquet", "orc").foreach { format =>
2878+
// ANSI mode (default): rejects null
2879+
withSQLConf(
2880+
SQLConf.LEGACY_NO_NULL_CHECK_FOR_FILE_SOURCE_INSERT.key -> "false",
2881+
SQLConf.STORE_ASSIGNMENT_POLICY.key -> SQLConf.StoreAssignmentPolicy.ANSI.toString) {
2882+
withTable("t") {
2883+
sql(s"CREATE TABLE t(i INT NOT NULL) USING $format")
2884+
val e = intercept[SparkRuntimeException] {
2885+
sql("INSERT INTO t VALUES(null)")
2886+
}
2887+
assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION")
2888+
}
2889+
}
2890+
// STRICT mode: also rejects null (fails at analysis with type mismatch)
2891+
withSQLConf(
2892+
SQLConf.LEGACY_NO_NULL_CHECK_FOR_FILE_SOURCE_INSERT.key -> "false",
2893+
SQLConf.STORE_ASSIGNMENT_POLICY.key -> SQLConf.StoreAssignmentPolicy.STRICT.toString) {
2894+
withTable("t") {
2895+
sql(s"CREATE TABLE t(i INT NOT NULL) USING $format")
2896+
intercept[AnalysisException] {
2897+
sql("INSERT INTO t VALUES(null)")
2898+
}
2899+
}
2900+
}
2901+
// LEGACY mode: allows null (no AssertNotNull injected)
2902+
withSQLConf(
2903+
SQLConf.STORE_ASSIGNMENT_POLICY.key -> SQLConf.StoreAssignmentPolicy.LEGACY.toString) {
2904+
withTable("t") {
2905+
sql(s"CREATE TABLE t(i INT NOT NULL) USING $format")
2906+
sql("INSERT INTO t VALUES(null)")
2907+
checkAnswer(spark.table("t"), Seq(Row(null)))
2908+
}
2909+
}
2910+
// Legacy config: allows null even in ANSI mode
2911+
withSQLConf(
2912+
SQLConf.LEGACY_NO_NULL_CHECK_FOR_FILE_SOURCE_INSERT.key -> "true") {
2913+
withTable("t") {
2914+
sql(s"CREATE TABLE t(i INT NOT NULL) USING $format")
2915+
sql("INSERT INTO t VALUES(null)")
2916+
checkAnswer(spark.table("t"), Seq(Row(null)))
2917+
}
2918+
}
2919+
}
2920+
}
2921+
2922+
test("SPARK-55716: V1 INSERT rejects null with V2 file source path") {
2923+
Seq("parquet", "orc").foreach { format =>
2924+
withSQLConf(
2925+
SQLConf.LEGACY_NO_NULL_CHECK_FOR_FILE_SOURCE_INSERT.key -> "false",
2926+
SQLConf.USE_V1_SOURCE_LIST.key -> "") {
2927+
withTable("t") {
2928+
sql(s"CREATE TABLE t(i INT NOT NULL, s STRING NOT NULL) USING $format")
2929+
val e = intercept[SparkRuntimeException] {
2930+
sql("INSERT INTO t VALUES(null, 'a')")
2931+
}
2932+
assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION")
2933+
}
2934+
}
2935+
}
2936+
}
2937+
2938+
test("SPARK-55716: V1 INSERT rejects null array element for NOT NULL element type") {
2939+
Seq("parquet", "orc").foreach { format =>
2940+
withSQLConf(SQLConf.LEGACY_NO_NULL_CHECK_FOR_FILE_SOURCE_INSERT.key -> "false") {
2941+
withTable("t") {
2942+
val schema = new StructType()
2943+
.add("a", ArrayType(IntegerType, containsNull = false))
2944+
spark.sessionState.catalog.createTable(
2945+
CatalogTable(
2946+
identifier = TableIdentifier("t"),
2947+
tableType = CatalogTableType.MANAGED,
2948+
storage = CatalogStorageFormat.empty,
2949+
schema = schema,
2950+
provider = Some(format)),
2951+
ignoreIfExists = false)
2952+
val e = intercept[SparkRuntimeException] {
2953+
sql("INSERT INTO t SELECT array(1, null, 3)")
2954+
}
2955+
assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION")
2956+
// Valid insert should succeed
2957+
sql("INSERT INTO t SELECT array(1, 2, 3)")
2958+
checkAnswer(spark.table("t"), Seq(Row(Seq(1, 2, 3))))
2959+
}
2960+
}
2961+
}
2962+
}
2963+
2964+
test("SPARK-55716: V1 INSERT rejects null struct field for NOT NULL field") {
2965+
Seq("parquet", "orc").foreach { format =>
2966+
withSQLConf(SQLConf.LEGACY_NO_NULL_CHECK_FOR_FILE_SOURCE_INSERT.key -> "false") {
2967+
withTable("t") {
2968+
val schema = new StructType()
2969+
.add("s", new StructType()
2970+
.add("x", IntegerType, nullable = false)
2971+
.add("y", StringType, nullable = false))
2972+
spark.sessionState.catalog.createTable(
2973+
CatalogTable(
2974+
identifier = TableIdentifier("t"),
2975+
tableType = CatalogTableType.MANAGED,
2976+
storage = CatalogStorageFormat.empty,
2977+
schema = schema,
2978+
provider = Some(format)),
2979+
ignoreIfExists = false)
2980+
val e = intercept[SparkRuntimeException] {
2981+
sql("INSERT INTO t SELECT named_struct('x', null, 'y', 'hello')")
2982+
}
2983+
assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION")
2984+
// Valid insert should succeed
2985+
sql("INSERT INTO t SELECT named_struct('x', 1, 'y', 'hello')")
2986+
checkAnswer(spark.table("t"), Seq(Row(Row(1, "hello"))))
2987+
}
2988+
}
2989+
}
2990+
}
2991+
2992+
test("SPARK-55716: V1 INSERT rejects null map value for NOT NULL value type") {
2993+
Seq("parquet", "orc").foreach { format =>
2994+
withSQLConf(SQLConf.LEGACY_NO_NULL_CHECK_FOR_FILE_SOURCE_INSERT.key -> "false") {
2995+
withTable("t") {
2996+
val schema = new StructType()
2997+
.add("m", MapType(StringType, IntegerType, valueContainsNull = false))
2998+
spark.sessionState.catalog.createTable(
2999+
CatalogTable(
3000+
identifier = TableIdentifier("t"),
3001+
tableType = CatalogTableType.MANAGED,
3002+
storage = CatalogStorageFormat.empty,
3003+
schema = schema,
3004+
provider = Some(format)),
3005+
ignoreIfExists = false)
3006+
val e = intercept[SparkRuntimeException] {
3007+
sql("INSERT INTO t SELECT map('a', 1, 'b', null)")
3008+
}
3009+
assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION")
3010+
// Valid insert should succeed
3011+
sql("INSERT INTO t SELECT map('a', 1, 'b', 2)")
3012+
checkAnswer(spark.table("t"), Seq(Row(Map("a" -> 1, "b" -> 2))))
3013+
}
3014+
}
3015+
}
3016+
}
3017+
3018+
test("SPARK-55716: V1 DataFrame write ignores NOT NULL schema constraint") {
3019+
Seq("parquet", "orc").foreach { format =>
3020+
withTempPath { path =>
3021+
val data = Seq(Row(null, "hello", 1.0), Row(1, null, 2.0), Row(2, "world", null))
3022+
val df = spark.createDataFrame(
3023+
spark.sparkContext.parallelize(data),
3024+
new StructType()
3025+
.add("id", IntegerType, nullable = true)
3026+
.add("name", StringType, nullable = true)
3027+
.add("value", DoubleType, nullable = true))
3028+
// V1 DataSource writes do not enforce NOT NULL constraints
3029+
df.write.mode(SaveMode.Overwrite).format(format).save(path.getCanonicalPath)
3030+
val result = spark.read.format(format).load(path.getCanonicalPath)
3031+
checkAnswer(result, data)
3032+
}
3033+
}
3034+
}
3035+
28543036
test("UNSUPPORTED_OVERWRITE.PATH: Can't overwrite a path that is also being read from") {
28553037
val tableName = "t1"
28563038
withTable(tableName) {

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ private[hive] class SparkGetColumnsOperation(
224224
null, // SQL_DATETIME_SUB
225225
null, // CHAR_OCTET_LENGTH
226226
ordinal.asInstanceOf[AnyRef], // ORDINAL_POSITION, 1-based
227-
"YES", // IS_NULLABLE
227+
(if (column.nullable) "YES" else "NO"), // IS_NULLABLE
228228
null, // SCOPE_CATALOG
229229
null, // SCOPE_SCHEMA
230230
null, // SCOPE_TABLE

0 commit comments

Comments
 (0)