Skip to content

Commit fed096c

Browse files
authored
feat: Restore nullability when consuming substrait fields (#22105)
## Which issue does this PR close? - Closes #12727. ## Rationale for this change The Substrait logical-plan consumer was discarding field nullability when reconstructing DataFusion schemas from Substrait struct types. Nullability matters because a Substrait plan may have been produced or optimized using non-null guarantees. This also improves DataFusion <-> Substrait round-trip fidelity: required fields encoded by the producer are preserved when the plan is consumed again, instead of being widened to nullable. ## What changes are included in this PR? - Preserve per-field nullability when converting Substrait struct types / `NamedStruct` schemas into DataFusion schemas. - Treat Substrait `Required` as non-nullable, and `Nullable`, `Unspecified`, or unknown nullability values as nullable. - Keep deprecated `UserDefinedTypeReference` non-nullable because it does not carry nullability metadata. - Enforce named-table `ReadRel` schema compatibility when the Substrait schema requires a field to be non-null but the resolved DataFusion table schema marks it nullable. - Extend compatibility checking recursively through nested `Struct` fields. - Leave `List` and `Map` child nullability compatibility as future work, since their child nullability is not faithfully reconstructed today. ## Are these changes tested? Yes; new tests added. ## Are there any user-facing changes? We are a bit stricter when consuming Substrait plans now, but that could prevent problems: for example, if a Substrait plan was produced under the assumption that a field `x` is non-nullable but the local DataFusion schema allows null values in `x`, executing the plan might produce unexpected results.
1 parent 2a2a060 commit fed096c

5 files changed

Lines changed: 275 additions & 11 deletions

File tree

datafusion/substrait/src/logical_plan/consumer/types.rs

Lines changed: 91 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -347,12 +347,98 @@ fn from_substrait_struct_type(
347347
) -> datafusion::common::Result<Fields> {
348348
let mut fields = vec![];
349349
for (i, f) in s.types.iter().enumerate() {
350-
let field = Field::new(
351-
next_struct_field_name(i, dfs_names, name_idx)?,
352-
from_substrait_type(consumer, f, dfs_names, name_idx)?,
353-
true, // We assume everything to be nullable since that's easier than ensuring it matches
354-
);
350+
let name = next_struct_field_name(i, dfs_names, name_idx)?;
351+
let data_type = from_substrait_type(consumer, f, dfs_names, name_idx)?;
352+
let field = Field::new(name, data_type, type_is_nullable(f)?);
355353
fields.push(field);
356354
}
357355
Ok(fields.into())
358356
}
357+
358+
fn type_is_nullable(dt: &Type) -> datafusion::common::Result<bool> {
359+
let Some(kind) = dt.kind.as_ref() else {
360+
return Ok(true);
361+
};
362+
363+
let nullability = match kind {
364+
r#type::Kind::Bool(boolean) => boolean.nullability,
365+
r#type::Kind::I8(integer) => integer.nullability,
366+
r#type::Kind::I16(integer) => integer.nullability,
367+
r#type::Kind::I32(integer) => integer.nullability,
368+
r#type::Kind::I64(integer) => integer.nullability,
369+
r#type::Kind::Fp32(float) => float.nullability,
370+
r#type::Kind::Fp64(float) => float.nullability,
371+
#[expect(deprecated)]
372+
r#type::Kind::Timestamp(timestamp) => timestamp.nullability,
373+
r#type::Kind::Date(date) => date.nullability,
374+
#[expect(deprecated)]
375+
r#type::Kind::Time(time) => time.nullability,
376+
#[expect(deprecated)]
377+
r#type::Kind::TimestampTz(timestamp) => timestamp.nullability,
378+
r#type::Kind::IntervalYear(interval) => interval.nullability,
379+
r#type::Kind::IntervalDay(interval) => interval.nullability,
380+
r#type::Kind::IntervalCompound(interval) => interval.nullability,
381+
r#type::Kind::Uuid(uuid) => uuid.nullability,
382+
r#type::Kind::String(string) => string.nullability,
383+
r#type::Kind::Binary(binary) => binary.nullability,
384+
r#type::Kind::FixedChar(fixed) => fixed.nullability,
385+
r#type::Kind::Varchar(varchar) => varchar.nullability,
386+
r#type::Kind::FixedBinary(fixed) => fixed.nullability,
387+
r#type::Kind::Decimal(decimal) => decimal.nullability,
388+
r#type::Kind::PrecisionTime(time) => time.nullability,
389+
r#type::Kind::PrecisionTimestamp(timestamp) => timestamp.nullability,
390+
r#type::Kind::PrecisionTimestampTz(timestamp) => timestamp.nullability,
391+
r#type::Kind::Struct(r#struct) => r#struct.nullability,
392+
r#type::Kind::List(list) => list.nullability,
393+
r#type::Kind::Map(map) => map.nullability,
394+
r#type::Kind::Func(func) => func.nullability,
395+
r#type::Kind::UserDefined(user_defined) => user_defined.nullability,
396+
#[expect(deprecated)]
397+
r#type::Kind::UserDefinedTypeReference(_) => r#type::Nullability::Required as i32,
398+
r#type::Kind::Alias(alias) => alias.nullability,
399+
};
400+
401+
is_nullable(nullability)
402+
}
403+
404+
fn is_nullable(nullability: i32) -> datafusion::common::Result<bool> {
405+
match r#type::Nullability::try_from(nullability) {
406+
Ok(r#type::Nullability::Required) => Ok(false),
407+
Ok(r#type::Nullability::Nullable | r#type::Nullability::Unspecified) => Ok(true),
408+
Err(_) => not_impl_err!("Unsupported Substrait Nullability value {nullability}"),
409+
}
410+
}
411+
412+
#[cfg(test)]
413+
mod tests {
414+
use super::*;
415+
use substrait::proto::r#type::Kind;
416+
417+
#[test]
418+
fn type_is_nullable_user_defined_type_reference_is_required() {
419+
// The deprecated `UserDefinedTypeReference` variant doesn't carry a
420+
// nullability field; the consumer hardcodes Required (non-null).
421+
#[expect(deprecated)]
422+
let dt = Type {
423+
kind: Some(Kind::UserDefinedTypeReference(0)),
424+
};
425+
assert!(!type_is_nullable(&dt).unwrap());
426+
}
427+
428+
#[test]
429+
fn type_is_nullable_missing_kind_defaults_to_nullable() {
430+
// Defensive: a Type whose kind is None is treated as nullable.
431+
let dt = Type { kind: None };
432+
assert!(type_is_nullable(&dt).unwrap());
433+
}
434+
435+
#[test]
436+
fn is_nullable_rejects_unrecognized_enum_value() {
437+
let err = is_nullable(i32::MAX).unwrap_err();
438+
assert!(
439+
err.to_string()
440+
.contains("Unsupported Substrait Nullability"),
441+
"got: {err}"
442+
);
443+
}
444+
}

datafusion/substrait/src/logical_plan/consumer/utils.rs

Lines changed: 99 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,10 @@ pub(super) fn ensure_schema_compatibility(
316316
/// 1. They have logically equivalent types.
317317
/// 2. They have the same nullability OR the Substrait field is nullable and the DataFusion fields
318318
/// is not nullable.
319+
/// 3. For Struct fields, every child field's nullability is compatible by the same rule
320+
/// (recursively).
321+
///
322+
/// TODO: Check nullability for List and Map fields.
319323
///
320324
/// If a Substrait field is not nullable, the Substrait plan may be built around assuming it is not
321325
/// nullable. As such if DataFusion has that field as nullable the plan should be rejected.
@@ -339,15 +343,56 @@ fn ensure_field_compatibility(
339343
datafusion_field.is_nullable(),
340344
substrait_field.is_nullable(),
341345
) {
342-
// TODO: from_substrait_struct_type needs to be updated to set the nullability correctly. It defaults to true for now.
343346
return substrait_err!(
344347
"Field '{}' is nullable in the DataFusion schema but not nullable in the Substrait schema.",
345348
substrait_field.name()
346349
);
347350
}
351+
352+
ensure_nested_nullability_compatibility(
353+
datafusion_field.data_type(),
354+
substrait_field.data_type(),
355+
substrait_field.name(),
356+
)
357+
}
358+
359+
/// Recurses through nested Struct DataTypes, applying
360+
/// [`compatible_nullabilities`] to each child field.
361+
///
362+
/// TODO: Add support for List/LargeList/FixedSizeList and Map fields.
363+
fn ensure_nested_nullability_compatibility(
364+
datafusion_type: &DataType,
365+
substrait_type: &DataType,
366+
field_path: &str,
367+
) -> datafusion::common::Result<()> {
368+
if let (DataType::Struct(df_fields), DataType::Struct(sub_fields)) =
369+
(datafusion_type, substrait_type)
370+
{
371+
for (df_f, sub_f) in df_fields.iter().zip(sub_fields.iter()) {
372+
check_nested_field(df_f, sub_f, field_path)?;
373+
}
374+
}
348375
Ok(())
349376
}
350377

378+
fn check_nested_field(
379+
df_field: &Field,
380+
sub_field: &Field,
381+
parent_path: &str,
382+
) -> datafusion::common::Result<()> {
383+
let path = format!("{parent_path}.{}", sub_field.name());
384+
if !compatible_nullabilities(df_field.is_nullable(), sub_field.is_nullable()) {
385+
return substrait_err!(
386+
"Field '{path}' is nullable in the DataFusion schema but not nullable in the Substrait schema."
387+
);
388+
}
389+
ensure_nested_nullability_compatibility(
390+
df_field.data_type(),
391+
sub_field.data_type(),
392+
&path,
393+
)
394+
}
395+
351396
/// Returns true if the DataFusion and Substrait nullabilities are compatible, false otherwise
352397
fn compatible_nullabilities(
353398
datafusion_nullability: bool,
@@ -521,10 +566,10 @@ pub(crate) fn from_substrait_precision(
521566

522567
#[cfg(test)]
523568
pub(crate) mod tests {
524-
use super::{NameTracker, make_renamed_schema};
569+
use super::{NameTracker, ensure_schema_compatibility, make_renamed_schema};
525570
use crate::extensions::Extensions;
526571
use crate::logical_plan::consumer::DefaultSubstraitConsumer;
527-
use datafusion::arrow::datatypes::{DataType, Field};
572+
use datafusion::arrow::datatypes::{DataType, Field, Fields, Schema};
528573
use datafusion::common::DFSchema;
529574
use datafusion::error::Result;
530575
use datafusion::execution::SessionState;
@@ -813,4 +858,55 @@ pub(crate) mod tests {
813858

814859
Ok(())
815860
}
861+
862+
fn schema_with_struct_inner(inner_nullable: bool) -> DFSchema {
863+
let inner = Field::new("inner", DataType::Int32, inner_nullable);
864+
let outer = Field::new("s", DataType::Struct(Fields::from(vec![inner])), false);
865+
DFSchema::try_from(Schema::new(vec![outer])).unwrap()
866+
}
867+
868+
#[test]
869+
fn nested_compatibility_accepts_required_df_field() -> Result<()> {
870+
// DF makes a stronger guarantee (required) than Substrait expects
871+
// (nullable). The stronger guarantee is compatible with the weaker
872+
// expectation, so this is accepted.
873+
let df = schema_with_struct_inner(false);
874+
let sub = schema_with_struct_inner(true);
875+
ensure_schema_compatibility(&df, sub)
876+
}
877+
878+
#[test]
879+
fn nested_compatibility_rejects_nullable_df_field() {
880+
// Substrait says inner is required; DF says inner is nullable. The
881+
// Substrait plan may rely on inner being non-null, so reject.
882+
let df = schema_with_struct_inner(true);
883+
let sub = schema_with_struct_inner(false);
884+
let err = ensure_schema_compatibility(&df, sub).unwrap_err();
885+
assert!(
886+
err.to_string().contains("'s.inner'"),
887+
"expected error to identify the nested field path 's.inner', got: {err}"
888+
);
889+
}
890+
891+
#[test]
892+
fn nested_compatibility_recurses_into_nested_struct() {
893+
// Two levels of nesting: outer struct with required field that is
894+
// itself a struct, whose `inner` field is required in Substrait but
895+
// nullable in DF.
896+
fn schema(inner_nullable: bool) -> DFSchema {
897+
let inner = Field::new("inner", DataType::Int32, inner_nullable);
898+
let middle =
899+
Field::new("m", DataType::Struct(Fields::from(vec![inner])), false);
900+
let outer =
901+
Field::new("s", DataType::Struct(Fields::from(vec![middle])), false);
902+
DFSchema::try_from(Schema::new(vec![outer])).unwrap()
903+
}
904+
let df = schema(true);
905+
let sub = schema(false);
906+
let err = ensure_schema_compatibility(&df, sub).unwrap_err();
907+
assert!(
908+
err.to_string().contains("'s.m.inner'"),
909+
"expected error to identify the deeply nested field path 's.m.inner', got: {err}"
910+
);
911+
}
816912
}

datafusion/substrait/src/logical_plan/producer/types.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,4 +534,35 @@ mod tests {
534534
assert_eq!(schema.as_ref(), &roundtrip_schema);
535535
Ok(())
536536
}
537+
538+
#[test]
539+
fn named_struct_unspecified_nullability_is_nullable() -> Result<()> {
540+
let named_struct = NamedStruct {
541+
names: vec!["unspecified".to_string(), "required".to_string()],
542+
r#struct: Some(r#type::Struct {
543+
types: vec![
544+
substrait::proto::Type {
545+
kind: Some(r#type::Kind::I32(r#type::I32 {
546+
type_variation_reference: DEFAULT_TYPE_VARIATION_REF,
547+
nullability: r#type::Nullability::Unspecified as i32,
548+
})),
549+
},
550+
substrait::proto::Type {
551+
kind: Some(r#type::Kind::I32(r#type::I32 {
552+
type_variation_reference: DEFAULT_TYPE_VARIATION_REF,
553+
nullability: r#type::Nullability::Required as i32,
554+
})),
555+
},
556+
],
557+
type_variation_reference: DEFAULT_TYPE_VARIATION_REF,
558+
nullability: r#type::Nullability::Required as i32,
559+
}),
560+
};
561+
562+
let schema = from_substrait_named_struct(&test_consumer(), &named_struct)?;
563+
564+
assert!(schema.field(0).is_nullable());
565+
assert!(!schema.field(1).is_nullable());
566+
Ok(())
567+
}
537568
}

