
Commit e6ff38e

[SPARK-56686][SQL] Support streaming row-level CDC post-processing
### What changes were proposed in this pull request?

This PR implements row-level CDC post-processing (carry-over removal and update detection) for DSv2 streaming reads.

Previously, streaming `changes()` rejected any post-processing with a blanket `INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED` error. The batch path (added in #55508 and #55583) uses a Catalyst `Window` keyed by `(rowId, _commit_version)`, which `UnsupportedOperationChecker` rejects on streaming queries (`NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING`). The streaming rewrite in `ResolveChangelogTable` now expresses the same logic with streaming-allowed primitives (a hedged DataFrame-level sketch follows the user-facing changes below):

```
EventTimeWatermark(_commit_timestamp, 0s)
-> Aggregate keyed by (rowId..., _commit_version, _commit_timestamp)
   (count_if delete/insert, [min/max/count rowVersion,] collect_list(struct(*)))
-> [Filter on the carry-over predicate]
-> Generate(Inline(events))
-> [Project relabeling _change_type for delete+insert pairs]
-> Project dropping __spark_cdc_* helpers
```

Including `_commit_timestamp` in the grouping keys is required to satisfy the Append-mode streaming aggregation contract: the watermark attribute must appear among the grouping expressions. By CDC convention all rows in a single commit share `_commit_timestamp`, so relative to the batch `(rowId, _commit_version)` grouping this is semantically a no-op.

`deduplicationMode = netChanges` is still rejected -- net change computation partitions by `rowId` alone and reasons over the entire requested range, which is fundamentally cross-batch. The existing error class `INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED` is replaced with the more specific `INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED`, which names the offending option and points users at the supported streaming alternatives.

Doc updates:
- `Changelog.java` clarifies that all rows of a single `_commit_version` must share `_commit_timestamp`, and that streaming reads expect strictly increasing `_commit_timestamp` across micro-batches (equal timestamps are allowed within a batch).
- `Changelog.java` notes that `containsIntermediateChanges()` is range-scoped, hence the streaming limitation for `netChanges`.
- `DataStreamReader.changes()` Scaladoc lists the `netChanges` streaming limitation.

### Why are the changes needed?

Without this PR, any streaming CDC read against a connector that emits CoW carry-over pairs (`containsCarryoverRows = true`) or represents updates as raw delete+insert (`representsUpdateAsDeleteAndInsert = true`) raises an analysis error, forcing users to fall back to batch reads. The batch-only restriction is unnecessary for these passes -- they don't need cross-version state -- and it surprises users, since the same options work on batch reads.

### Does this PR introduce _any_ user-facing change?

Yes.
- Streaming `spark.readStream.changes(...)` now supports `computeUpdates = true` and `deduplicationMode = dropCarryovers`. Previously these threw `INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED`.
- The error class `INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED` is renamed to `INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED` with a more specific message. The new error fires only for `deduplicationMode = netChanges` on streaming reads.
- `DataStreamReader.changes()` Scaladoc is updated accordingly.
- `Changelog.java` Scaladoc clarifies the `_commit_timestamp` contract for streaming.
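To make the rewrite shape above concrete, here is a minimal DataFrame-level sketch of the `dropCarryovers` pass. It is an illustration under stated assumptions, not the PR's implementation: the real rewrite builds the Catalyst plan directly inside `ResolveChangelogTable`, the single row-identity column `id` is a placeholder, and the carry-over predicate is deliberately simplified.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

// Sketch only: `changes` is a streaming DataFrame of raw CDC rows with a
// placeholder row-identity column `id` plus the _change_type /
// _commit_version / _commit_timestamp metadata columns.
def dropCarryovers(spark: SparkSession, changes: DataFrame): DataFrame = {
  import spark.implicits._
  changes
    // EventTimeWatermark(_commit_timestamp, 0s)
    .withWatermark("_commit_timestamp", "0 seconds")
    // Aggregate keyed by (rowId..., _commit_version, _commit_timestamp); the
    // watermark attribute must be among the grouping keys for Append mode.
    .groupBy($"id", $"_commit_version", $"_commit_timestamp")
    .agg(
      count_if($"_change_type" === "delete").as("__spark_cdc_deletes"),
      count_if($"_change_type" === "insert").as("__spark_cdc_inserts"),
      collect_list(struct(col("*"))).as("__spark_cdc_events"))
    // Simplified carry-over predicate: the real pass must also tell a
    // carry-over pair apart from a genuine delete+insert update.
    .where(!($"__spark_cdc_deletes" === 1 && $"__spark_cdc_inserts" === 1))
    // Generate(Inline(events)): re-expand surviving groups into rows; the
    // inline drops the __spark_cdc_* helper columns along the way.
    .select(inline($"__spark_cdc_events"))
}
```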
### How was this patch tested?

86 tests across 4 CDC suites (all passing):
- `ResolveChangelogTableStreamingPostProcessingSuite` (new, 5 tests) -- plan-shape assertions covering carry-over removal only, update detection only, both fused, and the no-rewrite pass-through cases. Verifies the `EventTimeWatermark` + `Aggregate` + `Generate(Inline)` rewrite shape.
- `ChangelogResolutionSuite` -- the two existing streaming throw-tests are flipped to plan-shape assertions; a new test covers the `netChanges` streaming throw.
- `ResolveChangelogTablePostProcessingSuite` -- the existing streaming throw test is updated to cover the `netChanges`-only case.
- `ChangelogEndToEndSuite` -- three new streaming end-to-end tests using `InMemoryChangelogCatalog`: carry-over removal drops CoW pairs, update detection relabels delete+insert as update, and `netChanges` throws.

Also confirmed `UnsupportedOperationsSuite` (216 tests) still passes -- the rewritten plan does not contain `Window` or any other streaming-rejected operator.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-7)

Closes #55636 from gengliangwang/streamingCDC.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
1 parent 8457567 commit e6ff38e

11 files changed: 1078 additions & 55 deletions


common/utils/src/main/resources/error/error-conditions.json

Lines changed: 7 additions & 2 deletions
@@ -666,6 +666,11 @@
       "The Change Data Capture (CDC) connector violated the `Changelog` contract at runtime."
     ],
     "subClass" : {
+      "NULL_COMMIT_TIMESTAMP" : {
+        "message" : [
+          "Connector emitted a row with a NULL `_commit_timestamp` on a streaming read engaging post-processing. The `Changelog` contract requires `_commit_timestamp` to be non-NULL for streaming reads, since post-processing uses it as event time to advance the watermark."
+        ]
+      },
       "UNEXPECTED_CHANGE_TYPE" : {
         "message" : [
           "Connector emitted a row with a `_change_type` value that is not one of the four supported types (`insert`, `delete`, `update_preimage`, `update_postimage`). The `Changelog` contract requires every emitted row to carry one of these four values."
@@ -3303,9 +3308,9 @@
           "`startingVersion` is required when `endingVersion` is specified for CDC queries."
         ]
       },
-      "STREAMING_POST_PROCESSING_NOT_SUPPORTED" : {
+      "STREAMING_NET_CHANGES_NOT_SUPPORTED" : {
         "message" : [
-          "Change Data Capture (CDC) streaming reads on connector `<changelogName>` do not yet support post-processing (carry-over removal, update detection, or net change computation). The requested combination of options would require post-processing, which is currently only available for batch reads. Use a batch read, or set `deduplicationMode = none` and `computeUpdates = false` to receive raw change rows in streaming."
+          "Change Data Capture (CDC) streaming reads on connector `<changelogName>` do not yet support net change computation (`deduplicationMode = netChanges`). Net change computation reasons over the entire requested version range and is currently only available for batch reads. Use a batch read, or set `deduplicationMode` to `none` or `dropCarryovers` for streaming."
         ]
       },
       "UPDATE_DETECTION_REQUIRES_CARRY_OVER_REMOVAL" : {

sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala

Lines changed: 20 additions & 0 deletions
@@ -131,6 +131,26 @@ abstract class DataStreamReader {
  *   .changes("my_table")
  * }}}
  *
+ * Streaming reads support the same `computeUpdates` and `deduplicationMode = dropCarryovers`
+ * post-processing as batch reads. `deduplicationMode = netChanges` is currently batch-only --
+ * it requires reasoning over the entire requested range, which is not incrementalized yet.
+ * Requesting it on a streaming read raises an explicit
+ * `INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED` error.
+ *
+ * When the requested options engage row-level post-processing (carry-over removal or update
+ * detection), the rewrite injects an internal `EventTimeWatermark` on `_commit_timestamp` and a
+ * stateful streaming aggregate. Two implications follow:
+ *  - A commit's events are emitted in the next micro-batch after the commit is read
+ *    (append-mode aggregate eviction is `eventTime <= watermark`, and the watermark
+ *    advances to the max `_commit_timestamp` observed in the previous batch). A stream that
+ *    reads its last commit and stops will keep that commit's events in state until a
+ *    subsequent (no-data) micro-batch fires.
+ *  - The query is constrained to `Append` output mode; `Update` and `Complete` are rejected at
+ *    writer-start time with `STREAMING_OUTPUT_MODE.UNSUPPORTED_OPERATION`. The internal
+ *    watermark metadata is stripped from the user-visible `_commit_timestamp` output, so
+ *    downstream user-supplied watermarks on other columns do not interact with it via the
+ *    global multi-watermark policy.
+ *
  * @param tableName
  *   a qualified or unqualified name that designates a table.
  * @since 4.2.0
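Putting the Scaladoc above together, a usage sketch follows. The table name, checkpoint path, and `startingVersion` value are placeholders, and passing the CDC options through `.option(...)` is an assumption.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cdc-stream").getOrCreate()

val query = spark.readStream
  .option("startingVersion", "0")                 // placeholder CDC range option
  .option("computeUpdates", "true")               // update detection, now streaming-capable
  .option("deduplicationMode", "dropCarryovers")  // carry-over removal, now streaming-capable
  .changes("my_table")                            // placeholder table name
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/cdc-checkpoint")
  .outputMode("append")                           // Update/Complete are rejected at start
  .start()

// A commit's post-processed events surface one micro-batch after the commit
// is read, once the watermark passes its _commit_timestamp.
query.awaitTermination()
```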

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Changelog.java

Lines changed: 40 additions & 1 deletion
@@ -35,8 +35,41 @@
  *     {@code update_preimage}, or {@code update_postimage}</li>
  *   <li>{@code _commit_version} (connector-defined type, e.g. LONG) — the version containing
  *     this change</li>
- *   <li>{@code _commit_timestamp} (TIMESTAMP) — the timestamp of the commit</li>
+ *   <li>{@code _commit_timestamp} (TIMESTAMP) — the timestamp of the commit. All rows
+ *     belonging to a single {@code _commit_version} must share the same
+ *     {@code _commit_timestamp}. For streaming reads with post-processing enabled,
+ *     two additional requirements apply:
+ *     <ol>
+ *       <li>All rows of a single commit must appear in the same micro-batch (i.e.
+ *       micro-batch boundaries align with commit boundaries).</li>
+ *       <li>Each micro-batch's rows must have {@code _commit_timestamp} strictly
+ *       greater than the maximum {@code _commit_timestamp} of any prior
+ *       micro-batch.</li>
+ *     </ol>
+ *     Streaming post-processing uses {@code _commit_timestamp} as event time with a
+ *     zero-delay watermark, so once a micro-batch observes max event time T the
+ *     global watermark advances to T. Both Spark's late-event filter and its
+ *     state-eviction predicate then use {@code eventTime <= T}, so any later row
+ *     at {@code _commit_timestamp <= T} (whether from the same commit split across
+ *     batches, a different commit emitted later, or simply an out-of-order commit)
+ *     is silently dropped as late. Requirement 1 keeps a single commit's rows
+ *     together; requirement 2 keeps distinct commits in strictly increasing
+ *     event-time order across batches. Multiple distinct commits with equal
+ *     {@code _commit_timestamp} are allowed within a single micro-batch; only
+ *     <em>across</em> batches does timestamp progression need to be strictly
+ *     increasing. Atomic-commit CDC connectors (e.g. Delta versions, Iceberg
+ *     snapshots) that derive {@code _commit_timestamp} from wall-clock time at
+ *     commit time naturally satisfy both requirements.
+ *     {@code _commit_timestamp} must be non-{@code NULL} on every row of a streaming
+ *     read engaging post-processing. The row-level rewrite raises
+ *     {@code CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP} on any row that
+ *     violates this; without the guard a NULL group key would never satisfy the
+ *     watermark eviction predicate and the row would sit in state indefinitely.</li>
  * </ul>
+ * <p>
+ * Streaming reads support carry-over removal and update detection but not net change
+ * computation. The latter requires reasoning over the entire requested range and is
+ * batch-only.
  *
  * @since 4.2.0
  */
@@ -81,6 +114,12 @@ public interface Changelog {
  *   Spark will collapse multiple changes per row identity into the net effect.
  *   If {@code false}, the connector guarantees at most one change per row identity across
  *   the entire changelog range, and Spark will skip net change computation.
+ *   <p>
+ *   Note this flag is range-scoped (across all commits in the request), not
+ *   micro-batch-scoped. Streaming CDC reads currently reject
+ *   {@code deduplicationMode = netChanges} because the per-row-identity collapse cannot
+ *   be incrementalized: a row's full history may span an unbounded number of
+ *   micro-batches.
  */
 boolean containsIntermediateChanges();
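The two streaming requirements and the non-NULL rule lend themselves to a connector-side sanity check. The sketch below is hypothetical: `ChangeRow` and `validateBatch` are illustrative names, not part of the `Changelog` interface.

```scala
// Illustrative model of an emitted CDC row; Option models a possibly-NULL
// _commit_timestamp (epoch millis).
final case class ChangeRow(commitVersion: Long, commitTimestampMs: Option[Long])

// Validates one micro-batch against the contract above, given the maximum
// _commit_timestamp observed across all prior micro-batches.
def validateBatch(batch: Seq[ChangeRow], maxPriorTs: Option[Long]): Unit = {
  // Non-NULL rule: Spark itself raises
  // CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP for such rows.
  batch.foreach { row =>
    require(row.commitTimestampMs.isDefined,
      s"NULL _commit_timestamp in commit ${row.commitVersion}")
  }
  // All rows of one _commit_version share a single _commit_timestamp.
  batch.groupBy(_.commitVersion).foreach { case (version, rows) =>
    require(rows.map(_.commitTimestampMs).distinct.size == 1,
      s"commit $version carries mixed _commit_timestamp values")
  }
  // Strictly greater than every prior batch: a row at or below the prior max
  // would be silently dropped as late by the zero-delay watermark. This also
  // catches a commit split across batches (its second half would arrive with
  // a non-increasing timestamp).
  for (prior <- maxPriorTs; row <- batch) {
    require(row.commitTimestampMs.exists(_ > prior),
      s"_commit_timestamp ${row.commitTimestampMs.get} <= prior batch max $prior")
  }
}
```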
