Skip to content

Commit 2205eff

Browse files
[fix](catalog) avoid external catalog refresh deadlock (#61202)
### What problem does this PR solve?

Issue Number: #61099

Related PR: N/A

Problem Summary:

- Avoid a deadlock when an external catalog refresh invalidates the cache while another thread initializes the same catalog through a cache loader.
- Add a unit test that reproduces the lock inversion and verifies that both worker threads exit cleanly after the fix.
1 parent 03a9b93 commit 2205eff

3 files changed

Lines changed: 173 additions & 9 deletions

File tree

fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -577,14 +577,16 @@ private List<Pair<String, String>> getFilteredDatabaseNames() {
577577
* @param invalidCache if {@code true}, the catalog cache will be invalidated
578578
* and reloaded during the refresh process.
579579
*/
580-
public synchronized void resetToUninitialized(boolean invalidCache) {
581-
this.objectCreated = false;
582-
this.initialized = false;
583-
synchronized (this.confLock) {
584-
this.cachedConf = null;
580+
public void resetToUninitialized(boolean invalidCache) {
581+
synchronized (this) {
582+
this.objectCreated = false;
583+
this.initialized = false;
584+
synchronized (this.confLock) {
585+
this.cachedConf = null;
586+
}
587+
this.lowerCaseToDatabaseName.clear();
588+
onClose();
585589
}
586-
this.lowerCaseToDatabaseName.clear();
587-
onClose();
588590
onRefreshCache(invalidCache);
589591
}
590592

fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public class JdbcExternalCatalog extends ExternalCatalog {
7777
// Must add "transient" for Gson to ignore this field,
7878
// or Gson will throw exception with HikariCP
7979
private transient JdbcClient jdbcClient;
80-
private IdentifierMapping identifierMapping;
80+
private volatile IdentifierMapping identifierMapping;
8181
private ExternalFunctionRules functionRules;
8282

8383
public JdbcExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
@@ -134,7 +134,7 @@ public void setDefaultPropsIfMissing(boolean isReplay) {
134134
}
135135

136136
@Override
137-
public synchronized void resetToUninitialized(boolean invalidCache) {
137+
public void resetToUninitialized(boolean invalidCache) {
138138
super.resetToUninitialized(invalidCache);
139139
this.identifierMapping = new JdbcIdentifierMapping(
140140
(Env.isTableNamesCaseInsensitive() || Env.isStoredTableNamesLowerCase()),
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.datasource;
19+
20+
import org.apache.doris.datasource.InitCatalogLog.Type;
21+
22+
import com.github.benmanes.caffeine.cache.Caffeine;
23+
import com.github.benmanes.caffeine.cache.LoadingCache;
24+
import org.junit.jupiter.api.Assertions;
25+
import org.junit.jupiter.api.Test;
26+
27+
import java.lang.management.ManagementFactory;
28+
import java.lang.management.ThreadMXBean;
29+
import java.util.Arrays;
30+
import java.util.concurrent.CountDownLatch;
31+
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.atomic.AtomicReference;
33+
34+
public class ExternalCatalogDeadlockTest {
35+
36+
@Test
37+
public void testResetToUninitializedShouldNotDeadlockWithCacheLoader() throws Exception {
38+
DeadlockCatalog catalog = new DeadlockCatalog();
39+
CountDownLatch loaderEntered = new CountDownLatch(1);
40+
CountDownLatch allowLoaderToTouchCatalog = new CountDownLatch(1);
41+
AtomicReference<Throwable> backgroundFailure = new AtomicReference<>();
42+
43+
// The loader holds Caffeine's per-key lock before it calls back into the catalog.
44+
LoadingCache<String, String> cache = Caffeine.newBuilder().build(key -> {
45+
loaderEntered.countDown();
46+
awaitLatch(allowLoaderToTouchCatalog);
47+
catalog.makeSureInitialized();
48+
return key;
49+
});
50+
51+
Thread queryThread = new Thread(
52+
() -> runQuietly(backgroundFailure, () -> cache.get("deadlock-key")),
53+
"deadlock-cache-loader");
54+
queryThread.setDaemon(true);
55+
queryThread.start();
56+
Assertions.assertTrue(loaderEntered.await(5, TimeUnit.SECONDS));
57+
58+
Thread refreshThread = new Thread(
59+
() -> runQuietly(backgroundFailure, () -> {
60+
// resetToUninitialized() grabs the catalog monitor before invalidating the cache.
61+
catalog.setInvalidator(() -> {
62+
allowLoaderToTouchCatalog.countDown();
63+
cache.invalidate("deadlock-key");
64+
});
65+
catalog.resetToUninitialized(true);
66+
}),
67+
"deadlock-catalog-refresh");
68+
refreshThread.setDaemon(true);
69+
refreshThread.start();
70+
71+
assertNoDeadlock(queryThread, refreshThread, backgroundFailure);
72+
}
73+
74+
private static void assertNoDeadlock(Thread queryThread, Thread refreshThread,
75+
AtomicReference<Throwable> backgroundFailure) throws Exception {
76+
long[] deadlockedThreads = waitForDeadlock(queryThread, refreshThread);
77+
queryThread.join(TimeUnit.SECONDS.toMillis(5));
78+
refreshThread.join(TimeUnit.SECONDS.toMillis(5));
79+
Assertions.assertNull(backgroundFailure.get(), "unexpected background failure: " + backgroundFailure.get());
80+
Assertions.assertNull(deadlockedThreads,
81+
String.format("detected deadlock between threads %s and %s",
82+
queryThread.getName(), refreshThread.getName()));
83+
Assertions.assertFalse(queryThread.isAlive(), queryThread.getName() + " is still running");
84+
Assertions.assertFalse(refreshThread.isAlive(), refreshThread.getName() + " is still running");
85+
}
86+
87+
private static void awaitLatch(CountDownLatch latch) throws InterruptedException {
88+
Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS));
89+
}
90+
91+
private static void runQuietly(AtomicReference<Throwable> failure, ThrowingRunnable task) {
92+
try {
93+
task.run();
94+
} catch (Throwable t) {
95+
failure.compareAndSet(null, t);
96+
}
97+
}
98+
99+
private static long[] waitForDeadlock(Thread queryThread, Thread refreshThread) throws InterruptedException {
100+
ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
101+
for (int i = 0; i < 100; i++) {
102+
long[] deadlockedThreads = threadMxBean.findDeadlockedThreads();
103+
if (deadlockedThreads != null
104+
&& contains(deadlockedThreads, queryThread.getId())
105+
&& contains(deadlockedThreads, refreshThread.getId())) {
106+
return deadlockedThreads;
107+
}
108+
Thread.sleep(50);
109+
}
110+
return null;
111+
}
112+
113+
private static boolean contains(long[] ids, long targetId) {
114+
return Arrays.stream(ids).anyMatch(id -> id == targetId);
115+
}
116+
117+
private static class DeadlockCatalog extends ExternalCatalog {
118+
private Runnable invalidator = () -> {
119+
};
120+
121+
DeadlockCatalog() {
122+
super(1L, "deadlock-catalog", Type.TEST, "");
123+
initialized = true;
124+
}
125+
126+
void setInvalidator(Runnable invalidator) {
127+
this.invalidator = invalidator;
128+
}
129+
130+
@Override
131+
protected void initLocalObjectsImpl() {
132+
}
133+
134+
@Override
135+
public void onClose() {
136+
}
137+
138+
@Override
139+
public void onRefreshCache(boolean invalidCache) {
140+
// Keep the harness catalog usable after refresh so the test only checks lock ordering.
141+
initialized = true;
142+
if (invalidCache) {
143+
invalidator.run();
144+
}
145+
}
146+
147+
@Override
148+
protected java.util.List<String> listTableNamesFromRemote(SessionContext ctx, String dbName) {
149+
return java.util.Collections.emptyList();
150+
}
151+
152+
@Override
153+
public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
154+
return false;
155+
}
156+
}
157+
158+
@FunctionalInterface
159+
private interface ThrowingRunnable {
160+
void run() throws Exception;
161+
}
162+
}

0 commit comments

Comments
 (0)