Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ abstract class LogicalPlan
def isStreaming: Boolean = _isStreaming
private[this] lazy val _isStreaming = children.exists(_.isStreaming)

/** Marks if a streaming node is a stateful operator. */
def isStateful: Boolean = false

/** Marks if a subplan contains a stateful operator. */
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two suggestions for the Scaladoc:

  1. "Marks if" is awkward — these return a boolean rather than marking anything. "Whether …" or "Returns true if …" is more conventional. For containsStatefulOperator, please also say it includes this (the body reads isStateful || children.exists(...)).

  2. More substantively, please nail down what "stateful" means here. The new definition is the streaming-runtime view (any operator that becomes a StateStoreWriter at execution) and matches MicroBatchExecution.containsStatefulOperator exactly. It diverges from UnsupportedOperationChecker.isStatefulOperation on two operators: Deduplicate is stateful here regardless of whether keys carry an event-time column, and streaming GlobalLimit is included here but not there. Calling that out — and noting that isStatefulOperation is intentionally narrower (scoped to the chained-watermark correctness check) and isn't a drop-in replacement target — will keep future PRs from silently swapping callers and changing analyzer semantics. Worth naming which existing checks are intended replacement targets, too.

def containsStatefulOperator: Boolean = _containsStatefulOperator
private[this] lazy val _containsStatefulOperator =
isStateful || children.exists(_.containsStatefulOperator)

override def verboseStringWithSuffix(maxFields: Int): String = {
super.verboseString(maxFields) + statsCache.map(", " + _.toString).getOrElse("")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,8 @@ case class Join(

override protected def withNewChildrenInternal(
newLeft: LogicalPlan, newRight: LogicalPlan): Join = copy(left = newLeft, right = newRight)

override def isStateful: Boolean = left.isStreaming && right.isStreaming
}

/**
Expand Down Expand Up @@ -1243,6 +1245,8 @@ case class Aggregate(
override protected def withNewChildInternal(newChild: LogicalPlan): Aggregate =
copy(child = newChild)

override def isStateful: Boolean = child.isStreaming

// Whether this Aggregate operator is group only. For example: SELECT a, a FROM t GROUP BY a
private[sql] def groupOnly: Boolean = {
// aggregateExpressions can be empty through Dateset.agg,
Expand Down Expand Up @@ -1757,6 +1761,8 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN

override protected def withNewChildInternal(newChild: LogicalPlan): GlobalLimit =
copy(child = newChild)

override def isStateful: Boolean = child.isStreaming
}

/**
Expand Down Expand Up @@ -2002,6 +2008,7 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE)
override protected def withNewChildInternal(newChild: LogicalPlan): Distinct =
copy(child = newChild)
override def isStateful: Boolean = child.isStreaming
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This override is non-obvious at the Distinct layer — Distinct doesn't directly become a StateStoreWriter. The existing comment in UnsupportedOperationChecker.isStatefulOperation explains it: "Since the Distinct node will be replaced to Aggregate in the optimizer rule ReplaceDistinctWithAggregate, here we also need to check all Distinct node by assuming it as Aggregate." Worth preserving that rationale here, or at least a // see ReplaceDistinctWithAggregate one-liner.

}

/**
Expand Down Expand Up @@ -2169,6 +2176,7 @@ case class Deduplicate(
final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE)
override protected def withNewChildInternal(newChild: LogicalPlan): Deduplicate =
copy(child = newChild)
override def isStateful: Boolean = child.isStreaming
}

case class DeduplicateWithinWatermark(keys: Seq[Attribute], child: LogicalPlan) extends UnaryNode {
Expand All @@ -2180,6 +2188,7 @@ case class DeduplicateWithinWatermark(keys: Seq[Attribute], child: LogicalPlan)
final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE)
override protected def withNewChildInternal(newChild: LogicalPlan): DeduplicateWithinWatermark =
copy(child = newChild)
override def isStateful: Boolean = child.isStreaming
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,7 @@ case class FlatMapGroupsWithState(
override protected def withNewChildrenInternal(
newLeft: LogicalPlan, newRight: LogicalPlan): FlatMapGroupsWithState =
copy(child = newLeft, initialState = newRight)
override def isStateful: Boolean = child.isStreaming
}

object TransformWithState {
Expand Down Expand Up @@ -655,6 +656,7 @@ case class TransformWithState(
override protected def withNewChildrenInternal(
newLeft: LogicalPlan, newRight: LogicalPlan): TransformWithState =
copy(child = newLeft, initialState = newRight)
override def isStateful: Boolean = child.isStreaming
}

/** Factory for constructing new `FlatMapGroupsInR` nodes. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ case class FlatMapGroupsInPandasWithState(

override protected def withNewChildInternal(
newChild: LogicalPlan): FlatMapGroupsInPandasWithState = copy(child = newChild)
override def isStateful: Boolean = child.isStreaming
}

/**
Expand Down Expand Up @@ -215,6 +216,7 @@ case class TransformWithStateInPySpark(
override protected def withNewChildrenInternal(
newLeft: LogicalPlan, newRight: LogicalPlan): TransformWithStateInPySpark =
copy(child = newLeft, initialState = newRight)
override def isStateful: Boolean = child.isStreaming

def leftAttributes: Seq[Attribute] = {
assert(resolved, "This method is expected to be called after resolution.")
Expand Down