Skip to content

Commit ec2a448

Browse files
authored
[AMORO-4190][ams] Graceful handling of catalog initialization failures in DefaultCatalogManager (#4189)
[Improvement] Graceful handling of catalog initialization failures in DefaultCatalogManager - Wrap individual catalog loading in try-catch during initialization so that a single catalog failure is logged and skipped rather than crashing AMS - When a catalog is not present in serverCatalogMap (failed to initialize), fall back to updating DB directly and attempt to rebuild with the new metadata - Add tests verifying both initialization resilience and update recovery Signed-off-by: Jiwon Park <jpark92@outlook.kr>
1 parent dad2c9f commit ec2a448

2 files changed

Lines changed: 174 additions & 10 deletions

File tree

amoro-ams/src/main/java/org/apache/amoro/server/catalog/DefaultCatalogManager.java

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,19 @@ public DefaultCatalogManager(Configurations serverConfiguration) {
8080
listCatalogMetas()
8181
.forEach(
8282
c -> {
83-
ServerCatalog serverCatalog =
84-
CatalogBuilder.buildServerCatalog(c, serverConfiguration);
85-
serverCatalogMap.put(c.getCatalogName(), serverCatalog);
86-
metaCache.put(c.getCatalogName(), Optional.of(c));
87-
LOG.info("Load catalog {}, type:{}", c.getCatalogName(), c.getCatalogType());
83+
try {
84+
ServerCatalog serverCatalog =
85+
CatalogBuilder.buildServerCatalog(c, serverConfiguration);
86+
serverCatalogMap.put(c.getCatalogName(), serverCatalog);
87+
metaCache.put(c.getCatalogName(), Optional.of(c));
88+
LOG.info("Load catalog {}, type:{}", c.getCatalogName(), c.getCatalogType());
89+
} catch (Exception e) {
90+
LOG.error(
91+
"Failed to load catalog {}, type:{}, skipping",
92+
c.getCatalogName(),
93+
c.getCatalogType(),
94+
e);
95+
}
8896
});
8997
LOG.info("DefaultCatalogManager initialized, total catalogs: {}", serverCatalogMap.size());
9098
}
@@ -137,6 +145,11 @@ public void setServerCatalog(ServerCatalog catalog) {
137145
serverCatalogMap.put(catalog.name(), catalog);
138146
}
139147

148+
@VisibleForTesting
149+
public void removeServerCatalog(String catalogName) {
150+
serverCatalogMap.remove(catalogName);
151+
}
152+
140153
@Override
141154
public InternalCatalog getInternalCatalog(String catalogName) {
142155
ServerCatalog serverCatalog = getServerCatalog(catalogName);
@@ -216,12 +229,36 @@ public void dropCatalog(String catalogName) {
216229

217230
@Override
218231
public void updateCatalog(CatalogMeta catalogMeta) {
219-
ServerCatalog catalog = getServerCatalog(catalogMeta.getCatalogName());
220-
validateCatalogUpdate(catalog.getMetadata(), catalogMeta);
232+
String catalogName = catalogMeta.getCatalogName();
233+
ServerCatalog catalog = serverCatalogMap.get(catalogName);
234+
235+
if (catalog != null) {
236+
validateCatalogUpdate(catalog.getMetadata(), catalogMeta);
237+
catalog.updateMetadata(catalogMeta);
238+
} else {
239+
// Catalog failed to load during initialization — update DB directly
240+
// and attempt to rebuild with the new metadata
241+
CatalogMeta oldMeta =
242+
getAs(CatalogMetaMapper.class, mapper -> mapper.getCatalog(catalogName));
243+
if (oldMeta == null) {
244+
throw new ObjectNotExistsException("Catalog " + catalogName);
245+
}
246+
validateCatalogUpdate(oldMeta, catalogMeta);
247+
doAs(CatalogMetaMapper.class, mapper -> mapper.updateCatalog(catalogMeta));
248+
try {
249+
ServerCatalog rebuilt = CatalogBuilder.buildServerCatalog(catalogMeta, serverConfiguration);
250+
serverCatalogMap.put(catalogName, rebuilt);
251+
LOG.info("Catalog {} rebuilt successfully after metadata update", catalogName);
252+
} catch (Exception e) {
253+
LOG.warn(
254+
"Catalog {} metadata updated in DB but failed to initialize: {}",
255+
catalogName,
256+
e.getMessage());
257+
}
258+
}
221259

222-
catalog.updateMetadata(catalogMeta);
223-
metaCache.invalidate(catalogMeta.getCatalogName());
224-
LOG.info("Update catalog metadata: {}", catalogMeta.getCatalogName());
260+
metaCache.invalidate(catalogName);
261+
LOG.info("Updated catalog metadata: {}", catalogName);
225262
}
226263

227264
@Override
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.amoro.server.catalog;
20+
21+
import org.apache.amoro.TableFormat;
22+
import org.apache.amoro.api.CatalogMeta;
23+
import org.apache.amoro.catalog.CatalogTestHelpers;
24+
import org.apache.amoro.config.Configurations;
25+
import org.apache.amoro.properties.CatalogMetaProperties;
26+
import org.apache.amoro.server.AMSManagerTestBase;
27+
import org.apache.amoro.server.persistence.PersistentBase;
28+
import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper;
29+
import org.junit.After;
30+
import org.junit.Assert;
31+
import org.junit.Test;
32+
33+
import java.util.HashMap;
34+
import java.util.Map;
35+
36+
/**
37+
* Tests for {@link DefaultCatalogManager} graceful handling of catalog initialization failures.
38+
* Verifies that:
39+
*
40+
* <ol>
41+
* <li>A single catalog failure does not crash the entire AMS during startup
42+
* <li>Failed catalogs can still be updated via the API to allow operational recovery
43+
* </ol>
44+
*/
45+
public class TestDefaultCatalogManagerRecovery extends AMSManagerTestBase {
46+
47+
private static final String TEST_CATALOG_NAME = "test_failed_rest_catalog";
48+
49+
/** Helper to directly access the database, bypassing CatalogManager and buildServerCatalog. */
50+
static class DirectDbAccess extends PersistentBase {
51+
void insertCatalog(CatalogMeta meta) {
52+
doAs(CatalogMetaMapper.class, mapper -> mapper.insertCatalog(meta));
53+
}
54+
55+
void deleteCatalog(String name) {
56+
doAs(CatalogMetaMapper.class, mapper -> mapper.deleteCatalog(name));
57+
}
58+
}
59+
60+
private static final DirectDbAccess DB = new DirectDbAccess();
61+
62+
private CatalogMeta buildUnreachableRestCatalog() {
63+
Map<String, String> properties = new HashMap<>();
64+
properties.put("uri", "http://localhost:1/nonexistent");
65+
return CatalogTestHelpers.buildCatalogMeta(
66+
TEST_CATALOG_NAME,
67+
CatalogMetaProperties.CATALOG_TYPE_REST,
68+
properties,
69+
TableFormat.ICEBERG);
70+
}
71+
72+
@After
73+
public void cleanup() {
74+
CATALOG_MANAGER.removeServerCatalog(TEST_CATALOG_NAME);
75+
try {
76+
DB.deleteCatalog(TEST_CATALOG_NAME);
77+
} catch (Exception e) {
78+
// ignore
79+
}
80+
}
81+
82+
/**
83+
* Verifies that DefaultCatalogManager initialization does not crash when a catalog in the
84+
* database fails to load (e.g., an unreachable REST endpoint). The failed catalog should be
85+
* skipped and other catalogs should still be available.
86+
*/
87+
@Test
88+
public void testInitializationSkipsFailedCatalog() {
89+
// Insert a REST catalog with an unreachable endpoint directly into DB
90+
DB.insertCatalog(buildUnreachableRestCatalog());
91+
92+
// Creating a new DefaultCatalogManager should NOT throw — the failed catalog is skipped
93+
DefaultCatalogManager manager = new DefaultCatalogManager(new Configurations());
94+
95+
// The manager should be functional — the catalog exists in DB but was skipped during init
96+
Assert.assertTrue(manager.catalogExist(TEST_CATALOG_NAME));
97+
// Verify that the catalog metadata is still readable from DB
98+
Assert.assertNotNull(manager.getCatalogMeta(TEST_CATALOG_NAME));
99+
}
100+
101+
/**
102+
* Simulates a REST catalog that failed to initialize (exists in DB but not in serverCatalogMap),
103+
* then verifies that updateCatalog() can update its metadata without throwing. This is the core
104+
* recovery scenario — operators must be able to fix catalog configuration (e.g., correcting a
105+
* URI) from the dashboard without restarting AMS.
106+
*/
107+
@Test
108+
public void testUpdateCatalogThatFailedToInitialize() {
109+
// Insert directly into DB — simulates the state after init try-catch skips a failed catalog
110+
CatalogMeta meta = buildUnreachableRestCatalog();
111+
DB.insertCatalog(meta);
112+
113+
Assert.assertTrue(CATALOG_MANAGER.catalogExist(TEST_CATALOG_NAME));
114+
115+
// Update the catalog metadata — this must succeed
116+
CatalogMeta updatedMeta = new CatalogMeta(meta);
117+
updatedMeta.getCatalogProperties().put("uri", "http://localhost:2/also-unreachable");
118+
updatedMeta.getCatalogProperties().put("new-property", "new-value");
119+
CATALOG_MANAGER.updateCatalog(updatedMeta);
120+
121+
// Verify the update was persisted
122+
CatalogMeta readMeta = CATALOG_MANAGER.getCatalogMeta(TEST_CATALOG_NAME);
123+
Assert.assertEquals(
124+
"http://localhost:2/also-unreachable", readMeta.getCatalogProperties().get("uri"));
125+
Assert.assertEquals("new-value", readMeta.getCatalogProperties().get("new-property"));
126+
}
127+
}

0 commit comments

Comments
 (0)