Skip to content

Commit e588706

Browse files
committed
Fix a bug of ClientPool that rpc channel is not properly closed
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent b8f2b41 commit e588706

File tree

2 files changed

+60
-39
lines changed

2 files changed

+60
-39
lines changed

examples/src/main/java/io/milvus/v2/ConsistencyLevelExample.java

Lines changed: 45 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -136,47 +136,51 @@ private static void testSessionLevel() throws Exception {
136136
.build();
137137
MilvusClientV2Pool pool = new MilvusClientV2Pool(poolConfig, connectConfig);
138138

139-
// The same process, different MilvusClient object, insert and search with Session level.
140-
// The Session level ensure that the newly inserted data instantaneously become searchable.
141-
Gson gson = new Gson();
142-
for (int i = 0; i < 100; i++) {
143-
List<Float> vector = CommonUtils.generateFloatVector(VECTOR_DIM);
144-
JsonObject row = new JsonObject();
145-
row.addProperty("id", i);
146-
row.add("vector", gson.toJsonTree(vector));
147-
148-
// insert by a MilvusClient
149-
String clientName1 = String.format("client_%d", i % 10);
150-
MilvusClientV2 client1 = pool.getClient(clientName1);
151-
client1.insert(InsertReq.builder()
152-
.collectionName(collectionName)
153-
.data(Collections.singletonList(row))
154-
.build());
155-
pool.returnClient(clientName1, client1); // don't forget to return the client to pool
156-
System.out.println("insert");
157-
158-
// search by another MilvusClient, use the just inserted vector to search
159-
// the returned item is expected to be the just inserted item
160-
String clientName2 = String.format("client_%d", i % 10 + 1);
161-
MilvusClientV2 client2 = pool.getClient(clientName2);
162-
SearchResp searchR = client2.search(SearchReq.builder()
163-
.collectionName(collectionName)
164-
.data(Collections.singletonList(new FloatVec(vector)))
165-
.limit(1)
166-
.build());
167-
pool.returnClient(clientName2, client2); // don't forget to return the client to pool
168-
List<List<SearchResp.SearchResult>> searchResults = searchR.getSearchResults();
169-
List<SearchResp.SearchResult> results = searchResults.get(0);
170-
if (results.size() != 1) {
171-
throw new RuntimeException("Search result is empty");
172-
}
173-
if (i != (Long) results.get(0).getId()) {
174-
throw new RuntimeException("The just inserted entity is not found");
139+
try {
140+
// The same process, different MilvusClient object, insert and search with Session level.
141+
// The Session level ensure that the newly inserted data instantaneously become searchable.
142+
Gson gson = new Gson();
143+
for (int i = 0; i < 100; i++) {
144+
List<Float> vector = CommonUtils.generateFloatVector(VECTOR_DIM);
145+
JsonObject row = new JsonObject();
146+
row.addProperty("id", i);
147+
row.add("vector", gson.toJsonTree(vector));
148+
149+
// insert by a MilvusClient
150+
String clientName1 = String.format("client_%d", i % 10);
151+
MilvusClientV2 client1 = pool.getClient(clientName1);
152+
client1.insert(InsertReq.builder()
153+
.collectionName(collectionName)
154+
.data(Collections.singletonList(row))
155+
.build());
156+
pool.returnClient(clientName1, client1); // don't forget to return the client to pool
157+
System.out.println("insert");
158+
159+
// search by another MilvusClient, use the just inserted vector to search
160+
// the returned item is expected to be the just inserted item
161+
String clientName2 = String.format("client_%d", i % 10 + 1);
162+
MilvusClientV2 client2 = pool.getClient(clientName2);
163+
SearchResp searchR = client2.search(SearchReq.builder()
164+
.collectionName(collectionName)
165+
.data(Collections.singletonList(new FloatVec(vector)))
166+
.limit(1)
167+
.build());
168+
pool.returnClient(clientName2, client2); // don't forget to return the client to pool
169+
List<List<SearchResp.SearchResult>> searchResults = searchR.getSearchResults();
170+
List<SearchResp.SearchResult> results = searchResults.get(0);
171+
if (results.size() != 1) {
172+
throw new RuntimeException("Search result is empty");
173+
}
174+
if (i != (Long) results.get(0).getId()) {
175+
throw new RuntimeException("The just inserted entity is not found");
176+
}
177+
System.out.println("search");
175178
}
176-
System.out.println("search");
177-
}
178179

179-
System.out.println("Session level is working fine");
180+
System.out.println("Session level is working fine");
181+
} finally {
182+
pool.close();
183+
}
180184
}
181185

182186
private static void testBoundedLevel() {
@@ -207,5 +211,7 @@ public static void main(String[] args) throws Exception {
207211
testBoundedLevel();
208212
System.out.println("==============================================================");
209213
testEventuallyLevel();
214+
215+
client.close();
210216
}
211217
}

sdk-core/src/main/java/io/milvus/pool/ClientCache.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ protected ClientCache(String key, GenericKeyedObjectPool<String, T> pool) {
3838
@Override
3939
public Thread newThread(@NotNull Runnable r) {
4040
Thread t = new Thread(r);
41+
t.setDaemon(true);
4142
t.setPriority(Thread.MAX_PRIORITY); // set the highest priority for the timer
4243
return t;
4344
}
@@ -189,6 +190,16 @@ public void run() {
189190

190191
public void stopTimer() {
191192
scheduler.shutdown();
193+
194+
// Return all active and retired clients to the pool so they can be properly destroyed
195+
for (ClientWrapper<T> wrapper : activeClientList) {
196+
returnToPool(wrapper.getRawClient());
197+
}
198+
activeClientList.clear();
199+
for (ClientWrapper<T> wrapper : retireClientList) {
200+
returnToPool(wrapper.getRawClient());
201+
}
202+
retireClientList.clear();
192203
}
193204

194205
public T getClient() {
@@ -323,6 +334,10 @@ public T getClient() {
323334
return this.client;
324335
}
325336

337+
public T getRawClient() {
338+
return this.client;
339+
}
340+
326341
public void returnClient() {
327342
this.refCount.decrementAndGet();
328343
}

0 commit comments

Comments
 (0)