Commit 0fb04a4
[SPARK-57003][SQL][SS] Widen stateful operator output and state schema nullability
### What changes were proposed in this pull request?
Introduce a three-component fix for stateful-operator nullability drift, gated by `spark.sql.streaming.statefulOperator.alwaysNullableOutput.enabled` (pinned per-query via the offset log):
- (a) `WidenStatefulOpNullability.widenStateSchema`: every stateful physical exec widens its state key/value schema to fully nullable at construction. This covers `StateStoreSaveExec`, `BaseStreamingDeduplicateExec`, `StreamingSymmetricHashJoinExec`, `FlatMapGroupsWithStateExec`, `TransformWithStateExec` (including user-defined state variable col family schemas), `TransformWithStateInPySparkExec`, and `StreamingGlobalLimitExec`.
- (b) `WidenStatefulOpNullability.widenOutputForStatefulOp`: every stateful logical and physical operator widens its declared `output` to fully nullable.
- (c) `WidenStatefulOperatorAttributeNullability`: an optimizer rule that widens `AttributeReference`s inside stateful ops' internal expressions and propagates upward through ancestor expressions. The rule uses `resolveOperatorsUp` (bottom-up) and scopes the widening precisely: at a stateful operator, all children's output is included (for internal expression references like grouping keys); at non-stateful ancestors, only children whose subtrees contain a stateful operator are included, avoiding unnecessary widening of non-stateful siblings. The node's own `p.output` is excluded for non-stateful ancestors because the bottom-up traversal guarantees children are already transformed.
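The scoping described in (c) can be illustrated on a toy plan tree. This is only a sketch of the selection logic as the description states it, not the rule's actual code: `Node`, `Leaf`, `Op`, `containsStateful`, and `attrsToWiden` are hypothetical names, attributes are modelled as plain strings, and every operator is assumed to pass through all of its children's output.

```scala
// Toy model only: none of these types are Spark classes.
sealed trait Node {
  def children: Seq[Node]
  def output: Set[String]
}

case class Leaf(output: Set[String]) extends Node {
  val children: Seq[Node] = Nil
}

case class Op(name: String, stateful: Boolean, children: Seq[Node]) extends Node {
  // Simplification: an operator exposes everything its children produce.
  def output: Set[String] = children.flatMap(_.output).toSet
}

object ScopeSketch {
  /** True if `n` or any node below it is a stateful operator. */
  def containsStateful(n: Node): Boolean = n match {
    case Op(_, true, _) => true
    case other => other.children.exists(containsStateful)
  }

  /** Attributes whose references inside `n`'s own expressions would be widened. */
  def attrsToWiden(n: Node): Set[String] = n match {
    // Stateful operator: all children's output is in scope.
    case Op(_, true, kids) => kids.flatMap(_.output).toSet
    // Non-stateful node: only children whose subtree contains a stateful operator.
    case other => other.children.filter(containsStateful).flatMap(_.output).toSet
  }

  def main(args: Array[String]): Unit = {
    val agg = Op("aggregate", stateful = true, Seq(Leaf(Set("k", "v"))))
    val dim = Leaf(Set("d")) // non-stateful sibling
    val join = Op("join", stateful = false, Seq(agg, dim))
    println(attrsToWiden(agg))
    println(attrsToWiden(join))
  }
}
```

In this model the non-stateful `join` widens only the attributes coming from the stateful `aggregate` side and leaves the `dim` sibling's attribute `d` untouched, which is the "avoiding unnecessary widening" behavior described above.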
With the above fix, the state schema is "fully" nullable (top-level columns, nested columns, and collection types) regardless of the input schema, and so is the output schema of the stateful operator. The change to the stateful operator's output schema is necessary because, even if the input schema is non-nullable, state can produce null values, so the output can be nullable.
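To make "fully" nullable concrete, here is a minimal sketch of a deep widening over Spark's public `org.apache.spark.sql.types` classes. It is not the helper from this patch: `DeepWidenSketch`, `widen`, and `widenSchema` are illustrative names, and the sketch assumes `spark-sql` is on the classpath.

```scala
import org.apache.spark.sql.types._

object DeepWidenSketch {
  /** Returns a copy of `dt` with every nullability flag set to true. */
  def widen(dt: DataType): DataType = dt match {
    case StructType(fields) =>
      // Top-level and nested columns: widen the field and recurse into its type.
      StructType(fields.map(f => f.copy(dataType = widen(f.dataType), nullable = true)))
    case ArrayType(elem, _) =>
      ArrayType(widen(elem), containsNull = true)
    case MapType(k, v, _) =>
      // Map keys cannot be null in Spark, so only the value flag is widened.
      MapType(widen(k), widen(v), valueContainsNull = true)
    case other => other // atomic types carry no nullability flag of their own
  }

  def widenSchema(schema: StructType): StructType =
    widen(schema).asInstanceOf[StructType]

  def main(args: Array[String]): Unit = {
    val input = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("tags", ArrayType(StringType, containsNull = false), nullable = false),
      StructField("pos", StructType(Seq(
        StructField("x", DoubleType, nullable = false))), nullable = false)))
    // Every field and element in the printed tree is reported as nullable.
    println(widenSchema(input).treeString)
  }
}
```

Because the result does not depend on any input flag, two input schemas that differ only in nullability widen to the same state schema, which is the property the fix relies on.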
### Why are the changes needed?
This has been a long-standing conflict between the streaming engine and the query optimizer.
A streaming query is meant to be long-running and in many cases spans multiple Spark versions. Also, the logical plan is not always the same across batches (e.g. there are multiple stream sources and one of them has no new data at batch N). This exposes the streaming query to differences in analyzer and optimizer behavior.
The state schema of a stateful operator is mostly determined by the operator's input schema, and nullability is no exception. If the input schema has a nullable column, the state schema has a nullable column, and likewise for a non-nullable column.
One of the query optimizer's optimizations is to flip nullability, say from nullable to non-nullable, where appropriate. This can happen directly or indirectly, and the most problematic case is when the optimization is applied "selectively".
One easy example is the elimination of Union branches: in a streaming query that combines multiple streams with Union, batch N could have one stream non-empty while another is empty. In that case, `PropagateEmptyRelation` can drop empty `Union` branches, causing a per-column nullability flip that propagates into a stateful operator's state schema across microbatches or restarts. This causes either `STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE` on restart or a codegen NPE when state-restored rows carry nulls in columns declared non-nullable.
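The flip can be reproduced on schemas alone. The sketch below is a simplified model, not Spark's `Union` implementation: it assumes a union column is nullable when any branch's column is nullable, merges only the top-level flag, and uses the hypothetical names `UnionNullabilitySketch` and `unionSchema`. It needs `spark-sql` on the classpath for the type classes.

```scala
import org.apache.spark.sql.types._

object UnionNullabilitySketch {
  /**
   * Simplified union schema: branches are assumed non-empty and to agree
   * positionally on names and types; only the top-level flag is merged.
   */
  def unionSchema(branches: Seq[StructType]): StructType =
    StructType(branches.map(_.fields.toSeq).transpose.map { column =>
      column.head.copy(nullable = column.exists(_.nullable))
    })

  def main(args: Array[String]): Unit = {
    val strict = StructType(Seq(StructField("key", StringType, nullable = false)))
    val loose = StructType(Seq(StructField("key", StringType, nullable = true)))

    // Both branches present: the nullable branch makes "key" nullable.
    val bothBranches = unionSchema(Seq(strict, loose))
    // The nullable branch is empty in this batch and has been dropped.
    val oneBranch = unionSchema(Seq(strict))

    println(s"both branches: key nullable = ${bothBranches("key").nullable}")
    println(s"one branch:    key nullable = ${oneBranch("key").nullable}")
  }
}
```

A stateful operator downstream of this union would see a nullable key in one batch and a non-nullable key in another, which is the drift that the state schema check rejects on restart.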
### Does this PR introduce _any_ user-facing change?
No user-visible behavior change for new queries (all stateful operator outputs become nullable, which is semantically correct). Existing queries keep their original behavior via the offset log gate.
### How was this patch tested?
New `StreamingStatefulOperatorNullabilityDriftSuite` covering:
- New-query path: Union-branch-drop restart scenarios for aggregate, dropDuplicates, dropDuplicatesWithinWatermark, stream-stream join, flatMapGroupsWithState, and transformWithState.
- Codegen NPE regression with struct grouping keys.
- Existing-query path: widening forced off still triggers schema mismatch.
- State schema assertion validates all state stores and column families (both v2 file format and v3 directory format including `_stateSchema`).
- Rule-level: scope check (non-stateful subtrees skipped).
- Helper-level: `deepWidenAttribute` recursion into nested types.
### Was this patch authored or co-authored using generative AI tooling?
Yes. Generated-by: Claude 4.7 Opus
Closes #56061 from HeartSaVioR/widen-stateful-op-nullability.
Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
1 parent 5fb4d4c commit 0fb04a4
20 files changed
Lines changed: 976 additions & 93 deletions
File tree
- sql
  - catalyst/src/main/scala/org/apache/spark/sql
    - catalyst
      - analysis
      - plans/logical
    - internal
  - connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming
  - core/src
    - main/scala/org/apache/spark/sql/execution
      - adaptive
      - python/streaming
      - streaming
        - checkpointing
        - operators/stateful
          - flatmapgroupswithstate
          - join
          - transformwithstate
        - runtime
    - test/scala/org/apache/spark/sql/streaming