Skip to content

Commit 7d5884f

Browse files
authored
fix: [Spark 4.1.1] preserve parent struct nullness when all requested fields missing in Parquet (#4190)
1 parent 06d565a commit 7d5884f

12 files changed

Lines changed: 178 additions & 56 deletions

File tree

common/src/main/java/org/apache/comet/parquet/Native.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -226,6 +226,7 @@ public static native long initRecordBatchReader(
226226
String sessionTimezone,
227227
int batchSize,
228228
boolean caseSensitive,
229+
boolean returnNullStructIfAllFieldsMissing,
229230
Map<String, String> objectStoreOptions,
230231
CometFileKeyUnwrapper keyUnwrapper,
231232
Object metricsNode);

common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -159,6 +159,11 @@ public URI pathUri() throws URISyntaxException {
159159
protected boolean isCaseSensitive;
160160
protected boolean useFieldId;
161161
protected boolean ignoreMissingIds;
162+
// SPARK-53535 (Spark 4.1+): when reading a struct whose requested fields are all
163+
// missing in the Parquet file, true returns the entire struct as null (legacy
164+
// pre-4.1 behavior); false preserves the parent struct's nullness from the file
165+
// so non-null parents materialize as a struct of all-null fields.
166+
protected boolean returnNullStructIfAllFieldsMissing = true;
162167
protected StructType partitionSchema;
163168
protected InternalRow partitionValues;
164169
protected PartitionedFile file;
@@ -278,6 +283,7 @@ private NativeBatchReader(AbstractColumnReader[] columnReaders) {
278283
boolean useFieldId,
279284
boolean ignoreMissingIds,
280285
boolean useLegacyDateTimestamp,
286+
boolean returnNullStructIfAllFieldsMissing,
281287
StructType partitionSchema,
282288
InternalRow partitionValues,
283289
Map<String, SQLMetric> metrics,
@@ -290,6 +296,7 @@ private NativeBatchReader(AbstractColumnReader[] columnReaders) {
290296
this.useFieldId = useFieldId;
291297
this.ignoreMissingIds = ignoreMissingIds;
292298
this.useLegacyDateTimestamp = useLegacyDateTimestamp;
299+
this.returnNullStructIfAllFieldsMissing = returnNullStructIfAllFieldsMissing;
293300
this.partitionSchema = partitionSchema;
294301
this.partitionValues = partitionValues;
295302
this.file = inputSplit;
@@ -578,6 +585,7 @@ public void init() throws Throwable {
578585
timeZoneId,
579586
batchSize,
580587
caseSensitive,
588+
returnNullStructIfAllFieldsMissing,
581589
objectStoreOptions,
582590
keyUnwrapper,
583591
metricsNode);

dev/diffs/4.1.1.diff

Lines changed: 5 additions & 45 deletions
Original file line number | Diff line number | Diff line change
@@ -2944,7 +2944,7 @@ index 6b73cc8618d..624694916fb 100644
29442944
case _ => assert(false, "Can not match ParquetTable in the query.")
29452945
}
29462946
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
2947-
index 3072657a095..b2293ccab17 100644
2947+
index 3072657a095..6b5b9103363 100644
29482948
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
29492949
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
29502950
@@ -40,6 +40,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser}
@@ -2955,57 +2955,17 @@ index 3072657a095..b2293ccab17 100644
29552955
import org.apache.spark.sql.catalyst.InternalRow
29562956
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow, UnsafeRow}
29572957
import org.apache.spark.sql.catalyst.util.{DateTimeConstants, DateTimeUtils}
2958-
@@ -765,7 +766,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
2959-
}
2960-
}
2961-
2962-
- test("vectorized reader: missing all struct fields") {
2963-
+ test("vectorized reader: missing all struct fields",
2964-
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4136")) {
2965-
for {
2966-
offheapEnabled <- Seq(true, false)
2967-
returnNullStructIfAllFieldsMissing <- Seq(true, false)
2968-
@@ -803,7 +805,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
2969-
}
2970-
}
2971-
2972-
- test("SPARK-53535: vectorized reader: missing all struct fields, struct with complex fields") {
2973-
+ test("SPARK-53535: vectorized reader: missing all struct fields, struct with complex fields",
2974-
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4136")) {
2975-
val data = Seq(
2976-
Row(Row(Seq(11, 12, null, 14), Row("21", 22), Row(true)), 100),
2977-
Row(Row(Seq(11, 12, null, 14), Row("21", 22), Row(false)), 100),
2978-
@@ -858,7 +861,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
2979-
}
2980-
}
2981-
2982-
- test("SPARK-53535: vectorized reader: missing all struct fields, struct with map field only") {
2983-
+ test("SPARK-53535: vectorized reader: missing all struct fields, struct with map field only",
2984-
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4136")) {
2985-
val data = Seq(
2986-
Row(Row(Map("key1" -> 1)), 100),
2987-
Row(Row(Map("key2" -> 2)), 100),
2988-
@@ -903,7 +907,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
2989-
}
2990-
2991-
test("SPARK-53535: vectorized reader: missing all struct fields, " +
2992-
- "struct with cheap map and more expensive array field") {
2993-
+ "struct with cheap map and more expensive array field",
2994-
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4136")) {
2995-
val data = Seq(
2996-
Row(Row(Map(false -> Row("expensive", 1)), Seq("test1")), 100),
2997-
Row(Row(Map(true -> Row("expensive", 2)), Seq("test2")), 100),
2998-
@@ -953,7 +958,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
2958+
@@ -953,7 +954,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
29992959
}
30002960
}
30012961

30022962
- test("SPARK-54220: vectorized reader: missing all struct fields, struct with NullType only") {
30032963
+ test("SPARK-54220: vectorized reader: missing all struct fields, struct with NullType only",
3004-
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4136")) {
2964+
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4199")) {
30052965
val data = Seq(
30062966
Tuple1((null, null)),
30072967
Tuple1((null, null)),
3008-
@@ -1282,7 +1288,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
2968+
@@ -1282,7 +1284,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
30092969
}
30102970
}
30112971

@@ -3015,7 +2975,7 @@ index 3072657a095..b2293ccab17 100644
30152975
val data = (1 to 4).map(i => Tuple1(i.toString))
30162976
val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType)))
30172977

3018-
@@ -1567,7 +1574,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
2978+
@@ -1567,7 +1570,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
30192979
}
30202980
}
30212981

