Skip to content

Commit b71ec8e

Browse files
committed
PR feedback
1 parent a42da1d commit b71ec8e

2 files changed

Lines changed: 38 additions & 6 deletions

File tree

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,12 +127,9 @@ case class Scd1BatchProcessor(
127127
val caseSensitiveColumnComparison =
128128
microbatchWithCdcMetadataDf.sparkSession.sessionState.conf.caseSensitiveAnalysis
129129

130-
// Calculate the schema of the microbatch less the system-projected CDC metadata column, i.e.
131-
// the The user schema is the microbatch's schema after dropping the system columns - i.e the
132-
// CDC metadata column.
133-
134-
// We project out the system columns before applying user selection and project back in
135-
// afterwards, so that users cannot control whether these [necessary] columns show up in the
130+
// The user schema is the microbatch schema after dropping the system CDC metadata column.
131+
// We project out the system column before applying user selection and project it back in
132+
// afterwards, so that users cannot control whether this [necessary] column shows up in the
136133
// target table.
137134
val userColumnsInMicrobatchSchema = ColumnSelection.applyToSchema(
138135
schemaName = "microbatch",

sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -897,4 +897,39 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession {
897897
expectedAnswer = Row(1, "u-100", Row(null, 10L))
898898
)
899899
}
900+
901+
test("projectTargetColumnsOntoMicrobatch resolves columnSelection case-insensitively " +
902+
"when SQLConf.CASE_SENSITIVE=false") {
903+
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
904+
val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
905+
Row(1, "alice", 30, Row(null, 10L))
906+
)
907+
908+
val processor = Scd1BatchProcessor(
909+
changeArgs = ChangeArgs(
910+
keys = Seq(UnqualifiedColumnName("id")),
911+
sequencing = F.col("seq"),
912+
storedAsScdType = ScdType.Type1,
913+
// User columns intentionally use a different case than the schema (id, age).
914+
columnSelection = Some(
915+
ColumnSelection.IncludeColumns(
916+
Seq(UnqualifiedColumnName("ID"), UnqualifiedColumnName("AGE"))
917+
)
918+
)
919+
),
920+
resolvedSequencingType = LongType
921+
)
922+
923+
val result = processor.projectTargetColumnsOntoMicrobatch(batch)
924+
925+
// Output column names follow the microbatch schema's casing, not the casing in the user's
926+
// columnSelection. The CDC metadata column is appended last as always.
927+
assert(result.schema.fieldNames.toSeq ==
928+
Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName))
929+
checkAnswer(
930+
df = result,
931+
expectedAnswer = Row(1, 30, Row(null, 10L))
932+
)
933+
}
934+
}
900935
}

0 commit comments

Comments
 (0)