diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 7822dc05502c0..8edba027dd86d 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -666,6 +666,11 @@ "The Change Data Capture (CDC) connector violated the `Changelog` contract at runtime." ], "subClass" : { + "NULL_COMMIT_TIMESTAMP" : { + "message" : [ + "Connector emitted a row with a NULL `_commit_timestamp` on a streaming read engaging post-processing. The `Changelog` contract requires `_commit_timestamp` to be non-NULL for streaming reads, since post-processing uses it as event time to advance the watermark." + ] + }, "UNEXPECTED_CHANGE_TYPE" : { "message" : [ "Connector emitted a row with a `_change_type` value that is not one of the four supported types (`insert`, `delete`, `update_preimage`, `update_postimage`). The `Changelog` contract requires every emitted row to carry one of these four values." @@ -3297,11 +3302,6 @@ "`startingVersion` is required when `endingVersion` is specified for CDC queries." ] }, - "STREAMING_POST_PROCESSING_NOT_SUPPORTED" : { - "message" : [ - "Change Data Capture (CDC) streaming reads on connector `` do not yet support post-processing (carry-over removal, update detection, or net change computation). The requested combination of options would require post-processing, which is currently only available for batch reads. Use a batch read, or set `deduplicationMode = none` and `computeUpdates = false` to receive raw change rows in streaming." - ] - }, "UPDATE_DETECTION_REQUIRES_CARRY_OVER_REMOVAL" : { "message" : [ "`computeUpdates` cannot be used with `deduplicationMode=none` on connector `` because the connector emits copy-on-write carry-over pairs (`containsCarryoverRows()` returns true) that would be silently mislabeled as updates. Set `deduplicationMode` to `dropCarryovers` or `netChanges`." diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index dc7d2b94576ea..1c82d9cacbaf0 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -131,6 +131,30 @@ abstract class DataStreamReader { * .changes("my_table") * }}} * + * Streaming reads support all of the same post-processing as batch reads -- `computeUpdates`, + * `deduplicationMode = dropCarryovers`, and `deduplicationMode = netChanges`. The streaming + * netChanges path holds per-row-identity state in the state store and emits the SPIP collapse + * output once the global watermark advances past the last `_commit_timestamp` observed for that + * row identity. Row identities only touched in the latest observed commit are therefore not + * emitted until a later commit (with strictly greater `_commit_timestamp`) advances the + * watermark past them, or the source terminates -- bounded streams produce the same output as + * the corresponding batch read. + * + * When the requested options engage post-processing (carry-over removal, update detection, or + * netChanges), the rewrite injects an internal `EventTimeWatermark` on `_commit_timestamp` and + * a stateful streaming operator (an aggregate for the row-level passes, a `transformWithState` + * for netChanges). 
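+ *
+ * For example, a net-change streaming read (an illustrative sketch; the table name and
+ * sink are placeholders, and `append` is the only output mode this rewrite accepts):
+ * {{{
+ *   spark.readStream
+ *     .option("startingVersion", "1")
+ *     .option("deduplicationMode", "netChanges")
+ *     .changes("my_table")
+ *     .writeStream
+ *     .outputMode("append")
+ *     .format("console")
+ *     .start()
+ * }}}
+ *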
Two implications follow: + * - A commit's events are emitted in the next micro-batch after the commit is read + * (append-mode aggregate eviction is `eventTime <= watermark`, and the watermark advances + * to the max `_commit_timestamp` observed in the previous batch). A stream that reads its + * last commit and stops will keep that commit's events in state until a subsequent + * (no-data) micro-batch fires. + * - The query is constrained to `Append` output mode; `Update` and `Complete` are rejected at + * writer-start time with `STREAMING_OUTPUT_MODE.UNSUPPORTED_OPERATION`. The internal + * watermark metadata is stripped from the user-visible `_commit_timestamp` output, so + * downstream user-supplied watermarks on other columns do not interact with it via the + * global multi-watermark policy. + * * @param tableName * a qualified or unqualified name that designates a table. * @since 4.2.0 diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Changelog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Changelog.java index 5f2203aa1c379..53a1582dd8a37 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Changelog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Changelog.java @@ -33,10 +33,39 @@ * + *
+ * Streaming reads support carry-over removal, update detection, and net change
+ * computation. Pending net-change state is kept in the state store keyed by row identity;
+ * row identities only touched in the latest observed commit are held back until either a
+ * later commit (with strictly greater {@code _commit_timestamp}) advances the global
+ * watermark past them, or the source terminates.
 *
 * @since 4.2.0
 */
@@ -81,6 +110,9 @@ public interface Changelog {
 * Spark will collapse multiple changes per row identity into the net effect.
 * If {@code false}, the connector guarantees at most one change per row identity across
 * the entire changelog range, and Spark will skip net change computation.
+ *
+ * Note this flag is range-scoped (across all commits in the request), not + * micro-batch-scoped. */ boolean containsIntermediateChanges(); diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CdcNetChangesStatefulProcessor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CdcNetChangesStatefulProcessor.scala new file mode 100644 index 0000000000000..c32fbecc29c19 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CdcNetChangesStatefulProcessor.scala @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.SparkException +import org.apache.spark.sql.{Encoder, Row} +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema +import org.apache.spark.sql.connector.catalog.Changelog +import org.apache.spark.sql.streaming._ +import org.apache.spark.sql.types.StructType + +/** + * StatefulProcessor that incrementalises CDC net-change computation for streaming reads. + * + * The batch path (`ResolveChangelogTable.injectNetChangeComputation`) uses a Catalyst + * `Window` partitioned by `rowId` and ordered by `(_commit_version, change_type_rank)` to + * extract the first and last events per row identity, then applies the SPIP collapse + * matrix on `(existedBefore, existsAfter)`. That `Window` is rejected on streaming + * queries (`NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING`). + * + * This processor reproduces the same semantics with `transformWithState`. Per-row-identity + * state stores the first event ever observed and the most-recent event observed; an event + * time timer keyed on `_commit_timestamp` advances with each batch and fires once the + * global watermark passes the latest event time observed for the key, at which point the + * SPIP matrix is evaluated and the net result is emitted. + * + * Output schema: identical to the connector's changelog schema. + * + * Documented limitation: row identities only touched in the latest observed commit do not + * emit until a later commit (with strictly greater `_commit_timestamp`) advances the + * watermark past them, or the source terminates. End-of-stream flushes all pending + * timers, so bounded streams produce the same output as the corresponding batch read. + * + * @param inputSchema schema of the rows fed into this processor; the connector's + * changelog schema (data columns + `_change_type` + + * `_commit_version` + `_commit_timestamp`) optionally extended with + * rowId helper columns added by + * [[org.apache.spark.sql.catalyst.analysis.ResolveChangelogTable]]. 
+ * @param computeUpdates whether `(existedBefore, existsAfter) = (true, true)` should be + * relabeled as `update_preimage` / `update_postimage` (true) or kept + * as `delete` / `insert` (false), matching the batch contract. + */ +private[analysis] class CdcNetChangesStatefulProcessor( + inputSchema: StructType, + computeUpdates: Boolean) + extends StatefulProcessor[Row, Row, Row] { + + @transient private var firstEvent: ValueState[Row] = _ + @transient private var lastEvent: ValueState[Row] = _ + + // Hoisted out of `relabel` so we don't pay a linear `fieldIndex` scan per emitted row. + private val changeTypeIdx: Int = inputSchema.fieldIndex("_change_type") + + override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = { + val handle = getHandle + val rowEncoder: Encoder[Row] = ExpressionEncoder(inputSchema) + firstEvent = handle.getValueState[Row]("firstEvent", rowEncoder, TTLConfig.NONE) + lastEvent = handle.getValueState[Row]("lastEvent", rowEncoder, TTLConfig.NONE) + } + + override def handleInputRows( + key: Row, + inputRows: Iterator[Row], + timerValues: TimerValues): Iterator[Row] = { + val handle = getHandle + val sorted = inputRows.toSeq.sortBy { row => + val v = row.getAs[Long]("_commit_version") + val ct = row.getAs[String]("_change_type") + val rank = ct match { + case Changelog.CHANGE_TYPE_UPDATE_PREIMAGE | Changelog.CHANGE_TYPE_DELETE => 0 + case Changelog.CHANGE_TYPE_INSERT | Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE => 1 + case _ => throw new SparkException( + errorClass = "CHANGELOG_CONTRACT_VIOLATION.UNEXPECTED_CHANGE_TYPE", + messageParameters = Map.empty, + cause = null) + } + (v, rank) + } + if (sorted.isEmpty) return Iterator.empty + + if (!firstEvent.exists()) { + firstEvent.update(sorted.head) + } + lastEvent.update(sorted.last) + + // Re-arm the per-key event-time timer to the latest observed `_commit_timestamp`. + // Without dropping any existing timers we'd risk an earlier timer firing first and + // emitting state that later events would then re-populate, producing duplicate + // output for the same row identity. + // + // A NULL `_commit_timestamp` cannot be turned into a timer epoch and would NPE on + // `getTime()`. The `Changelog` Javadoc requires non-NULL `_commit_timestamp` on + // streaming reads engaging post-processing, so we surface the contract violation + // with a clear error class rather than failing the micro-batch with an opaque NPE. 
+ val ts = sorted.last.getAs[java.sql.Timestamp]("_commit_timestamp") + if (ts == null) { + throw new SparkException( + errorClass = "CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP", + messageParameters = Map.empty, + cause = null) + } + val newTimerMs = ts.getTime + val existing = handle.listTimers().toList + existing.foreach(handle.deleteTimer) + handle.registerTimer(newTimerMs) + + Iterator.empty + } + + override def handleExpiredTimer( + key: Row, + timerValues: TimerValues, + expiredTimerInfo: ExpiredTimerInfo): Iterator[Row] = { + if (!firstEvent.exists()) return Iterator.empty + + val first = firstEvent.get() + val last = lastEvent.get() + val firstChangeType = first.getAs[String]("_change_type") + val lastChangeType = last.getAs[String]("_change_type") + + val existedBefore = + firstChangeType == Changelog.CHANGE_TYPE_DELETE || + firstChangeType == Changelog.CHANGE_TYPE_UPDATE_PREIMAGE + val existsAfter = + lastChangeType == Changelog.CHANGE_TYPE_INSERT || + lastChangeType == Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE + + val (preLabel, postLabel) = + if (computeUpdates) { + (Changelog.CHANGE_TYPE_UPDATE_PREIMAGE, Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE) + } else { + (Changelog.CHANGE_TYPE_DELETE, Changelog.CHANGE_TYPE_INSERT) + } + + val out: Iterator[Row] = (existedBefore, existsAfter) match { + case (false, false) => Iterator.empty + case (false, true) => Iterator(relabel(last, Changelog.CHANGE_TYPE_INSERT)) + case (true, false) => Iterator(relabel(first, Changelog.CHANGE_TYPE_DELETE)) + case (true, true) => Iterator(relabel(first, preLabel), relabel(last, postLabel)) + } + + firstEvent.clear() + lastEvent.clear() + out + } + + private def relabel(row: Row, newChangeType: String): Row = { + val values = row.toSeq.toArray + values(changeTypeIdx) = newChangeType + new GenericRowWithSchema(values, inputSchema) + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala index 665a229f00c34..d2963f461f402 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala @@ -17,8 +17,12 @@ package org.apache.spark.sql.catalyst.analysis +import java.util.UUID + +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.{ + CollectList, Count, First, Last, @@ -31,7 +35,9 @@ import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogInfo} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.v2.{ChangelogTable, DataSourceV2Relation} -import org.apache.spark.sql.types.{IntegerType, StringType} +import org.apache.spark.sql.streaming.{OutputMode, StatefulProcessor} +import org.apache.spark.sql.types.{BooleanType, IntegerType, MetadataBuilder, StringType, StructField, StructType} +import org.apache.spark.unsafe.types.CalendarInterval /** * Post-processes a resolved [[ChangelogTable]] read to apply CDC option semantics @@ -47,10 +53,20 @@ import org.apache.spark.sql.types.{IntegerType, StringType} * carry-over pairs (same rowVersion on both sides) and the subsequent Project relabels * real delete+insert pairs as update_preimage / update_postimage. 
Net change * computation runs on top of that, collapsing intermediate states per `rowId`. - * - Streaming: post-processing is not yet supported. If the requested options would - * require any post-processing, the rule throws an explicit [[AnalysisException]] to - * prevent silent wrong results. Streams that don't require post-processing pass - * through unchanged. + * - Streaming: row-level passes (carry-over removal and update detection) are supported + * by rewriting the same logic in streaming-allowed primitives -- an + * [[EventTimeWatermark]] on `_commit_timestamp`, a stateful [[Aggregate]] keyed by + * `(rowId, _commit_version, _commit_timestamp)` that buffers events into an array, an + * optional [[Filter]] for carry-over removal, a [[Generate]] using `Inline` to + * re-emit the buffered events as rows, and an optional relabel [[Project]] for + * update detection. Net change computation is supported by delegating per-row-identity + * state management to a [[CdcNetChangesStatefulProcessor]] driven by + * [[TransformWithState]] -- the processor keeps the first and last event observed for + * each row identity and emits the SPIP collapse output when the global watermark + * advances past the last `_commit_timestamp` seen for that key. Row identities only + * touched in the latest observed commit are held back until a later commit advances + * the watermark or the source terminates. Streams that don't require any + * post-processing pass through unchanged. */ object ResolveChangelogTable extends Rule[LogicalPlan] { @@ -64,8 +80,11 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { final val MinRv = "__spark_cdc_min_rv" final val MaxRv = "__spark_cdc_max_rv" final val RvCnt = "__spark_cdc_rv_cnt" + // Streaming-only: array of struct buffering all input rows for one (rowId, + // _commit_version) group, fed into Generate(Inline(...)) to re-emit per-row. + final val Events = "__spark_cdc_events" - val all: Set[String] = Set(DelCnt, InsCnt, MinRv, MaxRv, RvCnt) + val all: Set[String] = Set(DelCnt, InsCnt, MinRv, MaxRv, RvCnt, Events) } /** @@ -78,6 +97,10 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { final val FirstRowChangeTypeValue = "__spark_cdc_first_row_change_type_value" final val LastRowChangeTypeValue = "__spark_cdc_last_row_change_type_value" + // Streaming-only: rowId expressions are aliased to top-level helper columns named + // `__spark_cdc_rowid_` so they can be referenced as plain Attributes in the + // grouping list of `transformWithState`. + def rowIdColumn(idx: Int): String = s"__spark_cdc_rowid_$idx" val all: Set[String] = Set(RowNumber, RowCount, FirstRowChangeTypeValue, LastRowChangeTypeValue) @@ -109,16 +132,27 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { case rel @ StreamingRelationV2(_, _, table: ChangelogTable, _, _, _, _, _, _) if !table.resolved => - // Streaming CDC reads do not yet apply post-processing. Run the same option / - // capability validation as the batch path so silent wrong results are impossible: - // either no post-processing would be required (fall through, return raw stream), - // or we throw an explicit AnalysisException. 
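+ // Streaming CDC reads now rewrite the required post-processing into streaming-legal
+ // operators instead of rejecting the query: row-level passes (carry-over removal and
+ // update detection) run first, and net change computation is layered on top of their output.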
val changelog = table.changelog val req = evaluateRequirements(changelog, table.changelogInfo) - if (req.needsAny) { - throw QueryCompilationErrors.cdcStreamingPostProcessingNotSupported(changelog.name()) + val resolvedRel = rel.copy(table = table.copy(resolved = true)) + var updatedRel: LogicalPlan = resolvedRel + if (req.requiresCarryOverRemoval || req.requiresUpdateDetection) { + updatedRel = addStreamingRowLevelPostProcessing( + resolvedRel, changelog, req.requiresCarryOverRemoval, req.requiresUpdateDetection) + } + if (req.requiresNetChanges) { + // Resolve the rowId references against `updatedRel` (the post-row-level plan) + // rather than the bare `resolvedRel`. The streaming row-level rewrite uses + // Aggregate + Generate(Inline), neither of which preserves the original + // attribute ExprIds for the inlined columns; resolving against `resolvedRel` + // yields stale ExprIds that fail post-analysis attribute resolution. The + // row-level rewrite preserves the connector's schema (column names) on its + // output, so name-based resolution against `updatedRel` recovers the right + // attributes regardless of any preceding wrapping. + updatedRel = addStreamingNetChangeComputation( + updatedRel, changelog, table.changelogInfo.computeUpdates()) } - rel.copy(table = table.copy(resolved = true)) + updatedRel } // --------------------------------------------------------------------------- @@ -197,6 +231,252 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { removeHelperColumns(modifiedPlan) } + /** + * Streaming counterpart of [[addRowLevelPostProcessing]]. + * + * ==Why a different shape from the batch path?== + * + * The batch rewrite is Window-based: + * {{{ + * DataSourceV2Relation + * -> Window partitioned by (rowId..., _commit_version) + * -> [Filter (carry-over)] + * -> [Project (update relabel)] + * -> Project (drop helper columns) + * }}} + * [[org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker]] rejects + * `Window` on streaming queries (`NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING`). + * Replacing it with a plain [[Aggregate]] is not enough on its own: an aggregate + * collapses each group to a single row, losing the per-input rows we still need to + * relabel/filter; and an append-mode streaming aggregate without an event-time + * watermark on a grouping key is itself rejected by the checker. + * + * ==The rewritten plan== + * + * Two adjustments over the naive substitution: (a) inject an [[EventTimeWatermark]] + * on `_commit_timestamp` (zero delay) so the aggregate is legal in append mode, and + * (b) buffer every input row of a group as `Inline`-able structs and re-explode after + * the aggregate so no rows are lost. 
+ * {{{ + * DataSourceV2Relation + * -> Filter (RaiseError on NULL _commit_timestamp) + * -> EventTimeWatermark(_commit_timestamp, 0s) + * -> Aggregate + * group by (rowId..., _commit_version, _commit_timestamp) + * aggs : _del_cnt, _ins_cnt + * [, _min_rv, _max_rv, _rv_cnt (carry-over removal only)] + * , __spark_cdc_events = collect_list(struct(*)) + * -> [Filter (carry-over: _del_cnt=1 AND _ins_cnt=1 + * AND _rv_cnt=2 AND _min_rv=_max_rv)] + * -> Generate(Inline(__spark_cdc_events)) // re-emit one row per buffered input + * -> [Project (update relabel)] + * -> Project (drop helper columns) + * -> Project (strip internal EventTimeWatermark metadata) + * }}} + * + * ==Runtime walkthrough== + * + * Append-mode streaming aggregates emit a group when its event-time grouping key + * falls at or below the global watermark (eviction predicate `eventTime <= watermark`, + * applied at the start of the next micro-batch). Suppose three commits with + * `_commit_timestamp` 10, 20, 30 each arrive in their own micro-batch: + * {{{ + * batch max _ts seen watermark after batch groups emitted by this batch + * ----- ------------ --------------------- ---------------------------- + * 1 10 10 + * 2 20 20 groups with _commit_timestamp == 10 + * 3 30 30 groups with _commit_timestamp == 20 + * end-of-stream final flush groups with _commit_timestamp == 30 + * }}} + * Because every row of a single commit shares the same `_commit_timestamp` (CDC + * contract), advancing past commit T releases every group whose grouping + * `_commit_timestamp` equals T -- one commit's worth of post-processed output per + * micro-batch, with the final commit flushed on stream termination. + * + * ==Per-operator detail== + * + * 0. [[Filter]] guarding against NULL `_commit_timestamp` -- raises + * `CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP` for any row that + * violates the contract. A NULL would never satisfy the downstream Aggregate's + * `eventTime <= watermark` eviction predicate (NULL is silent in MAX, never + * compares less-than-or-equal), so its group would be held in state forever. + * Failing fast surfaces the connector bug instead of producing no output. + * 1. [[EventTimeWatermark]] on `_commit_timestamp` (zero delay) -- required so the + * downstream stateful aggregate can emit groups in append output mode. By CDC + * contract every row in a single commit shares `_commit_timestamp`, so taking it + * as event time is safe. + * 2. [[Aggregate]] keyed by `(rowId..., _commit_version, _commit_timestamp)`. Computes + * the same `_del_cnt` / `_ins_cnt` / (`_min_rv` / `_max_rv` / `_rv_cnt`) helpers as + * the batch path, plus an `__spark_cdc_events` array-of-struct buffering every + * input row of the group. `_commit_timestamp` is included in the grouping keys + * (besides being a no-op given the contract) to satisfy + * [[org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker]]'s + * requirement that the watermark attribute appear among grouping expressions for + * append-mode streaming aggregations. + * 3. [[Filter]] (only when carry-over removal is requested) on the same predicate as + * the batch path -- groups with `_del_cnt = 1 AND _ins_cnt = 1 AND _rv_cnt = 2 AND + * _min_rv = _max_rv` are dropped wholesale. + * 4. [[Generate]] using `Inline(events)` to re-emit one output row per buffered input + * row. `unrequiredChildIndex` drops the duplicate grouping columns and the events + * buffer; the helper count columns flow through. + * 5. 
[[Project]] (only when update detection is requested) applying the same + * `CHANGELOG_CONTRACT_VIOLATION.UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION` + * guard and `_change_type` relabel as the batch path. + * 6. [[Project]] (via [[removeHelperColumns]]) drops `__spark_cdc_*` helpers so + * the output schema matches the connector's declared schema. + * 7. Final [[Project]] (via [[stripCommitTimestampWatermarkMetadata]]) clears the + * `EventTimeWatermark.delayKey` from the user-visible `_commit_timestamp` + * attribute so a downstream user-supplied `withWatermark` on a different column + * does not interact with our internal watermark via the global multi-watermark + * policy. + */ + private def addStreamingRowLevelPostProcessing( + plan: LogicalPlan, + cl: Changelog, + requiresCarryOverRemoval: Boolean, + requiresUpdateDetection: Boolean): LogicalPlan = { + // Fail fast on a NULL `_commit_timestamp`. The downstream Aggregate uses it as + // both an event-time watermark column and a grouping key; a NULL group-key value + // would never satisfy the `eventTime <= watermark` eviction predicate, so the + // group would silently stall (held in state until end of stream). Mirrors the + // runtime check in [[CdcNetChangesStatefulProcessor]] -- fail fast at the + // contract violation rather than producing no output. + val plan1 = addNullCommitTimestampGuard(plan) + val rawCommitTsAttr = getAttribute(plan1, "_commit_timestamp") + val watermarked = EventTimeWatermark( + UUID.randomUUID(), rawCommitTsAttr, new CalendarInterval(0, 0, 0L), plan1) + + val rowIdExprs = V2ExpressionUtils.resolveRefs[NamedExpression]( + cl.rowId().toSeq, watermarked) + val commitVersionAttr = getAttribute(watermarked, "_commit_version") + // Pick up the post-watermark `_commit_timestamp` attribute -- it carries the + // EventTimeWatermark.delayKey metadata that UnsupportedOperationChecker scans for. + val commitTimestampAttr = getAttribute(watermarked, "_commit_timestamp") + val changeTypeAttr = getAttribute(watermarked, "_change_type") + + val groupingExprs: Seq[Expression] = + rowIdExprs ++ Seq(commitVersionAttr, commitTimestampAttr) + val groupingNamedExprs: Seq[NamedExpression] = + groupingExprs.map(_.asInstanceOf[NamedExpression]) + + val insertIf = If(EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_INSERT)), + Literal(1), Literal(null, IntegerType)) + val deleteIf = If(EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_DELETE)), + Literal(1), Literal(null, IntegerType)) + val delCntAlias = Alias( + Count(Seq(deleteIf)).toAggregateExpression(), HelperColumn.DelCnt)() + val insCntAlias = Alias( + Count(Seq(insertIf)).toAggregateExpression(), HelperColumn.InsCnt)() + + val rvAliases = if (requiresCarryOverRemoval) { + val rowVersionExpr = V2ExpressionUtils.resolveRef[NamedExpression]( + cl.rowVersion(), watermarked) + Seq( + Alias(Min(rowVersionExpr).toAggregateExpression(), HelperColumn.MinRv)(), + Alias(Max(rowVersionExpr).toAggregateExpression(), HelperColumn.MaxRv)(), + Alias(Count(Seq(rowVersionExpr)).toAggregateExpression(), HelperColumn.RvCnt)()) + } else Seq.empty + + // Buffer every input row as a struct so Inline can re-emit them after the aggregate. + // The grouping-key columns (rowId..., `_commit_version`, `_commit_timestamp`) appear + // both inside the struct and as top-level grouping outputs; the top-level duplicates + // are dropped via `unrequiredChildIndex` below. 
+ val structOfAllCols = CreateStruct(watermarked.output) + val eventsAlias = Alias( + new CollectList(structOfAllCols).toAggregateExpression(), HelperColumn.Events)() + + val aggregateExprs: Seq[NamedExpression] = + groupingNamedExprs ++ Seq(delCntAlias, insCntAlias) ++ rvAliases :+ eventsAlias + val aggregated = Aggregate(groupingExprs, aggregateExprs, watermarked) + + val filtered: LogicalPlan = if (requiresCarryOverRemoval) { + val delCnt = getAttribute(aggregated, HelperColumn.DelCnt) + val insCnt = getAttribute(aggregated, HelperColumn.InsCnt) + val minRv = getAttribute(aggregated, HelperColumn.MinRv) + val maxRv = getAttribute(aggregated, HelperColumn.MaxRv) + val rvCnt = getAttribute(aggregated, HelperColumn.RvCnt) + val isCarryoverPair = And( + And(EqualTo(delCnt, Literal(1L)), EqualTo(insCnt, Literal(1L))), + And(EqualTo(rvCnt, Literal(2L)), EqualTo(minRv, maxRv))) + Filter(Not(isCarryoverPair), aggregated) + } else aggregated + + // Inline the struct array back into rows. Drop the events column (consumed by Inline) + // and the grouping-key columns (re-emitted from inside the struct) so the final shape + // matches the connector's schema plus the surviving helper count columns. + val eventsAttr = getAttribute(filtered, HelperColumn.Events) + val groupingAttrSet = AttributeSet(groupingNamedExprs.map(_.toAttribute)) + val unrequiredChildIndex: Seq[Int] = filtered.output.zipWithIndex.collect { + case (a, i) if a.exprId == eventsAttr.exprId => i + case (a, i) if groupingAttrSet.contains(a) => i + } + val generatorOutput: Seq[Attribute] = watermarked.output.map { col => + AttributeReference(col.name, col.dataType, col.nullable, col.metadata)() + } + val generated = Generate( + Inline(eventsAttr), + unrequiredChildIndex = unrequiredChildIndex, + outer = false, + qualifier = None, + generatorOutput = generatorOutput, + child = filtered) + + val withRelabel: LogicalPlan = if (requiresUpdateDetection) { + addUpdateRelabelProjection(generated) + } else generated + + // Strip the auto-injected EventTimeWatermark metadata from the user-visible + // `_commit_timestamp` so it does not interact with downstream user-supplied + // watermarks via the global multi-watermark policy. The metadata flows through + // Generate(Inline) (which copies attribute metadata) and the relabel Project, so + // it must be cleared here at the boundary of the rewrite. + val cleaned = stripCommitTimestampWatermarkMetadata(withRelabel) + removeHelperColumns(cleaned) + } + + /** + * Adds a `Filter` that raises + * `CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP` for any input row whose + * `_commit_timestamp` is `NULL`. Used as the first step of the streaming row-level + * rewrite so a contract-violating connector fails fast instead of silently stalling + * the downstream stateful aggregate's group. + */ + private def addNullCommitTimestampGuard(input: LogicalPlan): LogicalPlan = { + val commitTsAttr = getAttribute(input, "_commit_timestamp") + val raise = RaiseError( + Literal("CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP"), + CreateMap(Nil), + BooleanType) + // CaseWhen returns the default branch (true) for non-null timestamps and + // evaluates the side-effecting RaiseError for nulls; the row never passes the + // filter on a contract violation. 
+ val checkExpr = CaseWhen(Seq(IsNull(commitTsAttr) -> raise), Literal(true)) + Filter(checkExpr, input) + } + + /** + * Final boundary for the streaming row-level rewrite: rebuilds the user-visible + * `_commit_timestamp` attribute with empty watermark-related metadata. Other + * attributes flow through unchanged. + */ + private def stripCommitTimestampWatermarkMetadata(input: LogicalPlan): LogicalPlan = { + val projectList: Seq[NamedExpression] = input.output.map { attr => + if (attr.name == "_commit_timestamp" && + attr.metadata.contains(EventTimeWatermark.delayKey)) { + val cleanedMetadata = new MetadataBuilder() + .withMetadata(attr.metadata) + .remove(EventTimeWatermark.delayKey) + .build() + Alias(attr.withMetadata(cleanedMetadata), attr.name)( + exprId = attr.exprId, + qualifier = attr.qualifier) + } else { + attr + } + } + Project(projectList, input) + } + /** * Adds a Window node partitioned by (rowId, _commit_version) that computes * `_del_cnt` and `_ins_cnt` per partition, and, when `includeRowVersionBounds` @@ -329,6 +609,111 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { filteredAndRelabeledPlan } + /** + * Streaming counterpart of [[injectNetChangeComputation]]. The batch version uses a + * Catalyst `Window` partitioned by `rowId`, which is rejected on streaming queries. + * This version delegates the per-`rowId` first/last extraction and the SPIP collapse + * matrix to a [[CdcNetChangesStatefulProcessor]] driven by `transformWithState`: + * + * 1. [[EventTimeWatermark]] on `_commit_timestamp` (zero delay) so the global query + * watermark advances with each batch. When this rewrite runs on top of the row-level + * post-processing rewrite (combined `containsCarryoverRows` / + * `representsUpdateAsDeleteAndInsert` + `containsIntermediateChanges` path), the + * row-level rewrite has already injected an identical `EventTimeWatermark` and we + * reuse it instead of stacking a second one. Stacking watermarks on the same column + * fails the multi-watermark check unless `STATEFUL_OPERATOR_ALLOW_MULTIPLE` is set, + * and even then it would just produce two redundant nodes. + * 2. [[Project]] that aliases each rowId expression to a top-level helper column. This + * lets us address the rowId as an `Attribute` for the `transformWithState` grouping, + * which in turn makes nested rowId paths (e.g. `payload.id`) work without special + * casing. + * 3. [[TransformWithState]] keyed by the rowId helper attributes, in + * [[org.apache.spark.sql.catalyst.plans.logical.EventTime]] mode. The processor + * buffers the first and last event per row identity; an event-time timer set to the + * latest observed `_commit_timestamp` fires once the global watermark advances past + * it, at which point the processor evaluates the SPIP `(existedBefore, existsAfter)` + * matrix and emits 0, 1, or 2 output rows. + * 4. [[SerializeFromObject]] (added by the `transformWithState` factory) brings the + * processor's `Row` outputs back into a regular tabular shape. + * 5. Final [[Project]] drops the rowId helper columns so the user-visible schema + * matches the connector's declared changelog schema. + * + * Documented limitation: row identities only touched in the latest observed commit do + * not emit until a later commit (with strictly greater `_commit_timestamp`) advances + * the watermark past them, or the source terminates. For bounded streams this matches + * the batch output exactly. 
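+ *
+ * A sketch of the resulting plan shape, assuming the rewrite runs directly on the bare
+ * changelog relation (no preceding row-level rewrite); the helper columns are the
+ * `__spark_cdc_rowid_<idx>` aliases described in step 2:
+ * {{{
+ *   StreamingRelationV2 (ChangelogTable)
+ *    -> EventTimeWatermark(_commit_timestamp, 0s)
+ *    -> Project (connector columns ++ __spark_cdc_rowid_*)
+ *    -> TransformWithState(CdcNetChangesStatefulProcessor,
+ *         grouping = __spark_cdc_rowid_*, timeMode = EventTime, outputMode = Append)
+ *    -> SerializeFromObject
+ *    -> Project (drop __spark_cdc_rowid_*)
+ * }}}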
+ */ + private def addStreamingNetChangeComputation( + plan: LogicalPlan, + cl: Changelog, + computeUpdates: Boolean): LogicalPlan = { + // 1. Inject (or reuse, if already injected by the row-level rewrite) a watermark on + // `_commit_timestamp`. The row-level rewrite already adds one with zero delay, so + // we only add it when no watermark is present in the lineage to avoid stacking + // EventTimeWatermark nodes (which is rejected by the multi-watermark check + // unless STATEFUL_OPERATOR_ALLOW_MULTIPLE is set). + val needsWatermark = !plan.exists { + case _: EventTimeWatermark => true + case _ => false + } + val watermarked: LogicalPlan = if (needsWatermark) { + val rawCommitTsAttr = getAttribute(plan, "_commit_timestamp") + EventTimeWatermark( + UUID.randomUUID(), rawCommitTsAttr, new CalendarInterval(0, 0, 0L), plan) + } else plan + + // 2. Resolve rowId expressions against the watermarked plan. Resolving here (after + // any preceding row-level rewrite) ensures the attribute ExprIds match the + // columns in `plan.output` -- name-based resolution recovers them by their + // connector-declared names. Then project them to top-level helper columns so + // they can be referenced as plain Attributes by `transformWithState`'s grouping. + val rowIdExprs = + V2ExpressionUtils.resolveRefs[NamedExpression](cl.rowId().toSeq, watermarked) + val rowIdHelpers: Seq[Alias] = rowIdExprs.zipWithIndex.map { case (expr, idx) => + Alias(expr, NetChangesHelperColumns.rowIdColumn(idx))() + } + val originalCols: Seq[Attribute] = watermarked.output + val withHelpers = Project(originalCols ++ rowIdHelpers, watermarked) + + // 3. Build the input/output Row encoder for the processor. The schema is the + // watermarked plan's schema plus the rowId helper columns. + val processorInputSchema = StructType( + withHelpers.output.map { a => + StructField(a.name, a.dataType, a.nullable, a.metadata) + }) + val rowEncoder = ExpressionEncoder(processorInputSchema) + val groupingAttrs: Seq[Attribute] = rowIdHelpers.map(_.toAttribute) + val keyEncoder = ExpressionEncoder(StructType(rowIdHelpers.map { a => + StructField(a.name, a.dataType, a.nullable, a.metadata) + })) + + val processor = new CdcNetChangesStatefulProcessor(processorInputSchema, computeUpdates) + + val tws = new TransformWithState( + keyDeserializer = UnresolvedDeserializer(keyEncoder.deserializer, groupingAttrs), + valueDeserializer = UnresolvedDeserializer(rowEncoder.deserializer, withHelpers.output), + groupingAttributes = groupingAttrs, + dataAttributes = withHelpers.output, + statefulProcessor = processor.asInstanceOf[StatefulProcessor[Any, Any, Any]], + timeMode = EventTime, + outputMode = OutputMode.Append(), + keyEncoder = keyEncoder.asInstanceOf[ExpressionEncoder[Any]], + outputObjAttr = CatalystSerde.generateObjAttr(rowEncoder), + child = withHelpers, + hasInitialState = false, + initialStateGroupingAttrs = groupingAttrs, + initialStateDataAttrs = withHelpers.output, + initialStateDeserializer = UnresolvedDeserializer(keyEncoder.deserializer, groupingAttrs), + initialState = LocalRelation(keyEncoder.schema)) + + // 4. Wrap with SerializeFromObject so the obj column becomes regular tabular output. + val serialized = CatalystSerde.serialize(tws)(rowEncoder) + + // 5. Drop the rowId helper columns so the final output matches the connector's schema. 
+ val helperNames = rowIdHelpers.map(_.name).toSet + Project(serialized.output.filterNot(a => helperNames.contains(a.name)), serialized) + } + /** * Adds a Window node partitioned by `rowId` and ordered by * `(_commit_version, change_type_rank)` where pre-events (`update_preimage`, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index f9fb2936a9145..bc0390cc3dbe2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -23,7 +23,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.LogKeys.{ANALYSIS_ERROR, QUERY_PLAN} import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.ExtendedAnalysisException -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, CurrentDate, CurrentTimestampLike, Expression, GroupingSets, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow, WindowExpression} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, CurrentDate, CurrentTimestampLike, Expression, GroupingSets, LocalTimestamp, MonotonicallyIncreasingID, NamedExpression, SessionWindow, WindowExpression} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -280,6 +280,26 @@ object UnsupportedOperationChecker extends Logging { case _ => } + // The streaming Change Data Capture (CDC) row-level post-processing rewrite in + // [[ResolveChangelogTable.addStreamingRowLevelPostProcessing]] injects a streaming + // Aggregate buffering input rows into the helper column + // `ResolveChangelogTable.HelperColumn.Events` ("__spark_cdc_events") before + // re-emitting them via `Generate(Inline(...))`. The rewrite is designed and + // validated only for Append output mode -- under Update or Complete the Aggregate + // would re-emit per-batch state changes or the full result table per batch + // respectively, neither of which matches batch CDC semantics. Reject those modes + // at analysis time with a clear error rather than silently producing a misleading + // change feed. + if (outputMode != InternalOutputModes.Append && + aggregates.exists(a => a.aggregateExpressions.exists { + case ne: NamedExpression if ne.resolved => + ne.name == ResolveChangelogTable.HelperColumn.Events + case _ => false + })) { + throw QueryCompilationErrors.unsupportedOutputModeForStreamingOperationError( + outputMode, "Change Data Capture (CDC) streaming reads with post-processing") + } + /** * Whether the subplan will contain complete data or incremental data in every incremental * execution. 
Some operations may be allowed only when the child logical plan gives complete diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 543711112a709..a5675cd93cf60 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -3869,12 +3869,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map("changelogName" -> changelogName)) } - def cdcStreamingPostProcessingNotSupported(changelogName: String): AnalysisException = { - new AnalysisException( - errorClass = "INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED", - messageParameters = Map("changelogName" -> changelogName)) - } - def changelogMissingColumnError( changelogName: String, columnName: String): AnalysisException = { new AnalysisException( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala index c56f0e14417e0..eebfa62c542bd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala @@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.NamedStreamingRelation import org.apache.spark.sql.catalyst.streaming.UserProvided import org.apache.spark.sql.connector.catalog._ -import org.apache.spark.sql.connector.catalog.Changelog.{CHANGE_TYPE_DELETE, CHANGE_TYPE_INSERT} +import org.apache.spark.sql.connector.catalog.Changelog.{ + CHANGE_TYPE_DELETE, CHANGE_TYPE_INSERT, CHANGE_TYPE_UPDATE_POSTIMAGE, CHANGE_TYPE_UPDATE_PREIMAGE} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{LongType, StringType} @@ -662,4 +663,450 @@ class ChangelogEndToEndSuite extends SharedSparkSession { } assert(e.getMessage.contains("changes")) } + + // ---------- Streaming: row-level post-processing ---------- + // + // Streaming row-level passes (carry-over removal, update detection) rewrite the plan + // into Aggregate(rowId, _commit_version, _commit_timestamp) -> [Filter] -> + // Generate(Inline(events)) -> [relabel Project], under an EventTimeWatermark on + // _commit_timestamp. + + /** Schema variant for post-processing tests: includes `row_commit_version`. */ + private def recreateWithRowVersion(): Identifier = { + val id = ident + val cat = catalog + if (cat.tableExists(id)) cat.dropTable(id) + cat.createTable( + id, + Array( + Column.create("id", LongType, false), + Column.create("data", StringType), + Column.create("row_commit_version", LongType, false)), + Array.empty, + new util.HashMap[String, String]()) + cat.clearChangeRows(id) + id + } + + /** Row constructor for the row-version-enabled schema. 
*/ + private def ppRow( + id: Long, + data: String, + rcv: Long, + changeType: String, + commitVersion: Long, + commitTimestampMicros: Long): InternalRow = { + InternalRow( + id, + UTF8String.fromString(data), + rcv, + UTF8String.fromString(changeType), + commitVersion, + commitTimestampMicros) + } + + test("streaming carry-over removal drops CoW pairs") { + val id = recreateWithRowVersion() + catalog.setChangelogProperties(id, ChangelogProperties( + containsCarryoverRows = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + catalog.addChangeRows(id, Seq( + // v1: insert Alice (rcv=1), Bob (rcv=1) + ppRow(1L, "Alice", 1L, CHANGE_TYPE_INSERT, 1L, 1000000L), + ppRow(2L, "Bob", 1L, CHANGE_TYPE_INSERT, 1L, 1000000L), + // v2: real delete Alice + carry-over for Bob (rcv unchanged) + ppRow(1L, "Alice", 1L, CHANGE_TYPE_DELETE, 2L, 2000000L), + ppRow(2L, "Bob", 1L, CHANGE_TYPE_DELETE, 2L, 2000000L), + ppRow(2L, "Bob", 1L, CHANGE_TYPE_INSERT, 2L, 2000000L))) + + val q = spark.readStream + .option("startingVersion", "1") + .changes(fullTableName) + .select("id", "data", "_change_type", "_commit_version") + .writeStream + .format("memory") + .queryName("cdc_stream_carryover") + .outputMode("append") + .start() + try { + q.processAllAvailable() + // The next micro-batch advances the input watermark to the max _commit_timestamp + // seen in the previous batch; append-mode aggregate eviction (eventTime <= watermark) + // then emits all groups including the highest commit. v1 inserts + Alice's real + // delete survive; Bob's carry-over pair at v2 is dropped. + checkAnswer( + spark.sql("SELECT * FROM cdc_stream_carryover"), + Seq( + Row(1L, "Alice", CHANGE_TYPE_INSERT, 1L), + Row(2L, "Bob", CHANGE_TYPE_INSERT, 1L), + Row(1L, "Alice", CHANGE_TYPE_DELETE, 2L))) + } finally { + q.stop() + } + } + + test("streaming update detection relabels delete+insert as update") { + val id = recreateWithRowVersion() + catalog.setChangelogProperties(id, ChangelogProperties( + representsUpdateAsDeleteAndInsert = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + catalog.addChangeRows(id, Seq( + // v1: insert Alice (rcv=1) + ppRow(1L, "Alice", 1L, CHANGE_TYPE_INSERT, 1L, 1000000L), + // v2: real update Alice -> Robert (delete old, insert new) + ppRow(1L, "Alice", 1L, CHANGE_TYPE_DELETE, 2L, 2000000L), + ppRow(1L, "Robert", 2L, CHANGE_TYPE_INSERT, 2L, 2000000L))) + + val q = spark.readStream + .option("startingVersion", "1") + .option("computeUpdates", "true") + .option("deduplicationMode", "none") + .changes(fullTableName) + .select("id", "data", "_change_type", "_commit_version") + .writeStream + .format("memory") + .queryName("cdc_stream_updates") + .outputMode("append") + .start() + try { + q.processAllAvailable() + checkAnswer( + spark.sql("SELECT * FROM cdc_stream_updates"), + Seq( + Row(1L, "Alice", CHANGE_TYPE_INSERT, 1L), + Row(1L, "Alice", CHANGE_TYPE_UPDATE_PREIMAGE, 2L), + Row(1L, "Robert", CHANGE_TYPE_UPDATE_POSTIMAGE, 2L))) + } finally { + q.stop() + } + } + + // TransformWithState requires the RocksDB state store backend. 
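+ // (transformWithState keeps each state variable in its own state store column family,
+ // which the default HDFS-backed provider does not support, so the netChanges tests below
+ // pin the provider explicitly via withSQLConf.)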
+ private val rocksDbProviderConf = SQLConf.STATE_STORE_PROVIDER_CLASS.key -> + "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider" + + test("streaming netChanges collapses INSERT then DELETE to no output") { + val id = recreateWithRowVersion() + catalog.setChangelogProperties(id, ChangelogProperties( + containsIntermediateChanges = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + catalog.addChangeRows(id, Seq( + // v1: insert Alice (rcv=1) + ppRow(1L, "Alice", 1L, CHANGE_TYPE_INSERT, 1L, 1000000L), + // v2: delete Alice -- net cancels out + ppRow(1L, "Alice", 1L, CHANGE_TYPE_DELETE, 2L, 2000000L), + // v3: insert Bob -- emits at end-of-input flush + ppRow(2L, "Bob", 3L, CHANGE_TYPE_INSERT, 3L, 3000000L))) + + withSQLConf(rocksDbProviderConf) { + val q = spark.readStream + .option("startingVersion", "1") + .option("deduplicationMode", "netChanges") + .changes(fullTableName) + .select("id", "data", "_change_type", "_commit_version") + .writeStream + .format("memory") + .queryName("cdc_stream_netchanges_cancel") + .outputMode("append") + .start() + try { + q.processAllAvailable() + // End-of-input flushes all timers so Bob's insert emits. + // Alice's INSERT then DELETE cancels out (no row), and the final "Bob" stays. + checkAnswer( + spark.sql("SELECT * FROM cdc_stream_netchanges_cancel"), + Seq(Row(2L, "Bob", CHANGE_TYPE_INSERT, 3L))) + } finally { + q.stop() + } + } + } + + test("streaming netChanges with computeUpdates labels persisting rows as updates") { + val id = recreateWithRowVersion() + catalog.setChangelogProperties(id, ChangelogProperties( + containsIntermediateChanges = true, + representsUpdateAsDeleteAndInsert = false, // updates already materialized + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + // Row identity 1 already exists before the stream window, so the first event we + // observe is its update_preimage -> existedBefore = true. The last event is the + // update_postimage in v2 -> existsAfter = true. With computeUpdates = true the + // (true, true) cell of the SPIP matrix emits a relabeled + // update_preimage + update_postimage pair (rather than delete + insert). + catalog.addChangeRows(id, Seq( + // v1: pre-existing Alice updated to Bob + ppRow(1L, "Alice", 1L, CHANGE_TYPE_UPDATE_PREIMAGE, 1L, 1000000L), + ppRow(1L, "Bob", 1L, CHANGE_TYPE_UPDATE_POSTIMAGE, 1L, 1000000L), + // v2: Bob updated to Robert -- the v1 preimage and the v2 postimage are the + // first and last events for row identity 1 across the entire range. 
+ ppRow(1L, "Bob", 1L, CHANGE_TYPE_UPDATE_PREIMAGE, 2L, 2000000L), + ppRow(1L, "Robert", 2L, CHANGE_TYPE_UPDATE_POSTIMAGE, 2L, 2000000L))) + + withSQLConf(rocksDbProviderConf) { + val q = spark.readStream + .option("startingVersion", "1") + .option("deduplicationMode", "netChanges") + .option("computeUpdates", "true") + .changes(fullTableName) + .select("id", "data", "_change_type") + .writeStream + .format("memory") + .queryName("cdc_stream_netchanges_update") + .outputMode("append") + .start() + try { + q.processAllAvailable() + checkAnswer( + spark.sql("SELECT * FROM cdc_stream_netchanges_update"), + Seq( + Row(1L, "Alice", CHANGE_TYPE_UPDATE_PREIMAGE), + Row(1L, "Robert", CHANGE_TYPE_UPDATE_POSTIMAGE))) + } finally { + q.stop() + } + } + } + + // The streaming row-level rewrite injects a streaming Aggregate, which is only + // semantically valid with Append output mode (Update / Complete would re-emit + // per-batch updates or the entire result table per batch, neither of which matches + // batch CDC semantics). UnsupportedOperationChecker now rejects those modes. + + test("streaming row-level post-processing with update output mode is rejected") { + val id = recreateWithRowVersion() + catalog.setChangelogProperties(id, ChangelogProperties( + containsCarryoverRows = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + catalog.addChangeRows(id, Seq( + ppRow(1L, "Alice", 1L, CHANGE_TYPE_INSERT, 1L, 1000000L))) + + val e = intercept[AnalysisException] { + spark.readStream + .option("startingVersion", "1") + .changes(fullTableName) + .writeStream + .format("memory") + .queryName("cdc_stream_update_rejected") + .outputMode("update") + .start() + } + assert(e.getCondition == "STREAMING_OUTPUT_MODE.UNSUPPORTED_OPERATION", + s"Unexpected error: ${e.getMessage}") + assert(e.getMessage.contains("Change Data Capture"), + s"Error should mention CDC: ${e.getMessage}") + } + + test("streaming row-level post-processing with complete output mode is rejected") { + val id = recreateWithRowVersion() + catalog.setChangelogProperties(id, ChangelogProperties( + containsCarryoverRows = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + catalog.addChangeRows(id, Seq( + ppRow(1L, "Alice", 1L, CHANGE_TYPE_INSERT, 1L, 1000000L))) + + val e = intercept[AnalysisException] { + spark.readStream + .option("startingVersion", "1") + .changes(fullTableName) + .writeStream + .format("memory") + .queryName("cdc_stream_complete_rejected") + .outputMode("complete") + .start() + } + assert(e.getCondition == "STREAMING_OUTPUT_MODE.UNSUPPORTED_OPERATION", + s"Unexpected error: ${e.getMessage}") + assert(e.getMessage.contains("Change Data Capture"), + s"Error should mention CDC: ${e.getMessage}") + } + + test("streaming row-level rewrite raises on NULL _commit_timestamp") { + val id = recreateWithRowVersion() + catalog.setChangelogProperties(id, ChangelogProperties( + containsCarryoverRows = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + // Insert a row with NULL _commit_timestamp (last column). 
+ val row = InternalRow( + 1L, UTF8String.fromString("Alice"), 1L, + UTF8String.fromString(CHANGE_TYPE_INSERT), 1L, null) + catalog.addChangeRows(id, Seq(row)) + + val q = spark.readStream + .option("startingVersion", "1") + .changes(fullTableName) + .writeStream + .format("memory") + .queryName("cdc_stream_null_ts") + .outputMode("append") + .start() + try { + val e = intercept[org.apache.spark.sql.streaming.StreamingQueryException] { + q.processAllAvailable() + } + // The CHANGELOG_CONTRACT_VIOLATION runtime error wraps the message; it should + // mention NULL_COMMIT_TIMESTAMP somewhere in the chain. + assert(e.getMessage.contains("NULL_COMMIT_TIMESTAMP") || + Option(e.getCause).map(_.getMessage).getOrElse("").contains("NULL_COMMIT_TIMESTAMP"), + s"Expected NULL_COMMIT_TIMESTAMP in the error chain. Got: ${e.getMessage}") + } finally { + q.stop() + } + } + + test("streaming netChanges emits DELETE for pre-existing row deleted in range") { + // Exercises the SPIP `(true, false)` cell: existedBefore = true (first event is a + // delete or update_preimage), existsAfter = false (last event is a delete). + val id = recreateWithRowVersion() + catalog.setChangelogProperties(id, ChangelogProperties( + containsIntermediateChanges = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + catalog.addChangeRows(id, Seq( + // v1: pre-existing Alice gets updated to Bob. + ppRow(1L, "Alice", 1L, CHANGE_TYPE_UPDATE_PREIMAGE, 1L, 1000000L), + ppRow(1L, "Bob", 1L, CHANGE_TYPE_UPDATE_POSTIMAGE, 1L, 1000000L), + // v2: Bob deleted -- the v1 preimage is the first event and the v2 delete is + // the last event for row identity 1 across the entire range. + ppRow(1L, "Bob", 1L, CHANGE_TYPE_DELETE, 2L, 2000000L), + // v3: insert Carol -- gives the watermark something to advance past, so row + // identity 1's timer fires before end-of-input. + ppRow(2L, "Carol", 3L, CHANGE_TYPE_INSERT, 3L, 3000000L))) + + withSQLConf(rocksDbProviderConf) { + val q = spark.readStream + .option("startingVersion", "1") + .option("deduplicationMode", "netChanges") + .changes(fullTableName) + .select("id", "data", "_change_type") + .writeStream + .format("memory") + .queryName("cdc_stream_netchanges_delete") + .outputMode("append") + .start() + try { + q.processAllAvailable() + checkAnswer( + spark.sql("SELECT * FROM cdc_stream_netchanges_delete"), + Seq( + // (true, false): emit a single DELETE carrying the *first* event's data + // (the preimage), per the batch contract. + Row(1L, "Alice", CHANGE_TYPE_DELETE), + Row(2L, "Carol", CHANGE_TYPE_INSERT))) + } finally { + q.stop() + } + } + } + + test("streaming netChanges without computeUpdates keeps persisting rows as DELETE+INSERT") { + // Exercises the SPIP `(true, true)` cell with computeUpdates = false: the pair is + // emitted as DELETE + INSERT rather than relabeled as + // update_preimage + update_postimage. + val id = recreateWithRowVersion() + catalog.setChangelogProperties(id, ChangelogProperties( + containsIntermediateChanges = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + catalog.addChangeRows(id, Seq( + // v1: pre-existing Alice updated to Bob. + ppRow(1L, "Alice", 1L, CHANGE_TYPE_UPDATE_PREIMAGE, 1L, 1000000L), + ppRow(1L, "Bob", 1L, CHANGE_TYPE_UPDATE_POSTIMAGE, 1L, 1000000L), + // v2: Bob updated to Robert -- row identity 1 spans (preimage Alice) .. + // (postimage Robert). 
+ ppRow(1L, "Bob", 1L, CHANGE_TYPE_UPDATE_PREIMAGE, 2L, 2000000L), + ppRow(1L, "Robert", 2L, CHANGE_TYPE_UPDATE_POSTIMAGE, 2L, 2000000L))) + + withSQLConf(rocksDbProviderConf) { + val q = spark.readStream + .option("startingVersion", "1") + .option("deduplicationMode", "netChanges") + // computeUpdates defaults to false. + .changes(fullTableName) + .select("id", "data", "_change_type") + .writeStream + .format("memory") + .queryName("cdc_stream_netchanges_no_compute_updates") + .outputMode("append") + .start() + try { + q.processAllAvailable() + checkAnswer( + spark.sql("SELECT * FROM cdc_stream_netchanges_no_compute_updates"), + Seq( + Row(1L, "Alice", CHANGE_TYPE_DELETE), + Row(1L, "Robert", CHANGE_TYPE_INSERT))) + } finally { + q.stop() + } + } + } + + test("streaming netChanges + carry-over removal: combined post-processing") { + // Validates the design point that the row-level rewrite and the netChanges rewrite + // share a single EventTimeWatermark on `_commit_timestamp` and produce the + // expected combined result. Carry-over CoW pairs are dropped before the netChanges + // collapse runs, so the final emission only reflects real content changes. + val id = recreateWithRowVersion() + catalog.setChangelogProperties(id, ChangelogProperties( + containsCarryoverRows = true, + containsIntermediateChanges = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + catalog.addChangeRows(id, Seq( + // v1: insert Alice, Bob (rcv=1). + ppRow(1L, "Alice", 1L, CHANGE_TYPE_INSERT, 1L, 1000000L), + ppRow(2L, "Bob", 1L, CHANGE_TYPE_INSERT, 1L, 1000000L), + // v2: real delete Alice + carry-over for Bob (rcv unchanged means CoW rewrite, + // no content change). + ppRow(1L, "Alice", 1L, CHANGE_TYPE_DELETE, 2L, 2000000L), + ppRow(2L, "Bob", 1L, CHANGE_TYPE_DELETE, 2L, 2000000L), + ppRow(2L, "Bob", 1L, CHANGE_TYPE_INSERT, 2L, 2000000L), + // v3: insert Carol -- advances the watermark past v2 so timers for row + // identities 1 and 2 fire and the netChanges output is emitted. + ppRow(3L, "Carol", 3L, CHANGE_TYPE_INSERT, 3L, 3000000L))) + + withSQLConf(rocksDbProviderConf) { + val q = spark.readStream + .option("startingVersion", "1") + .option("deduplicationMode", "netChanges") + .changes(fullTableName) + .select("id", "data", "_change_type") + .writeStream + .format("memory") + .queryName("cdc_stream_netchanges_with_carryover") + .outputMode("append") + .start() + try { + q.processAllAvailable() + // After carry-over removal: Alice has v1 INSERT + v2 DELETE; Bob has only + // v1 INSERT (the v2 CoW pair was dropped); Carol has v3 INSERT. 
+        // After netChanges:
+        //   Alice -- (false, false) -> no output
+        //   Bob   -- (false, true)  -> emit INSERT
+        //   Carol -- (false, true)  -> emit INSERT
+        checkAnswer(
+          spark.sql("SELECT * FROM cdc_stream_netchanges_with_carryover"),
+          Seq(
+            Row(2L, "Bob", CHANGE_TYPE_INSERT),
+            Row(3L, "Carol", CHANGE_TYPE_INSERT)))
+      } finally {
+        q.stop()
+      }
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala
index 8f29df44538b6..603b2f5ff6ae3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.connector
 import java.util.Collections
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -208,12 +209,13 @@ class ChangelogResolutionSuite extends SharedSparkSession {
   }
 
   // ===========================================================================
-  // Streaming post-processing rejection
+  // Streaming post-processing
   // ===========================================================================
   //
-  // Streaming CDC reads bypass the post-processing analyzer rule's transformation
-  // path. To prevent silent wrong results when the requested options would require
-  // post-processing, the rule throws an explicit AnalysisException for streaming.
+  // Row-level passes (carry-over removal and update detection) rewrite the streaming plan
+  // into Aggregate -> [Filter] -> Generate(Inline) -> [Project] under an
+  // EventTimeWatermark on `_commit_timestamp`. Net-change computation is rewritten into a
+  // stateful TransformWithState keyed by the row identity (see the netChanges test below).
 
   /** Re-creates the test table with non-nullable columns suitable as rowId / rowVersion. */
   private def recreatePostProcessingTable(): Identifier = {
@@ -230,7 +232,23 @@ class ChangelogResolutionSuite extends SharedSparkSession {
     ident
   }
 
-  test("DataStreamReader - changes() with carry-over capability throws") {
+  private def assertStreamingRowLevelRewrite(plan: LogicalPlan): Unit = {
+    import org.apache.spark.sql.catalyst.plans.logical.{
+      Aggregate, EventTimeWatermark, Generate}
+    val watermarks = plan.collect { case w: EventTimeWatermark => w }
+    assert(watermarks.nonEmpty,
+      s"Expected EventTimeWatermark in streaming row-level rewrite. Plan:\n$plan")
+    assert(watermarks.head.eventTime.name == "_commit_timestamp",
+      s"Watermark must be on `_commit_timestamp`. Plan:\n$plan")
+    val aggs = plan.collect { case a: Aggregate => a }
+    assert(aggs.nonEmpty,
+      s"Expected Aggregate in streaming row-level rewrite. Plan:\n$plan")
+    val gens = plan.collect { case g: Generate => g }
+    assert(gens.nonEmpty,
+      s"Expected Generate(Inline) in streaming row-level rewrite. 
Plan:\n$plan") + } + + test("DataStreamReader - changes() with carry-over capability rewrites plan") { val ident = recreatePostProcessingTable() val cat = spark.sessionState.catalogManager .catalog(cdcCatalogName) @@ -240,17 +258,13 @@ class ChangelogResolutionSuite extends SharedSparkSession { rowIdNames = Seq("id"), rowVersionName = Some("row_commit_version"))) - checkError( - intercept[AnalysisException] { - spark.readStream - .changes(s"$cdcCatalogName.test_table") - .queryExecution.analyzed - }, - condition = "INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED", - parameters = Map("changelogName" -> s"$cdcCatalogName.test_table_changelog")) + val analyzed = spark.readStream + .changes(s"$cdcCatalogName.test_table") + .queryExecution.analyzed + assertStreamingRowLevelRewrite(analyzed) } - test("DataStreamReader - changes() with computeUpdates throws") { + test("DataStreamReader - changes() with computeUpdates rewrites plan") { val ident = recreatePostProcessingTable() val cat = spark.sessionState.catalogManager .catalog(cdcCatalogName) @@ -260,16 +274,32 @@ class ChangelogResolutionSuite extends SharedSparkSession { rowIdNames = Seq("id"), rowVersionName = Some("row_commit_version"))) - checkError( - intercept[AnalysisException] { - spark.readStream - .option("computeUpdates", "true") - .option("deduplicationMode", "none") - .changes(s"$cdcCatalogName.test_table") - .queryExecution.analyzed - }, - condition = "INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED", - parameters = Map("changelogName" -> s"$cdcCatalogName.test_table_changelog")) + val analyzed = spark.readStream + .option("computeUpdates", "true") + .option("deduplicationMode", "none") + .changes(s"$cdcCatalogName.test_table") + .queryExecution.analyzed + assertStreamingRowLevelRewrite(analyzed) + } + + test("DataStreamReader - changes() with deduplicationMode=netChanges rewrites plan") { + import org.apache.spark.sql.catalyst.plans.logical.TransformWithState + val ident = recreatePostProcessingTable() + val cat = spark.sessionState.catalogManager + .catalog(cdcCatalogName) + .asInstanceOf[InMemoryChangelogCatalog] + cat.setChangelogProperties(ident, ChangelogProperties( + containsIntermediateChanges = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + val analyzed = spark.readStream + .option("deduplicationMode", "netChanges") + .changes(s"$cdcCatalogName.test_table") + .queryExecution.analyzed + val tws = analyzed.collect { case t: TransformWithState => t } + assert(tws.size == 1, + s"Expected exactly one TransformWithState; found ${tws.size}. Plan:\n$analyzed") } // =========================================================================== diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala index 721aa47beb560..c48ced72a15cc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala @@ -368,22 +368,9 @@ class ResolveChangelogTablePostProcessingSuite s"Expected ChangelogTable to be marked resolved by the rule. 
Plan:\n$plan") } - test("streaming with post-processing options is rejected") { - catalog.setChangelogProperties(ident, ChangelogProperties( - containsCarryoverRows = true, - rowIdNames = Seq("id"), - rowVersionName = Some("row_commit_version"))) - - checkError( - exception = intercept[AnalysisException] { - spark.readStream - .option("startingVersion", "1") - .changes(s"$catalogName.$testTableName") - .queryExecution.analyzed - }, - condition = "INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED", - parameters = Map("changelogName" -> s"$catalogName.${testTableName}_changelog")) - } + // The streaming netChanges path is covered by + // ResolveChangelogTableStreamingPostProcessingSuite -- not duplicated here, since + // this suite focuses on the batch path. // =========================================================================== // Combined diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableStreamingPostProcessingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableStreamingPostProcessingSuite.scala new file mode 100644 index 0000000000000..eb3e14f5e5e87 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableStreamingPostProcessingSuite.scala @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +import java.util.Collections + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.sql.{DataFrame, QueryTest} +import org.apache.spark.sql.catalyst.expressions.Inline +import org.apache.spark.sql.catalyst.plans.logical.{ + Aggregate, EventTimeWatermark, Filter, Generate, LogicalPlan, Project, TransformWithState} +import org.apache.spark.sql.connector.catalog.{ + ChangelogProperties, Column, Identifier, InMemoryChangelogCatalog} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.LongType + +/** + * Plan-shape tests for the streaming arm of + * [[org.apache.spark.sql.catalyst.analysis.ResolveChangelogTable]]. These mirror the batch + * checks in [[ResolveChangelogTablePostProcessingSuite]] but assert on the rewritten + * logical plan rather than running the streaming query end-to-end (the end-to-end + * coverage lives in [[ChangelogEndToEndSuite]]). 
+ * + * The streaming row-level rewrite is: + * EventTimeWatermark(_commit_timestamp, 0s) + * -> Aggregate keyed by (rowId..., _commit_version, _commit_timestamp) + * -> [Filter (carry-over)] + * -> Generate(Inline(events)) + * -> [Project (update relabel)] + * -> Project (drop helper columns) + */ +class ResolveChangelogTableStreamingPostProcessingSuite + extends QueryTest + with SharedSparkSession + with BeforeAndAfterEach { + + private val catalogName = "cdc_streaming_pp" + private val testTableName = "events" + + override def beforeAll(): Unit = { + super.beforeAll() + spark.conf.set( + s"spark.sql.catalog.$catalogName", + classOf[InMemoryChangelogCatalog].getName) + } + + override def beforeEach(): Unit = { + super.beforeEach() + val cat = catalog + val ident = identifier + if (cat.tableExists(ident)) cat.dropTable(ident) + cat.clearChangeRows(ident) + cat.setChangelogProperties(ident, ChangelogProperties()) + cat.createTable( + ident, + Array( + Column.create("id", LongType, false), + Column.create("row_commit_version", LongType, false)), + Array.empty[Transform], + Collections.emptyMap[String, String]()) + } + + private def catalog: InMemoryChangelogCatalog = + spark.sessionState.catalogManager + .catalog(catalogName) + .asInstanceOf[InMemoryChangelogCatalog] + + private def identifier: Identifier = Identifier.of(Array.empty, testTableName) + + private def streamingDf(opts: (String, String)*): DataFrame = { + val reader = spark.readStream.option("startingVersion", "1") + opts.foldLeft(reader) { case (r, (k, v)) => r.option(k, v) } + .changes(s"$catalogName.$testTableName") + } + + private def assertWatermarkOnCommitTimestamp(plan: LogicalPlan): Unit = { + val wm = plan.collect { case w: EventTimeWatermark => w } + assert(wm.size == 1, + s"Expected exactly one EventTimeWatermark; found ${wm.size}. Plan:\n$plan") + assert(wm.head.eventTime.name == "_commit_timestamp", + s"Watermark must be on `_commit_timestamp` but was on `${wm.head.eventTime.name}`. " + + s"Plan:\n$plan") + assert(wm.head.delay.months == 0 && wm.head.delay.days == 0 && + wm.head.delay.microseconds == 0L, + s"Watermark delay must be zero. Plan:\n$plan") + } + + private def assertNoStreamingPostProcessing(plan: LogicalPlan): Unit = { + assert(plan.collect { case w: EventTimeWatermark => w }.isEmpty, + s"No EventTimeWatermark expected for raw streaming pass-through. Plan:\n$plan") + val planStr = plan.treeString + assert(!planStr.contains("__spark_cdc_"), + s"Helper columns must not appear in pass-through plan. Plan:\n$planStr") + } + + private def assertHelperColumnsRemoved(plan: LogicalPlan): Unit = { + val outputNames = plan.output.map(_.name).toSet + assert(!outputNames.exists(_.startsWith("__spark_cdc_")), + s"Helper columns must be dropped before the user-visible output. Output: " + + outputNames.mkString(", ")) + } + + private def assertNoWatermarkMetadataOnOutput(plan: LogicalPlan): Unit = { + import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark + val leaks = plan.output.filter(_.metadata.contains(EventTimeWatermark.delayKey)) + assert(leaks.isEmpty, + s"User-visible output must not carry EventTimeWatermark.delayKey metadata; " + + s"found on: ${leaks.map(_.name).mkString(",")}. Plan:\n$plan") + } + + private def assertInlineGenerate(plan: LogicalPlan): Unit = { + val gens = plan.collect { case g: Generate => g } + assert(gens.size == 1, + s"Expected exactly one Generate; found ${gens.size}. Plan:\n$plan") + assert(gens.head.generator.isInstanceOf[Inline], + s"Generate must use Inline. 
Plan:\n$plan") + } + + private def assertContainsNullCommitTimestampGuard(plan: LogicalPlan): Unit = { + val nullGuards = plan.collect { + case f: Filter + if f.condition.toString.contains("NULL_COMMIT_TIMESTAMP") => f + } + assert(nullGuards.size == 1, + s"Expected exactly one NULL_COMMIT_TIMESTAMP guard Filter. Plan:\n$plan") + } + + // =========================================================================== + // Carry-over removal only + // =========================================================================== + + test("carry-over removal injects watermark + Aggregate + Filter + Generate") { + catalog.setChangelogProperties(identifier, ChangelogProperties( + containsCarryoverRows = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + val analyzed = streamingDf().queryExecution.analyzed + assertWatermarkOnCommitTimestamp(analyzed) + + val aggs = analyzed.collect { case a: Aggregate => a } + assert(aggs.size == 1, s"Expected one Aggregate. Plan:\n$analyzed") + val groupingNames = aggs.head.groupingExpressions.collect { + case ne: org.apache.spark.sql.catalyst.expressions.NamedExpression => ne.name + } + assert(groupingNames.toSet == Set("id", "_commit_version", "_commit_timestamp"), + s"Expected grouping by (id, _commit_version, _commit_timestamp); got $groupingNames") + + // Two Filters: the NULL `_commit_timestamp` guard + the carry-over predicate. + val filters = analyzed.collect { case f: Filter => f } + assert(filters.size == 2, + s"Expected NULL guard + carry-over Filter. Plan:\n$analyzed") + assertContainsNullCommitTimestampGuard(analyzed) + + assertInlineGenerate(analyzed) + assertHelperColumnsRemoved(analyzed) + assertNoWatermarkMetadataOnOutput(analyzed) + } + + // =========================================================================== + // Update detection only + // =========================================================================== + + test("update detection alone injects watermark + Aggregate + Generate + relabel Project") { + catalog.setChangelogProperties(identifier, ChangelogProperties( + representsUpdateAsDeleteAndInsert = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + val analyzed = streamingDf( + "computeUpdates" -> "true", + "deduplicationMode" -> "none").queryExecution.analyzed + assertWatermarkOnCommitTimestamp(analyzed) + + // No carry-over Filter when only update detection runs -- but the NULL + // `_commit_timestamp` guard Filter is always present. + val filters = analyzed.collect { case f: Filter => f } + assert(filters.size == 1, + s"Only the NULL guard Filter is expected for update-detection-only path. " + + s"Plan:\n$analyzed") + assertContainsNullCommitTimestampGuard(analyzed) + + assertInlineGenerate(analyzed) + + // The relabel Project must reference _change_type (CaseWhen rewrites it). + val projects = analyzed.collect { case p: Project => p } + assert(projects.exists { p => + p.projectList.exists(_.toString.toLowerCase.contains("update_preimage")) + }, s"Expected a Project that emits `update_preimage`. 
Plan:\n$analyzed") + + assertHelperColumnsRemoved(analyzed) + assertNoWatermarkMetadataOnOutput(analyzed) + } + + // =========================================================================== + // Both passes + // =========================================================================== + + test("carry-over + update detection share a single Aggregate") { + catalog.setChangelogProperties(identifier, ChangelogProperties( + containsCarryoverRows = true, + representsUpdateAsDeleteAndInsert = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + val analyzed = streamingDf("computeUpdates" -> "true").queryExecution.analyzed + assertWatermarkOnCommitTimestamp(analyzed) + + val aggs = analyzed.collect { case a: Aggregate => a } + assert(aggs.size == 1, s"Should fuse both passes into a single Aggregate. Plan:\n$analyzed") + + // Two Filters: NULL guard + carry-over removal. + val filters = analyzed.collect { case f: Filter => f } + assert(filters.size == 2, + s"Expected NULL guard + carry-over Filter for combined path. Plan:\n$analyzed") + assertContainsNullCommitTimestampGuard(analyzed) + + assertInlineGenerate(analyzed) + assertHelperColumnsRemoved(analyzed) + assertNoWatermarkMetadataOnOutput(analyzed) + } + + // =========================================================================== + // Net changes + // =========================================================================== + + test("netChanges alone injects watermark + TransformWithState") { + catalog.setChangelogProperties(identifier, ChangelogProperties( + containsIntermediateChanges = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + val analyzed = streamingDf( + "deduplicationMode" -> "netChanges").queryExecution.analyzed + assertWatermarkOnCommitTimestamp(analyzed) + val tws = analyzed.collect { case t: TransformWithState => t } + assert(tws.size == 1, + s"Expected exactly one TransformWithState; found ${tws.size}. Plan:\n$analyzed") + // Guard against a regression that grouped by the wrong attributes (e.g. omitting a + // rowId column or grouping by `_commit_version`) -- the size check alone would still + // pass. + val groupingNames = tws.head.groupingAttributes.map(_.name) + assert(groupingNames == Seq("__spark_cdc_rowid_0"), + s"Expected TransformWithState grouping by [__spark_cdc_rowid_0]; got $groupingNames. " + + s"Plan:\n$analyzed") + assertHelperColumnsRemoved(analyzed) + } + + test("netChanges with composite rowId groups by all helper columns") { + // Recreate with a two-column rowId so we exercise the rowIdColumn(idx) helper + // for idx > 0. The single-rowId test asserts the size-1 case; this guards + // against a regression that hard-codes a single helper attribute. + val cat = catalog + val ident = identifier + cat.dropTable(ident) + cat.createTable( + ident, + Array( + Column.create("ns", LongType, false), + Column.create("id", LongType, false), + Column.create("row_commit_version", LongType, false)), + Array.empty[Transform], + Collections.emptyMap[String, String]()) + cat.setChangelogProperties(ident, ChangelogProperties( + containsIntermediateChanges = true, + rowIdNames = Seq("ns", "id"), + rowVersionName = Some("row_commit_version"))) + + val analyzed = streamingDf( + "deduplicationMode" -> "netChanges").queryExecution.analyzed + assertWatermarkOnCommitTimestamp(analyzed) + val tws = analyzed.collect { case t: TransformWithState => t } + assert(tws.size == 1, s"Expected one TransformWithState. 
Plan:\n$analyzed") + val groupingNames = tws.head.groupingAttributes.map(_.name) + assert(groupingNames == Seq("__spark_cdc_rowid_0", "__spark_cdc_rowid_1"), + s"Expected grouping by [__spark_cdc_rowid_0, __spark_cdc_rowid_1]; got $groupingNames. " + + s"Plan:\n$analyzed") + assertHelperColumnsRemoved(analyzed) + } + + test("netChanges + carry-over removal share a single watermark") { + catalog.setChangelogProperties(identifier, ChangelogProperties( + containsCarryoverRows = true, + containsIntermediateChanges = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + val analyzed = streamingDf( + "deduplicationMode" -> "netChanges").queryExecution.analyzed + val watermarks = analyzed.collect { case w: EventTimeWatermark => w } + assert(watermarks.size == 1, + s"Combined row-level + netChanges path should share one EventTimeWatermark. " + + s"Plan:\n$analyzed") + val aggs = analyzed.collect { case a: Aggregate => a } + assert(aggs.size == 1, + s"Row-level Aggregate should still be present. Plan:\n$analyzed") + val tws = analyzed.collect { case t: TransformWithState => t } + assert(tws.size == 1, + s"netChanges TransformWithState should be on top of the row-level rewrite. " + + s"Plan:\n$analyzed") + assertHelperColumnsRemoved(analyzed) + } + + // =========================================================================== + // No post-processing -> no rewrite + // =========================================================================== + + test("no post-processing required: raw streaming relation passes through") { + // No capability flags set -> no Aggregate, no watermark. + val analyzed = streamingDf().queryExecution.analyzed + assertNoStreamingPostProcessing(analyzed) + } + + test("computeUpdates without representsUpdateAsDeleteAndInsert: no rewrite") { + // Connector says updates are already materialized -> nothing to do. + val analyzed = streamingDf( + "computeUpdates" -> "true").queryExecution.analyzed + assertNoStreamingPostProcessing(analyzed) + } + + // =========================================================================== + // Watermark metadata is internal-only and stripped from user-visible output + // =========================================================================== + + test("watermark metadata is stripped from user-visible _commit_timestamp") { + import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark + catalog.setChangelogProperties(identifier, ChangelogProperties( + containsCarryoverRows = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + val analyzed = streamingDf().queryExecution.analyzed + // Internally, the EventTimeWatermark must still be present. + assertWatermarkOnCommitTimestamp(analyzed) + // But none of the user-visible output attributes should leak the watermark + // metadata; downstream user-supplied watermarks must not interact with our + // auto-injected internal watermark via the global multi-watermark policy. + val ts = analyzed.output.find(_.name == "_commit_timestamp") + assert(ts.isDefined, s"Expected `_commit_timestamp` in output. Plan:\n$analyzed") + assert(!ts.get.metadata.contains(EventTimeWatermark.delayKey), + s"Watermark metadata leaked to user-visible `_commit_timestamp`. 
Plan:\n$analyzed")
+  }
+
+  // ===========================================================================
+  // NULL _commit_timestamp guard
+  // ===========================================================================
+
+  test("NULL _commit_timestamp guard Filter is the first operator after the source") {
+    catalog.setChangelogProperties(identifier, ChangelogProperties(
+      containsCarryoverRows = true,
+      rowIdNames = Seq("id"),
+      rowVersionName = Some("row_commit_version")))
+
+    val analyzed = streamingDf().queryExecution.analyzed
+    // The guard must sit BELOW the EventTimeWatermark (we don't want a NULL row to
+    // be considered for watermark advancement at all). Verify by checking that the
+    // guard Filter sits directly on top of the source relation, not on the watermark.
+    val guards = analyzed.collect {
+      case f: Filter if f.condition.toString.contains("NULL_COMMIT_TIMESTAMP") => f
+    }
+    assert(guards.size == 1, s"Expected exactly one guard. Plan:\n$analyzed")
+    val guard = guards.head
+    val guardChild = guard.child
+    // The guard's child should be the bare relation (or a SubqueryAlias wrapping it),
+    // not the EventTimeWatermark.
+    val isSourceBelowGuard = guardChild match {
+      case _: org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 => true
+      case org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias(_,
+        _: org.apache.spark.sql.catalyst.streaming.StreamingRelationV2) => true
+      case _ => false
+    }
+    assert(isSourceBelowGuard,
+      s"NULL guard Filter should sit directly above the streaming relation. Plan:\n$analyzed")
+  }
+}
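
As a reading aid for the netChanges expectations asserted above, the following is a minimal, self-contained sketch of the per-row-identity collapse those tests encode. It is illustrative only: `NetChangesCollapseSketch`, `Change`, and `collapse` are hypothetical names invented for this sketch and are not part of this patch or of Spark's API; the real rewrite performs the equivalent collapse per row identity inside the `TransformWithState` operator once the watermark passes that identity's last `_commit_timestamp`.

object NetChangesCollapseSketch {

  // A change event for one row identity, in commit order.
  final case class Change(changeType: String, data: String)

  /**
   * Collapses every event observed for a single row identity across the read range.
   * existedBefore: the first event is a delete or update_preimage.
   * existsAfter:   the last event is an insert or update_postimage.
   */
  def collapse(events: Seq[Change], computeUpdates: Boolean): Seq[Change] = {
    require(events.nonEmpty, "collapse is only invoked for identities with at least one event")
    val first = events.head
    val last = events.last
    val existedBefore =
      first.changeType == "delete" || first.changeType == "update_preimage"
    val existsAfter =
      last.changeType == "insert" || last.changeType == "update_postimage"
    (existedBefore, existsAfter) match {
      // Transient row: created and removed inside the range, nothing to emit.
      case (false, false) => Seq.empty
      // Net-new row: emit a single INSERT carrying the last event's data.
      case (false, true) => Seq(Change("insert", last.data))
      // Net-deleted row: emit a single DELETE carrying the first event's (preimage) data.
      case (true, false) => Seq(Change("delete", first.data))
      // Persisting row: an update pair when computeUpdates is on, DELETE + INSERT otherwise.
      case (true, true) if computeUpdates =>
        Seq(Change("update_preimage", first.data), Change("update_postimage", last.data))
      case (true, true) =>
        Seq(Change("delete", first.data), Change("insert", last.data))
    }
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the "keeps persisting rows as DELETE+INSERT" test: Alice -> Bob -> Robert
    // collapses to DELETE(Alice) + INSERT(Robert) when computeUpdates is left at false.
    val rowIdentity1 = Seq(
      Change("update_preimage", "Alice"), Change("update_postimage", "Bob"),
      Change("update_preimage", "Bob"), Change("update_postimage", "Robert"))
    println(collapse(rowIdentity1, computeUpdates = false))
    // List(Change(delete,Alice), Change(insert,Robert))
  }
}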