Skip to content

Commit b7b20a3

Browse files
feat(Spanner): integrate SourceConfigParser to centralize shard configuration loading for SourceDbToSpanner pipelines
1 parent 71935bb commit b7b20a3

5 files changed

Lines changed: 93 additions & 84 deletions

File tree

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilder.java

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.SQLDialect;
2525
import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.defaults.MySqlConfigDefaults;
2626
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceSchemaReference;
27+
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
2728
import com.google.cloud.teleport.v2.spanner.migrations.utils.DataflowWorkerMachineTypeUtils;
2829
import com.google.common.annotations.VisibleForTesting;
2930
import com.google.common.collect.ImmutableList;
@@ -63,15 +64,11 @@ public static String extractWorkerZone(PipelineOptions options) {
6364

6465
public static JdbcIOWrapperConfig getJdbcIOWrapperConfigWithDefaults(
6566
SourceDbToSpannerOptions options,
67+
Shard shard,
6668
List<String> tables,
6769
String shardId,
6870
Wait.OnSignal<?> waitOn) {
6971
SQLDialect sqlDialect = SQLDialect.valueOf(options.getSourceDbDialect());
70-
String sourceDbURL = options.getSourceConfigURL();
71-
String dbName = extractDbFromURL(sourceDbURL);
72-
String username = options.getUsername();
73-
String password = options.getPassword();
74-
String namespace = options.getNamespace();
7572

7673
String jdbcDriverClassName = options.getJdbcDriverClassName();
7774
String jdbcDriverJars = options.getJdbcDriverJars();
@@ -88,14 +85,13 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfigWithDefaults(
8885
return getJdbcIOWrapperConfig(
8986
sqlDialect,
9087
tables,
91-
sourceDbURL,
92-
null,
93-
null,
94-
0,
95-
username,
96-
password,
97-
dbName,
98-
namespace,
88+
shard.getHost(),
89+
shard.getConnectionProperties(),
90+
Integer.parseInt(shard.getPort()),
91+
shard.getUserName(),
92+
shard.getPassword(),
93+
shard.getDbName(),
94+
shard.getNamespace(),
9995
shardId,
10096
jdbcDriverClassName,
10197
jdbcDriverJars,
@@ -112,7 +108,6 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfigWithDefaults(
112108
public static JdbcIOWrapperConfig getJdbcIOWrapperConfig(
113109
SQLDialect sqlDialect,
114110
List<String> tables,
115-
String sourceDbURL,
116111
String host,
117112
String connectionProperties,
118113
int port,
@@ -157,13 +152,12 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfig(
157152
builder = builder.setMaxConnections(maxConnections);
158153
}
159154

155+
String sourceDbURL = "";
160156
switch (sqlDialect) {
161157
case MYSQL:
162-
if (sourceDbURL == null) {
163-
sourceDbURL = "jdbc:mysql://" + host + ":" + port + "/" + dbName;
164-
if (StringUtils.isNotBlank(connectionProperties)) {
165-
sourceDbURL = sourceDbURL + "?" + connectionProperties;
166-
}
158+
sourceDbURL = "jdbc:mysql://" + host + ":" + port + "/" + dbName;
159+
if (StringUtils.isNotBlank(connectionProperties)) {
160+
sourceDbURL = sourceDbURL + "?" + connectionProperties;
167161
}
168162
for (Entry<String, String> entry :
169163
MySqlConfigDefaults.DEFAULT_MYSQL_URL_PROPERTIES.entrySet()) {
@@ -172,14 +166,14 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfig(
172166
sourceDbURL = mysqlSetCursorModeIfNeeded(sqlDialect, sourceDbURL, fetchSize);
173167
break;
174168
case POSTGRESQL:
175-
if (sourceDbURL == null) {
176-
sourceDbURL = "jdbc:postgresql://" + host + ":" + port + "/" + dbName;
177-
}
169+
sourceDbURL = "jdbc:postgresql://" + host + ":" + port + "/" + dbName;
178170
sourceDbURL = sourceDbURL + "?currentSchema=" + sourceSchemaReference.jdbc().namespace();
179171
if (StringUtils.isNotBlank(connectionProperties)) {
180172
sourceDbURL = sourceDbURL + "&" + connectionProperties;
181173
}
182174
break;
175+
default:
176+
throw new IllegalArgumentException("Unsupported SQL Dialect: " + sqlDialect);
183177
}
184178

185179
builder.setSourceDbURL(sourceDbURL);

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/PipelineController.java

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,14 @@
3131
import com.google.cloud.teleport.v2.spanner.migrations.schema.SchemaStringOverridesBasedMapper;
3232
import com.google.cloud.teleport.v2.spanner.migrations.schema.SessionBasedMapper;
3333
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
34+
import com.google.cloud.teleport.v2.spanner.migrations.source.config.JdbcShardConfig;
35+
import com.google.cloud.teleport.v2.spanner.migrations.source.config.SourceConfigParser;
36+
import com.google.cloud.teleport.v2.spanner.migrations.source.config.SourceConnectionConfig;
3437
import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerSchema;
38+
import com.google.cloud.teleport.v2.spanner.migrations.utils.ISecretManagerAccessor;
39+
import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl;
3540
import com.google.common.annotations.VisibleForTesting;
41+
import com.google.common.base.Preconditions;
3642
import com.google.common.collect.ImmutableList;
3743
import java.util.HashMap;
3844
import java.util.List;
@@ -86,12 +92,34 @@ protected static PipelineResult executeSingleInstanceMigrationForDbConfigContain
8692
}
8793

8894
static PipelineResult executeJdbcSingleInstanceMigration(
89-
SourceDbToSpannerOptions options, Pipeline pipeline, SpannerConfig spannerConfig) {
90-
JdbcDbConfigContainer jdbcDbConfigContainer = new SingleInstanceJdbcDbConfigContainer(options);
95+
SourceDbToSpannerOptions options,
96+
Shard shard,
97+
Pipeline pipeline,
98+
SpannerConfig spannerConfig) {
99+
JdbcDbConfigContainer jdbcDbConfigContainer =
100+
new SingleInstanceJdbcDbConfigContainer(options, shard);
91101
return executeSingleInstanceMigrationForDbConfigContainer(
92102
options, pipeline, spannerConfig, jdbcDbConfigContainer);
93103
}
94104

105+
static PipelineResult executeJdbcMigration(
106+
SourceDbToSpannerOptions options,
107+
JdbcShardConfig jdbcShardConfig,
108+
Pipeline pipeline,
109+
SpannerConfig spannerConfig) {
110+
Preconditions.checkArgument(
111+
(jdbcShardConfig.getShardConfigs() != null && !jdbcShardConfig.getShardConfigs().isEmpty()),
112+
"Shard list should have at least one shard.");
113+
if (jdbcShardConfig.getShardConfigs().size() > 1) {
114+
List<Shard> shards = jdbcShardConfig.getShardConfigs();
115+
return PipelineController.executeJdbcShardedMigration(
116+
options, pipeline, shards, spannerConfig);
117+
} else {
118+
return PipelineController.executeJdbcSingleInstanceMigration(
119+
options, jdbcShardConfig.getShardConfigs().get(0), pipeline, spannerConfig);
120+
}
121+
}
122+
95123
static PipelineResult executeJdbcShardedMigration(
96124
SourceDbToSpannerOptions options,
97125
Pipeline pipeline,
@@ -355,7 +383,6 @@ public JdbcIoWrapperConfigGroup getJdbcIoWrapperConfigGroup(
355383
OptionsToConfigBuilder.getJdbcIOWrapperConfig(
356384
sqlDialect,
357385
sourceTables,
358-
null,
359386
shard.getHost(),
360387
shard.getConnectionProperties(),
361388
Integer.parseInt(shard.getPort()),
@@ -382,10 +409,12 @@ public JdbcIoWrapperConfigGroup getJdbcIoWrapperConfigGroup(
382409
}
383410

384411
static class SingleInstanceJdbcDbConfigContainer implements JdbcDbConfigContainer {
385-
private SourceDbToSpannerOptions options;
412+
private final SourceDbToSpannerOptions options;
413+
private final Shard shard;
386414

387-
public SingleInstanceJdbcDbConfigContainer(SourceDbToSpannerOptions options) {
415+
public SingleInstanceJdbcDbConfigContainer(SourceDbToSpannerOptions options, Shard shard) {
388416
this.options = options;
417+
this.shard = shard;
389418
}
390419

391420
@Override
@@ -394,8 +423,24 @@ public JdbcIoWrapperConfigGroup getJdbcIoWrapperConfigGroup(
394423
return JdbcIoWrapperConfigGroup.builder()
395424
.addShardConfig(
396425
OptionsToConfigBuilder.getJdbcIOWrapperConfigWithDefaults(
397-
options, sourceTables, null, waitOnSignal))
426+
options, shard, sourceTables, null, waitOnSignal))
398427
.build();
399428
}
400429
}
430+
431+
public static SourceConnectionConfig getSourceConnectionConfig(
432+
String sourceType, String sourceShardsFilePath) {
433+
ISecretManagerAccessor secretManagerAccessor = new SecretManagerAccessorImpl();
434+
SourceConfigParser sourceConfigParser = new SourceConfigParser(secretManagerAccessor);
435+
SourceConnectionConfig sourceConnectionConfig;
436+
try {
437+
// Parse the source shards configuration file to respective
438+
// SourceConnectionConfig.
439+
LOG.info("Parsing source shards configuration file: {}", sourceShardsFilePath);
440+
return sourceConfigParser.parseConfiguration(sourceType, sourceShardsFilePath);
441+
} catch (Exception e) {
442+
LOG.error("Error parsing source config", e);
443+
throw new RuntimeException("Error parsing source config", e);
444+
}
445+
}
401446
}

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SourceDbToSpanner.java

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,11 @@
2020
import com.google.cloud.teleport.v2.common.CommonTemplateJvmInitializer;
2121
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
2222
import com.google.cloud.teleport.v2.options.SourceDbToSpannerOptions;
23-
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
23+
import com.google.cloud.teleport.v2.spanner.migrations.source.config.JdbcShardConfig;
24+
import com.google.cloud.teleport.v2.spanner.migrations.source.config.SourceConnectionConfig;
2425
import com.google.cloud.teleport.v2.spanner.migrations.utils.DataflowWorkerMachineTypeUtils;
25-
import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl;
26-
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardFileReader;
2726
import com.google.common.annotations.VisibleForTesting;
2827
import com.google.common.base.Preconditions;
29-
import java.util.List;
3028
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
3129
import org.apache.beam.sdk.Pipeline;
3230
import org.apache.beam.sdk.PipelineResult;
@@ -107,6 +105,9 @@ static PipelineResult run(SourceDbToSpannerOptions options) {
107105
DataflowWorkerMachineTypeUtils.validateMachineSpecs(workerMachineType, 4);
108106

109107
SpannerConfig spannerConfig = createSpannerConfig(options);
108+
SourceConnectionConfig sourceConnectionConfig =
109+
PipelineController.getSourceConnectionConfig(
110+
options.getSourceDbDialect(), options.getSourceConfigURL());
110111

111112
// Decide type and source of migration
112113
// TODO(vardhanvthigle): Move this within pipelineController.
@@ -125,22 +126,11 @@ static PipelineResult run(SourceDbToSpannerOptions options) {
125126
Preconditions.checkArgument(
126127
StringUtils.isNotEmpty(options.getSourceConfigURL()),
127128
"JDBC based source needs sourceConfigURL to be set.");
128-
return executeJdbcMigration(options, pipeline, spannerConfig);
129-
}
130-
}
131-
132-
// TODO(vardhanvthigle): Move this within pipelineController.
133-
private static PipelineResult executeJdbcMigration(
134-
SourceDbToSpannerOptions options, Pipeline pipeline, SpannerConfig spannerConfig) {
135-
if (options.getSourceConfigURL().startsWith("gs://")) {
136-
List<Shard> shards =
137-
new ShardFileReader(new SecretManagerAccessorImpl())
138-
.readForwardMigrationShardingConfig(options.getSourceConfigURL());
139-
return PipelineController.executeJdbcShardedMigration(
140-
options, pipeline, shards, spannerConfig);
141-
} else {
142-
return PipelineController.executeJdbcSingleInstanceMigration(
143-
options, pipeline, spannerConfig);
129+
Preconditions.checkArgument(
130+
(sourceConnectionConfig instanceof JdbcShardConfig),
131+
"Source config is not type of JdbcShardConfig.");
132+
return PipelineController.executeJdbcMigration(
133+
options, (JdbcShardConfig) sourceConnectionConfig, pipeline, spannerConfig);
144134
}
145135
}
146136

0 commit comments

Comments
 (0)