File tree Expand file tree Collapse file tree
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -103,22 +103,16 @@ case class Scd1BatchProcessor(
103103 }
104104
105105 private def validateCdcMetadataColumnNotPresent (microbatchDf : DataFrame ): Unit = {
106- val ignoreColumnNameCase =
107- ! microbatchDf.sparkSession.sessionState.conf.caseSensitiveAnalysis
106+ val sqlConf = microbatchDf.sparkSession.sessionState.conf
107+ val resolver = sqlConf.resolver
108108
109109 microbatchDf.schema.fieldNames
110- .find { fieldName =>
111- if (ignoreColumnNameCase) {
112- fieldName.equalsIgnoreCase(Scd1BatchProcessor .cdcMetadataColName)
113- } else {
114- fieldName.equals(Scd1BatchProcessor .cdcMetadataColName)
115- }
116- }
110+ .find(resolver(_, Scd1BatchProcessor .cdcMetadataColName))
117111 .foreach { conflictingColumnName =>
118112 throw new AnalysisException (
119113 errorClass = " AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT" ,
120114 messageParameters = Map (
121- " caseSensitivity" -> CaseSensitivityLabels .of(! ignoreColumnNameCase ),
115+ " caseSensitivity" -> CaseSensitivityLabels .of(sqlConf.caseSensitiveAnalysis ),
122116 " columnName" -> conflictingColumnName,
123117 " schemaName" -> " microbatch" ,
124118 " reservedColumnName" -> Scd1BatchProcessor .cdcMetadataColName
You can’t perform that action at this time.
0 commit comments