Commit fccedae

[SPARK-55716][SQL] Support NOT NULL constraint enforcement for V1 file source table inserts
### What changes were proposed in this pull request?

V1 file-based DataSource writes (Parquet, ORC, JSON, etc.) silently accept null values into NOT NULL columns. This PR adds opt-in NOT NULL constraint enforcement controlled by `spark.sql.fileSource.insert.enforceNotNull`.

**Changes:**

1. **`CreateDataSourceTableCommand`**: Preserves user-specified nullability by recursively merging nullability flags from the user schema into the resolved `dataSource.schema`. Previously it stored `dataSource.schema` directly, which is all-nullable due to `DataSource.resolveRelation()` calling `dataSchema.asNullable` (SPARK-13738).
2. **`PreprocessTableInsertion`**: Restores nullability flags from the catalog schema before null checks, ensuring `AssertNotNull` is injected when needed. Gated behind `spark.sql.fileSource.insert.enforceNotNull`.
3. **New config**: `spark.sql.fileSource.insert.enforceNotNull` (default `false`). When set to `true`, it enables NOT NULL constraint enforcement for V1 file-based tables, consistent with the behavior for other data sources and V2 catalog tables.
4. **`SparkGetColumnsOperation`**: Fixed `IS_NULLABLE` to respect `column.nullable` instead of always returning `"YES"`.

### Why are the changes needed?

`DataSource.resolveRelation()` calls `dataSchema.asNullable` (added in SPARK-13738 for read safety), which strips all NOT NULL constraints recursively. `CreateDataSourceTableCommand` then stores this all-nullable schema in the catalog, permanently losing NOT NULL information. As a result, `PreprocessTableInsertion` never injects `AssertNotNull` for V1 file source tables.

Note: `InsertableRelation` (e.g., `SimpleInsertSource`) does NOT have this problem because it preserves the original schema (SPARK-24583).

### Does this PR introduce _any_ user-facing change?

No change in default behavior. Users can opt in to NOT NULL enforcement for V1 file source tables by setting `spark.sql.fileSource.insert.enforceNotNull` to `true`.

### How was this patch tested?

- Added 7 new tests in `InsertSuite` covering top-level, nested struct, array, and map null constraint enforcement.
- Fixed 3 existing interval column test assertions in `SparkMetadataOperationSuite`.

### Was this patch authored or co-authored using generative AI tooling?

Yes, co-authored with GitHub Copilot.

Closes #54517 from yaooqinn/SPARK-55716.

Lead-authored-by: Kent Yao <kentyao@microsoft.com>
Co-authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <kentyao@microsoft.com>
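For context, a minimal spark-shell style sketch of the opt-in behavior described above (the table name `t` and the Parquet provider are just examples):

```scala
// Opt in for the current session; the default (false) keeps the old, silent behavior.
spark.sql("SET spark.sql.fileSource.insert.enforceNotNull=true")
spark.sql("CREATE TABLE t(i INT NOT NULL, s STRING) USING parquet")

// With the flag on, this insert is expected to fail at runtime with
// [NOT_NULL_ASSERT_VIOLATION] instead of silently writing a null into column i.
spark.sql("INSERT INTO t VALUES (null, 'a')")
```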
Parent: 1323ab5

File tree: 7 files changed (+297, -11 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 0 deletions

@@ -4777,6 +4777,16 @@ object SQLConf {
     .enumConf(StoreAssignmentPolicy)
     .createWithDefault(StoreAssignmentPolicy.ANSI)
 
+  val FILE_SOURCE_INSERT_ENFORCE_NOT_NULL =
+    buildConf("spark.sql.fileSource.insert.enforceNotNull")
+      .doc("When true, Spark enforces NOT NULL constraints when inserting data into " +
+        "file-based data source tables (e.g., Parquet, ORC, JSON), consistent with the " +
+        "behavior for other data sources and V2 catalog tables. " +
+        "When false (default), null values are silently accepted into NOT NULL columns.")
+      .version("4.2.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val ANSI_ENABLED = buildConf(SqlApiConfHelper.ANSI_ENABLED_KEY)
     .doc("When true, Spark SQL uses an ANSI compliant dialect instead of being Hive compliant. " +
       "For example, Spark will throw an exception at runtime instead of returning null results " +

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 44 additions & 2 deletions

@@ -29,7 +29,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.CommandExecutionMode
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
 import org.apache.spark.util.ArrayImplicits._
 
 /**
@@ -107,8 +107,17 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
         table.copy(schema = new StructType(), partitionColumnNames = Nil)
 
       case _ =>
+        // Merge nullability from the user-specified schema into the resolved schema.
+        // DataSource.resolveRelation() calls dataSchema.asNullable which strips NOT NULL
+        // constraints. We restore nullability from the original user schema while keeping
+        // the resolved data types (which may include CharVarchar normalization, metadata, etc.)
+        val resolvedSchema = if (table.schema.nonEmpty) {
+          restoreNullability(dataSource.schema, table.schema)
+        } else {
+          dataSource.schema
+        }
         table.copy(
-          schema = dataSource.schema,
+          schema = resolvedSchema,
           partitionColumnNames = partitionColumnNames,
           // If metastore partition management for file source tables is enabled, we start off with
           // partition provider hive, but no partitions in the metastore. The user has to call
@@ -122,6 +131,39 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
 
     Seq.empty[Row]
   }
+
+  /**
+   * Recursively restores nullability from the original user-specified schema into
+   * the resolved schema. The resolved schema's data types are preserved (they may
+   * contain CharVarchar normalization, metadata, etc.), but nullability flags
+   * (top-level and nested) are taken from the original schema.
+   */
+  private def restoreNullability(resolved: StructType, original: StructType): StructType = {
+    val originalFields = original.fields.map(f => f.name -> f).toMap
+    StructType(resolved.fields.map { resolvedField =>
+      originalFields.get(resolvedField.name) match {
+        case Some(origField) =>
+          resolvedField.copy(
+            nullable = origField.nullable,
+            dataType = restoreDataTypeNullability(resolvedField.dataType, origField.dataType))
+        case None => resolvedField
+      }
+    })
+  }
+
+  private def restoreDataTypeNullability(resolved: DataType, original: DataType): DataType = {
+    (resolved, original) match {
+      case (r: StructType, o: StructType) => restoreNullability(r, o)
+      case (ArrayType(rElem, _), ArrayType(oElem, oNull)) =>
+        ArrayType(restoreDataTypeNullability(rElem, oElem), oNull)
+      case (MapType(rKey, rVal, _), MapType(oKey, oVal, oValNull)) =>
+        MapType(
+          restoreDataTypeNullability(rKey, oKey),
+          restoreDataTypeNullability(rVal, oVal),
+          oValNull)
+      case _ => resolved
+    }
+  }
 }
 
 /**
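To make the merge concrete, here is a small standalone sketch (not part of the patch) that applies the same restore logic to a pair of made-up schemas using only the public `StructType`/`ArrayType`/`MapType` API; the object name and schema values are illustrative:

```scala
import org.apache.spark.sql.types._

object RestoreNullabilityDemo {
  // Keep the resolved types, but take nullability flags (top-level and nested)
  // from the user-declared schema, mirroring the patch's restoreNullability.
  def restore(resolved: StructType, original: StructType): StructType = {
    val origFields = original.fields.map(f => f.name -> f).toMap
    StructType(resolved.fields.map { rf =>
      origFields.get(rf.name) match {
        case Some(of) =>
          rf.copy(nullable = of.nullable, dataType = restoreType(rf.dataType, of.dataType))
        case None => rf
      }
    })
  }

  private def restoreType(resolved: DataType, original: DataType): DataType =
    (resolved, original) match {
      case (r: StructType, o: StructType) => restore(r, o)
      case (ArrayType(rElem, _), ArrayType(oElem, oNull)) =>
        ArrayType(restoreType(rElem, oElem), oNull)
      case (MapType(rk, rv, _), MapType(ok, ov, ovNull)) =>
        MapType(restoreType(rk, ok), restoreType(rv, ov), ovNull)
      case _ => resolved
    }

  def main(args: Array[String]): Unit = {
    // What the user declared: i is NOT NULL and the array elements are NOT NULL.
    val userSchema = new StructType()
      .add("i", IntegerType, nullable = false)
      .add("a", ArrayType(IntegerType, containsNull = false))
    // What DataSource.resolveRelation() hands back: everything forced nullable.
    val resolvedSchema = new StructType()
      .add("i", IntegerType, nullable = true)
      .add("a", ArrayType(IntegerType, containsNull = true))
    // Prints a tree where i is (nullable = false) and the array element has containsNull = false.
    println(restore(resolvedSchema, userSchema).treeString)
  }
}
```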

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 52 additions & 2 deletions

@@ -39,7 +39,7 @@ import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.InsertableRelation
-import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, MetadataBuilder, StructField, StructType}
 import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.util.ArrayImplicits._
@@ -470,7 +470,29 @@ object PreprocessTableInsertion extends ResolveInsertionBase {
       insert.partitionSpec, partColNames, tblName, conf.resolver)
 
     val staticPartCols = normalizedPartSpec.filter(_._2.isDefined).keySet
-    val expectedColumns = insert.table.output.filterNot(a => staticPartCols.contains(a.name))
+    val expectedColumns = {
+      val cols = insert.table.output.filterNot(a => staticPartCols.contains(a.name))
+      // When spark.sql.fileSource.insert.enforceNotNull is enabled, restore the original
+      // nullability from the catalog table schema. HadoopFsRelation forces
+      // dataSchema.asNullable for safe reads, which strips NOT NULL constraints (both
+      // top-level and nested) from the LogicalRelation output. We restore nullability so
+      // that AssertNotNull checks are properly injected.
+      if (conf.getConf(SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL)) {
+        catalogTable.map { ct =>
+          val catalogFields = ct.schema.fields.map(f => f.name -> f).toMap
+          cols.map { col =>
+            catalogFields.get(col.name) match {
+              case Some(field) =>
+                col.withNullability(field.nullable)
+                  .withDataType(restoreDataTypeNullability(col.dataType, field.dataType))
+              case None => col
+            }
+          }
+        }.getOrElse(cols)
+      } else {
+        cols
+      }
+    }
 
     val partitionsTrackedByCatalog = catalogTable.isDefined &&
       catalogTable.get.partitionColumnNames.nonEmpty &&
@@ -546,6 +568,34 @@ object PreprocessTableInsertion extends ResolveInsertionBase {
       case _ => i
     }
   }
+
+  /**
+   * Recursively restores nullability flags from the original data type into the resolved
+   * data type, keeping the resolved type structure intact.
+   */
+  private def restoreDataTypeNullability(resolved: DataType, original: DataType): DataType = {
+    (resolved, original) match {
+      case (r: StructType, o: StructType) =>
+        val origFields = o.fields.map(f => f.name -> f).toMap
+        StructType(r.fields.map { rf =>
+          origFields.get(rf.name) match {
+            case Some(of) =>
+              rf.copy(
+                nullable = of.nullable,
+                dataType = restoreDataTypeNullability(rf.dataType, of.dataType))
+            case None => rf
+          }
+        })
+      case (ArrayType(rElem, _), ArrayType(oElem, oNull)) =>
+        ArrayType(restoreDataTypeNullability(rElem, oElem), oNull)
+      case (MapType(rKey, rVal, _), MapType(oKey, oVal, oValNull)) =>
+        MapType(
+          restoreDataTypeNullability(rKey, oKey),
+          restoreDataTypeNullability(rVal, oVal),
+          oValNull)
+      case _ => resolved
+    }
+  }
 }
 
 /**
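One way to check that the restored nullability actually takes effect (assuming the flag is on and a Parquet table `t` with a NOT NULL column exists) is to look for the null check in the plan; a hedged sketch:

```scala
// Hypothetical session: with enforcement enabled, the plan of the INSERT is expected
// to contain an assertnotnull(...) expression around the column declared NOT NULL.
spark.sql("SET spark.sql.fileSource.insert.enforceNotNull=true")
spark.sql("CREATE TABLE t(i INT NOT NULL) USING parquet")
spark.sql("EXPLAIN EXTENDED INSERT INTO t VALUES (1)").show(truncate = false)
```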

sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowCreateTableSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -185,7 +185,7 @@ trait ShowCreateTableSuiteBase extends command.ShowCreateTableSuiteBase
     val showDDL = getShowCreateDDL(t)
     assert(showDDL === Array(
       s"CREATE TABLE $fullName (",
-      "a BIGINT,",
+      "a BIGINT NOT NULL,",
       "b BIGINT DEFAULT 42,",
       "c STRING COLLATE UTF8_BINARY DEFAULT 'abc, \"def\"' COMMENT 'comment')",
       "USING parquet",

sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala

Lines changed: 182 additions & 0 deletions

@@ -2853,6 +2853,188 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
   }
 
+  test("SPARK-55716: V1 INSERT rejects null into NOT NULL column for file sources") {
+    Seq("parquet", "orc", "json").foreach { format =>
+      withSQLConf(SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "true") {
+        withTable("t") {
+          sql(s"CREATE TABLE t(i INT NOT NULL, s STRING NOT NULL) USING $format")
+          // V1 DataSource writes now enforce NOT NULL constraints via AssertNotNull
+          val e1 = intercept[SparkRuntimeException] {
+            sql("INSERT INTO t VALUES(null, 'a')")
+          }
+          assert(e1.getCondition === "NOT_NULL_ASSERT_VIOLATION")
+          val e2 = intercept[SparkRuntimeException] {
+            sql("INSERT INTO t VALUES(1, null)")
+          }
+          assert(e2.getCondition === "NOT_NULL_ASSERT_VIOLATION")
+          // Valid insert should succeed
+          sql("INSERT INTO t VALUES(1, 'a')")
+          checkAnswer(spark.table("t"), Seq(Row(1, "a")))
+        }
+      }
+    }
+  }
+
+  test("SPARK-55716: V1 INSERT NOT NULL enforcement respects storeAssignmentPolicy") {
+    Seq("parquet", "orc").foreach { format =>
+      // ANSI mode (default): rejects null
+      withSQLConf(
+        SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "true",
+        SQLConf.STORE_ASSIGNMENT_POLICY.key -> SQLConf.StoreAssignmentPolicy.ANSI.toString) {
+        withTable("t") {
+          sql(s"CREATE TABLE t(i INT NOT NULL) USING $format")
+          val e = intercept[SparkRuntimeException] {
+            sql("INSERT INTO t VALUES(null)")
+          }
+          assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION")
+        }
+      }
+      // STRICT mode: also rejects null (fails at analysis with type mismatch)
+      withSQLConf(
+        SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "true",
+        SQLConf.STORE_ASSIGNMENT_POLICY.key -> SQLConf.StoreAssignmentPolicy.STRICT.toString) {
+        withTable("t") {
+          sql(s"CREATE TABLE t(i INT NOT NULL) USING $format")
+          intercept[AnalysisException] {
+            sql("INSERT INTO t VALUES(null)")
+          }
+        }
+      }
+      // LEGACY mode: allows null (no AssertNotNull injected)
+      withSQLConf(
+        SQLConf.STORE_ASSIGNMENT_POLICY.key -> SQLConf.StoreAssignmentPolicy.LEGACY.toString) {
+        withTable("t") {
+          sql(s"CREATE TABLE t(i INT NOT NULL) USING $format")
+          sql("INSERT INTO t VALUES(null)")
+          checkAnswer(spark.table("t"), Seq(Row(null)))
+        }
+      }
+      // Legacy config: allows null even in ANSI mode
+      withSQLConf(
+        SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "false") {
+        withTable("t") {
+          sql(s"CREATE TABLE t(i INT NOT NULL) USING $format")
+          sql("INSERT INTO t VALUES(null)")
+          checkAnswer(spark.table("t"), Seq(Row(null)))
+        }
+      }
+    }
+  }
+
+  test("SPARK-55716: V1 INSERT rejects null with V2 file source path") {
+    Seq("parquet", "orc").foreach { format =>
+      withSQLConf(
+        SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "true",
+        SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+        withTable("t") {
+          sql(s"CREATE TABLE t(i INT NOT NULL, s STRING NOT NULL) USING $format")
+          val e = intercept[SparkRuntimeException] {
+            sql("INSERT INTO t VALUES(null, 'a')")
+          }
+          assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION")
+        }
+      }
+    }
+  }
+
+  test("SPARK-55716: V1 INSERT rejects null array element for NOT NULL element type") {
+    Seq("parquet", "orc").foreach { format =>
+      withSQLConf(SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "true") {
+        withTable("t") {
+          val schema = new StructType()
+            .add("a", ArrayType(IntegerType, containsNull = false))
+          spark.sessionState.catalog.createTable(
+            CatalogTable(
+              identifier = TableIdentifier("t"),
+              tableType = CatalogTableType.MANAGED,
+              storage = CatalogStorageFormat.empty,
+              schema = schema,
+              provider = Some(format)),
+            ignoreIfExists = false)
+          val e = intercept[SparkRuntimeException] {
+            sql("INSERT INTO t SELECT array(1, null, 3)")
+          }
+          assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION")
+          // Valid insert should succeed
+          sql("INSERT INTO t SELECT array(1, 2, 3)")
+          checkAnswer(spark.table("t"), Seq(Row(Seq(1, 2, 3))))
+        }
+      }
+    }
+  }
+
+  test("SPARK-55716: V1 INSERT rejects null struct field for NOT NULL field") {
+    Seq("parquet", "orc").foreach { format =>
+      withSQLConf(SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "true") {
+        withTable("t") {
+          val schema = new StructType()
+            .add("s", new StructType()
+              .add("x", IntegerType, nullable = false)
+              .add("y", StringType, nullable = false))
+          spark.sessionState.catalog.createTable(
+            CatalogTable(
+              identifier = TableIdentifier("t"),
+              tableType = CatalogTableType.MANAGED,
+              storage = CatalogStorageFormat.empty,
+              schema = schema,
+              provider = Some(format)),
+            ignoreIfExists = false)
+          val e = intercept[SparkRuntimeException] {
+            sql("INSERT INTO t SELECT named_struct('x', null, 'y', 'hello')")
+          }
+          assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION")
+          // Valid insert should succeed
+          sql("INSERT INTO t SELECT named_struct('x', 1, 'y', 'hello')")
+          checkAnswer(spark.table("t"), Seq(Row(Row(1, "hello"))))
+        }
+      }
+    }
+  }
+
+  test("SPARK-55716: V1 INSERT rejects null map value for NOT NULL value type") {
+    Seq("parquet", "orc").foreach { format =>
+      withSQLConf(SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "true") {
+        withTable("t") {
+          val schema = new StructType()
+            .add("m", MapType(StringType, IntegerType, valueContainsNull = false))
+          spark.sessionState.catalog.createTable(
+            CatalogTable(
+              identifier = TableIdentifier("t"),
+              tableType = CatalogTableType.MANAGED,
+              storage = CatalogStorageFormat.empty,
+              schema = schema,
+              provider = Some(format)),
+            ignoreIfExists = false)
+          val e = intercept[SparkRuntimeException] {
+            sql("INSERT INTO t SELECT map('a', 1, 'b', null)")
+          }
+          assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION")
+          // Valid insert should succeed
+          sql("INSERT INTO t SELECT map('a', 1, 'b', 2)")
+          checkAnswer(spark.table("t"), Seq(Row(Map("a" -> 1, "b" -> 2))))
+        }
+      }
+    }
+  }
+
+  test("SPARK-55716: V1 DataFrame write ignores NOT NULL schema constraint") {
+    Seq("parquet", "orc").foreach { format =>
+      withTempPath { path =>
+        val data = Seq(Row(null, "hello", 1.0), Row(1, null, 2.0), Row(2, "world", null))
+        val df = spark.createDataFrame(
+          spark.sparkContext.parallelize(data),
+          new StructType()
+            .add("id", IntegerType, nullable = true)
+            .add("name", StringType, nullable = true)
+            .add("value", DoubleType, nullable = true))
+        // V1 DataSource writes do not enforce NOT NULL constraints
+        df.write.mode(SaveMode.Overwrite).format(format).save(path.getCanonicalPath)
+        val result = spark.read.format(format).load(path.getCanonicalPath)
+        checkAnswer(result, data)
+      }
+    }
+  }
+
   test("UNSUPPORTED_OVERWRITE.PATH: Can't overwrite a path that is also being read from") {
     val tableName = "t1"
     withTable(tableName) {

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala

Lines changed: 1 addition & 1 deletion

@@ -224,7 +224,7 @@ private[hive] class SparkGetColumnsOperation(
       null, // SQL_DATETIME_SUB
       null, // CHAR_OCTET_LENGTH
       ordinal.asInstanceOf[AnyRef], // ORDINAL_POSITION, 1-based
-      "YES", // IS_NULLABLE
+      (if (column.nullable) "YES" else "NO"), // IS_NULLABLE
       null, // SCOPE_CATALOG
       null, // SCOPE_SCHEMA
       null, // SCOPE_TABLE
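The `IS_NULLABLE` fix is observable through standard JDBC metadata. A minimal sketch against a running Thrift Server, assuming the Hive JDBC driver is on the classpath; the URL, credentials, and table name are placeholders:

```scala
import java.sql.DriverManager

// Hypothetical connection details; adjust to your Thrift Server deployment.
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
try {
  // DatabaseMetaData.getColumns returns one row per column; IS_NULLABLE should now
  // reflect the catalog's nullability ("NO" for NOT NULL columns) instead of always "YES".
  val rs = conn.getMetaData.getColumns(null, "default", "t", "%")
  while (rs.next()) {
    println(s"${rs.getString("COLUMN_NAME")} -> IS_NULLABLE=${rs.getString("IS_NULLABLE")}")
  }
} finally {
  conn.close()
}
```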
