-
Notifications
You must be signed in to change notification settings - Fork 29.2k
[SPARK-57001][SS] Hoist isStateful / containsStatefulOperator onto LogicalPlan
#56057
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -827,6 +827,8 @@ case class Join( | |
|
|
||
| override protected def withNewChildrenInternal( | ||
| newLeft: LogicalPlan, newRight: LogicalPlan): Join = copy(left = newLeft, right = newRight) | ||
|
|
||
| override def isStateful: Boolean = left.isStreaming && right.isStreaming | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -1243,6 +1245,8 @@ case class Aggregate( | |
| override protected def withNewChildInternal(newChild: LogicalPlan): Aggregate = | ||
| copy(child = newChild) | ||
|
|
||
| override def isStateful: Boolean = child.isStreaming | ||
|
|
||
| // Whether this Aggregate operator is group only. For example: SELECT a, a FROM t GROUP BY a | ||
| private[sql] def groupOnly: Boolean = { | ||
| // aggregateExpressions can be empty through Dataset.agg, | ||
|
|
@@ -1757,6 +1761,8 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN | |
|
|
||
| override protected def withNewChildInternal(newChild: LogicalPlan): GlobalLimit = | ||
| copy(child = newChild) | ||
|
|
||
| override def isStateful: Boolean = child.isStreaming | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -2002,6 +2008,7 @@ case class Distinct(child: LogicalPlan) extends UnaryNode { | |
| final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE) | ||
| override protected def withNewChildInternal(newChild: LogicalPlan): Distinct = | ||
| copy(child = newChild) | ||
| override def isStateful: Boolean = child.isStreaming | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This override is non-obvious at the |
||
| } | ||
|
|
||
| /** | ||
|
|
@@ -2169,6 +2176,7 @@ case class Deduplicate( | |
| final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE) | ||
| override protected def withNewChildInternal(newChild: LogicalPlan): Deduplicate = | ||
| copy(child = newChild) | ||
| override def isStateful: Boolean = child.isStreaming | ||
| } | ||
|
|
||
| case class DeduplicateWithinWatermark(keys: Seq[Attribute], child: LogicalPlan) extends UnaryNode { | ||
|
|
@@ -2180,6 +2188,7 @@ case class DeduplicateWithinWatermark(keys: Seq[Attribute], child: LogicalPlan) | |
| final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE) | ||
| override protected def withNewChildInternal(newChild: LogicalPlan): DeduplicateWithinWatermark = | ||
| copy(child = newChild) | ||
| override def isStateful: Boolean = child.isStreaming | ||
| } | ||
|
|
||
| /** | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two suggestions for the Scaladoc:
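"Marks if" is awkward — these return a boolean rather than marking anything. "Whether …" or "Returns true if …" is more conventional. For `containsStatefulOperator`, please also say it includes `this` (the body reads `isStateful || children.exists(...)`).
More substantively, please nail down what "stateful" means here. The new definition is the streaming-runtime view (any operator that becomes a `StateStoreWriter` at execution) and matches `MicroBatchExecution.containsStatefulOperator` exactly. It diverges from `UnsupportedOperationChecker.isStatefulOperation` on two operators: `Deduplicate` is stateful here regardless of whether keys carry an event-time column, and streaming `GlobalLimit` is included here but not there. Calling that out — and noting that `isStatefulOperation` is intentionally narrower (scoped to the chained-watermark correctness check) and isn't a drop-in replacement target — will keep future PRs from silently swapping callers and changing analyzer semantics. Worth naming which existing checks are intended replacement targets, too.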
StateStoreWriterat execution) and matchesMicroBatchExecution.containsStatefulOperatorexactly. It diverges fromUnsupportedOperationChecker.isStatefulOperationon two operators:Deduplicateis stateful here regardless of whether keys carry an event-time column, and streamingGlobalLimitis included here but not there. Calling that out — and noting thatisStatefulOperationis intentionally narrower (scoped to the chained-watermark correctness check) and isn't a drop-in replacement target — will keep future PRs from silently swapping callers and changing analyzer semantics. Worth naming which existing checks are intended replacement targets, too.