Skip to content

Commit 6ca19c2

Browse files
feat(Spanner): implement Standardize shard config in live migration.
1 parent 3d09e7b commit 6ca19c2

14 files changed

Lines changed: 213 additions & 100 deletions

File tree

v2/datastream-to-spanner/README_Cloud_Datastream_to_Spanner.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
7878
* **transformationClassName**: Fully qualified class name having the custom transformation logic. It is a mandatory field in case transformationJarPath is specified. Defaults to empty.
7979
* **transformationCustomParameters**: String containing any custom parameters to be passed to the custom transformation class. Defaults to empty.
8080
* **filteredEventsDirectory**: This is the file path to store the events filtered via custom transformation. Default is a directory under the Dataflow job's temp location. The default value is enough under most conditions.
81-
* **shardingContextFilePath**: Sharding context file path in cloud storage is used to populate the shard id in spanner database for each source shard.It expects a JSON file with the format: {\"StreamToDbAndShardMap\": Map<stream_name, Map<db_name, shard_id>>}.
81+
* **sourceConfigURL**: Cloud Storage path to a shard config file for sharded migrations. It expects a HOCON or JSON file. For a sample file, please refer to v2/datastream-to-spanner/src/test/resources/DatastreamToSpannerSingleDFShardedMigrationIT/sharding-config.conf in the repository. For example, `gs://my-bucket/my-shard-config.conf`.
8282
* **tableOverrides**: These are the table name overrides from source to spanner. They are written in thefollowing format: [{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]This example shows mapping Singers table to Vocalists and Albums table to Records. For example, `[{Singers, Vocalists}, {Albums, Records}]`. Defaults to empty.
8383
* **columnOverrides**: These are the column name overrides from source to spanner. They are written in thefollowing format: [{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]Note that the SourceTableName should remain the same in both the source and spanner pair. To override table names, use tableOverrides.The example shows mapping SingerName to TalentName and AlbumName to RecordName in Singers and Albums table respectively. For example, `[{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]`. Defaults to empty.
8484
* **schemaOverridesFilePath**: A file which specifies the table and the column name overrides from source to spanner. Defaults to empty.
@@ -207,7 +207,7 @@ export TRANSFORMATION_JAR_PATH=""
207207
export TRANSFORMATION_CLASS_NAME=""
208208
export TRANSFORMATION_CUSTOM_PARAMETERS=""
209209
export FILTERED_EVENTS_DIRECTORY=""
210-
export SHARDING_CONTEXT_FILE_PATH=<shardingContextFilePath>
210+
export SOURCE_CONFIG_URL=<sourceConfigURL>
211211
export TABLE_OVERRIDES=""
212212
export COLUMN_OVERRIDES=""
213213
export SCHEMA_OVERRIDES_FILE_PATH=""
@@ -247,7 +247,7 @@ gcloud dataflow flex-template run "cloud-datastream-to-spanner-job" \
247247
--parameters "transformationClassName=$TRANSFORMATION_CLASS_NAME" \
248248
--parameters "transformationCustomParameters=$TRANSFORMATION_CUSTOM_PARAMETERS" \
249249
--parameters "filteredEventsDirectory=$FILTERED_EVENTS_DIRECTORY" \
250-
--parameters "shardingContextFilePath=$SHARDING_CONTEXT_FILE_PATH" \
250+
--parameters "sourceConfigURL=$SOURCE_CONFIG_URL" \
251251
--parameters "tableOverrides=$TABLE_OVERRIDES" \
252252
--parameters "columnOverrides=$COLUMN_OVERRIDES" \
253253
--parameters "schemaOverridesFilePath=$SCHEMA_OVERRIDES_FILE_PATH" \
@@ -302,7 +302,7 @@ export TRANSFORMATION_JAR_PATH=""
302302
export TRANSFORMATION_CLASS_NAME=""
303303
export TRANSFORMATION_CUSTOM_PARAMETERS=""
304304
export FILTERED_EVENTS_DIRECTORY=""
305-
export SHARDING_CONTEXT_FILE_PATH=<shardingContextFilePath>
305+
export SOURCE_CONFIG_URL=<sourceConfigURL>
306306
export TABLE_OVERRIDES=""
307307
export COLUMN_OVERRIDES=""
308308
export SCHEMA_OVERRIDES_FILE_PATH=""
@@ -317,7 +317,7 @@ mvn clean package -PtemplatesRun \
317317
-Dregion="$REGION" \
318318
-DjobName="cloud-datastream-to-spanner-job" \
319319
-DtemplateName="Cloud_Datastream_to_Spanner" \
320-
-Dparameters="inputFilePattern=$INPUT_FILE_PATTERN,inputFileFormat=$INPUT_FILE_FORMAT,sessionFilePath=$SESSION_FILE_PATH,instanceId=$INSTANCE_ID,databaseId=$DATABASE_ID,projectId=$PROJECT_ID,spannerHost=$SPANNER_HOST,gcsPubSubSubscription=$GCS_PUB_SUB_SUBSCRIPTION,streamName=$STREAM_NAME,shadowTablePrefix=$SHADOW_TABLE_PREFIX,shouldCreateShadowTables=$SHOULD_CREATE_SHADOW_TABLES,rfcStartDateTime=$RFC_START_DATE_TIME,fileReadConcurrency=$FILE_READ_CONCURRENCY,deadLetterQueueDirectory=$DEAD_LETTER_QUEUE_DIRECTORY,dlqRetryMinutes=$DLQ_RETRY_MINUTES,dlqMaxRetryCount=$DLQ_MAX_RETRY_COUNT,dataStreamRootUrl=$DATA_STREAM_ROOT_URL,datastreamSourceType=$DATASTREAM_SOURCE_TYPE,roundJsonDecimals=$ROUND_JSON_DECIMALS,runMode=$RUN_MODE,transformationContextFilePath=$TRANSFORMATION_CONTEXT_FILE_PATH,directoryWatchDurationInMinutes=$DIRECTORY_WATCH_DURATION_IN_MINUTES,spannerPriority=$SPANNER_PRIORITY,dlqGcsPubSubSubscription=$DLQ_GCS_PUB_SUB_SUBSCRIPTION,transformationJarPath=$TRANSFORMATION_JAR_PATH,transformationClassName=$TRANSFORMATION_CLASS_NAME,transformationCustomParameters=$TRANSFORMATION_CUSTOM_PARAMETERS,filteredEventsDirectory=$FILTERED_EVENTS_DIRECTORY,shardingContextFilePath=$SHARDING_CONTEXT_FILE_PATH,tableOverrides=$TABLE_OVERRIDES,columnOverrides=$COLUMN_OVERRIDES,schemaOverridesFilePath=$SCHEMA_OVERRIDES_FILE_PATH,shadowTableSpannerDatabaseId=$SHADOW_TABLE_SPANNER_DATABASE_ID,shadowTableSpannerInstanceId=$SHADOW_TABLE_SPANNER_INSTANCE_ID,failureInjectionParameter=$FAILURE_INJECTION_PARAMETER" \
320+
-Dparameters="inputFilePattern=$INPUT_FILE_PATTERN,inputFileFormat=$INPUT_FILE_FORMAT,sessionFilePath=$SESSION_FILE_PATH,instanceId=$INSTANCE_ID,databaseId=$DATABASE_ID,projectId=$PROJECT_ID,spannerHost=$SPANNER_HOST,gcsPubSubSubscription=$GCS_PUB_SUB_SUBSCRIPTION,streamName=$STREAM_NAME,shadowTablePrefix=$SHADOW_TABLE_PREFIX,shouldCreateShadowTables=$SHOULD_CREATE_SHADOW_TABLES,rfcStartDateTime=$RFC_START_DATE_TIME,fileReadConcurrency=$FILE_READ_CONCURRENCY,deadLetterQueueDirectory=$DEAD_LETTER_QUEUE_DIRECTORY,dlqRetryMinutes=$DLQ_RETRY_MINUTES,dlqMaxRetryCount=$DLQ_MAX_RETRY_COUNT,dataStreamRootUrl=$DATA_STREAM_ROOT_URL,datastreamSourceType=$DATASTREAM_SOURCE_TYPE,roundJsonDecimals=$ROUND_JSON_DECIMALS,runMode=$RUN_MODE,transformationContextFilePath=$TRANSFORMATION_CONTEXT_FILE_PATH,directoryWatchDurationInMinutes=$DIRECTORY_WATCH_DURATION_IN_MINUTES,spannerPriority=$SPANNER_PRIORITY,dlqGcsPubSubSubscription=$DLQ_GCS_PUB_SUB_SUBSCRIPTION,transformationJarPath=$TRANSFORMATION_JAR_PATH,transformationClassName=$TRANSFORMATION_CLASS_NAME,transformationCustomParameters=$TRANSFORMATION_CUSTOM_PARAMETERS,filteredEventsDirectory=$FILTERED_EVENTS_DIRECTORY,sourceConfigURL=$SOURCE_CONFIG_URL,tableOverrides=$TABLE_OVERRIDES,columnOverrides=$COLUMN_OVERRIDES,schemaOverridesFilePath=$SCHEMA_OVERRIDES_FILE_PATH,shadowTableSpannerDatabaseId=$SHADOW_TABLE_SPANNER_DATABASE_ID,shadowTableSpannerInstanceId=$SHADOW_TABLE_SPANNER_INSTANCE_ID,failureInjectionParameter=$FAILURE_INJECTION_PARAMETER" \
321321
-f v2/datastream-to-spanner
322322
```
323323

@@ -390,7 +390,7 @@ resource "google_dataflow_flex_template_job" "cloud_datastream_to_spanner" {
390390
# transformationClassName = ""
391391
# transformationCustomParameters = ""
392392
# filteredEventsDirectory = ""
393-
# shardingContextFilePath = "<shardingContextFilePath>"
393+
# sourceConfigURL = "<sourceConfigURL>"
394394
# tableOverrides = ""
395395
# columnOverrides = ""
396396
# schemaOverridesFilePath = ""

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,16 @@
3636
import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema;
3737
import com.google.cloud.teleport.v2.spanner.migrations.schema.SchemaFileOverridesParser;
3838
import com.google.cloud.teleport.v2.spanner.migrations.schema.SchemaStringOverridesParser;
39+
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
3940
import com.google.cloud.teleport.v2.spanner.migrations.shard.ShardingContext;
41+
import com.google.cloud.teleport.v2.spanner.migrations.source.config.JdbcShardConfig;
42+
import com.google.cloud.teleport.v2.spanner.migrations.source.config.SourceConfigParser;
43+
import com.google.cloud.teleport.v2.spanner.migrations.source.config.SourceConnectionConfig;
4044
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
4145
import com.google.cloud.teleport.v2.spanner.migrations.transformation.TransformationContext;
4246
import com.google.cloud.teleport.v2.spanner.migrations.utils.DataflowWorkerMachineTypeUtils;
47+
import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl;
4348
import com.google.cloud.teleport.v2.spanner.migrations.utils.SessionFileReader;
44-
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardingContextReader;
4549
import com.google.cloud.teleport.v2.spanner.migrations.utils.TransformationContextReader;
4650
import com.google.cloud.teleport.v2.templates.DataStreamToSpanner.Options;
4751
import com.google.cloud.teleport.v2.templates.constants.DatastreamToSpannerConstants;
@@ -55,6 +59,7 @@
5559
import java.util.ArrayList;
5660
import java.util.Arrays;
5761
import java.util.HashMap;
62+
import java.util.List;
5863
import java.util.Map;
5964
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
6065
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
@@ -491,16 +496,17 @@ public interface Options
491496

492497
void setFilteredEventsDirectory(String value);
493498

494-
@TemplateParameter.GcsReadFile(
499+
@TemplateParameter.Text(
495500
order = 29,
496501
optional = true,
502+
regexes = {"^.+$"},
503+
description = "Source Config URL",
497504
helpText =
498-
"Sharding context file path in cloud storage is used to populate the shard id in spanner database for each source shard."
499-
+ "It expects a JSON file with the format: {\\\"StreamToDbAndShardMap\\\": Map<stream_name, Map<db_name, shard_id>>}",
500-
description = "Sharding context file path in cloud storage")
501-
String getShardingContextFilePath();
505+
"Cloud Storage path to a shard config file for sharded migrations. It expects a HOCON or JSON file. For a sample file, please refer to v2/datastream-to-spanner/src/test/resources/DatastreamToSpannerSingleDFShardedMigrationIT/sharding-config.conf in the repository. For example, `gs://my-bucket/my-shard-config.conf`.",
506+
example = "gs://my-bucket/my-shard-config.conf")
507+
String getSourceConfigURL();
502508

503-
void setShardingContextFilePath(String value);
509+
void setSourceConfigURL(String value);
504510

505511
@TemplateParameter.Text(
506512
order = 30,
@@ -814,8 +820,31 @@ static Pipeline buildPipeline(Options options) {
814820
options.getTransformationContextFilePath());
815821

816822
// Ingest sharding context file into memory.
817-
ShardingContext shardingContext =
818-
ShardingContextReader.getShardingContext(options.getShardingContextFilePath());
823+
ShardingContext shardingContext = new ShardingContext();
824+
if (options.getSourceConfigURL() != null && !options.getSourceConfigURL().isEmpty()) {
825+
try {
826+
SourceConfigParser parser = new SourceConfigParser(new SecretManagerAccessorImpl());
827+
SourceConnectionConfig sourceConfig =
828+
parser.parseConfiguration(
829+
options.getDatastreamSourceType(),
830+
options.getSourceConfigURL(),
831+
/* resolveSecrets= */ false);
832+
833+
if (sourceConfig instanceof JdbcShardConfig) {
834+
JdbcShardConfig jdbcShardConfig = (JdbcShardConfig) sourceConfig;
835+
List<Shard> shards = jdbcShardConfig.getShardConfigs();
836+
Map<String, Map<String, String>> streamToDbAndShardMap = new HashMap<>();
837+
for (Shard shard : shards) {
838+
streamToDbAndShardMap
839+
.computeIfAbsent(shard.getStreamId(), k -> new HashMap<>())
840+
.put(shard.getDbName(), shard.getLogicalShardId());
841+
}
842+
shardingContext = new ShardingContext(streamToDbAndShardMap);
843+
}
844+
} catch (Exception e) {
845+
throw new RuntimeException("Failed to parse source config URL", e);
846+
}
847+
}
819848

820849
CustomTransformation customTransformation =
821850
CustomTransformation.builder(

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerITBase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ protected LaunchInfo launchDataflowJob(
313313
LOG.info(
314314
"Uploading sharding context file from resource: {}", shardingContextFileResourceName);
315315
gcsResourceManager.uploadArtifact(
316-
gcsPathPrefix + "/shardingContext.json",
316+
gcsPathPrefix + "/shardingConfig.conf",
317317
Resources.getResource(shardingContextFileResourceName).getPath());
318318
} else {
319319
LOG.info("No sharding context file provided, skipping upload.");
@@ -398,8 +398,8 @@ protected LaunchInfo launchDataflowJob(
398398

399399
if (shardingContextFileResourceName != null) {
400400
params.put(
401-
"shardingContextFilePath",
402-
getGcsPath(gcsPathPrefix + "/shardingContext.json", gcsResourceManager));
401+
"sourceConfigURL",
402+
getGcsPath(gcsPathPrefix + "/shardingConfig.conf", gcsResourceManager));
403403
}
404404

405405
if (customTransformation != null) {

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMySQLRetryAllDLQIT.java

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -194,9 +194,9 @@ public void setUp() throws IOException, InterruptedException {
194194
datastreamResourceManager.startStream(streamB);
195195
streamNameB = streamB.getName().substring(streamB.getName().lastIndexOf('/') + 1);
196196

197-
// Generate Shard Context JSON
198-
String shardContextJson =
199-
generateShardContextJson(
197+
// Generate Shard Config
198+
String shardConfig =
199+
generateSourceConfig(
200200
streamNameA,
201201
jdbcResourceManagerShardA.getDatabaseName(),
202202
"shard1",
@@ -205,7 +205,7 @@ public void setUp() throws IOException, InterruptedException {
205205
"shard2");
206206

207207
gcsResourceManager.createArtifact(
208-
"input/shardingContext.json", shardContextJson.getBytes(StandardCharsets.UTF_8));
208+
"input/shardingConfig.conf", shardConfig.getBytes(StandardCharsets.UTF_8));
209209

210210
// Prepare job parameters
211211
Map<String, String> jobParameters = new HashMap<>();
@@ -223,8 +223,7 @@ public void setUp() throws IOException, InterruptedException {
223223
jobParameters.put(
224224
"deadLetterQueueDirectory", getGcsPath(GCS_PATH_PREFIX + "/dlq/", gcsResourceManager));
225225
jobParameters.put(
226-
"shardingContextFilePath",
227-
getGcsPath("input/shardingContext.json", gcsResourceManager));
226+
"sourceConfigURL", getGcsPath("input/shardingConfig.conf", gcsResourceManager));
228227
jobParameters.put(
229228
"inputFilePattern", getGcsPath(GCS_PATH_PREFIX + "/cdc/", gcsResourceManager));
230229

@@ -378,8 +377,7 @@ public void testDataStreamToSpannerShardedRetryAllDLQ() throws Exception {
378377
"deadLetterQueueDirectory", getGcsPath(GCS_PATH_PREFIX + "/dlq/", gcsResourceManager));
379378
retryParams.put("dlqMaxRetryCount", "20");
380379
retryParams.put("dlqRetryMinutes", "60");
381-
retryParams.put(
382-
"shardingContextFilePath", getGcsPath("input/shardingContext.json", gcsResourceManager));
380+
retryParams.put("sourceConfigURL", getGcsPath("input/shardingConfig.conf", gcsResourceManager));
383381

384382
PipelineLauncher.LaunchInfo retryJobInfo =
385383
launchDataflowJob(
@@ -514,25 +512,33 @@ private void insertDataInMySQL() {
514512
// error)
515513
}
516514

517-
private String generateShardContextJson(
515+
private String generateSourceConfig(
518516
String streamA, String dbA, String shardA, String streamB, String dbB, String shardB) {
519517
return "{\n"
520-
+ " \"StreamToDbAndShardMap\": {\n"
521-
+ " \""
522-
+ streamA
523-
+ "\": {\""
524-
+ dbA
525-
+ "\": \""
518+
+ " \"shardConfigs\": [\n"
519+
+ " {\n"
520+
+ " \"logicalShardId\": \""
526521
+ shardA
527-
+ "\"},\n"
528-
+ " \""
529-
+ streamB
530-
+ "\": {\""
531-
+ dbB
532-
+ "\": \""
522+
+ "\",\n"
523+
+ " \"dbName\": \""
524+
+ dbA
525+
+ "\",\n"
526+
+ " \"streamId\": \""
527+
+ streamA
528+
+ "\"\n"
529+
+ " },\n"
530+
+ " {\n"
531+
+ " \"logicalShardId\": \""
533532
+ shardB
534-
+ "\"}\n"
535-
+ " }\n"
533+
+ "\",\n"
534+
+ " \"dbName\": \""
535+
+ dbB
536+
+ "\",\n"
537+
+ " \"streamId\": \""
538+
+ streamB
539+
+ "\"\n"
540+
+ " }\n"
541+
+ " ]\n"
536542
+ "}";
537543
}
538544
}

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMySQLRetryDLQIT.java

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -197,9 +197,9 @@ public void setUp() throws IOException, InterruptedException {
197197
datastreamResourceManager.startStream(streamB);
198198
streamNameB = streamB.getName().substring(streamB.getName().lastIndexOf('/') + 1);
199199

200-
// Generate Shard Context JSON
201-
String shardContextJson =
202-
generateShardContextJson(
200+
// Generate Shard Config
201+
String shardConfig =
202+
generateSourceConfig(
203203
streamNameA,
204204
jdbcResourceManagerShardA.getDatabaseName(),
205205
"shard1",
@@ -208,7 +208,7 @@ public void setUp() throws IOException, InterruptedException {
208208
"shard2");
209209

210210
gcsResourceManager.createArtifact(
211-
"input/shardingContext.json", shardContextJson.getBytes(StandardCharsets.UTF_8));
211+
"input/shardingConfig.conf", shardConfig.getBytes(StandardCharsets.UTF_8));
212212

213213
// Prepare job parameters
214214
Map<String, String> jobParameters = new HashMap<>();
@@ -219,8 +219,7 @@ public void setUp() throws IOException, InterruptedException {
219219
jobParameters.put(
220220
"dlqMaxRetryCount", "1000"); // High retry count to keep items in retry/ bucket
221221
jobParameters.put(
222-
"shardingContextFilePath",
223-
getGcsPath("input/shardingContext.json", gcsResourceManager));
222+
"sourceConfigURL", getGcsPath("input/shardingConfig.conf", gcsResourceManager));
224223
jobParameters.put(
225224
"inputFilePattern", getGcsPath(GCS_PATH_PREFIX + "/cdc/", gcsResourceManager));
226225
jobParameters.put(
@@ -363,7 +362,7 @@ public void testDataStreamToSpannerShardedRetryDLQ() throws Exception {
363362
retryJobParameters.put(
364363
"deadLetterQueueDirectory", getGcsPath(GCS_PATH_PREFIX + "/dlq/", gcsResourceManager));
365364
retryJobParameters.put(
366-
"shardingContextFilePath", getGcsPath("input/shardingContext.json", gcsResourceManager));
365+
"sourceConfigURL", getGcsPath("input/shardingConfig.conf", gcsResourceManager));
367366

368367
PipelineLauncher.LaunchInfo retryJobInfo =
369368
launchDataflowJob(
@@ -532,25 +531,33 @@ private void executeSqlScript(CloudMySQLResourceManager resourceManager, String
532531
}
533532
}
534533

535-
private String generateShardContextJson(
534+
private String generateSourceConfig(
536535
String streamA, String dbA, String shardA, String streamB, String dbB, String shardB) {
537536
return "{\n"
538-
+ " \"StreamToDbAndShardMap\": {\n"
539-
+ " \""
540-
+ streamA
541-
+ "\": {\""
542-
+ dbA
543-
+ "\": \""
537+
+ " \"shardConfigs\": [\n"
538+
+ " {\n"
539+
+ " \"logicalShardId\": \""
544540
+ shardA
545-
+ "\"},\n"
546-
+ " \""
547-
+ streamB
548-
+ "\": {\""
549-
+ dbB
550-
+ "\": \""
541+
+ "\",\n"
542+
+ " \"dbName\": \""
543+
+ dbA
544+
+ "\",\n"
545+
+ " \"streamId\": \""
546+
+ streamA
547+
+ "\"\n"
548+
+ " },\n"
549+
+ " {\n"
550+
+ " \"logicalShardId\": \""
551551
+ shardB
552-
+ "\"}\n"
553-
+ " }\n"
552+
+ "\",\n"
553+
+ " \"dbName\": \""
554+
+ dbB
555+
+ "\",\n"
556+
+ " \"streamId\": \""
557+
+ streamB
558+
+ "\"\n"
559+
+ " }\n"
560+
+ " ]\n"
554561
+ "}";
555562
}
556563
}

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DatastreamToSpannerSingleDFShardedMigrationIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public class DatastreamToSpannerSingleDFShardedMigrationIT extends DataStreamToS
6767
"DatastreamToSpannerSingleDFShardedMigrationIT/mysql-session.json";
6868

6969
private static final String SHARDING_CONTEXT_RESOURCE =
70-
"DatastreamToSpannerSingleDFShardedMigrationIT/sharding-context.json";
70+
"DatastreamToSpannerSingleDFShardedMigrationIT/sharding-config.conf";
7171

7272
private static final String SPANNER_DDL_RESOURCE =
7373
"DatastreamToSpannerSingleDFShardedMigrationIT/spanner-schema.sql";

0 commit comments

Comments
 (0)