Skip to content

Commit a0d41ce

Browse files
committed
Fix a bug of ClientPool that rpc channel is not properly closed
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent b8f2b41 commit a0d41ce

File tree

4 files changed

+104
-55
lines changed

4 files changed

+104
-55
lines changed

examples/src/main/java/io/milvus/v2/ConsistencyLevelExample.java

Lines changed: 45 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -136,47 +136,51 @@ private static void testSessionLevel() throws Exception {
136136
.build();
137137
MilvusClientV2Pool pool = new MilvusClientV2Pool(poolConfig, connectConfig);
138138

139-
// The same process, different MilvusClient object, insert and search with Session level.
140-
// The Session level ensure that the newly inserted data instantaneously become searchable.
141-
Gson gson = new Gson();
142-
for (int i = 0; i < 100; i++) {
143-
List<Float> vector = CommonUtils.generateFloatVector(VECTOR_DIM);
144-
JsonObject row = new JsonObject();
145-
row.addProperty("id", i);
146-
row.add("vector", gson.toJsonTree(vector));
147-
148-
// insert by a MilvusClient
149-
String clientName1 = String.format("client_%d", i % 10);
150-
MilvusClientV2 client1 = pool.getClient(clientName1);
151-
client1.insert(InsertReq.builder()
152-
.collectionName(collectionName)
153-
.data(Collections.singletonList(row))
154-
.build());
155-
pool.returnClient(clientName1, client1); // don't forget to return the client to pool
156-
System.out.println("insert");
157-
158-
// search by another MilvusClient, use the just inserted vector to search
159-
// the returned item is expected to be the just inserted item
160-
String clientName2 = String.format("client_%d", i % 10 + 1);
161-
MilvusClientV2 client2 = pool.getClient(clientName2);
162-
SearchResp searchR = client2.search(SearchReq.builder()
163-
.collectionName(collectionName)
164-
.data(Collections.singletonList(new FloatVec(vector)))
165-
.limit(1)
166-
.build());
167-
pool.returnClient(clientName2, client2); // don't forget to return the client to pool
168-
List<List<SearchResp.SearchResult>> searchResults = searchR.getSearchResults();
169-
List<SearchResp.SearchResult> results = searchResults.get(0);
170-
if (results.size() != 1) {
171-
throw new RuntimeException("Search result is empty");
172-
}
173-
if (i != (Long) results.get(0).getId()) {
174-
throw new RuntimeException("The just inserted entity is not found");
139+
try {
140+
// The same process, different MilvusClient object, insert and search with Session level.
141+
// The Session level ensures that the newly inserted data instantaneously become searchable.
142+
Gson gson = new Gson();
143+
for (int i = 0; i < 100; i++) {
144+
List<Float> vector = CommonUtils.generateFloatVector(VECTOR_DIM);
145+
JsonObject row = new JsonObject();
146+
row.addProperty("id", i);
147+
row.add("vector", gson.toJsonTree(vector));
148+
149+
// insert by a MilvusClient
150+
String clientName1 = String.format("client_%d", i % 10);
151+
MilvusClientV2 client1 = pool.getClient(clientName1);
152+
client1.insert(InsertReq.builder()
153+
.collectionName(collectionName)
154+
.data(Collections.singletonList(row))
155+
.build());
156+
pool.returnClient(clientName1, client1); // don't forget to return the client to pool
157+
System.out.println("insert");
158+
159+
// search by another MilvusClient, use the just inserted vector to search
160+
// the returned item is expected to be the just inserted item
161+
String clientName2 = String.format("client_%d", i % 10 + 1);
162+
MilvusClientV2 client2 = pool.getClient(clientName2);
163+
SearchResp searchR = client2.search(SearchReq.builder()
164+
.collectionName(collectionName)
165+
.data(Collections.singletonList(new FloatVec(vector)))
166+
.limit(1)
167+
.build());
168+
pool.returnClient(clientName2, client2); // don't forget to return the client to pool
169+
List<List<SearchResp.SearchResult>> searchResults = searchR.getSearchResults();
170+
List<SearchResp.SearchResult> results = searchResults.get(0);
171+
if (results.size() != 1) {
172+
throw new RuntimeException("Search result is empty");
173+
}
174+
if (i != (Long) results.get(0).getId()) {
175+
throw new RuntimeException("The just inserted entity is not found");
176+
}
177+
System.out.println("search");
175178
}
176-
System.out.println("search");
177-
}
178179

179-
System.out.println("Session level is working fine");
180+
System.out.println("Session level is working fine");
181+
} finally {
182+
pool.close();
183+
}
180184
}
181185

182186
private static void testBoundedLevel() {
@@ -207,5 +211,7 @@ public static void main(String[] args) throws Exception {
207211
testBoundedLevel();
208212
System.out.println("==============================================================");
209213
testEventuallyLevel();
214+
215+
client.close();
210216
}
211217
}

sdk-core/src/main/java/io/milvus/pool/ClientCache.java

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ protected ClientCache(String key, GenericKeyedObjectPool<String, T> pool) {
3838
@Override
3939
public Thread newThread(@NotNull Runnable r) {
4040
Thread t = new Thread(r);
41+
t.setDaemon(true);
4142
t.setPriority(Thread.MAX_PRIORITY); // set the highest priority for the timer
4243
return t;
4344
}
@@ -49,24 +50,38 @@ public Thread newThread(@NotNull Runnable r) {
4950

5051
public void preparePool() {
5152
try {
52-
// preparePool() will create minIdlePerKey MilvusClient objects in advance, put the pre-created clients
53-
// into activeClientList
53+
// preparePool() will create minIdlePerKey MilvusClient objects in advance
5454
clientPool.preparePool(this.key);
55-
int minIdlePerKey = clientPool.getMinIdlePerKey();
56-
for (int i = 0; i < minIdlePerKey; i++) {
57-
activeClientList.add(new ClientWrapper<>(clientPool.borrowObject(this.key)));
58-
}
55+
} catch (Exception e) {
56+
logger.error("Failed to prepare pool {}, exception: ", key, e);
57+
throw new MilvusClientException(ErrorCode.CLIENT_ERROR, e);
58+
}
59+
60+
// Put the pre-created clients into activeClientList
61+
// avoid borrow client from pool multiple times when multiple threads call preparePool() at the same time,
62+
// add a lock here since preparePool() is called only one time in practice, doesn't affect major performance
63+
int minIdlePerKey = clientPool.getMinIdlePerKey();
5964

60-
if (logger.isDebugEnabled()) {
61-
logger.debug("ClientCache key: {} cache clients: {} ", key, activeClientList.size());
62-
logger.debug("Pool initialize idle: {} active: {} ", clientPool.getNumIdle(key), clientPool.getNumActive(key));
65+
clientListLock.lock();
66+
try {
67+
if (activeClientList.isEmpty()) {
68+
for (int i = 0; i < minIdlePerKey; i++) {
69+
activeClientList.add(new ClientWrapper<>(clientPool.borrowObject(this.key)));
70+
}
6371
}
64-
// System.out.printf("Key: %s, cache client: %d%n", key, activeClientList.size());
65-
// System.out.printf("Pool idle %d, active %d%n", clientPool.getNumIdle(key), clientPool.getNumActive(key));
6672
} catch (Exception e) {
67-
logger.error("Failed to prepare pool {}, exception: ", key, e);
73+
logger.error("Failed to borrow client from pool {}, exception: ", key, e);
6874
throw new MilvusClientException(ErrorCode.CLIENT_ERROR, e);
75+
} finally {
76+
clientListLock.unlock();
77+
}
78+
79+
if (logger.isDebugEnabled()) {
80+
logger.debug("ClientCache key: {} cache clients: {} ", key, activeClientList.size());
81+
logger.debug("Pool initialize idle: {} active: {} ", clientPool.getNumIdle(key), clientPool.getNumActive(key));
6982
}
83+
// System.out.printf("Key: %s, cache client: %d%n", key, activeClientList.size());
84+
// System.out.printf("Pool idle %d, active %d%n", clientPool.getNumIdle(key), clientPool.getNumActive(key));
7085
}
7186

7287
// this method is called in an interval, it does the following tasks:
@@ -161,7 +176,7 @@ private void checkQPS() {
161176
private void returnRetiredClients() {
162177
retireClientList.removeIf(wrapper -> {
163178
if (wrapper.getRefCount() <= 0) {
164-
returnToPool(wrapper.getClient());
179+
returnToPool(wrapper.getRawClient());
165180

166181
if (logger.isDebugEnabled()) {
167182
logger.debug("ClientCache key: {} returns a client", key);
@@ -188,7 +203,23 @@ public void run() {
188203
}
189204

190205
public void stopTimer() {
191-
scheduler.shutdown();
206+
// Stop scheduled tasks and wait for any in-flight checkQPS() execution to finish
207+
scheduler.shutdownNow();
208+
try {
209+
scheduler.awaitTermination(5, TimeUnit.SECONDS);
210+
} catch (InterruptedException e) {
211+
Thread.currentThread().interrupt();
212+
}
213+
214+
// Return all active and retired clients to the pool so they can be properly destroyed
215+
for (ClientWrapper<T> wrapper : activeClientList) {
216+
returnToPool(wrapper.getRawClient());
217+
}
218+
activeClientList.clear();
219+
for (ClientWrapper<T> wrapper : retireClientList) {
220+
returnToPool(wrapper.getRawClient());
221+
}
222+
retireClientList.clear();
192223
}
193224

194225
public T getClient() {
@@ -323,6 +354,10 @@ public T getClient() {
323354
return this.client;
324355
}
325356

357+
public T getRawClient() {
358+
return this.client;
359+
}
360+
326361
public void returnClient() {
327362
this.refCount.decrementAndGet();
328363
}

sdk-core/src/main/java/io/milvus/pool/ClientPool.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
import org.slf4j.Logger;
66
import org.slf4j.LoggerFactory;
77

8+
import io.milvus.v2.exception.ErrorCode;
9+
import io.milvus.v2.exception.MilvusClientException;
10+
811
import java.util.Set;
912
import java.util.concurrent.ConcurrentHashMap;
1013
import java.util.concurrent.ConcurrentMap;
@@ -19,6 +22,7 @@ public class ClientPool<C, T> {
1922
protected PoolClientFactory<C, T> clientFactory;
2023
private final ConcurrentMap<String, ClientCache<T>> clientsCache = new ConcurrentHashMap<>();
2124
private final Lock cacheMapLock = new ReentrantLock(true);
25+
private volatile boolean closed = false;
2226

2327
protected ClientPool() {
2428

@@ -87,6 +91,9 @@ public void preparePool(String key) {
8791
* @return MilvusClient or MilvusClientV2
8892
*/
8993
public T getClient(String key) {
94+
if (closed) {
95+
throw new MilvusClientException(ErrorCode.CLIENT_ERROR, "Client pool is closed");
96+
}
9097
ClientCache<T> cache = getCache(key);
9198
if (cache == null) {
9299
logger.error("Not able to create a client cache for key: {}", key);
@@ -141,6 +148,7 @@ public void returnClient(String key, T grpcClient) {
141148
*
142149
*/
143150
public void close() {
151+
closed = true;
144152
if (clientPool != null && !clientPool.isClosed()) {
145153
// how about if clientPool and clientsCache are cleared but some clients are not returned?
146154
// after clear(), all the milvus clients will be closed, if user continue to use the unreturned client

sdk-core/src/main/java/io/milvus/pool/PoolClientFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public C getConfig(String key) {
5656
public T create(String key) throws Exception {
5757
try {
5858
if (logger.isDebugEnabled()) {
59-
logger.info("PoolClientFactory key: {} creates a client", key);
59+
logger.debug("PoolClientFactory key: {} creates a client", key);
6060
}
6161
C keyConfig = configForKeys.get(key);
6262
if (keyConfig == null) {
@@ -78,7 +78,7 @@ public PooledObject<T> wrap(T client) {
7878
@Override
7979
public void destroyObject(String key, PooledObject<T> p) throws Exception {
8080
if (logger.isDebugEnabled()) {
81-
logger.info("PoolClientFactory key: {} closes a client", key);
81+
logger.debug("PoolClientFactory key: {} closes a client", key);
8282
}
8383
T client = p.getObject();
8484
closeMethod.invoke(client, 3L);

0 commit comments

Comments
 (0)