Skip to content

Commit d3bdd8b

Browse files
navis94claude authored and committed
[FLINK-39341][jdbc-driver] Implement DatabaseMetaData methods for DBeaver compatibility
Implement getTables, getColumns, getSchemas, getPrimaryKeys, getTableTypes in FlinkDatabaseMetaData to enable DBeaver and other JDBC tools to browse Flink catalogs. Also add ROW and ARRAY type support in FlinkResultSet. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 73d71d9 commit d3bdd8b

3 files changed

Lines changed: 479 additions & 45 deletions

File tree

flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDatabaseMetaData.java

Lines changed: 284 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,13 @@
1919
package org.apache.flink.table.jdbc;
2020

2121
import org.apache.flink.annotation.VisibleForTesting;
22+
import org.apache.flink.table.catalog.Column;
23+
import org.apache.flink.table.catalog.ResolvedSchema;
2224
import org.apache.flink.table.client.gateway.Executor;
2325
import org.apache.flink.table.client.gateway.StatementResult;
26+
import org.apache.flink.table.data.GenericRowData;
27+
import org.apache.flink.table.data.RowData;
28+
import org.apache.flink.table.data.StringData;
2429

2530
import javax.annotation.Nullable;
2631

@@ -29,12 +34,21 @@
2934
import java.sql.SQLException;
3035
import java.sql.Statement;
3136
import java.util.ArrayList;
37+
import java.util.Arrays;
38+
import java.util.Collections;
3239
import java.util.HashMap;
40+
import java.util.HashSet;
3341
import java.util.List;
3442
import java.util.Map;
43+
import java.util.Set;
44+
import java.util.regex.Pattern;
3545

3646
import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createCatalogsResultSet;
47+
import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createColumnsResultSet;
48+
import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createPrimaryKeysResultSet;
3749
import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createSchemasResultSet;
50+
import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createTableTypesResultSet;
51+
import static org.apache.flink.table.jdbc.utils.DatabaseMetaDataUtils.createTablesResultSet;
3852

