@@ -455,7 +455,8 @@ public void testSpannerChangeStreamsToBigQueryAddColumn() throws Exception {
455455 .addParameter ("spannerChangeStreamName" , testName + "_stream" )
456456 .addParameter ("bigQueryDataset" , bigQueryResourceManager .getDatasetId ())
457457 .addParameter ("rpcPriority" , "HIGH" )
458- .addParameter ("dlqRetryMinutes" , "3" )));
458+ .addParameter ("dlqRetryMinutes" , "3" )
459+ .addParameter ("usePublicIps" , "false" )));
459460
460461 assertThatPipeline (launchInfo ).isRunning ();
461462
@@ -509,72 +510,6 @@ public void testSpannerChangeStreamsToBigQueryAddColumn() throws Exception {
509510 }
510511 }
511512
512- public static int nextValue () {
513- return counter .getAndIncrement ();
514- }
515-
516- private String queryCdcTable (String cdcTable , int key ) {
517- return "SELECT * FROM `"
518- + bigQueryResourceManager .getDatasetId ()
519- + "."
520- + cdcTable
521- + "`"
522- + String .format (" WHERE Id = %d" , key );
523- }
524-
525- @ NotNull
526- private Supplier <Boolean > dataShownUp (String query , int minRows ) {
527- return () -> {
528- try {
529- return bigQueryResourceManager .runQuery (query ).getTotalRows () >= minRows ;
530- } catch (Exception e ) {
531- if (ExceptionUtils .containsMessage (e , "Not found: Table" )) {
532- return false ;
533- } else {
534- throw e ;
535- }
536- }
537- };
538- }
539-
540- private void waitForQueryToReturnRows (String query , int resultsRequired , boolean cancelOnceDone )
541- throws IOException {
542- Config config = createConfig (launchInfo );
543- Result result =
544- cancelOnceDone
545- ? pipelineOperator ()
546- .waitForConditionAndCancel (config , dataShownUp (query , resultsRequired ))
547- : pipelineOperator ().waitForCondition (config , dataShownUp (query , resultsRequired ));
548- assertThatResult (result ).meetsConditions ();
549- }
550-
551- public void addEmptyColumn (String newColumnName , String tableId ) {
552- try {
553-
554- Table table = bigQueryResourceManager .getTableIfExists (tableId );
555- Schema schema = table .getDefinition ().getSchema ();
556- FieldList fields = schema .getFields ();
557-
558- // Create the new field/column
559- Field newField = Field .of (newColumnName , LegacySQLTypeName .STRING );
560-
561- // Create a new schema adding the current fields, plus the new one
562- List <Field > fieldList = new ArrayList <Field >();
563- fields .forEach (fieldList ::add );
564- fieldList .add (newField );
565- Schema newSchema = Schema .of (fieldList );
566-
567- // Update the table with the new schema
568- Table updatedTable =
569- table .toBuilder ().setDefinition (StandardTableDefinition .of (newSchema )).build ();
570- updatedTable .update ();
571- } catch (BigQueryException e ) {
572- LOG .info (
573- "Caught exception when trying to add a new column to bigquery changelog table. \n "
574- + e .toString ());
575- }
576- }
577-
578513 @ Test
579514 public void testSpannerChangeStreamsToBigQueryGoogleSqlUuid () throws IOException {
580515 String spannerTable = testName + RandomStringUtils .randomAlphanumeric (1 , 5 );
@@ -609,7 +544,8 @@ public void testSpannerChangeStreamsToBigQueryGoogleSqlUuid() throws IOException
609544 .addParameter ("spannerChangeStreamName" , testName + "_stream" )
610545 .addParameter ("bigQueryDataset" , bigQueryResourceManager .getDatasetId ())
611546 .addParameter ("rpcPriority" , "HIGH" )
612- .addParameter ("dlqRetryMinutes" , "3" )));
547+ .addParameter ("dlqRetryMinutes" , "3" )
548+ .addParameter ("usePublicIps" , "false" )));
613549
614550 assertThatPipeline (launchInfo ).isRunning ();
615551
@@ -704,4 +640,173 @@ public void testSpannerChangeStreamsToBigQueryGoogleSqlUuid() throws IOException
704640 row3 .get ("GsqlUuidArray" ).isNull ()
705641 || row3 .get ("GsqlUuidArray" ).getRepeatedValue ().isEmpty ());
706642 }
643+
644+ @ Test
645+ public void testSpannerChangeStreamsToBigQueryUuidPk () throws IOException {
646+ String spannerTable = testName + RandomStringUtils .randomAlphanumeric (1 , 5 );
647+ String createTableStatement =
648+ String .format (
649+ "CREATE TABLE %s ("
650+ + " Id UUID NOT NULL,"
651+ + " Value String(1024)"
652+ + ") PRIMARY KEY(Id)" ,
653+ spannerTable );
654+ spannerResourceManager .executeDdlStatement (createTableStatement );
655+ String cdcTable = spannerTable + "_changelog" ;
656+
657+ String createChangeStreamStatement =
658+ String .format ("CREATE CHANGE STREAM %s_stream FOR %s" , testName , spannerTable );
659+ spannerResourceManager .executeDdlStatement (createChangeStreamStatement );
660+ bigQueryResourceManager .createDataset (REGION );
661+
662+ Function <LaunchConfig .Builder , LaunchConfig .Builder > paramsAdder = Function .identity ();
663+
664+ launchInfo =
665+ launchTemplate (
666+ paramsAdder .apply (
667+ LaunchConfig .builder (testName , specPath )
668+ .addParameter ("spannerProjectId" , PROJECT )
669+ .addParameter ("spannerInstanceId" , spannerResourceManager .getInstanceId ())
670+ .addParameter ("spannerDatabase" , spannerResourceManager .getDatabaseId ())
671+ .addParameter (
672+ "spannerMetadataInstanceId" , spannerResourceManager .getInstanceId ())
673+ .addParameter ("spannerMetadataDatabase" , spannerResourceManager .getDatabaseId ())
674+ .addParameter ("spannerChangeStreamName" , testName + "_stream" )
675+ .addParameter ("bigQueryDataset" , bigQueryResourceManager .getDatasetId ())
676+ .addParameter ("rpcPriority" , "HIGH" )
677+ .addParameter ("dlqRetryMinutes" , "3" )
678+ .addParameter ("usePublicIps" , "false" )));
679+
680+ assertThatPipeline (launchInfo ).isRunning ();
681+
682+ // Insert
683+ String uuidPk1 = UUID .randomUUID ().toString ();
684+ String value1 = "Value A" ;
685+ Mutation insert1 =
686+ Mutation .newInsertBuilder (spannerTable )
687+ .set ("Id" )
688+ .to (uuidPk1 )
689+ .set ("Value" )
690+ .to (value1 )
691+ .build ();
692+ spannerResourceManager .write (Collections .singletonList (insert1 ));
693+
694+ String query1 =
695+ "SELECT * FROM `"
696+ + bigQueryResourceManager .getDatasetId ()
697+ + "."
698+ + cdcTable
699+ + "` WHERE Id = \" "
700+ + uuidPk1
701+ + "\" " ;
702+ waitForQueryToReturnRows (query1 , 1 , false );
703+ TableResult result1 = bigQueryResourceManager .runQuery (query1 );
704+ assertEquals (1 , result1 .getTotalRows ());
705+ FieldValueList row1 = result1 .iterateAll ().iterator ().next ();
706+ assertEquals (uuidPk1 , row1 .get ("Id" ).getStringValue ());
707+ assertEquals (value1 , row1 .get ("Value" ).getStringValue ());
708+
709+ // Update
710+ String value1Updated = "Value B" ;
711+ Mutation update1 =
712+ Mutation .newUpdateBuilder (spannerTable )
713+ .set ("Id" )
714+ .to (uuidPk1 )
715+ .set ("Value" )
716+ .to (value1Updated )
717+ .build ();
718+ spannerResourceManager .write (Collections .singletonList (update1 ));
719+ waitForQueryToReturnRows (query1 , 2 , false ); // Expecting a second row for the update
720+ TableResult result1Updated =
721+ bigQueryResourceManager .runQuery (
722+ query1 + " ORDER BY _metadata_spanner_commit_timestamp DESC LIMIT 1" );
723+ assertEquals (1 , result1Updated .getTotalRows ());
724+ FieldValueList row1Updated = result1Updated .iterateAll ().iterator ().next ();
725+ assertEquals (value1Updated , row1Updated .get ("Value" ).getStringValue ());
726+
727+ // Delete
728+ Mutation delete1 = Mutation .delete (spannerTable , Key .of (uuidPk1 ));
729+ spannerResourceManager .write (Collections .singletonList (delete1 ));
730+ waitForQueryToReturnRows (query1 , 3 , false ); // Expecting a third row for the delete
731+ TableResult result1Deleted =
732+ bigQueryResourceManager .runQuery (
733+ query1 + " ORDER BY _metadata_spanner_commit_timestamp DESC LIMIT 1" );
734+ assertEquals (1 , result1Deleted .getTotalRows ());
735+ FieldValueList row1Deleted = result1Deleted .iterateAll ().iterator ().next ();
736+ assertTrue (row1Deleted .get ("Value" ).isNull ());
737+ assertEquals ("DELETE" , row1Deleted .get ("_metadata_spanner_mod_type" ).getStringValue ());
738+
739+ // Verify BQ Schema
740+ Table bqTable = bigQueryResourceManager .getTableIfExists (cdcTable );
741+ Schema bqSchema = bqTable .getDefinition ().getSchema ();
742+ Field idField = bqSchema .getFields ().get ("Id" );
743+ assertEquals (LegacySQLTypeName .STRING , idField .getType ());
744+ assertEquals (Field .Mode .NULLABLE , idField .getMode ()); // Primary Keys are non-nullable
745+ }
746+
747+ public static int nextValue () {
748+ return counter .getAndIncrement ();
749+ }
750+
751+ private String queryCdcTable (String cdcTable , int key ) {
752+ return "SELECT * FROM `"
753+ + bigQueryResourceManager .getDatasetId ()
754+ + "."
755+ + cdcTable
756+ + "`"
757+ + String .format (" WHERE Id = %d" , key );
758+ }
759+
760+ @ NotNull
761+ private Supplier <Boolean > dataShownUp (String query , int minRows ) {
762+ return () -> {
763+ try {
764+ return bigQueryResourceManager .runQuery (query ).getTotalRows () >= minRows ;
765+ } catch (Exception e ) {
766+ if (ExceptionUtils .containsMessage (e , "Not found: Table" )) {
767+ return false ;
768+ } else {
769+ throw e ;
770+ }
771+ }
772+ };
773+ }
774+
775+ private void waitForQueryToReturnRows (String query , int resultsRequired , boolean cancelOnceDone )
776+ throws IOException {
777+ Config config = createConfig (launchInfo );
778+ Result result =
779+ cancelOnceDone
780+ ? pipelineOperator ()
781+ .waitForConditionAndCancel (config , dataShownUp (query , resultsRequired ))
782+ : pipelineOperator ().waitForCondition (config , dataShownUp (query , resultsRequired ));
783+ assertThatResult (result ).meetsConditions ();
784+ }
785+
786+ public void addEmptyColumn (String newColumnName , String tableId ) {
787+ try {
788+
789+ Table table = bigQueryResourceManager .getTableIfExists (tableId );
790+ Schema schema = table .getDefinition ().getSchema ();
791+ FieldList fields = schema .getFields ();
792+
793+ // Create the new field/column
794+ Field newField = Field .of (newColumnName , LegacySQLTypeName .STRING );
795+
796+ // Create a new schema adding the current fields, plus the new one
797+ List <Field > fieldList = new ArrayList <Field >();
798+ fields .forEach (fieldList ::add );
799+ fieldList .add (newField );
800+ Schema newSchema = Schema .of (fieldList );
801+
802+ // Update the table with the new schema
803+ Table updatedTable =
804+ table .toBuilder ().setDefinition (StandardTableDefinition .of (newSchema )).build ();
805+ updatedTable .update ();
806+ } catch (BigQueryException e ) {
807+ LOG .info (
808+ "Caught exception when trying to add a new column to bigquery changelog table. \n "
809+ + e .toString ());
810+ }
811+ }
707812}
0 commit comments