Skip to content

Commit dec7426

Browse files
committed
project target columns onto microbatch
1 parent 9a566ff commit dec7426

2 files changed

Lines changed: 232 additions & 0 deletions

File tree

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,56 @@ case class Scd1BatchProcessor(
9898
)
9999
}
100100

101+
/**
 * Project the user-defined column selection onto the microbatch.
 *
 * The input microbatch must already carry the projected CDC metadata column: the user-defined
 * column selection may drop columns that are otherwise necessary to compute the CDC metadata,
 * so that metadata has to be derived before this projection runs.
 *
 * @param microbatchWithCdcMetadataDf microbatch that already contains the column named
 *                                    [[Scd1BatchProcessor.cdcMetadataColName]].
 * @return a dataframe whose schema is all of the user-selected columns of the input as per
 *         [[ChangeArgs.columnSelection]], in the order returned by
 *         [[ColumnSelection.applyToSchema]], followed by the CDC metadata column.
 */
def projectTargetColumnsOntoMicrobatch(microbatchWithCdcMetadataDf: DataFrame): DataFrame = {
  val sqlConf = microbatchWithCdcMetadataDf.sparkSession.sessionState.conf
  // Match column names with the session resolver so that the comparison follows the session's
  // case sensitivity, the same way validateCdcMetadataColumnNotPresent obtains its resolver.
  val resolver = sqlConf.resolver
  val ignoreColumnNameCase = !sqlConf.caseSensitiveAnalysis

  // The schema of the microbatch less the system-projected CDC metadata column, i.e. the
  // original microbatch schema.
  val userColumnsInMicrobatchSchema = StructType(
    microbatchWithCdcMetadataDf.schema.fields.filterNot { field =>
      resolver(field.name, Scd1BatchProcessor.cdcMetadataColName)
    }
  )

  val userSelectedColumnsInMicrobatchSchema =
    ColumnSelection.applyToSchema(
      schemaName = "microbatch",
      schema = userColumnsInMicrobatchSchema,
      columnSelection = changeArgs.columnSelection,
      ignoreCase = ignoreColumnNameCase
    )

  // In addition to the explicit user-selected columns, re-project the operational CDC metadata
  // column as the last column.
  val columnNamesToSelect =
    userSelectedColumnsInMicrobatchSchema.fieldNames :+ Scd1BatchProcessor.cdcMetadataColName

  // Spark drops backticks in the schema, so quote every identifier (including the CDC metadata
  // column) before executing select. Identifiers could have special characters such as '.'.
  val finalColumnsInMicrobatchToSelect =
    columnNamesToSelect.map(colName => F.col(QuotingUtils.quoteIdentifier(colName)))

  microbatchWithCdcMetadataDf.select(
    finalColumnsInMicrobatchToSelect.toImmutableArraySeq: _*
  )
}
150+
101151
private def validateCdcMetadataColumnNotPresent(microbatchDf: DataFrame): Unit = {
102152
val sqlConf = microbatchDf.sparkSession.sessionState.conf
103153
val resolver = sqlConf.resolver

sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,20 @@ import org.apache.spark.sql.types._
2626

2727
class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession {
2828

29+
/**
 * Schema of a test microbatch on which the SCD1 CDC metadata column is already projected:
 * the user columns `id`, `name` and `age`, followed by the CDC metadata struct.
 */
private val microbatchWithCdcMetadataSchema: StructType = {
  // Struct type of the CDC metadata column: delete sequence first, then upsert sequence.
  val cdcMetadataType = StructType(
    Seq(
      StructField(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType),
      StructField(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType)
    )
  )

  StructType(
    Seq(
      StructField("id", IntegerType),
      StructField("name", StringType),
      StructField("age", IntegerType),
      StructField(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataType)
    )
  )
}
42+
2943
/** Create a microbatch [[DataFrame]] that holds the given rows under the given schema. */
private def microbatchOf(schema: StructType)(rows: Row*): DataFrame = {
  val rowRdd = spark.sparkContext.parallelize(rows)
  spark.createDataFrame(rowRdd, schema)
}
@@ -440,4 +454,172 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession {
440454
)
441455
}
442456
}
457+
458+
test("projectTargetColumnsOntoMicrobatch keeps every user column and the CDC metadata column " +
  "when columnSelection is None") {
  // No column selection is configured for this processor.
  val changeArgs = ChangeArgs(
    keys = Seq(UnqualifiedColumnName("id")),
    sequencing = F.col("seq"),
    storedAsScdType = ScdType.Type1,
    columnSelection = None
  )
  val processor = Scd1BatchProcessor(changeArgs = changeArgs, resolvedSequencingType = LongType)

  val inputRows = Seq(
    Row(1, "alice", 30, Row(null, 10L)),
    Row(2, "bob", 25, Row(20L, null))
  )
  val inputBatch = microbatchOf(microbatchWithCdcMetadataSchema)(inputRows: _*)

  val projected = processor.projectTargetColumnsOntoMicrobatch(inputBatch)

  // With no selection the user columns pass through untouched and the CDC metadata column is
  // re-projected last, so both the column names and the rows must equal the input.
  val expectedColumns = microbatchWithCdcMetadataSchema.fieldNames.toSeq
  assert(projected.schema.fieldNames.toSeq == expectedColumns)
  checkAnswer(projected, inputRows)
}
488+
489+
test("projectTargetColumnsOntoMicrobatch retains the CDC metadata column even when " +
  "IncludeColumns does not contain it") {
  // The include list names only user columns; the CDC metadata column is not listed.
  val includeIdAndAge = ColumnSelection.IncludeColumns(
    Seq(UnqualifiedColumnName("id"), UnqualifiedColumnName("age"))
  )
  val changeArgs = ChangeArgs(
    keys = Seq(UnqualifiedColumnName("id")),
    sequencing = F.col("seq"),
    storedAsScdType = ScdType.Type1,
    columnSelection = Some(includeIdAndAge)
  )
  val processor = Scd1BatchProcessor(changeArgs = changeArgs, resolvedSequencingType = LongType)

  val inputBatch = microbatchOf(microbatchWithCdcMetadataSchema)(
    Row(1, "alice", 30, Row(null, 10L))
  )

  val projected = processor.projectTargetColumnsOntoMicrobatch(inputBatch)

  // `name` is dropped, while the CDC metadata column is still present as the last column.
  val expectedColumns = Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName)
  assert(projected.schema.fieldNames.toSeq == expectedColumns)
  checkAnswer(projected, Seq(Row(1, 30, Row(null, 10L))))
}
518+
519+
test("projectTargetColumnsOntoMicrobatch respects exclude column") {
  // Only `age` is excluded; every other user column should remain.
  val excludeAge = ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("age")))
  val changeArgs = ChangeArgs(
    keys = Seq(UnqualifiedColumnName("id")),
    sequencing = F.col("seq"),
    storedAsScdType = ScdType.Type1,
    columnSelection = Some(excludeAge)
  )
  val processor = Scd1BatchProcessor(changeArgs = changeArgs, resolvedSequencingType = LongType)

  val inputBatch = microbatchOf(microbatchWithCdcMetadataSchema)(
    Row(1, "alice", 30, Row(null, 10L))
  )

  val projected = processor.projectTargetColumnsOntoMicrobatch(inputBatch)

  // `age` is gone; `id`, `name` and the trailing CDC metadata column are kept.
  val expectedColumns = Seq("id", "name", Scd1BatchProcessor.cdcMetadataColName)
  assert(projected.schema.fieldNames.toSeq == expectedColumns)
  checkAnswer(projected, Seq(Row(1, "alice", Row(null, 10L))))
}
549+
550+
test("projectTargetColumnsOntoMicrobatch preserves the microbatch schema order") {
  // User specifies (age, id) -- intentionally different from the schema order (id, age).
  val includeAgeThenId = ColumnSelection.IncludeColumns(
    Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("id"))
  )
  val changeArgs = ChangeArgs(
    keys = Seq(UnqualifiedColumnName("id")),
    sequencing = F.col("seq"),
    storedAsScdType = ScdType.Type1,
    columnSelection = Some(includeAgeThenId)
  )
  val processor = Scd1BatchProcessor(changeArgs = changeArgs, resolvedSequencingType = LongType)

  val inputBatch = microbatchOf(microbatchWithCdcMetadataSchema)(
    Row(1, "alice", 30, Row(null, 10L))
  )

  val projected = processor.projectTargetColumnsOntoMicrobatch(inputBatch)

  // The output is expected in the original microbatch schema order (id before age) rather than
  // the order listed in IncludeColumns, with the CDC metadata column appended last.
  val expectedColumns = Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName)
  assert(projected.schema.fieldNames.toSeq == expectedColumns)

  checkAnswer(projected, Seq(Row(1, 30, Row(null, 10L))))
}
581+
582+
test("projectTargetColumnsOntoMicrobatch handles backticked column names containing a " +
  "literal dot") {
  val cdcMetadataType = new StructType()
    .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType)
    .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType)

  // Even if a column is created with backticks via DDL, those backticks are consumed by Spark
  // before resolving the schema; they won't show up in the schema field. The schema therefore
  // holds the bare name "user.id".
  val schemaWithDottedColumn = new StructType()
    .add("id", IntegerType)
    .add("user.id", StringType)
    .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataType)

  // The user-facing selection refers to the dotted column with backticks.
  val includeIdAndDottedColumn = ColumnSelection.IncludeColumns(
    Seq(
      UnqualifiedColumnName("id"),
      UnqualifiedColumnName("`user.id`")
    )
  )
  val changeArgs = ChangeArgs(
    keys = Seq(UnqualifiedColumnName("id")),
    sequencing = F.col("seq"),
    storedAsScdType = ScdType.Type1,
    columnSelection = Some(includeIdAndDottedColumn)
  )
  val processor = Scd1BatchProcessor(changeArgs = changeArgs, resolvedSequencingType = LongType)

  val inputBatch = microbatchOf(schemaWithDottedColumn)(
    Row(1, "u-100", Row(null, 10L))
  )

  val projected = processor.projectTargetColumnsOntoMicrobatch(inputBatch)

  val expectedColumns = Seq("id", "user.id", Scd1BatchProcessor.cdcMetadataColName)
  assert(projected.schema.fieldNames.toSeq == expectedColumns)
  checkAnswer(projected, Seq(Row(1, "u-100", Row(null, 10L))))
}
443625
}

0 commit comments

Comments
 (0)