native/core/src/execution/planner.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1323,6 +1323,7 @@ impl PhysicalPlanner {
13231323
default_values,
13241324
common.session_timezone.as_str(),
13251325
common.case_sensitive,
1326+
common.return_null_struct_if_all_fields_missing,
13261327
self.session_ctx(),
13271328
common.encryption_enabled,
13281329
)?;

native/core/src/parquet/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -438,6 +438,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
438438
session_timezone: JString,
439439
batch_size: jint,
440440
case_sensitive: jboolean,
441+
return_null_struct_if_all_fields_missing: jboolean,
441442
object_store_options: JObject,
442443
key_unwrapper_obj: JObject,
443444
metrics_node: JObject,
@@ -511,6 +512,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
511512
None,
512513
session_timezone.as_str(),
513514
case_sensitive != JNI_FALSE,
515+
return_null_struct_if_all_fields_missing != JNI_FALSE,
514516
session_ctx,
515517
encryption_enabled,
516518
)?;

native/core/src/parquet/parquet_exec.rs

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -71,12 +71,14 @@ pub(crate) fn init_datasource_exec(
7171
default_values: Option<HashMap<Column, ScalarValue>>,
7272
session_timezone: &str,
7373
case_sensitive: bool,
74+
return_null_struct_if_all_fields_missing: bool,
7475
session_ctx: &Arc<SessionContext>,
7576
encryption_enabled: bool,
7677
) -> Result<Arc<DataSourceExec>, ExecutionError> {
7778
let (table_parquet_options, spark_parquet_options) = get_options(
7879
session_timezone,
7980
case_sensitive,
81+
return_null_struct_if_all_fields_missing,
8082
&object_store_url,
8183
encryption_enabled,
8284
);
@@ -185,6 +187,7 @@ pub(crate) fn init_datasource_exec(
185187
fn get_options(
186188
session_timezone: &str,
187189
case_sensitive: bool,
190+
return_null_struct_if_all_fields_missing: bool,
188191
object_store_url: &ObjectStoreUrl,
189192
encryption_enabled: bool,
190193
) -> (TableParquetOptions, SparkParquetOptions) {
@@ -196,6 +199,8 @@ fn get_options(
196199
SparkParquetOptions::new(EvalMode::Legacy, session_timezone, false);
197200
spark_parquet_options.allow_cast_unsigned_ints = true;
198201
spark_parquet_options.case_sensitive = case_sensitive;
202+
spark_parquet_options.return_null_struct_if_all_fields_missing =
203+
return_null_struct_if_all_fields_missing;
199204

200205
if encryption_enabled {
201206
table_parquet_options.crypto.configure_factory(

native/core/src/parquet/parquet_support.rs

Lines changed: 19 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -79,6 +79,11 @@ pub struct SparkParquetOptions {
7979
pub use_legacy_date_timestamp_or_ntz: bool,
8080
// Whether schema field names are case sensitive
8181
pub case_sensitive: bool,
82+
/// SPARK-53535 (Spark 4.1+): when reading a struct whose requested fields are all
83+
/// missing in the Parquet file, true returns the entire struct as null (pre-4.1
84+
/// legacy behavior); false preserves the parent struct's nullness from the file
85+
/// so non-null parents return a struct of all-null fields.
86+
pub return_null_struct_if_all_fields_missing: bool,
8287
}
8388

8489
impl SparkParquetOptions {
@@ -91,6 +96,7 @@ impl SparkParquetOptions {
9196
use_decimal_128: false,
9297
use_legacy_date_timestamp_or_ntz: false,
9398
case_sensitive: false,
99+
return_null_struct_if_all_fields_missing: true,
94100
}
95101
}
96102

@@ -103,6 +109,7 @@ impl SparkParquetOptions {
103109
use_decimal_128: false,
104110
use_legacy_date_timestamp_or_ntz: false,
105111
case_sensitive: false,
112+
return_null_struct_if_all_fields_missing: true,
106113
}
107114
}
108115
}
@@ -279,13 +286,18 @@ fn parquet_convert_struct_to_struct(
279286
}
280287
}
281288

282-
// If target schema doesn't contain any of the existing fields
283-
// mark such a column in array as NULL
284-
let nulls = if field_overlap {
285-
array.nulls().cloned()
286-
} else {
287-
Some(NullBuffer::new_null(array.len()))
288-
};
289+
// When the file's struct contains none of the requested fields, the
290+
// returned validity buffer depends on Spark's
291+
// `spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing` (SPARK-53535,
292+
// Spark 4.1+). Legacy mode marks the whole column null; the new default
293+
// preserves the file's parent-row nullness so non-null parents materialize
294+
// as a struct of all-null fields.
295+
let nulls =
296+
if !field_overlap && parquet_options.return_null_struct_if_all_fields_missing {
297+
Some(NullBuffer::new_null(array.len()))
298+
} else {
299+
array.nulls().cloned()
300+
};
289301

290302
Ok(Arc::new(StructArray::new(
291303
to_fields.clone(),

native/proto/src/proto/operator.proto

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -107,6 +107,11 @@ message NativeScanCommon {
107107
bool encryption_enabled = 11;
108108
string source = 12;
109109
repeated spark.spark_expression.DataType fields = 13;
110+
// SPARK-53535 (Spark 4.1+): when reading a struct whose requested fields are all
111+
// missing in the Parquet file, true returns the entire struct as null (legacy
112+
// pre-4.1 behavior); false preserves the parent struct's nullness from the file
113+
// so non-null parents return a struct of all-null fields.
114+
bool return_null_struct_if_all_fields_missing = 14;
110115
}
111116

112117
message NativeScan {

spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -43,6 +43,7 @@ import org.apache.spark.sql.types.{DateType, StructType, TimestampType}
4343
import org.apache.spark.util.SerializableConfiguration
4444

4545
import org.apache.comet.CometConf
46+
import org.apache.comet.CometSparkSessionExtensions.isSpark41Plus
4647
import org.apache.comet.MetricsSupport
4748
import org.apache.comet.shims.ShimSQLConf
4849
import org.apache.comet.vector.CometVector
@@ -96,6 +97,15 @@ class CometParquetFileFormat(session: SparkSession)
9697
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
9798
val useFieldId = CometParquetUtils.readFieldId(sqlConf)
9899
val ignoreMissingIds = CometParquetUtils.ignoreMissingIds(sqlConf)
100+
// SPARK-53535 (Spark 4.1+): when reading a struct whose requested fields are all
101+
// missing in the Parquet file, the new default preserves the parent struct's
102+
// nullness from the file. Pre-4.1 Spark hardcodes the legacy behavior, so we
103+
// default to "true" there for backwards compatibility.
104+
val returnNullStructIfAllFieldsMissing = sqlConf
105+
.getConfString(
106+
"spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing",
107+
if (isSpark41Plus) "false" else "true")
108+
.toBoolean
99109
val pushDownDate = sqlConf.parquetFilterPushDownDate
100110
val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
101111
val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
@@ -158,6 +168,7 @@ class CometParquetFileFormat(session: SparkSession)
158168
useFieldId,
159169
ignoreMissingIds,
160170
datetimeRebaseSpec.mode == CORRECTED,
171+
returnNullStructIfAllFieldsMissing,
161172
partitionSchema,
162173
file.partitionValues,
163174
metrics.asJava,

spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala

Lines changed: 12 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -31,7 +31,7 @@ import org.apache.spark.sql.internal.SQLConf
3131

3232
import org.apache.comet.{CometConf, ConfigEntry}
3333
import org.apache.comet.CometConf.COMET_EXEC_ENABLED
34-
import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, isSpark35Plus, withInfo}
34+
import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, isSpark35Plus, isSpark41Plus, withInfo}
3535
import org.apache.comet.objectstore.NativeConfig
3636
import org.apache.comet.parquet.CometParquetUtils
3737
import org.apache.comet.serde.{CometOperatorSerde, Compatible, OperatorOuterClass, SupportLevel}
@@ -189,6 +189,17 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
189189
commonBuilder.setSessionTimezone(scan.conf.getConfString("spark.sql.session.timeZone"))
190190
commonBuilder.setCaseSensitive(scan.conf.getConf[Boolean](SQLConf.CASE_SENSITIVE))
191191

192+
// SPARK-53535 (Spark 4.1+): when reading a struct whose requested fields are all
193+
// missing in the Parquet file, the new default preserves the parent struct's
194+
// nullness from the file (so non-null parents materialize as a struct of all-null
195+
// fields). Pre-4.1 Spark hardcodes the legacy behavior (whole struct null), which
196+
// matches the Comet default we use as fallback.
197+
val returnNullStructConfKey =
198+
"spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing"
199+
val returnNullStructDefault = if (isSpark41Plus) "false" else "true"
200+
commonBuilder.setReturnNullStructIfAllFieldsMissing(
201+
scan.conf.getConfString(returnNullStructConfKey, returnNullStructDefault).toBoolean)
202+
192203
// Collect S3/cloud storage configurations
193204
val hadoopConf = scan.relation.sparkSession.sessionState
194205
.newHadoopConfWithOptions(scan.relation.options)

0 commit comments

Comments
 (0)