Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ version: '3.5'
services:
standalone:
container_name: milvus-javasdk-standalone-1
image: milvusdb/milvus:v2.6.0
image: milvusdb/milvus:v2.6.1
command: [ "milvus", "run", "standalone" ]
environment:
- COMMON_STORAGETYPE=local
Expand All @@ -24,7 +24,7 @@ services:

standaloneslave:
container_name: milvus-javasdk-standalone-2
image: milvusdb/milvus:v2.6.0
image: milvusdb/milvus:v2.6.1
command: [ "milvus", "run", "standalone" ]
environment:
- COMMON_STORAGETYPE=local
Expand Down
290 changes: 214 additions & 76 deletions examples/src/main/java/io/milvus/v1/ClientPoolExample.java

Large diffs are not rendered by default.

235 changes: 190 additions & 45 deletions examples/src/main/java/io/milvus/v2/ClientPoolExample.java

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions sdk-core/src/main/java/io/milvus/pool/ClientPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;

public class ClientPool<C, T> {
protected static final Logger logger = LoggerFactory.getLogger(ClientPool.class);
protected GenericKeyedObjectPool<String, T> clientPool;
Expand Down Expand Up @@ -40,6 +38,10 @@ protected ClientPool(PoolConfig config, PoolClientFactory clientFactory) {
this.clientPool = new GenericKeyedObjectPool<String, T>(clientFactory, poolConfig);
}

public void configForKey(String key, C config) {
this.clientFactory.configForKey(key, config);
}

/**
* Get a client object which is idle from the pool.
* Once the client is hold by the caller, it will be marked as active state and cannot be fetched by other caller.
Expand Down
23 changes: 17 additions & 6 deletions sdk-core/src/main/java/io/milvus/pool/PoolClientFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,22 @@

import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PoolClientFactory<C, T> extends BaseKeyedPooledObjectFactory<String, T> {
protected static final Logger logger = LoggerFactory.getLogger(PoolClientFactory.class);
private final C config;
private final C configDefault;
private ConcurrentMap<String, C> configForKeys = new ConcurrentHashMap<>();
private Constructor<?> constructor;
private Method closeMethod;
private Method verifyMethod;

public PoolClientFactory(C config, String clientClassName) throws ClassNotFoundException, NoSuchMethodException {
this.config = config;
public PoolClientFactory(C configDefault, String clientClassName) throws ClassNotFoundException, NoSuchMethodException {
this.configDefault = configDefault;
try {
Class<?> clientCls = Class.forName(clientClassName);
Class<?> configCls = Class.forName(config.getClass().getName());
Class<?> configCls = Class.forName(configDefault.getClass().getName());
constructor = clientCls.getConstructor(configCls);
closeMethod = clientCls.getMethod("close", long.class);
verifyMethod = clientCls.getMethod("clientIsReady");
Expand All @@ -32,11 +35,19 @@ public PoolClientFactory(C config, String clientClassName) throws ClassNotFoundE
}
}

public void configForKey(String key, C config) {
configForKeys.put(key, config);
}

@Override
public T create(String key) throws Exception {
try {
T client = (T) constructor.newInstance(this.config);
return client;
C keyConfig = configForKeys.get(key);
if (keyConfig == null) {
return (T) constructor.newInstance(this.configDefault);
} else {
return (T) constructor.newInstance(keyConfig);
}
} catch (Exception e) {
logger.error("Failed to create client, exception: ", e);
throw new MilvusClientException(ErrorCode.CLIENT_ERROR, e);
Expand Down
6 changes: 3 additions & 3 deletions sdk-core/src/main/java/io/milvus/pool/PoolConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
@SuperBuilder
public class PoolConfig {
@Builder.Default
private int maxIdlePerKey = 5;
private int maxIdlePerKey = 10;
@Builder.Default
private int minIdlePerKey = 0;
@Builder.Default
private int maxTotalPerKey = 10;
private int maxTotalPerKey = 30;
@Builder.Default
private int maxTotal = 50;
private int maxTotal = 1000;
@Builder.Default
private boolean blockWhenExhausted = true;
@Builder.Default
Expand Down
2 changes: 1 addition & 1 deletion sdk-core/src/test/java/io/milvus/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class TestUtils {
private int dimension = 256;
private static final Random RANDOM = new Random();

public static final String MilvusDockerImageID = "milvusdb/milvus:v2.6.0";
public static final String MilvusDockerImageID = "milvusdb/milvus:v2.6.1";

public TestUtils(int dimension) {
this.dimension = dimension;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2112,7 +2112,14 @@ void testDatabase() {

@Test
void testClientPool() {
// create a temp database
String dummyDb = "dummy_db";
client.createDatabase(CreateDatabaseReq.builder()
.databaseName(dummyDb)
.build());

try {
// the default connection config will connect to default db
ConnectConfig connectConfig = ConnectConfig.builder()
.uri(milvus.getEndpoint())
.rpcDeadlineMs(100L)
Expand All @@ -2121,16 +2128,24 @@ void testClientPool() {
.build();
MilvusClientV2Pool pool = new MilvusClientV2Pool(poolConfig, connectConfig);

// clients of the key "dummy_db" will connect to this db
pool.configForKey(dummyDb, ConnectConfig.builder()
.uri(milvus.getEndpoint())
.dbName(dummyDb)
.rpcDeadlineMs(100L)
.build());

List<Thread> threadList = new ArrayList<>();
int threadCount = 10;
int requestPerThread = 10;
String key = "192.168.1.1";
String key = "default";
for (int k = 0; k < threadCount; k++) {
Thread t = new Thread(() -> {
for (int i = 0; i < requestPerThread; i++) {
MilvusClientV2 client = pool.getClient(key);
String version = client.getServerVersion();
// System.out.printf("%d, %s%n", i, version);
Assertions.assertEquals(client.currentUsedDatabase(), "default");
System.out.printf("idle %d, active %d%n", pool.getIdleClientNumber(key), pool.getActiveClientNumber(key));
pool.returnClient(key, client);
}
Expand All @@ -2146,6 +2161,10 @@ void testClientPool() {

System.out.println(String.format("idle %d, active %d", pool.getIdleClientNumber(key), pool.getActiveClientNumber(key)));
System.out.println(String.format("total idle %d, total active %d", pool.getTotalIdleClientNumber(), pool.getTotalActiveClientNumber()));

// get client connect to the dummy db
MilvusClientV2 dummyClient = pool.getClient(dummyDb);
Assertions.assertEquals(dummyClient.currentUsedDatabase(), dummyDb);
pool.close();
} catch (Exception e) {
System.out.println(e.getMessage());
Expand Down
Loading