diff --git a/v2/datastream-to-spanner/README_Cloud_Datastream_to_Spanner.md b/v2/datastream-to-spanner/README_Cloud_Datastream_to_Spanner.md index 7088f6bc13..9dbc8fb3e7 100644 --- a/v2/datastream-to-spanner/README_Cloud_Datastream_to_Spanner.md +++ b/v2/datastream-to-spanner/README_Cloud_Datastream_to_Spanner.md @@ -78,7 +78,7 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat * **transformationClassName**: Fully qualified class name having the custom transformation logic. It is a mandatory field in case transformationJarPath is specified. Defaults to empty. * **transformationCustomParameters**: String containing any custom parameters to be passed to the custom transformation class. Defaults to empty. * **filteredEventsDirectory**: This is the file path to store the events filtered via custom transformation. Default is a directory under the Dataflow job's temp location. The default value is enough under most conditions. -* **shardingContextFilePath**: Sharding context file path in cloud storage is used to populate the shard id in spanner database for each source shard.It expects a JSON file with the format: {\"StreamToDbAndShardMap\": Map>}. +* **sourceConfigURL**: Cloud Storage path to a shard config file for sharded migrations. It expects a HOCON or JSON file. For a sample file, please refer to v2/datastream-to-spanner/src/test/resources/DatastreamToSpannerSingleDFShardedMigrationIT/sharding-config.conf in the repository. For example, `gs://my-bucket/my-shard-config.conf`. * **tableOverrides**: These are the table name overrides from source to spanner. They are written in thefollowing format: [{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]This example shows mapping Singers table to Vocalists and Albums table to Records. For example, `[{Singers, Vocalists}, {Albums, Records}]`. Defaults to empty. * **columnOverrides**: These are the column name overrides from source to spanner. 
They are written in thefollowing format: [{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]Note that the SourceTableName should remain the same in both the source and spanner pair. To override table names, use tableOverrides.The example shows mapping SingerName to TalentName and AlbumName to RecordName in Singers and Albums table respectively. For example, `[{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]`. Defaults to empty. * **schemaOverridesFilePath**: A file which specifies the table and the column name overrides from source to spanner. Defaults to empty. @@ -207,7 +207,7 @@ export TRANSFORMATION_JAR_PATH="" export TRANSFORMATION_CLASS_NAME="" export TRANSFORMATION_CUSTOM_PARAMETERS="" export FILTERED_EVENTS_DIRECTORY="" -export SHARDING_CONTEXT_FILE_PATH= +export SOURCE_CONFIG_URL= export TABLE_OVERRIDES="" export COLUMN_OVERRIDES="" export SCHEMA_OVERRIDES_FILE_PATH="" @@ -247,7 +247,7 @@ gcloud dataflow flex-template run "cloud-datastream-to-spanner-job" \ --parameters "transformationClassName=$TRANSFORMATION_CLASS_NAME" \ --parameters "transformationCustomParameters=$TRANSFORMATION_CUSTOM_PARAMETERS" \ --parameters "filteredEventsDirectory=$FILTERED_EVENTS_DIRECTORY" \ - --parameters "shardingContextFilePath=$SHARDING_CONTEXT_FILE_PATH" \ + --parameters "sourceConfigURL=$SOURCE_CONFIG_URL" \ --parameters "tableOverrides=$TABLE_OVERRIDES" \ --parameters "columnOverrides=$COLUMN_OVERRIDES" \ --parameters "schemaOverridesFilePath=$SCHEMA_OVERRIDES_FILE_PATH" \ @@ -302,7 +302,7 @@ export TRANSFORMATION_JAR_PATH="" export TRANSFORMATION_CLASS_NAME="" export TRANSFORMATION_CUSTOM_PARAMETERS="" export FILTERED_EVENTS_DIRECTORY="" -export SHARDING_CONTEXT_FILE_PATH= +export SOURCE_CONFIG_URL= export TABLE_OVERRIDES="" export COLUMN_OVERRIDES="" export SCHEMA_OVERRIDES_FILE_PATH="" @@ -317,7 +317,7 @@ mvn clean package -PtemplatesRun 
\ -Dregion="$REGION" \ -DjobName="cloud-datastream-to-spanner-job" \ -DtemplateName="Cloud_Datastream_to_Spanner" \ --Dparameters="inputFilePattern=$INPUT_FILE_PATTERN,inputFileFormat=$INPUT_FILE_FORMAT,sessionFilePath=$SESSION_FILE_PATH,instanceId=$INSTANCE_ID,databaseId=$DATABASE_ID,projectId=$PROJECT_ID,spannerHost=$SPANNER_HOST,gcsPubSubSubscription=$GCS_PUB_SUB_SUBSCRIPTION,streamName=$STREAM_NAME,shadowTablePrefix=$SHADOW_TABLE_PREFIX,shouldCreateShadowTables=$SHOULD_CREATE_SHADOW_TABLES,rfcStartDateTime=$RFC_START_DATE_TIME,fileReadConcurrency=$FILE_READ_CONCURRENCY,deadLetterQueueDirectory=$DEAD_LETTER_QUEUE_DIRECTORY,dlqRetryMinutes=$DLQ_RETRY_MINUTES,dlqMaxRetryCount=$DLQ_MAX_RETRY_COUNT,dataStreamRootUrl=$DATA_STREAM_ROOT_URL,datastreamSourceType=$DATASTREAM_SOURCE_TYPE,roundJsonDecimals=$ROUND_JSON_DECIMALS,runMode=$RUN_MODE,transformationContextFilePath=$TRANSFORMATION_CONTEXT_FILE_PATH,directoryWatchDurationInMinutes=$DIRECTORY_WATCH_DURATION_IN_MINUTES,spannerPriority=$SPANNER_PRIORITY,dlqGcsPubSubSubscription=$DLQ_GCS_PUB_SUB_SUBSCRIPTION,transformationJarPath=$TRANSFORMATION_JAR_PATH,transformationClassName=$TRANSFORMATION_CLASS_NAME,transformationCustomParameters=$TRANSFORMATION_CUSTOM_PARAMETERS,filteredEventsDirectory=$FILTERED_EVENTS_DIRECTORY,shardingContextFilePath=$SHARDING_CONTEXT_FILE_PATH,tableOverrides=$TABLE_OVERRIDES,columnOverrides=$COLUMN_OVERRIDES,schemaOverridesFilePath=$SCHEMA_OVERRIDES_FILE_PATH,shadowTableSpannerDatabaseId=$SHADOW_TABLE_SPANNER_DATABASE_ID,shadowTableSpannerInstanceId=$SHADOW_TABLE_SPANNER_INSTANCE_ID,failureInjectionParameter=$FAILURE_INJECTION_PARAMETER" \ 
+-Dparameters="inputFilePattern=$INPUT_FILE_PATTERN,inputFileFormat=$INPUT_FILE_FORMAT,sessionFilePath=$SESSION_FILE_PATH,instanceId=$INSTANCE_ID,databaseId=$DATABASE_ID,projectId=$PROJECT_ID,spannerHost=$SPANNER_HOST,gcsPubSubSubscription=$GCS_PUB_SUB_SUBSCRIPTION,streamName=$STREAM_NAME,shadowTablePrefix=$SHADOW_TABLE_PREFIX,shouldCreateShadowTables=$SHOULD_CREATE_SHADOW_TABLES,rfcStartDateTime=$RFC_START_DATE_TIME,fileReadConcurrency=$FILE_READ_CONCURRENCY,deadLetterQueueDirectory=$DEAD_LETTER_QUEUE_DIRECTORY,dlqRetryMinutes=$DLQ_RETRY_MINUTES,dlqMaxRetryCount=$DLQ_MAX_RETRY_COUNT,dataStreamRootUrl=$DATA_STREAM_ROOT_URL,datastreamSourceType=$DATASTREAM_SOURCE_TYPE,roundJsonDecimals=$ROUND_JSON_DECIMALS,runMode=$RUN_MODE,transformationContextFilePath=$TRANSFORMATION_CONTEXT_FILE_PATH,directoryWatchDurationInMinutes=$DIRECTORY_WATCH_DURATION_IN_MINUTES,spannerPriority=$SPANNER_PRIORITY,dlqGcsPubSubSubscription=$DLQ_GCS_PUB_SUB_SUBSCRIPTION,transformationJarPath=$TRANSFORMATION_JAR_PATH,transformationClassName=$TRANSFORMATION_CLASS_NAME,transformationCustomParameters=$TRANSFORMATION_CUSTOM_PARAMETERS,filteredEventsDirectory=$FILTERED_EVENTS_DIRECTORY,sourceConfigURL=$SOURCE_CONFIG_URL,tableOverrides=$TABLE_OVERRIDES,columnOverrides=$COLUMN_OVERRIDES,schemaOverridesFilePath=$SCHEMA_OVERRIDES_FILE_PATH,shadowTableSpannerDatabaseId=$SHADOW_TABLE_SPANNER_DATABASE_ID,shadowTableSpannerInstanceId=$SHADOW_TABLE_SPANNER_INSTANCE_ID,failureInjectionParameter=$FAILURE_INJECTION_PARAMETER" \ -f v2/datastream-to-spanner ``` @@ -390,7 +390,7 @@ resource "google_dataflow_flex_template_job" "cloud_datastream_to_spanner" { # transformationClassName = "" # transformationCustomParameters = "" # filteredEventsDirectory = "" - # shardingContextFilePath = "" + # sourceConfigURL = "" # tableOverrides = "" # columnOverrides = "" # schemaOverridesFilePath = "" diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java 
b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java index e21906def7..eb63135e3a 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java @@ -36,12 +36,16 @@ import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema; import com.google.cloud.teleport.v2.spanner.migrations.schema.SchemaFileOverridesParser; import com.google.cloud.teleport.v2.spanner.migrations.schema.SchemaStringOverridesParser; +import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard; import com.google.cloud.teleport.v2.spanner.migrations.shard.ShardingContext; +import com.google.cloud.teleport.v2.spanner.migrations.source.config.JdbcShardConfig; +import com.google.cloud.teleport.v2.spanner.migrations.source.config.SourceConfigParser; +import com.google.cloud.teleport.v2.spanner.migrations.source.config.SourceConnectionConfig; import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation; import com.google.cloud.teleport.v2.spanner.migrations.transformation.TransformationContext; import com.google.cloud.teleport.v2.spanner.migrations.utils.DataflowWorkerMachineTypeUtils; +import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl; import com.google.cloud.teleport.v2.spanner.migrations.utils.SessionFileReader; -import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardingContextReader; import com.google.cloud.teleport.v2.spanner.migrations.utils.TransformationContextReader; import com.google.cloud.teleport.v2.templates.DataStreamToSpanner.Options; import com.google.cloud.teleport.v2.templates.constants.DatastreamToSpannerConstants; @@ -55,6 +59,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import 
org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; @@ -494,13 +499,13 @@ public interface Options @TemplateParameter.GcsReadFile( order = 29, optional = true, + description = "Source Config URL", helpText = - "Sharding context file path in cloud storage is used to populate the shard id in spanner database for each source shard." - + "It expects a JSON file with the format: {\\\"StreamToDbAndShardMap\\\": Map>}", - description = "Sharding context file path in cloud storage") - String getShardingContextFilePath(); + "Cloud Storage path to a shard config file for sharded migrations. It expects a HOCON or JSON file. For a sample file, please refer to v2/datastream-to-spanner/src/test/resources/DatastreamToSpannerSingleDFShardedMigrationIT/sharding-config.conf in the repository. For example, `gs://my-bucket/my-shard-config.conf`.", + example = "gs://my-bucket/my-shard-config.conf") + String getSourceConfigURL(); - void setShardingContextFilePath(String value); + void setSourceConfigURL(String value); @TemplateParameter.Text( order = 30, @@ -814,8 +819,31 @@ static Pipeline buildPipeline(Options options) { options.getTransformationContextFilePath()); // Ingest sharding context file into memory. 
- ShardingContext shardingContext = - ShardingContextReader.getShardingContext(options.getShardingContextFilePath()); + ShardingContext shardingContext = new ShardingContext(); + if (options.getSourceConfigURL() != null && !options.getSourceConfigURL().isEmpty()) { + try { + SourceConfigParser parser = new SourceConfigParser(new SecretManagerAccessorImpl()); + SourceConnectionConfig sourceConfig = + parser.parseConfiguration( + options.getDatastreamSourceType(), + options.getSourceConfigURL(), + /* resolveSecrets= */ false); + + if (sourceConfig instanceof JdbcShardConfig) { + JdbcShardConfig jdbcShardConfig = (JdbcShardConfig) sourceConfig; + List<Shard> shards = jdbcShardConfig.getShardConfigs(); + Map<String, Map<String, String>> streamToDbAndShardMap = new HashMap<>(); + for (Shard shard : shards) { + streamToDbAndShardMap + .computeIfAbsent(shard.getStreamId(), k -> new HashMap<>()) + .put(shard.getDbName(), shard.getLogicalShardId()); + } + shardingContext = new ShardingContext(streamToDbAndShardMap); + } + } catch (Exception e) { + throw new RuntimeException("Failed to parse source config URL", e); + } + } CustomTransformation customTransformation = CustomTransformation.builder( diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerITBase.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerITBase.java index 99cf8c66e8..63fe521625 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerITBase.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerITBase.java @@ -313,7 +313,7 @@ protected LaunchInfo launchDataflowJob( LOG.info( "Uploading sharding context file from resource: {}", shardingContextFileResourceName); gcsResourceManager.uploadArtifact( - gcsPathPrefix + "/shardingContext.json", + gcsPathPrefix + "/shardingConfig.conf", 
Resources.getResource(shardingContextFileResourceName).getPath()); } else { LOG.info("No sharding context file provided, skipping upload."); @@ -398,8 +398,8 @@ protected LaunchInfo launchDataflowJob( if (shardingContextFileResourceName != null) { params.put( - "shardingContextFilePath", - getGcsPath(gcsPathPrefix + "/shardingContext.json", gcsResourceManager)); + "sourceConfigURL", + getGcsPath(gcsPathPrefix + "/shardingConfig.conf", gcsResourceManager)); } if (customTransformation != null) { diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMySQLRetryAllDLQIT.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMySQLRetryAllDLQIT.java index aa1ed3a1bb..729ce64bc2 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMySQLRetryAllDLQIT.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMySQLRetryAllDLQIT.java @@ -194,9 +194,9 @@ public void setUp() throws IOException, InterruptedException { datastreamResourceManager.startStream(streamB); streamNameB = streamB.getName().substring(streamB.getName().lastIndexOf('/') + 1); - // Generate Shard Context JSON - String shardContextJson = - generateShardContextJson( + // Generate Shard Config + String shardConfig = + generateSourceConfig( streamNameA, jdbcResourceManagerShardA.getDatabaseName(), "shard1", @@ -205,7 +205,7 @@ public void setUp() throws IOException, InterruptedException { "shard2"); gcsResourceManager.createArtifact( - "input/shardingContext.json", shardContextJson.getBytes(StandardCharsets.UTF_8)); + "input/shardingConfig.conf", shardConfig.getBytes(StandardCharsets.UTF_8)); // Prepare job parameters Map<String, String> jobParameters = new HashMap<>(); @@ -223,8 +223,7 @@ public void setUp() throws IOException, InterruptedException { jobParameters.put( 
"deadLetterQueueDirectory", getGcsPath(GCS_PATH_PREFIX + "/dlq/", gcsResourceManager)); jobParameters.put( - "shardingContextFilePath", - getGcsPath("input/shardingContext.json", gcsResourceManager)); + "sourceConfigURL", getGcsPath("input/shardingConfig.conf", gcsResourceManager)); jobParameters.put( "inputFilePattern", getGcsPath(GCS_PATH_PREFIX + "/cdc/", gcsResourceManager)); @@ -378,8 +377,7 @@ public void testDataStreamToSpannerShardedRetryAllDLQ() throws Exception { "deadLetterQueueDirectory", getGcsPath(GCS_PATH_PREFIX + "/dlq/", gcsResourceManager)); retryParams.put("dlqMaxRetryCount", "20"); retryParams.put("dlqRetryMinutes", "60"); - retryParams.put( - "shardingContextFilePath", getGcsPath("input/shardingContext.json", gcsResourceManager)); + retryParams.put("sourceConfigURL", getGcsPath("input/shardingConfig.conf", gcsResourceManager)); PipelineLauncher.LaunchInfo retryJobInfo = launchDataflowJob( @@ -514,25 +512,33 @@ private void insertDataInMySQL() { // error) } - private String generateShardContextJson( + private String generateSourceConfig( String streamA, String dbA, String shardA, String streamB, String dbB, String shardB) { return "{\n" - + " \"StreamToDbAndShardMap\": {\n" - + " \"" - + streamA - + "\": {\"" - + dbA - + "\": \"" + + " \"shardConfigs\": [\n" + + " {\n" + + " \"logicalShardId\": \"" + shardA - + "\"},\n" - + " \"" - + streamB - + "\": {\"" - + dbB - + "\": \"" + + "\",\n" + + " \"dbName\": \"" + + dbA + + "\",\n" + + " \"streamId\": \"" + + streamA + + "\"\n" + + " },\n" + + " {\n" + + " \"logicalShardId\": \"" + shardB - + "\"}\n" - + " }\n" + + "\",\n" + + " \"dbName\": \"" + + dbB + + "\",\n" + + " \"streamId\": \"" + + streamB + + "\"\n" + + " }\n" + + " ]\n" + "}"; } } diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMySQLRetryDLQIT.java 
b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMySQLRetryDLQIT.java index 1541556912..cec11d607b 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMySQLRetryDLQIT.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMySQLRetryDLQIT.java @@ -197,9 +197,9 @@ public void setUp() throws IOException, InterruptedException { datastreamResourceManager.startStream(streamB); streamNameB = streamB.getName().substring(streamB.getName().lastIndexOf('/') + 1); - // Generate Shard Context JSON - String shardContextJson = - generateShardContextJson( + // Generate Shard Config + String shardConfig = + generateSourceConfig( streamNameA, jdbcResourceManagerShardA.getDatabaseName(), "shard1", @@ -208,7 +208,7 @@ public void setUp() throws IOException, InterruptedException { "shard2"); gcsResourceManager.createArtifact( - "input/shardingContext.json", shardContextJson.getBytes(StandardCharsets.UTF_8)); + "input/shardingConfig.conf", shardConfig.getBytes(StandardCharsets.UTF_8)); // Prepare job parameters Map<String, String> jobParameters = new HashMap<>(); @@ -219,8 +219,7 @@ public void setUp() throws IOException, InterruptedException { jobParameters.put( "dlqMaxRetryCount", "1000"); // High retry count to keep items in retry/ bucket jobParameters.put( - "shardingContextFilePath", - getGcsPath("input/shardingContext.json", gcsResourceManager)); + "sourceConfigURL", getGcsPath("input/shardingConfig.conf", gcsResourceManager)); jobParameters.put( "inputFilePattern", getGcsPath(GCS_PATH_PREFIX + "/cdc/", gcsResourceManager)); jobParameters.put( @@ -363,7 +362,7 @@ public void testDataStreamToSpannerShardedRetryDLQ() throws Exception { retryJobParameters.put( "deadLetterQueueDirectory", getGcsPath(GCS_PATH_PREFIX + "/dlq/", gcsResourceManager)); retryJobParameters.put( - "shardingContextFilePath", 
getGcsPath("input/shardingContext.json", gcsResourceManager)); + "sourceConfigURL", getGcsPath("input/shardingConfig.conf", gcsResourceManager)); PipelineLauncher.LaunchInfo retryJobInfo = launchDataflowJob( @@ -532,25 +531,33 @@ private void executeSqlScript(CloudMySQLResourceManager resourceManager, String } } - private String generateShardContextJson( + private String generateSourceConfig( String streamA, String dbA, String shardA, String streamB, String dbB, String shardB) { return "{\n" - + " \"StreamToDbAndShardMap\": {\n" - + " \"" - + streamA - + "\": {\"" - + dbA - + "\": \"" + + " \"shardConfigs\": [\n" + + " {\n" + + " \"logicalShardId\": \"" + shardA - + "\"},\n" - + " \"" - + streamB - + "\": {\"" - + dbB - + "\": \"" + + "\",\n" + + " \"dbName\": \"" + + dbA + + "\",\n" + + " \"streamId\": \"" + + streamA + + "\"\n" + + " },\n" + + " {\n" + + " \"logicalShardId\": \"" + shardB - + "\"}\n" - + " }\n" + + "\",\n" + + " \"dbName\": \"" + + dbB + + "\",\n" + + " \"streamId\": \"" + + streamB + + "\"\n" + + " }\n" + + " ]\n" + "}"; } } diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DatastreamToSpannerSingleDFShardedMigrationIT.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DatastreamToSpannerSingleDFShardedMigrationIT.java index e3382b9d81..ceef7a6708 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DatastreamToSpannerSingleDFShardedMigrationIT.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DatastreamToSpannerSingleDFShardedMigrationIT.java @@ -67,7 +67,7 @@ public class DatastreamToSpannerSingleDFShardedMigrationIT extends DataStreamToS "DatastreamToSpannerSingleDFShardedMigrationIT/mysql-session.json"; private static final String SHARDING_CONTEXT_RESOURCE = - "DatastreamToSpannerSingleDFShardedMigrationIT/sharding-context.json"; + 
"DatastreamToSpannerSingleDFShardedMigrationIT/sharding-config.conf"; private static final String SPANNER_DDL_RESOURCE = "DatastreamToSpannerSingleDFShardedMigrationIT/spanner-schema.sql"; diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SeparateShadowTableDatabaseSingleDFShardedMigrationIT.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SeparateShadowTableDatabaseSingleDFShardedMigrationIT.java index 3cf6f6f435..efd69c73de 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SeparateShadowTableDatabaseSingleDFShardedMigrationIT.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SeparateShadowTableDatabaseSingleDFShardedMigrationIT.java @@ -68,7 +68,7 @@ public class SeparateShadowTableDatabaseSingleDFShardedMigrationIT "DatastreamToSpannerSingleDFShardedMigrationIT/mysql-session.json"; private static final String SHARDING_CONTEXT_RESOURCE = - "DatastreamToSpannerSingleDFShardedMigrationIT/sharding-context.json"; + "DatastreamToSpannerSingleDFShardedMigrationIT/sharding-config.conf"; private static final String SPANNER_DDL_RESOURCE = "DatastreamToSpannerSingleDFShardedMigrationIT/spanner-schema.sql"; diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/endtoend/EndToEndTestingITBase.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/endtoend/EndToEndTestingITBase.java index af7bc8aeaa..9395d4cc94 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/endtoend/EndToEndTestingITBase.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/endtoend/EndToEndTestingITBase.java @@ -289,21 +289,23 @@ protected void createAndUploadBulkShardConfigToGcs( protected void createAndUploadShardContextFileToGcs( Map> streamDbMapping, GcsResourceManager gcsResourceManager) { - JSONObject 
shardConfig = new JSONObject(); - JSONObject streams = new JSONObject(); + JSONObject config = new JSONObject(); + JSONArray shardConfigs = new JSONArray(); for (String stream : streamDbMapping.keySet()) { - JSONObject dbs = new JSONObject(); for (String db : streamDbMapping.get(stream).keySet()) { - dbs.put(db, streamDbMapping.get(stream).get(db)); + JSONObject shardConfig = new JSONObject(); + shardConfig.put("streamId", stream); + shardConfig.put("dbName", db); + shardConfig.put("logicalShardId", streamDbMapping.get(stream).get(db)); + shardConfigs.put(shardConfig); } - streams.put(stream, dbs); } - shardConfig.put("StreamToDbAndShardMap", streams); - String shardFileContents = shardConfig.toString(); - LOG.info("Shard context file contents: {}", shardFileContents); - gcsResourceManager.createArtifact("input/sharding-context.json", shardFileContents); + config.put("shardConfigs", shardConfigs); + String shardFileContents = config.toString(); + LOG.info("Shard config file contents: {}", shardFileContents); + gcsResourceManager.createArtifact("input/sharding-config.conf", shardFileContents); } protected PipelineLauncher.LaunchInfo launchBulkDataflowJob( @@ -475,8 +477,7 @@ public PipelineLauncher.LaunchInfo launchFwdDataflowJob( .addParameter("inputFileFormat", "avro") .addParameter("sessionFilePath", getGcsPath("input/session.json", gcsResourceManager)) .addParameter( - "shardingContextFilePath", - getGcsPath("input/sharding-context.json", gcsResourceManager)) + "sourceConfigURL", getGcsPath("input/sharding-config.conf", gcsResourceManager)) .addEnvironmentVariable( "additionalExperiments", Collections.singletonList("use_runner_v2")) .build(); diff --git a/v2/datastream-to-spanner/src/test/resources/DatastreamToSpannerSingleDFShardedMigrationIT/sharding-config.conf b/v2/datastream-to-spanner/src/test/resources/DatastreamToSpannerSingleDFShardedMigrationIT/sharding-config.conf new file mode 100644 index 0000000000..93c53ca4e6 --- /dev/null +++ 
b/v2/datastream-to-spanner/src/test/resources/DatastreamToSpannerSingleDFShardedMigrationIT/sharding-config.conf @@ -0,0 +1,24 @@ +{ + "shardConfigs": [ + { + "streamId": "jsonavrodatatypeit", + "dbName": "S1L1", + "logicalShardId": "L1" + }, + { + "streamId": "jsonavrodatatypeit", + "dbName": "S1L2", + "logicalShardId": "L2" + }, + { + "streamId": "jsonavrodatatypeitupdate", + "dbName": "S2L1", + "logicalShardId": "L3" + }, + { + "streamId": "jsonavrodatatypeitupdate", + "dbName": "S2L2", + "logicalShardId": "L4" + } + ] +} \ No newline at end of file diff --git a/v2/datastream-to-spanner/src/test/resources/DatastreamToSpannerSingleDFShardedMigrationIT/sharding-context.json b/v2/datastream-to-spanner/src/test/resources/DatastreamToSpannerSingleDFShardedMigrationIT/sharding-context.json deleted file mode 100644 index 00b7ea7337..0000000000 --- a/v2/datastream-to-spanner/src/test/resources/DatastreamToSpannerSingleDFShardedMigrationIT/sharding-context.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "StreamToDbAndShardMap": { - "jsonavrodatatypeit": {"S1L1":"L1", "S1L2": "L2"}, - "jsonavrodatatypeitupdate": {"S2L1":"L3", "S2L2": "L4"} - } -} \ No newline at end of file diff --git a/v2/datastream-to-spanner/terraform/samples/mysql-sharded-single-df-job/main.tf b/v2/datastream-to-spanner/terraform/samples/mysql-sharded-single-df-job/main.tf index a7425ece60..34755da9a5 100644 --- a/v2/datastream-to-spanner/terraform/samples/mysql-sharded-single-df-job/main.tf +++ b/v2/datastream-to-spanner/terraform/samples/mysql-sharded-single-df-job/main.tf @@ -86,28 +86,26 @@ locals { # if the sharding context file is specified, use that, otherwise # auto-generate sharding context on basis of stream names, MySQL db names and logical # shard names. 
-resource "google_storage_bucket_object" "sharding_context_file_object" { +resource "google_storage_bucket_object" "source_config_file_object" { depends_on = [google_project_service.enabled_apis] - name = "shardingContext.json" + name = "sourceConfig.conf" content_type = "application/json" bucket = google_storage_bucket.datastream_bucket.id content = ( - var.common_params.dataflow_params.template_params.local_sharding_context_path != null - ? jsonencode({ - "StreamToDbAndShardMap" = { - for host_ip, db_map in jsondecode(file(var.common_params.dataflow_params.template_params.local_sharding_context_path)).StreamToDbAndShardMap : - contains(keys(local.host_to_stream_map), host_ip) ? local.host_to_stream_map[host_ip] : host_ip => db_map - } - }) + var.common_params.dataflow_params.template_params.local_source_config_path != null + ? file(var.common_params.dataflow_params.template_params.local_source_config_path) : jsonencode({ - "StreamToDbAndShardMap" : { - for idx, shard in var.shard_list : "${shard.shard_id != null ? shard.shard_id : random_pet.migration_id[idx].id}-${shard.datastream_params.stream_id}" => { - for db in var.common_params.datastream_params.mysql_databases : - db.database => "${replace(shard.datastream_params.mysql_host, ".", "-")}-${db.database}" - } - } + "shardConfigs" : flatten([ + for idx, shard in var.shard_list : [ + for db in var.common_params.datastream_params.mysql_databases : { + streamId = "${shard.shard_id != null ? 
shard.shard_id : random_pet.migration_id[idx].id}-${shard.datastream_params.stream_id}" + dbName = db.database + logicalShardId = "${replace(shard.datastream_params.mysql_host, ".", "-")}-${db.database}" + } + ] + ]) }) ) } @@ -287,7 +285,7 @@ resource "google_dataflow_flex_template_job" "live_migration_job" { datastreamSourceType = var.common_params.dataflow_params.template_params.datastream_source_type roundJsonDecimals = tostring(var.common_params.dataflow_params.template_params.round_json_decimals) runMode = var.common_params.dataflow_params.template_params.run_mode - shardingContextFilePath = "gs://${google_storage_bucket_object.sharding_context_file_object.bucket}/${google_storage_bucket_object.sharding_context_file_object.name}" + sourceConfigURL = "gs://${google_storage_bucket_object.source_config_file_object.bucket}/${google_storage_bucket_object.source_config_file_object.name}" directoryWatchDurationInMinutes = tostring(var.common_params.dataflow_params.template_params.directory_watch_duration_in_minutes) spannerPriority = var.common_params.dataflow_params.template_params.spanner_priority dlqGcsPubSubSubscription = var.common_params.dataflow_params.template_params.dlq_gcs_pub_sub_subscription diff --git a/v2/datastream-to-spanner/terraform/samples/mysql-sharded-single-df-job/variables.tf b/v2/datastream-to-spanner/terraform/samples/mysql-sharded-single-df-job/variables.tf index 1176937698..ab31d92655 100644 --- a/v2/datastream-to-spanner/terraform/samples/mysql-sharded-single-df-job/variables.tf +++ b/v2/datastream-to-spanner/terraform/samples/mysql-sharded-single-df-job/variables.tf @@ -54,7 +54,7 @@ variable "common_params" { transformation_class_name = optional(string) filtered_events_directory = optional(string) run_mode = optional(string) - local_sharding_context_path = optional(string) + local_source_config_path = optional(string) dead_letter_queue_directory = optional(string) dlq_gcs_pub_sub_subscription = optional(string) }) diff --git 
a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/shard/Shard.java b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/shard/Shard.java index 6629f1e426..52fa82412c 100644 --- a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/shard/Shard.java +++ b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/shard/Shard.java @@ -31,9 +31,33 @@ public class Shard implements Serializable { private String namespace = ""; private String secretManagerUri = ""; private String connectionProperties = ""; + private String streamId = ""; private Map<String, String> dbNameToLogicalShardIdMap = new HashMap<>(); + public Shard( + String logicalShardId, + String host, + String port, + String user, + String password, + String dbName, + String namespace, + String secretManagerUri, + String connectionProperties, + String streamId) { + this.logicalShardId = logicalShardId; + this.host = host; + this.port = port; + this.user = user; + this.password = password; + this.dbName = dbName; + this.namespace = namespace; + this.secretManagerUri = secretManagerUri; + this.connectionProperties = connectionProperties; + this.streamId = streamId; + } + public Shard( String logicalShardId, String host, @@ -53,10 +77,19 @@ public Shard( this.namespace = namespace; this.secretManagerUri = secretManagerUri; this.connectionProperties = connectionProperties; + this.streamId = ""; } public Shard() {} + public String getStreamId() { + return streamId; + } + + public void setStreamId(String streamId) { + this.streamId = streamId; + } + public String getLogicalShardId() { return logicalShardId; } @@ -157,6 +190,9 @@ public String toString() { + ", connectionProperties='" + connectionProperties + '\'' + + ", streamId='" + + streamId + + '\'' + ", dbNameToLogicalShardIdMap=" + dbNameToLogicalShardIdMap + '}'; @@ -180,6 +216,7 @@ public boolean equals(Object o) { && Objects.equals(namespace, shard.namespace) && 
Objects.equals(connectionProperties, shard.connectionProperties) && Objects.equals(secretManagerUri, shard.secretManagerUri) + && Objects.equals(streamId, shard.streamId) && Objects.equals(dbNameToLogicalShardIdMap, shard.dbNameToLogicalShardIdMap); } @@ -195,6 +232,7 @@ public int hashCode() { namespace, connectionProperties, secretManagerUri, + streamId, dbNameToLogicalShardIdMap); } } diff --git a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/source/config/SourceConfigParser.java b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/source/config/SourceConfigParser.java index 0039b21091..d9ae0d7841 100644 --- a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/source/config/SourceConfigParser.java +++ b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/source/config/SourceConfigParser.java @@ -67,6 +67,20 @@ public SourceConfigParser(ISecretManagerAccessor secretManagerAccessor) { */ public SourceConnectionConfig parseConfiguration( String sourceTypeStr, String sourceConfigFilePath) throws Exception { + return parseConfiguration(sourceTypeStr, sourceConfigFilePath, /* resolveSecrets= */ true); + } + + /** + * Parses the configuration file from GCS into the appropriate {@link SourceConnectionConfig} + * implementing class. + * + * @param sourceTypeStr The source database type ("mysql", "postgresql", "cassandra", "astra_db"). + * @param sourceConfigFilePath The URI to the HOCON or JSON config file. + * @param resolveSecrets Whether to resolve secrets from Secret Manager. + * @return A populated implementation of {@link SourceConnectionConfig}. 
+ */ + public SourceConnectionConfig parseConfiguration( + String sourceTypeStr, String sourceConfigFilePath, boolean resolveSecrets) throws Exception { SourceType sourceType = SourceType.parseSourceType(sourceTypeStr); switch (SourceType.parseSourceType(sourceTypeStr)) { @@ -85,7 +99,9 @@ public SourceConnectionConfig parseConfiguration( JdbcShardConfig jdbcShardConfig = mapper.convertValue(jdbcConfigMap, JdbcShardConfig.class); // Returns ordered list of shards jdbcShardConfig.getShardConfigs().sort(Comparator.comparing(Shard::getLogicalShardId)); - resolveShardSecret(jdbcShardConfig, sourceConfigFilePath); + if (resolveSecrets) { + resolveShardSecret(jdbcShardConfig, sourceConfigFilePath); + } return jdbcShardConfig; default: throw new IllegalArgumentException("Unsupported source type: " + sourceType);