datafusion/substrait/tests/cases/roundtrip_logical_plan.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1566,6 +1566,40 @@ async fn roundtrip_values_duplicate_column_join() -> Result<()> {
15661566
Ok(())
15671567
}
15681568

1569+
#[tokio::test]
1570+
async fn roundtrip_preserves_field_nullability() -> Result<()> {
1571+
use datafusion::arrow::datatypes::Fields;
1572+
1573+
// Verify that required and nullable fields, including nested struct fields,
1574+
// preserve their nullability through a Substrait round-trip.
1575+
//
1576+
// List child nullability is intentionally omitted because it is not
1577+
// preserved today.
1578+
let ctx = create_context().await?;
1579+
let df_schema = DFSchema::try_from(Schema::new(vec![
1580+
Field::new("required_int", DataType::Int32, false),
1581+
Field::new("nullable_int", DataType::Int32, true),
1582+
Field::new(
1583+
"required_struct",
1584+
DataType::Struct(Fields::from(vec![
1585+
Field::new("required_inner", DataType::Boolean, false),
1586+
Field::new("nullable_inner", DataType::Utf8, true),
1587+
])),
1588+
false,
1589+
),
1590+
]))?;
1591+
let plan = LogicalPlan::EmptyRelation(EmptyRelation {
1592+
produce_one_row: false,
1593+
schema: DFSchemaRef::new(df_schema),
1594+
});
1595+
1596+
let proto = to_substrait_plan(&plan, &ctx.state())?;
1597+
let plan2 = from_substrait_plan(&ctx.state(), &proto).await?;
1598+
1599+
assert_eq!(plan.schema(), plan2.schema());
1600+
Ok(())
1601+
}
1602+
15691603
#[tokio::test]
15701604
async fn duplicate_column() -> Result<()> {
15711605
// Substrait does not keep column names (aliases) in the plan, rather it operates on column indices

datafusion/substrait/tests/cases/substrait_validations.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ mod tests {
6262
read_json("tests/testdata/test_plans/simple_select.substrait.json");
6363
// this is the exact schema of the Substrait plan
6464
let df_schema =
65-
vec![("a", DataType::Int32, false), ("b", DataType::Int32, true)];
65+
vec![("a", DataType::Int32, true), ("b", DataType::Int32, false)];
6666

6767
let ctx = generate_context_with_table("DATA", df_schema)?;
6868
let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;
@@ -83,8 +83,8 @@ mod tests {
8383
read_json("tests/testdata/test_plans/simple_select.substrait.json");
8484
// the DataFusion schema { b, a, c } contains the Substrait schema { a, b }
8585
let df_schema = vec![
86-
("b", DataType::Int32, true),
87-
("a", DataType::Int32, false),
86+
("b", DataType::Int32, false),
87+
("a", DataType::Int32, true),
8888
("c", DataType::Int32, false),
8989
];
9090
let ctx = generate_context_with_table("DATA", df_schema)?;
@@ -150,5 +150,22 @@ mod tests {
150150
assert!(res.is_err());
151151
Ok(())
152152
}
153+
154+
#[tokio::test]
155+
async fn reject_plans_with_incompatible_field_nullability() -> Result<()> {
156+
let proto_plan =
157+
read_json("tests/testdata/test_plans/simple_select.substrait.json");
158+
let df_schema =
159+
vec![("a", DataType::Int32, true), ("b", DataType::Int32, true)];
160+
161+
let ctx = generate_context_with_table("DATA", df_schema)?;
162+
let res = from_substrait_plan(&ctx.state(), &proto_plan).await;
163+
164+
assert_snapshot!(
165+
res.unwrap_err().strip_backtrace(),
166+
@r#"Substrait error: Field 'b' is nullable in the DataFusion schema but not nullable in the Substrait schema."#
167+
);
168+
Ok(())
169+
}
153170
}
154171
}

0 commit comments

Comments
 (0)