Skip to content

Commit af2cfc0

Browse files
AnishMahto and dbtsai
authored and committed
[SPARK-56882][SDP] Implement SCD1 Batch Processor; Target Column Projection
Approved AutoCDC SPIP: https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7

--------

**Preamble:** The SCD type 1 flow is a foreachBatch streaming query on an input change-data-feed, and is responsible for reconciling the incoming change data onto a target table that follows SCD1 replication semantics. SCD1 flows also maintain an "auxiliary" table that tracks the state of early-arriving, out-of-order events. Each microbatch needs to reconcile against this auxiliary table as well, and update the auxiliary table's state appropriately for future microbatches.

**Target Column Projection:** As per the SPIP and `ChangeArgs.columnSelection`, users are allowed to specify the set of columns that actually gets persisted in the target table. Any columns not selected should be dropped before the target table merge/persistence. We should project only these selected columns onto the microbatch so that its dataframe is in the correct shape prior to CDC processing and merging into the target table.

Closes #55991 from AnishMahto/SPARK-56882-SCD1-project-target-columns-onto-microbatch.

Authored-by: AnishMahto <anish.mahto99@gmail.com>
Signed-off-by: DB Tsai <dbtsai@dbtsai.com>
1 parent 9e85d06 commit af2cfc0

2 files changed

Lines changed: 269 additions & 0 deletions

File tree

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,58 @@ case class Scd1BatchProcessor(
114114
)
115115
}
116116

117+
/**
 * Project the user-defined column selection onto the microbatch. By this point the input
 * microbatch should already have projected its CDC metadata, because it's possible that the
 * user-defined column selection drops columns that are otherwise necessary to compute the
 * CDC metadata.
 *
 * The projection happens in three steps:
 *  1. The CDC metadata column is excluded from the microbatch schema, leaving the user schema.
 *  2. [[ChangeArgs.columnSelection]] is applied to that user schema.
 *  3. The surviving user columns are selected from the microbatch, followed by the CDC metadata
 *     column as the last column.
 *
 * @param microbatchWithCdcMetadataDf microbatch that already carries the CDC metadata column
 *                                    named [[Scd1BatchProcessor.cdcMetadataColName]].
 * @return a dataframe whose schema is: all of the user-selected columns in the input dataframe
 *         as per [[ChangeArgs.columnSelection]] + the CDC metadata column.
 */