3953
/** Implementation of {@link java.sql.DatabaseMetaData} for flink jdbc driver. */
4054
public class FlinkDatabaseMetaData extends BaseDatabaseMetaData {
@@ -66,57 +80,25 @@ private StatementResult catalogs() {
6680

6781
@Override
6882
public ResultSet getSchemas() throws SQLException {
83+
return getSchemas(null, null);
84+
}
85+
86+
@Override
87+
public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLException {
6988
try {
70-
String currentCatalog = connection.getCatalog();
71-
String currentDatabase = connection.getSchema();
72-
List<String> catalogList = new ArrayList<>();
89+
List<String> catalogList = getCatalogList(catalog);
90+
7391
Map<String, List<String>> catalogSchemaList = new HashMap<>();
74-
try (StatementResult result = catalogs()) {
75-
while (result.hasNext()) {
76-
String catalog = result.next().getString(0).toString();
77-
connection.setCatalog(catalog);
78-
getSchemasForCatalog(catalogList, catalogSchemaList, catalog, null);
79-
}
92+
for (String cat : catalogList) {
93+
catalogSchemaList.put(cat, getSchemaList(cat, schemaPattern));
8094
}
81-
connection.setCatalog(currentCatalog);
82-
connection.setSchema(currentDatabase);
8395

8496
return createSchemasResultSet(statement, catalogList, catalogSchemaList);
8597
} catch (Exception e) {
8698
throw new SQLException("Get schemas fail", e);
8799
}
88100
}
89101

90-
private void getSchemasForCatalog(
91-
List<String> catalogList,
92-
Map<String, List<String>> catalogSchemaList,
93-
String catalog,
94-
@Nullable String schemaPattern)
95-
throws SQLException {
96-
catalogList.add(catalog);
97-
List<String> schemas = new ArrayList<>();
98-
try (StatementResult schemaResult = schemas()) {
99-
while (schemaResult.hasNext()) {
100-
String schema = schemaResult.next().getString(0).toString();
101-
if (schemaPattern == null || schema.contains(schemaPattern)) {
102-
schemas.add(schema);
103-
}
104-
}
105-
}
106-
catalogSchemaList.put(catalog, schemas);
107-
}
108-
109-
private StatementResult schemas() {
110-
return executor.executeStatement("SHOW DATABASES;");
111-
}
112-
113-
// TODO Flink will support SHOW DATABASES LIKE statement in FLIP-297, this method will be
114-
// supported after that issue.
115-
@Override
116-
public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLException {
117-
throw new UnsupportedOperationException();
118-
}
119-
120102
@Override
121103
public boolean supportsStoredFunctionsUsingCallSyntax() throws SQLException {
122104
return false;
@@ -136,25 +118,282 @@ public boolean generatedKeyAlwaysReturned() throws SQLException {
136118
public ResultSet getTables(
137119
String catalog, String schemaPattern, String tableNamePattern, String[] types)
138120
throws SQLException {
139-
throw new UnsupportedOperationException();
121+
try {
122+
Set<String> typeFilter = null;
123+
if (types != null) {
124+
typeFilter = new HashSet<>(Arrays.asList(types));
125+
}
126+
127+
List<String> catalogList = getCatalogList(catalog);
128+
129+
List<RowData> tableRows = new ArrayList<>();
130+
for (String cat : catalogList) {
131+
List<String> schemaList = getSchemaList(cat, schemaPattern);
132+
133+
for (String schema : schemaList) {
134+
String qualifiedPath = String.format("`%s`.`%s`", cat, schema);
135+
collectTables(
136+
tableRows,
137+
cat,
138+
schema,
139+
tableNamePattern,
140+
"TABLE",
141+
typeFilter,
142+
qualifiedPath);
143+
collectTables(
144+
tableRows,
145+
cat,
146+
schema,
147+
tableNamePattern,
148+
"VIEW",
149+
typeFilter,
150+
qualifiedPath);
151+
}
152+
}
153+
154+
return createTablesResultSet(statement, tableRows);
155+
} catch (Exception e) {
156+
throw new SQLException("Get tables fail", e);
157+
}
158+
}
159+
160+
private void collectTables(
161+
List<RowData> tableRows,
162+
String catalog,
163+
String schema,
164+
@Nullable String tableNamePattern,
165+
String tableType,
166+
@Nullable Set<String> typeFilter,
167+
String qualifiedPath) {
168+
if (typeFilter != null && !typeFilter.contains(tableType)) {
169+
return;
170+
}
171+
String command = "VIEW".equals(tableType) ? "SHOW VIEWS" : "SHOW TABLES";
172+
String sql = String.format("%s FROM %s", command, qualifiedPath);
173+
try (StatementResult result = executor.executeStatement(sql)) {
174+
while (result.hasNext()) {
175+
String tableName = result.next().getString(0).toString();
176+
if (matchesPattern(tableName, tableNamePattern)) {
177+
tableRows.add(
178+
GenericRowData.of(
179+
StringData.fromString(catalog),
180+
StringData.fromString(schema),
181+
StringData.fromString(tableName),
182+
StringData.fromString(tableType),
183+
null,
184+
null,
185+
null,
186+
null,
187+
null,
188+
null));
189+
}
190+
}
191+
}
192+
}
193+
194+
private static boolean matchesPattern(String value, @Nullable String pattern) {
195+
if (pattern == null) {
196+
return true;
197+
}
198+
String regex = Pattern.quote(pattern).replace("%", "\\E.*\\Q").replace("_", "\\E.\\Q");
199+
return value.matches(regex);
140200
}
141201

142202
@Override
143203
public ResultSet getColumns(
144204
String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern)
145205
throws SQLException {
146-
throw new UnsupportedOperationException();
206+
try {
207+
List<RowData> columnRows = new ArrayList<>();
208+
for (String cat : getCatalogList(catalog)) {
209+
for (String schema : getSchemaList(cat, schemaPattern)) {
210+
for (String table : getTableList(cat, schema, tableNamePattern)) {
211+
columnRows.addAll(collectColumns(cat, schema, table, columnNamePattern));
212+
}
213+
}
214+
}
215+
return createColumnsResultSet(statement, columnRows);
216+
} catch (Exception e) {
217+
throw new SQLException("Get columns fail", e);
218+
}
219+
}
220+
221+
private List<RowData> collectColumns(
222+
String catalog, String schema, String table, @Nullable String columnNamePattern) {
223+
List<RowData> columnRows = new ArrayList<>();
224+
String qualifiedTable = String.format("`%s`.`%s`.`%s`", catalog, schema, table);
225+
try (StatementResult result =
226+
executor.executeStatement(
227+
String.format("SELECT * FROM %s LIMIT 0", qualifiedTable))) {
228+
List<Column> columns = result.getResultSchema().getColumns();
229+
int ordinal = 1;
230+
for (Column col : columns) {
231+
if (matchesPattern(col.getName(), columnNamePattern)) {
232+
columnRows.add(toColumnRow(catalog, schema, table, col, ordinal));
233+
ordinal++;
234+
}
235+
}
236+
} catch (Exception e) {
237+
// skip tables that cannot be described
238+
}
239+
return columnRows;
240+
}
241+
242+
private static RowData toColumnRow(
243+
String catalog, String schema, String table, Column col, int ordinal) {
244+
String typeName = col.getDataType().getLogicalType().asSummaryString();
245+
boolean nullable = col.getDataType().getLogicalType().isNullable();
246+
return GenericRowData.of(
247+
StringData.fromString(catalog),
248+
StringData.fromString(schema),
249+
StringData.fromString(table),
250+
StringData.fromString(col.getName()),
251+
getJdbcType(typeName),
252+
StringData.fromString(typeName),
253+
null,
254+
null,
255+
null,
256+
null,
257+
nullable
258+
? java.sql.DatabaseMetaData.columnNullable
259+
: java.sql.DatabaseMetaData.columnNoNulls,
260+
null,
261+
null,
262+
null,
263+
null,
264+
null,
265+
ordinal,
266+
StringData.fromString(nullable ? "YES" : "NO"),
267+
null,
268+
null,
269+
null,
270+
null,
271+
StringData.fromString("NO"));
272+
}
273+
274+
private static int getJdbcType(String flinkType) {
275+
String upper = flinkType.toUpperCase();
276+
if (upper.startsWith("BOOLEAN")) {
277+
return java.sql.Types.BOOLEAN;
278+
} else if (upper.startsWith("TINYINT")) {
279+
return java.sql.Types.TINYINT;
280+
} else if (upper.startsWith("SMALLINT")) {
281+
return java.sql.Types.SMALLINT;
282+
} else if (upper.startsWith("INT")) {
283+
return java.sql.Types.INTEGER;
284+
} else if (upper.startsWith("BIGINT")) {
285+
return java.sql.Types.BIGINT;
286+
} else if (upper.startsWith("FLOAT")) {
287+
return java.sql.Types.FLOAT;
288+
} else if (upper.startsWith("DOUBLE")) {
289+
return java.sql.Types.DOUBLE;
290+
} else if (upper.startsWith("DECIMAL") || upper.startsWith("NUMERIC")) {
291+
return java.sql.Types.DECIMAL;
292+
} else if (upper.startsWith("VARCHAR") || upper.startsWith("STRING")) {
293+
return java.sql.Types.VARCHAR;
294+
} else if (upper.startsWith("CHAR")) {
295+
return java.sql.Types.CHAR;
296+
} else if (upper.startsWith("DATE")) {
297+
return java.sql.Types.DATE;
298+
} else if (upper.startsWith("TIMESTAMP")) {
299+
return java.sql.Types.TIMESTAMP;
300+
} else if (upper.startsWith("TIME")) {
301+
return java.sql.Types.TIME;
302+
} else if (upper.startsWith("BINARY")
303+
|| upper.startsWith("VARBINARY")
304+
|| upper.startsWith("BYTES")) {
305+
return java.sql.Types.BINARY;
306+
} else if (upper.startsWith("ARRAY")) {
307+
return java.sql.Types.ARRAY;
308+
} else if (upper.startsWith("MAP") || upper.startsWith("ROW")) {
309+
return java.sql.Types.STRUCT;
310+
}
311+
return java.sql.Types.OTHER;
147312
}
148313

149314
/**
 * Returns the primary-key columns of the given table, one row per key column in key order.
 *
 * <p>The key is discovered by running {@code SELECT * FROM <table> LIMIT 0} and reading the
 * primary key from the resolved result schema. NOTE(review): the schema of a projection may
 * not carry the underlying table's primary-key constraint — a stray remark in the original
 * ("tableSchema.getPrimaryKey() is not working") suggests this often yields no key; confirm
 * against the gateway, or look the key up via the catalog/SHOW CREATE TABLE if so.
 *
 * @param catalog catalog name, used verbatim (not a pattern)
 * @param schema database name, used verbatim (not a pattern)
 * @param table table name, used verbatim (not a pattern)
 * @throws SQLException if the probe query fails
 */
@Override
public ResultSet getPrimaryKeys(String catalog, String schema, String table)
        throws SQLException {
    try {
        List<RowData> pkRows = new ArrayList<>();
        // Backtick-quote each path segment so reserved words and special characters work.
        String qualifiedTable = String.format("`%s`.`%s`.`%s`", catalog, schema, table);
        // LIMIT 0 reads no rows; only the result schema is needed.
        try (StatementResult result =
                executor.executeStatement(
                        String.format("SELECT * FROM %s LIMIT 0", qualifiedTable))) {
            ResolvedSchema tableSchema = result.getResultSchema();
            tableSchema
                    .getPrimaryKey()
                    .ifPresent(
                            pk -> {
                                List<String> columns = pk.getColumns();
                                for (int i = 0; i < columns.size(); i++) {
                                    // Row layout: catalog, schema, table, column,
                                    // KEY_SEQ (1-based per JDBC), PK_NAME.
                                    pkRows.add(
                                            GenericRowData.of(
                                                    StringData.fromString(catalog),
                                                    StringData.fromString(schema),
                                                    StringData.fromString(table),
                                                    StringData.fromString(columns.get(i)),
                                                    i + 1,
                                                    StringData.fromString(pk.getName())));
                                }
                            });
        }
        return createPrimaryKeysResultSet(statement, pkRows);
    } catch (Exception e) {
        throw new SQLException("Get primary keys fail", e);
    }
}
154347

155348
@Override
156349
public ResultSet getTableTypes() throws SQLException {
157-
throw new UnsupportedOperationException();
350+
List<RowData> typeRows = new ArrayList<>();
351+
typeRows.add(GenericRowData.of(StringData.fromString("TABLE")));
352+
typeRows.add(GenericRowData.of(StringData.fromString("VIEW")));
353+
return createTableTypesResultSet(statement, typeRows);
354+
}
355+
356+
private List<String> getCatalogList(@Nullable String catalog) {
357+
if (catalog != null) {
358+
return Collections.singletonList(catalog);
359+
}
360+
List<String> catalogList = new ArrayList<>();
361+
try (StatementResult result = catalogs()) {
362+
while (result.hasNext()) {
363+
catalogList.add(result.next().getString(0).toString());
364+
}
365+
}
366+
return catalogList;
367+
}
368+
369+
private List<String> getSchemaList(String catalog, @Nullable String schemaPattern) {
370+
List<String> schemas = new ArrayList<>();
371+
try (StatementResult result =
372+
executor.executeStatement(String.format("SHOW DATABASES IN `%s`", catalog))) {
373+
while (result.hasNext()) {
374+
String schema = result.next().getString(0).toString();
375+
if (matchesPattern(schema, schemaPattern)) {
376+
schemas.add(schema);
377+
}
378+
}
379+
}
380+
return schemas;
381+
}
382+
383+
private List<String> getTableList(
384+
String catalog, String schema, @Nullable String tableNamePattern) {
385+
List<String> tables = new ArrayList<>();
386+
String qualifiedPath = String.format("`%s`.`%s`", catalog, schema);
387+
try (StatementResult result =
388+
executor.executeStatement(String.format("SHOW TABLES FROM %s", qualifiedPath))) {
389+
while (result.hasNext()) {
390+
String tableName = result.next().getString(0).toString();
391+
if (matchesPattern(tableName, tableNamePattern)) {
392+
tables.add(tableName);
393+
}
394+
}
395+
}
396+
return tables;
158397
}
159398

160399
@Override

0 commit comments

Comments
 (0)