Skip to content

Commit 466301e

Browse files
authored
[SpannerToSourceDb] Fixing flaky UT and also improving coverage (#3874)
* [SpannerToSourceDb] Fixing flaky UT and also improving coverage * Addressing comments
1 parent 09afa0d commit 466301e

11 files changed

Lines changed: 615 additions & 85 deletions

File tree

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraDMLGenerator.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequ
130130
dmlGeneratorRequest.getKeyValuesJson(),
131131
dmlGeneratorRequest.getSourceDbTimezoneOffset(),
132132
dmlGeneratorRequest.getCustomTransformationResponse());
133-
if (pkColumnNameValues == null) {
133+
if (pkColumnNameValues == null || pkColumnNameValues.isEmpty()) {
134134
throw new InvalidDMLGenerationException(
135135
String.format(
136136
"Cannot reverse replicate for table %s without primary key, skipping the record",
@@ -435,16 +435,13 @@ static Map<String, PreparedStatementValueObject<?>> getPkColumnValues(
435435
continue;
436436
}
437437
if (spannerColName == null || spannerColName == "") {
438-
LOG.warn(
439-
"The corresponding spanner table for {} was not found in schema mapping",
440-
sourceColName);
441-
return null;
438+
continue;
442439
}
443440
Column spannerColDef = spannerTable.column(spannerColName);
444441
if (spannerColDef == null) {
445442
LOG.warn(
446443
"The spanner column definition for {} was not found in spanner schema", spannerColName);
447-
return null;
444+
continue;
448445
}
449446
if (keyValuesJson.has(spannerColName)) {
450447
columnValue =

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/DMLGeneratorUtils.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -175,16 +175,13 @@ public static Map<String, String> getPkColumnValues(
175175
continue;
176176
}
177177
if (spannerColName == null) {
178-
LOG.warn(
179-
"The corresponding spanner table for {} was not found in schema mapping",
180-
sourceColName);
181-
return null;
178+
continue;
182179
}
183180
Column spannerColDef = spannerTable.column(spannerColName);
184181
if (spannerColDef == null) {
185182
LOG.warn(
186183
"The spanner column definition for {} was not found in spanner schema", spannerColName);
187-
return null;
184+
continue;
188185
}
189186
String columnValue = "";
190187
String actualColName = spannerColDef.name();

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/MySQLDMLGenerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequ
9595
dmlGeneratorRequest.getSourceDbTimezoneOffset(),
9696
dmlGeneratorRequest.getCustomTransformationResponse(),
9797
MySQLDMLGenerator::getMappedColumnValue);
98-
if (pkcolumnNameValues == null) {
98+
if (pkcolumnNameValues == null || pkcolumnNameValues.isEmpty()) {
9999
throw new InvalidDMLGenerationException(
100100
String.format(
101101
"Cannot reverse replicate for table %s without primary key, skipping the record",

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/PostgreSQLDMLGenerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequ
9797
dmlGeneratorRequest.getSourceDbTimezoneOffset(),
9898
dmlGeneratorRequest.getCustomTransformationResponse(),
9999
PostgreSQLDMLGenerator::getMappedColumnValue);
100-
if (pkcolumnNameValues == null) {
100+
if (pkcolumnNameValues == null || pkcolumnNameValues.isEmpty()) {
101101
throw new InvalidDMLGenerationException(
102102
String.format(
103103
"Cannot reverse replicate for table %s without primary key, skipping the record",

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/processor/SourceProcessorFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,10 @@ public static void setConnectionHelperMap(Map<String, IConnectionHelper> connect
126126
connectionHelperMap = connectionHelper;
127127
}
128128

129+
static Map<String, IConnectionHelper> getConnectionHelperMap() {
130+
return connectionHelperMap;
131+
}
132+
129133
/**
130134
* Creates a SourceProcessor instance for the specified source type.
131135
*

v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraDMLGeneratorTest.java

Lines changed: 251 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.google.cloud.teleport.v2.templates.dbutils.dml;
1717

1818
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertFalse;
1920
import static org.junit.Assert.assertNotNull;
2021
import static org.junit.Assert.assertNull;
2122
import static org.junit.Assert.assertThrows;
@@ -1186,7 +1187,7 @@ public void testGetColumnValues_MissingSpannerMapping() {
11861187
Mockito.when(sourceTable.name()).thenReturn("src_table");
11871188

11881189
Mockito.when(schemaMapper.getSpannerColumnName("", "src_table", "col1"))
1189-
.thenThrow(new java.util.NoSuchElementException());
1190+
.thenThrow(new NoSuchElementException());
11901191

11911192
Map<String, PreparedStatementValueObject<?>> response =
11921193
CassandraDMLGenerator.getColumnValues(
@@ -1231,4 +1232,253 @@ public void testGetPkColumnValues_MissingValueInJson() {
12311232

12321233
assertNull(response);
12331234
}
1235+
1236+
@Test
1237+
public void testGetDMLStatement_PkColSpannerColNameThrowsNoSuchElement() {
1238+
ISchemaMapper schemaMapper = Mockito.mock(ISchemaMapper.class);
1239+
Ddl spannerDdl = Ddl.builder().build();
1240+
spannerDdl = Mockito.spy(spannerDdl);
1241+
Table spannerTable = Mockito.mock(Table.class);
1242+
SourceSchema sourceSchema = Mockito.mock(SourceSchema.class);
1243+
SourceTable sourceTable = Mockito.mock(SourceTable.class);
1244+
SourceColumn sourceCol = Mockito.mock(SourceColumn.class);
1245+
1246+
Mockito.doReturn(spannerTable).when(spannerDdl).table("Singers");
1247+
Mockito.when(schemaMapper.getSourceTableName("", "Singers")).thenReturn("Singers");
1248+
Mockito.when(sourceSchema.table("Singers")).thenReturn(sourceTable);
1249+
Mockito.when(sourceTable.primaryKeyColumns()).thenReturn(ImmutableList.of("SingerId"));
1250+
Mockito.when(sourceTable.column("SingerId")).thenReturn(sourceCol);
1251+
Mockito.when(sourceCol.type()).thenReturn("int");
1252+
Mockito.when(sourceTable.name()).thenReturn("Singers");
1253+
1254+
Mockito.when(schemaMapper.getSpannerColumnName("", "Singers", "SingerId"))
1255+
.thenThrow(new NoSuchElementException("Not found"));
1256+
1257+
DMLGeneratorRequest request =
1258+
new DMLGeneratorRequest.Builder(
1259+
"INSERT", "Singers", new JSONObject(), new JSONObject(), "+00:00")
1260+
.setSchemaMapper(schemaMapper)
1261+
.setDdl(spannerDdl)
1262+
.setSourceSchema(sourceSchema)
1263+
.setCommitTimestamp(Timestamp.now())
1264+
.build();
1265+
1266+
assertThrows(
1267+
InvalidDMLGenerationException.class, () -> cassandraDMLGenerator.getDMLStatement(request));
1268+
}
1269+
1270+
@Test
1271+
public void testGetDMLStatement_PkColSpannerColNameNull() {
1272+
ISchemaMapper schemaMapper = Mockito.mock(ISchemaMapper.class);
1273+
Ddl spannerDdl = Ddl.builder().build();
1274+
spannerDdl = Mockito.spy(spannerDdl);
1275+
Table spannerTable = Mockito.mock(Table.class);
1276+
SourceSchema sourceSchema = Mockito.mock(SourceSchema.class);
1277+
SourceTable sourceTable = Mockito.mock(SourceTable.class);
1278+
SourceColumn sourceCol = Mockito.mock(SourceColumn.class);
1279+
1280+
Mockito.doReturn(spannerTable).when(spannerDdl).table("Singers");
1281+
Mockito.when(schemaMapper.getSourceTableName("", "Singers")).thenReturn("Singers");
1282+
Mockito.when(sourceSchema.table("Singers")).thenReturn(sourceTable);
1283+
Mockito.when(sourceTable.primaryKeyColumns()).thenReturn(ImmutableList.of("SingerId"));
1284+
Mockito.when(sourceTable.column("SingerId")).thenReturn(sourceCol);
1285+
Mockito.when(sourceCol.type()).thenReturn("int");
1286+
Mockito.when(sourceTable.name()).thenReturn("Singers");
1287+
1288+
Mockito.when(schemaMapper.getSpannerColumnName("", "Singers", "SingerId")).thenReturn(null);
1289+
1290+
DMLGeneratorRequest request =
1291+
new DMLGeneratorRequest.Builder(
1292+
"INSERT", "Singers", new JSONObject(), new JSONObject(), "+00:00")
1293+
.setSchemaMapper(schemaMapper)
1294+
.setDdl(spannerDdl)
1295+
.setSourceSchema(sourceSchema)
1296+
.setCommitTimestamp(Timestamp.now())
1297+
.build();
1298+
1299+
assertThrows(
1300+
InvalidDMLGenerationException.class, () -> cassandraDMLGenerator.getDMLStatement(request));
1301+
}
1302+
1303+
@Test
1304+
public void testGetDMLStatement_PkColSpannerColDefNull() {
1305+
ISchemaMapper schemaMapper = Mockito.mock(ISchemaMapper.class);
1306+
Ddl spannerDdl = Ddl.builder().build();
1307+
spannerDdl = Mockito.spy(spannerDdl);
1308+
Table spannerTable = Mockito.mock(Table.class);
1309+
SourceSchema sourceSchema = Mockito.mock(SourceSchema.class);
1310+
SourceTable sourceTable = Mockito.mock(SourceTable.class);
1311+
SourceColumn sourceCol = Mockito.mock(SourceColumn.class);
1312+
1313+
Mockito.doReturn(spannerTable).when(spannerDdl).table("Singers");
1314+
Mockito.when(schemaMapper.getSourceTableName("", "Singers")).thenReturn("Singers");
1315+
Mockito.when(sourceSchema.table("Singers")).thenReturn(sourceTable);
1316+
Mockito.when(sourceTable.primaryKeyColumns()).thenReturn(ImmutableList.of("SingerId"));
1317+
Mockito.when(sourceTable.column("SingerId")).thenReturn(sourceCol);
1318+
Mockito.when(sourceCol.type()).thenReturn("int");
1319+
Mockito.when(sourceTable.name()).thenReturn("Singers");
1320+
1321+
Mockito.when(schemaMapper.getSpannerColumnName("", "Singers", "SingerId"))
1322+
.thenReturn("SingerId");
1323+
Mockito.when(spannerTable.column("SingerId")).thenReturn(null);
1324+
1325+
DMLGeneratorRequest request =
1326+
new DMLGeneratorRequest.Builder(
1327+
"INSERT", "Singers", new JSONObject(), new JSONObject(), "+00:00")
1328+
.setSchemaMapper(schemaMapper)
1329+
.setDdl(spannerDdl)
1330+
.setSourceSchema(sourceSchema)
1331+
.setCommitTimestamp(Timestamp.now())
1332+
.build();
1333+
1334+
assertThrows(
1335+
InvalidDMLGenerationException.class, () -> cassandraDMLGenerator.getDMLStatement(request));
1336+
}
1337+
1338+
@Test
1339+
public void testGetDMLStatement_ColValuesSpannerColDefNullAndKeyValuesHasNonPk() {
1340+
ISchemaMapper schemaMapper = Mockito.mock(ISchemaMapper.class);
1341+
Table spannerTable = Mockito.mock(Table.class);
1342+
SourceSchema sourceSchema = Mockito.mock(SourceSchema.class);
1343+
SourceTable sourceTable = Mockito.mock(SourceTable.class);
1344+
SourceColumn pkCol = Mockito.mock(SourceColumn.class);
1345+
SourceColumn nonPkCol = Mockito.mock(SourceColumn.class);
1346+
SourceColumn missingCol = Mockito.mock(SourceColumn.class);
1347+
Column spannerPkCol = Mockito.mock(Column.class);
1348+
Column spannerNonPkCol = Mockito.mock(Column.class);
1349+
1350+
Ddl spannerDdl = Ddl.builder().build();
1351+
spannerDdl = Mockito.spy(spannerDdl);
1352+
Mockito.doReturn(spannerTable).when(spannerDdl).table("Singers");
1353+
1354+
Mockito.when(schemaMapper.getSourceTableName("", "Singers")).thenReturn("Singers");
1355+
Mockito.when(sourceSchema.table("Singers")).thenReturn(sourceTable);
1356+
Mockito.when(sourceTable.primaryKeyColumns()).thenReturn(ImmutableList.of("SingerId"));
1357+
Mockito.when(sourceTable.column("SingerId")).thenReturn(pkCol);
1358+
Mockito.when(pkCol.type()).thenReturn("int");
1359+
Mockito.when(sourceTable.name()).thenReturn("Singers");
1360+
1361+
Mockito.when(schemaMapper.getSpannerColumnName("", "Singers", "SingerId"))
1362+
.thenReturn("SingerId");
1363+
Mockito.when(spannerTable.column("SingerId")).thenReturn(spannerPkCol);
1364+
Mockito.when(spannerPkCol.name()).thenReturn("SingerId");
1365+
Mockito.when(spannerPkCol.type()).thenReturn(Type.int64());
1366+
1367+
Mockito.when(sourceTable.columns()).thenReturn(ImmutableList.of(pkCol, nonPkCol, missingCol));
1368+
Mockito.when(nonPkCol.name()).thenReturn("LastName");
1369+
Mockito.when(nonPkCol.type()).thenReturn("varchar");
1370+
Mockito.when(missingCol.name()).thenReturn("MissingCol");
1371+
Mockito.when(missingCol.type()).thenReturn("varchar");
1372+
1373+
Mockito.when(schemaMapper.getSpannerColumnName("", "Singers", "LastName"))
1374+
.thenReturn("LastName");
1375+
Mockito.when(spannerTable.column("LastName")).thenReturn(spannerNonPkCol);
1376+
Mockito.when(spannerNonPkCol.name()).thenReturn("LastName");
1377+
Mockito.when(spannerNonPkCol.type()).thenReturn(Type.string());
1378+
1379+
Mockito.when(schemaMapper.getSpannerColumnName("", "Singers", "MissingCol"))
1380+
.thenReturn("MissingCol");
1381+
Mockito.when(spannerTable.column("MissingCol")).thenReturn(null);
1382+
1383+
JSONObject keyValuesJson = new JSONObject();
1384+
keyValuesJson.put("SingerId", "999");
1385+
keyValuesJson.put("LastName", "Smith");
1386+
1387+
DMLGeneratorRequest request =
1388+
new DMLGeneratorRequest.Builder(
1389+
"INSERT", "Singers", new JSONObject(), keyValuesJson, "+00:00")
1390+
.setSchemaMapper(schemaMapper)
1391+
.setDdl(spannerDdl)
1392+
.setSourceSchema(sourceSchema)
1393+
.setCommitTimestamp(Timestamp.now())
1394+
.build();
1395+
1396+
DMLGeneratorResponse response = cassandraDMLGenerator.getDMLStatement(request);
1397+
1398+
assertTrue(response instanceof PreparedStatementGeneratedResponse);
1399+
PreparedStatementGeneratedResponse prepResponse = (PreparedStatementGeneratedResponse) response;
1400+
1401+
assertTrue(prepResponse.getDmlStatement().contains("\"LastName\""));
1402+
assertFalse(prepResponse.getDmlStatement().contains("\"MissingCol\""));
1403+
1404+
assertEquals(3, prepResponse.getValues().size());
1405+
}
1406+
1407+
@Test
1408+
public void testGetDMLStatement_CompositePk_OnePkSpannerColDefNull() {
1409+
ISchemaMapper schemaMapper = Mockito.mock(ISchemaMapper.class);
1410+
Table spannerTable = Mockito.mock(Table.class);
1411+
SourceSchema sourceSchema = Mockito.mock(SourceSchema.class);
1412+
SourceTable sourceTable = Mockito.mock(SourceTable.class);
1413+
SourceColumn pkCol1 = Mockito.mock(SourceColumn.class);
1414+
SourceColumn pkCol2 = Mockito.mock(SourceColumn.class);
1415+
SourceColumn nonPkCol = Mockito.mock(SourceColumn.class);
1416+
Column spannerPkCol1 = Mockito.mock(Column.class);
1417+
Column spannerNonPkCol = Mockito.mock(Column.class);
1418+
1419+
Ddl spannerDdl = Ddl.builder().build();
1420+
spannerDdl = Mockito.spy(spannerDdl);
1421+
Mockito.doReturn(spannerTable).when(spannerDdl).table("Singers");
1422+
1423+
Mockito.when(schemaMapper.getSourceTableName("", "Singers")).thenReturn("Singers");
1424+
Mockito.when(sourceSchema.table("Singers")).thenReturn(sourceTable);
1425+
1426+
// Composite PK
1427+
Mockito.when(sourceTable.primaryKeyColumns())
1428+
.thenReturn(ImmutableList.of("SingerId", "LastName"));
1429+
Mockito.when(sourceTable.column("SingerId")).thenReturn(pkCol1);
1430+
Mockito.when(pkCol1.type()).thenReturn("int");
1431+
Mockito.when(sourceTable.column("LastName")).thenReturn(pkCol2);
1432+
Mockito.when(pkCol2.type()).thenReturn("varchar");
1433+
Mockito.when(sourceTable.name()).thenReturn("Singers");
1434+
1435+
// SingerId (pkCol1) maps successfully
1436+
Mockito.when(schemaMapper.getSpannerColumnName("", "Singers", "SingerId"))
1437+
.thenReturn("SingerId");
1438+
Mockito.when(spannerTable.column("SingerId")).thenReturn(spannerPkCol1);
1439+
Mockito.when(spannerPkCol1.name()).thenReturn("SingerId");
1440+
Mockito.when(spannerPkCol1.type()).thenReturn(Type.int64());
1441+
1442+
// LastName (pkCol2) fails to map because Spanner column def is null
1443+
Mockito.when(schemaMapper.getSpannerColumnName("", "Singers", "LastName"))
1444+
.thenReturn("LastName");
1445+
Mockito.when(spannerTable.column("LastName")).thenReturn(null);
1446+
1447+
// Non-PK col (Age) maps successfully
1448+
Mockito.when(sourceTable.columns()).thenReturn(ImmutableList.of(pkCol1, pkCol2, nonPkCol));
1449+
Mockito.when(nonPkCol.name()).thenReturn("Age");
1450+
Mockito.when(nonPkCol.type()).thenReturn("int");
1451+
1452+
Mockito.when(schemaMapper.getSpannerColumnName("", "Singers", "Age")).thenReturn("Age");
1453+
Mockito.when(spannerTable.column("Age")).thenReturn(spannerNonPkCol);
1454+
Mockito.when(spannerNonPkCol.name()).thenReturn("Age");
1455+
Mockito.when(spannerNonPkCol.type()).thenReturn(Type.int64());
1456+
1457+
JSONObject keyValuesJson = new JSONObject();
1458+
keyValuesJson.put("SingerId", 999L);
1459+
keyValuesJson.put("LastName", "Smith");
1460+
keyValuesJson.put("Age", 30L);
1461+
1462+
DMLGeneratorRequest request =
1463+
new DMLGeneratorRequest.Builder(
1464+
"INSERT", "Singers", new JSONObject(), keyValuesJson, "+00:00")
1465+
.setSchemaMapper(schemaMapper)
1466+
.setDdl(spannerDdl)
1467+
.setSourceSchema(sourceSchema)
1468+
.setCommitTimestamp(Timestamp.now())
1469+
.build();
1470+
1471+
DMLGeneratorResponse response = cassandraDMLGenerator.getDMLStatement(request);
1472+
1473+
assertTrue(response instanceof PreparedStatementGeneratedResponse);
1474+
PreparedStatementGeneratedResponse prepResponse = (PreparedStatementGeneratedResponse) response;
1475+
1476+
// The statement should contain SingerId and Age, but NOT LastName
1477+
assertTrue(prepResponse.getDmlStatement().contains("\"SingerId\""));
1478+
assertTrue(prepResponse.getDmlStatement().contains("\"Age\""));
1479+
assertFalse(prepResponse.getDmlStatement().contains("\"LastName\""));
1480+
1481+
// Expected values: SingerId (999), Age (30), and using_timestamp
1482+
assertEquals(3, prepResponse.getValues().size());
1483+
}
12341484
}

0 commit comments

Comments
 (0)