Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions v2/datastream-to-spanner/README_Cloud_Datastream_to_Spanner.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
* **transformationClassName**: Fully qualified class name having the custom transformation logic. It is a mandatory field in case transformationJarPath is specified. Defaults to empty.
* **transformationCustomParameters**: String containing any custom parameters to be passed to the custom transformation class. Defaults to empty.
* **filteredEventsDirectory**: This is the file path to store the events filtered via custom transformation. Default is a directory under the Dataflow job's temp location. The default value is enough under most conditions.
* **shardingContextFilePath**: Sharding context file path in cloud storage is used to populate the shard id in spanner database for each source shard. It expects a JSON file with the format: {\"StreamToDbAndShardMap\": Map<stream_name, Map<db_name, shard_id>>}.
* **sourceConfigURL**: Cloud Storage path to a shard config file for sharded migrations. It expects a HOCON or JSON file. For a sample file, please refer to v2/datastream-to-spanner/src/test/resources/DatastreamToSpannerSingleDFShardedMigrationIT/sharding-config.conf in the repository. For example, `gs://my-bucket/my-shard-config.conf`.
* **tableOverrides**: These are the table name overrides from source to spanner. They are written in the following format: [{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]. This example shows mapping Singers table to Vocalists and Albums table to Records. For example, `[{Singers, Vocalists}, {Albums, Records}]`. Defaults to empty.
* **columnOverrides**: These are the column name overrides from source to spanner. They are written in the following format: [{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]. Note that the SourceTableName should remain the same in both the source and spanner pair. To override table names, use tableOverrides. The example shows mapping SingerName to TalentName and AlbumName to RecordName in Singers and Albums table respectively. For example, `[{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]`. Defaults to empty.
* **schemaOverridesFilePath**: A file which specifies the table and the column name overrides from source to spanner. Defaults to empty.
Expand Down Expand Up @@ -207,7 +207,7 @@ export TRANSFORMATION_JAR_PATH=""
export TRANSFORMATION_CLASS_NAME=""
export TRANSFORMATION_CUSTOM_PARAMETERS=""
export FILTERED_EVENTS_DIRECTORY=""
export SHARDING_CONTEXT_FILE_PATH=<shardingContextFilePath>
export SOURCE_CONFIG_URL=<sourceConfigURL>
export TABLE_OVERRIDES=""
export COLUMN_OVERRIDES=""
export SCHEMA_OVERRIDES_FILE_PATH=""
Expand Down Expand Up @@ -247,7 +247,7 @@ gcloud dataflow flex-template run "cloud-datastream-to-spanner-job" \
--parameters "transformationClassName=$TRANSFORMATION_CLASS_NAME" \
--parameters "transformationCustomParameters=$TRANSFORMATION_CUSTOM_PARAMETERS" \
--parameters "filteredEventsDirectory=$FILTERED_EVENTS_DIRECTORY" \
--parameters "shardingContextFilePath=$SHARDING_CONTEXT_FILE_PATH" \
--parameters "sourceConfigURL=$SOURCE_CONFIG_URL" \
--parameters "tableOverrides=$TABLE_OVERRIDES" \
--parameters "columnOverrides=$COLUMN_OVERRIDES" \
--parameters "schemaOverridesFilePath=$SCHEMA_OVERRIDES_FILE_PATH" \
Expand Down Expand Up @@ -302,7 +302,7 @@ export TRANSFORMATION_JAR_PATH=""
export TRANSFORMATION_CLASS_NAME=""
export TRANSFORMATION_CUSTOM_PARAMETERS=""
export FILTERED_EVENTS_DIRECTORY=""
export SHARDING_CONTEXT_FILE_PATH=<shardingContextFilePath>
export SOURCE_CONFIG_URL=<sourceConfigURL>
export TABLE_OVERRIDES=""
export COLUMN_OVERRIDES=""
export SCHEMA_OVERRIDES_FILE_PATH=""
Expand All @@ -317,7 +317,7 @@ mvn clean package -PtemplatesRun \
-Dregion="$REGION" \
-DjobName="cloud-datastream-to-spanner-job" \
-DtemplateName="Cloud_Datastream_to_Spanner" \
-Dparameters="inputFilePattern=$INPUT_FILE_PATTERN,inputFileFormat=$INPUT_FILE_FORMAT,sessionFilePath=$SESSION_FILE_PATH,instanceId=$INSTANCE_ID,databaseId=$DATABASE_ID,projectId=$PROJECT_ID,spannerHost=$SPANNER_HOST,gcsPubSubSubscription=$GCS_PUB_SUB_SUBSCRIPTION,streamName=$STREAM_NAME,shadowTablePrefix=$SHADOW_TABLE_PREFIX,shouldCreateShadowTables=$SHOULD_CREATE_SHADOW_TABLES,rfcStartDateTime=$RFC_START_DATE_TIME,fileReadConcurrency=$FILE_READ_CONCURRENCY,deadLetterQueueDirectory=$DEAD_LETTER_QUEUE_DIRECTORY,dlqRetryMinutes=$DLQ_RETRY_MINUTES,dlqMaxRetryCount=$DLQ_MAX_RETRY_COUNT,dataStreamRootUrl=$DATA_STREAM_ROOT_URL,datastreamSourceType=$DATASTREAM_SOURCE_TYPE,roundJsonDecimals=$ROUND_JSON_DECIMALS,runMode=$RUN_MODE,transformationContextFilePath=$TRANSFORMATION_CONTEXT_FILE_PATH,directoryWatchDurationInMinutes=$DIRECTORY_WATCH_DURATION_IN_MINUTES,spannerPriority=$SPANNER_PRIORITY,dlqGcsPubSubSubscription=$DLQ_GCS_PUB_SUB_SUBSCRIPTION,transformationJarPath=$TRANSFORMATION_JAR_PATH,transformationClassName=$TRANSFORMATION_CLASS_NAME,transformationCustomParameters=$TRANSFORMATION_CUSTOM_PARAMETERS,filteredEventsDirectory=$FILTERED_EVENTS_DIRECTORY,shardingContextFilePath=$SHARDING_CONTEXT_FILE_PATH,tableOverrides=$TABLE_OVERRIDES,columnOverrides=$COLUMN_OVERRIDES,schemaOverridesFilePath=$SCHEMA_OVERRIDES_FILE_PATH,shadowTableSpannerDatabaseId=$SHADOW_TABLE_SPANNER_DATABASE_ID,shadowTableSpannerInstanceId=$SHADOW_TABLE_SPANNER_INSTANCE_ID,failureInjectionParameter=$FAILURE_INJECTION_PARAMETER" \
-Dparameters="inputFilePattern=$INPUT_FILE_PATTERN,inputFileFormat=$INPUT_FILE_FORMAT,sessionFilePath=$SESSION_FILE_PATH,instanceId=$INSTANCE_ID,databaseId=$DATABASE_ID,projectId=$PROJECT_ID,spannerHost=$SPANNER_HOST,gcsPubSubSubscription=$GCS_PUB_SUB_SUBSCRIPTION,streamName=$STREAM_NAME,shadowTablePrefix=$SHADOW_TABLE_PREFIX,shouldCreateShadowTables=$SHOULD_CREATE_SHADOW_TABLES,rfcStartDateTime=$RFC_START_DATE_TIME,fileReadConcurrency=$FILE_READ_CONCURRENCY,deadLetterQueueDirectory=$DEAD_LETTER_QUEUE_DIRECTORY,dlqRetryMinutes=$DLQ_RETRY_MINUTES,dlqMaxRetryCount=$DLQ_MAX_RETRY_COUNT,dataStreamRootUrl=$DATA_STREAM_ROOT_URL,datastreamSourceType=$DATASTREAM_SOURCE_TYPE,roundJsonDecimals=$ROUND_JSON_DECIMALS,runMode=$RUN_MODE,transformationContextFilePath=$TRANSFORMATION_CONTEXT_FILE_PATH,directoryWatchDurationInMinutes=$DIRECTORY_WATCH_DURATION_IN_MINUTES,spannerPriority=$SPANNER_PRIORITY,dlqGcsPubSubSubscription=$DLQ_GCS_PUB_SUB_SUBSCRIPTION,transformationJarPath=$TRANSFORMATION_JAR_PATH,transformationClassName=$TRANSFORMATION_CLASS_NAME,transformationCustomParameters=$TRANSFORMATION_CUSTOM_PARAMETERS,filteredEventsDirectory=$FILTERED_EVENTS_DIRECTORY,sourceConfigURL=$SOURCE_CONFIG_URL,tableOverrides=$TABLE_OVERRIDES,columnOverrides=$COLUMN_OVERRIDES,schemaOverridesFilePath=$SCHEMA_OVERRIDES_FILE_PATH,shadowTableSpannerDatabaseId=$SHADOW_TABLE_SPANNER_DATABASE_ID,shadowTableSpannerInstanceId=$SHADOW_TABLE_SPANNER_INSTANCE_ID,failureInjectionParameter=$FAILURE_INJECTION_PARAMETER" \
-f v2/datastream-to-spanner
```

Expand Down Expand Up @@ -390,7 +390,7 @@ resource "google_dataflow_flex_template_job" "cloud_datastream_to_spanner" {
# transformationClassName = ""
# transformationCustomParameters = ""
# filteredEventsDirectory = ""
# shardingContextFilePath = "<shardingContextFilePath>"
# sourceConfigURL = "<sourceConfigURL>"
# tableOverrides = ""
# columnOverrides = ""
# schemaOverridesFilePath = ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,16 @@
import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SchemaFileOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SchemaStringOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.spanner.migrations.shard.ShardingContext;
import com.google.cloud.teleport.v2.spanner.migrations.source.config.JdbcShardConfig;
import com.google.cloud.teleport.v2.spanner.migrations.source.config.SourceConfigParser;
import com.google.cloud.teleport.v2.spanner.migrations.source.config.SourceConnectionConfig;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.TransformationContext;
import com.google.cloud.teleport.v2.spanner.migrations.utils.DataflowWorkerMachineTypeUtils;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SessionFileReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardingContextReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.TransformationContextReader;
import com.google.cloud.teleport.v2.templates.DataStreamToSpanner.Options;
import com.google.cloud.teleport.v2.templates.constants.DatastreamToSpannerConstants;
Expand All @@ -55,6 +59,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
Expand Down Expand Up @@ -494,13 +499,13 @@ public interface Options
@TemplateParameter.GcsReadFile(
order = 29,
optional = true,
description = "Source Config URL",
helpText =
"Sharding context file path in cloud storage is used to populate the shard id in spanner database for each source shard."
+ "It expects a JSON file with the format: {\\\"StreamToDbAndShardMap\\\": Map<stream_name, Map<db_name, shard_id>>}",
description = "Sharding context file path in cloud storage")
String getShardingContextFilePath();
"Cloud Storage path to a shard config file for sharded migrations. It expects a HOCON or JSON file. For a sample file, please refer to v2/datastream-to-spanner/src/test/resources/DatastreamToSpannerSingleDFShardedMigrationIT/sharding-config.conf in the repository. For example, `gs://my-bucket/my-shard-config.conf`.",
example = "gs://my-bucket/my-shard-config.conf")
String getSourceConfigURL();

void setShardingContextFilePath(String value);
void setSourceConfigURL(String value);

@TemplateParameter.Text(
order = 30,
Expand Down Expand Up @@ -814,8 +819,31 @@ static Pipeline buildPipeline(Options options) {
options.getTransformationContextFilePath());

// Ingest sharding context file into memory.
ShardingContext shardingContext =
ShardingContextReader.getShardingContext(options.getShardingContextFilePath());
ShardingContext shardingContext = new ShardingContext();
if (options.getSourceConfigURL() != null && !options.getSourceConfigURL().isEmpty()) {
try {
SourceConfigParser parser = new SourceConfigParser(new SecretManagerAccessorImpl());
SourceConnectionConfig sourceConfig =
parser.parseConfiguration(
options.getDatastreamSourceType(),
options.getSourceConfigURL(),
/* resolveSecrets= */ false);
Comment on lines +823 to +830

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In retry modes (such as retryDLQ), validateSourceType returns early without setting datastreamSourceType, which means options.getDatastreamSourceType() can be null. Passing a null source type to parser.parseConfiguration will cause a NullPointerException or IllegalArgumentException during startup. We should add a defensive check to ensure datastreamSourceType is provided when sourceConfigURL is used, and log the error message before throwing the exception to ensure it is recorded in the Dataflow environment.

    if (options.getSourceConfigURL() != null && !options.getSourceConfigURL().isEmpty()) {
      try {
        String sourceType = options.getDatastreamSourceType();
        if (sourceType == null || sourceType.isEmpty()) {
          LOG.error("datastreamSourceType must be specified when sourceConfigURL is provided.");
          throw new IllegalArgumentException(
              "datastreamSourceType must be specified when sourceConfigURL is provided.");
        }
        SourceConfigParser parser = new SourceConfigParser(new SecretManagerAccessorImpl());
        SourceConnectionConfig sourceConfig =
            parser.parseConfiguration(
                sourceType,
                options.getSourceConfigURL(),
                /* resolveSecrets= */ false);
References
  1. When throwing exceptions in environments where the caller or global exception handler might not log them (such as certain Dataflow templates), log the error message before throwing the exception to ensure the failure is recorded.


if (sourceConfig instanceof JdbcShardConfig) {
JdbcShardConfig jdbcShardConfig = (JdbcShardConfig) sourceConfig;
List<Shard> shards = jdbcShardConfig.getShardConfigs();
Map<String, Map<String, String>> streamToDbAndShardMap = new HashMap<>();
for (Shard shard : shards) {
streamToDbAndShardMap
.computeIfAbsent(shard.getStreamId(), k -> new HashMap<>())
.put(shard.getDbName(), shard.getLogicalShardId());
}
shardingContext = new ShardingContext(streamToDbAndShardMap);
}
} catch (Exception e) {
throw new RuntimeException("Failed to parse source config URL", e);
}
}

CustomTransformation customTransformation =
CustomTransformation.builder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ protected LaunchInfo launchDataflowJob(
LOG.info(
"Uploading sharding context file from resource: {}", shardingContextFileResourceName);
gcsResourceManager.uploadArtifact(
gcsPathPrefix + "/shardingContext.json",
gcsPathPrefix + "/shardingConfig.conf",
Resources.getResource(shardingContextFileResourceName).getPath());
} else {
LOG.info("No sharding context file provided, skipping upload.");
Expand Down Expand Up @@ -398,8 +398,8 @@ protected LaunchInfo launchDataflowJob(

if (shardingContextFileResourceName != null) {
params.put(
"shardingContextFilePath",
getGcsPath(gcsPathPrefix + "/shardingContext.json", gcsResourceManager));
"sourceConfigURL",
getGcsPath(gcsPathPrefix + "/shardingConfig.conf", gcsResourceManager));
}

if (customTransformation != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,9 @@ public void setUp() throws IOException, InterruptedException {
datastreamResourceManager.startStream(streamB);
streamNameB = streamB.getName().substring(streamB.getName().lastIndexOf('/') + 1);

// Generate Shard Context JSON
String shardContextJson =
generateShardContextJson(
// Generate Shard Config
String shardConfig =
generateSourceConfig(
streamNameA,
jdbcResourceManagerShardA.getDatabaseName(),
"shard1",
Expand All @@ -205,7 +205,7 @@ public void setUp() throws IOException, InterruptedException {
"shard2");

gcsResourceManager.createArtifact(
"input/shardingContext.json", shardContextJson.getBytes(StandardCharsets.UTF_8));
"input/shardingConfig.conf", shardConfig.getBytes(StandardCharsets.UTF_8));

// Prepare job parameters
Map<String, String> jobParameters = new HashMap<>();
Expand All @@ -223,8 +223,7 @@ public void setUp() throws IOException, InterruptedException {
jobParameters.put(
"deadLetterQueueDirectory", getGcsPath(GCS_PATH_PREFIX + "/dlq/", gcsResourceManager));
jobParameters.put(
"shardingContextFilePath",
getGcsPath("input/shardingContext.json", gcsResourceManager));
"sourceConfigURL", getGcsPath("input/shardingConfig.conf", gcsResourceManager));
jobParameters.put(
"inputFilePattern", getGcsPath(GCS_PATH_PREFIX + "/cdc/", gcsResourceManager));

Expand Down Expand Up @@ -378,8 +377,7 @@ public void testDataStreamToSpannerShardedRetryAllDLQ() throws Exception {
"deadLetterQueueDirectory", getGcsPath(GCS_PATH_PREFIX + "/dlq/", gcsResourceManager));
retryParams.put("dlqMaxRetryCount", "20");
retryParams.put("dlqRetryMinutes", "60");
retryParams.put(
"shardingContextFilePath", getGcsPath("input/shardingContext.json", gcsResourceManager));
retryParams.put("sourceConfigURL", getGcsPath("input/shardingConfig.conf", gcsResourceManager));

PipelineLauncher.LaunchInfo retryJobInfo =
launchDataflowJob(
Expand Down Expand Up @@ -514,25 +512,33 @@ private void insertDataInMySQL() {
// error)
}

private String generateShardContextJson(
private String generateSourceConfig(
String streamA, String dbA, String shardA, String streamB, String dbB, String shardB) {
return "{\n"
+ " \"StreamToDbAndShardMap\": {\n"
+ " \""
+ streamA
+ "\": {\""
+ dbA
+ "\": \""
+ " \"shardConfigs\": [\n"
+ " {\n"
+ " \"logicalShardId\": \""
+ shardA
+ "\"},\n"
+ " \""
+ streamB
+ "\": {\""
+ dbB
+ "\": \""
+ "\",\n"
+ " \"dbName\": \""
+ dbA
+ "\",\n"
+ " \"streamId\": \""
+ streamA
+ "\"\n"
+ " },\n"
+ " {\n"
+ " \"logicalShardId\": \""
+ shardB
+ "\"}\n"
+ " }\n"
+ "\",\n"
+ " \"dbName\": \""
+ dbB
+ "\",\n"
+ " \"streamId\": \""
+ streamB
+ "\"\n"
+ " }\n"
+ " ]\n"
+ "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,9 @@ public void setUp() throws IOException, InterruptedException {
datastreamResourceManager.startStream(streamB);
streamNameB = streamB.getName().substring(streamB.getName().lastIndexOf('/') + 1);

// Generate Shard Context JSON
String shardContextJson =
generateShardContextJson(
// Generate Shard Config
String shardConfig =
generateSourceConfig(
streamNameA,
jdbcResourceManagerShardA.getDatabaseName(),
"shard1",
Expand All @@ -208,7 +208,7 @@ public void setUp() throws IOException, InterruptedException {
"shard2");

gcsResourceManager.createArtifact(
"input/shardingContext.json", shardContextJson.getBytes(StandardCharsets.UTF_8));
"input/shardingConfig.conf", shardConfig.getBytes(StandardCharsets.UTF_8));

// Prepare job parameters
Map<String, String> jobParameters = new HashMap<>();
Expand All @@ -219,8 +219,7 @@ public void setUp() throws IOException, InterruptedException {
jobParameters.put(
"dlqMaxRetryCount", "1000"); // High retry count to keep items in retry/ bucket
jobParameters.put(
"shardingContextFilePath",
getGcsPath("input/shardingContext.json", gcsResourceManager));
"sourceConfigURL", getGcsPath("input/shardingConfig.conf", gcsResourceManager));
jobParameters.put(
"inputFilePattern", getGcsPath(GCS_PATH_PREFIX + "/cdc/", gcsResourceManager));
jobParameters.put(
Expand Down Expand Up @@ -363,7 +362,7 @@ public void testDataStreamToSpannerShardedRetryDLQ() throws Exception {
retryJobParameters.put(
"deadLetterQueueDirectory", getGcsPath(GCS_PATH_PREFIX + "/dlq/", gcsResourceManager));
retryJobParameters.put(
"shardingContextFilePath", getGcsPath("input/shardingContext.json", gcsResourceManager));
"sourceConfigURL", getGcsPath("input/shardingConfig.conf", gcsResourceManager));

PipelineLauncher.LaunchInfo retryJobInfo =
launchDataflowJob(
Expand Down Expand Up @@ -532,25 +531,33 @@ private void executeSqlScript(CloudMySQLResourceManager resourceManager, String
}
}

private String generateShardContextJson(
private String generateSourceConfig(
String streamA, String dbA, String shardA, String streamB, String dbB, String shardB) {
return "{\n"
+ " \"StreamToDbAndShardMap\": {\n"
+ " \""
+ streamA
+ "\": {\""
+ dbA
+ "\": \""
+ " \"shardConfigs\": [\n"
+ " {\n"
+ " \"logicalShardId\": \""
+ shardA
+ "\"},\n"
+ " \""
+ streamB
+ "\": {\""
+ dbB
+ "\": \""
+ "\",\n"
+ " \"dbName\": \""
+ dbA
+ "\",\n"
+ " \"streamId\": \""
+ streamA
+ "\"\n"
+ " },\n"
+ " {\n"
+ " \"logicalShardId\": \""
+ shardB
+ "\"}\n"
+ " }\n"
+ "\",\n"
+ " \"dbName\": \""
+ dbB
+ "\",\n"
+ " \"streamId\": \""
+ streamB
+ "\"\n"
+ " }\n"
+ " ]\n"
+ "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class DatastreamToSpannerSingleDFShardedMigrationIT extends DataStreamToS
"DatastreamToSpannerSingleDFShardedMigrationIT/mysql-session.json";

private static final String SHARDING_CONTEXT_RESOURCE =
"DatastreamToSpannerSingleDFShardedMigrationIT/sharding-context.json";
"DatastreamToSpannerSingleDFShardedMigrationIT/sharding-config.conf";

private static final String SPANNER_DDL_RESOURCE =
"DatastreamToSpannerSingleDFShardedMigrationIT/spanner-schema.sql";
Expand Down
Loading
Loading