Commit 8aeab4f

yaooqinn and Copilot committed
[SPARK-55716][SQL] Fix V1 file source NOT NULL constraint enforcement
V1 file-based DataSource writes (parquet/orc/json) silently accept null values into NOT NULL columns.

The root cause:
1. `DataSource.resolveRelation()` calls `dataSchema.asNullable` (SPARK-13738) for read safety, stripping NOT NULL recursively.
2. `CreateDataSourceTableCommand` stores this all-nullable schema in the catalog, permanently losing NOT NULL info.
3. `PreprocessTableInsertion` never injects `AssertNotNull` because the schema is all-nullable.

Fix:
- `CreateDataSourceTableCommand`: preserve user-specified nullability via recursive merging into the resolved schema.
- `PreprocessTableInsertion`: restore nullability flags from the catalog schema before null checks.
- Add legacy config `spark.sql.legacy.allowNullInsertForFileSourceTables` (default false) for backward compatibility.

Covers top-level and nested types (array elements, struct fields, map values).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 361ece3

5 files changed: 277 additions & 4 deletions
docs/sql-migration-guide.md

Lines changed: 1 addition & 0 deletions

```diff
@@ -25,6 +25,7 @@ license: |

 ## Upgrading from Spark SQL 4.1 to 4.2

 - Since Spark 4.2, Spark enables order-independent checksums for shuffle outputs by default to detect data inconsistencies during indeterminate shuffle stage retries. If a checksum mismatch is detected, Spark rolls back and re-executes all succeeding stages that depend on the shuffle output. If rolling back is not possible for some succeeding stages, the job will fail. To restore the previous behavior, set `spark.sql.shuffle.orderIndependentChecksum.enabled` and `spark.sql.shuffle.orderIndependentChecksum.enableFullRetryOnMismatch` to `false`.
+- Since Spark 4.2, Spark properly enforces NOT NULL constraints when inserting data into V1 file-based data source tables (e.g., Parquet, ORC, JSON). Previously, null values were silently accepted into NOT NULL columns. To restore the previous behavior, set `spark.sql.legacy.allowNullInsertForFileSourceTables` to `true`.
 - Since Spark 4.2, support for Derby JDBC datasource is deprecated.

 ## Upgrading from Spark SQL 4.0 to 4.1
```

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 0 deletions

```diff
@@ -4743,6 +4743,16 @@ object SQLConf {
     .enumConf(StoreAssignmentPolicy)
     .createWithDefault(StoreAssignmentPolicy.ANSI)

+  val LEGACY_NO_NULL_CHECK_FOR_FILE_SOURCE_INSERT =
+    buildConf("spark.sql.legacy.allowNullInsertForFileSourceTables")
+      .doc("When true (legacy behavior), Spark does not enforce NOT NULL constraints " +
+        "when inserting data into file-based data source tables (e.g., Parquet, ORC, JSON). " +
+        "When false, Spark properly enforces NOT NULL constraints for file-based tables, " +
+        "consistent with the behavior for other data sources and V2 catalog tables.")
+      .version("4.2.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val ANSI_ENABLED = buildConf(SqlApiConfHelper.ANSI_ENABLED_KEY)
     .doc("When true, Spark SQL uses an ANSI compliant dialect instead of being Hive compliant. " +
       "For example, Spark will throw an exception at runtime instead of returning null results " +
```

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 44 additions & 2 deletions

```diff
@@ -29,7 +29,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.CommandExecutionMode
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
 import org.apache.spark.util.ArrayImplicits._

 /**
@@ -107,8 +107,17 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
         table.copy(schema = new StructType(), partitionColumnNames = Nil)

       case _ =>
+        // Merge nullability from the user-specified schema into the resolved schema.
+        // DataSource.resolveRelation() calls dataSchema.asNullable which strips NOT NULL
+        // constraints. We restore nullability from the original user schema while keeping
+        // the resolved data types (which may include CharVarchar normalization, metadata, etc.)
+        val resolvedSchema = if (table.schema.nonEmpty) {
+          restoreNullability(dataSource.schema, table.schema)
+        } else {
+          dataSource.schema
+        }
         table.copy(
-          schema = dataSource.schema,
+          schema = resolvedSchema,
           partitionColumnNames = partitionColumnNames,
           // If metastore partition management for file source tables is enabled, we start off with
           // partition provider hive, but no partitions in the metastore. The user has to call
@@ -122,6 +131,39 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo

     Seq.empty[Row]
   }
+
+  /**
+   * Recursively restores nullability from the original user-specified schema into
+   * the resolved schema. The resolved schema's data types are preserved (they may
+   * contain CharVarchar normalization, metadata, etc.), but nullability flags
+   * (top-level and nested) are taken from the original schema.
+   */
+  private def restoreNullability(resolved: StructType, original: StructType): StructType = {
+    val originalFields = original.fields.map(f => f.name -> f).toMap
+    StructType(resolved.fields.map { resolvedField =>
+      originalFields.get(resolvedField.name) match {
+        case Some(origField) =>
+          resolvedField.copy(
+            nullable = origField.nullable,
+            dataType = restoreDataTypeNullability(resolvedField.dataType, origField.dataType))
+        case None => resolvedField
+      }
+    })
+  }
+
+  private def restoreDataTypeNullability(resolved: DataType, original: DataType): DataType = {
+    (resolved, original) match {
+      case (r: StructType, o: StructType) => restoreNullability(r, o)
+      case (ArrayType(rElem, _), ArrayType(oElem, oNull)) =>
+        ArrayType(restoreDataTypeNullability(rElem, oElem), oNull)
+      case (MapType(rKey, rVal, _), MapType(oKey, oVal, oValNull)) =>
+        MapType(
+          restoreDataTypeNullability(rKey, oKey),
+          restoreDataTypeNullability(rVal, oVal),
+          oValNull)
+      case _ => resolved
+    }
+  }
 }

 /**
```
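The merge logic above can be exercised in isolation. Below is a self-contained sketch of the same recursion; the simplified stand-in types (`DType`, `StructT`, etc.) are assumptions for illustration, whereas the real code operates on Spark's `org.apache.spark.sql.types` hierarchy:

```scala
// Simplified stand-ins for Spark's DataType hierarchy (illustration only).
sealed trait DType
case object IntT extends DType
case class ArrT(elem: DType, containsNull: Boolean) extends DType
case class MapT(key: DType, value: DType, valueContainsNull: Boolean) extends DType
case class Field(name: String, dataType: DType, nullable: Boolean)
case class StructT(fields: Seq[Field]) extends DType

// Mirrors restoreNullability/restoreDataTypeNullability: keep the resolved
// structure, but take nullability flags from the original (user-specified) schema.
def restore(resolved: DType, original: DType): DType = (resolved, original) match {
  case (StructT(rf), StructT(of)) =>
    val orig = of.map(f => f.name -> f).toMap
    StructT(rf.map { f =>
      orig.get(f.name) match {
        case Some(o) => f.copy(nullable = o.nullable, dataType = restore(f.dataType, o.dataType))
        case None    => f
      }
    })
  case (ArrT(re, _), ArrT(oe, oNull))          => ArrT(restore(re, oe), oNull)
  case (MapT(rk, rv, _), MapT(ok, ov, ovNull)) => MapT(restore(rk, ok), restore(rv, ov), ovNull)
  case _                                       => resolved
}

// After asNullable, everything is nullable; the original carried NOT NULL flags.
val original = StructT(Seq(
  Field("i", IntT, nullable = false),
  Field("a", ArrT(IntT, containsNull = false), nullable = true)))
val resolved = StructT(Seq(
  Field("i", IntT, nullable = true),
  Field("a", ArrT(IntT, containsNull = true), nullable = true)))
val merged = restore(resolved, original).asInstanceOf[StructT]
```

Fields present only in the resolved schema (e.g. partition columns discovered at resolution time) pass through unchanged, matching the `case None` branch of the real code.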

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 52 additions & 2 deletions

```diff
@@ -39,7 +39,7 @@ import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.InsertableRelation
-import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, MetadataBuilder, StructField, StructType}
 import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.util.ArrayImplicits._
@@ -470,7 +470,29 @@ object PreprocessTableInsertion extends ResolveInsertionBase {
       insert.partitionSpec, partColNames, tblName, conf.resolver)

     val staticPartCols = normalizedPartSpec.filter(_._2.isDefined).keySet
-    val expectedColumns = insert.table.output.filterNot(a => staticPartCols.contains(a.name))
+    val expectedColumns = {
+      val cols = insert.table.output.filterNot(a => staticPartCols.contains(a.name))
+      // When the legacy config is disabled, restore the original nullability from the
+      // catalog table schema. HadoopFsRelation forces dataSchema.asNullable for safe reads,
+      // which strips NOT NULL constraints (both top-level and nested) from the
+      // LogicalRelation output. We restore nullability so that AssertNotNull checks are
+      // properly injected.
+      if (!conf.getConf(SQLConf.LEGACY_NO_NULL_CHECK_FOR_FILE_SOURCE_INSERT)) {
+        catalogTable.map { ct =>
+          val catalogFields = ct.schema.fields.map(f => f.name -> f).toMap
+          cols.map { col =>
+            catalogFields.get(col.name) match {
+              case Some(field) =>
+                col.withNullability(field.nullable)
+                  .withDataType(restoreDataTypeNullability(col.dataType, field.dataType))
+              case None => col
+            }
+          }
+        }.getOrElse(cols)
+      } else {
+        cols
+      }
+    }

     val partitionsTrackedByCatalog = catalogTable.isDefined &&
       catalogTable.get.partitionColumnNames.nonEmpty &&
@@ -546,6 +568,34 @@ object PreprocessTableInsertion extends ResolveInsertionBase {
       case _ => i
    }
  }
+
+  /**
+   * Recursively restores nullability flags from the original data type into the resolved
+   * data type, keeping the resolved type structure intact.
+   */
+  private def restoreDataTypeNullability(resolved: DataType, original: DataType): DataType = {
+    (resolved, original) match {
+      case (r: StructType, o: StructType) =>
+        val origFields = o.fields.map(f => f.name -> f).toMap
+        StructType(r.fields.map { rf =>
+          origFields.get(rf.name) match {
+            case Some(of) =>
+              rf.copy(
+                nullable = of.nullable,
+                dataType = restoreDataTypeNullability(rf.dataType, of.dataType))
+            case None => rf
+          }
+        })
+      case (ArrayType(rElem, _), ArrayType(oElem, oNull)) =>
+        ArrayType(restoreDataTypeNullability(rElem, oElem), oNull)
+      case (MapType(rKey, rVal, _), MapType(oKey, oVal, oValNull)) =>
+        MapType(
+          restoreDataTypeNullability(rKey, oKey),
+          restoreDataTypeNullability(rVal, oVal),
+          oValNull)
+      case _ => resolved
+    }
+  }
 }

 /**
```
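A small model of the decision this rule enables: once catalog nullability is restored onto the expected output columns, a null check is needed exactly where the target column is NOT NULL but the incoming column may still produce nulls. The names and types below are hypothetical simplifications, not Spark's actual `Attribute` API:

```scala
// Hypothetical flattened view of a column: just a name and a nullability flag.
case class Col(name: String, nullable: Boolean)

// Columns that would receive an AssertNotNull-style check: the target forbids
// nulls while the incoming query column may still produce them.
def columnsNeedingNullCheck(expected: Seq[Col], incoming: Seq[Col]): Seq[String] =
  expected.zip(incoming).collect {
    case (target, source) if !target.nullable && source.nullable => target.name
  }

// Before the fix, `expected` came from the relation output after asNullable,
// so every target looked nullable, this list was empty, and nulls slipped through.
val checks = columnsNeedingNullCheck(
  expected = Seq(Col("i", nullable = false), Col("s", nullable = true)),
  incoming = Seq(Col("i", nullable = true), Col("s", nullable = true)))
```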

sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala

Lines changed: 170 additions & 0 deletions

```diff
@@ -2851,6 +2851,176 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
   }

+  test("SPARK-55716: V1 INSERT rejects null into NOT NULL column for file sources") {
+    Seq("parquet", "orc", "json").foreach { format =>
+      withTable("t") {
+        sql(s"CREATE TABLE t(i INT NOT NULL, s STRING NOT NULL) USING $format")
+        // V1 DataSource writes now enforce NOT NULL constraints via AssertNotNull
+        val e1 = intercept[SparkRuntimeException] {
+          sql("INSERT INTO t VALUES(null, 'a')")
+        }
+        assert(e1.getCondition === "NOT_NULL_ASSERT_VIOLATION")
+        val e2 = intercept[SparkRuntimeException] {
+          sql("INSERT INTO t VALUES(1, null)")
+        }
+        assert(e2.getCondition === "NOT_NULL_ASSERT_VIOLATION")
+        // Valid insert should succeed
+        sql("INSERT INTO t VALUES(1, 'a')")
+        checkAnswer(spark.table("t"), Seq(Row(1, "a")))
+      }
+    }
+  }
+
+  test("SPARK-55716: V1 INSERT NOT NULL enforcement respects storeAssignmentPolicy") {
+    Seq("parquet", "orc").foreach { format =>
+      // ANSI mode (default): rejects null
+      withSQLConf(
+          SQLConf.STORE_ASSIGNMENT_POLICY.key -> SQLConf.StoreAssignmentPolicy.ANSI.toString) {
+        withTable("t") {
+          sql(s"CREATE TABLE t(i INT NOT NULL) USING $format")
+          val e = intercept[SparkRuntimeException] {
+            sql("INSERT INTO t VALUES(null)")
+          }
+          assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION")
+        }
+      }
+      // STRICT mode: also rejects null (fails at analysis with type mismatch)
+      withSQLConf(
+          SQLConf.STORE_ASSIGNMENT_POLICY.key -> SQLConf.StoreAssignmentPolicy.STRICT.toString) {
+        withTable("t") {
+          sql(s"CREATE TABLE t(i INT NOT NULL) USING $format")
+          intercept[AnalysisException] {
+            sql("INSERT INTO t VALUES(null)")
+          }
+        }
+      }
+      // LEGACY mode: allows null (no AssertNotNull injected)
+      withSQLConf(
+          SQLConf.STORE_ASSIGNMENT_POLICY.key -> SQLConf.StoreAssignmentPolicy.LEGACY.toString) {
+        withTable("t") {
+          sql(s"CREATE TABLE t(i INT NOT NULL) USING $format")
+          sql("INSERT INTO t VALUES(null)")
+          checkAnswer(spark.table("t"), Seq(Row(null)))
+        }
+      }
+      // Legacy config: allows null even in ANSI mode
+      withSQLConf(
+          SQLConf.LEGACY_NO_NULL_CHECK_FOR_FILE_SOURCE_INSERT.key -> "true") {
+        withTable("t") {
+          sql(s"CREATE TABLE t(i INT NOT NULL) USING $format")
+          sql("INSERT INTO t VALUES(null)")
+          checkAnswer(spark.table("t"), Seq(Row(null)))
+        }
+      }
+    }
+  }
+
+  test("SPARK-55716: V1 INSERT rejects null with V2 file source path") {
+    Seq("parquet", "orc").foreach { format =>
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+        withTable("t") {
+          sql(s"CREATE TABLE t(i INT NOT NULL, s STRING NOT NULL) USING $format")
+          val e = intercept[SparkRuntimeException] {
+            sql("INSERT INTO t VALUES(null, 'a')")
+          }
+          assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION")
+        }
+      }
+    }
+  }
+
+  test("SPARK-55716: V1 INSERT rejects null array element for NOT NULL element type") {
+    Seq("parquet", "orc").foreach { format =>
+      withTable("t") {
+        val schema = new StructType()
+          .add("a", ArrayType(IntegerType, containsNull = false))
+        spark.sessionState.catalog.createTable(
+          CatalogTable(
+            identifier = TableIdentifier("t"),
+            tableType = CatalogTableType.MANAGED,
+            storage = CatalogStorageFormat.empty,
+            schema = schema,
+            provider = Some(format)),
+          ignoreIfExists = false)
+        val e = intercept[SparkRuntimeException] {
+          sql("INSERT INTO t SELECT array(1, null, 3)")
+        }
+        assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION")
+        // Valid insert should succeed
+        sql("INSERT INTO t SELECT array(1, 2, 3)")
+        checkAnswer(spark.table("t"), Seq(Row(Seq(1, 2, 3))))
+      }
+    }
+  }
+
+  test("SPARK-55716: V1 INSERT rejects null struct field for NOT NULL field") {
+    Seq("parquet", "orc").foreach { format =>
+      withTable("t") {
+        val schema = new StructType()
+          .add("s", new StructType()
+            .add("x", IntegerType, nullable = false)
+            .add("y", StringType, nullable = false))
+        spark.sessionState.catalog.createTable(
+          CatalogTable(
+            identifier = TableIdentifier("t"),
+            tableType = CatalogTableType.MANAGED,
+            storage = CatalogStorageFormat.empty,
+            schema = schema,
+            provider = Some(format)),
+          ignoreIfExists = false)
+        val e = intercept[SparkRuntimeException] {
+          sql("INSERT INTO t SELECT named_struct('x', null, 'y', 'hello')")
+        }
+        assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION")
+        // Valid insert should succeed
+        sql("INSERT INTO t SELECT named_struct('x', 1, 'y', 'hello')")
+        checkAnswer(spark.table("t"), Seq(Row(Row(1, "hello"))))
+      }
+    }
+  }
+
+  test("SPARK-55716: V1 INSERT rejects null map value for NOT NULL value type") {
+    Seq("parquet", "orc").foreach { format =>
+      withTable("t") {
+        val schema = new StructType()
+          .add("m", MapType(StringType, IntegerType, valueContainsNull = false))
+        spark.sessionState.catalog.createTable(
+          CatalogTable(
+            identifier = TableIdentifier("t"),
+            tableType = CatalogTableType.MANAGED,
+            storage = CatalogStorageFormat.empty,
+            schema = schema,
+            provider = Some(format)),
+          ignoreIfExists = false)
+        val e = intercept[SparkRuntimeException] {
+          sql("INSERT INTO t SELECT map('a', 1, 'b', null)")
+        }
+        assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION")
+        // Valid insert should succeed
+        sql("INSERT INTO t SELECT map('a', 1, 'b', 2)")
+        checkAnswer(spark.table("t"), Seq(Row(Map("a" -> 1, "b" -> 2))))
+      }
+    }
+  }
+
+  test("SPARK-55716: V1 DataFrame write ignores NOT NULL schema constraint") {
+    Seq("parquet", "orc").foreach { format =>
+      withTempPath { path =>
+        val data = Seq(Row(null, "hello", 1.0), Row(1, null, 2.0), Row(2, "world", null))
+        val df = spark.createDataFrame(
+          spark.sparkContext.parallelize(data),
+          new StructType()
+            .add("id", IntegerType, nullable = true)
+            .add("name", StringType, nullable = true)
+            .add("value", DoubleType, nullable = true))
+        // V1 DataSource writes do not enforce NOT NULL constraints
+        df.write.mode(SaveMode.Overwrite).format(format).save(path.getCanonicalPath)
+        val result = spark.read.format(format).load(path.getCanonicalPath)
+        checkAnswer(result, data)
+      }
+    }
+  }
+
   test("UNSUPPORTED_OVERWRITE.PATH: Can't overwrite a path that is also being read from") {
     val tableName = "t1"
     withTable(tableName) {
```
