Skip to content

Commit fda259a

Browse files
committed
[SPARK-XXXXX][SS] Widen stateful operator output and state schema nullability
### What changes were proposed in this pull request? Introduce a three-component fix for stateful-operator nullability drift, gated by `spark.sql.streaming.statefulOperator.alwaysNullableOutput.enabled` (pinned per-query via the offset log): - (a) `WidenStatefulOpNullability.widenStateSchema`: every stateful physical exec widens its state key/value schema to fully nullable at construction. - (b) `WidenStatefulOpNullability.widenOutputForStatefulOp`: every stateful logical and physical operator widens its declared `output` to fully nullable. - (c) `WidenStatefulOperatorAttributeNullability`: an optimizer rule that widens `AttributeReference`s inside stateful ops' internal expressions and propagates upward through ancestor expressions. ### Why are the changes needed? `PropagateEmptyRelation` can drop empty `Union` branches, causing a per-column nullability flip that propagates into a stateful operator's state schema across microbatches or restarts. This causes either `STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE` on restart or a codegen NPE when state-restored rows carry nulls in columns declared non-nullable. ### Does this PR introduce _any_ user-facing change? No user-visible behavior change for new queries (all stateful operator outputs become nullable, which is semantically correct). Existing queries keep their original behavior via the offset log gate. ### How was this patch tested? New `StreamingStatefulOperatorNullabilityDriftSuite` covering: - New-query path: Union-branch-drop restart scenarios for aggregate, dropDuplicates, dropDuplicatesWithinWatermark. - Codegen NPE regression with struct grouping keys. - Existing-query path: widening forced off still triggers schema mismatch. - Rule-level: scope check (non-stateful subtrees skipped). - Helper-level: `deepWidenAttribute` recursion into nested types. ### Was this patch authored or co-authored using generative AI tooling? Yes.
1 parent 3e8c865 commit fda259a

20 files changed

Lines changed: 877 additions & 277 deletions