def projectTargetColumnsOntoMicrobatch(microbatchWithCdcMetadataDf: DataFrame): DataFrame = {
  val caseSensitiveColumnComparison =
    microbatchWithCdcMetadataDf.sparkSession.sessionState.conf.caseSensitiveAnalysis

  // The user schema is the microbatch schema after dropping the system CDC metadata column.
  // We project out the system column before applying user selection and project it back in
  // afterwards, so that users cannot control whether this [necessary] column shows up in the
  // target table.
  val userColumnsInMicrobatchSchema = ColumnSelection.applyToSchema(
    schemaName = "microbatch",
    schema = microbatchWithCdcMetadataDf.schema,
    columnSelection = Some(
      ColumnSelection.ExcludeColumns(
        Seq(UnqualifiedColumnName(Scd1BatchProcessor.cdcMetadataColName))
      )
    ),
    caseSensitive = caseSensitiveColumnComparison
  )

  val userSelectedColumnsInMicrobatchSchema = ColumnSelection.applyToSchema(
    schemaName = "microbatch",
    schema = userColumnsInMicrobatchSchema,
    columnSelection = changeArgs.columnSelection,
    caseSensitive = caseSensitiveColumnComparison
  )

  // In addition to the explicit user-selected columns, re-project the operational CDC metadata
  // column as the last column.
  //
  // Spark drops backticks in the schema, so quote all identifiers for safety before executing
  // select. Identifiers could have special characters such as '.'. The CDC metadata column goes
  // through the same quoting as the user columns so that every selected name is always resolved
  // as a single top-level column, never as a multi-part/nested reference.
  val finalColumnsInMicrobatchToSelect =
    (userSelectedColumnsInMicrobatchSchema.fieldNames :+ Scd1BatchProcessor.cdcMetadataColName)
      .map(colName => F.col(QuotingUtils.quoteIdentifier(colName)))

  microbatchWithCdcMetadataDf.select(
    finalColumnsInMicrobatchToSelect.toImmutableArraySeq: _*
  )
}
168+
117169
private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit = {
118170
val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
119171
val resolver = microbatchSqlConf.resolver

sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,20 @@ import org.apache.spark.sql.types._
2727

2828
class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession {
2929

30+
/**
 * Schema used by tests whose input microbatch already carries the SCD1 CDC metadata column:
 * three user columns (id, name, age) followed by the CDC metadata struct column.
 */
private val microbatchWithCdcMetadataSchema: StructType = {
  // Nested struct stored under the CDC metadata column: delete sequence, then upsert sequence.
  val cdcMetadataStruct = StructType(Seq(
    StructField(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType),
    StructField(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType)
  ))

  StructType(Seq(
    StructField("id", IntegerType),
    StructField("name", StringType),
    StructField("age", IntegerType),
    StructField(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataStruct)
  ))
}
43+
3044
/** Create a microbatch [[DataFrame]] holding exactly the given rows under the given schema. */
private def microbatchOf(schema: StructType)(rows: Row*): DataFrame = {
  val rowsRdd = spark.sparkContext.parallelize(rows)
  spark.createDataFrame(rowsRdd, schema)
}
@@ -715,4 +729,207 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession {
715729
)
716730
}
717731
}
732+
733+
test("projectTargetColumnsOntoMicrobatch keeps every user column and the CDC metadata column " +
  "when columnSelection is None") {
  // The same rows serve as both the input and the expected output of the projection.
  val inputRows = Seq(
    Row(1, "alice", 30, Row(null, 10L)),
    Row(2, "bob", 25, Row(20L, null))
  )
  val microbatch = microbatchOf(microbatchWithCdcMetadataSchema)(inputRows: _*)

  val changeArgsWithoutSelection = ChangeArgs(
    keys = Seq(UnqualifiedColumnName("id")),
    sequencing = F.col("seq"),
    storedAsScdType = ScdType.Type1,
    columnSelection = None
  )
  val projected = Scd1BatchProcessor(
    changeArgs = changeArgsWithoutSelection,
    resolvedSequencingType = LongType
  ).projectTargetColumnsOntoMicrobatch(microbatch)

  // With no selection the user columns pass through untouched, and the CDC metadata column is
  // always re-projected last, so the output has the same shape and contents as the input.
  val expectedColumns = microbatchWithCdcMetadataSchema.fieldNames.toSeq
  assert(projected.schema.fieldNames.toSeq == expectedColumns)
  checkAnswer(projected, inputRows)
}
763+
764+
test("projectTargetColumnsOntoMicrobatch retains the CDC metadata column even when " +
  "IncludeColumns does not contain it") {
  val microbatch = microbatchOf(microbatchWithCdcMetadataSchema)(
    Row(1, "alice", 30, Row(null, 10L))
  )

  // The include list names only user columns; the CDC metadata column is deliberately absent.
  val includeIdAndAge = ColumnSelection.IncludeColumns(
    Seq(UnqualifiedColumnName("id"), UnqualifiedColumnName("age"))
  )
  val changeArgsWithInclude = ChangeArgs(
    keys = Seq(UnqualifiedColumnName("id")),
    sequencing = F.col("seq"),
    storedAsScdType = ScdType.Type1,
    columnSelection = Some(includeIdAndAge)
  )
  val projected = Scd1BatchProcessor(
    changeArgs = changeArgsWithInclude,
    resolvedSequencingType = LongType
  ).projectTargetColumnsOntoMicrobatch(microbatch)

  // "name" is dropped, while the CDC metadata column is still present as the last column.
  val expectedColumns = Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName)
  assert(projected.schema.fieldNames.toSeq == expectedColumns)
  checkAnswer(projected, Row(1, 30, Row(null, 10L)))
}
793+
794+
test("projectTargetColumnsOntoMicrobatch respects exclude column") {
  val microbatch = microbatchOf(microbatchWithCdcMetadataSchema)(
    Row(1, "alice", 30, Row(null, 10L))
  )

  val excludeAge = ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("age")))
  val changeArgsWithExclude = ChangeArgs(
    keys = Seq(UnqualifiedColumnName("id")),
    sequencing = F.col("seq"),
    storedAsScdType = ScdType.Type1,
    columnSelection = Some(excludeAge)
  )
  val projected = Scd1BatchProcessor(
    changeArgs = changeArgsWithExclude,
    resolvedSequencingType = LongType
  ).projectTargetColumnsOntoMicrobatch(microbatch)

  // Only "age" is removed; the remaining user columns and the CDC metadata column survive.
  val expectedColumns = Seq("id", "name", Scd1BatchProcessor.cdcMetadataColName)
  assert(projected.schema.fieldNames.toSeq == expectedColumns)
  checkAnswer(projected, Row(1, "alice", Row(null, 10L)))
}
824+
825+
test("projectTargetColumnsOntoMicrobatch preserves the microbatch schema order") {
  val microbatch = microbatchOf(microbatchWithCdcMetadataSchema)(
    Row(1, "alice", 30, Row(null, 10L))
  )

  // The user lists (age, id) -- intentionally different from the schema order (id, age).
  val includeInReversedOrder = ColumnSelection.IncludeColumns(
    Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("id"))
  )
  val changeArgsWithReversedInclude = ChangeArgs(
    keys = Seq(UnqualifiedColumnName("id")),
    sequencing = F.col("seq"),
    storedAsScdType = ScdType.Type1,
    columnSelection = Some(includeInReversedOrder)
  )
  val projected = Scd1BatchProcessor(
    changeArgs = changeArgsWithReversedInclude,
    resolvedSequencingType = LongType
  ).projectTargetColumnsOntoMicrobatch(microbatch)

  // The output keeps the original microbatch schema order (id before age) rather than the order
  // used in IncludeColumns, with the CDC metadata column appended last as always.
  val expectedColumns = Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName)
  assert(projected.schema.fieldNames.toSeq == expectedColumns)

  checkAnswer(projected, Row(1, 30, Row(null, 10L)))
}
856+
857+
test("projectTargetColumnsOntoMicrobatch handles backticked column names containing a " +
  "literal dot") {
  val cdcMetadataStruct = new StructType()
    .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType)
    .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType)

  // Even if a column is created with backticks via DDL, Spark consumes those backticks before
  // resolving the schema, so the schema field name is the bare "user.id".
  val dottedSchema = new StructType()
    .add("id", IntegerType)
    .add("user.id", StringType)
    .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataStruct)

  val microbatch = microbatchOf(dottedSchema)(
    Row(1, "u-100", Row(null, 10L))
  )

  // The user-facing selection refers to the dotted column in its backticked form.
  val includeDottedColumn = ColumnSelection.IncludeColumns(
    Seq(
      UnqualifiedColumnName("id"),
      UnqualifiedColumnName("`user.id`")
    )
  )
  val changeArgsWithDottedInclude = ChangeArgs(
    keys = Seq(UnqualifiedColumnName("id")),
    sequencing = F.col("seq"),
    storedAsScdType = ScdType.Type1,
    columnSelection = Some(includeDottedColumn)
  )
  val projected = Scd1BatchProcessor(
    changeArgs = changeArgsWithDottedInclude,
    resolvedSequencingType = LongType
  ).projectTargetColumnsOntoMicrobatch(microbatch)

  val expectedColumns = Seq("id", "user.id", Scd1BatchProcessor.cdcMetadataColName)
  assert(projected.schema.fieldNames.toSeq == expectedColumns)
  checkAnswer(projected, Row(1, "u-100", Row(null, 10L)))
}
900+
901+
test("projectTargetColumnsOntoMicrobatch resolves columnSelection case-insensitively " +
  "when SQLConf.CASE_SENSITIVE=false") {
  withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
    val microbatch = microbatchOf(microbatchWithCdcMetadataSchema)(
      Row(1, "alice", 30, Row(null, 10L))
    )

    // The selection intentionally uses a different case than the schema columns (id, age).
    val includeUpperCased = ColumnSelection.IncludeColumns(
      Seq(UnqualifiedColumnName("ID"), UnqualifiedColumnName("AGE"))
    )
    val changeArgsWithUpperCasedInclude = ChangeArgs(
      keys = Seq(UnqualifiedColumnName("id")),
      sequencing = F.col("seq"),
      storedAsScdType = ScdType.Type1,
      columnSelection = Some(includeUpperCased)
    )
    val projected = Scd1BatchProcessor(
      changeArgs = changeArgsWithUpperCasedInclude,
      resolvedSequencingType = LongType
    ).projectTargetColumnsOntoMicrobatch(microbatch)

    // Output names keep the microbatch schema's casing rather than the casing used in the
    // columnSelection, and the CDC metadata column is appended last as always.
    val expectedColumns = Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName)
    assert(projected.schema.fieldNames.toSeq == expectedColumns)
    checkAnswer(projected, Row(1, 30, Row(null, 10L)))
  }
}
718935
}

0 commit comments

Comments
 (0)