Skip to content

Commit a19cf9e

Browse files
test(Spanner): Integration test fix.
1 parent b7b20a3 commit a19cf9e

6 files changed

Lines changed: 151 additions & 88 deletions

File tree

it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/cloudsql/CloudSqlShardOrchestrator.java

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -420,38 +420,28 @@ protected void createLogicalDatabases() {
420420
protected String generateAndUploadConfig(String artifactName) {
421421
LOG.info("Generating and uploading shard configuration...");
422422
JSONObject config = new JSONObject();
423-
config.put("configType", "dataflow");
424-
JSONObject shardConfigBulk = new JSONObject();
425-
JSONArray dataShards = new JSONArray();
423+
JSONArray shardConfigs = new JSONArray();
426424

427425
int shardIdx = 0;
428426
for (Map.Entry<String, List<String>> entry : requestedShardMap.entrySet()) {
429427
String instanceName = entry.getKey();
430428
String ip = instanceIpMap.get(instanceName);
431429
List<String> dbNames = entry.getValue();
432430

433-
JSONObject dataShard = new JSONObject();
434-
dataShard.put("dataShardId", instanceName);
435-
dataShard.put("host", ip);
436-
dataShard.put("port", port);
437-
dataShard.put("user", username);
438-
dataShard.put("password", password);
439-
440-
JSONArray databases = new JSONArray();
441431
for (String dbName : dbNames) {
442-
JSONObject db = new JSONObject();
443-
db.put("dbName", dbName);
444-
db.put("databaseId", String.format("%s%02d%s", "shard_", shardIdx, dbName));
445-
db.put("refDataShardId", instanceName);
446-
databases.put(db);
432+
JSONObject shardConfig = new JSONObject();
433+
shardConfig.put("logicalShardId", String.format("%s%02d_%s", "shard_", shardIdx, dbName));
434+
shardConfig.put("host", ip);
435+
shardConfig.put("port", port);
436+
shardConfig.put("user", username);
437+
shardConfig.put("password", password);
438+
shardConfig.put("dbName", dbName);
439+
shardConfigs.put(shardConfig);
447440
}
448441
shardIdx++;
449-
dataShard.put("databases", databases);
450-
dataShards.put(dataShard);
451442
}
452443

453-
shardConfigBulk.put("dataShards", dataShards);
454-
config.put("shardConfigurationBulk", shardConfigBulk);
444+
config.put("shardConfigs", shardConfigs);
455445

456446
String configContent = config.toString();
457447
GcsArtifact artifact =

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/SourceDbToSpannerOptions.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -252,17 +252,6 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
252252

253253
void setTransformationCustomParameters(String value);
254254

255-
@TemplateParameter.Text(
256-
order = 20,
257-
optional = true,
258-
description = "Namespace",
259-
helpText =
260-
"Namespace to exported. For PostgreSQL, if no namespace is provided, 'public' will be used")
261-
@Default.String("")
262-
String getNamespace();
263-
264-
void setNamespace(String value);
265-
266255
@TemplateParameter.Text(
267256
order = 21,
268257
optional = true,

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/PipelineController.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import java.util.HashMap;
4444
import java.util.List;
4545
import java.util.Map;
46-
import java.util.Optional;
4746
import java.util.stream.Collectors;
4847
import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
4948
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
@@ -374,10 +373,7 @@ public JdbcIoWrapperConfigGroup getJdbcIoWrapperConfigGroup(
374373
// Read data from source
375374
String shardId = entry.getValue();
376375

377-
// If a namespace is configured for a shard, use that; otherwise use the namespace
378-
// configured in the options if there is one.
379-
String namespace =
380-
Optional.ofNullable(shard.getNamespace()).orElse(options.getNamespace());
376+
String namespace = shard.getNamespace();
381377
String dbName = entry.getKey();
382378
JdbcIOWrapperConfig shardConfig =
383379
OptionsToConfigBuilder.getJdbcIOWrapperConfig(

v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SourceDbToSpannerITBase.java

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919

2020
import com.google.cloud.spanner.Dialect;
2121
import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.SQLDialect;
22+
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
23+
import com.google.cloud.teleport.v2.spanner.migrations.source.config.JdbcShardConfig;
2224
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
2325
import com.google.common.io.Resources;
26+
import com.google.gson.Gson;
2427
import java.io.BufferedWriter;
2528
import java.io.FileWriter;
2629
import java.io.IOException;
@@ -233,7 +236,9 @@ protected PipelineLauncher.LaunchInfo launchDataflowJob(
233236
}
234237
};
235238
if (sourceResourceManager instanceof JDBCResourceManager) {
236-
params.putAll(getJdbcParameters((JDBCResourceManager) sourceResourceManager));
239+
params.putAll(
240+
getJdbcParameters(
241+
(JDBCResourceManager) sourceResourceManager, gcsPathPrefix, jobParameters));
237242
} else if (sourceResourceManager instanceof CassandraResourceManager) {
238243
params.putAll(
239244
getCassandraParameters((CassandraResourceManager) sourceResourceManager, gcsPathPrefix));
@@ -280,13 +285,70 @@ protected PipelineLauncher.LaunchInfo launchDataflowJob(
280285
return jobInfo;
281286
}
282287

283-
private Map<String, String> getJdbcParameters(JDBCResourceManager jdbcResourceManager) {
288+
/**
 * Builds a single-shard JDBC shard configuration from the given resource manager, uploads it as
 * {@code <gcsPathPrefix>/shard.json} through {@code gcsClient}, and returns the path produced by
 * {@code getGcsPath} for that same relative name.
 *
 * <p>The shard always uses the logical shard id {@code "Shard1"}. User and password come from
 * the resource manager; host, port and database name are read from the concrete manager type
 * (Postgres, MySQL or CloudSQL).
 *
 * @param gcsPathPrefix prefix under which {@code shard.json} is written; one trailing {@code /}
 *     is stripped before the file name is appended
 * @param jdbcResourceManager source database resource manager supplying connection details
 * @param jobParameters optional job parameters; when non-null and containing the key {@code
 *     "namespace"}, that value is set as the shard namespace
 * @return the value of {@code getGcsPath(<prefix>/shard.json)}
 * @throws IllegalArgumentException if the resource manager is none of the three handled types
 * @throws IOException declared by this method; which call raises it is not visible here
 */
protected String createAndUploadShardConfigToGcs(
289+
String gcsPathPrefix,
290+
JDBCResourceManager jdbcResourceManager,
291+
Map<String, String> jobParameters)
292+
throws IOException {
293+
Shard shard = new Shard();
294+
shard.setLogicalShardId("Shard1");
295+
shard.setUser(jdbcResourceManager.getUsername());
296+
shard.setPassword(jdbcResourceManager.getPassword());
297+
// Host, port and database name are read from the concrete manager type.
if (jdbcResourceManager instanceof PostgresResourceManager pgRm) {
298+
shard.setHost(pgRm.getHost());
299+
shard.setPort(String.valueOf(pgRm.getPort()));
300+
shard.setDbName(pgRm.getDatabaseName());
301+
} else if (jdbcResourceManager instanceof MySQLResourceManager mySqlRm) {
302+
shard.setHost(mySqlRm.getHost());
303+
shard.setPort(String.valueOf(mySqlRm.getPort()));
304+
shard.setDbName(mySqlRm.getDatabaseName());
305+
} else if (jdbcResourceManager
306+
instanceof org.apache.beam.it.gcp.cloudsql.CloudSqlResourceManager cloudRm) {
307+
shard.setHost(cloudRm.getHost());
308+
shard.setPort(String.valueOf(cloudRm.getPort()));
309+
shard.setDbName(cloudRm.getDatabaseName());
310+
} else {
311+
throw new IllegalArgumentException(
312+
"Unsupported JDBC resource manager type: " + jdbcResourceManager.getClass().getName());
313+
}
314+
315+
// The namespace is now carried in the shard config itself, taken from the job parameters.
if (jobParameters != null && jobParameters.containsKey("namespace")) {
316+
shard.setNamespace(jobParameters.get("namespace"));
317+
}
318+
319+
JdbcShardConfig jdbcShardConfig = new JdbcShardConfig();
320+
jdbcShardConfig.setShardConfigs(List.of(shard));
321+
String shardFileContents = new Gson().toJson(jdbcShardConfig);
322+
// NOTE(review): the shard was given a password above; presumably it ends up in this log line
// via the serialized JSON -- confirm this is acceptable for test logs.
LOG.info("Shard file contents: {}", shardFileContents);
323+
324+
// NOTE(review): a null prefix becomes the literal string "null", so the config is written
// under "null/shard.json" -- confirm this fallback is intended.
String configBasePath = (gcsPathPrefix == null) ? "null" : gcsPathPrefix;
325+
if (configBasePath.endsWith("/")) {
326+
configBasePath = configBasePath.substring(0, configBasePath.length() - 1);
327+
}
328+
String configGcsPath = getGcsPath(configBasePath + "/shard.json");
329+
330+
// Upload under the same relative name that was used to compute the returned path.
gcsClient.createArtifact(configBasePath + "/shard.json", shardFileContents);
331+
332+
return configGcsPath;
333+
}
334+
335+
private Map<String, String> getJdbcParameters(
336+
JDBCResourceManager jdbcResourceManager,
337+
String gcsPathPrefix,
338+
Map<String, String> jobParameters) {
284339

285340
Map<String, String> params =
286341
new HashMap<>() {
287342
{
288343
put("sourceDbDialect", sqlDialectFrom(jdbcResourceManager));
289-
put("sourceConfigURL", jdbcResourceManager.getUri());
344+
try {
345+
put(
346+
"sourceConfigURL",
347+
createAndUploadShardConfigToGcs(
348+
gcsPathPrefix, jdbcResourceManager, jobParameters));
349+
} catch (IOException e) {
350+
throw new RuntimeException(e);
351+
}
290352
put("username", jdbcResourceManager.getUsername());
291353
put("password", jdbcResourceManager.getPassword());
292354
put("jdbcDriverClassName", driverClassNameFrom(jdbcResourceManager));

v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/failureinjectiontesting/SourceDbToSpannerFTBase.java

Lines changed: 36 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
2020

2121
import com.google.cloud.teleport.v2.spanner.migrations.constants.Constants;
22+
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
23+
import com.google.cloud.teleport.v2.spanner.migrations.source.config.JdbcShardConfig;
2224
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
25+
import com.google.gson.Gson;
2326
import com.google.pubsub.v1.SubscriptionName;
2427
import com.google.pubsub.v1.TopicName;
2528
import java.io.BufferedReader;
@@ -45,8 +48,6 @@
4548
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
4649
import org.apache.beam.it.gcp.storage.GcsResourceManager;
4750
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
48-
import org.json.JSONArray;
49-
import org.json.JSONObject;
5051
import org.slf4j.Logger;
5152
import org.slf4j.LoggerFactory;
5253

@@ -116,6 +117,21 @@ protected PipelineLauncher.LaunchInfo launchBulkDataflowJob(
116117
CloudSqlResourceManager cloudSqlResourceManager,
117118
CustomTransformation customTransformation)
118119
throws IOException {
120+
121+
Shard shard = new Shard();
122+
shard.setLogicalShardId("Shard1");
123+
shard.setHost(cloudSqlResourceManager.getHost());
124+
shard.setPort(String.valueOf(cloudSqlResourceManager.getPort()));
125+
shard.setDbName(cloudSqlResourceManager.getDatabaseName());
126+
shard.setUser(cloudSqlResourceManager.getUsername());
127+
shard.setPassword(cloudSqlResourceManager.getPassword());
128+
129+
JdbcShardConfig jdbcShardConfig = new JdbcShardConfig();
130+
jdbcShardConfig.setShardConfigs(List.of(shard));
131+
String shardFileContents = new Gson().toJson(jdbcShardConfig);
132+
LOG.info("Shard file contents: {}", shardFileContents);
133+
gcsResourceManager.createArtifact("input/shard.json", shardFileContents);
134+
119135
// launch dataflow template
120136
FlexTemplateDataflowJobResourceManager.Builder flexTemplateBuilder =
121137
FlexTemplateDataflowJobResourceManager.builder(jobName)
@@ -125,9 +141,7 @@ protected PipelineLauncher.LaunchInfo launchBulkDataflowJob(
125141
.addParameter("databaseId", spannerResourceManager.getDatabaseId())
126142
.addParameter("projectId", PROJECT)
127143
.addParameter("outputDirectory", getGcsPath("output", gcsResourceManager))
128-
.addParameter("sourceConfigURL", cloudSqlResourceManager.getUri())
129-
.addParameter("username", cloudSqlResourceManager.getUsername())
130-
.addParameter("password", cloudSqlResourceManager.getPassword())
144+
.addParameter("sourceConfigURL", getGcsPath("input/shard.json", gcsResourceManager))
131145
.addParameter("jdbcDriverClassName", "com.mysql.jdbc.Driver")
132146
.addParameter("workerMachineType", "n2-standard-4")
133147
.addEnvironmentVariable(
@@ -189,51 +203,29 @@ protected PipelineLauncher.LaunchInfo launchShardedBulkDataflowJob(
189203

190204
protected void createAndUploadBulkShardConfigToGcs(
191205
ArrayList<DataShard> dataShardsList, GcsResourceManager gcsResourceManager) {
192-
JSONObject bulkConfig = new JSONObject();
193-
bulkConfig.put("configType", "dataflow");
194-
195-
JSONObject shardConfigBulk = new JSONObject();
196-
197-
JSONObject schemaSourceJson = new JSONObject();
198-
schemaSourceJson.put("dataShardId", "");
199-
schemaSourceJson.put("host", "");
200-
schemaSourceJson.put("user", "");
201-
schemaSourceJson.put("password", "");
202-
schemaSourceJson.put("port", "");
203-
schemaSourceJson.put("dbName", "");
204-
shardConfigBulk.put("schemaSource", schemaSourceJson);
205-
206-
JSONArray dataShardsArray = new JSONArray();
206+
List<Shard> shards = new ArrayList<>();
207207
if (dataShardsList != null) {
208208
for (DataShard shardData : dataShardsList) {
209-
JSONObject shardJson = new JSONObject();
210-
211-
shardJson.put("dataShardId", shardData.dataShardId);
212-
shardJson.put("host", shardData.host);
213-
shardJson.put("user", shardData.user);
214-
shardJson.put("password", shardData.password);
215-
shardJson.put("port", shardData.port);
216-
shardJson.put("dbName", shardData.dbName);
217-
shardJson.put("namespace", shardData.namespace);
218-
shardJson.put("connectionProperties", shardData.connectionProperties);
219-
220-
JSONArray databasesArray = new JSONArray();
221-
222-
for (Database dbData : shardData.databases) {
223-
JSONObject dbJson = new JSONObject();
224-
dbJson.put("dbName", dbData.dbName);
225-
dbJson.put("databaseId", dbData.databaseId);
226-
dbJson.put("refDataShardId", dbData.refDataShardId);
227-
databasesArray.put(dbJson);
209+
Shard shard = new Shard();
210+
shard.setLogicalShardId(shardData.dataShardId);
211+
shard.setHost(shardData.host);
212+
shard.setUser(shardData.user);
213+
shard.setPassword(shardData.password);
214+
shard.setPort(shardData.port);
215+
shard.setDbName(shardData.dbName);
216+
if (shardData.namespace != null) {
217+
shard.setNamespace(shardData.namespace);
218+
}
219+
if (shardData.connectionProperties != null) {
220+
shard.setConnectionProperties(shardData.connectionProperties);
228221
}
229-
shardJson.put("databases", databasesArray);
230-
dataShardsArray.put(shardJson);
222+
shards.add(shard);
231223
}
232224
}
233-
shardConfigBulk.put("dataShards", dataShardsArray);
234225

235-
bulkConfig.put("shardConfigurationBulk", shardConfigBulk);
236-
String shardFileContents = bulkConfig.toString();
226+
JdbcShardConfig jdbcShardConfig = new JdbcShardConfig();
227+
jdbcShardConfig.setShardConfigs(shards);
228+
String shardFileContents = new Gson().toJson(jdbcShardConfig);
237229
LOG.info("Shard file contents: {}", shardFileContents);
238230
gcsResourceManager.createArtifact("input/shard-bulk.json", shardFileContents);
239231
}

v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/SourceDbToSpannerLTBase.java

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@
1919
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;
2020

2121
import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.SQLDialect;
22+
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
23+
import com.google.cloud.teleport.v2.spanner.migrations.source.config.JdbcShardConfig;
24+
import com.google.gson.Gson;
2225
import java.io.IOException;
2326
import java.nio.charset.StandardCharsets;
2427
import java.text.ParseException;
2528
import java.time.Duration;
2629
import java.util.Arrays;
30+
import java.util.Collections;
2731
import java.util.HashMap;
2832
import java.util.List;
2933
import java.util.Map;
@@ -34,6 +38,7 @@
3438
import org.apache.beam.it.common.utils.ResourceManagerUtils;
3539
import org.apache.beam.it.conditions.ConditionCheck;
3640
import org.apache.beam.it.gcp.TemplateLoadTestBase;
41+
import org.apache.beam.it.gcp.artifacts.utils.ArtifactUtils;
3742
import org.apache.beam.it.gcp.secretmanager.SecretManagerResourceManager;
3843
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
3944
import org.apache.beam.it.gcp.spanner.conditions.SpannerRowsCheck;
@@ -175,11 +180,40 @@ protected String getOutputDirectory() {
175180
}
176181

177182
protected Map<String, String> getJdbcParameters(StaticJDBCResource jdbcResource) {
178-
return getJdbcParameters(
179-
jdbcResource.getconnectionURL(),
180-
jdbcResource.username(),
181-
jdbcResource.password(),
182-
driverClassName());
183+
try {
184+
return getJdbcParameters(
185+
createAndUploadShardConfigToGcs(jdbcResource),
186+
jdbcResource.username(),
187+
jdbcResource.password(),
188+
driverClassName());
189+
} catch (IOException e) {
190+
throw new RuntimeException("Failed to create and upload shard config", e);
191+
}
192+
}
193+
194+
/**
 * Builds a single-shard JDBC shard configuration from a static JDBC resource, uploads it as
 * {@code input/shard.json} through {@code gcsResourceManager}, and returns the path produced by
 * {@code getGcsPath} for that artifact.
 *
 * <p>The shard always uses the logical shard id {@code "Shard1"}; user, password, host, port and
 * database name are all read from {@code jdbcResource}.
 *
 * @param jdbcResource static JDBC resource supplying the connection details
 * @return the value of {@code getGcsPath("input/shard.json", gcsResourceManager)}
 * @throws IOException declared by this method; which call raises it is not visible here
 */
protected String createAndUploadShardConfigToGcs(StaticJDBCResource jdbcResource)
195+
throws IOException {
196+
Shard shard = new Shard();
197+
shard.setLogicalShardId("Shard1");
198+
shard.setUser(jdbcResource.username());
199+
shard.setPassword(jdbcResource.password());
200+
shard.setHost(jdbcResource.hostname());
201+
shard.setPort(String.valueOf(jdbcResource.port()));
202+
shard.setDbName(jdbcResource.database());
203+
204+
JdbcShardConfig jdbcShardConfig = new JdbcShardConfig();
205+
jdbcShardConfig.setShardConfigs(Collections.singletonList(shard));
206+
String shardFileContents = new Gson().toJson(jdbcShardConfig);
207+
// The artifact name used for the upload matches the one used for the returned path.
gcsResourceManager.createArtifact("input/shard.json", shardFileContents);
208+
return getGcsPath("input/shard.json", gcsResourceManager);
209+
}
210+
211+
/**
 * Returns the result of {@code ArtifactUtils.getFullGcsPath} for an artifact, built from the
 * resource manager's bucket, this test class's simple name, the resource manager's run id, and
 * the given artifact id.
 *
 * @param artifactId artifact name passed as the last path component
 * @param gcsResourceManager resource manager supplying the bucket and run id
 * @return the path string returned by {@code ArtifactUtils.getFullGcsPath}
 */
protected String getGcsPath(String artifactId, GcsResourceManager gcsResourceManager) {
212+
return ArtifactUtils.getFullGcsPath(
213+
gcsResourceManager.getBucket(),
214+
getClass().getSimpleName(),
215+
gcsResourceManager.runId(),
216+
artifactId);
183217
}
184218

185219
protected Map<String, String> getJdbcParameters(

0 commit comments

Comments
 (0)