Skip to content

Commit f9c2aed

Browse files
committed
PR feedback
1 parent e8df6f0 commit f9c2aed

2 files changed

Lines changed: 72 additions & 0 deletions

File tree

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,14 @@ case class Scd1BatchProcessor(
7373
/**
7474
* Project the CDC metadata column onto the microbatch.
7575
*
76+
* This must run before any column selection is applied to the microbatch. The
77+
* [[ChangeArgs.deleteCondition]] and [[ChangeArgs.sequencing]] expressions are evaluated against
78+
* the current microbatch schema, and column selection may drop inputs required by those
79+
* expressions.
80+
*
81+
* Rows are classified as deletes only when [[ChangeArgs.deleteCondition]] evaluates to true. A
82+
* false or null delete condition classifies the row as an upsert.
83+
*
7684
* The returned dataframe has all of the columns in the input microbatch plus the CDC metadata
7785
* column.
7886
*/

sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,32 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession {
414414
)
415415
}
416416

417+
test("extendMicrobatchRowsWithCdcMetadata treats null deleteCondition results as upserts") {
418+
val schema = new StructType()
419+
.add("id", IntegerType)
420+
.add("seq", LongType)
421+
.add("is_delete", BooleanType)
422+
423+
val batch = microbatchOf(schema)(
424+
Row(1, 10L, null)
425+
)
426+
427+
val processor = Scd1BatchProcessor(
428+
changeArgs = ChangeArgs(
429+
keys = Seq(UnqualifiedColumnName("id")),
430+
sequencing = F.col("seq"),
431+
storedAsScdType = ScdType.Type1,
432+
deleteCondition = Some(F.col("is_delete"))
433+
),
434+
resolvedSequencingType = LongType
435+
)
436+
437+
checkAnswer(
438+
df = processor.extendMicrobatchRowsWithCdcMetadata(batch),
439+
expectedAnswer = Row(1, 10L, null, Row(null, 10L))
440+
)
441+
}
442+
417443
test("extendMicrobatchRowsWithCdcMetadata treats every row as an upsert " +
418444
"when deleteCondition is None") {
419445
val schema = new StructType()
@@ -578,4 +604,42 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession {
578604
)
579605
}
580606
}
607+
608+
test("extendMicrobatchRowsWithCdcMetadata rejects reserved CDC metadata column " +
609+
"case-insensitively") {
610+
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
611+
val conflictingColumnName = Scd1BatchProcessor.cdcMetadataColName.toUpperCase
612+
val schema = new StructType()
613+
.add("id", IntegerType)
614+
.add("seq", LongType)
615+
.add(conflictingColumnName, StringType)
616+
617+
val batch = microbatchOf(schema)(
618+
Row(1, 10L, "user-supplied")
619+
)
620+
621+
val processor = Scd1BatchProcessor(
622+
changeArgs = ChangeArgs(
623+
keys = Seq(UnqualifiedColumnName("id")),
624+
sequencing = F.col("seq"),
625+
storedAsScdType = ScdType.Type1
626+
),
627+
resolvedSequencingType = LongType
628+
)
629+
630+
checkError(
631+
exception = intercept[AnalysisException] {
632+
processor.extendMicrobatchRowsWithCdcMetadata(batch)
633+
},
634+
condition = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT",
635+
sqlState = "42710",
636+
parameters = Map(
637+
"caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
638+
"columnName" -> conflictingColumnName,
639+
"schemaName" -> "microbatch",
640+
"reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName
641+
)
642+
)
643+
}
644+
}
581645
}

0 commit comments

Comments
 (0)