Commit 096afb5
[SPARK-56686][FOLLOWUP][SQL] Mark CDC streaming rewrite via attribute metadata
### What changes were proposed in this pull request?

Follow-up to #55636 addressing post-merge review comments from zikangh:

1. **Deduplicate `isCarryoverPair`.** The carry-over predicate (`_del_cnt = 1 AND _ins_cnt = 1 AND _rv_cnt = 2 AND _min_rv = _max_rv`) was duplicated between the batch path's `addCarryOverPairFilter` and the streaming path's inline filter. Extracted a shared `buildCarryOverPairPredicate` helper and call it from both.
2. **Mark the streaming row-level rewrite via attribute metadata rather than helper column name.** `UnsupportedOperationChecker` previously detected the rewrite by string-matching the `__spark_cdc_events` aggregate alias name. Switched to a metadata marker (`ResolveChangelogTable.streamingPostProcessingMarker`) attached to the alias's output attribute, mirroring the existing `EventTimeWatermark.delayKey` and `SessionWindow.marker` patterns. The marker travels with the attribute through optimization.
3. **Expand streaming E2E coverage.** New tests in `ChangelogEndToEndSuite`:
   - composite rowId carry-over removal,
   - composite rowId update detection (different tuples kept raw),
   - carry-over + update detection across multiple commits,
   - DELETE-all-rows and UPDATE-all-rows fixtures,
   - append-only workload pass-through,
   - no-op UPDATE labeled as update (rowVersion differs on pre/post),
   - large carry-over removal (9 carry-over pairs + 1 real delete).

### Why are the changes needed?

zikangh raised these points on the merged PR. Bundling them lets the changes be reviewed and shipped as one follow-up.

### Does this PR introduce _any_ user-facing change?

No. Items 1 and 2 are internal refactors and item 3 adds test coverage; the behavior of streaming CDC reads is unchanged.

### How was this patch tested?

All 157 tests pass across the four CDC suites:

- `ChangelogResolutionSuite`
- `ResolveChangelogTablePostProcessingSuite`
- `ResolveChangelogTableStreamingPostProcessingSuite`
- `ChangelogEndToEndSuite`

Also confirmed that `UnsupportedOperationsSuite` (216 tests) still passes after the marker-based detection switch.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-7)

Closes #55653 from gengliangwang/streamingCDC-followup-zikangh.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit 84d9c84)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
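The carry-over predicate deduplicated in item 1 can be illustrated with a small Spark-free sketch. The `HelperRow` case class and `isCarryOverPair` function below are hypothetical stand-ins for the `_del_cnt` / `_ins_cnt` / `_rv_cnt` / `_min_rv` / `_max_rv` helper columns and the shared predicate, not the actual Catalyst expressions:

```scala
// Hypothetical, Spark-free model of the helper columns computed per rowId group.
// minRv/maxRv are Options because Spark's min/max aggregates skip NULLs.
case class HelperRow(
    delCnt: Long, insCnt: Long, rvCnt: Long,
    minRv: Option[Long], maxRv: Option[Long])

// _del_cnt = 1 AND _ins_cnt = 1 AND _rv_cnt = 2 AND _min_rv = _max_rv:
// exactly one delete plus one insert at the same rowVersion. The rvCnt == 2
// clause guards against a NULL rowVersion making min/max compare equal on a
// single non-NULL value.
def isCarryOverPair(r: HelperRow): Boolean =
  r.delCnt == 1L && r.insCnt == 1L && r.rvCnt == 2L &&
    (for (lo <- r.minRv; hi <- r.maxRv) yield lo == hi).getOrElse(false)

// Carry-over pair: same rowVersion on both sides, dropped by the filter.
val carryOver = HelperRow(1L, 1L, 2L, Some(7L), Some(7L))
// Real update: rowVersions differ, so the pair is kept.
val realUpdate = HelperRow(1L, 1L, 2L, Some(7L), Some(8L))
// One NULL rowVersion: min equals max on the lone non-NULL value, but rvCnt = 1 blocks it.
val oneNullRv = HelperRow(1L, 1L, 1L, Some(7L), Some(7L))
```

With one predicate shared by both paths, the batch and streaming filters can no longer drift apart.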
1 parent 0eb4fc1 commit 096afb5

3 files changed: 387 additions & 24 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala

Lines changed: 38 additions & 18 deletions
```diff
@@ -90,6 +90,17 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
     val all: Set[String] = Set(DelCnt, InsCnt, MinRv, MaxRv, RvCnt, Events)
   }
 
+  /**
+   * Metadata-key marker placed on the `__spark_cdc_events` aggregate's output attribute
+   * by [[addStreamingRowLevelPostProcessing]]. Downstream rules
+   * (`UnsupportedOperationChecker`'s CDC-specific output-mode check) detect the
+   * streaming row-level rewrite by looking for this marker rather than by string-matching
+   * the helper column's name -- mirroring the existing `EventTimeWatermark.delayKey` and
+   * `SessionWindow.marker` patterns and surviving any optimization that might relabel
+   * or rewrite the alias.
+   */
+  final val streamingPostProcessingMarker = "spark.cdc.streamingPostProcessing"
+
   /**
    * Reserved (`__spark_cdc_*`) column names used internally by net-change
    * computation; connectors must not emit columns with these names.
@@ -385,23 +396,22 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
     // both inside the struct and as top-level grouping outputs; the top-level duplicates
     // are dropped via `unrequiredChildIndex` below.
     val structOfAllCols = CreateStruct(watermarked.output)
+    // Attach a metadata marker to the `__spark_cdc_events` alias so downstream rules
+    // can detect the streaming row-level rewrite by metadata rather than by helper
+    // column name (mirrors `SessionWindow.marker` / `EventTimeWatermark.delayKey`).
+    val eventsMetadata = new MetadataBuilder()
+      .putBoolean(streamingPostProcessingMarker, true)
+      .build()
     val eventsAlias = Alias(
-      new CollectList(structOfAllCols).toAggregateExpression(), HelperColumn.Events)()
+      new CollectList(structOfAllCols).toAggregateExpression(), HelperColumn.Events)(
+      explicitMetadata = Some(eventsMetadata))
 
     val aggregateExprs: Seq[NamedExpression] =
       groupingNamedExprs ++ Seq(delCntAlias, insCntAlias) ++ rvAliases :+ eventsAlias
     val aggregated = Aggregate(groupingExprs, aggregateExprs, watermarked)
 
     val filtered: LogicalPlan = if (requiresCarryOverRemoval) {
-      val delCnt = getAttribute(aggregated, HelperColumn.DelCnt)
-      val insCnt = getAttribute(aggregated, HelperColumn.InsCnt)
-      val minRv = getAttribute(aggregated, HelperColumn.MinRv)
-      val maxRv = getAttribute(aggregated, HelperColumn.MaxRv)
-      val rvCnt = getAttribute(aggregated, HelperColumn.RvCnt)
-      val isCarryoverPair = And(
-        And(EqualTo(delCnt, Literal(1L)), EqualTo(insCnt, Literal(1L))),
-        And(EqualTo(rvCnt, Literal(2L)), EqualTo(minRv, maxRv)))
-      Filter(Not(isCarryoverPair), aggregated)
+      Filter(Not(buildCarryOverPairPredicate(aggregated)), aggregated)
     } else aggregated
 
     // Inline the struct array back into rows. Drop the events column (consumed by Inline)
@@ -531,23 +541,33 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
   }
 
   /**
-   * Adds a Filter node that drops rows belonging to a CoW carry-over pair.
-   * A pair is a carry-over iff
-   * `_del_cnt = 1 AND _ins_cnt = 1 AND _rv_cnt = 2 AND _min_rv = _max_rv`.
-   * The `_rv_cnt = 2` clause guards against a NULL rowVersion silently matching
+   * Builds the carry-over pair predicate against the helper attributes exposed by
+   * `input`: a pair is a CoW carry-over iff
+   * `_del_cnt = 1 AND _ins_cnt = 1 AND _rv_cnt = 2 AND _min_rv = _max_rv`. The
+   * `_rv_cnt = 2` clause guards against a NULL rowVersion silently matching
    * `_min_rv = _max_rv` (Spark's min/max skip NULLs).
+   *
+   * Used by both the batch path (`addCarryOverPairFilter` over a Window child) and the
+   * streaming path (in `addStreamingRowLevelPostProcessing` over an Aggregate child).
+   * The helper-attribute layout is the same in both cases.
    */
-  private def addCarryOverPairFilter(input: LogicalPlan): LogicalPlan = {
+  private def buildCarryOverPairPredicate(input: LogicalPlan): Expression = {
     val delCnt = getAttribute(input, HelperColumn.DelCnt)
     val insCnt = getAttribute(input, HelperColumn.InsCnt)
     val minRv = getAttribute(input, HelperColumn.MinRv)
     val maxRv = getAttribute(input, HelperColumn.MaxRv)
     val rvCnt = getAttribute(input, HelperColumn.RvCnt)
-
-    val isCarryoverPair = And(
+    And(
       And(EqualTo(delCnt, Literal(1L)), EqualTo(insCnt, Literal(1L))),
       And(EqualTo(rvCnt, Literal(2L)), EqualTo(minRv, maxRv)))
-    Filter(Not(isCarryoverPair), input)
+  }
+
+  /**
+   * Adds a Filter node that drops rows belonging to a CoW carry-over pair, using the
+   * shared `buildCarryOverPairPredicate`.
+   */
+  private def addCarryOverPairFilter(input: LogicalPlan): LogicalPlan = {
+    Filter(Not(buildCarryOverPairPredicate(input)), input)
   }
 
   /**
```
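The attach-a-marker pattern used by the diff above can be sketched without Spark. The `Attr` case class and both helper functions here are hypothetical stand-ins (a plain `Map` plays the role of Spark's `Metadata` / `MetadataBuilder`); only the marker-key string comes from the patch:

```scala
// Hypothetical stand-in for a resolved attribute carrying a name plus metadata.
// In Spark the marker rides on the Alias's explicitMetadata, so it stays
// attached to the attribute even if optimization relabels the column.
case class Attr(name: String, metadata: Map[String, Boolean] = Map.empty)

val streamingPostProcessingMarker = "spark.cdc.streamingPostProcessing"

// Producer side (analysis rule): attach the marker when building the events alias.
def markEventsAttr(a: Attr): Attr =
  a.copy(metadata = a.metadata + (streamingPostProcessingMarker -> true))

// Consumer side (checker): detect the rewrite via metadata, not the column name,
// so a rename of "__spark_cdc_events" cannot break detection.
def isCdcRowLevelRewrite(a: Attr): Boolean =
  a.metadata.getOrElse(streamingPostProcessingMarker, false)

val marked = markEventsAttr(Attr("__spark_cdc_events"))
// Unlike the old name-based check, a relabeled attribute is still detected.
val renamed = marked.copy(name = "renamed_by_optimizer")
```

This is the same design as `EventTimeWatermark.delayKey` and `SessionWindow.marker`: the producer stamps the attribute once, and consumers never depend on a fragile naming convention.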

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala

Lines changed: 8 additions & 6 deletions
```diff
@@ -284,9 +284,10 @@ object UnsupportedOperationChecker extends Logging {
     // [[ResolveChangelogTable]] are designed and validated only for Append output mode.
     // Two markers can identify a CDC-rewritten plan:
     // - The row-level rewrite (`addStreamingRowLevelPostProcessing`) injects a
-    //   streaming Aggregate buffering input rows into the helper column
-    //   `ResolveChangelogTable.HelperColumn.Events` ("__spark_cdc_events") before
-    //   re-emitting them via `Generate(Inline(...))`.
+    //   streaming Aggregate whose `__spark_cdc_events` alias's output attribute
+    //   carries the metadata marker
+    //   `ResolveChangelogTable.streamingPostProcessingMarker` (mirrors
+    //   `SessionWindow.marker` / `EventTimeWatermark.delayKey`).
     // - The netChanges rewrite (`addStreamingNetChangeComputation`) injects a
     //   `TransformWithState` driven by `CdcNetChangesStatefulProcessor`.
     // Under Update or Complete the Aggregate / TransformWithState would re-emit
@@ -296,9 +297,10 @@ object UnsupportedOperationChecker extends Logging {
     // netChanges-only marker is needed here primarily to catch Update mode.) Reject
     // those modes at analysis time with a clear error rather than silently producing
     // a misleading change feed.
-    val containsCdcEventsAggregate = aggregates.exists(a => a.aggregateExpressions.exists {
+    val containsCdcRowLevelRewrite = aggregates.exists(a => a.aggregateExpressions.exists {
       case ne: NamedExpression if ne.resolved =>
-        ne.name == ResolveChangelogTable.HelperColumn.Events
+        ne.metadata.contains(ResolveChangelogTable.streamingPostProcessingMarker) &&
+          ne.metadata.getBoolean(ResolveChangelogTable.streamingPostProcessingMarker)
       case _ => false
     })
     val containsCdcNetChangesProcessor = plan.exists {
@@ -307,7 +309,7 @@ object UnsupportedOperationChecker extends Logging {
       case _ => false
     }
     if (outputMode != InternalOutputModes.Append &&
-      (containsCdcEventsAggregate || containsCdcNetChangesProcessor)) {
+      (containsCdcRowLevelRewrite || containsCdcNetChangesProcessor)) {
       throw QueryCompilationErrors.unsupportedOutputModeForStreamingOperationError(
         outputMode, "Change Data Capture (CDC) streaming reads with post-processing")
     }
```
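The gate the checker implements above reduces to a small decision function. This is a Spark-free sketch: the `OutputMode` objects and `checkCdcOutputMode` are hypothetical models of `InternalOutputModes` and the checker logic, with an `Either` standing in for the thrown analysis error:

```scala
// Hypothetical model of the analysis-time gate: non-Append output modes are
// rejected whenever either CDC rewrite marker is found in the streaming plan.
sealed trait OutputMode
case object Append extends OutputMode
case object Update extends OutputMode
case object Complete extends OutputMode

def checkCdcOutputMode(
    mode: OutputMode,
    hasRowLevelRewrite: Boolean,      // attribute-metadata marker found
    hasNetChangesProcessor: Boolean   // CdcNetChangesStatefulProcessor found
): Either[String, Unit] =
  if (mode != Append && (hasRowLevelRewrite || hasNetChangesProcessor))
    Left(s"$mode not supported for CDC streaming reads with post-processing")
  else Right(())
```

Failing fast here is deliberate: under Update or Complete the buffering Aggregate / TransformWithState would re-emit rows and produce a misleading change feed, so the plan is rejected during analysis instead.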
