10 changes: 5 additions & 5 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -666,6 +666,11 @@
"The Change Data Capture (CDC) connector violated the `Changelog` contract at runtime."
],
"subClass" : {
"NULL_COMMIT_TIMESTAMP" : {
"message" : [
"Connector emitted a row with a NULL `_commit_timestamp` on a streaming read engaging post-processing. The `Changelog` contract requires `_commit_timestamp` to be non-NULL for streaming reads, since post-processing uses it as event time to advance the watermark."
]
},
"UNEXPECTED_CHANGE_TYPE" : {
"message" : [
"Connector emitted a row with a `_change_type` value that is not one of the four supported types (`insert`, `delete`, `update_preimage`, `update_postimage`). The `Changelog` contract requires every emitted row to carry one of these four values."
@@ -3297,11 +3302,6 @@
"`startingVersion` is required when `endingVersion` is specified for CDC queries."
]
},
"STREAMING_POST_PROCESSING_NOT_SUPPORTED" : {
"message" : [
"Change Data Capture (CDC) streaming reads on connector `<changelogName>` do not yet support post-processing (carry-over removal, update detection, or net change computation). The requested combination of options would require post-processing, which is currently only available for batch reads. Use a batch read, or set `deduplicationMode = none` and `computeUpdates = false` to receive raw change rows in streaming."
]
},
"UPDATE_DETECTION_REQUIRES_CARRY_OVER_REMOVAL" : {
"message" : [
"`computeUpdates` cannot be used with `deduplicationMode=none` on connector `<changelogName>` because the connector emits copy-on-write carry-over pairs (`containsCarryoverRows()` returns true) that would be silently mislabeled as updates. Set `deduplicationMode` to `dropCarryovers` or `netChanges`."
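To make the last message concrete, here is a hedged sketch of the rejected and accepted option combinations against the `changes()` API shown in the next file; `spark` and `my_table` are placeholders, and the option names come from the messages above.

// Hedged sketch: the combination rejected by
// UPDATE_DETECTION_REQUIRES_CARRY_OVER_REMOVAL for a connector whose
// containsCarryoverRows() returns true. `spark` and "my_table" are placeholders.
spark.readStream
  .option("deduplicationMode", "none")   // raw change rows, carry-over pairs included
  .option("computeUpdates", "true")      // would mislabel carry-over pairs as updates
  .changes("my_table")                   // rejected with the error above

// Accepted: remove carry-over pairs before update detection.
spark.readStream
  .option("deduplicationMode", "dropCarryovers")
  .option("computeUpdates", "true")
  .changes("my_table")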
@@ -131,6 +131,30 @@ abstract class DataStreamReader {
* .changes("my_table")
* }}}
*
* Streaming reads support all of the same post-processing as batch reads -- `computeUpdates`,
Member: The PR body notes that TransformWithState requires RocksDB, but the changes() Scaladoc says streaming supports all post-processing without mentioning that the default state store will fail at query start. Since this is an analyzer-injected operator rather than user-written transformWithState, users have little reason to know they must set the RocksDB provider.
* `deduplicationMode = dropCarryovers`, and `deduplicationMode = netChanges`. The streaming
* netChanges path holds per-row-identity state in the state store and emits the SPIP collapse
* output once the global watermark advances past the last `_commit_timestamp` observed for that
* row identity. Row identities touched only in the latest observed commit are therefore not
* emitted until a later commit (with strictly greater `_commit_timestamp`) advances the
* watermark past them, or the source terminates -- bounded streams produce the same output as
* the corresponding batch read.
*
* When the requested options engage post-processing (carry-over removal, update detection, or
* netChanges), the rewrite injects an internal `EventTimeWatermark` on `_commit_timestamp` and
* a stateful streaming operator (an aggregate for the row-level passes, a `transformWithState`
* for netChanges). Two implications follow:
* - A commit's events are emitted in the next micro-batch after the commit is read
* (append-mode aggregate eviction is `eventTime <= watermark`, and the watermark advances
* to the max `_commit_timestamp` observed in the previous batch). A stream that reads its
* last commit and stops will keep that commit's events in state until a subsequent
* (no-data) micro-batch fires.
* - The query is constrained to `Append` output mode; `Update` and `Complete` are rejected at
* writer-start time with `STREAMING_OUTPUT_MODE.UNSUPPORTED_OPERATION`. The internal
* watermark metadata is stripped from the user-visible `_commit_timestamp` output, so
* downstream user-supplied watermarks on other columns do not interact with it via the
* global multi-watermark policy.
*
* @param tableName
* a qualified or unqualified name that designates a table.
* @since 4.2.0
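Putting the Scaladoc together with the review note above, a minimal usage sketch of a streaming netChanges read; `spark`, `my_table`, and the checkpoint path are placeholders, not part of the PR.

// Hedged sketch, not the PR's own example. The transformWithState operator that
// netChanges injects needs the RocksDB state store provider (per the review note).
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

val netChanges = spark.readStream
  .option("deduplicationMode", "netChanges") // engages streaming post-processing
  .option("computeUpdates", "true")          // (true, true) collapses emit update pairs
  .changes("my_table")

netChanges.writeStream
  .outputMode("append")                      // Update/Complete are rejected at start
  .option("checkpointLocation", "/tmp/cdc-netchanges-checkpoint")
  .format("console")
  .start()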
@@ -33,10 +33,39 @@
* <ul>
* <li>{@code _change_type} (STRING) — the kind of change: {@code insert}, {@code delete},
* {@code update_preimage}, or {@code update_postimage}</li>
* <li>{@code _commit_version} (connector-defined type, e.g. LONG) — the version containing
* this change</li>
* <li>{@code _commit_timestamp} (TIMESTAMP) — the timestamp of the commit</li>
* <li>{@code _commit_version} (LONG) — the version containing this change. Spark
* post-processing compares versions as primitive longs</li>
* <li>{@code _commit_timestamp} (TIMESTAMP) -- the timestamp of the commit. All rows
* belonging to a single {@code _commit_version} must share the same
* {@code _commit_timestamp}. For streaming reads with post-processing enabled,
* two additional requirements apply:
* <ol>
* <li>All rows of a single commit must appear in the same micro-batch (i.e.
* micro-batch boundaries align with commit boundaries).</li>
* <li>Distinct {@code _commit_version} values must have distinct
* {@code _commit_timestamp} values.</li>
* </ol>
* Streaming post-processing uses {@code _commit_timestamp} as event time with a
* zero-delay watermark, so once a micro-batch observes max event time T the
* global watermark advances to T. Both Spark's late-event filter and its
* state-eviction predicate then use {@code eventTime <= T} — so any later row
* at exactly {@code _commit_timestamp = T} (whether from the same commit split
* across batches, or from a different commit that happens to share T) is
* silently dropped as late. Requirement 1 rules out the same-commit case;
* requirement 2 rules out the different-commit case. Atomic-commit CDC connectors
* (e.g. Delta versions, Iceberg snapshots) that derive {@code _commit_timestamp}
* from wall-clock time at commit time naturally satisfy both requirements.
* {@code _commit_timestamp} must be non-{@code NULL} on any row of a streaming
* read engaging post-processing; both the row-level Aggregate path and the
* netChanges {@code transformWithState} path raise
* {@code CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP} on a violation</li>
* </ul>
* <p>
* Streaming reads support carry-over removal, update detection, and net change
* computation. Pending net-change collapses are kept in the state store, keyed by
* row identity; row identities touched only in the latest observed commit are held
* back until either a later commit (with a strictly greater
* {@code _commit_timestamp}) advances the global watermark past them, or the
* source terminates.
*
* @since 4.2.0
*/
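The drop/eviction arithmetic those two requirements guard against can be sketched with timestamps reduced to epoch millis; `droppedAsLate` is a local name for the `eventTime <= watermark` predicate quoted in the Javadoc, not a Spark API.

// Sketch of the zero-delay watermark rule: after a micro-batch whose max
// _commit_timestamp is T, the global watermark is T, and any later row with
// eventTime <= T is silently dropped as late.
def droppedAsLate(commitTimestampMs: Long, watermarkMs: Long): Boolean =
  commitTimestampMs <= watermarkMs

// Batch 1 reads commit v1 at T = 1000, so the watermark advances to 1000.
// Requirement 1: a v1 row split into batch 2 would be dropped.
assert(droppedAsLate(commitTimestampMs = 1000L, watermarkMs = 1000L))
// Requirement 2: a different commit that also stamped T = 1000 would be dropped.
assert(droppedAsLate(commitTimestampMs = 1000L, watermarkMs = 1000L))
// A commit with a strictly greater timestamp is kept and advances the watermark.
assert(!droppedAsLate(commitTimestampMs = 2000L, watermarkMs = 1000L))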
@@ -81,6 +110,9 @@ public interface Changelog {
* Spark will collapse multiple changes per row identity into the net effect.
* If {@code false}, the connector guarantees at most one change per row identity across
* the entire changelog range, and Spark will skip net change computation.
* <p>
* Note that this flag is range-scoped (it applies across all commits in the
* request), not micro-batch-scoped.
*/
boolean containsIntermediateChanges();

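A hedged illustration of the range-scoping note; `canSkipNetChangeComputation` is a hypothetical helper, but its decision mirrors the documented contract.

import org.apache.spark.sql.connector.catalog.Changelog

// Hypothetical helper: one answer covers every commit in the requested range,
// not each micro-batch. A false flag means at most one change per row identity
// across the whole range, so Spark may skip net change computation entirely.
def canSkipNetChangeComputation(changelog: Changelog): Boolean =
  !changelog.containsIntermediateChanges()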
@@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.SparkException
import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.connector.catalog.Changelog
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.StructType

/**
* StatefulProcessor that incrementalizes CDC net-change computation for streaming reads.
*
* The batch path (`ResolveChangelogTable.injectNetChangeComputation`) uses a Catalyst
* `Window` partitioned by `rowId` and ordered by `(_commit_version, change_type_rank)` to
* extract the first and last events per row identity, then applies the SPIP collapse
* matrix on `(existedBefore, existsAfter)`. That `Window` is rejected on streaming
* queries (`NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING`).
*
* This processor reproduces the same semantics with `transformWithState`. Per-row-identity
* state stores the first event ever observed and the most-recent event observed; an event
* time timer keyed on `_commit_timestamp` advances with each batch and fires once the
* global watermark passes the latest event time observed for the key, at which point the
* SPIP matrix is evaluated and the net result is emitted.
*
* Output schema: identical to the connector's changelog schema.
*
* Documented limitation: row identities touched only in the latest observed commit do not
* emit until a later commit (with strictly greater `_commit_timestamp`) advances the
* watermark past them, or the source terminates. End-of-stream flushes all pending
* timers, so bounded streams produce the same output as the corresponding batch read.
*
* @param inputSchema schema of the rows fed into this processor; the connector's
* changelog schema (data columns + `_change_type` +
* `_commit_version` + `_commit_timestamp`) optionally extended with
* rowId helper columns added by
* [[org.apache.spark.sql.catalyst.analysis.ResolveChangelogTable]].
* @param computeUpdates whether `(existedBefore, existsAfter) = (true, true)` should be
* relabeled as `update_preimage` / `update_postimage` (true) or kept
* as `delete` / `insert` (false), matching the batch contract.
*/
private[analysis] class CdcNetChangesStatefulProcessor(
inputSchema: StructType,
computeUpdates: Boolean)
extends StatefulProcessor[Row, Row, Row] {

@transient private var firstEvent: ValueState[Row] = _
@transient private var lastEvent: ValueState[Row] = _

// Hoisted out of `relabel` so we don't pay a linear `fieldIndex` scan per emitted row.
private val changeTypeIdx: Int = inputSchema.fieldIndex("_change_type")

override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
val handle = getHandle
val rowEncoder: Encoder[Row] = ExpressionEncoder(inputSchema)
firstEvent = handle.getValueState[Row]("firstEvent", rowEncoder, TTLConfig.NONE)
lastEvent = handle.getValueState[Row]("lastEvent", rowEncoder, TTLConfig.NONE)
}

override def handleInputRows(
key: Row,
inputRows: Iterator[Row],
timerValues: TimerValues): Iterator[Row] = {
val handle = getHandle
val sorted = inputRows.toSeq.sortBy { row =>
val v = row.getAs[Long]("_commit_version")
Member: The changelog contract says _commit_version is connector-defined, while the batch path sorts the attribute generically through Catalyst. This processor hard-casts _commit_version with getAs[Long], so any connector using another orderable version type will fail at runtime or be unsupported only on the streaming path. The plan should either preserve Catalyst ordering/ranking before entering the processor, or explicitly validate/restrict the version type.
val ct = row.getAs[String]("_change_type")
val rank = ct match {
case Changelog.CHANGE_TYPE_UPDATE_PREIMAGE | Changelog.CHANGE_TYPE_DELETE => 0
case Changelog.CHANGE_TYPE_INSERT | Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE => 1
case _ => throw new SparkException(
errorClass = "CHANGELOG_CONTRACT_VIOLATION.UNEXPECTED_CHANGE_TYPE",
messageParameters = Map.empty,
cause = null)
}
(v, rank)
}
if (sorted.isEmpty) return Iterator.empty

if (!firstEvent.exists()) {
firstEvent.update(sorted.head)
}
lastEvent.update(sorted.last)

// Re-arm the per-key event-time timer to the latest observed `_commit_timestamp`.
// Without dropping any existing timers we'd risk an earlier timer firing first and
// emitting state that later events would then re-populate, producing duplicate
// output for the same row identity.
//
// A NULL `_commit_timestamp` cannot be turned into a timer epoch and would NPE on
// `getTime()`. The `Changelog` Javadoc requires non-NULL `_commit_timestamp` on
// streaming reads engaging post-processing, so we surface the contract violation
// with a clear error class rather than failing the micro-batch with an opaque NPE.
val ts = sorted.last.getAs[java.sql.Timestamp]("_commit_timestamp")
if (ts == null) {
throw new SparkException(
errorClass = "CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP",
messageParameters = Map.empty,
cause = null)
}
val newTimerMs = ts.getTime
val existing = handle.listTimers().toList
existing.foreach(handle.deleteTimer)
handle.registerTimer(newTimerMs)

Iterator.empty
}

override def handleExpiredTimer(
Member: handleExpiredTimer emits the current net result and then clears both first/last state. In a long-running stream, the same rowId can receive later commits after an unrelated commit advances the watermark. Example: id=1 insert at v1, id=2 change at v2 advances watermark and emits id=1 insert, then id=1 delete at v3 emits a later delete. Batch netChanges over v1..v3 would cancel to no row, so this is not equivalent to the documented range-scoped netChanges semantics. The tests add all rows before starting the query, so they do not cover a later event arriving after an earlier timer fired.
key: Row,
timerValues: TimerValues,
expiredTimerInfo: ExpiredTimerInfo): Iterator[Row] = {
if (!firstEvent.exists()) return Iterator.empty

val first = firstEvent.get()
val last = lastEvent.get()
val firstChangeType = first.getAs[String]("_change_type")
val lastChangeType = last.getAs[String]("_change_type")

val existedBefore =
firstChangeType == Changelog.CHANGE_TYPE_DELETE ||
firstChangeType == Changelog.CHANGE_TYPE_UPDATE_PREIMAGE
val existsAfter =
lastChangeType == Changelog.CHANGE_TYPE_INSERT ||
lastChangeType == Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE

val (preLabel, postLabel) =
if (computeUpdates) {
(Changelog.CHANGE_TYPE_UPDATE_PREIMAGE, Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE)
} else {
(Changelog.CHANGE_TYPE_DELETE, Changelog.CHANGE_TYPE_INSERT)
}

// SPIP collapse matrix on (existedBefore, existsAfter): net nothing, net insert,
// net delete, or a pre/post pair whose labels depend on `computeUpdates`.
val out: Iterator[Row] = (existedBefore, existsAfter) match {
case (false, false) => Iterator.empty
case (false, true) => Iterator(relabel(last, Changelog.CHANGE_TYPE_INSERT))
case (true, false) => Iterator(relabel(first, Changelog.CHANGE_TYPE_DELETE))
case (true, true) => Iterator(relabel(first, preLabel), relabel(last, postLabel))
}

firstEvent.clear()
lastEvent.clear()
out
}

private def relabel(row: Row, newChangeType: String): Row = {
val values = row.toSeq.toArray
values(changeTypeIdx) = newChangeType
new GenericRowWithSchema(values, inputSchema)
}
}
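The collapse matrix in handleExpiredTimer can be exercised standalone. A sketch with the Row machinery stripped away; change types are plain strings here, and `collapse` is a local name, not part of the processor.

// Standalone sketch of the SPIP collapse matrix applied in handleExpiredTimer.
def collapse(
    firstChangeType: String,
    lastChangeType: String,
    computeUpdates: Boolean): Seq[String] = {
  val existedBefore =
    firstChangeType == "delete" || firstChangeType == "update_preimage"
  val existsAfter =
    lastChangeType == "insert" || lastChangeType == "update_postimage"
  (existedBefore, existsAfter) match {
    case (false, false) => Seq.empty     // appeared and vanished: net nothing
    case (false, true)  => Seq("insert") // net-new row
    case (true, false)  => Seq("delete") // net-removed row
    case (true, true)   =>
      if (computeUpdates) Seq("update_preimage", "update_postimage")
      else Seq("delete", "insert")
  }
}

// insert at v1 then delete at v3 collapses to nothing, matching the batch path.
assert(collapse("insert", "delete", computeUpdates = true).isEmpty)
// An update pair collapses to a single pre/post pair (first pre, last post).
assert(collapse("update_preimage", "update_postimage", computeUpdates = true) ==
  Seq("update_preimage", "update_postimage"))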