
[SPARK-56660][SQL] Decompose struct equality into field-level predicates for filter pushdown #56244

Open
yadavay-amzn wants to merge 5 commits into apache:master from yadavay-amzn:fix/SPARK-56660-struct-predicate-decompose


@yadavay-amzn yadavay-amzn commented Jun 1, 2026

Contributor

What changes were proposed in this pull request?

Add optimizer rule DecomposeStructComparison that rewrites struct-level equality (= and <=>) in Filter conditions into a conjunction of field-level comparisons, enabling per-field filter pushdown to data sources.

For example, WHERE struct_col = named_struct('field1', 1, 'field2', 'a') becomes struct_col.field1 <=> 1 AND struct_col.field2 <=> 'a', prefixed with an IsNotNull(struct_col) guard when struct_col is nullable.

Key design points:

  • Scoped to the pushable, non-duplicating shape. The rule fires only when one operand is foldable (a struct literal) and the other is CollapseProject.isCheap (e.g. a struct attribute). This is the only case that yields pushable col <=> literal field predicates without duplicating work. It deliberately does NOT decompose struct_col1 = struct_col2 (decomposes to col <=> col, which no source pushes - pure plan expansion) or udf() = literal (would duplicate the non-cheap UDF once per field). There is a negative test for the CreateStruct = CreateStruct (tuple) case.
  • NULL semantics are preserved. Per-field comparisons use <=>. At a top-level Filter conjunct a row is kept only when the predicate is TRUE (NULL and FALSE are indistinguishable), so there the rule emits IsNotNull(<nullable operand>) AND <field conjunction>. The IsNotNull guard is required for correctness: it distinguishes a whole-null struct from a non-null struct whose aligned fields are all null (e.g. s = named_struct('a', CAST(NULL AS INT)) must drop an s IS NULL row, matching the original null = struct(null) -> NULL). Comparisons nested under Not/Or/etc. are not in filter position - NULL vs FALSE is observable - so those route through a null-preserving form (decomposeEqualTo / decomposeEqualNullSafe) that mirrors the original comparison's NULL semantics.
  • Filter-scoped. Only rewrites comparisons inside Filter conditions; deliberately does not touch join conditions (decomposing an equi-join key would break BHJ/SMJ key matching) or aggregate grouping keys.
  • Bounded and opt-in. Gated on spark.sql.optimizer.decomposeStructComparison.enabled (default off). Expansion is bounded by spark.sql.optimizer.decomposeStructComparison.maxFields (counting leaf fields of nested structs) to prevent runaway rewrites on wide/deeply-nested structs.
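As a rough illustration of the maxFields bound above, leaf fields of nested structs can be counted recursively. This is a minimal sketch over a hypothetical two-case type model (Scalar, Struct), not Spark's DataType hierarchy, and the `<=` comparison against the bound is an assumption about how the limit is applied.

```scala
object LeafCount {
  // Hypothetical mini type model; these are not Spark's DataType classes.
  sealed trait FieldType
  case object Scalar extends FieldType
  final case class Struct(fields: List[FieldType]) extends FieldType

  // Count scalar leaves, descending into nested structs.
  def leafFields(t: FieldType): Int = t match {
    case Scalar         => 1
    case Struct(fields) => fields.map(leafFields).sum
  }

  // Assumed gate: decompose only when the leaf count fits the bound.
  def withinBound(t: FieldType, maxFields: Int): Boolean =
    leafFields(t) <= maxFields

  def main(args: Array[String]): Unit = {
    // Models struct<a: int, b: struct<c: int, d: struct<e: int>>>
    val nested = Struct(List(Scalar, Struct(List(Scalar, Struct(List(Scalar))))))
    println(leafFields(nested))     // 3 leaves: a, b.c, b.d.e
    println(withinBound(nested, 2)) // false
  }
}
```

Counting leaves rather than top-level fields means a narrow but deeply nested struct is bounded the same way as a wide flat one.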

Why are the changes needed?

Struct literal comparisons are treated as opaque predicates by the optimizer. Data source filter pushdown only understands scalar predicates, so struct_col = <struct literal> cannot be pushed down for file pruning (Parquet row group skipping, partition pruning, etc.), even though the equivalent per-field scalar predicates would be. Rewriting the pushable shape (a foldable struct literal vs a cheap struct reference) into field-level <=> predicates lets those reach the scan.

Does this PR introduce any user-facing change?

Yes. When enabled, queries filtering a struct column against a struct literal benefit from file pruning and filter pushdown, improving performance on large tables. Off by default.

How was this patch tested?

  • StructPredicateDecomposeSuite (catalyst): nested-struct decomposition; the foldable+cheap scope gate (including negative tests that struct_col = struct_col, udf() = literal, and CreateStruct = CreateStruct tuple comparisons are not decomposed); non-deterministic and join/aggregate scope guards; the top-level IsNotNull-guarded pushable form; and the null-preserving non-pushable branches under Not/Or (decomposeEqualTo's nullable If(anyNull, NULL, conjunction) and decomposeEqualNullSafe's nullability branches), with oracle tests asserting original-vs-rewritten parity across whole-null / all-null-fields / non-null rows.
  • StructPredicateDecomposeE2ESuite (sql/core): runs each query rule-on vs rule-off over Parquet-backed tables (with s a nullable struct attribute so the rule actually fires) and asserts identical row sets for top-level, Not, Or, and null-field cases, plus a test asserting the decomposed field predicates reach the scan's PushedFilters.

Was this patch authored or co-authored using generative AI tooling?

Authored with assistance by Claude Opus 4.8.

@yadavay-amzn yadavay-amzn force-pushed the fix/SPARK-56660-struct-predicate-decompose branch 3 times, most recently from 857e4be to a9a74c4 Compare June 3, 2026 01:15

@yyanyy yyanyy left a comment

Contributor

Thanks for making this change!

@yadavay-amzn yadavay-amzn force-pushed the fix/SPARK-56660-struct-predicate-decompose branch from a9a74c4 to 76dca41 Compare June 5, 2026 01:53
@yadavay-amzn

yadavay-amzn commented Jun 5, 2026

Contributor Author

@yyanyy Thanks for reviewing, and great catch on the NULL semantics. You're right!

Spark's struct equality uses InterpretedOrdering, which treats null = null within fields as equal (returns TRUE), while EqualTo(null, null) returns NULL.

Fixed: the decomposition now uses EqualNullSafe (<=>) for per-field comparisons, which matches the struct equality semantics exactly:

  • null <=> null → true (matches struct behavior)
  • null <=> 2 → false (matches struct behavior)

The only remaining discrepancy is when the entire struct itself is null (original returns NULL, decomposed returns FALSE), but since our rule only fires in Filter context, this is harmless (both NULL and FALSE exclude the row from WHERE).

Also added a width guard (max 100 fields) to prevent stack overflow on very wide/deeply nested structs, per your second concern.
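The per-field difference between = and <=> described in this comment can be modeled without Spark. The sketch below uses Option to stand in for nullable values and results (None models SQL NULL); the function names are illustrative, not Catalyst APIs.

```scala
object NullSafeEq {
  // SQL three-valued result: Some(true), Some(false), or None for NULL.
  type Tri = Option[Boolean]

  // `=`: the result is NULL if either side is NULL.
  def equalTo(l: Option[Int], r: Option[Int]): Tri =
    for (a <- l; b <- r) yield a == b

  // `<=>`: never NULL; two NULLs compare equal.
  def equalNullSafe(l: Option[Int], r: Option[Int]): Boolean = l == r

  def main(args: Array[String]): Unit = {
    println(equalTo(None, None))          // None, i.e. NULL
    println(equalNullSafe(None, None))    // true
    println(equalNullSafe(None, Some(2))) // false
  }
}
```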

@yadavay-amzn yadavay-amzn force-pushed the fix/SPARK-56660-struct-predicate-decompose branch from 76dca41 to 707a859 Compare June 9, 2026 00:05
yadavay-amzn added a commit to yadavay-amzn/spark that referenced this pull request Jun 9, 2026
…Conf; rework tests

Addresses review feedback on PR apache#56244:

1. Correctness fix for NULL handling. The original decomposition rewrote
   EqualTo(struct, struct) into a plain conjunction of per-field EqualTo
   comparisons, which silently changed semantics for non-null structs that
   contained NULL fields:
     - Before this PR: struct(1, null) = struct(1, null) returned TRUE
       (Spark's whole-struct EqualTo evaluates ordering.equiv on the row,
       which treats per-field NULL == NULL as equal).
     - With original PR apache#56244: returned NULL.

   The fix wraps the conjunction with a null-check that mirrors the
   original outer null behavior:
     - EqualTo(L, R) over nullable structs: IF (L IS NULL OR R IS NULL)
       THEN NULL ELSE And(EqualNullSafe(L.fi, R.fi)).
     - EqualNullSafe(L, R): IF (L IS NULL AND R IS NULL) THEN TRUE
       ELSE IF (L IS NULL OR R IS NULL) THEN FALSE
       ELSE And(EqualNullSafe(L.fi, R.fi)).
   The wrappers fold out cleanly when either operand is non-nullable,
   leaving the simple conjunction in the common
   `CreateNamedStruct = column` pushdown case.

2. SQLConf gate. Add `spark.sql.optimizer.decomposeStructComparison.enabled`
   (default false) so users opt in once the behavior has soaked. Add
   `spark.sql.optimizer.decomposeStructComparison.maxFields` (default 1000)
   that bounds total decomposed predicates including recursively nested
   struct fields, replacing the unprincipled per-level field cap of 100.

3. Scaladoc explaining Filter scope. Document why join conditions and
   aggregate grouping keys are deliberately not rewritten.

4. Tests reworked as oracle tests. The original suite asserted post-rewrite
   NULL behavior directly, which codified the regression as expected. The
   rewritten suite uses two patterns:
     - Catalyst-level: build expressions and assert eval result of original
       expression equals eval result of rewritten expression on representative
       inputs (struct(1, null), whole-struct null, Not wrapper, etc.).
     - End-to-end: run each query with the rule enabled and with the conf
       disabled; assert row sets are identical.
   Added tests for: Not(struct = struct) with NULL fields, whole-struct null
   on one side, conf gating. Removed: 3 wrong-oracle NULL tests, structural-
   only "nullable fields decomposes" test, duplicate LessThan, duplicate
   3-level nested, single-field, duplicate join test in catalyst suite.
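The two null-preserving wrappers in item 1, and the oracle-style check in item 4, can be sketched on a small model. This assumes a hypothetical two-field struct S, with None standing for a NULL field, a whole-null struct, or a NULL result; the function names mirror the commit message but this is not the Catalyst implementation.

```scala
object NullPreserving {
  // Hypothetical two-field struct; None models a NULL field.
  final case class S(a: Option[Int], b: Option[Int])
  type Row = Option[S]       // None models a whole-null struct
  type Tri = Option[Boolean] // None models SQL NULL

  // Oracle for whole-struct `=` as the commit message describes it:
  // NULL if either struct is NULL, otherwise fields compare with NULL == NULL.
  def structEq(l: Row, r: Row): Tri = for (x <- l; y <- r) yield x == y

  // Oracle for whole-struct `<=>`: never NULL, two NULL structs are equal.
  def structEqNullSafe(l: Row, r: Row): Boolean = l == r

  // And(EqualNullSafe(L.fi, R.fi)); a field read from a NULL struct is NULL.
  def fieldConjunction(l: Row, r: Row): Boolean =
    l.flatMap(_.a) == r.flatMap(_.a) && l.flatMap(_.b) == r.flatMap(_.b)

  def decomposeEqualTo(l: Row, r: Row): Tri =
    if (l.isEmpty || r.isEmpty) None else Some(fieldConjunction(l, r))

  def decomposeEqualNullSafe(l: Row, r: Row): Boolean =
    if (l.isEmpty && r.isEmpty) true
    else if (l.isEmpty || r.isEmpty) false
    else fieldConjunction(l, r)

  // Representative inputs: whole-null, all-null fields, one null field, no nulls.
  val samples: List[Row] =
    List(None, Some(S(None, None)), Some(S(Some(1), None)), Some(S(Some(1), Some(2))))

  def main(args: Array[String]): Unit = {
    val pairs = for (l <- samples; r <- samples) yield (l, r)
    println(pairs.forall { case (l, r) => decomposeEqualTo(l, r) == structEq(l, r) })
    println(pairs.forall { case (l, r) =>
      decomposeEqualNullSafe(l, r) == structEqNullSafe(l, r) })
  }
}
```

Both checks print true on this model. The point of the oracle pattern is that the expected value comes from the original comparison rather than being hard-coded per test.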
yadavay-amzn added a commit to yadavay-amzn/spark that referenced this pull request Jun 16, 2026
…Conf; rework tests

@yadavay-amzn yadavay-amzn force-pushed the fix/SPARK-56660-struct-predicate-decompose branch from 7de21ca to 8941106 Compare June 16, 2026 19:02
…Conf; rework tests

@yadavay-amzn yadavay-amzn force-pushed the fix/SPARK-56660-struct-predicate-decompose branch from 8941106 to e56e97c Compare June 17, 2026 00:55
…ble for nullable structs

Top-level struct equality in a Filter was decomposed into
`If(isnull(s), null, conjunction)`, which is opaque to filter pushdown, so a
nullable struct column (the default for file sources) gained no pushdown -- the
motivating benefit of the rule.

At a top-level Filter conjunct, NULL and FALSE are indistinguishable (a Filter
keeps a row only when the predicate is TRUE), so the bare field conjunction is
filter-equivalent and pushable. Split the Filter condition into top-level
conjuncts and emit the bare conjunction for struct EqualTo / EqualNullSafe there;
comparisons nested under Not/Or/etc. still go through the null-preserving form,
since NULL vs FALSE is observable in those positions. EqualTo keeps the
null-preserving form only when both operands are nullable (the both-null row
would otherwise be wrongly kept).

Add an end-to-end test asserting the decomposed field predicates reach the scan's
PushedFilters for a nullable struct column, and update the whole-struct-null
oracle test to assert filter-equivalence (the rewrite now yields FALSE rather
than NULL in filter position).
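Why NULL and FALSE are interchangeable at a top-level conjunct but not under Not can be checked with a small three-valued-logic model. This is a sketch with illustrative helper names (not, and, keeps), where None models SQL NULL.

```scala
object FilterPosition {
  type Tri = Option[Boolean] // None models SQL NULL

  // SQL NOT: NULL stays NULL.
  def not(p: Tri): Tri = p.map(!_)

  // SQL AND: FALSE dominates, then NULL, then TRUE.
  def and(a: Tri, b: Tri): Tri = (a, b) match {
    case (Some(false), _) | (_, Some(false)) => Some(false)
    case (Some(true), Some(true))            => Some(true)
    case _                                   => None
  }

  // A Filter keeps a row only when the predicate is TRUE.
  def keeps(p: Tri): Boolean = p.contains(true)

  def main(args: Array[String]): Unit = {
    val sqlNull: Tri = None
    val sqlFalse: Tri = Some(false)
    val other: Tri = Some(true)
    // As a top-level conjunct, NULL and FALSE both drop the row.
    println(keeps(and(sqlNull, other)) == keeps(and(sqlFalse, other))) // true
    // Under NOT they diverge: NOT NULL drops the row, NOT FALSE keeps it.
    println(keeps(not(sqlNull)) == keeps(not(sqlFalse)))               // false
  }
}
```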
@yadavay-amzn

Contributor Author

@yaooqinn @peter-toth Could you please review when you get a chance? Thanks!

@yadavay-amzn yadavay-amzn requested a review from yyanyy June 20, 2026 18:27

@peter-toth peter-toth left a comment

Contributor

Scoping question: should the rule fire more narrowly?

canDecompose (expressions.scala:643) requires only that both sides are equal-arity structs, deterministic, and within maxFields -- it does not require a foldable side, nor that the duplicated side be cheap. Two consequences:

  1. It duplicates non-cheap expressions. fieldConjunction references each operand once per field, so (...) = costly_udf() (a struct-returning UDF -- deterministic by default) expands to GetStructField(costly_udf(), 0) <=> ... AND GetStructField(costly_udf(), 1) <=> ..., i.e. the UDF N times. The determinism guard prevents duplicating side effects, not cost; subexpression elimination may collapse the runtime evaluations, but it's plan bloat for no gain. CollapseProject.isCheap (Optimizer.scala:1564) is exactly the existing predicate for "safe to duplicate" -- GetStructField over an attribute is cheap, a UDF is not.

  2. No pushdown benefit without a foldable side. struct_col1 = struct_col2 decomposes to col <=> col, which no file source pushes -- pure plan expansion. Doesn't the rewrite only pay off when one side is foldable, so its fields fold to literals that can reach the scan?

Would it make sense to gate canDecompose on (one operand foldable) AND (the other isCheap), so it only fires where it produces pushable, non-duplicating col <=> literal predicates? That matches the stated goal and drops the col=col / udf()=literal cases.

There also seems to be a NULL-correctness bug on the pushable path. The bare field conjunction can't distinguish a whole-null struct from a non-null struct with all-null fields: GetStructField(nullStruct, i) returns null just like a null field does. With s a nullable struct column, WHERE s = named_struct('a', CAST(NULL AS INT)) takes the pushable path (filterEquivalentToConjunction, line 632, returns true because the literal is non-nullable) and emits s.a <=> null. On an s IS NULL row that is null <=> null -> TRUE (kept), while the original null = struct(null) -> NULL (dropped) -- a wrong result (and it pushes to Parquet as IsNull(a), matching whole-null s at the scan). The <=> branch (line 611) is unguarded with the same issue for two nullable struct columns. The non-pushable decomposeEqualTo/decomposeEqualNullSafe are correct -- they keep the outer IsNull check; only the pushable shortcut drops it. A pushable fix is to AND IsNotNull(<nullable operand>) into the conjunction (it pushes down too).

The two interact: narrowing the scope above removes the col=col and udf()=literal shapes and the both-nullable <=> hazard, but it doesn't fully resolve the NULL issue -- s = named_struct('a', null) (foldable side, but all aligned fields null) still takes the pushable path and still needs the IsNotNull guard. So it's probably cleanest to settle the applicability scope first, then finalize NULL handling within it. Worth noting the current oracle/e2e NULL tests (StructPredicateDecomposeSuite.scala:323/366, e2e :100/103) all use a non-null counterpart field, so none exercise this all-null-fields case.
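The wrong result described above can be reproduced on a small model. The sketch below assumes a hypothetical single-field struct S(a) and the literal named_struct('a', CAST(NULL AS INT)); None models a NULL field or a whole-null struct, and the three functions are illustrative stand-ins for the original comparison, the bare conjunction, and the IsNotNull-guarded form.

```scala
object WholeNullGuard {
  final case class S(a: Option[Int]) // hypothetical struct<a: int>
  type Row = Option[S]               // None models s IS NULL

  // The literal named_struct('a', CAST(NULL AS INT)): non-null struct, NULL field.
  val lit: Row = Some(S(None))

  // Original `s = lit` in filter position: kept only when the result is TRUE.
  def originalKeeps(s: Row): Boolean =
    (for (x <- s; y <- lit) yield x == y).contains(true)

  // Bare conjunction `s.a <=> lit.a`; a field of a NULL struct reads as NULL.
  def bareKeeps(s: Row): Boolean = s.flatMap(_.a) == lit.flatMap(_.a)

  // Guarded form: IsNotNull(s) AND s.a <=> lit.a.
  def guardedKeeps(s: Row): Boolean = s.isDefined && bareKeeps(s)

  def main(args: Array[String]): Unit = {
    val rows: List[Row] = List(None, Some(S(None)), Some(S(Some(1))))
    rows.foreach { s =>
      println(s"$s original=${originalKeeps(s)} bare=${bareKeeps(s)} guarded=${guardedKeeps(s)}")
    }
  }
}
```

On the whole-null row the bare form keeps the row while the original drops it; the guarded form agrees with the original on all three rows.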

"conditions into a conjunction of field-level equalities, enabling per-field filter " +
"pushdown to data sources. The rewrite preserves NULL semantics by wrapping the " +
"conjunction in a null-check that mirrors the original struct-level comparison.")
.version("4.2.0")

Contributor

4.2.0 is already released; this and the maxFields config (line 543) should be 4.3.0 (the next release).

Suggested change
- .version("4.2.0")
+ .version("4.3.0")

Contributor Author

Done - both confs now use 4.3.0.

…cheap operands and guard NULL semantics on the pushable path
@yadavay-amzn

Contributor Author

Thanks for the careful review, @peter-toth - both points were spot on.

Scope: narrowed canDecompose to fire only when one operand is foldable and the other is CollapseProject.isCheap. So struct_col1 = struct_col2 (no pushdown benefit) and udf() = literal (non-cheap, would duplicate the UDF) no longer decompose; it now only produces the pushable, non-duplicating col <=> literal shape it was meant for.

NULL correctness: you're right the pushable shortcut dropped the whole-null vs all-null-fields distinction. I now AND IsNotNull(<nullable operand>) into the conjunction on both the EqualTo and EqualNullSafe pushable paths (it pushes down too), so an s IS NULL row is correctly excluded for s = named_struct('a', null). Verified with eval-level tests and end-to-end assertSameUnderRule over Parquet data containing genuine null structs (these fail without the guard). The non-pushable paths were already correct and are unchanged.

Also bumped both confs to 4.3.0. PTAL.
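The narrowed scope gate can be pictured on a toy expression tree. This is a simplified sketch: the Expr cases and the foldable/cheap predicates are hypothetical stand-ins, and the real CollapseProject.isCheap and Expression.foldable cover more cases than shown here.

```scala
object ScopeGate {
  // Hypothetical mini expression model; not Catalyst's Expression classes.
  sealed trait Expr
  final case class Literal(value: String) extends Expr
  final case class Attribute(name: String) extends Expr
  final case class GetField(child: Expr, field: String) extends Expr
  final case class Udf(name: String, args: List[Expr]) extends Expr

  // Simplification: only literals count as foldable in this model.
  def foldable(e: Expr): Boolean = e match {
    case Literal(_) => true
    case _          => false
  }

  // Simplification: attributes and field access over them are cheap to duplicate.
  def cheap(e: Expr): Boolean = e match {
    case Attribute(_)       => true
    case GetField(child, _) => cheap(child)
    case _                  => false
  }

  // Fire only for the foldable-vs-cheap shape, in either operand order.
  def canDecompose(l: Expr, r: Expr): Boolean =
    (foldable(l) && cheap(r)) || (foldable(r) && cheap(l))

  def main(args: Array[String]): Unit = {
    val lit = Literal("named_struct('a', 1)")
    println(canDecompose(Attribute("s"), lit))           // true: col = literal
    println(canDecompose(Attribute("s1"), Attribute("s2"))) // false: col = col
    println(canDecompose(Udf("costly_udf", Nil), lit))   // false: udf() = literal
  }
}
```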

@yadavay-amzn yadavay-amzn requested a review from peter-toth June 26, 2026 18:30
@peter-toth

Contributor

Thanks @yadavay-amzn for the update, let me take a look at the PR next week.

@peter-toth peter-toth left a comment

Contributor

Thanks @yadavay-amzn, both of my earlier points are resolved well. Two things before this lands:

The PR description has drifted from the last few follow-ups -- please refresh it.

  • The scope narrowing isn't mentioned: the rule now only fires when one operand is foldable and the other is CollapseProject.isCheap. The description reads as if any equal-arity struct comparison decomposes.
  • The Why section says "tuple comparisons … cannot be pushed down," but tuple comparisons (CreateStruct = CreateStruct) are now deliberately not decomposed (there's a negative test for it).
  • The pushable path is described as emitting "the bare field conjunction"; it actually emits IsNotNull(<nullable operand>) AND conjunction -- the IsNotNull guard (the NULL-correctness fix) is worth calling out.
  • How was this patch tested lists a "single-field" test that no longer exists.

Some new paths look untested. The non-pushable, null-preserving path is never exercised: decomposeEqualNullSafe (all four nullability branches) is unreached, and decomposeEqualTo's nullable If(anyNull, NULL, conjunction) branch is unreached (the Not(EqualTo) oracle uses two non-nullable literals). The two Not/NULL e2e tests pass vacuously: they build s via withColumn("s", struct(...)), which the optimizer inlines into a non-cheap CreateNamedStruct, so canDecompose returns false and the rule never fires (your test at line 843 documents exactly this). To reach the null-preserving branches, use a nullable struct attribute (as in the "nullable struct attribute null on one side" test) under Not/Or, asserting parity across whole-null / null-field / non-null rows.

val conjunction = fieldConjunction(l, r)
if (nullGuards.isEmpty) conjunction
else (nullGuards :+ conjunction).reduce(And)
case EqualNullSafe(l, r) if canDecompose(l, r) =>

Contributor

Nit: These two arms (the EqualTo case above and this EqualNullSafe case) now have byte-identical bodies -- in Filter position s = lit and s <=> lit decompose to the same IsNotNull-guards AND fieldConjunction. They could fold into one:

case EqualTo(l, r) | EqualNullSafe(l, r)
    if canDecompose(l, r) && filterEquivalentToConjunction(l, r) =>
  val nullGuards = (if (l.nullable) Seq(IsNotNull(l)) else Nil) ++
    (if (r.nullable) Seq(IsNotNull(r)) else Nil)
  val conjunction = fieldConjunction(l, r)
  if (nullGuards.isEmpty) conjunction else (nullGuards :+ conjunction).reduce(And)

Beyond de-duplication, this closes a small asymmetry: the EqualTo arm is structurally guarded by filterEquivalentToConjunction (a both-nullable pair falls through to the null-preserving decomposeEqualTo), but this <=> arm has no such guard -- its correctness for a both-nullable pair relies on NullPropagation (ordered before this rule) having already rewritten <=> null to IsNull. I verified that holds today (s <=> CAST(NULL AS struct<...>) optimizes to isnull(s), and both-nullable columns are rejected by canDecompose), so this is hardening, not a bugfix -- but routing both arms through the same guard makes the <=> path self-evidently correct without depending on batch ordering. (A both-nullable <=> correctly falls through to decomposeEqualNullSafe, which already handles it.)

@peter-toth

Contributor

cc @cloud-fan

@yadavay-amzn yadavay-amzn force-pushed the fix/SPARK-56660-struct-predicate-decompose branch from 7498033 to bfb0009 Compare June 30, 2026 16:47
@yadavay-amzn yadavay-amzn force-pushed the fix/SPARK-56660-struct-predicate-decompose branch from bfb0009 to d1c02ec Compare June 30, 2026 17:22
@yadavay-amzn

Contributor Author

Thanks @peter-toth - addressed all three:

  • Folded the EqualTo/EqualNullSafe arms into one case Equality(l, r) guarded by canDecompose && filterEquivalentToConjunction. Adding the guard to the <=> path is safe: canDecompose requires a foldable (hence non-nullable) operand, so a both-nullable pair never takes the pushable arm and instead falls through to decomposeEqualNullSafe; the two-nullable-columns test confirms it.
  • Untested paths: you were right the Not/NULL e2e tests were vacuous - withColumn("s", struct(...)) inlines to a non-cheap CreateNamedStruct, so canDecompose was false and the rule never fired. Replaced them with Parquet-backed tables where s is a nullable struct attribute, and added catalyst tests under Not/Or that actually reach decomposeEqualTo's nullable If(anyNull, NULL, conjunction) branch and decomposeEqualNullSafe's branches, asserting NULL-vs-FALSE parity across whole-null / all-null-fields / non-null rows (verified by breaking each branch).
  • Refreshed the PR description: scope narrowing (foldable + isCheap), the tuple-not-decomposed note, the IsNotNull-guarded pushable form, and the How-tested list.

PTAL.


3 participants