Skip to content

Commit b2c4a84

Browse files
authored
[server]: prevent delete default database (#3312)
1 parent 3d6abab commit b2c4a84

2 files changed

Lines changed: 22 additions & 12 deletions

File tree

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -196,19 +196,23 @@ void beforeEach() throws Exception {
196196
mockLakeCatalog);
197197
catalog.open();
198198

199-
// First check if database exists, and drop it if it does
199+
// Clean up any leftover tables from previous tests
200200
if (catalog.databaseExists(DEFAULT_DB)) {
201-
catalog.dropDatabase(DEFAULT_DB, true, true);
202-
}
203-
try {
204-
catalog.createDatabase(
205-
DEFAULT_DB, new CatalogDatabaseImpl(Collections.emptyMap(), null), true);
206-
} catch (CatalogException e) {
207-
// the auto partitioned manager may create the db zk node
208-
// in another thread, so if exception is NodeExistsException, just ignore
209-
if (!ExceptionUtils.findThrowableWithMessage(e, "KeeperException$NodeExistsException")
210-
.isPresent()) {
211-
throw e;
201+
for (String table : catalog.listTables(DEFAULT_DB)) {
202+
catalog.dropTable(new ObjectPath(DEFAULT_DB, table), true);
203+
}
204+
} else {
205+
try {
206+
catalog.createDatabase(
207+
DEFAULT_DB, new CatalogDatabaseImpl(Collections.emptyMap(), null), true);
208+
} catch (CatalogException e) {
209+
// the auto partitioned manager may create the db zk node
210+
// in another thread, so if exception is NodeExistsException, just ignore
211+
if (!ExceptionUtils.findThrowableWithMessage(
212+
e, "KeeperException$NodeExistsException")
213+
.isPresent()) {
214+
throw e;
215+
}
212216
}
213217
}
214218
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,12 @@ public Map<String, PartitionRegistration> listPartitions(
314314

315315
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
316316
throws DatabaseNotExistException, DatabaseNotEmptyException {
317+
if (CoordinatorServer.DEFAULT_DATABASE.equals(name)) {
318+
throw new UnsupportedOperationException(
319+
"Cannot drop the default database '"
320+
+ name
321+
+ "'. The default database is required for cluster operation.");
322+
}
317323
if (!databaseExists(name)) {
318324
if (ignoreIfNotExists) {
319325
return;

0 commit comments

Comments
 (0)