@@ -27,6 +27,20 @@ import org.apache.spark.sql.types._
2727
2828class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession {
2929
/**
 * Test schema for a microbatch that already has the SCD1 CDC metadata column projected.
 *
 * Layout, in order: the user columns `id` (int), `name` (string) and `age` (int), followed by
 * the struct column named [[Scd1BatchProcessor.cdcMetadataColName]], whose two long fields are
 * the delete sequence and the upsert sequence (in that order). Every field is added through
 * `StructType.add(name, dataType)`, i.e. with the default nullability.
 */
private val microbatchWithCdcMetadataSchema: StructType = {
  // Nested struct carried by the CDC metadata column: (deleteSequence, upsertSequence).
  val cdcMetadataType = new StructType()
    .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType)
    .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType)

  new StructType()
    .add("id", IntegerType)
    .add("name", StringType)
    .add("age", IntegerType)
    .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataType)
}
43+
/**
 * Materialize the given rows as a microbatch [[DataFrame]] carrying exactly the supplied schema.
 *
 * @param schema schema applied to the rows as-is
 * @param rows   rows of the microbatch; each row must line up with `schema`
 */
private def microbatchOf(schema: StructType)(rows: Row*): DataFrame = {
  val rowRdd = spark.sparkContext.parallelize(rows)
  spark.createDataFrame(rowRdd, schema)
}
@@ -715,4 +729,207 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession {
715729 )
716730 }
717731 }
732+
// Without a column selection the projection must hand the microbatch back unchanged: same
// field names in the same order, same rows.
test("projectTargetColumnsOntoMicrobatch keeps every user column and the CDC metadata column " +
  "when columnSelection is None") {
  // The same rows serve as both the input and the expected output. The trailing struct is
  // (deleteSequence, upsertSequence), following the field order of the shared test schema.
  val rows = Seq(
    Row(1, "alice", 30, Row(null, 10L)),
    Row(2, "bob", 25, Row(20L, null))
  )
  val batch = microbatchOf(microbatchWithCdcMetadataSchema)(rows: _*)

  val processor = Scd1BatchProcessor(
    changeArgs = ChangeArgs(
      keys = Seq(UnqualifiedColumnName("id")),
      sequencing = F.col("seq"),
      storedAsScdType = ScdType.Type1,
      columnSelection = None
    ),
    resolvedSequencingType = LongType
  )

  val result = processor.projectTargetColumnsOntoMicrobatch(batch)

  // None selection is no-op on the user columns, and the CDC metadata column is unconditionally
  // re-projected last, so the output shape exactly matches the input.
  assert(result.schema.fieldNames.toSeq == microbatchWithCdcMetadataSchema.fieldNames.toSeq)
  checkAnswer(
    df = result,
    expectedAnswer = rows
  )
}
763+
// An IncludeColumns selection that lists only user columns must still yield the CDC metadata
// column in the projected output.
test("projectTargetColumnsOntoMicrobatch retains the CDC metadata column even when " +
  "IncludeColumns does not contain it") {
  val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
    Row(1, "alice", 30, Row(null, 10L))
  )

  // Only `id` and `age` are requested; `name` and the CDC metadata column are not listed.
  val includeIdAndAge = ColumnSelection.IncludeColumns(
    Seq(UnqualifiedColumnName("id"), UnqualifiedColumnName("age"))
  )

  val processor = Scd1BatchProcessor(
    changeArgs = ChangeArgs(
      keys = Seq(UnqualifiedColumnName("id")),
      sequencing = F.col("seq"),
      storedAsScdType = ScdType.Type1,
      columnSelection = Some(includeIdAndAge)
    ),
    resolvedSequencingType = LongType
  )

  val result = processor.projectTargetColumnsOntoMicrobatch(batch)

  // `name` is dropped, while the CDC metadata column survives as the last column.
  assert(result.schema.fieldNames.toSeq ==
    Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName))
  checkAnswer(
    df = result,
    expectedAnswer = Row(1, 30, Row(null, 10L))
  )
}
793+
// An ExcludeColumns selection removes exactly the listed user column and nothing else.
test("projectTargetColumnsOntoMicrobatch respects exclude column") {
  val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
    Row(1, "alice", 30, Row(null, 10L))
  )

  val excludeAge = ColumnSelection.ExcludeColumns(
    Seq(UnqualifiedColumnName("age"))
  )

  val processor = Scd1BatchProcessor(
    changeArgs = ChangeArgs(
      keys = Seq(UnqualifiedColumnName("id")),
      sequencing = F.col("seq"),
      storedAsScdType = ScdType.Type1,
      columnSelection = Some(excludeAge)
    ),
    resolvedSequencingType = LongType
  )

  val result = processor.projectTargetColumnsOntoMicrobatch(batch)

  // `age` is gone; the remaining user columns and the CDC metadata column keep their order.
  assert(
    result.schema.fieldNames.toSeq ==
      Seq("id", "name", Scd1BatchProcessor.cdcMetadataColName)
  )
  checkAnswer(
    df = result,
    expectedAnswer = Row(1, "alice", Row(null, 10L))
  )
}
824+
// The order in which the user lists columns must not influence the output column order.
test("projectTargetColumnsOntoMicrobatch preserves the microbatch schema order") {
  val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
    Row(1, "alice", 30, Row(null, 10L))
  )

  // User specifies (age, id) -- intentionally different from the schema order (id, age).
  val includeAgeThenId = ColumnSelection.IncludeColumns(
    Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("id"))
  )

  val processor = Scd1BatchProcessor(
    changeArgs = ChangeArgs(
      keys = Seq(UnqualifiedColumnName("id")),
      sequencing = F.col("seq"),
      storedAsScdType = ScdType.Type1,
      columnSelection = Some(includeAgeThenId)
    ),
    resolvedSequencingType = LongType
  )

  val result = processor.projectTargetColumnsOntoMicrobatch(batch)

  // Output column order follows the original microbatch schema (id before age), not the order
  // in which the user listed columns in IncludeColumns. The CDC metadata column is appended
  // last as always.
  assert(result.schema.fieldNames.toSeq ==
    Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName))

  checkAnswer(
    df = result,
    expectedAnswer = Row(1, 30, Row(null, 10L))
  )
}
856+
// A selection entry written as `user.id` (backtick-quoted) must match the schema field whose
// name literally contains a dot.
test("projectTargetColumnsOntoMicrobatch handles backticked column names containing a " +
  "literal dot") {
  val schema = new StructType()
    .add("id", IntegerType)
    // Even if a column is created with backticks via DDL, those backticks are consumed by Spark
    // before resolving the schema; they won't show up in the schema field.
    .add("user.id", StringType)
    .add(
      Scd1BatchProcessor.cdcMetadataColName,
      new StructType()
        .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType)
        .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType))

  val batch = microbatchOf(schema)(
    Row(1, "u-100", Row(null, 10L))
  )

  // The dotted column is referenced with backticks in the selection, unlike in the schema.
  val includeIdAndDottedColumn = ColumnSelection.IncludeColumns(
    Seq(
      UnqualifiedColumnName("id"),
      UnqualifiedColumnName("`user.id`")
    )
  )

  val processor = Scd1BatchProcessor(
    changeArgs = ChangeArgs(
      keys = Seq(UnqualifiedColumnName("id")),
      sequencing = F.col("seq"),
      storedAsScdType = ScdType.Type1,
      columnSelection = Some(includeIdAndDottedColumn)
    ),
    resolvedSequencingType = LongType
  )

  val result = processor.projectTargetColumnsOntoMicrobatch(batch)

  // The output field keeps the schema's plain name (no backticks).
  assert(result.schema.fieldNames.toSeq ==
    Seq("id", "user.id", Scd1BatchProcessor.cdcMetadataColName))
  checkAnswer(
    df = result,
    expectedAnswer = Row(1, "u-100", Row(null, 10L))
  )
}
900+
// With case-insensitive resolution, selection entries whose case differs from the schema must
// still match, and the output must report the schema's spelling.
test("projectTargetColumnsOntoMicrobatch resolves columnSelection case-insensitively " +
  "when SQLConf.CASE_SENSITIVE=false") {
  withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
    val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
      Row(1, "alice", 30, Row(null, 10L))
    )

    // User columns intentionally use a different case than the schema (id, age).
    val includeUpperCased = ColumnSelection.IncludeColumns(
      Seq(UnqualifiedColumnName("ID"), UnqualifiedColumnName("AGE"))
    )

    val processor = Scd1BatchProcessor(
      changeArgs = ChangeArgs(
        keys = Seq(UnqualifiedColumnName("id")),
        sequencing = F.col("seq"),
        storedAsScdType = ScdType.Type1,
        columnSelection = Some(includeUpperCased)
      ),
      resolvedSequencingType = LongType
    )

    val result = processor.projectTargetColumnsOntoMicrobatch(batch)

    // Output column names follow the microbatch schema's casing, not the casing in the user's
    // columnSelection. The CDC metadata column is appended last as always.
    assert(result.schema.fieldNames.toSeq ==
      Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName))
    checkAnswer(
      df = result,
      expectedAnswer = Row(1, 30, Row(null, 10L))
    )
  }
}
718935}
0 commit comments