Skip to content

Commit f2f6ac2

Browse files
authored
fix: correct GetStructField null handling for null parent structs (value + nullability) (#4523)
1 parent d55fc9c commit f2f6ac2

2 files changed

Lines changed: 144 additions & 6 deletions

File tree

native/spark-expr/src/struct_funcs/get_struct_field.rs

Lines changed: 113 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use arrow::array::{Array, StructArray};
18+
use arrow::array::{make_array, Array, ArrayRef, StructArray};
19+
use arrow::buffer::NullBuffer;
1920
use arrow::datatypes::{DataType, Field, Schema};
2021
use arrow::record_batch::RecordBatch;
2122
use datafusion::common::{DataFusionError, Result as DataFusionResult, ScalarValue};
@@ -59,6 +60,27 @@ impl GetStructField {
5960
))),
6061
}
6162
}
63+
64+
/// Extract field `ordinal` from a struct array, propagating the parent struct's null mask.
65+
///
66+
/// Spark semantics: a field of a NULL struct is NULL. Arrow stores a StructArray's child
67+
/// arrays with their own validity, INDEPENDENT of the parent struct's null buffer -- so the
68+
/// raw child value at a row where the struct itself is null can be non-null (e.g. parquet
69+
/// files where a logically-null struct column still has a populated child buffer). Returning
70+
/// the child column verbatim then makes `isnotnull(struct.field)` wrongly true for a null
71+
/// struct. Union the struct's null mask into the child's (null where the struct is null OR
72+
/// the child is null).
73+
fn project_field(struct_array: &StructArray, ordinal: usize) -> DataFusionResult<ArrayRef> {
74+
let child = struct_array.column(ordinal);
75+
match struct_array.nulls() {
76+
Some(_) => {
77+
let combined = NullBuffer::union(struct_array.nulls(), child.nulls());
78+
let data = child.to_data().into_builder().nulls(combined).build()?;
79+
Ok(make_array(data))
80+
}
81+
None => Ok(Arc::clone(child)),
82+
}
83+
}
6284
}
6385

6486
impl PhysicalExpr for GetStructField {
@@ -75,7 +97,15 @@ impl PhysicalExpr for GetStructField {
7597
}
7698

7799
/// Reports whether the extracted field may contain nulls for the given input schema.
fn nullable(&self, input_schema: &Schema) -> DataFusionResult<bool> {
    // A field of a null struct is null (Spark semantics, enforced by `project_field`
    // unioning the parent null mask), so a nullable parent alone makes the result nullable.
    // Reporting only the field's own flag would under-declare: a non-nullable field of a
    // nullable struct would carry the parent's nulls while claiming non-nullable, which
    // fails Arrow's RecordBatch validation downstream with "declared as non-nullable but
    // contains null values" (e.g. once the projected column reaches a shuffle/sort).
    // Mirrors Spark's `GetStructField.nullable = child.nullable || field.nullable`.
    if self.child.nullable(input_schema)? {
        return Ok(true);
    }

    // The parent struct cannot be null here, so the field's own declaration decides.
    let field = self.child_field(input_schema)?;
    Ok(field.is_nullable())
}
80110

81111
fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult<ColumnarValue> {
@@ -88,12 +118,13 @@ impl PhysicalExpr for GetStructField {
88118
.downcast_ref::<StructArray>()
89119
.expect("A struct is expected");
90120

91-
Ok(ColumnarValue::Array(Arc::clone(
92-
struct_array.column(self.ordinal),
93-
)))
121+
Ok(ColumnarValue::Array(Self::project_field(
122+
struct_array,
123+
self.ordinal,
124+
)?))
94125
}
95126
ColumnarValue::Scalar(ScalarValue::Struct(struct_array)) => Ok(ColumnarValue::Array(
96-
Arc::clone(struct_array.column(self.ordinal)),
127+
Self::project_field(&struct_array, self.ordinal)?,
97128
)),
98129
value => Err(DataFusionError::Execution(format!(
99130
"Expected a struct array, got {value:?}"
@@ -125,3 +156,79 @@ impl Display for GetStructField {
125156
)
126157
}
127158
}
159+
160+
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::Int64Array;
    use arrow::datatypes::Fields;
    use datafusion::physical_expr::expressions::Column;

    /// Declares a struct column `add` holding a single NON-nullable `size: Int64` field, with
    /// the struct itself nullable or not per `struct_nullable`, and returns what
    /// `GetStructField::nullable` reports for `add.size` under that schema.
    fn size_field_nullability(struct_nullable: bool) -> bool {
        let size = Field::new("size", DataType::Int64, false);
        let add = Field::new(
            "add",
            DataType::Struct(Fields::from(vec![size])),
            struct_nullable,
        );
        let schema = Schema::new(vec![add]);

        GetStructField::new(Arc::new(Column::new("add", 0)), 0)
            .nullable(&schema)
            .unwrap()
    }

    // Spark semantics: a field of a NULL struct is NULL, even when the child buffer holds a
    // non-null value at that row. Arrow tracks child validity separately from the parent
    // struct's null mask, so a logically-null struct column read from parquet can still carry
    // a populated child buffer. If the parent mask is not propagated,
    // `isnotnull(struct.field)` wrongly evaluates TRUE for a null struct.
    #[test]
    fn field_of_null_struct_is_null() {
        // Every child slot is populated; the struct itself is null at rows 1 and 3.
        let struct_fields = Fields::from(vec![Field::new("version", DataType::Int64, true)]);
        let versions: ArrayRef = Arc::new(Int64Array::from(vec![10_i64, 20, 30, 40]));
        let validity = NullBuffer::from(vec![true, false, true, false]);
        let cm: ArrayRef = Arc::new(StructArray::new(
            struct_fields.clone(),
            vec![versions],
            Some(validity),
        ));
        let schema = Arc::new(Schema::new(vec![Field::new(
            "cm",
            DataType::Struct(struct_fields),
            true,
        )]));
        let batch = RecordBatch::try_new(schema, vec![cm]).unwrap();

        let projected = GetStructField::new(Arc::new(Column::new("cm", 0)), 0)
            .evaluate(&batch)
            .unwrap()
            .into_array(batch.num_rows())
            .unwrap();
        let projected = projected.as_any().downcast_ref::<Int64Array>().unwrap();

        // `Some(v)`: the row must be valid and hold `v`; `None`: the row must be null.
        let expected = [Some(10_i64), None, Some(30), None];
        for (row, want) in expected.iter().enumerate() {
            match *want {
                Some(value) => {
                    assert!(!projected.is_null(row) && projected.value(row) == value)
                }
                None => assert!(
                    projected.is_null(row),
                    "field of a null struct must be null"
                ),
            }
        }
    }

    // `project_field` unions the parent struct's null mask into the projected column, so a
    // NON-nullable field of a NULLABLE struct carries nulls wherever the struct is null and
    // must report `nullable() == true`. Reporting the field's own (non-nullable) flag would
    // make the output schema lie, failing Arrow RecordBatch validation downstream with
    // "declared as non-nullable but contains null values" once the column reaches a
    // shuffle/sort.
    #[test]
    fn non_nullable_field_of_nullable_struct_is_nullable() {
        assert!(
            size_field_nullability(/* struct nullable */ true),
            "a field of a nullable struct must be nullable even if the field itself is non-nullable"
        );
    }

    // No over-declaring: a non-nullable field of a NON-nullable struct stays non-nullable.
    #[test]
    fn non_nullable_field_of_non_nullable_struct_stays_non_nullable() {
        assert!(
            !size_field_nullability(/* struct nullable */ false),
            "a non-nullable field of a non-nullable struct must remain non-nullable"
        );
    }
}

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,37 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
125125
}
126126
}
127127

