Skip to content

Commit cd26a84

Browse files
AnishMahtodbtsai
authored andcommitted
[SPARK-56870][SDP] Implement SCD1 Batch Processor; Extend Microbatch with CDC Metadata
Approved AutoCDC SPIP: https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7 -------- **Preamble:** The SCD type 1 flow is a foreachBatch streaming query on an input change-data-feed, and is responsible for reconciling the incoming change data onto some target table that follows SCD1 replication semantics. SCD1 flows also maintain an "auxiliary" table to keep track of early-arriving out-of-order received events state. Each microbatch will need to reconcile against this auxiliary table as well, and update the auxiliary table's state appropriately for future microbatches. **Extend Microbatch with CDC Metadata:** After deduplication, all of the incoming rows can be classified as either a delete event or an upsert event (mutually exclusive), and there's at most one per key. If we identify a row as a delete event, remember its sequencing as its `deleteSequence`. If we identify a row as an upsert event, remember its sequencing as its `upsertSequence`. That is, `deleteSequence`/`upsertSequence` encode both the sequencing for the row as well as the row classification (delete or upsert). We need to persist this encoded information now, because in future stages we may drop the columns that `deleteCondition` needed to do the classification in the first place, depending on which columns were selected by `ChangeArgs.columnSelection`. **Where is the CDC Metadata stored?** Within the microbatch, we append a `_cdc_metadata` struct column, that stores the `deleteSequence` and `upsertSequence`. This `_cdc_metadata` column will eventually also land in the persisted target and auxiliary tables, which are the artifacts of an AutoCDC flow. This column represents operational metadata that the AutoCDC flow has tagged a row with, and is necessary for out-of-order correctness of the SCD decomposition.  Users will not be able to opt out of persisting this column in the target table using `ChangeArgs.columnSelection`, as it is necessary for correctness. The column will not have a stable public contract, and users should make no assumptions on its contents. Closes #55970 from AnishMahto/SPARK-56870-extend-microbatch-with-cdc-metadata. Authored-by: AnishMahto <anish.mahto99@gmail.com> Signed-off-by: DB Tsai <dbtsai@dbtsai.com> (cherry picked from commit 12807c5) Signed-off-by: DB Tsai <dbtsai@dbtsai.com>
1 parent f6312fb commit cd26a84

3 files changed

Lines changed: 419 additions & 17 deletions

