Skip to content

Commit 750ed35

Browse files
committed
feat: 修复线程池泄露问题,并增加相应测试。
1 parent b0eceff commit 750ed35

14 files changed

Lines changed: 461 additions & 12 deletions

File tree

trpc-container/trpc-container-default/src/test/java/com/tencent/trpc/container/container/ConsumerProxyTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.tencent.trpc.core.exception.TRpcException;
2323
import com.tencent.trpc.core.rpc.RpcClientContext;
2424
import com.tencent.trpc.core.rpc.TRpcProxy;
25+
import com.tencent.trpc.proto.support.DefResponseFutureManager;
2526
import org.junit.After;
2627
import org.junit.Before;
2728
import org.junit.Test;
@@ -44,6 +45,7 @@ public void after() {
4445
container.stop();
4546
}
4647
ConfigManager.stopTest();
48+
DefResponseFutureManager.reset();
4749
}
4850

4951
@Test

trpc-core/src/main/java/com/tencent/trpc/core/common/ConfigManager.java

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.util.Map;
4545
import java.util.Optional;
4646
import java.util.Set;
47+
import java.util.WeakHashMap;
4748
import java.util.concurrent.TimeUnit;
4849

4950
public class ConfigManager {
@@ -55,6 +56,12 @@ public class ConfigManager {
5556
* Container startup listener
5657
*/
5758
private final Set<TRPCRunListener> tRPCRunListeners = Sets.newConcurrentHashSet();
59+
60+
/**
61+
* Shutdown listeners for decoupled cleanup, using WeakHashMap to prevent classloader leaks
62+
*/
63+
private final Map<ShutdownListener, Boolean> shutdownListeners = new WeakHashMap<>();
64+
5865
/**
5966
* Business initialization
6067
*/
@@ -107,6 +114,7 @@ public static void stopTest() {
107114
RpcClusterClientManager.reset();
108115
RpcServerManager.reset();
109116
WorkerPoolManager.reset();
117+
110118
instance.setDefault = false;
111119
}
112120

@@ -251,6 +259,41 @@ public void addTRPCRunListener(TRPCRunListener trpcRunListener) {
251259
tRPCRunListeners.add(trpcRunListener);
252260
}
253261

262+
/**
263+
* Get all shutdown listeners loaded via SPI.
264+
*
265+
* @return set of shutdown listeners
266+
*/
267+
public void registerShutdownListener(ShutdownListener listener) {
268+
if (listener != null) {
269+
shutdownListeners.put(listener, Boolean.TRUE);
270+
}
271+
}
272+
273+
/**
274+
* Unregister a shutdown listener.
275+
*
276+
* @param listener the shutdown listener to unregister
277+
*/
278+
public void unregisterShutdownListener(ShutdownListener listener) {
279+
if (listener != null) {
280+
shutdownListeners.remove(listener);
281+
}
282+
}
283+
284+
/**
285+
* Notify all registered shutdown listeners.
286+
*/
287+
private void notifyShutdownListeners() {
288+
shutdownListeners.keySet().forEach(listener -> {
289+
try {
290+
listener.onShutdown();
291+
} catch (Exception e) {
292+
logger.error("Error executing shutdown listener: {}", listener.getClass().getName(), e);
293+
}
294+
});
295+
}
296+
254297
@Override
255298
public String toString() {
256299
return "ApplicationConfig {globalConfig=" + globalConfig + ", serverConfig=" + serverConfig
@@ -301,10 +344,10 @@ private void warmupSelector(BackendConfig backendConfig) {
301344
try {
302345
String selectorId = nm.getSelectorId();
303346
SelectorManager.getManager().get(selectorId).warmup(backendConfig.toNamingServiceId());
304-
logger.warn("Warm up selector success(selectorId={},naming={}) ", selectorId,
347+
logger.warn("Warm up selector success.(selectorId={},naming={}) ", selectorId,
305348
backendConfig.getNamingOptions().getServiceNaming());
306349
} catch (Exception ex) {
307-
logger.warn("Warm up selector exception(selectorId={}, naming={}) ",
350+
logger.warn("Warm up selector exception.(selectorId={}, naming={}) ",
308351
nm.getSelectorId(), nm.getServiceNaming(), ex);
309352
}
310353
});
@@ -385,6 +428,10 @@ protected void stopInternal() throws Exception {
385428
ExtensionLoader.destroyAllPlugin();
386429
// 8) close client cluster
387430
RpcClusterClientManager.close();
431+
432+
// Notify all shutdown listeners before actual shutdown
433+
notifyShutdownListeners();
434+
388435
logger.info(">>>tRPC Server stopped");
389436
}
390437

@@ -394,4 +441,4 @@ public String toString() {
394441
}
395442
}
396443

397-
}
444+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Tencent is pleased to support the open source community by making tRPC available.
3+
*
4+
* Copyright (C) 2023 Tencent.
5+
* All rights reserved.
6+
*
7+
* If you have downloaded a copy of the tRPC source code from Tencent,
8+
* please note that tRPC source code is licensed under the Apache 2.0 License,
9+
* A copy of the Apache 2.0 License can be found in the LICENSE file.
10+
*/
11+
12+
package com.tencent.trpc.core.common;
13+
14+
/**
15+
* Shutdown listener for components that need to perform cleanup when the container stops.
16+
* This provides a decoupled way for components to register shutdown hooks without
17+
* creating circular dependencies.
18+
*/
19+
public interface ShutdownListener {
20+
21+
/**
22+
* Called when the container is shutting down.
23+
*/
24+
void onShutdown();
25+
26+
}

trpc-core/src/main/java/com/tencent/trpc/core/management/support/MBeanRegistryHelper.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,20 @@ public static void registerMBean(Object object, ObjectName objectName) {
3636
}
3737
}
3838

39+
/**
40+
* Unregister mbean
41+
*
42+
* @param objectName mbean object name {@link ObjectName}
43+
*/
44+
public static void unregisterMBean(ObjectName objectName) {
45+
try {
46+
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
47+
if (mBeanServer.isRegistered(objectName)) {
48+
mBeanServer.unregisterMBean(objectName);
49+
}
50+
} catch (Exception e) {
51+
logger.warn("unregister mbean exception: ", e);
52+
}
53+
}
54+
3955
}

