Skip to content

Commit dd51c4c

Browse files
authored
fix jdbc transform validation (#35141)
* fix jdbc transform validation
* add test
* annotations
* spotless
1 parent a4707ab commit dd51c4c

2 files changed

Lines changed: 47 additions & 1 deletion

File tree

sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() {
214214

215215
@Override
216216
public PCollectionRowTuple expand(PCollectionRowTuple input) {
217-
config.validate();
217+
config.validate(jdbcType);
218218
// If we define a partition column, we follow a different route.
219219
@Nullable String partitionColumn = config.getPartitionColumn();
220220
@Nullable String location = config.getLocation();

sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.junit.Assert.assertNotNull;
2323
import static org.junit.Assert.assertThrows;
2424

25+
import com.google.auto.service.AutoService;
2526
import java.sql.Connection;
2627
import java.sql.PreparedStatement;
2728
import java.sql.SQLException;
@@ -47,6 +48,25 @@
4748
@RunWith(JUnit4.class)
4849
public class JdbcReadSchemaTransformProviderTest {
4950

51+
@AutoService(SchemaTransformProvider.class)
52+
public static class ReadFromDerbySchemaTransformProvider extends JdbcReadSchemaTransformProvider {
53+
54+
@Override
55+
public String identifier() {
56+
return "beam:schematransform:org.apache.beam:derby_read:v1";
57+
}
58+
59+
@Override
60+
public String description() {
61+
return inheritedDescription("Derby", "ReadFromDerby", "derby", 5432);
62+
}
63+
64+
@Override
65+
protected String jdbcType() {
66+
return "derby";
67+
}
68+
}
69+
5070
private static final JdbcIO.DataSourceConfiguration DATA_SOURCE_CONFIGURATION =
5171
JdbcIO.DataSourceConfiguration.create(
5272
"org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:testDB;create=true");
@@ -199,6 +219,32 @@ public void testReadWithJdbcTypeSpecified() {
199219
pipeline.run();
200220
}
201221

222+
@Test
223+
public void testReadWithJdbcDerbyTransformTypeSpecified() {
224+
JdbcReadSchemaTransformProvider provider = null;
225+
for (SchemaTransformProvider p : ServiceLoader.load(SchemaTransformProvider.class)) {
226+
if (p instanceof ReadFromDerbySchemaTransformProvider) {
227+
provider = (JdbcReadSchemaTransformProvider) p;
228+
break;
229+
}
230+
}
231+
assertNotNull(provider);
232+
233+
PCollection<Row> output =
234+
PCollectionRowTuple.empty(pipeline)
235+
.apply(
236+
provider.from(
237+
ReadFromDerbySchemaTransformProvider.JdbcReadSchemaTransformConfiguration
238+
.builder()
239+
.setJdbcUrl(DATA_SOURCE_CONFIGURATION.getUrl().get())
240+
.setLocation(READ_TABLE_NAME)
241+
.build()))
242+
.get("output");
243+
Long expected = Long.valueOf(EXPECTED_ROW_COUNT);
244+
PAssert.that(output.apply(Count.globally())).containsInAnyOrder(expected);
245+
pipeline.run();
246+
}
247+
202248
@Test
203249
public void testReadWithPartitions() {
204250
JdbcReadSchemaTransformProvider provider = null;

0 commit comments

Comments (0)