128+
test("GetStructField: non-nullable field of a nullable struct (Delta action-frame shape)") {
  // Regression test for `GetStructField` under-declaring nullability, which crashed Comet's
  // native execution with "Column '...' is declared as non-nullable but contains null values".
  //
  // The data mimics Delta's action frame: every log row is exactly ONE action type, so each
  // action column is a NULLABLE struct, while Delta's typed SingleAction schema declares the
  // inner fields NON-nullable (e.g. `add.size`). The shape is built from an explicit in-memory
  // schema because a Parquet round-trip would mark every field nullable and a CreateNamedStruct
  // would be declined. Sending the struct through a Comet shuffle and then projecting the
  // non-nullable inner field (GetStructField) used to yield a null in a column declared
  // non-nullable, which Comet's native execution rejected during RecordBatch validation.
  val addType = StructType(Seq(StructField("size", LongType, nullable = false)))
  val frameSchema = StructType(
    Seq(
      StructField("add", addType, nullable = true),
      StructField("v", IntegerType, nullable = false)))

  // Even rows carry an `add` action; odd rows leave the whole struct null.
  val frameRows = for (i <- 0 until 1000) yield {
    val add = if (i % 2 == 0) Row(i.toLong) else null
    Row(add, i)
  }

  val shuffleConfs = Seq(
    CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
    CometConf.COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED.key -> "true")

  withSQLConf(shuffleConfs: _*) {
    val frame = spark.createDataFrame(spark.sparkContext.parallelize(frameRows), frameSchema)
    // Materialize the typed struct through a Comet shuffle.
    val shuffled = frame.repartition(4, col("v"))
    // GetStructField on the non-nullable inner field.
    val sizes = shuffled.select(col("add.size").as("size"))
    // Re-shuffle: the read-back validates the declared schema.
    val df = sizes.repartition(4, col("size"))
    checkSparkAnswer(df)
  }
}
158+
128159
test("compare true/false to negative zero") {
129160
Seq(false, true).foreach { dictionary =>
130161
withSQLConf(

0 commit comments

Comments
 (0)