trpc-core/src/main/java/com/tencent/trpc/core/rpc/def/DefTimeoutManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public DefTimeoutManager(int tickms) {
3434
public Future<?> watch(final Runnable task, long timeout) {
3535
FutureAdapter<?> adapter = new FutureAdapter(task);
3636
adapter.wrap = timer.newTimeout(adapter, timeout, TimeUnit.MILLISECONDS);
37+
3738
return adapter;
3839
}
3940

trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ForkJoinWorkerPool.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,11 @@ public void close(long timeoutMills) {
126126
shutdownGraceful(timeoutMills);
127127
}
128128
}
129+
130+
// 注销MBean注册
131+
if (this.forkJoinPoolMXBean != null) {
132+
MBeanRegistryHelper.unregisterMBean(this.forkJoinPoolMXBean.getObjectName());
133+
}
129134
}
130135

131136
private void shutdownGraceful(long timeoutMills) {

trpc-core/src/main/java/com/tencent/trpc/core/worker/support/thread/ThreadWorkerPool.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,10 @@ public void close(long timeoutMills) {
149149
shutdownGraceful(timeoutMills);
150150
}
151151
}
152+
// 注销MBean注册
153+
if (this.threadPoolMXBean != null) {
154+
MBeanRegistryHelper.unregisterMBean(this.threadPoolMXBean.getObjectName());
155+
}
152156
}
153157

154158
@Override

0 commit comments

Comments
 (0)