Skip to content

Commit 5a96db2

Browse files
feat(Spanner): implement Standardize shard config in live migration.
1 parent 3d09e7b commit 5a96db2

13 files changed

Lines changed: 223 additions & 88 deletions

File tree

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,16 @@
3636
import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema;
3737
import com.google.cloud.teleport.v2.spanner.migrations.schema.SchemaFileOverridesParser;
3838
import com.google.cloud.teleport.v2.spanner.migrations.schema.SchemaStringOverridesParser;
39+
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
3940
import com.google.cloud.teleport.v2.spanner.migrations.shard.ShardingContext;
41+
import com.google.cloud.teleport.v2.spanner.migrations.source.config.JdbcShardConfig;
42+
import com.google.cloud.teleport.v2.spanner.migrations.source.config.SourceConfigParser;
43+
import com.google.cloud.teleport.v2.spanner.migrations.source.config.SourceConnectionConfig;
4044
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
4145
import com.google.cloud.teleport.v2.spanner.migrations.transformation.TransformationContext;
4246
import com.google.cloud.teleport.v2.spanner.migrations.utils.DataflowWorkerMachineTypeUtils;
47+
import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl;
4348
import com.google.cloud.teleport.v2.spanner.migrations.utils.SessionFileReader;
44-
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardingContextReader;
4549
import com.google.cloud.teleport.v2.spanner.migrations.utils.TransformationContextReader;
4650
import com.google.cloud.teleport.v2.templates.DataStreamToSpanner.Options;
4751
import com.google.cloud.teleport.v2.templates.constants.DatastreamToSpannerConstants;
@@ -55,6 +59,7 @@
5559
import java.util.ArrayList;
5660
import java.util.Arrays;
5761
import java.util.HashMap;
62+
import java.util.List;
5863
import java.util.Map;
5964
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
6065
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
@@ -505,6 +510,20 @@ public interface Options
505510
@TemplateParameter.Text(
506511
order = 30,
507512
optional = true,
513+
regexes = {"^.+$"},
514+
description = "Source Config URL",
515+
helpText =
516+
"The URL to connect to the source database host. This can be either: "
517+
+ "1. A JDBC connection URL for a single source database, which must contain the host, port and source db name and can optionally contain properties like autoReconnect, maxReconnects etc. Format: `jdbc:{mysql|postgresql}://{host}:{port}/{dbName}?{parameters}`. For example,`jdbc:mysql://127.4.5.30:3306/my-db?autoReconnect=true&maxReconnects=10&unicode=true&characterEncoding=UTF-8`. "
518+
+ "2. A Cloud Storage path to a shard config file for sharded migrations. For example, `gs://my-bucket/my-shard-config.conf`.",
519+
example = "gs://my-bucket/my-shard-config.conf")
520+
String getSourceConfigURL();
521+
522+
void setSourceConfigURL(String value);
523+
524+
@TemplateParameter.Text(
525+
order = 31,
526+
optional = true,
508527
description = "Table name overrides from source to spanner",
509528
regexes =
510529
"^\\[([[:space:]]*\\{[[:space:]]*[[:graph:]]+[[:space:]]*,[[:space:]]*[[:graph:]]+[[:space:]]*\\}[[:space:]]*(,[[:space:]]*)*)*\\]$",
@@ -813,9 +832,41 @@ static Pipeline buildPipeline(Options options) {
813832
TransformationContextReader.getTransformationContext(
814833
options.getTransformationContextFilePath());
815834

835+
// Deprecation check for shardingContextFilePath
836+
if (options.getShardingContextFilePath() != null
837+
&& !options.getShardingContextFilePath().isEmpty()) {
838+
throw new IllegalArgumentException(
839+
"The shardingContextFilePath pipeline option is deprecated and no longer supported. "
840+
+ "Please migrate to the unified HOCON configuration and use the sourceConfigURL option instead. "
841+
+ "Refer to the migration script at src/main/scripts/create_simple_shard_config.bash for assistance.");
842+
}
843+
816844
// Ingest sharding context file into memory.
817-
ShardingContext shardingContext =
818-
ShardingContextReader.getShardingContext(options.getShardingContextFilePath());
845+
ShardingContext shardingContext = new ShardingContext();
846+
if (options.getSourceConfigURL() != null && !options.getSourceConfigURL().isEmpty()) {
847+
try {
848+
SourceConfigParser parser = new SourceConfigParser(new SecretManagerAccessorImpl());
849+
SourceConnectionConfig sourceConfig =
850+
parser.parseConfiguration(
851+
options.getDatastreamSourceType(),
852+
options.getSourceConfigURL(),
853+
/* resolveSecrets= */ false);
854+
855+
if (sourceConfig instanceof JdbcShardConfig) {
856+
JdbcShardConfig jdbcShardConfig = (JdbcShardConfig) sourceConfig;
857+
List<Shard> shards = jdbcShardConfig.getShardConfigs();
858+
Map<String, Map<String, String>> streamToDbAndShardMap = new HashMap<>();
859+
for (Shard shard : shards) {
860+
streamToDbAndShardMap
861+
.computeIfAbsent(shard.getStreamId(), k -> new HashMap<>())
862+
.put(shard.getDbName(), shard.getLogicalShardId());
863+
}
864+
shardingContext = new ShardingContext(streamToDbAndShardMap);
865+
}
866+
} catch (Exception e) {
867+
throw new RuntimeException("Failed to parse source config URL", e);
868+
}
869+
}
819870

820871
CustomTransformation customTransformation =
821872
CustomTransformation.builder(

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerITBase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ protected LaunchInfo launchDataflowJob(
313313
LOG.info(
314314
"Uploading sharding context file from resource: {}", shardingContextFileResourceName);
315315
gcsResourceManager.uploadArtifact(
316-
gcsPathPrefix + "/shardingContext.json",
316+
gcsPathPrefix + "/shardingConfig.conf",
317317
Resources.getResource(shardingContextFileResourceName).getPath());
318318
} else {
319319
LOG.info("No sharding context file provided, skipping upload.");
@@ -398,8 +398,8 @@ protected LaunchInfo launchDataflowJob(
398398

399399
if (shardingContextFileResourceName != null) {
400400
params.put(
401-
"shardingContextFilePath",
402-
getGcsPath(gcsPathPrefix + "/shardingContext.json", gcsResourceManager));
401+
"sourceConfigURL",
402+
getGcsPath(gcsPathPrefix + "/shardingConfig.conf", gcsResourceManager));
403403
}
404404

405405
if (customTransformation != null) {

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMySQLRetryAllDLQIT.java

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -194,9 +194,9 @@ public void setUp() throws IOException, InterruptedException {
194194
datastreamResourceManager.startStream(streamB);
195195
streamNameB = streamB.getName().substring(streamB.getName().lastIndexOf('/') + 1);
196196

197-
// Generate Shard Context JSON
198-
String shardContextJson =
199-
generateShardContextJson(
197+
// Generate Shard Config
198+
String shardConfig =
199+
generateSourceConfig(
200200
streamNameA,
201201
jdbcResourceManagerShardA.getDatabaseName(),
202202
"shard1",
@@ -205,7 +205,7 @@ public void setUp() throws IOException, InterruptedException {
205205
"shard2");
206206

207207
gcsResourceManager.createArtifact(
208-
"input/shardingContext.json", shardContextJson.getBytes(StandardCharsets.UTF_8));
208+
"input/shardingConfig.conf", shardConfig.getBytes(StandardCharsets.UTF_8));
209209

210210
// Prepare job parameters
211211
Map<String, String> jobParameters = new HashMap<>();
@@ -223,8 +223,7 @@ public void setUp() throws IOException, InterruptedException {
223223
jobParameters.put(
224224
"deadLetterQueueDirectory", getGcsPath(GCS_PATH_PREFIX + "/dlq/", gcsResourceManager));
225225
jobParameters.put(
226-
"shardingContextFilePath",
227-
getGcsPath("input/shardingContext.json", gcsResourceManager));
226+
"sourceConfigURL", getGcsPath("input/shardingConfig.conf", gcsResourceManager));
228227
jobParameters.put(
229228
"inputFilePattern", getGcsPath(GCS_PATH_PREFIX + "/cdc/", gcsResourceManager));
230229

@@ -378,8 +377,7 @@ public void testDataStreamToSpannerShardedRetryAllDLQ() throws Exception {
378377
"deadLetterQueueDirectory", getGcsPath(GCS_PATH_PREFIX + "/dlq/", gcsResourceManager));
379378
retryParams.put("dlqMaxRetryCount", "20");
380379
retryParams.put("dlqRetryMinutes", "60");
381-
retryParams.put(
382-
"shardingContextFilePath", getGcsPath("input/shardingContext.json", gcsResourceManager));
380+
retryParams.put("sourceConfigURL", getGcsPath("input/shardingConfig.conf", gcsResourceManager));
383381

384382
PipelineLauncher.LaunchInfo retryJobInfo =
385383
launchDataflowJob(
@@ -514,25 +512,33 @@ private void insertDataInMySQL() {
514512
// error)
515513
}
516514

517-
private String generateShardContextJson(
515+
private String generateSourceConfig(
518516
String streamA, String dbA, String shardA, String streamB, String dbB, String shardB) {
519517
return "{\n"
520-
+ " \"StreamToDbAndShardMap\": {\n"
521-
+ " \""
522-
+ streamA
523-
+ "\": {\""
524-
+ dbA
525-
+ "\": \""
518+
+ " \"shardConfigs\": [\n"
519+
+ " {\n"
520+
+ " \"logicalShardId\": \""
526521
+ shardA
527-
+ "\"},\n"
528-
+ " \""
529-
+ streamB
530-
+ "\": {\""
531-
+ dbB
532-
+ "\": \""
522+
+ "\",\n"
523+
+ " \"dbName\": \""
524+
+ dbA
525+
+ "\",\n"
526+
+ " \"streamId\": \""
527+
+ streamA
528+
+ "\"\n"
529+
+ " },\n"
530+
+ " {\n"
531+
+ " \"logicalShardId\": \""
533532
+ shardB
534-
+ "\"}\n"
535-
+ " }\n"
533+
+ "\",\n"
534+
+ " \"dbName\": \""
535+
+ dbB
536+
+ "\",\n"
537+
+ " \"streamId\": \""
538+
+ streamB
539+
+ "\"\n"
540+
+ " }\n"
541+
+ " ]\n"
536542
+ "}";
537543
}
538544
}

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMySQLRetryDLQIT.java

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -197,9 +197,9 @@ public void setUp() throws IOException, InterruptedException {
197197
datastreamResourceManager.startStream(streamB);
198198
streamNameB = streamB.getName().substring(streamB.getName().lastIndexOf('/') + 1);
199199

200-
// Generate Shard Context JSON
201-
String shardContextJson =
202-
generateShardContextJson(
200+
// Generate Shard Config
201+
String shardConfig =
202+
generateSourceConfig(
203203
streamNameA,
204204
jdbcResourceManagerShardA.getDatabaseName(),
205205
"shard1",
@@ -208,7 +208,7 @@ public void setUp() throws IOException, InterruptedException {
208208
"shard2");
209209

210210
gcsResourceManager.createArtifact(
211-
"input/shardingContext.json", shardContextJson.getBytes(StandardCharsets.UTF_8));
211+
"input/shardingConfig.conf", shardConfig.getBytes(StandardCharsets.UTF_8));
212212

213213
// Prepare job parameters
214214
Map<String, String> jobParameters = new HashMap<>();
@@ -219,8 +219,7 @@ public void setUp() throws IOException, InterruptedException {
219219
jobParameters.put(
220220
"dlqMaxRetryCount", "1000"); // High retry count to keep items in retry/ bucket
221221
jobParameters.put(
222-
"shardingContextFilePath",
223-
getGcsPath("input/shardingContext.json", gcsResourceManager));
222+
"sourceConfigURL", getGcsPath("input/shardingConfig.conf", gcsResourceManager));
224223
jobParameters.put(
225224
"inputFilePattern", getGcsPath(GCS_PATH_PREFIX + "/cdc/", gcsResourceManager));
226225
jobParameters.put(
@@ -363,7 +362,7 @@ public void testDataStreamToSpannerShardedRetryDLQ() throws Exception {
363362
retryJobParameters.put(
364363
"deadLetterQueueDirectory", getGcsPath(GCS_PATH_PREFIX + "/dlq/", gcsResourceManager));
365364
retryJobParameters.put(
366-
"shardingContextFilePath", getGcsPath("input/shardingContext.json", gcsResourceManager));
365+
"sourceConfigURL", getGcsPath("input/shardingConfig.conf", gcsResourceManager));
367366

368367
PipelineLauncher.LaunchInfo retryJobInfo =
369368
launchDataflowJob(
@@ -532,25 +531,33 @@ private void executeSqlScript(CloudMySQLResourceManager resourceManager, String
532531
}
533532
}
534533

535-
private String generateShardContextJson(
534+
private String generateSourceConfig(
536535
String streamA, String dbA, String shardA, String streamB, String dbB, String shardB) {
537536
return "{\n"
538-
+ " \"StreamToDbAndShardMap\": {\n"
539-
+ " \""
540-
+ streamA
541-
+ "\": {\""
542-
+ dbA
543-
+ "\": \""
537+
+ " \"shardConfigs\": [\n"
538+
+ " {\n"
539+
+ " \"logicalShardId\": \""
544540
+ shardA
545-
+ "\"},\n"
546-
+ " \""
547-
+ streamB
548-
+ "\": {\""
549-
+ dbB
550-
+ "\": \""
541+
+ "\",\n"
542+
+ " \"dbName\": \""
543+
+ dbA
544+
+ "\",\n"
545+
+ " \"streamId\": \""
546+
+ streamA
547+
+ "\"\n"
548+
+ " },\n"
549+
+ " {\n"
550+
+ " \"logicalShardId\": \""
551551
+ shardB
552-
+ "\"}\n"
553-
+ " }\n"
552+
+ "\",\n"
553+
+ " \"dbName\": \""
554+
+ dbB
555+
+ "\",\n"
556+
+ " \"streamId\": \""
557+
+ streamB
558+
+ "\"\n"
559+
+ " }\n"
560+
+ " ]\n"
554561
+ "}";
555562
}
556563
}

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DatastreamToSpannerSingleDFShardedMigrationIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public class DatastreamToSpannerSingleDFShardedMigrationIT extends DataStreamToS
6767
"DatastreamToSpannerSingleDFShardedMigrationIT/mysql-session.json";
6868

6969
private static final String SHARDING_CONTEXT_RESOURCE =
70-
"DatastreamToSpannerSingleDFShardedMigrationIT/sharding-context.json";
70+
"DatastreamToSpannerSingleDFShardedMigrationIT/sharding-config.conf";
7171

7272
private static final String SPANNER_DDL_RESOURCE =
7373
"DatastreamToSpannerSingleDFShardedMigrationIT/spanner-schema.sql";

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SeparateShadowTableDatabaseSingleDFShardedMigrationIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public class SeparateShadowTableDatabaseSingleDFShardedMigrationIT
6868
"DatastreamToSpannerSingleDFShardedMigrationIT/mysql-session.json";
6969

7070
private static final String SHARDING_CONTEXT_RESOURCE =
71-
"DatastreamToSpannerSingleDFShardedMigrationIT/sharding-context.json";
71+
"DatastreamToSpannerSingleDFShardedMigrationIT/sharding-config.conf";
7272

7373
private static final String SPANNER_DDL_RESOURCE =
7474
"DatastreamToSpannerSingleDFShardedMigrationIT/spanner-schema.sql";

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/endtoend/EndToEndTestingITBase.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -289,21 +289,23 @@ protected void createAndUploadBulkShardConfigToGcs(
289289

290290
protected void createAndUploadShardContextFileToGcs(
291291
Map<String, Map<String, String>> streamDbMapping, GcsResourceManager gcsResourceManager) {
292-
JSONObject shardConfig = new JSONObject();
293-
JSONObject streams = new JSONObject();
292+
JSONObject config = new JSONObject();
293+
JSONArray shardConfigs = new JSONArray();
294294

295295
for (String stream : streamDbMapping.keySet()) {
296-
JSONObject dbs = new JSONObject();
297296
for (String db : streamDbMapping.get(stream).keySet()) {
298-
dbs.put(db, streamDbMapping.get(stream).get(db));
297+
JSONObject shardConfig = new JSONObject();
298+
shardConfig.put("streamId", stream);
299+
shardConfig.put("dbName", db);
300+
shardConfig.put("logicalShardId", streamDbMapping.get(stream).get(db));
301+
shardConfigs.put(shardConfig);
299302
}
300-
streams.put(stream, dbs);
301303
}
302304

303-
shardConfig.put("StreamToDbAndShardMap", streams);
304-
String shardFileContents = shardConfig.toString();
305-
LOG.info("Shard context file contents: {}", shardFileContents);
306-
gcsResourceManager.createArtifact("input/sharding-context.json", shardFileContents);
305+
config.put("shardConfigs", shardConfigs);
306+
String shardFileContents = config.toString();
307+
LOG.info("Shard config file contents: {}", shardFileContents);
308+
gcsResourceManager.createArtifact("input/sharding-config.conf", shardFileContents);
307309
}
308310

309311
protected PipelineLauncher.LaunchInfo launchBulkDataflowJob(
@@ -475,8 +477,7 @@ public PipelineLauncher.LaunchInfo launchFwdDataflowJob(
475477
.addParameter("inputFileFormat", "avro")
476478
.addParameter("sessionFilePath", getGcsPath("input/session.json", gcsResourceManager))
477479
.addParameter(
478-
"shardingContextFilePath",
479-
getGcsPath("input/sharding-context.json", gcsResourceManager))
480+
"sourceConfigURL", getGcsPath("input/sharding-config.conf", gcsResourceManager))
480481
.addEnvironmentVariable(
481482
"additionalExperiments", Collections.singletonList("use_runner_v2"))
482483
.build();
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
{
2+
"shardConfigs": [
3+
{
4+
"streamId": "jsonavrodatatypeit",
5+
"dbName": "S1L1",
6+
"logicalShardId": "L1"
7+
},
8+
{
9+
"streamId": "jsonavrodatatypeit",
10+
"dbName": "S1L2",
11+
"logicalShardId": "L2"
12+
},
13+
{
14+
"streamId": "jsonavrodatatypeitupdate",
15+
"dbName": "S2L1",
16+
"logicalShardId": "L3"
17+
},
18+
{
19+
"streamId": "jsonavrodatatypeitupdate",
20+
"dbName": "S2L2",
21+
"logicalShardId": "L4"
22+
}
23+
]
24+
}

v2/datastream-to-spanner/src/test/resources/DatastreamToSpannerSingleDFShardedMigrationIT/sharding-context.json

Lines changed: 0 additions & 6 deletions
This file was deleted.

0 commit comments

Comments
 (0)