Skip to content

Commit 1e945dd

Browse files
authored
Gracefully manage async client threads (#17683)
1 parent 9330c7f commit 1e945dd

5 files changed

Lines changed: 73 additions & 6 deletions

File tree

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.ainode.rpc.thrift.TAIHeartbeatReq;
2323
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
24+
import org.apache.iotdb.commons.client.ClientManager;
2425
import org.apache.iotdb.commons.client.ClientPoolFactory;
2526
import org.apache.iotdb.commons.client.IClientManager;
2627
import org.apache.iotdb.commons.client.async.AsyncAINodeInternalServiceClient;
@@ -47,10 +48,22 @@ private AsyncAINodeHeartbeatClientPool() {
4748
*/
4849
public void getAINodeHeartBeat(
4950
TEndPoint endPoint, TAIHeartbeatReq req, AINodeHeartbeatHandler handler) {
// Borrows a client for endPoint from clientManager and issues getAIHeartbeat(req, handler) on
// it. Nothing is returned and no exception escapes: any Exception raised by the borrow or by
// the call is caught and dropped below.
51+
// Kept in a local (instead of chaining the calls) so the finally block can still reach the
// borrowed client when the call itself throws.
AsyncAINodeInternalServiceClient client = null;
52+
// Becomes true only after getAIHeartbeat(...) has returned without throwing.
boolean dispatched = false;
5053
try {
51-
clientManager.borrowClient(endPoint).getAIHeartbeat(req, handler);
54+
client = clientManager.borrowClient(endPoint);
55+
client.getAIHeartbeat(req, handler);
56+
dispatched = true;
5257
} catch (Exception ignore) {
5358
// Just ignore
// NOTE(review): nothing is logged or rethrown here, so a failed borrow or dispatch is silent
// to the caller of this method.
59+
} finally {
60+
// After the async call is dispatched, the client's onComplete/onError callback is
61+
// responsible for returning the client. If the RPC was not dispatched (exception
62+
// before/during the call), the client must be returned here to prevent pool leakage.
// NOTE(review): returnClient is only reached when clientManager is a ClientManager instance;
// for any other IClientManager implementation the borrowed client is not returned here.
63+
if (!dispatched && client != null && clientManager instanceof ClientManager) {
64+
((ClientManager<TEndPoint, AsyncAINodeInternalServiceClient>) clientManager)
65+
.returnClient(endPoint, client);
66+
}
5467
}
5568
}
5669

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.confignode.client.async;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
23+
import org.apache.iotdb.commons.client.ClientManager;
2324
import org.apache.iotdb.commons.client.ClientPoolFactory;
2425
import org.apache.iotdb.commons.client.IClientManager;
2526
import org.apache.iotdb.commons.client.async.AsyncConfigNodeInternalServiceClient;
@@ -48,10 +49,22 @@ public void getConfigNodeHeartBeat(
4849
TEndPoint endPoint,
4950
TConfigNodeHeartbeatReq heartbeatReq,
5051
ConfigNodeHeartbeatHandler handler) {
// Borrows a client for endPoint from clientManager and issues
// getConfigNodeHeartBeat(heartbeatReq, handler) on it. Any Exception raised by the borrow or by
// the call is caught and dropped below, so nothing propagates to the caller.
52+
// Kept in a local (instead of chaining the calls) so the finally block can still reach the
// borrowed client when the call itself throws.
AsyncConfigNodeInternalServiceClient client = null;
53+
// Becomes true only after getConfigNodeHeartBeat(...) has returned without throwing.
boolean dispatched = false;
5154
try {
52-
clientManager.borrowClient(endPoint).getConfigNodeHeartBeat(heartbeatReq, handler);
55+
client = clientManager.borrowClient(endPoint);
56+
client.getConfigNodeHeartBeat(heartbeatReq, handler);
57+
dispatched = true;
5358
} catch (Exception ignore) {
5459
// Just ignore
// NOTE(review): nothing is logged or rethrown here, so a failed borrow or dispatch is silent
// to the caller of this method.
60+
} finally {
61+
// After the async call is dispatched, the client's onComplete/onError callback is
62+
// responsible for returning the client. If the RPC was not dispatched (exception
63+
// before/during the call), the client must be returned here to prevent pool leakage.
// NOTE(review): returnClient is only reached when clientManager is a ClientManager instance;
// for any other IClientManager implementation the borrowed client is not returned here.
64+
if (!dispatched && client != null && clientManager instanceof ClientManager) {
65+
((ClientManager<TEndPoint, AsyncConfigNodeInternalServiceClient>) clientManager)
66+
.returnClient(endPoint, client);
67+
}
5568
}
5669
}
5770

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.confignode.client.async;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
23+
import org.apache.iotdb.commons.client.ClientManager;
2324
import org.apache.iotdb.commons.client.ClientPoolFactory;
2425
import org.apache.iotdb.commons.client.IClientManager;
2526
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
@@ -49,19 +50,42 @@ private AsyncDataNodeHeartbeatClientPool() {
4950
*/
5051
public void getDataNodeHeartBeat(
5152
TEndPoint endPoint, TDataNodeHeartbeatReq req, DataNodeHeartbeatHandler handler) {
// Borrows a client for endPoint from clientManager and issues getDataNodeHeartBeat(req, handler)
// on it. Any Exception raised by the borrow or by the call is caught and dropped below, so
// nothing propagates to the caller.
53+
// Kept in a local (instead of chaining the calls) so the finally block can pass the borrowed
// client to returnClientIfNotDispatched when the call itself throws.
AsyncDataNodeInternalServiceClient client = null;
54+
// Becomes true only after getDataNodeHeartBeat(...) has returned without throwing.
boolean dispatched = false;
5255
try {
53-
clientManager.borrowClient(endPoint).getDataNodeHeartBeat(req, handler);
56+
client = clientManager.borrowClient(endPoint);
57+
client.getDataNodeHeartBeat(req, handler);
58+
dispatched = true;
5459
} catch (Exception ignore) {
5560
// Just ignore
// NOTE(review): nothing is logged or rethrown here, so a failed borrow or dispatch is silent
// to the caller of this method.
61+
} finally {
62+
// Runs on every exit path; the helper is a no-op when dispatched is true or client is null.
returnClientIfNotDispatched(endPoint, client, dispatched);
5663
}
5764
}
5865

5966
public void writeAuditLog(
6067
TEndPoint endPoint, TAuditLogReq req, DataNodeWriteAuditLogHandler handler) {
// Borrows a client for endPoint from clientManager and issues writeAuditLog(req, handler) on it.
// Any Exception raised by the borrow or by the call is caught and dropped below, so nothing
// propagates to the caller.
68+
// Kept in a local (instead of chaining the calls) so the finally block can pass the borrowed
// client to returnClientIfNotDispatched when the call itself throws.
AsyncDataNodeInternalServiceClient client = null;
69+
// Becomes true only after writeAuditLog(...) has returned without throwing.
boolean dispatched = false;
6170
try {
62-
clientManager.borrowClient(endPoint).writeAuditLog(req, handler);
71+
client = clientManager.borrowClient(endPoint);
72+
client.writeAuditLog(req, handler);
73+
dispatched = true;
6374
} catch (Exception e) {
6475
// Just ignore
// NOTE(review): the caught exception e is never used; nothing is logged or rethrown, so a
// failed borrow or dispatch is silent to the caller of this method.
76+
} finally {
77+
// Runs on every exit path; the helper is a no-op when dispatched is true or client is null.
returnClientIfNotDispatched(endPoint, client, dispatched);
78+
}
79+
}
80+
81+
// After the async call is dispatched, the client's onComplete/onError callback is responsible
82+
// for returning the client. If the RPC was not dispatched (exception before/during the call),
83+
// the client must be returned here to prevent pool leakage.
84+
private void returnClientIfNotDispatched(
85+
TEndPoint endPoint, AsyncDataNodeInternalServiceClient client, boolean dispatched) {
// Shared cleanup for the request methods of this class that call it from their finally blocks.
//   endPoint   - the endpoint the client was borrowed for; passed through to returnClient
//   client     - the borrowed client, or null when borrowClient itself failed
//   dispatched - true when the async call returned without throwing; in that case this
//                method does nothing
// NOTE(review): returnClient is only reached when clientManager is a ClientManager instance;
// for any other IClientManager implementation the borrowed client is not returned here.
86+
if (!dispatched && client != null && clientManager instanceof ClientManager) {
87+
((ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>) clientManager)
88+
.returnClient(endPoint, client);
6589
}
6690
}
6791

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,11 @@ public TSStatus removeConfigNode(
138138
while (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
139139
TimeUnit.MILLISECONDS.sleep(2000);
140140
updateConfigNodeLeader(status);
141+
if (configNodeLeader == null) {
142+
LOGGER.warn(
143+
"Redirection recommended for removeConfigNode but no leader endpoint provided, abort retry.");
144+
break;
145+
}
141146
try (SyncConfigNodeIServiceClient clientLeader =
142147
clientManager.borrowClient(configNodeLeader)) {
143148
status = clientLeader.removeConfigNode(configNodeLocation);

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.commons.client.request;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
23+
import org.apache.iotdb.commons.client.ClientManager;
2324
import org.apache.iotdb.commons.client.IClientManager;
2425
import org.apache.iotdb.commons.client.exception.ClientManagerException;
2526
import org.apache.iotdb.commons.i18n.ClientMessages;
@@ -178,27 +179,38 @@ protected void sendAsyncRequest(
178179
int requestId,
179180
NodeLocation targetNode,
180181
int retryCount) {
182+
final TEndPoint endPoint = nodeLocationToEndPoint(targetNode);
183+
Client client = null;
184+
boolean dispatched = false;
181185
try {
182186
if (!actionMap.containsKey(requestContext.getRequestType())) {
183187
throw new UnsupportedOperationException(
184188
"unsupported request type "
185189
+ requestContext.getRequestType()
186190
+ ", please set it in AsyncRequestManager::initActionMapBuilder()");
187191
}
188-
Client client = clientManager.borrowClient(nodeLocationToEndPoint(targetNode));
192+
client = clientManager.borrowClient(endPoint);
189193
adjustClientTimeoutIfNecessary(requestContext.getRequestType(), client);
190194
Object req = requestContext.getRequest(requestId);
191195
AsyncRequestRPCHandler<?, RequestType, NodeLocation> handler =
192196
buildHandler(requestContext, requestId, targetNode);
193197
Objects.requireNonNull(actionMap.get(requestContext.getRequestType()))
194198
.accept(req, client, handler);
199+
// After accept() returns, the async callback (onComplete/onError) takes over the
200+
// responsibility of returning the client to the pool. Before this point, if any exception
201+
// is thrown, the client must be returned/invalidated here to prevent pool leakage.
202+
dispatched = true;
195203
} catch (Exception e) {
196204
LOGGER.warn(
197205
ClientMessages.ASYNC_REQUEST_FAILED_ON_NODE,
198206
requestContext.getRequestType(),
199-
nodeLocationToEndPoint(targetNode),
207+
endPoint,
200208
e.getMessage(),
201209
retryCount);
210+
} finally {
211+
if (!dispatched && client != null && clientManager instanceof ClientManager) {
212+
((ClientManager<TEndPoint, Client>) clientManager).returnClient(endPoint, client);
213+
}
202214
}
203215
}
204216

0 commit comments

Comments
 (0)