Skip to content

Commit 97d64e5

Browse files
test(Spanner): Integration test fix.
1 parent b7b20a3 commit 97d64e5

4 files changed

Lines changed: 122 additions & 69 deletions

File tree

it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/cloudsql/CloudSqlShardOrchestrator.java

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -420,38 +420,28 @@ protected void createLogicalDatabases() {
420420
protected String generateAndUploadConfig(String artifactName) {
421421
LOG.info("Generating and uploading shard configuration...");
422422
JSONObject config = new JSONObject();
423-
config.put("configType", "dataflow");
424-
JSONObject shardConfigBulk = new JSONObject();
425-
JSONArray dataShards = new JSONArray();
423+
JSONArray shardConfigs = new JSONArray();
426424

427425
int shardIdx = 0;
428426
for (Map.Entry<String, List<String>> entry : requestedShardMap.entrySet()) {
429427
String instanceName = entry.getKey();
430428
String ip = instanceIpMap.get(instanceName);
431429
List<String> dbNames = entry.getValue();
432430

433-
JSONObject dataShard = new JSONObject();
434-
dataShard.put("dataShardId", instanceName);
435-
dataShard.put("host", ip);
436-
dataShard.put("port", port);
437-
dataShard.put("user", username);
438-
dataShard.put("password", password);
439-
440-
JSONArray databases = new JSONArray();
441431
for (String dbName : dbNames) {
442-
JSONObject db = new JSONObject();
443-
db.put("dbName", dbName);
444-
db.put("databaseId", String.format("%s%02d%s", "shard_", shardIdx, dbName));
445-
db.put("refDataShardId", instanceName);
446-
databases.put(db);
432+
JSONObject shardConfig = new JSONObject();
433+
shardConfig.put("logicalShardId", String.format("%s%02d_%s", "shard_", shardIdx, dbName));
434+
shardConfig.put("host", ip);
435+
shardConfig.put("port", port);
436+
shardConfig.put("user", username);
437+
shardConfig.put("password", password);
438+
shardConfig.put("dbName", dbName);
439+
shardConfigs.put(shardConfig);
447440
}
448441
shardIdx++;
449-
dataShard.put("databases", databases);
450-
dataShards.put(dataShard);
451442
}
452443

453-
shardConfigBulk.put("dataShards", dataShards);
454-
config.put("shardConfigurationBulk", shardConfigBulk);
444+
config.put("shardConfigs", shardConfigs);
455445

456446
String configContent = config.toString();
457447
GcsArtifact artifact =

v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SourceDbToSpannerITBase.java

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919

2020
import com.google.cloud.spanner.Dialect;
2121
import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.SQLDialect;
22+
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
23+
import com.google.cloud.teleport.v2.spanner.migrations.source.config.JdbcShardConfig;
2224
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
2325
import com.google.common.io.Resources;
26+
import com.google.gson.Gson;
2427
import java.io.BufferedWriter;
2528
import java.io.FileWriter;
2629
import java.io.IOException;
@@ -233,7 +236,7 @@ protected PipelineLauncher.LaunchInfo launchDataflowJob(
233236
}
234237
};
235238
if (sourceResourceManager instanceof JDBCResourceManager) {
236-
params.putAll(getJdbcParameters((JDBCResourceManager) sourceResourceManager));
239+
params.putAll(getJdbcParameters((JDBCResourceManager) sourceResourceManager, gcsPathPrefix));
237240
} else if (sourceResourceManager instanceof CassandraResourceManager) {
238241
params.putAll(
239242
getCassandraParameters((CassandraResourceManager) sourceResourceManager, gcsPathPrefix));
@@ -280,13 +283,60 @@ protected PipelineLauncher.LaunchInfo launchDataflowJob(
280283
return jobInfo;
281284
}
282285

283-
private Map<String, String> getJdbcParameters(JDBCResourceManager jdbcResourceManager) {
286+
protected String createAndUploadShardConfigToGcs(
287+
String gcsPathPrefix, JDBCResourceManager jdbcResourceManager) throws IOException {
288+
Shard shard = new Shard();
289+
shard.setLogicalShardId("Shard1");
290+
shard.setUser(jdbcResourceManager.getUsername());
291+
shard.setPassword(jdbcResourceManager.getPassword());
292+
if (jdbcResourceManager instanceof PostgresResourceManager pgRm) {
293+
shard.setHost(pgRm.getHost());
294+
shard.setPort(String.valueOf(pgRm.getPort()));
295+
shard.setDbName(pgRm.getDatabaseName());
296+
} else if (jdbcResourceManager instanceof MySQLResourceManager mySqlRm) {
297+
shard.setHost(mySqlRm.getHost());
298+
shard.setPort(String.valueOf(mySqlRm.getPort()));
299+
shard.setDbName(mySqlRm.getDatabaseName());
300+
} else if (jdbcResourceManager
301+
instanceof org.apache.beam.it.gcp.cloudsql.CloudSqlResourceManager cloudRm) {
302+
shard.setHost(cloudRm.getHost());
303+
shard.setPort(String.valueOf(cloudRm.getPort()));
304+
shard.setDbName(cloudRm.getDatabaseName());
305+
} else {
306+
throw new IllegalArgumentException(
307+
"Unsupported JDBC resource manager type: " + jdbcResourceManager.getClass().getName());
308+
}
309+
310+
JdbcShardConfig jdbcShardConfig = new JdbcShardConfig();
311+
jdbcShardConfig.setShardConfigs(List.of(shard));
312+
String shardFileContents = new Gson().toJson(jdbcShardConfig);
313+
LOG.info("Shard file contents: {}", shardFileContents);
314+
315+
String configBasePath = (gcsPathPrefix == null) ? "null" : gcsPathPrefix;
316+
if (configBasePath.endsWith("/")) {
317+
configBasePath = configBasePath.substring(0, configBasePath.length() - 1);
318+
}
319+
String configGcsPath = getGcsPath(configBasePath + "/shard.json");
320+
321+
gcsClient.createArtifact(configBasePath + "/shard.json", shardFileContents);
322+
323+
return configGcsPath;
324+
}
325+
326+
private Map<String, String> getJdbcParameters(
327+
JDBCResourceManager jdbcResourceManager, String gcsPathPrefix) {
284328

285329
Map<String, String> params =
286330
new HashMap<>() {
287331
{
288332
put("sourceDbDialect", sqlDialectFrom(jdbcResourceManager));
289-
put("sourceConfigURL", jdbcResourceManager.getUri());
333+
try {
334+
put(
335+
"sourceConfigURL",
336+
createAndUploadShardConfigToGcs(gcsPathPrefix, jdbcResourceManager));
337+
} catch (IOException e) {
338+
throw new RuntimeException(e);
339+
}
290340
put("username", jdbcResourceManager.getUsername());
291341
put("password", jdbcResourceManager.getPassword());
292342
put("jdbcDriverClassName", driverClassNameFrom(jdbcResourceManager));

v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/failureinjectiontesting/SourceDbToSpannerFTBase.java

Lines changed: 20 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
2020

2121
import com.google.cloud.teleport.v2.spanner.migrations.constants.Constants;
22+
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
23+
import com.google.cloud.teleport.v2.spanner.migrations.source.config.JdbcShardConfig;
2224
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
25+
import com.google.gson.Gson;
2326
import com.google.pubsub.v1.SubscriptionName;
2427
import com.google.pubsub.v1.TopicName;
2528
import java.io.BufferedReader;
@@ -45,8 +48,6 @@
4548
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
4649
import org.apache.beam.it.gcp.storage.GcsResourceManager;
4750
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
48-
import org.json.JSONArray;
49-
import org.json.JSONObject;
5051
import org.slf4j.Logger;
5152
import org.slf4j.LoggerFactory;
5253

@@ -189,51 +190,29 @@ protected PipelineLauncher.LaunchInfo launchShardedBulkDataflowJob(
189190

190191
protected void createAndUploadBulkShardConfigToGcs(
191192
ArrayList<DataShard> dataShardsList, GcsResourceManager gcsResourceManager) {
192-
JSONObject bulkConfig = new JSONObject();
193-
bulkConfig.put("configType", "dataflow");
194-
195-
JSONObject shardConfigBulk = new JSONObject();
196-
197-
JSONObject schemaSourceJson = new JSONObject();
198-
schemaSourceJson.put("dataShardId", "");
199-
schemaSourceJson.put("host", "");
200-
schemaSourceJson.put("user", "");
201-
schemaSourceJson.put("password", "");
202-
schemaSourceJson.put("port", "");
203-
schemaSourceJson.put("dbName", "");
204-
shardConfigBulk.put("schemaSource", schemaSourceJson);
205-
206-
JSONArray dataShardsArray = new JSONArray();
193+
List<Shard> shards = new ArrayList<>();
207194
if (dataShardsList != null) {
208195
for (DataShard shardData : dataShardsList) {
209-
JSONObject shardJson = new JSONObject();
210-
211-
shardJson.put("dataShardId", shardData.dataShardId);
212-
shardJson.put("host", shardData.host);
213-
shardJson.put("user", shardData.user);
214-
shardJson.put("password", shardData.password);
215-
shardJson.put("port", shardData.port);
216-
shardJson.put("dbName", shardData.dbName);
217-
shardJson.put("namespace", shardData.namespace);
218-
shardJson.put("connectionProperties", shardData.connectionProperties);
219-
220-
JSONArray databasesArray = new JSONArray();
221-
222-
for (Database dbData : shardData.databases) {
223-
JSONObject dbJson = new JSONObject();
224-
dbJson.put("dbName", dbData.dbName);
225-
dbJson.put("databaseId", dbData.databaseId);
226-
dbJson.put("refDataShardId", dbData.refDataShardId);
227-
databasesArray.put(dbJson);
196+
Shard shard = new Shard();
197+
shard.setLogicalShardId(shardData.dataShardId);
198+
shard.setHost(shardData.host);
199+
shard.setUser(shardData.user);
200+
shard.setPassword(shardData.password);
201+
shard.setPort(shardData.port);
202+
shard.setDbName(shardData.dbName);
203+
if (shardData.namespace != null) {
204+
shard.setNamespace(shardData.namespace);
205+
}
206+
if (shardData.connectionProperties != null) {
207+
shard.setConnectionProperties(shardData.connectionProperties);
228208
}
229-
shardJson.put("databases", databasesArray);
230-
dataShardsArray.put(shardJson);
209+
shards.add(shard);
231210
}
232211
}
233-
shardConfigBulk.put("dataShards", dataShardsArray);
234212

235-
bulkConfig.put("shardConfigurationBulk", shardConfigBulk);
236-
String shardFileContents = bulkConfig.toString();
213+
JdbcShardConfig jdbcShardConfig = new JdbcShardConfig();
214+
jdbcShardConfig.setShardConfigs(shards);
215+
String shardFileContents = new Gson().toJson(jdbcShardConfig);
237216
LOG.info("Shard file contents: {}", shardFileContents);
238217
gcsResourceManager.createArtifact("input/shard-bulk.json", shardFileContents);
239218
}

v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/SourceDbToSpannerLTBase.java

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@
1919
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;
2020

2121
import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.SQLDialect;
22+
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
23+
import com.google.cloud.teleport.v2.spanner.migrations.source.config.JdbcShardConfig;
24+
import com.google.gson.Gson;
2225
import java.io.IOException;
2326
import java.nio.charset.StandardCharsets;
2427
import java.text.ParseException;
2528
import java.time.Duration;
2629
import java.util.Arrays;
30+
import java.util.Collections;
2731
import java.util.HashMap;
2832
import java.util.List;
2933
import java.util.Map;
@@ -34,6 +38,7 @@
3438
import org.apache.beam.it.common.utils.ResourceManagerUtils;
3539
import org.apache.beam.it.conditions.ConditionCheck;
3640
import org.apache.beam.it.gcp.TemplateLoadTestBase;
41+
import org.apache.beam.it.gcp.artifacts.utils.ArtifactUtils;
3742
import org.apache.beam.it.gcp.secretmanager.SecretManagerResourceManager;
3843
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
3944
import org.apache.beam.it.gcp.spanner.conditions.SpannerRowsCheck;
@@ -175,11 +180,40 @@ protected String getOutputDirectory() {
175180
}
176181

177182
protected Map<String, String> getJdbcParameters(StaticJDBCResource jdbcResource) {
178-
return getJdbcParameters(
179-
jdbcResource.getconnectionURL(),
180-
jdbcResource.username(),
181-
jdbcResource.password(),
182-
driverClassName());
183+
try {
184+
return getJdbcParameters(
185+
createAndUploadShardConfigToGcs(jdbcResource),
186+
jdbcResource.username(),
187+
jdbcResource.password(),
188+
driverClassName());
189+
} catch (IOException e) {
190+
throw new RuntimeException("Failed to create and upload shard config", e);
191+
}
192+
}
193+
194+
protected String createAndUploadShardConfigToGcs(StaticJDBCResource jdbcResource)
195+
throws IOException {
196+
Shard shard = new Shard();
197+
shard.setLogicalShardId("Shard1");
198+
shard.setUser(jdbcResource.username());
199+
shard.setPassword(jdbcResource.password());
200+
shard.setHost(jdbcResource.hostname());
201+
shard.setPort(String.valueOf(jdbcResource.port()));
202+
shard.setDbName(jdbcResource.database());
203+
204+
JdbcShardConfig jdbcShardConfig = new JdbcShardConfig();
205+
jdbcShardConfig.setShardConfigs(Collections.singletonList(shard));
206+
String shardFileContents = new Gson().toJson(jdbcShardConfig);
207+
gcsResourceManager.createArtifact("input/shard.json", shardFileContents);
208+
return getGcsPath("input/shard.json", gcsResourceManager);
209+
}
210+
211+
protected String getGcsPath(String artifactId, GcsResourceManager gcsResourceManager) {
212+
return ArtifactUtils.getFullGcsPath(
213+
gcsResourceManager.getBucket(),
214+
getClass().getSimpleName(),
215+
gcsResourceManager.runId(),
216+
artifactId);
183217
}
184218

185219
protected Map<String, String> getJdbcParameters(

0 commit comments

Comments
 (0)