Skip to content

Commit 1be3cba

Browse files
committed
reuse applyToSchema
1 parent dec7426 commit 1be3cba

1 file changed

Lines changed: 17 additions & 12 deletions

File tree

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -111,18 +111,23 @@ case class Scd1BatchProcessor(
111111
val ignoreColumnNameCase =
112112
!microbatchWithCdcMetadataDf.sparkSession.sessionState.conf.caseSensitiveAnalysis
113113

114-
// The schema of the microbatch less the system-projected CDC metadata column, i.e. the
115-
// original microbatch schema.
116-
val userColumnsInMicrobatchSchema =
117-
StructType(
118-
microbatchWithCdcMetadataDf.schema.fields.filterNot { field =>
119-
if (ignoreColumnNameCase) {
120-
field.name.equalsIgnoreCase(Scd1BatchProcessor.cdcMetadataColName)
121-
} else {
122-
field.name.equals(Scd1BatchProcessor.cdcMetadataColName)
123-
}
124-
}
125-
)
114+
// Calculate the user schema: the schema of the microbatch less the system-projected columns,
115+
// i.e. the CDC metadata column. This is equivalent to the schema of the original
116+
// microbatch.
117+
118+
// We project out the system columns before applying user selection and project them back in
119+
// afterwards, so that users cannot control whether these [necessary] columns show up in the
120+
// target table.
121+
val userColumnsInMicrobatchSchema = ColumnSelection.applyToSchema(
122+
schemaName = "microbatch",
123+
schema = microbatchWithCdcMetadataDf.schema,
124+
columnSelection = Some(
125+
ColumnSelection.ExcludeColumns(
126+
Seq(UnqualifiedColumnName(Scd1BatchProcessor.cdcMetadataColName))
127+
)
128+
),
129+
ignoreCase = ignoreColumnNameCase
130+
)
126131

127132
val userSelectedColumnsInMicrobatchSchema =
128133
ColumnSelection.applyToSchema(

0 commit comments

Comments
 (0)