Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 45 additions & 39 deletions examples/src/main/java/io/milvus/v2/ConsistencyLevelExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,47 +136,51 @@ private static void testSessionLevel() throws Exception {
.build();
MilvusClientV2Pool pool = new MilvusClientV2Pool(poolConfig, connectConfig);

// The same process, different MilvusClient object, insert and search with Session level.
// The Session level ensure that the newly inserted data instantaneously become searchable.
Gson gson = new Gson();
for (int i = 0; i < 100; i++) {
List<Float> vector = CommonUtils.generateFloatVector(VECTOR_DIM);
JsonObject row = new JsonObject();
row.addProperty("id", i);
row.add("vector", gson.toJsonTree(vector));

// insert by a MilvusClient
String clientName1 = String.format("client_%d", i % 10);
MilvusClientV2 client1 = pool.getClient(clientName1);
client1.insert(InsertReq.builder()
.collectionName(collectionName)
.data(Collections.singletonList(row))
.build());
pool.returnClient(clientName1, client1); // don't forget to return the client to pool
System.out.println("insert");

// search by another MilvusClient, use the just inserted vector to search
// the returned item is expected to be the just inserted item
String clientName2 = String.format("client_%d", i % 10 + 1);
MilvusClientV2 client2 = pool.getClient(clientName2);
SearchResp searchR = client2.search(SearchReq.builder()
.collectionName(collectionName)
.data(Collections.singletonList(new FloatVec(vector)))
.limit(1)
.build());
pool.returnClient(clientName2, client2); // don't forget to return the client to pool
List<List<SearchResp.SearchResult>> searchResults = searchR.getSearchResults();
List<SearchResp.SearchResult> results = searchResults.get(0);
if (results.size() != 1) {
throw new RuntimeException("Search result is empty");
}
if (i != (Long) results.get(0).getId()) {
throw new RuntimeException("The just inserted entity is not found");
try {
// The same process, different MilvusClient object, insert and search with Session level.
// The Session level ensures that the newly inserted data instantaneously become searchable.
Gson gson = new Gson();
for (int i = 0; i < 100; i++) {
List<Float> vector = CommonUtils.generateFloatVector(VECTOR_DIM);
JsonObject row = new JsonObject();
row.addProperty("id", i);
row.add("vector", gson.toJsonTree(vector));

// insert by a MilvusClient
String clientName1 = String.format("client_%d", i % 10);
MilvusClientV2 client1 = pool.getClient(clientName1);
client1.insert(InsertReq.builder()
.collectionName(collectionName)
.data(Collections.singletonList(row))
.build());
pool.returnClient(clientName1, client1); // don't forget to return the client to pool
System.out.println("insert");

// search by another MilvusClient, use the just inserted vector to search
// the returned item is expected to be the just inserted item
String clientName2 = String.format("client_%d", i % 10 + 1);
MilvusClientV2 client2 = pool.getClient(clientName2);
SearchResp searchR = client2.search(SearchReq.builder()
.collectionName(collectionName)
.data(Collections.singletonList(new FloatVec(vector)))
.limit(1)
.build());
pool.returnClient(clientName2, client2); // don't forget to return the client to pool
List<List<SearchResp.SearchResult>> searchResults = searchR.getSearchResults();
List<SearchResp.SearchResult> results = searchResults.get(0);
if (results.size() != 1) {
throw new RuntimeException("Search result is empty");
}
if (i != (Long) results.get(0).getId()) {
throw new RuntimeException("The just inserted entity is not found");
}
System.out.println("search");
}
System.out.println("search");
}

System.out.println("Session level is working fine");
System.out.println("Session level is working fine");
} finally {
pool.close();
}
}

private static void testBoundedLevel() {
Expand Down Expand Up @@ -207,5 +211,7 @@ public static void main(String[] args) throws Exception {
testBoundedLevel();
System.out.println("==============================================================");
testEventuallyLevel();

client.close();
}
}
63 changes: 49 additions & 14 deletions sdk-core/src/main/java/io/milvus/pool/ClientCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ protected ClientCache(String key, GenericKeyedObjectPool<String, T> pool) {
@Override
public Thread newThread(@NotNull Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setPriority(Thread.MAX_PRIORITY); // set the highest priority for the timer
return t;
}
Expand All @@ -49,24 +50,38 @@ public Thread newThread(@NotNull Runnable r) {

public void preparePool() {
try {
// preparePool() will create minIdlePerKey MilvusClient objects in advance, put the pre-created clients
// into activeClientList
// preparePool() will create minIdlePerKey MilvusClient objects in advance
clientPool.preparePool(this.key);
int minIdlePerKey = clientPool.getMinIdlePerKey();
for (int i = 0; i < minIdlePerKey; i++) {
activeClientList.add(new ClientWrapper<>(clientPool.borrowObject(this.key)));
}
} catch (Exception e) {
logger.error("Failed to prepare pool {}, exception: ", key, e);
throw new MilvusClientException(ErrorCode.CLIENT_ERROR, e);
}

// Put the pre-created clients into activeClientList
// avoid borrow client from pool multiple times when multiple threads call preparePool() at the same time,
// add a lock here since preparePool() is called only one time in practice, doesn't affect major performance
int minIdlePerKey = clientPool.getMinIdlePerKey();

if (logger.isDebugEnabled()) {
logger.debug("ClientCache key: {} cache clients: {} ", key, activeClientList.size());
logger.debug("Pool initialize idle: {} active: {} ", clientPool.getNumIdle(key), clientPool.getNumActive(key));
clientListLock.lock();
try {
if (activeClientList.isEmpty()) {
for (int i = 0; i < minIdlePerKey; i++) {
activeClientList.add(new ClientWrapper<>(clientPool.borrowObject(this.key)));
}
}
// System.out.printf("Key: %s, cache client: %d%n", key, activeClientList.size());
// System.out.printf("Pool idle %d, active %d%n", clientPool.getNumIdle(key), clientPool.getNumActive(key));
} catch (Exception e) {
logger.error("Failed to prepare pool {}, exception: ", key, e);
logger.error("Failed to borrow client from pool {}, exception: ", key, e);
throw new MilvusClientException(ErrorCode.CLIENT_ERROR, e);
} finally {
clientListLock.unlock();
}

if (logger.isDebugEnabled()) {
logger.debug("ClientCache key: {} cache clients: {} ", key, activeClientList.size());
logger.debug("Pool initialize idle: {} active: {} ", clientPool.getNumIdle(key), clientPool.getNumActive(key));
}
// System.out.printf("Key: %s, cache client: %d%n", key, activeClientList.size());
// System.out.printf("Pool idle %d, active %d%n", clientPool.getNumIdle(key), clientPool.getNumActive(key));
}

// this method is called in an interval, it does the following tasks:
Expand Down Expand Up @@ -161,7 +176,7 @@ private void checkQPS() {
private void returnRetiredClients() {
retireClientList.removeIf(wrapper -> {
if (wrapper.getRefCount() <= 0) {
returnToPool(wrapper.getClient());
returnToPool(wrapper.getRawClient());

if (logger.isDebugEnabled()) {
logger.debug("ClientCache key: {} returns a client", key);
Expand All @@ -188,7 +203,23 @@ public void run() {
}

public void stopTimer() {
scheduler.shutdown();
// Stop scheduled tasks and wait for any in-flight checkQPS() execution to finish
scheduler.shutdownNow();
try {
scheduler.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

// Return all active and retired clients to the pool so they can be properly destroyed
for (ClientWrapper<T> wrapper : activeClientList) {
returnToPool(wrapper.getRawClient());
}
activeClientList.clear();
for (ClientWrapper<T> wrapper : retireClientList) {
returnToPool(wrapper.getRawClient());
}
retireClientList.clear();
}

public T getClient() {
Expand Down Expand Up @@ -323,6 +354,10 @@ public T getClient() {
return this.client;
}

public T getRawClient() {
return this.client;
}
Comment thread
yhmo marked this conversation as resolved.

public void returnClient() {
this.refCount.decrementAndGet();
}
Expand Down
8 changes: 8 additions & 0 deletions sdk-core/src/main/java/io/milvus/pool/ClientPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.milvus.v2.exception.ErrorCode;
import io.milvus.v2.exception.MilvusClientException;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -19,6 +22,7 @@ public class ClientPool<C, T> {
protected PoolClientFactory<C, T> clientFactory;
private final ConcurrentMap<String, ClientCache<T>> clientsCache = new ConcurrentHashMap<>();
private final Lock cacheMapLock = new ReentrantLock(true);
private volatile boolean closed = false;

protected ClientPool() {

Expand Down Expand Up @@ -87,6 +91,9 @@ public void preparePool(String key) {
* @return MilvusClient or MilvusClientV2
*/
public T getClient(String key) {
if (closed) {
throw new MilvusClientException(ErrorCode.CLIENT_ERROR, "Client pool is closed");
}
ClientCache<T> cache = getCache(key);
if (cache == null) {
logger.error("Not able to create a client cache for key: {}", key);
Expand Down Expand Up @@ -141,6 +148,7 @@ public void returnClient(String key, T grpcClient) {
*
*/
public void close() {
closed = true;
if (clientPool != null && !clientPool.isClosed()) {
// how about if clientPool and clientsCache are cleared but some clients are not returned?
// after clear(), all the milvus clients will be closed, if user continue to use the unreturned client
Expand Down
4 changes: 2 additions & 2 deletions sdk-core/src/main/java/io/milvus/pool/PoolClientFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public C getConfig(String key) {
public T create(String key) throws Exception {
try {
if (logger.isDebugEnabled()) {
logger.info("PoolClientFactory key: {} creates a client", key);
logger.debug("PoolClientFactory key: {} creates a client", key);
}
C keyConfig = configForKeys.get(key);
if (keyConfig == null) {
Expand All @@ -78,7 +78,7 @@ public PooledObject<T> wrap(T client) {
@Override
public void destroyObject(String key, PooledObject<T> p) throws Exception {
if (logger.isDebugEnabled()) {
logger.info("PoolClientFactory key: {} closes a client", key);
logger.debug("PoolClientFactory key: {} closes a client", key);
}
T client = p.getObject();
closeMethod.invoke(client, 3L);
Expand Down
Loading