Skip to content

Commit 845bdd5

Browse files
authored
[spark] Optimize exception message in checkNamespace method (#6384)
1 parent 77ccc50 commit 845bdd5

10 files changed

Lines changed: 63 additions & 27 deletions

File tree

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,11 @@ public Catalog paimonCatalog() {
167167
return catalog;
168168
}
169169

170+
@Override
171+
public String paimonCatalogName() {
172+
return catalogName;
173+
}
174+
170175
// ======================= database methods ===============================
171176

172177
@Override
@@ -177,7 +182,7 @@ public String[] defaultNamespace() {
177182
@Override
178183
public void createNamespace(String[] namespace, Map<String, String> metadata)
179184
throws NamespaceAlreadyExistsException {
180-
checkNamespace(namespace);
185+
checkNamespace(namespace, catalogName);
181186
try {
182187
String databaseName = getDatabaseNameFromNamespace(namespace);
183188
catalog.createDatabase(databaseName, false, metadata);
@@ -201,7 +206,7 @@ public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceExcep
201206
if (namespace.length == 0) {
202207
return listNamespaces();
203208
}
204-
checkNamespace(namespace);
209+
checkNamespace(namespace, catalogName);
205210
try {
206211
String databaseName = getDatabaseNameFromNamespace(namespace);
207212
catalog.getDatabase(databaseName);
@@ -214,7 +219,7 @@ public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceExcep
214219
@Override
215220
public Map<String, String> loadNamespaceMetadata(String[] namespace)
216221
throws NoSuchNamespaceException {
217-
checkNamespace(namespace);
222+
checkNamespace(namespace, catalogName);
218223
try {
219224
String databaseName = getDatabaseNameFromNamespace(namespace);
220225
return catalog.getDatabase(databaseName).options();
@@ -252,7 +257,7 @@ public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException
252257
*/
253258
public boolean dropNamespace(String[] namespace, boolean cascade)
254259
throws NoSuchNamespaceException {
255-
checkNamespace(namespace);
260+
checkNamespace(namespace, catalogName);
256261
try {
257262
String databaseName = getDatabaseNameFromNamespace(namespace);
258263
catalog.dropDatabase(databaseName, false, cascade);
@@ -268,7 +273,7 @@ public boolean dropNamespace(String[] namespace, boolean cascade)
268273
@Override
269274
public void alterNamespace(String[] namespace, NamespaceChange... changes)
270275
throws NoSuchNamespaceException {
271-
checkNamespace(namespace);
276+
checkNamespace(namespace, catalogName);
272277
try {
273278
String databaseName = getDatabaseNameFromNamespace(namespace);
274279
List<PropertyChange> propertyChanges =
@@ -283,7 +288,7 @@ public void alterNamespace(String[] namespace, NamespaceChange... changes)
283288

284289
@Override
285290
public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
286-
checkNamespace(namespace);
291+
checkNamespace(namespace, catalogName);
287292
try {
288293
String databaseName = getDatabaseNameFromNamespace(namespace);
289294
return catalog.listTables(databaseName).stream()
@@ -296,7 +301,7 @@ public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceExcepti
296301

297302
@Override
298303
public void invalidateTable(Identifier ident) {
299-
catalog.invalidateTable(toIdentifier(ident));
304+
catalog.invalidateTable(toIdentifier(ident, catalogName));
300305
}
301306

302307
@Override
@@ -349,7 +354,7 @@ public org.apache.spark.sql.connector.catalog.Table alterTable(
349354
List<SchemaChange> schemaChanges =
350355
Arrays.stream(changes).map(this::toSchemaChange).collect(Collectors.toList());
351356
try {
352-
catalog.alterTable(toIdentifier(ident), schemaChanges, false);
357+
catalog.alterTable(toIdentifier(ident, catalogName), schemaChanges, false);
353358
return loadTable(ident);
354359
} catch (Catalog.TableNotExistException e) {
355360
throw new NoSuchTableException(ident);
@@ -367,7 +372,9 @@ public org.apache.spark.sql.connector.catalog.Table createTable(
367372
throws TableAlreadyExistsException, NoSuchNamespaceException {
368373
try {
369374
catalog.createTable(
370-
toIdentifier(ident), toInitialSchema(schema, partitions, properties), false);
375+
toIdentifier(ident, catalogName),
376+
toInitialSchema(schema, partitions, properties),
377+
false);
371378
return loadTable(ident);
372379
} catch (Catalog.TableAlreadyExistException e) {
373380
throw new TableAlreadyExistsException(ident);
@@ -381,7 +388,7 @@ public org.apache.spark.sql.connector.catalog.Table createTable(
381388
@Override
382389
public boolean dropTable(Identifier ident) {
383390
try {
384-
catalog.dropTable(toIdentifier(ident), false);
391+
catalog.dropTable(toIdentifier(ident, catalogName), false);
385392
return true;
386393
} catch (Catalog.TableNotExistException e) {
387394
return false;
@@ -524,8 +531,8 @@ public void renameTable(Identifier oldIdent, Identifier newIdent)
524531
throws NoSuchTableException, TableAlreadyExistsException {
525532
try {
526533
catalog.renameTable(
527-
toIdentifier(oldIdent),
528-
toIdentifier(removeCatalogName(newIdent, catalogName)),
534+
toIdentifier(oldIdent, catalogName),
535+
toIdentifier(removeCatalogName(newIdent, catalogName), catalogName),
529536
false);
530537
} catch (Catalog.TableNotExistException e) {
531538
throw new NoSuchTableException(oldIdent);
@@ -566,7 +573,7 @@ public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionExcep
566573
}
567574
} else if (isDatabaseFunctionNamespace(namespace)) {
568575
try {
569-
Function paimonFunction = catalog.getFunction(toIdentifier(ident));
576+
Function paimonFunction = catalog.getFunction(toIdentifier(ident, catalogName));
570577
FunctionDefinition functionDefinition =
571578
paimonFunction.definition(FUNCTION_DEFINITION_NAME);
572579
if (functionDefinition instanceof FunctionDefinition.LambdaFunctionDefinition) {
@@ -654,13 +661,17 @@ public void dropV1Function(FunctionIdentifier funcIdent, boolean ifExists) throw
654661
protected org.apache.spark.sql.connector.catalog.Table loadSparkTable(
655662
Identifier ident, Map<String, String> extraOptions) throws NoSuchTableException {
656663
try {
657-
org.apache.paimon.table.Table paimonTable = catalog.getTable(toIdentifier(ident));
664+
org.apache.paimon.table.Table paimonTable =
665+
catalog.getTable(toIdentifier(ident, catalogName));
658666
if (paimonTable instanceof FormatTable) {
659667
return toSparkFormatTable(ident, (FormatTable) paimonTable);
660668
} else {
661669
return new SparkTable(
662670
copyWithSQLConf(
663-
paimonTable, catalogName, toIdentifier(ident), extraOptions));
671+
paimonTable,
672+
catalogName,
673+
toIdentifier(ident, catalogName),
674+
extraOptions));
664675
}
665676
} catch (Catalog.TableNotExistException e) {
666677
throw new NoSuchTableException(ident);

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ public Catalog paimonCatalog() {
9393
return this.sparkCatalog.paimonCatalog();
9494
}
9595

96+
@Override
97+
public String paimonCatalogName() {
98+
return catalogName;
99+
}
96100
// ======================= database methods ===============================
97101

98102
@Override

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportView.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,15 @@ public interface SupportView extends WithPaimonCatalog {
4141

4242
default List<String> listViews(String[] namespace) throws NoSuchNamespaceException {
4343
try {
44-
checkNamespace(namespace);
44+
checkNamespace(namespace, paimonCatalogName());
4545
return paimonCatalog().listViews(namespace[0]);
4646
} catch (Catalog.DatabaseNotExistException e) {
4747
throw new NoSuchNamespaceException(namespace);
4848
}
4949
}
5050

5151
default View loadView(Identifier ident) throws Catalog.ViewNotExistException {
52-
return paimonCatalog().getView(toIdentifier(ident));
52+
return paimonCatalog().getView(toIdentifier(ident, paimonCatalogName()));
5353
}
5454

5555
default void createView(
@@ -60,7 +60,7 @@ default void createView(
6060
Map<String, String> properties,
6161
Boolean ignoreIfExists)
6262
throws NoSuchNamespaceException {
63-
org.apache.paimon.catalog.Identifier paimonIdent = toIdentifier(ident);
63+
org.apache.paimon.catalog.Identifier paimonIdent = toIdentifier(ident, paimonCatalogName());
6464
try {
6565
paimonCatalog()
6666
.createView(
@@ -82,7 +82,7 @@ default void createView(
8282

8383
default void dropView(Identifier ident, Boolean ignoreIfExists) {
8484
try {
85-
paimonCatalog().dropView(toIdentifier(ident), ignoreIfExists);
85+
paimonCatalog().dropView(toIdentifier(ident, paimonCatalogName()), ignoreIfExists);
8686
} catch (Catalog.ViewNotExistException e) {
8787
throw new RuntimeException("view not exists: " + ident, e);
8888
}

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/WithPaimonCatalog.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,6 @@
2323
/** With paimon catalog. */
2424
public interface WithPaimonCatalog {
2525
Catalog paimonCatalog();
26+
27+
String paimonCatalogName();
2628
}

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AlterFunctionProcedure.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,10 @@ public StructType outputType() {
7777
@Override
7878
public InternalRow[] call(InternalRow args) {
7979
Catalog paimonCatalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog();
80+
String paimonCatalogName = ((WithPaimonCatalog) tableCatalog()).paimonCatalogName();
8081
org.apache.spark.sql.connector.catalog.Identifier ident =
8182
toIdentifier(args.getString(0), PARAMETERS[0].name());
82-
Identifier function = CatalogUtils.toIdentifier(ident);
83+
Identifier function = CatalogUtils.toIdentifier(ident, paimonCatalogName);
8384
FunctionChange functionChange =
8485
JsonSerdeUtil.fromJson(args.getString(1), FunctionChange.class);
8586
try {

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AlterViewDialectProcedure.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,10 @@ public StructType outputType() {
9191
@Override
9292
public InternalRow[] call(InternalRow args) {
9393
Catalog paimonCatalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog();
94+
String paimonCatalogName = ((WithPaimonCatalog) tableCatalog()).paimonCatalogName();
9495
org.apache.spark.sql.connector.catalog.Identifier ident =
9596
toIdentifier(args.getString(0), PARAMETERS[0].name());
96-
Identifier view = CatalogUtils.toIdentifier(ident);
97+
Identifier view = CatalogUtils.toIdentifier(ident, paimonCatalogName);
9798
ViewChange viewChange;
9899
String dialect =
99100
((GenericInternalRow) args).genericGet(2) == null

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateFunctionProcedure.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,10 @@ public StructType outputType() {
9090
@Override
9191
public InternalRow[] call(InternalRow args) {
9292
Catalog paimonCatalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog();
93+
String paimonCatalogName = ((WithPaimonCatalog) tableCatalog()).paimonCatalogName();
9394
org.apache.spark.sql.connector.catalog.Identifier ident =
9495
toIdentifier(args.getString(0), PARAMETERS[0].name());
95-
Identifier function = CatalogUtils.toIdentifier(ident);
96+
Identifier function = CatalogUtils.toIdentifier(ident, paimonCatalogName);
9697
List<DataField> inputParams = getDataFieldsFromArguments(1, args);
9798
List<DataField> returnParams = getDataFieldsFromArguments(2, args);
9899
boolean deterministic = args.isNullAt(3) ? true : args.getBoolean(3);

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DropFunctionProcedure.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,10 @@ public StructType outputType() {
7070
@Override
7171
public InternalRow[] call(InternalRow args) {
7272
Catalog paimonCatalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog();
73+
String paimonCatalogName = ((WithPaimonCatalog) tableCatalog()).paimonCatalogName();
7374
org.apache.spark.sql.connector.catalog.Identifier ident =
7475
toIdentifier(args.getString(0), PARAMETERS[0].name());
75-
Identifier function = CatalogUtils.toIdentifier(ident);
76+
Identifier function = CatalogUtils.toIdentifier(ident, paimonCatalogName);
7677
try {
7778
paimonCatalog.dropFunction(function, false);
7879
} catch (Exception e) {

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,18 @@
6060
/** Utils of catalog. */
6161
public class CatalogUtils {
6262

63-
public static void checkNamespace(String[] namespace) {
63+
public static void checkNamespace(String[] namespace, String catalogName) {
6464
checkArgument(
6565
namespace.length == 1,
66-
"Paimon only support single namespace, but got %s",
66+
"Current catalog is %s, catalog %s does not exist or Paimon only support single namespace, but got %s",
67+
catalogName,
68+
namespace.length > 0 ? namespace[0] : "unknown",
6769
Arrays.toString(namespace));
6870
}
6971

70-
public static org.apache.paimon.catalog.Identifier toIdentifier(Identifier ident) {
71-
checkNamespace(ident.namespace());
72+
public static org.apache.paimon.catalog.Identifier toIdentifier(
73+
Identifier ident, String catalogName) {
74+
checkNamespace(ident.namespace(), catalogName);
7275
return new org.apache.paimon.catalog.Identifier(ident.namespace()[0], ident.name());
7376
}
7477

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,18 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {
529529
assert(withColStat == noColStat)
530530
}
531531

532+
test("Query a non-existent catalog") {
533+
assert(intercept[Exception] {
534+
sql("SELECT * FROM paimon1.default.t")
535+
}.getMessage.contains("Current catalog is paimon, catalog paimon1 does not exist"))
536+
}
537+
538+
test("Query a table with multiple namespaces") {
539+
assert(intercept[Exception] {
540+
sql("SELECT * FROM paimon.x.default.t")
541+
}.getMessage.contains("Paimon only support single namespace"))
542+
}
543+
532544
protected def statsFileCount(tableLocation: Path, fileIO: FileIO): Int = {
533545
fileIO.listStatus(new Path(tableLocation, "statistics")).length
534546
}

Comments: 0