File tree

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,12 @@
209209
],
210210
"sqlState" : "42703"
211211
},
212+
"AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT" : {
213+
"message" : [
214+
"Using <caseSensitivity> column name comparison, the column `<columnName>` in the <schemaName> schema conflicts with the reserved AutoCDC column name `<reservedColumnName>`. Rename or remove the column."
215+
],
216+
"sqlState" : "42710"
217+
},
212218
"AVRO_CANNOT_WRITE_NULL_FIELD" : {
213219
"message" : [
214220
"Cannot write null value for field <name> defined as non-null Avro data type <dataType>.",

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala

Lines changed: 115 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,26 @@
1717

1818
package org.apache.spark.sql.pipelines.autocdc
1919

20-
import org.apache.spark.sql.{functions => F}
20+
import org.apache.spark.SparkException
21+
import org.apache.spark.sql.{functions => F, AnalysisException}
22+
import org.apache.spark.sql.Column
2123
import org.apache.spark.sql.catalyst.util.QuotingUtils
2224
import org.apache.spark.sql.classic.DataFrame
25+
import org.apache.spark.sql.types.{DataType, StructField, StructType}
2326
import org.apache.spark.util.ArrayImplicits._
2427

2528
/**
2629
* Per-microbatch processor for SCD Type 1 AutoCDC flows, complying to the specified [[changeArgs]]
2730
* configuration.
31+
*
32+
* @param changeArgs The CDC flow configuration.
33+
* @param resolvedSequencingType The post-analysis [[DataType]] of the sequencing column, derived
34+
* from the flow's resolved DataFrame at flow setup time.
2835
*/
29-
case class Scd1BatchProcessor(changeArgs: ChangeArgs) {
36+
case class Scd1BatchProcessor(
37+
changeArgs: ChangeArgs,
38+
resolvedSequencingType: DataType) {
39+
3040
/**
3141
* Deduplicate the incoming CDC microbatch by key, keeping the most recent event per key
3242
* as ordered by [[ChangeArgs.sequencing]].
@@ -59,9 +69,111 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) {
5969
)
6070
.select(F.col(s"$winningRowCol.*"))
6171
}
72+
73+
/**
74+
* Project the CDC metadata column onto the microbatch.
75+
*
76+
* This must run before any column selection is applied to the microbatch. The
77+
* [[ChangeArgs.deleteCondition]] and [[ChangeArgs.sequencing]] expressions are evaluated against
78+
* the current microbatch schema, and column selection may drop inputs required by those
79+
* expressions.
80+
*
81+
* Rows are classified as deletes only when [[ChangeArgs.deleteCondition]] evaluates to true. A
82+
* false or null delete condition classifies the row as an upsert.
83+
*
84+
* @param validatedMicrobatch A microbatch that has already been validated such that the
85+
* sequencing column should not contain null values, and its data type
86+
* should support ordering.
87+
*
88+
* The returned dataframe has all of the columns in the input microbatch + the CDC metadata
89+
* column.
90+
*/
91+
def extendMicrobatchRowsWithCdcMetadata(validatedMicrobatch: DataFrame): DataFrame = {
92+
// Proactively validate the reserved CDC metadata column does not exist in the microbatch.
93+
validateCdcMetadataColumnNotPresent(validatedMicrobatch)
94+
95+
val rowDeleteSequence: Column = changeArgs.deleteCondition match {
96+
case Some(deleteCondition) =>
97+
F.when(deleteCondition, changeArgs.sequencing).otherwise(F.lit(null))
98+
case None =>
99+
F.lit(null)
100+
}
101+
102+
val rowUpsertSequence: Column =
103+
// A row that is not a delete must be an upsert, these are mutually exclusive and a complete
104+
// set of CDC event types.
105+
F.when(rowDeleteSequence.isNull, changeArgs.sequencing).otherwise(F.lit(null))
106+
107+
validatedMicrobatch.withColumn(
108+
Scd1BatchProcessor.cdcMetadataColName,
109+
Scd1BatchProcessor.constructCdcMetadataCol(
110+
deleteSequence = rowDeleteSequence,
111+
upsertSequence = rowUpsertSequence,
112+
sequencingType = resolvedSequencingType
113+
)
114+
)
115+
}
116+
117+
private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit = {
118+
val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
119+
val resolver = microbatchSqlConf.resolver
120+
121+
microbatch.schema.fieldNames
122+
.find(resolver(_, Scd1BatchProcessor.cdcMetadataColName))
123+
.foreach { conflictingColumnName =>
124+
throw new AnalysisException(
125+
errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT",
126+
messageParameters = Map(
127+
"caseSensitivity" -> CaseSensitivityLabels.of(microbatchSqlConf.caseSensitiveAnalysis),
128+
"columnName" -> conflictingColumnName,
129+
"schemaName" -> "microbatch",
130+
"reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName
131+
)
132+
)
133+
}
134+
}
62135
}
63136

64137
object Scd1BatchProcessor {
65138
// Columns prefixed with `__spark_autocdc_` are reserved for internal SDP AutoCDC processing.
66-
private[autocdc] val winningRowColName = "__spark_autocdc_winning_row"
139+
private[autocdc] val winningRowColName: String = "__spark_autocdc_winning_row"
140+
private[autocdc] val cdcMetadataColName: String = "__spark_autocdc_metadata"
141+
142+
private[autocdc] val cdcDeleteSequenceFieldName: String = "deleteSequence"
143+
private[autocdc] val cdcUpsertSequenceFieldName: String = "upsertSequence"
144+
145+
/**
146+
* Schema of the CDC metadata struct column for SCD1.
147+
*/
148+
private def cdcMetadataColSchema(sequencingType: DataType): StructType =
149+
StructType(
150+
Seq(
151+
// The sequencing of the event if it represents a delete, null otherwise.
152+
StructField(cdcDeleteSequenceFieldName, sequencingType, nullable = true),
153+
// The sequencing of the event if it represents an upsert, null otherwise.
154+
StructField(cdcUpsertSequenceFieldName, sequencingType, nullable = true)
155+
)
156+
)
157+
158+
/**
159+
* Construct the CDC metadata struct column for SCD1, following the exact schema and field
160+
* ordering defined by [[cdcMetadataColSchema]].
161+
*/
162+
private[autocdc] def constructCdcMetadataCol(
163+
deleteSequence: Column,
164+
upsertSequence: Column,
165+
sequencingType: DataType): Column = {
166+
val cdcMetadataFieldsInOrder = cdcMetadataColSchema(sequencingType).fields.map { field =>
167+
val value = field.name match {
168+
case `cdcDeleteSequenceFieldName` => deleteSequence
169+
case `cdcUpsertSequenceFieldName` => upsertSequence
170+
case other =>
171+
throw SparkException.internalError(
172+
s"Unable to construct SCD1 CDC metadata column due to unknown `${other}` field."
173+
)
174+
}
175+
value.cast(field.dataType).as(field.name)
176+
}
177+
F.struct(cdcMetadataFieldsInOrder.toImmutableArraySeq: _*)
178+
}
67179
}

0 commit comments

Comments
 (0)