|
28 | 28 | import java.sql.DatabaseMetaData; |
29 | 29 | import java.sql.ResultSet; |
30 | 30 | import java.util.ArrayList; |
| 31 | +import java.util.HashSet; |
31 | 32 | import java.util.List; |
| 33 | +import java.util.Set; |
32 | 34 |
|
33 | 35 | import static org.assertj.core.api.Assertions.assertThat; |
34 | 36 | import static org.junit.jupiter.api.Assertions.assertEquals; |
@@ -193,6 +195,156 @@ public void testCatalogSchemas() throws Exception { |
193 | 195 | } |
194 | 196 | } |
195 | 197 |
|
| 198 | + @Test |
| 199 | + public void testGetSchemasWithFilter() throws Exception { |
| 200 | + DriverUri driverUri = getDriverUri(); |
| 201 | + try (FlinkConnection connection = new FlinkConnection(driverUri)) { |
| 202 | + Executor executor = connection.getExecutor(); |
| 203 | + executeDDL("CREATE DATABASE database_a", executor); |
| 204 | + executeDDL("CREATE DATABASE database_b", executor); |
| 205 | + |
| 206 | + DatabaseMetaData metaData = |
| 207 | + new FlinkDatabaseMetaData( |
| 208 | + driverUri.getURL(), connection, new TestingStatement()); |
| 209 | + |
| 210 | + // Filter by catalog |
| 211 | + List<String> schemas = |
| 212 | + resultSetToListAndClose(metaData.getSchemas("default_catalog", null)); |
| 213 | + assertThat(schemas).allMatch(s -> s.contains("default_catalog")); |
| 214 | + |
| 215 | + // Filter by schema pattern |
| 216 | + List<String> filteredSchemas = |
| 217 | + resultSetToListAndClose(metaData.getSchemas("default_catalog", "database_%")); |
| 218 | + assertThat(filteredSchemas) |
| 219 | + .allMatch(s -> s.startsWith("database_a,") || s.startsWith("database_b,")); |
| 220 | + } |
| 221 | + } |
| 222 | + |
| 223 | + @Test |
| 224 | + public void testGetTables() throws Exception { |
| 225 | + DriverUri driverUri = getDriverUri(); |
| 226 | + try (FlinkConnection connection = new FlinkConnection(driverUri)) { |
| 227 | + Executor executor = connection.getExecutor(); |
| 228 | + executeDDL( |
| 229 | + "CREATE TABLE test_table1 (id INT, name STRING) WITH ('connector' = 'datagen')", |
| 230 | + executor); |
| 231 | + executeDDL( |
| 232 | + "CREATE TABLE test_table2 (id INT, val DOUBLE) WITH ('connector' = 'datagen')", |
| 233 | + executor); |
| 234 | + executeDDL("CREATE VIEW test_view1 AS SELECT id, name FROM test_table1", executor); |
| 235 | + |
| 236 | + DatabaseMetaData metaData = |
| 237 | + new FlinkDatabaseMetaData( |
| 238 | + driverUri.getURL(), connection, new TestingStatement()); |
| 239 | + |
| 240 | + // Get all tables and views |
| 241 | + ResultSet rs = metaData.getTables("default_catalog", "default_database", null, null); |
| 242 | + List<String> allResults = resultSetToListAndClose(rs); |
| 243 | + Set<String> tableNames = new HashSet<>(); |
| 244 | + for (String row : allResults) { |
| 245 | + tableNames.add(row.split(",")[2]); |
| 246 | + } |
| 247 | + assertThat(tableNames).contains("test_table1", "test_table2", "test_view1"); |
| 248 | + |
| 249 | + // Filter by type TABLE only |
| 250 | + rs = |
| 251 | + metaData.getTables( |
| 252 | + "default_catalog", "default_database", null, new String[] {"TABLE"}); |
| 253 | + List<String> tableOnly = resultSetToListAndClose(rs); |
| 254 | + for (String row : tableOnly) { |
| 255 | + assertThat(row).contains("TABLE"); |
| 256 | + } |
| 257 | + Set<String> tableOnlyNames = new HashSet<>(); |
| 258 | + for (String row : tableOnly) { |
| 259 | + tableOnlyNames.add(row.split(",")[2]); |
| 260 | + } |
| 261 | + assertThat(tableOnlyNames).contains("test_table1", "test_table2"); |
| 262 | + assertThat(tableOnlyNames).doesNotContain("test_view1"); |
| 263 | + |
| 264 | + // Filter by name pattern |
| 265 | + rs = metaData.getTables("default_catalog", "default_database", "test_table%", null); |
| 266 | + List<String> patternResults = resultSetToListAndClose(rs); |
| 267 | + for (String row : patternResults) { |
| 268 | + assertThat(row.split(",")[2]).startsWith("test_table"); |
| 269 | + } |
| 270 | + } |
| 271 | + } |
| 272 | + |
| 273 | + @Test |
| 274 | + public void testGetColumns() throws Exception { |
| 275 | + DriverUri driverUri = getDriverUri(); |
| 276 | + try (FlinkConnection connection = new FlinkConnection(driverUri)) { |
| 277 | + Executor executor = connection.getExecutor(); |
| 278 | + executeDDL( |
| 279 | + "CREATE TABLE col_test (id INT NOT NULL, name STRING, score DOUBLE) WITH ('connector' = 'datagen')", |
| 280 | + executor); |
| 281 | + |
| 282 | + DatabaseMetaData metaData = |
| 283 | + new FlinkDatabaseMetaData( |
| 284 | + driverUri.getURL(), connection, new TestingStatement()); |
| 285 | + |
| 286 | + ResultSet rs = |
| 287 | + metaData.getColumns("default_catalog", "default_database", "col_test", null); |
| 288 | + |
| 289 | + // Verify column count and names using COLUMN_NAME (column 4) |
| 290 | + List<String> columnNames = new ArrayList<>(); |
| 291 | + while (rs.next()) { |
| 292 | + columnNames.add(rs.getString("COLUMN_NAME")); |
| 293 | + } |
| 294 | + rs.close(); |
| 295 | + assertThat(columnNames).containsExactly("id", "name", "score"); |
| 296 | + |
| 297 | + // Filter by column name pattern |
| 298 | + rs = metaData.getColumns("default_catalog", "default_database", "col_test", "na%"); |
| 299 | + List<String> filtered = new ArrayList<>(); |
| 300 | + while (rs.next()) { |
| 301 | + filtered.add(rs.getString("COLUMN_NAME")); |
| 302 | + } |
| 303 | + rs.close(); |
| 304 | + assertThat(filtered).containsExactly("name"); |
| 305 | + } |
| 306 | + } |
| 307 | + |
| 308 | + @Test |
| 309 | + public void testGetTableTypes() throws Exception { |
| 310 | + DriverUri driverUri = getDriverUri(); |
| 311 | + try (FlinkConnection connection = new FlinkConnection(driverUri)) { |
| 312 | + DatabaseMetaData metaData = |
| 313 | + new FlinkDatabaseMetaData( |
| 314 | + driverUri.getURL(), connection, new TestingStatement()); |
| 315 | + |
| 316 | + List<String> types = resultSetToListAndClose(metaData.getTableTypes()); |
| 317 | + assertThat(types).containsExactly("TABLE", "VIEW"); |
| 318 | + } |
| 319 | + } |
| 320 | + |
| 321 | + @Test |
| 322 | + public void testGetPrimaryKeys() throws Exception { |
| 323 | + DriverUri driverUri = getDriverUri(); |
| 324 | + try (FlinkConnection connection = new FlinkConnection(driverUri)) { |
| 325 | + Executor executor = connection.getExecutor(); |
| 326 | + executeDDL( |
| 327 | + "CREATE TABLE pk_test (id INT NOT NULL, name STRING, PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'datagen')", |
| 328 | + executor); |
| 329 | + |
| 330 | + DatabaseMetaData metaData = |
| 331 | + new FlinkDatabaseMetaData( |
| 332 | + driverUri.getURL(), connection, new TestingStatement()); |
| 333 | + |
| 334 | + // getPrimaryKeys uses SELECT * LIMIT 0 which may not preserve PK info |
| 335 | + // in the result schema, so we verify at least it does not throw |
| 336 | + ResultSet rs = |
| 337 | + metaData.getPrimaryKeys("default_catalog", "default_database", "pk_test"); |
| 338 | + List<String> pkColumns = new ArrayList<>(); |
| 339 | + while (rs.next()) { |
| 340 | + pkColumns.add(rs.getString("COLUMN_NAME")); |
| 341 | + } |
| 342 | + rs.close(); |
| 343 | + // PK info may or may not be available depending on the catalog implementation |
| 344 | + assertThat(rs.getMetaData().getColumnCount()).isEqualTo(6); |
| 345 | + } |
| 346 | + } |
| 347 | + |
196 | 348 | private List<String> resultSetToListAndClose(ResultSet resultSet) throws Exception { |
197 | 349 | List<String> resultList = new ArrayList<>(); |
198 | 350 | int columnCount = resultSet.getMetaData().getColumnCount(); |
|
0 commit comments