[SPARK-56687][SQL] Support netChanges for DSv2 CDC streaming reads #55637
@@ -0,0 +1,168 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.SparkException
import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.connector.catalog.Changelog
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.StructType

/**
 * StatefulProcessor that incrementalises CDC net-change computation for streaming reads.
 *
 * The batch path (`ResolveChangelogTable.injectNetChangeComputation`) uses a Catalyst
 * `Window` partitioned by `rowId` and ordered by `(_commit_version, change_type_rank)` to
 * extract the first and last events per row identity, then applies the SPIP collapse
 * matrix on `(existedBefore, existsAfter)`. That `Window` is rejected on streaming
 * queries (`NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING`).
 *
 * This processor reproduces the same semantics with `transformWithState`. Per-row-identity
 * state stores the first event ever observed and the most recent event observed; an
 * event-time timer keyed on `_commit_timestamp` advances with each batch and fires once
 * the global watermark passes the latest event time observed for the key, at which point
 * the SPIP matrix is evaluated and the net result is emitted.
 *
 * Output schema: identical to the connector's changelog schema.
 *
 * Documented limitation: row identities only touched in the latest observed commit do not
 * emit until a later commit (with strictly greater `_commit_timestamp`) advances the
 * watermark past them, or the source terminates. End-of-stream flushes all pending
 * timers, so bounded streams produce the same output as the corresponding batch read.
 *
 * @param inputSchema schema of the rows fed into this processor; the connector's
 *                    changelog schema (data columns + `_change_type` +
 *                    `_commit_version` + `_commit_timestamp`) optionally extended with
 *                    rowId helper columns added by
 *                    [[org.apache.spark.sql.catalyst.analysis.ResolveChangelogTable]].
 * @param computeUpdates whether `(existedBefore, existsAfter) = (true, true)` should be
 *                       relabeled as `update_preimage` / `update_postimage` (true) or
 *                       kept as `delete` / `insert` (false), matching the batch contract.
 */
private[analysis] class CdcNetChangesStatefulProcessor(
    inputSchema: StructType,
    computeUpdates: Boolean)
  extends StatefulProcessor[Row, Row, Row] {

  @transient private var firstEvent: ValueState[Row] = _
  @transient private var lastEvent: ValueState[Row] = _

  // Hoisted out of `relabel` so we don't pay a linear `fieldIndex` scan per emitted row.
  private val changeTypeIdx: Int = inputSchema.fieldIndex("_change_type")

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    val handle = getHandle
    val rowEncoder: Encoder[Row] = ExpressionEncoder(inputSchema)
    firstEvent = handle.getValueState[Row]("firstEvent", rowEncoder, TTLConfig.NONE)
    lastEvent = handle.getValueState[Row]("lastEvent", rowEncoder, TTLConfig.NONE)
  }

  override def handleInputRows(
      key: Row,
      inputRows: Iterator[Row],
      timerValues: TimerValues): Iterator[Row] = {
    val handle = getHandle
    val sorted = inputRows.toSeq.sortBy { row =>
      val v = row.getAs[Long]("_commit_version")
Member
The changelog contract says `_commit_version` is connector-defined, while the batch path sorts the attribute generically through Catalyst. This processor hard-casts `_commit_version` with `getAs[Long]`, so any connector using another orderable version type will fail at runtime or be unsupported only on the streaming path. The plan should either preserve Catalyst ordering/ranking before entering the processor, or explicitly validate/restrict the version type.
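As a sketch of the validation option (a hypothetical helper, not part of this PR): a check on the resolved changelog schema at analysis time would turn the runtime ClassCastException into a clear error before the plan reaches the processor.

import org.apache.spark.sql.types.{LongType, StructType}

// Hypothetical guard, run wherever ResolveChangelogTable injects the streaming
// net-change computation. Only LongType versions reach getAs[Long] safely.
def validateCommitVersionType(changelogSchema: StructType): Unit = {
  val versionType = changelogSchema("_commit_version").dataType
  if (versionType != LongType) {
    throw new UnsupportedOperationException(
      s"Streaming netChanges requires a LongType `_commit_version`; the connector " +
        s"declared ${versionType.simpleString}. Batch reads order the version " +
        "generically through Catalyst and are unaffected.")
  }
}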
      val ct = row.getAs[String]("_change_type")
      val rank = ct match {
        case Changelog.CHANGE_TYPE_UPDATE_PREIMAGE | Changelog.CHANGE_TYPE_DELETE => 0
        case Changelog.CHANGE_TYPE_INSERT | Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE => 1
        case _ => throw new SparkException(
          errorClass = "CHANGELOG_CONTRACT_VIOLATION.UNEXPECTED_CHANGE_TYPE",
          messageParameters = Map.empty,
          cause = null)
      }
      (v, rank)
    }
    if (sorted.isEmpty) return Iterator.empty

    if (!firstEvent.exists()) {
      firstEvent.update(sorted.head)
    }
    lastEvent.update(sorted.last)

    // Re-arm the per-key event-time timer to the latest observed `_commit_timestamp`.
    // Without dropping any existing timers we'd risk an earlier timer firing first and
    // emitting state that later events would then re-populate, producing duplicate
    // output for the same row identity.
    //
    // A NULL `_commit_timestamp` cannot be turned into a timer epoch and would NPE on
    // `getTime()`. The `Changelog` Javadoc requires non-NULL `_commit_timestamp` on
    // streaming reads engaging post-processing, so we surface the contract violation
    // with a clear error class rather than failing the micro-batch with an opaque NPE.
    val ts = sorted.last.getAs[java.sql.Timestamp]("_commit_timestamp")
    if (ts == null) {
      throw new SparkException(
        errorClass = "CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP",
        messageParameters = Map.empty,
        cause = null)
    }
    val newTimerMs = ts.getTime
    val existing = handle.listTimers().toList
    existing.foreach(handle.deleteTimer)
    handle.registerTimer(newTimerMs)

    Iterator.empty
  }

  override def handleExpiredTimer(
Member
`handleExpiredTimer` emits the current net result and then clears both first/last state. In a long-running stream, the same rowId can receive later commits after an unrelated commit advances the watermark. Example: id=1 insert at v1, id=2 change at v2 advances the watermark and emits id=1 insert, then id=1 delete at v3 emits a later delete. Batch netChanges over v1..v3 would cancel to no row, so this is not equivalent to the documented range-scoped netChanges semantics. The tests add all rows before starting the query, so they do not cover a later event arriving after an earlier timer fired.
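To make the divergence concrete, here is a tiny self-contained model of the collapse matrix (insert/delete only, `computeUpdates = false`; all names are illustrative, not from this PR). Collapsing a key's events in one pass cancels them; collapsing the same events split across two timer firings does not.

object NetChangeModel {
  sealed trait Change
  case object Insert extends Change
  case object Delete extends Change

  // Same (existedBefore, existsAfter) matrix the processor applies.
  def collapse(events: Seq[Change]): Seq[Change] = events match {
    case Nil => Nil
    case _ =>
      val existedBefore = events.head == Delete // first event touched an existing row
      val existsAfter = events.last == Insert   // last event left the row present
      (existedBefore, existsAfter) match {
        case (false, false) => Nil
        case (false, true)  => Seq(Insert)
        case (true, false)  => Seq(Delete)
        case (true, true)   => Seq(Delete, Insert)
      }
  }

  def main(args: Array[String]): Unit = {
    // Batch netChanges over v1..v3 sees both events for id=1 and cancels them.
    assert(collapse(Seq(Insert, Delete)).isEmpty)
    // Streaming: a timer fired between v1 and v3, so each event collapses alone
    // and both an insert and a delete are emitted for the same row identity.
    assert(collapse(Seq(Insert)) ++ collapse(Seq(Delete)) == Seq(Insert, Delete))
  }
}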
      key: Row,
      timerValues: TimerValues,
      expiredTimerInfo: ExpiredTimerInfo): Iterator[Row] = {
    if (!firstEvent.exists()) return Iterator.empty

    val first = firstEvent.get()
    val last = lastEvent.get()
    val firstChangeType = first.getAs[String]("_change_type")
    val lastChangeType = last.getAs[String]("_change_type")

    val existedBefore =
      firstChangeType == Changelog.CHANGE_TYPE_DELETE ||
      firstChangeType == Changelog.CHANGE_TYPE_UPDATE_PREIMAGE
    val existsAfter =
      lastChangeType == Changelog.CHANGE_TYPE_INSERT ||
      lastChangeType == Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE

    val (preLabel, postLabel) =
      if (computeUpdates) {
        (Changelog.CHANGE_TYPE_UPDATE_PREIMAGE, Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE)
      } else {
        (Changelog.CHANGE_TYPE_DELETE, Changelog.CHANGE_TYPE_INSERT)
      }

    val out: Iterator[Row] = (existedBefore, existsAfter) match {
      case (false, false) => Iterator.empty
      case (false, true) => Iterator(relabel(last, Changelog.CHANGE_TYPE_INSERT))
      case (true, false) => Iterator(relabel(first, Changelog.CHANGE_TYPE_DELETE))
      case (true, true) => Iterator(relabel(first, preLabel), relabel(last, postLabel))
    }

    firstEvent.clear()
    lastEvent.clear()
    out
  }

  private def relabel(row: Row, newChangeType: String): Row = {
    val values = row.toSeq.toArray
    values(changeTypeIdx) = newChangeType
    new GenericRowWithSchema(values, inputSchema)
  }
}
The PR body notes that TransformWithState requires RocksDB, but the changes() Scaladoc says streaming supports all post-processing without mentioning that the default state store will fail at query start. Since this is an analyzer-injected operator rather than user-written transformWithState, users have little reason to know they must set the RocksDB provider.
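For reference, a minimal snippet that satisfies the TransformWithState requirement, assuming an existing SparkSession named `spark`; the provider class is Spark's built-in RocksDB state store provider.

// Streaming queries that hit this analyzer-injected operator need the RocksDB
// state store provider; the default HDFS-backed provider fails at query start.
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")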