Skip to content

Commit 25387c3

Browse files
committed
PR feedback
1 parent c36f910 commit 25387c3

2 files changed

Lines changed: 11 additions & 7 deletions

File tree

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,16 @@ case class Scd1BatchProcessor(
8181
* Rows are classified as deletes only when [[ChangeArgs.deleteCondition]] evaluates to true. A
8282
* false or null delete condition classifies the row as an upsert.
8383
*
84+
* @param validatedMicrobatch A microbatch that has already been validated such that the
85+
* sequencing column should not contain null values, and its data type
86+
* should support ordering.
87+
*
8488
* The returned dataframe has all of the columns in the input microbatch + the CDC metadata
8589
* column.
8690
*/
87-
def extendMicrobatchRowsWithCdcMetadata(microbatchDf: DataFrame): DataFrame = {
91+
def extendMicrobatchRowsWithCdcMetadata(validatedMicrobatch: DataFrame): DataFrame = {
8892
// Proactively validate the reserved CDC metadata column does not exist in the microbatch.
89-
validateCdcMetadataColumnNotPresent(microbatchDf)
93+
validateCdcMetadataColumnNotPresent(validatedMicrobatch)
9094

9195
val rowDeleteSequence: Column = changeArgs.deleteCondition match {
9296
case Some(deleteCondition) =>
@@ -100,7 +104,7 @@ case class Scd1BatchProcessor(
100104
// set of CDC event types.
101105
F.when(rowDeleteSequence.isNull, changeArgs.sequencing).otherwise(F.lit(null))
102106

103-
microbatchDf.withColumn(
107+
validatedMicrobatch.withColumn(
104108
Scd1BatchProcessor.cdcMetadataColName,
105109
Scd1BatchProcessor.constructCdcMetadataCol(
106110
deleteSequence = rowDeleteSequence,
@@ -110,11 +114,11 @@ case class Scd1BatchProcessor(
110114
)
111115
}
112116

113-
private def validateCdcMetadataColumnNotPresent(microbatchDf: DataFrame): Unit = {
114-
val microbatchSqlConf = microbatchDf.sparkSession.sessionState.conf
117+
private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit = {
118+
val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
115119
val resolver = microbatchSqlConf.resolver
116120

117-
microbatchDf.schema.fieldNames
121+
microbatch.schema.fieldNames
118122
.find(resolver(_, Scd1BatchProcessor.cdcMetadataColName))
119123
.foreach { conflictingColumnName =>
120124
throw new AnalysisException(

sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession {
471471
resolvedSequencingType = LongType
472472
)
473473

474-
// Mutual-exclusivity invariant: each row's _cdc_metadata struct has exactly one of
474+
// Mutual-exclusivity invariant: each row's CDC metadata struct has exactly one of
475475
// (deleteSequence, upsertSequence) non-null, and the non-null side carries the row's
476476
// sequence value.
477477
checkAnswer(

0 commit comments

Comments
 (0)