File tree

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.analysis
19+
20+
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, ExprId}
21+
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
22+
import org.apache.spark.sql.catalyst.rules.Rule
23+
import org.apache.spark.sql.internal.SQLConf
24+
import org.apache.spark.sql.types.StructType
25+
26+
/**
27+
* Shared helpers for the stateful-operator nullability fix. The fix has three
28+
* independent components, all gated by
29+
* [[SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT]] (pinned per-query via the
30+
* offset log so existing queries keep their pre-fix behavior on restart):
31+
*
32+
* - (a) `widenStateSchema`: explicit `asNullable` at every state-schema construction
33+
* site in each stateful physical exec.
34+
* - (b) `widenOutputForStatefulOp`: a per-op `output` override on every stateful logical
35+
* and physical operator, used by the operator's `output` definition.
36+
* - (c) [[WidenStatefulOperatorAttributeNullability]] (defined below in this file): a
37+
* custom optimizer rule that widens `AttributeReference`s inside stateful ops'
38+
* internal expressions and propagates upward to ancestor expressions.
39+
*/
40+
object WidenStatefulOpNullability {
41+
42+
def isEnabled: Boolean =
43+
SQLConf.get.getConf(SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT)
44+
45+
/**
46+
* Recursively widens an attribute to be fully nullable: outer `nullable = true` plus
47+
* every nested `StructField.nullable`, `ArrayType.containsNull`, and
48+
* `MapType.valueContainsNull` flipped to `true` via
49+
* [[org.apache.spark.sql.types.DataType#asNullable]].
50+
*/
51+
def deepWidenAttribute(a: Attribute): Attribute = a match {
52+
case ref: AttributeReference =>
53+
AttributeReference(
54+
ref.name, ref.dataType.asNullable, nullable = true, ref.metadata)(
55+
ref.exprId, ref.qualifier)
56+
case other => other.withNullability(true)
57+
}
58+
59+
/**
60+
* Component (a): widens a state schema to fully nullable. Stateful physical execs apply
61+
* this at every `validateAndMaybeEvolveStateSchema(...)` call site and every
62+
* `mapPartitionsWith*StateStore(...)` call site. When the conf is off, returns the
63+
* schema unchanged.
64+
*/
65+
def widenStateSchema(schema: StructType): StructType =
66+
if (isEnabled) schema.asNullable else schema
67+
68+
/**
69+
* Component (b): wraps a stateful operator's `output` to be fully nullable. The caller
70+
* is responsible for only calling this from within an `output` definition on a stateful
71+
* operator; gating is handled here via [[isEnabled]].
72+
*/
73+
def widenOutputForStatefulOp(base: Seq[Attribute]): Seq[Attribute] =
74+
if (isEnabled) base.map(deepWidenAttribute) else base
75+
}
76+
77+
/**
78+
* Component (c) of the stateful-operator nullability fix: a custom optimizer rule that
79+
* widens `AttributeReference`s inside streaming-stateful operators' internal expressions
80+
* and propagates the widening upward to ancestor operators' expressions.
81+
*
82+
* The rule does NOT introduce any new logical or physical node. It is purely an
83+
* attribute-rewrite pass:
84+
*
85+
* 1. At a stateful operator: rewrite every `AttributeReference` inside the operator's
86+
* internal expressions via [[WidenStatefulOpNullability#deepWidenAttribute]] whenever
87+
* the attribute's `exprId` matches one in the operator's own (already widened via
88+
* component (b)) `output`.
89+
*
90+
* 2. At non-stateful ancestor operators: rewrite `AttributeReference`s whose `exprId` is
91+
* in `children.flatMap(_.output)` (already widened thanks to component (b)).
92+
*
93+
* '''Scope.''' The walk only fires on nodes whose subtree contains a stateful operator.
94+
*
95+
* '''Ordering constraint.''' This rule must run AFTER every `UpdateAttributeNullability`
96+
* invocation in both the main optimizer and AQE.
97+
*
98+
* '''Idempotence.''' [[WidenStatefulOpNullability#deepWidenAttribute]] is idempotent.
99+
*/
100+
object WidenStatefulOperatorAttributeNullability extends Rule[LogicalPlan] {
101+
102+
override def apply(plan: LogicalPlan): LogicalPlan = {
103+
if (!conf.getConf(SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT) ||
104+
!plan.containsStatefulOperator) {
105+
return plan
106+
}
107+
plan.resolveOperatorsUp {
108+
case p if !p.resolved => p
109+
case p: LeafNode => p
110+
case p if !p.containsStatefulOperator => p
111+
case p =>
112+
val widenableExprIds: Set[ExprId] = (p.output ++ p.children.flatMap(_.output))
113+
.iterator.collect { case ar: AttributeReference => ar.exprId }.toSet
114+
if (widenableExprIds.isEmpty) {
115+
p
116+
} else {
117+
p.transformExpressions {
118+
case ar: AttributeReference if widenableExprIds.contains(ar.exprId) =>
119+
val widened = WidenStatefulOpNullability.deepWidenAttribute(ar)
120+
if (ar.dataType == widened.dataType && ar.nullable == widened.nullable) {
121+
ar
122+
} else {
123+
widened
124+
}
125+
}
126+
}
127+
}
128+
}
129+
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.sql.catalyst.plans.logical
1919

2020
import org.apache.spark.sql.catalyst.{AliasIdentifier, InternalRow, SQLConfHelper}
21-
import org.apache.spark.sql.catalyst.analysis.{Analyzer, AnsiTypeCoercion, MultiInstanceRelation, Resolver, TypeCoercion, TypeCoercionBase, UnresolvedUnaryNode}
21+
import org.apache.spark.sql.catalyst.analysis.{Analyzer, AnsiTypeCoercion, MultiInstanceRelation, Resolver, TypeCoercion, TypeCoercionBase, UnresolvedUnaryNode, WidenStatefulOpNullability}
2222
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
2323
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
2424
import org.apache.spark.sql.catalyst.expressions._
@@ -746,7 +746,10 @@ case class Join(
746746
}
747747
}
748748

749-
override def output: Seq[Attribute] = Join.computeOutput(joinType, left.output, right.output)
749+
override def output: Seq[Attribute] = {
750+
val base = Join.computeOutput(joinType, left.output, right.output)
751+
if (isStateful) WidenStatefulOpNullability.widenOutputForStatefulOp(base) else base
752+
}
750753

