Skip to content

Commit 157a4a4

Browse files
committed
UniformizationIT
1 parent f3a6148 commit 157a4a4

3 files changed

Lines changed: 147 additions & 0 deletions

File tree

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright (C) 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.templates;
17+
18+
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;
19+
20+
import com.google.cloud.spanner.Struct;
21+
import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
22+
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
23+
import java.time.Duration;
24+
import java.util.ArrayList;
25+
import java.util.HashMap;
26+
import java.util.List;
27+
import java.util.Map;
28+
import org.apache.beam.it.common.PipelineLauncher;
29+
import org.apache.beam.it.common.PipelineOperator;
30+
import org.apache.beam.it.common.utils.ResourceManagerUtils;
31+
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
32+
import org.apache.beam.it.gcp.spanner.matchers.SpannerAsserts;
33+
import org.apache.beam.it.jdbc.MySQLResourceManager;
34+
import org.junit.After;
35+
import org.junit.Before;
36+
import org.junit.Test;
37+
import org.junit.experimental.categories.Category;
38+
import org.junit.runner.RunWith;
39+
import org.junit.runners.JUnit4;
40+
import org.slf4j.Logger;
41+
import org.slf4j.LoggerFactory;
42+
43+
/**
44+
* An integration test for {@link SourceDbToSpanner} Flex template which tests MySQL data migration
45+
* with uniformization enabled.
46+
*/
47+
@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class})
48+
@TemplateIntegrationTest(SourceDbToSpanner.class)
49+
@RunWith(JUnit4.class)
50+
public class MySQLWithUniformizationIT extends SourceDbToSpannerITBase {
51+
private static final Logger LOG = LoggerFactory.getLogger(MySQLWithUniformizationIT.class);
52+
53+
public static MySQLResourceManager mySQLResourceManager;
54+
public static SpannerResourceManager spannerResourceManager;
55+
56+
private static final String MYSQL_DDL_RESOURCE = "DataTypesIT/mysql-uniformization-test.sql";
57+
private static final String SPANNER_DDL_RESOURCE =
58+
"DataTypesIT/mysql-spanner-uniformization-test.sql";
59+
60+
/** Setup resource managers. */
61+
@Before
62+
public void setUp() {
63+
mySQLResourceManager = setUpMySQLResourceManager();
64+
spannerResourceManager = setUpSpannerResourceManager();
65+
}
66+
67+
/** Cleanup dataflow job, all the resources, and resource managers. */
68+
@After
69+
public void tearDown() {
70+
ResourceManagerUtils.cleanResources(spannerResourceManager, mySQLResourceManager);
71+
}
72+
73+
@Test
74+
public void withUniformizationTest() throws Exception {
75+
loadSQLFileResource(mySQLResourceManager, MYSQL_DDL_RESOURCE);
76+
createSpannerDDL(spannerResourceManager, SPANNER_DDL_RESOURCE);
77+
System.setProperty("numWorkers", "20");
78+
Map<String, String> jobParameters = new HashMap<>();
79+
jobParameters.put("numPartitions", "100");
80+
jobParameters.put("uniformizationStageCountHint", "-1");
81+
PipelineLauncher.LaunchInfo jobInfo =
82+
launchDataflowJob(
83+
getClass().getSimpleName(),
84+
null,
85+
null,
86+
mySQLResourceManager,
87+
spannerResourceManager,
88+
jobParameters,
89+
null);
90+
PipelineOperator.Result result =
91+
pipelineOperator().waitUntilDone(createConfig(jobInfo, Duration.ofMinutes(30L)));
92+
assertThatResult(result).isLaunchFinished();
93+
94+
Map<String, List<Map<String, Object>>> expectedData = getExpectedData();
95+
for (Map.Entry<String, List<Map<String, Object>>> entry : expectedData.entrySet()) {
96+
String tableName = entry.getKey();
97+
LOG.info("Asserting table:{}", tableName);
98+
99+
List<Struct> rows = spannerResourceManager.readTableRecords(tableName, "id", "col");
100+
for (Struct row : rows) {
101+
LOG.info("Found row: {}", row);
102+
}
103+
SpannerAsserts.assertThatStructs(rows)
104+
.hasRecordsUnorderedCaseInsensitiveColumns(entry.getValue());
105+
}
106+
}
107+
108+
private Map<String, List<Map<String, Object>>> getExpectedData() {
109+
HashMap<String, List<Map<String, Object>>> result = new HashMap<>();
110+
result.put("t_bigint", createRows("-9223372036854775808", "9223372036854775807", "42", "NULL"));
111+
result.put("t_varchar", createRows("apple", "banana", "cherry", "NULL"));
112+
result.put("t_decimal", createRows("123.45678", "-999.99999", "0", "NULL"));
113+
return result;
114+
}
115+
116+
private List<Map<String, Object>> createRows(Object... values) {
117+
List<Map<String, Object>> rows = new ArrayList<>();
118+
for (int i = 0; i < values.length; i++) {
119+
Map<String, Object> row = new HashMap<>();
120+
row.put("id", i + 1);
121+
row.put("col", values[i]);
122+
rows.add(row);
123+
}
124+
return rows;
125+
}
126+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
CREATE TABLE t_bigint (
2+
id INT64 NOT NULL,
3+
col INT64,
4+
) PRIMARY KEY(id);
5+
6+
CREATE TABLE t_varchar (
7+
id INT64 NOT NULL,
8+
col STRING(255),
9+
) PRIMARY KEY(id);
10+
11+
CREATE TABLE t_decimal (
12+
id INT64 NOT NULL,
13+
col NUMERIC,
14+
) PRIMARY KEY(id);
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
CREATE TABLE t_bigint (id INT AUTO_INCREMENT PRIMARY KEY, col BIGINT);
2+
CREATE TABLE t_varchar (id INT AUTO_INCREMENT PRIMARY KEY, col VARCHAR(255));
3+
CREATE TABLE t_decimal (id INT AUTO_INCREMENT PRIMARY KEY, col DECIMAL(20, 5));
4+
5+
INSERT INTO t_bigint (col) VALUES (-9223372036854775808), (9223372036854775807), (42), (NULL);
6+
INSERT INTO t_varchar (col) VALUES ('apple'), ('banana'), ('cherry'), (NULL);
7+
INSERT INTO t_decimal (col) VALUES (123.45678), (-999.99999), (0.0), (NULL);

0 commit comments

Comments
 (0)