diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index c236f7cf08e82..37be90f95b1e1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -80,6 +80,14 @@ abstract class LogicalPlan def isStreaming: Boolean = _isStreaming private[this] lazy val _isStreaming = children.exists(_.isStreaming) + /** Marks if a streaming node is a stateful operator. */ + def isStateful: Boolean = false + + /** Marks if a subplan contains a stateful operator. */ + def containsStatefulOperator: Boolean = _containsStatefulOperator + private[this] lazy val _containsStatefulOperator = + isStateful || children.exists(_.containsStatefulOperator) + override def verboseStringWithSuffix(maxFields: Int): String = { super.verboseString(maxFields) + statsCache.map(", " + _.toString).getOrElse("") } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index a27d9e5269745..f7afda4ea29e6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -827,6 +827,8 @@ case class Join( override protected def withNewChildrenInternal( newLeft: LogicalPlan, newRight: LogicalPlan): Join = copy(left = newLeft, right = newRight) + + override def isStateful: Boolean = left.isStreaming && right.isStreaming } /** @@ -1243,6 +1245,8 @@ case class Aggregate( override protected def withNewChildInternal(newChild: LogicalPlan): Aggregate = copy(child = newChild) + override def isStateful: Boolean = child.isStreaming + // Whether this Aggregate operator is group only. For example: SELECT a, a FROM t GROUP BY a private[sql] def groupOnly: Boolean = { // aggregateExpressions can be empty through Dateset.agg, @@ -1757,6 +1761,8 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN override protected def withNewChildInternal(newChild: LogicalPlan): GlobalLimit = copy(child = newChild) + + override def isStateful: Boolean = child.isStreaming } /** @@ -2002,6 +2008,7 @@ case class Distinct(child: LogicalPlan) extends UnaryNode { final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE) override protected def withNewChildInternal(newChild: LogicalPlan): Distinct = copy(child = newChild) + override def isStateful: Boolean = child.isStreaming } /** @@ -2169,6 +2176,7 @@ case class Deduplicate( final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE) override protected def withNewChildInternal(newChild: LogicalPlan): Deduplicate = copy(child = newChild) + override def isStateful: Boolean = child.isStreaming } case class DeduplicateWithinWatermark(keys: Seq[Attribute], child: LogicalPlan) extends UnaryNode { @@ -2180,6 +2188,7 @@ case class DeduplicateWithinWatermark(keys: Seq[Attribute], child: LogicalPlan) final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE) override protected def withNewChildInternal(newChild: LogicalPlan): DeduplicateWithinWatermark = copy(child = newChild) + override def isStateful: Boolean = child.isStreaming } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala index 07423b612c301..0c6f59073559f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala @@ -567,6 +567,7 @@ case class FlatMapGroupsWithState( override protected def withNewChildrenInternal( newLeft: LogicalPlan, newRight: LogicalPlan): FlatMapGroupsWithState = copy(child = newLeft, initialState = newRight) + override def isStateful: Boolean = child.isStreaming } object TransformWithState { @@ -655,6 +656,7 @@ case class TransformWithState( override protected def withNewChildrenInternal( newLeft: LogicalPlan, newRight: LogicalPlan): TransformWithState = copy(child = newLeft, initialState = newRight) + override def isStateful: Boolean = child.isStreaming } /** Factory for constructing new `FlatMapGroupsInR` nodes. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala index db22a0781c0e0..56dc2f6de0437 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala @@ -165,6 +165,7 @@ case class FlatMapGroupsInPandasWithState( override protected def withNewChildInternal( newChild: LogicalPlan): FlatMapGroupsInPandasWithState = copy(child = newChild) + override def isStateful: Boolean = child.isStreaming } /** @@ -215,6 +216,7 @@ case class TransformWithStateInPySpark( override protected def withNewChildrenInternal( newLeft: LogicalPlan, newRight: LogicalPlan): TransformWithStateInPySpark = copy(child = newLeft, initialState = newRight) + override def isStateful: Boolean = child.isStreaming def leftAttributes: Seq[Attribute] = { assert(resolved, "This method is expected to be called after resolution.")