
Commit 0eb4fc1

[SPARK-56687][SQL] Support netChanges for DSv2 CDC streaming reads
### What changes were proposed in this pull request?

This PR completes the DSv2 CDC streaming post-processing surface by implementing `deduplicationMode = netChanges` for streaming reads. The previous PR (#55636 / SPARK-56686) added carry-over removal and update detection for streaming but left netChanges batch-only.

The batch path (`ResolveChangelogTable.injectNetChangeComputation`) uses a Catalyst `Window` partitioned by `rowId` and ordered by `(_commit_version, change_type_rank)` to find the first and last events per row identity, then applies the SPIP collapse matrix on `(existedBefore, existsAfter)`. `Window` is rejected on streaming children (`NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING`), and unlike the row-level passes the netChanges aggregate is keyed by `rowId` only -- there is no commit-version + commit-timestamp grouping that would let us reuse the streaming Aggregate pattern.

This PR adds a streaming-friendly equivalent by delegating per-row-identity state management to a new `CdcNetChangesStatefulProcessor` driven by `TransformWithState`:

- The processor stores the first event ever observed and the most-recent event observed for each row identity in `ValueState[Row]`.
- An event-time timer is armed on each batch to the latest `_commit_timestamp` observed for the key. When the global watermark advances past the timer, `handleExpiredTimer` evaluates the SPIP matrix and emits 0, 1, or 2 output rows -- identical semantics to the batch path.
- Existing per-key timers are deleted before re-arming so that out-of-order events for an earlier commit can't fire a stale timer between batches and produce duplicate output for the same row identity.

The analyzer rule constructs `TransformWithState` directly (there is no precedent in Catalyst for this; the typed-Dataset DSL is the usual entry point). Encoders for the input/output `Row` and the rowId tuple are built via `ExpressionEncoder(StructType)`. Nested rowId paths (e.g. `payload.id`) are handled by aliasing each rowId expression to a top-level `__spark_cdc_rowid_<i>` helper column before the `TransformWithState`, then dropping the helpers in a final `Project` so the user-visible schema matches the connector's declared changelog schema.

Plan shape:

```
EventTimeWatermark(_commit_timestamp, 0s)
  -> Project (alias rowId expressions to flat helper columns)
  -> TransformWithState (grouping = rowId helpers, EventTime mode, Append)
  -> SerializeFromObject
  -> Project (drop rowId helper columns)
```

When carry-over removal / update detection are also requested, the row-level rewrite is applied first; the netChanges `TransformWithState` then sits on top of it, and the rule reuses the existing `EventTimeWatermark` rather than stacking another (multi-watermark stacking is rejected unless `STATEFUL_OPERATOR_ALLOW_MULTIPLE` is set).

#### Documented limitation

Row identities only touched in the latest observed commit are held back until a later commit (with strictly greater `_commit_timestamp`) advances the watermark past them, or the source terminates. End-of-input flushes all timers, so a bounded stream produces output equivalent to the corresponding batch read as long as no row identity is touched again after its net result has been emitted. This matches the steady-state behavior of the row-level streaming rewrite.

Also removes the now-obsolete error class `INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED` introduced in SPARK-56686.

### Why are the changes needed?

Without this PR, `deduplicationMode = netChanges` is unavailable on streaming CDC reads, forcing users with intermediate-state connectors (`containsIntermediateChanges = true`) to fall back to batch reads when they want a deduplicated change feed. With SPARK-56686 already shipping carry-over removal and update detection for streaming, netChanges was the only post-processing pass still gated to batch -- this completes the surface.

### Does this PR introduce _any_ user-facing change?

Yes.

- Streaming `spark.readStream.changes(...)` now supports `deduplicationMode = netChanges`. Previously this threw `INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED`.
- That error class is removed; the wording in `DataStreamReader.changes()` and `Changelog.java` Scaladoc has been updated to describe the supported behavior and the latest-commit limitation.

Note: the netChanges streaming path uses `TransformWithState`, which requires the RocksDB state store backend (`spark.sql.streaming.stateStore.providerClass = ...RocksDBStateStoreProvider`). Spark surfaces `UNSUPPORTED_FEATURE.STORE_BACKEND_NOT_SUPPORTED_FOR_TWS` if the default HDFS-backed provider is left in place, so this is discoverable.

### How was this patch tested?

89 tests pass across 4 CDC suites (all green):

- `ResolveChangelogTableStreamingPostProcessingSuite` -- two new plan-shape tests: `netChanges alone injects watermark + TransformWithState` and `netChanges + carry-over removal share a single watermark` (verifies that the netChanges `TransformWithState` reuses the row-level rewrite's `EventTimeWatermark` rather than stacking another).
- `ChangelogResolutionSuite` -- the `netChanges throws` test from SPARK-56686 is flipped to assert that exactly one `TransformWithState` appears in the analyzed plan.
- `ResolveChangelogTablePostProcessingSuite` -- the corresponding netChanges throw test is similarly flipped.
- `ChangelogEndToEndSuite` -- two new end-to-end tests that drive a streaming query against `InMemoryChangelogCatalog` with the RocksDB state store: `streaming netChanges collapses INSERT then DELETE to no output` (confirms the `(false, false)` cancel case and that end-of-input flushes the latest commit's group) and `streaming netChanges with computeUpdates labels persisting rows as updates` (confirms the `(false, true)` case relabels correctly).

Also confirmed `UnsupportedOperationsSuite` (216 tests) still passes.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-7)

Closes #55637 from gengliangwang/streamingCDC-netChanges.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
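For orientation, a minimal sketch of the user-facing surface described above. It assumes the `deduplicationMode` / `computeUpdates` options are passed through `DataStreamReader.option(...)`; the catalog/table name, console sink, and checkpoint path are placeholders, and only the `.changes(...)` call, the option values, and the RocksDB requirement are taken from this PR:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cdc-netchanges-sketch").getOrCreate()

// NOTE: requires the RocksDB state store provider (see the DataStreamReader Scaladoc
// change below); the default HDFS-backed provider is rejected at query start with
// UNSUPPORTED_FEATURE.STORE_BACKEND_NOT_SUPPORTED_FOR_TWS.
val netChanges = spark.readStream
  .option("deduplicationMode", "netChanges") // collapse each row identity's changes
  .option("computeUpdates", "true")          // relabel (true, true) results as update pre/post images
  .changes("my_catalog.my_schema.my_table")  // placeholder CDC-enabled table

netChanges.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/cdc-netchanges-checkpoint") // placeholder path
  .start()
```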
1 parent 2df302d commit 0eb4fc1

12 files changed

Lines changed: 843 additions & 106 deletions


common/utils/src/main/resources/error/error-conditions.json

Lines changed: 0 additions & 5 deletions
@@ -3308,11 +3308,6 @@
         "`startingVersion` is required when `endingVersion` is specified for CDC queries."
       ]
     },
-    "STREAMING_NET_CHANGES_NOT_SUPPORTED" : {
-      "message" : [
-        "Change Data Capture (CDC) streaming reads on connector `<changelogName>` do not yet support net change computation (`deduplicationMode = netChanges`). Net change computation reasons over the entire requested version range and is currently only available for batch reads. Use a batch read, or set `deduplicationMode` to `none` or `dropCarryovers` for streaming."
-      ]
-    },
     "UPDATE_DETECTION_REQUIRES_CARRY_OVER_REMOVAL" : {
       "message" : [
         "`computeUpdates` cannot be used with `deduplicationMode=none` on connector `<changelogName>` because the connector emits copy-on-write carry-over pairs (`containsCarryoverRows()` returns true) that would be silently mislabeled as updates. Set `deduplicationMode` to `dropCarryovers` or `netChanges`."

sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala

Lines changed: 29 additions & 9 deletions
@@ -131,15 +131,35 @@ abstract class DataStreamReader {
   *   .changes("my_table")
   * }}}
   *
-  * Streaming reads support the same `computeUpdates` and `deduplicationMode = dropCarryovers`
-  * post-processing as batch reads. `deduplicationMode = netChanges` is currently batch-only --
-  * it requires reasoning over the entire requested range, which is not incrementalized yet.
-  * Requesting it on a streaming read raises an explicit
-  * `INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED` error.
-  *
-  * When the requested options engage row-level post-processing (carry-over removal or update
-  * detection), the rewrite injects an internal `EventTimeWatermark` on `_commit_timestamp` and a
-  * stateful streaming aggregate. Two implications follow:
+  * Streaming reads support all of the same post-processing as batch reads -- `computeUpdates`,
+  * `deduplicationMode = dropCarryovers`, and `deduplicationMode = netChanges`. The streaming
+  * netChanges path holds per-row-identity state in the state store and emits the SPIP collapse
+  * output once the global watermark advances past the last `_commit_timestamp` observed for that
+  * row identity. Row identities only touched in the latest observed commit are therefore not
+  * emitted until a later commit (with strictly greater `_commit_timestamp`) advances the
+  * watermark past them, or the source terminates.
+  *
+  * Streaming netChanges differs from batch netChanges in scope. Batch netChanges collapses all
+  * changes for a row identity over the entire requested version range. Streaming netChanges is
+  * incremental: it collapses changes that fall within a single watermark window for a row
+  * identity (i.e. up to the timer firing that emits its current net result). After a row
+  * identity's net result has been emitted, subsequent commits on the same identity start a fresh
+  * window and produce additional output rows -- streaming cannot retract previously emitted
+  * results to match the batch range-scoped collapse. For a query that observes id=1 inserted at
+  * v1 and deleted at v3 with another commit at v2 in between, batch netChanges over [v1..v3]
+  * cancels to no row, while streaming emits an `insert` (when v2 advances the watermark past v1)
+  * followed later by a `delete` (when end-of-stream or another commit advances the watermark
+  * past v3).
+  *
+  * Because the streaming netChanges path uses `transformWithState`, the state store provider
+  * must be RocksDB. Set `spark.sql.streaming.stateStore.providerClass` to
+  * `org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider` before starting a
+  * streaming netChanges query; the default HDFS-backed provider is rejected at query start.
+  *
+  * When the requested options engage post-processing (carry-over removal, update detection, or
+  * netChanges), the rewrite injects an internal `EventTimeWatermark` on `_commit_timestamp` and
+  * a stateful streaming operator (an aggregate for the row-level passes, a `transformWithState`
+  * for netChanges). Two implications follow:
   *   - A commit's events are emitted in the next micro-batch after the commit is read
   *     (append-mode aggregate eviction is `eventTime <= watermark`, and the watermark
   *     advances to the max `_commit_timestamp` observed in the previous batch). A stream that
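The RocksDB requirement above amounts to a single configuration switch that has to be in place before the query starts. A hedged sketch follows (the config key and provider class name are quoted from the Scaladoc; the session builder itself is illustrative):

```scala
import org.apache.spark.sql.SparkSession

// transformWithState rejects the default HDFS-backed state store provider at query start,
// so opt into RocksDB before starting any streaming netChanges query.
val spark = SparkSession.builder()
  .config(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
  .getOrCreate()
```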

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Changelog.java

Lines changed: 15 additions & 13 deletions
@@ -33,8 +33,12 @@
  * <ul>
  *   <li>{@code _change_type} (STRING) — the kind of change: {@code insert}, {@code delete},
  *       {@code update_preimage}, or {@code update_postimage}</li>
- *   <li>{@code _commit_version} (connector-defined type, e.g. LONG) — the version containing
- *       this change</li>
+ *   <li>{@code _commit_version} — the commit version containing this change. Must be of
+ *       an atomic orderable type (e.g. {@code LongType}, {@code StringType},
+ *       {@code IntegerType}, {@code TimestampType}); complex types
+ *       ({@code ArrayType}, {@code MapType}, {@code StructType}) are rejected.
+ *       Spark post-processing sorts rows of a given row identity by this column to
+ *       determine the first and last events</li>
  *   <li>{@code _commit_timestamp} (TIMESTAMP) -- the timestamp of the commit. All rows
  *       belonging to a single {@code _commit_version} must share the same
  *       {@code _commit_timestamp}. For streaming reads with post-processing enabled,
@@ -61,15 +65,16 @@
  *       snapshots) that derive {@code _commit_timestamp} from wall-clock time at
  *       commit time naturally satisfy both requirements.
  *       {@code _commit_timestamp} must be non-{@code NULL} on every row of a streaming
- *       read engaging post-processing. The row-level rewrite raises
- *       {@code CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP} on any row that
- *       violates this; without the guard a NULL group key would never satisfy the
- *       watermark eviction predicate and the row would sit in state indefinitely</li>
+ *       read engaging post-processing; both the row-level Aggregate path and the
+ *       netChanges {@code transformWithState} path raise
+ *       {@code CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP} on a violation</li>
  * </ul>
  * <p>
- * Streaming reads support carry-over removal and update detection but not net change
- * computation. The latter requires reasoning over the entire requested range and is
- * batch-only.
+ * Streaming reads support carry-over removal, update detection, and net change
+ * computation. Net change collapses are kept in the state store keyed by row identity;
+ * row identities only touched in the latest observed commit are held back until either a
+ * later commit (with strictly greater `_commit_timestamp`) advances the global watermark
+ * past them, or the source terminates.
  *
  * @since 4.2.0
 */
@@ -116,10 +121,7 @@ public interface Changelog {
   * the entire changelog range, and Spark will skip net change computation.
   * <p>
   * Note this flag is range-scoped (across all commits in the request), not
-  * micro-batch-scoped. Streaming CDC reads currently reject
-  * {@code deduplicationMode = netChanges} because the per-row-identity collapse cannot
-  * be incrementalized: a row's full history may span an unbounded number of
-  * micro-batches.
+  * micro-batch-scoped.
   */
  boolean containsIntermediateChanges();
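To make the column contract above concrete, here is a hypothetical connector changelog schema that satisfies it. The `id` and `payload` data columns are invented for illustration, and `LongType` is just one valid choice for `_commit_version`:

```scala
import org.apache.spark.sql.types._

// Data columns first, then the three CDC metadata columns the Changelog contract describes.
// _commit_version must be an atomic orderable type (LongType here); ArrayType, MapType,
// and StructType are rejected. _commit_timestamp must be non-NULL on every row of a
// streaming read that engages post-processing.
val changelogSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),      // illustrative data column
  StructField("payload", StringType),                  // illustrative data column
  StructField("_change_type", StringType, nullable = false),
  StructField("_commit_version", LongType, nullable = false),
  StructField("_commit_timestamp", TimestampType, nullable = false)
))
```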

Lines changed: 203 additions & 0 deletions
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.SparkException
import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.connector.catalog.Changelog
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.StructType

/**
 * StatefulProcessor that incrementalises CDC net-change computation for streaming reads.
 *
 * The batch path (`ResolveChangelogTable.injectNetChangeComputation`) uses a Catalyst
 * `Window` partitioned by `rowId` and ordered by `(_commit_version, change_type_rank)` to
 * extract the first and last events per row identity, then applies the SPIP collapse
 * matrix on `(existedBefore, existsAfter)`. That `Window` is rejected on streaming
 * queries (`NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING`).
 *
 * This processor reuses the same SPIP collapse matrix with `transformWithState`, applied
 * per watermark window rather than over the full requested version range. Per-row-identity
 * state stores the first event ever observed and the most-recent event observed; an event
 * time timer keyed on `_commit_timestamp` advances with each batch and fires once the
 * global watermark passes the latest event time observed for the key, at which point the
 * SPIP matrix is evaluated and the net result is emitted. See the paragraph below for how
 * the per-window collapse differs from batch netChanges' range-scoped collapse.
 *
 * Output schema: identical to the connector's changelog schema.
 *
 * Streaming netChanges is incremental: per-row-identity state is cleared once its current
 * net result is emitted (timer fire or end-of-stream flush). Subsequent commits on the same
 * identity arrive against empty state and produce additional output rows independently. This
 * differs from batch netChanges, which collapses every change for a row identity across the
 * entire requested version range; the streaming path cannot retract previously emitted output
 * to match that range-scoped collapse. For example, with id=1 inserted at v1 and deleted at
 * v3 and an unrelated commit at v2 in between, batch netChanges over [v1..v3] emits nothing
 * for id=1, while streaming emits an `insert` (after v2 advances the watermark past v1) and
 * later a `delete` (after end-of-stream or another commit advances the watermark past v3).
 *
 * End-of-stream flushes all pending timers, so a bounded stream's output matches a batch
 * netChanges only when no row identity is touched again after its first emission.
 *
 * @param inputSchema schema of the rows fed into this processor; the connector's
 *                    changelog schema (data columns + `_change_type` +
 *                    `_commit_version` + `_commit_timestamp`) optionally extended with
 *                    rowId helper columns added by
 *                    [[org.apache.spark.sql.catalyst.analysis.ResolveChangelogTable]].
 * @param computeUpdates whether `(existedBefore, existsAfter) = (true, true)` should be
 *                       relabeled as `update_preimage` / `update_postimage` (true) or kept
 *                       as `delete` / `insert` (false), matching the batch contract.
 */
private[analysis] class CdcNetChangesStatefulProcessor(
    inputSchema: StructType,
    computeUpdates: Boolean)
  extends StatefulProcessor[Row, Row, Row] {

  @transient private var firstEvent: ValueState[Row] = _
  @transient private var lastEvent: ValueState[Row] = _

  // Hoisted out of `relabel` so we don't pay a linear `fieldIndex` scan per emitted row.
  private val changeTypeIdx: Int = inputSchema.fieldIndex("_change_type")
  private val commitVersionIdx: Int = inputSchema.fieldIndex("_commit_version")

  // `_commit_version` is connector-defined and may be any atomic orderable type
  // (LongType, StringType, IntegerType, TimestampType, BinaryType, ...). To order
  // rows generically -- without assuming the boxed external Java value is
  // `Comparable` (e.g. BinaryType -> Array[Byte], which is not) -- we route the
  // value through Catalyst: convert the external Row value to its Catalyst-internal
  // form and compare with the type-aware interpreted ordering. This mirrors the
  // batch path which uses Catalyst's `SortOrder` on the same attribute.
  private val versionDataType = inputSchema(commitVersionIdx).dataType
  private val versionToCatalyst: Any => Any =
    CatalystTypeConverters.createToCatalystConverter(versionDataType)
  private val versionInternalOrdering: Ordering[Any] =
    TypeUtils.getInterpretedOrdering(versionDataType)
  private val versionOrdering: Ordering[Row] = new Ordering[Row] {
    override def compare(a: Row, b: Row): Int = {
      val av = versionToCatalyst(a.get(commitVersionIdx))
      val bv = versionToCatalyst(b.get(commitVersionIdx))
      versionInternalOrdering.compare(av, bv)
    }
  }

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    val handle = getHandle
    val rowEncoder: Encoder[Row] = ExpressionEncoder(inputSchema)
    firstEvent = handle.getValueState[Row]("firstEvent", rowEncoder, TTLConfig.NONE)
    lastEvent = handle.getValueState[Row]("lastEvent", rowEncoder, TTLConfig.NONE)
  }

  override def handleInputRows(
      key: Row,
      inputRows: Iterator[Row],
      timerValues: TimerValues): Iterator[Row] = {
    val handle = getHandle
    // Sort by (_commit_version, change_type rank) -- pre-events (delete /
    // update_preimage) before post-events (insert / update_postimage) within a single
    // commit version, matching the batch path's `(_commit_version, change_type_rank)`
    // ordering. We compose the type-aware Catalyst version ordering with the rank
    // ordering as a tiebreaker.
    val sorted = inputRows.toSeq.sorted(versionOrdering.orElse(Ordering.by { row =>
      row.getAs[String](changeTypeIdx) match {
        case Changelog.CHANGE_TYPE_UPDATE_PREIMAGE | Changelog.CHANGE_TYPE_DELETE => 0
        case Changelog.CHANGE_TYPE_INSERT | Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE => 1
        case _ => throw new SparkException(
          errorClass = "CHANGELOG_CONTRACT_VIOLATION.UNEXPECTED_CHANGE_TYPE",
          messageParameters = Map.empty,
          cause = null)
      }
    }))
    if (sorted.isEmpty) return Iterator.empty

    if (!firstEvent.exists()) {
      firstEvent.update(sorted.head)
    }
    lastEvent.update(sorted.last)

    // Re-arm the per-key event-time timer to the latest observed `_commit_timestamp`.
    // Without dropping any existing timers we'd risk an earlier timer firing first and
    // emitting state that later events would then re-populate, producing duplicate
    // output for the same row identity.
    //
    // A NULL `_commit_timestamp` cannot be turned into a timer epoch and would NPE on
    // `getTime()`. The `Changelog` Javadoc requires non-NULL `_commit_timestamp` on
    // streaming reads engaging post-processing, so we surface the contract violation
    // with a clear error class rather than failing the micro-batch with an opaque NPE.
    val ts = sorted.last.getAs[java.sql.Timestamp]("_commit_timestamp")
    if (ts == null) {
      throw new SparkException(
        errorClass = "CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP",
        messageParameters = Map.empty,
        cause = null)
    }
    val newTimerMs = ts.getTime
    val existing = handle.listTimers().toList
    existing.foreach(handle.deleteTimer)
    handle.registerTimer(newTimerMs)

    Iterator.empty
  }

  override def handleExpiredTimer(
      key: Row,
      timerValues: TimerValues,
      expiredTimerInfo: ExpiredTimerInfo): Iterator[Row] = {
    if (!firstEvent.exists()) return Iterator.empty

    val first = firstEvent.get()
    val last = lastEvent.get()
    val firstChangeType = first.getAs[String]("_change_type")
    val lastChangeType = last.getAs[String]("_change_type")

    val existedBefore =
      firstChangeType == Changelog.CHANGE_TYPE_DELETE ||
        firstChangeType == Changelog.CHANGE_TYPE_UPDATE_PREIMAGE
    val existsAfter =
      lastChangeType == Changelog.CHANGE_TYPE_INSERT ||
        lastChangeType == Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE

    val (preLabel, postLabel) =
      if (computeUpdates) {
        (Changelog.CHANGE_TYPE_UPDATE_PREIMAGE, Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE)
      } else {
        (Changelog.CHANGE_TYPE_DELETE, Changelog.CHANGE_TYPE_INSERT)
      }

    val out: Iterator[Row] = (existedBefore, existsAfter) match {
      case (false, false) => Iterator.empty
      case (false, true) => Iterator(relabel(last, Changelog.CHANGE_TYPE_INSERT))
      case (true, false) => Iterator(relabel(first, Changelog.CHANGE_TYPE_DELETE))
      case (true, true) => Iterator(relabel(first, preLabel), relabel(last, postLabel))
    }

    firstEvent.clear()
    lastEvent.clear()
    out
  }

  private def relabel(row: Row, newChangeType: String): Row = {
    val values = row.toSeq.toArray
    values(changeTypeIdx) = newChangeType
    new GenericRowWithSchema(values, inputSchema)
  }
}
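For readers skimming the processor, the decision in `handleExpiredTimer` boils down to a small pure function over the first and last change types. The restatement below is illustrative only and not part of the patch; plain string literals stand in for the `Changelog.CHANGE_TYPE_*` constants:

```scala
// Sketch of the SPIP collapse matrix evaluated in handleExpiredTimer above.
// Returns the change types to emit for one row identity's watermark window.
def collapse(firstType: String, lastType: String, computeUpdates: Boolean): Seq[String] = {
  val existedBefore = firstType == "delete" || firstType == "update_preimage"
  val existsAfter = lastType == "insert" || lastType == "update_postimage"
  (existedBefore, existsAfter) match {
    case (false, false) => Seq.empty                                  // net cancel: no output
    case (false, true)  => Seq("insert")                              // row came into existence
    case (true, false)  => Seq("delete")                              // row ceased to exist
    case (true, true)   =>
      if (computeUpdates) Seq("update_preimage", "update_postimage")  // relabel as update pair
      else Seq("delete", "insert")                                    // keep delete/insert pair
  }
}
```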