751754
override def metadataOutput: Seq[Attribute] = {
752755
joinType match {
@@ -1225,7 +1228,10 @@ case class Aggregate(
12251228
expressions.forall(_.resolved) && childrenResolved && !hasWindowExpressions
12261229
}
12271230

1228-
override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
1231+
override def output: Seq[Attribute] = {
1232+
val base = aggregateExpressions.map(_.toAttribute)
1233+
if (isStateful) WidenStatefulOpNullability.widenOutputForStatefulOp(base) else base
1234+
}
12291235
override def metadataOutput: Seq[Attribute] = Nil
12301236
override def maxRows: Option[Long] = {
12311237
if (groupingExpressions.isEmpty) {
@@ -1749,7 +1755,10 @@ object Limit {
17491755
* order.
17501756
*/
17511757
case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
1752-
override def output: Seq[Attribute] = child.output
1758+
override def output: Seq[Attribute] = {
1759+
val base = child.output
1760+
if (isStateful) WidenStatefulOpNullability.widenOutputForStatefulOp(base) else base
1761+
}
17531762
override def maxRows: Option[Long] = {
17541763
limitExpr match {
17551764
case IntegerLiteral(limit) => Some(limit)
@@ -2004,7 +2013,10 @@ case class Sample(
20042013
*/
20052014
case class Distinct(child: LogicalPlan) extends UnaryNode {
20062015
override def maxRows: Option[Long] = child.maxRows
2007-
override def output: Seq[Attribute] = child.output
2016+
override def output: Seq[Attribute] = {
2017+
val base = child.output
2018+
if (isStateful) WidenStatefulOpNullability.widenOutputForStatefulOp(base) else base
2019+
}
20082020
final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE)
20092021
override protected def withNewChildInternal(newChild: LogicalPlan): Distinct =
20102022
copy(child = newChild)
@@ -2172,7 +2184,10 @@ case class Deduplicate(
21722184
keys: Seq[Attribute],
21732185
child: LogicalPlan) extends UnaryNode {
21742186
override def maxRows: Option[Long] = child.maxRows
2175-
override def output: Seq[Attribute] = child.output
2187+
override def output: Seq[Attribute] = {
2188+
val base = child.output
2189+
if (isStateful) WidenStatefulOpNullability.widenOutputForStatefulOp(base) else base
2190+
}
21762191
final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE)
21772192
override protected def withNewChildInternal(newChild: LogicalPlan): Deduplicate =
21782193
copy(child = newChild)
@@ -2184,7 +2199,10 @@ case class DeduplicateWithinWatermark(keys: Seq[Attribute], child: LogicalPlan)
21842199
override def references: AttributeSet = AttributeSet(keys) ++
21852200
AttributeSet(child.output.filter(_.metadata.contains(EventTimeWatermark.delayKey)))
21862201
override def maxRows: Option[Long] = child.maxRows
2187-
override def output: Seq[Attribute] = child.output
2202+
override def output: Seq[Attribute] = {
2203+
val base = child.output
2204+
if (isStateful) WidenStatefulOpNullability.widenOutputForStatefulOp(base) else base
2205+
}
21882206
final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE)
21892207
override protected def withNewChildInternal(newChild: LogicalPlan): DeduplicateWithinWatermark =
21902208
copy(child = newChild)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
2020
import org.apache.spark.api.java.function.FilterFunction
2121
import org.apache.spark.broadcast.Broadcast
2222
import org.apache.spark.sql.{catalyst, Encoder, Row}
23-
import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedDeserializer}
23+
import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedDeserializer, WidenStatefulOpNullability}
2424
import org.apache.spark.sql.catalyst.encoders._
2525
import org.apache.spark.sql.catalyst.expressions._
2626
import org.apache.spark.sql.catalyst.expressions.objects.Invoke
@@ -568,6 +568,11 @@ case class FlatMapGroupsWithState(
568568
newLeft: LogicalPlan, newRight: LogicalPlan): FlatMapGroupsWithState =
569569
copy(child = newLeft, initialState = newRight)
570570
override def isStateful: Boolean = child.isStreaming
571+
572+
override def output: Seq[Attribute] = {
573+
val base = super.output
574+
if (isStateful) WidenStatefulOpNullability.widenOutputForStatefulOp(base) else base
575+
}
571576
}
572577

573578
object TransformWithState {
@@ -657,6 +662,11 @@ case class TransformWithState(
657662
newLeft: LogicalPlan, newRight: LogicalPlan): TransformWithState =
658663
copy(child = newLeft, initialState = newRight)
659664
override def isStateful: Boolean = child.isStreaming
665+
666+
override def output: Seq[Attribute] = {
667+
val base = super.output
668+
if (isStateful) WidenStatefulOpNullability.widenOutputForStatefulOp(base) else base
669+
}
660670
}
661671

662672
/** Factory for constructing new `FlatMapGroupsInR` nodes. */

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical
1919

2020
import org.apache.spark.resource.ResourceProfile
2121
import org.apache.spark.sql.catalyst.SQLConfHelper
22-
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistryBase, MultiInstanceRelation, UnresolvedAttribute, UnresolvedStar}
22+
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistryBase, MultiInstanceRelation, UnresolvedAttribute, UnresolvedStar, WidenStatefulOpNullability}
2323
import org.apache.spark.sql.catalyst.analysis.TableFunctionRegistry.TableFunctionBuilder
2424
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Expression, ExpressionDescription, ExpressionInfo, JsonToStructs, PythonUDF, PythonUDTF}
2525
import org.apache.spark.sql.catalyst.trees.TreePattern._
@@ -159,7 +159,9 @@ case class FlatMapGroupsInPandasWithState(
159159
timeout: GroupStateTimeout,
160160
child: LogicalPlan) extends UnaryNode {
161161

162-
override def output: Seq[Attribute] = outputAttrs
162+
override def output: Seq[Attribute] =
163+
if (isStateful) WidenStatefulOpNullability.widenOutputForStatefulOp(outputAttrs)
164+
else outputAttrs
163165

164166
override def producedAttributes: AttributeSet = AttributeSet(outputAttrs)
165167

@@ -206,7 +208,9 @@ case class TransformWithStateInPySpark(
206208

207209
override def right: LogicalPlan = initialState
208210

209-
override def output: Seq[Attribute] = outputAttrs
211+
override def output: Seq[Attribute] =
212+
if (isStateful) WidenStatefulOpNullability.widenOutputForStatefulOp(outputAttrs)
213+
else outputAttrs
210214

211215
override def producedAttributes: AttributeSet = AttributeSet(outputAttrs)
212216

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3403,6 +3403,22 @@ object SQLConf {
34033403
.booleanConf
34043404
.createWithDefault(true)
34053405

3406+
val STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT =
3407+
buildConf("spark.sql.streaming.statefulOperator.alwaysNullableOutput.enabled")
3408+
.internal()
3409+
.doc("When true, every streaming stateful operator reports its output schema with " +
3410+
"nullable=true on all columns (including nested struct fields, array elements, and " +
3411+
"map values), the state schema is widened at every construction site, and the state " +
3412+
"schema compatibility checker ignores nullability for stateful operator schemas. " +
3413+
"This prevents query-optimizer decisions (e.g., PropagateEmptyRelation dropping a " +
3414+
"Union branch) from flipping the state schema nullability across microbatches or " +
3415+
"restarts. The effective value is pinned per query via the offset log at batch 0, " +
3416+
"so pre-existing queries keep their original behavior; only newly started queries " +
3417+
"pick this up.")
3418+
.version("4.1.0")
3419+
.booleanConf
3420+
.createWithDefault(true)
3421+
34063422
val FILESTREAM_SINK_METADATA_IGNORED =
34073423
buildConf("spark.sql.streaming.fileStreamSink.ignoreMetadata")
34083424
.internal()

sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ class ClientStreamingQuerySuite extends QueryTest with RemoteSparkSession with L
8686
.count()
8787
.selectExpr("window.start as timestamp", "count as num_events")
8888

89-
assert(countsDF.schema.toDDL == "timestamp TIMESTAMP,num_events BIGINT NOT NULL")
89+
assert(countsDF.schema.toDDL == "timestamp TIMESTAMP,num_events BIGINT")
9090

9191
// Start the query
9292
val queryName = "sparkConnectStreamingQuery"

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.sql.execution.adaptive
1919

2020
import org.apache.spark.internal.LogKeys.{BATCH_NAME, RULE_NAME}
21-
import org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability
21+
import org.apache.spark.sql.catalyst.analysis.{UpdateAttributeNullability, WidenStatefulOperatorAttributeNullability}
2222
import org.apache.spark.sql.catalyst.optimizer.{ConvertToLocalRelation, EliminateLimits, OptimizeOneRowPlan}
2323
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LogicalPlanIntegrity}
2424
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
@@ -44,7 +44,8 @@ class AQEOptimizer(conf: SQLConf, extendedRuntimeOptimizerRules: Seq[Rule[Logica
4444
Batch("Dynamic Join Selection", Once, DynamicJoinSelection),
4545
Batch("Eliminate Limits", fixedPoint, EliminateLimits),
4646
Batch("Optimize One Row Plan", fixedPoint, OptimizeOneRowPlan)) :+
47-
Batch("User Provided Runtime Optimizers", fixedPoint, extendedRuntimeOptimizerRules: _*)
47+
Batch("User Provided Runtime Optimizers", fixedPoint, extendedRuntimeOptimizerRules: _*) :+
48+
Batch("Widen Stateful Op Nullability", Once, WidenStatefulOperatorAttributeNullability)
4849

4950
final override protected def batches: Seq[Batch] = {
5051
val excludedRules = conf.getConf(SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES)

sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/FlatMapGroupsInPandasWithStateExec.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import org.apache.spark.sql.execution.streaming.operators.stateful.flatmapgroups
3535
import org.apache.spark.sql.execution.streaming.state.StateStore
3636
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
3737
import org.apache.spark.sql.types.StructType
38+
import org.apache.spark.sql.catalyst.analysis.WidenStatefulOpNullability
3839
import org.apache.spark.util.CompletionIterator
3940

4041
/**
@@ -81,7 +82,8 @@ case class FlatMapGroupsInPandasWithStateExec(
8182
override protected val stateEncoder: ExpressionEncoder[Any] =
8283
ExpressionEncoder(stateType).resolveAndBind().asInstanceOf[ExpressionEncoder[Any]]
8384

84-
override def output: Seq[Attribute] = outAttributes
85+
override def output: Seq[Attribute] =
86+
WidenStatefulOpNullability.widenOutputForStatefulOp(outAttributes)
8587

8688
private val sessionLocalTimeZone = conf.sessionLocalTimeZone
8789
private val pythonRunnerConf = ArrowPythonRunner.getPythonRunnerConfMap(conf)

sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkExec.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSp
4343
import org.apache.spark.sql.internal.SQLConf
4444
import org.apache.spark.sql.streaming.{OutputMode, TimeMode}
4545
import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
46+
import org.apache.spark.sql.catalyst.analysis.WidenStatefulOpNullability
4647
import org.apache.spark.util.{CompletionIterator, SerializableConfiguration, Utils}
4748

4849
/**
@@ -51,7 +52,7 @@ import org.apache.spark.util.{CompletionIterator, SerializableConfiguration, Uti
5152
*
5253
* @param functionExpr function called on each group
5354
* @param groupingAttributes used to group the data
54-
* @param output used to define the output rows
55+
* @param outputAttrs used to define the output rows
5556
* @param outputMode defines the output mode for the statefulProcessor
5657
* @param timeMode The time mode semantics of the stateful processor for timers and TTL.
5758
* @param stateInfo Used to identify the state store for a given operator.
@@ -69,7 +70,7 @@ import org.apache.spark.util.{CompletionIterator, SerializableConfiguration, Uti
6970
case class TransformWithStateInPySparkExec(
7071
functionExpr: Expression,
7172
groupingAttributes: Seq[Attribute],
72-
output: Seq[Attribute],
73+
outputAttrs: Seq[Attribute],
7374
outputMode: OutputMode,
7475
timeMode: TimeMode,
7576
stateInfo: Option[StatefulOperatorStateInfo],
@@ -94,6 +95,9 @@ case class TransformWithStateInPySparkExec(
9495
initialStateGroupingAttrs,
9596
initialState) {
9697

98+
override def output: Seq[Attribute] =
99+
WidenStatefulOpNullability.widenOutputForStatefulOp(outputAttrs)
100+
97101
// NOTE: This is needed to comply with existing release of transformWithStateInPandas.
98102
override def shortName: String = if (
99103
userFacingDataType == TransformWithStateInPySpark.UserFacingDataType.PANDAS

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,8 @@ object OffsetSeqMetadata extends Logging {
204204
STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION,
205205
PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN, STREAMING_STATE_STORE_ENCODING_FORMAT,
206206
STATE_STORE_ROW_CHECKSUM_ENABLED, PROTOBUF_EXTENSIONS_SUPPORT_ENABLED,
207-
ENABLE_STREAMING_SOURCE_EVOLUTION
207+
ENABLE_STREAMING_SOURCE_EVOLUTION,
208+
STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT
208209
)
209210

210211
/**
@@ -254,7 +255,8 @@ object OffsetSeqMetadata extends Logging {
254255
STATE_STORE_ROW_CHECKSUM_ENABLED.key -> "false",
255256
STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key -> "1",
256257
PROTOBUF_EXTENSIONS_SUPPORT_ENABLED.key -> "false",
257-
ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false"
258+
ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false",
259+
STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT.key -> "false"
258260
)
259261

260262
def readValue[T](metadataLog: OffsetSeqMetadataBase, confKey: ConfigEntry[T]): String = {

0 commit comments

Comments
 (0)