Skip to content

Commit 656b7ab

Browse files
committed
Close rpc channel if MilvusClient fails to initialize
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent cbf0629 commit 656b7ab

File tree

2 files changed

+35
-16
lines changed

2 files changed

+35
-16
lines changed

sdk-core/src/main/java/io/milvus/client/MilvusServiceClient.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -184,17 +184,27 @@ public void start(ClientCall.Listener<RespT> responseListener, Metadata headers)
184184
}
185185

186186
assert channel != null;
187-
blockingStub = MilvusServiceGrpc.newBlockingStub(channel);
188-
futureStub = MilvusServiceGrpc.newFutureStub(channel);
189-
190-
// calls a RPC Connect() to the remote server, and sends the client info to the server
191-
// so that the server knows which client is interacting, especially for accesses log.
192-
this.timeoutMs = connectParam.getConnectTimeoutMs(); // set this value to connectTimeoutMs to control the retry()
193-
R<ConnectResponse> resp = this.retry(() -> connect(connectParam));
194-
if (resp.getStatus() != R.Status.Success.getCode()) {
195-
String msg = "Failed to initialize connection. Error: " + resp.getMessage();
196-
logError(msg);
197-
throw new RuntimeException(msg);
187+
188+
try {
189+
blockingStub = MilvusServiceGrpc.newBlockingStub(channel);
190+
futureStub = MilvusServiceGrpc.newFutureStub(channel);
191+
192+
// calls a RPC Connect() to the remote server, and sends the client info to the server
193+
// so that the server knows which client is interacting, especially for accesses log.
194+
this.timeoutMs = connectParam.getConnectTimeoutMs(); // set this value to connectTimeoutMs to control the retry()
195+
R<ConnectResponse> resp = this.retry(() -> connect(connectParam));
196+
if (resp.getStatus() != R.Status.Success.getCode()) {
197+
String msg = "Failed to initialize connection. Error: " + resp.getMessage();
198+
logError(msg);
199+
throw new RuntimeException(msg);
200+
}
201+
} catch (Exception e) {
202+
// close the channel if connect() throws exception, avoid leakage
203+
try {
204+
close(3);
205+
} catch (InterruptedException ignored) {
206+
}
207+
throw e;
198208
}
199209
this.timeoutMs = 0; // reset the timeout value to default
200210
}

sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,21 @@ private void connect(ConnectConfig connectConfig) {
140140
}
141141
channel = clientUtils.getChannel(connectConfig);
142142

143-
blockingStub = MilvusServiceGrpc.newBlockingStub(channel).withWaitForReady();
144-
connect(connectConfig, blockingStub);
143+
try {
144+
blockingStub = MilvusServiceGrpc.newBlockingStub(channel).withWaitForReady();
145+
connect(connectConfig, blockingStub);
145146

146-
if (connectConfig.getDbName() != null) {
147-
// check if database exists
148-
clientUtils.checkDatabaseExist(this.blockingStub, connectConfig.getDbName());
147+
if (connectConfig.getDbName() != null) {
148+
// check if database exists
149+
clientUtils.checkDatabaseExist(this.blockingStub, connectConfig.getDbName());
150+
}
151+
} catch (Exception e) {
152+
// close the channel if connect() and checkDatabaseExist() throws exception, avoid leakage
153+
try {
154+
close(3);
155+
} catch (InterruptedException ignored) {
156+
}
157+
throw e;
149158
}
150159
}
151160

0 commit comments

Comments
 (0)