@@ -146,7 +146,8 @@ public void testSpannerChangeStreamsToBigQueryBasic() throws IOException {
146146 .addParameter ("spannerChangeStreamName" , testName + "_stream" )
147147 .addParameter ("bigQueryDataset" , bigQueryResourceManager .getDatasetId ())
148148 .addParameter ("rpcPriority" , "HIGH" )
149- .addParameter ("dlqRetryMinutes" , "3" )));
149+ .addParameter ("dlqRetryMinutes" , "3" )
150+ .addParameter ("usePublicIps" , "false" )));
150151
151152 assertThatPipeline (launchInfo ).isRunning ();
152153
@@ -223,7 +224,8 @@ public void testSpannerChangeStreamsToBigQueryBasicWriteApiExactlyOnce() throws
223224 .addParameter ("useStorageWriteApiAtLeastOnce" , "false" )
224225 .addParameter ("numStorageWriteApiStreams" , "1" )
225226 .addParameter ("storageWriteApiTriggeringFrequencySec" , "10" )
226- .addParameter ("dlqRetryMinutes" , "3" )));
227+ .addParameter ("dlqRetryMinutes" , "3" )
228+ .addParameter ("usePublicIps" , "false" )));
227229
228230 assertThatPipeline (launchInfo ).isRunning ();
229231
@@ -283,7 +285,8 @@ public void testSpannerChangeStreamsToBigQueryFloatColumns() throws IOException
283285 .addParameter ("spannerChangeStreamName" , testName + "_stream" )
284286 .addParameter ("bigQueryDataset" , bigQueryResourceManager .getDatasetId ())
285287 .addParameter ("rpcPriority" , "HIGH" )
286- .addParameter ("dlqRetryMinutes" , "3" )));
288+ .addParameter ("dlqRetryMinutes" , "3" )
289+ .addParameter ("usePublicIps" , "false" )));
287290
288291 assertThatPipeline (launchInfo ).isRunning ();
289292
@@ -350,7 +353,8 @@ public void testSpannerChangeStreamsToBigQueryAddTable() throws Exception {
350353 .addParameter ("spannerChangeStreamName" , testName + "_stream" )
351354 .addParameter ("bigQueryDataset" , bigQueryResourceManager .getDatasetId ())
352355 .addParameter ("rpcPriority" , "HIGH" )
353- .addParameter ("dlqRetryMinutes" , "3" )));
356+ .addParameter ("dlqRetryMinutes" , "3" )
357+ .addParameter ("usePublicIps" , "false" )));
354358
355359 assertThatPipeline (launchInfo ).isRunning ();
356360
@@ -455,7 +459,8 @@ public void testSpannerChangeStreamsToBigQueryAddColumn() throws Exception {
455459 .addParameter ("spannerChangeStreamName" , testName + "_stream" )
456460 .addParameter ("bigQueryDataset" , bigQueryResourceManager .getDatasetId ())
457461 .addParameter ("rpcPriority" , "HIGH" )
458- .addParameter ("dlqRetryMinutes" , "3" )));
462+ .addParameter ("dlqRetryMinutes" , "3" )
463+ .addParameter ("usePublicIps" , "false" )));
459464
460465 assertThatPipeline (launchInfo ).isRunning ();
461466
@@ -609,7 +614,8 @@ public void testSpannerChangeStreamsToBigQueryGoogleSqlUuid() throws IOException
609614 .addParameter ("spannerChangeStreamName" , testName + "_stream" )
610615 .addParameter ("bigQueryDataset" , bigQueryResourceManager .getDatasetId ())
611616 .addParameter ("rpcPriority" , "HIGH" )
612- .addParameter ("dlqRetryMinutes" , "3" )));
617+ .addParameter ("dlqRetryMinutes" , "3" )
618+ .addParameter ("usePublicIps" , "false" )));
613619
614620 assertThatPipeline (launchInfo ).isRunning ();
615621
@@ -704,4 +710,107 @@ public void testSpannerChangeStreamsToBigQueryGoogleSqlUuid() throws IOException
704710 row3 .get ("GsqlUuidArray" ).isNull ()
705711 || row3 .get ("GsqlUuidArray" ).getRepeatedValue ().isEmpty ());
706712 }
713+
714+ @ Test
715+ public void testSpannerChangeStreamsToBigQueryUuidPk () throws IOException {
716+ String spannerTable = testName + RandomStringUtils .randomAlphanumeric (1 , 5 );
717+ String createTableStatement =
718+ String .format (
719+ "CREATE TABLE %s ("
720+ + " Id UUID NOT NULL,"
721+ + " Value String(1024)"
722+ + ") PRIMARY KEY(Id)" ,
723+ spannerTable );
724+ spannerResourceManager .executeDdlStatement (createTableStatement );
725+ String cdcTable = spannerTable + "_changelog" ;
726+
727+ String createChangeStreamStatement =
728+ String .format ("CREATE CHANGE STREAM %s_stream FOR %s" , testName , spannerTable );
729+ spannerResourceManager .executeDdlStatement (createChangeStreamStatement );
730+ bigQueryResourceManager .createDataset (REGION );
731+
732+ Function <LaunchConfig .Builder , LaunchConfig .Builder > paramsAdder = Function .identity ();
733+
734+ launchInfo =
735+ launchTemplate (
736+ paramsAdder .apply (
737+ LaunchConfig .builder (testName , specPath )
738+ .addParameter ("spannerProjectId" , PROJECT )
739+ .addParameter ("spannerInstanceId" , spannerResourceManager .getInstanceId ())
740+ .addParameter ("spannerDatabase" , spannerResourceManager .getDatabaseId ())
741+ .addParameter (
742+ "spannerMetadataInstanceId" , spannerResourceManager .getInstanceId ())
743+ .addParameter ("spannerMetadataDatabase" , spannerResourceManager .getDatabaseId ())
744+ .addParameter ("spannerChangeStreamName" , testName + "_stream" )
745+ .addParameter ("bigQueryDataset" , bigQueryResourceManager .getDatasetId ())
746+ .addParameter ("rpcPriority" , "HIGH" )
747+ .addParameter ("dlqRetryMinutes" , "3" )
748+ .addParameter ("usePublicIps" , "false" )));
749+
750+ assertThatPipeline (launchInfo ).isRunning ();
751+
752+ // Insert
753+ String uuidPk1 = UUID .randomUUID ().toString ();
754+ String value1 = "Value A" ;
755+ Mutation insert1 =
756+ Mutation .newInsertBuilder (spannerTable )
757+ .set ("Id" )
758+ .to (uuidPk1 )
759+ .set ("Value" )
760+ .to (value1 )
761+ .build ();
762+ spannerResourceManager .write (Collections .singletonList (insert1 ));
763+
764+ String query1 =
765+ "SELECT * FROM `"
766+ + bigQueryResourceManager .getDatasetId ()
767+ + "."
768+ + cdcTable
769+ + "` WHERE Id = \" "
770+ + uuidPk1
771+ + "\" " ;
772+ waitForQueryToReturnRows (query1 , 1 , false );
773+ TableResult result1 = bigQueryResourceManager .runQuery (query1 );
774+ assertEquals (1 , result1 .getTotalRows ());
775+ FieldValueList row1 = result1 .iterateAll ().iterator ().next ();
776+ assertEquals (uuidPk1 , row1 .get ("Id" ).getStringValue ());
777+ assertEquals (value1 , row1 .get ("Value" ).getStringValue ());
778+
779+ // Update
780+ String value1Updated = "Value B" ;
781+ Mutation update1 =
782+ Mutation .newUpdateBuilder (spannerTable )
783+ .set ("Id" )
784+ .to (uuidPk1 )
785+ .set ("Value" )
786+ .to (value1Updated )
787+ .build ();
788+ spannerResourceManager .write (Collections .singletonList (update1 ));
789+ waitForQueryToReturnRows (query1 , 2 , false ); // Expecting a second row for the update
790+ TableResult result1Updated =
791+ bigQueryResourceManager .runQuery (
792+ query1 + " ORDER BY _metadata_spanner_commit_timestamp DESC LIMIT 1" );
793+ assertEquals (1 , result1Updated .getTotalRows ());
794+ FieldValueList row1Updated = result1Updated .iterateAll ().iterator ().next ();
795+ assertEquals (value1Updated , row1Updated .get ("Value" ).getStringValue ());
796+
797+ // Delete
798+ Mutation delete1 = Mutation .delete (spannerTable , Key .of (uuidPk1 ));
799+ spannerResourceManager .write (Collections .singletonList (delete1 ));
800+ waitForQueryToReturnRows (query1 , 3 , false ); // Expecting a third row for the delete
801+ TableResult result1Deleted =
802+ bigQueryResourceManager .runQuery (
803+ query1 + " ORDER BY _metadata_spanner_commit_timestamp DESC LIMIT 1" );
804+ assertEquals (1 , result1Deleted .getTotalRows ());
805+ FieldValueList row1Deleted = result1Deleted .iterateAll ().iterator ().next ();
806+ assertTrue (row1Deleted .get ("Value" ).isNull ());
807+ assertEquals ("DELETE" , row1Deleted .get ("_metadata_spanner_mod_type" ).getStringValue ());
808+
809+ // Verify BQ Schema
810+ Table bqTable = bigQueryResourceManager .getTableIfExists (cdcTable );
811+ Schema bqSchema = bqTable .getDefinition ().getSchema ();
812+ Field idField = bqSchema .getFields ().get ("Id" );
813+ assertEquals (LegacySQLTypeName .STRING , idField .getType ());
814+ assertEquals (Field .Mode .NULLABLE , idField .getMode ()); // Primary Keys are non-nullable
815+ }
707816}
0 commit comments