Skip to content

Commit a6b118e

Browse files
committed
Optimize client pool
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent eb10051 commit a6b118e

8 files changed

Lines changed: 794 additions & 66 deletions

File tree

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
package io.milvus.v2;
2+
3+
import com.google.gson.Gson;
4+
import com.google.gson.JsonObject;
5+
import io.milvus.pool.MilvusClientV2Pool;
6+
import io.milvus.pool.PoolConfig;
7+
import io.milvus.v1.CommonUtils;
8+
import io.milvus.v2.client.ConnectConfig;
9+
import io.milvus.v2.client.MilvusClientV2;
10+
import io.milvus.v2.common.ConsistencyLevel;
11+
import io.milvus.v2.common.DataType;
12+
import io.milvus.v2.common.IndexParam;
13+
import io.milvus.v2.service.collection.request.AddFieldReq;
14+
import io.milvus.v2.service.collection.request.CreateCollectionReq;
15+
import io.milvus.v2.service.collection.request.DropCollectionReq;
16+
import io.milvus.v2.service.collection.request.HasCollectionReq;
17+
import io.milvus.v2.service.vector.request.InsertReq;
18+
import io.milvus.v2.service.vector.request.QueryReq;
19+
import io.milvus.v2.service.vector.request.SearchReq;
20+
import io.milvus.v2.service.vector.request.data.FloatVec;
21+
import io.milvus.v2.service.vector.response.InsertResp;
22+
import io.milvus.v2.service.vector.response.QueryResp;
23+
import io.milvus.v2.service.vector.response.SearchResp;
24+
25+
import java.time.Duration;
26+
import java.util.ArrayList;
27+
import java.util.Collections;
28+
import java.util.List;
29+
import java.util.concurrent.ExecutorService;
30+
import java.util.concurrent.Executors;
31+
import java.util.concurrent.TimeUnit;
32+
33+
public class ClientPoolDemo {
34+
private static final String ServerUri = "http://localhost:19530";
35+
private static final String CollectionName = "java_sdk_example_pool_demo";
36+
private static final String IDFieldName = "id";
37+
private static final String VectorFieldName = "vector";
38+
private static final String TextFieldName = "text";
39+
private static final int DIM = 128;
40+
private static final String DemoKey = "for_demo";
41+
42+
private static final MilvusClientV2Pool pool;
43+
44+
static {
45+
ConnectConfig defaultConnectConfig = ConnectConfig.builder()
46+
.uri(ServerUri)
47+
.build();
48+
// read this issue for more details about the pool configurations:
49+
// https://github.com/milvus-io/milvus-sdk-java/issues/1577
50+
PoolConfig poolConfig = PoolConfig.builder()
51+
.minIdlePerKey(1)
52+
.maxIdlePerKey(1)
53+
.maxTotalPerKey(5)
54+
.maxTotal(1000)
55+
.maxBlockWaitDuration(Duration.ofSeconds(5L)) // getClient() will wait 5 seconds if no idle client available
56+
.build();
57+
try {
58+
pool = new MilvusClientV2Pool(poolConfig, defaultConnectConfig);
59+
} catch (ClassNotFoundException | NoSuchMethodException e) {
60+
throw new RuntimeException(e);
61+
}
62+
}
63+
64+
private static void createCollection(boolean reCreate) {
65+
System.out.println("========== createCollection() ==========");
66+
MilvusClientV2 client = null;
67+
try {
68+
client = pool.getClient(DemoKey);
69+
70+
if (reCreate) {
71+
client.dropCollection(DropCollectionReq.builder()
72+
.collectionName(CollectionName)
73+
.build());
74+
} else if (client.hasCollection(HasCollectionReq.builder()
75+
.collectionName(CollectionName)
76+
.build())) {
77+
return;
78+
}
79+
80+
CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder()
81+
.build();
82+
collectionSchema.addField(AddFieldReq.builder()
83+
.fieldName("id")
84+
.dataType(DataType.Int64)
85+
.isPrimaryKey(true)
86+
.autoID(true)
87+
.build());
88+
collectionSchema.addField(AddFieldReq.builder()
89+
.fieldName(VectorFieldName)
90+
.dataType(DataType.FloatVector)
91+
.dimension(DIM)
92+
.build());
93+
collectionSchema.addField(AddFieldReq.builder()
94+
.fieldName(TextFieldName)
95+
.dataType(DataType.VarChar)
96+
.maxLength(1024)
97+
.build());
98+
99+
List<IndexParam> indexes = new ArrayList<>();
100+
indexes.add(IndexParam.builder()
101+
.fieldName(VectorFieldName)
102+
.indexType(IndexParam.IndexType.AUTOINDEX)
103+
.metricType(IndexParam.MetricType.COSINE)
104+
.build());
105+
106+
CreateCollectionReq requestCreate = CreateCollectionReq.builder()
107+
.collectionName(CollectionName)
108+
.collectionSchema(collectionSchema)
109+
.indexParams(indexes)
110+
.consistencyLevel(ConsistencyLevel.BOUNDED)
111+
.build();
112+
client.createCollection(requestCreate);
113+
114+
long rowCount = 10000;
115+
insertData(rowCount);
116+
} finally {
117+
pool.returnClient(DemoKey, client);
118+
}
119+
}
120+
121+
private static void insertData(long rowCount) {
122+
System.out.println("========== insertData() ==========");
123+
MilvusClientV2 client = null;
124+
try {
125+
client = pool.getClient(DemoKey);
126+
127+
Gson gson = new Gson();
128+
long inserted = 0L;
129+
while (inserted < rowCount) {
130+
long batch = 1000L;
131+
if (rowCount - inserted < batch) {
132+
batch = rowCount - inserted;
133+
}
134+
List<JsonObject> rows = new ArrayList<>();
135+
for (long i = 0; i < batch; i++) {
136+
JsonObject row = new JsonObject();
137+
row.add(VectorFieldName, gson.toJsonTree(CommonUtils.generateFloatVector(DIM)));
138+
row.addProperty(TextFieldName, "text_" + i);
139+
rows.add(row);
140+
}
141+
InsertResp resp = client.insert(InsertReq.builder()
142+
.collectionName(CollectionName)
143+
.data(rows)
144+
.build());
145+
inserted += resp.getInsertCnt();
146+
System.out.println("Inserted count:" + resp.getInsertCnt());
147+
}
148+
149+
QueryResp countR = client.query(QueryReq.builder()
150+
.collectionName(CollectionName)
151+
.outputFields(Collections.singletonList("count(*)"))
152+
.consistencyLevel(ConsistencyLevel.STRONG)
153+
.build());
154+
System.out.printf("%d rows persisted\n", (long) countR.getQueryResults().get(0).getEntity().get("count(*)"));
155+
} finally {
156+
pool.returnClient(DemoKey, client);
157+
}
158+
}
159+
160+
private static void search() {
161+
MilvusClientV2 client = null;
162+
try {
163+
client = pool.getClient(DemoKey);
164+
165+
// long start = System.currentTimeMillis();
166+
FloatVec vector = new FloatVec(CommonUtils.generateFloatVector(DIM));
167+
SearchResp resp = client.search(SearchReq.builder()
168+
.collectionName(CollectionName)
169+
.limit(10)
170+
.data(Collections.singletonList(vector))
171+
.annsField(VectorFieldName)
172+
.outputFields(Collections.singletonList(TextFieldName))
173+
.build());
174+
// System.out.printf("search time cost: %dms%n", System.currentTimeMillis() - start);
175+
} finally {
176+
pool.returnClient(DemoKey, client);
177+
}
178+
}
179+
180+
private static void printPoolState() {
181+
System.out.println("========== printPoolState() ==========");
182+
System.out.printf("%d idle clients and %d active clients%n",
183+
pool.getIdleClientNumber(DemoKey), pool.getActiveClientNumber(DemoKey));
184+
System.out.printf("QPS: %.2f%n", pool.fetchClientPerSecond(DemoKey));
185+
}
186+
187+
private static void concurrentSearch(int threadCount, int requestCount) {
188+
System.out.println("\n======================================================================");
189+
System.out.println("======================= ConcurrentSearch =============================");
190+
System.out.println("======================================================================");
191+
192+
class Worker implements Runnable {
193+
@Override
194+
public void run() {
195+
search();
196+
}
197+
}
198+
199+
try {
200+
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
201+
for (int i = 0; i < requestCount; i++) {
202+
Runnable worker = new Worker();
203+
executor.execute(worker);
204+
}
205+
executor.shutdown();
206+
207+
// with requests start, more active clients will be created
208+
boolean done = false;
209+
while (!done) {
210+
printPoolState();
211+
done = executor.awaitTermination(1, TimeUnit.SECONDS);
212+
}
213+
214+
// after all requests are done, the active clients will be retired and eventually only one idle client left
215+
while (pool.getActiveClientNumber(DemoKey) > 1) {
216+
TimeUnit.SECONDS.sleep(1);
217+
printPoolState();
218+
}
219+
} catch (Exception e) {
220+
System.err.println("Failed to create executor: " + e);
221+
}
222+
}
223+
224+
public static void main(String[] args) throws InterruptedException {
225+
createCollection(true);
226+
227+
int threadCount = 50;
228+
int requestCount = 100000;
229+
concurrentSearch(threadCount, requestCount);
230+
231+
// do again
232+
concurrentSearch(threadCount, requestCount);
233+
234+
pool.close();
235+
}
236+
}

examples/src/main/java/io/milvus/v2/ClientPoolExample.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
import java.util.List;
4848

4949
public class ClientPoolExample {
50-
public static String serverUri = "http://localhost:19530";
50+
public static String ServerUri = "http://localhost:19530";
5151
public static String CollectionName = "java_sdk_example_pool_v2";
5252
public static String VectorFieldName = "vector";
5353
public static int DIM = 128;
@@ -95,7 +95,7 @@ public static void createDatabases(MilvusClientV2Pool pool) {
9595
// the ClientPool will use different config to create client to connect to specific database
9696
for (String dbName : dbNames) {
9797
ConnectConfig config = ConnectConfig.builder()
98-
.uri(serverUri)
98+
.uri(ServerUri)
9999
.dbName(dbName)
100100
.build();
101101
pool.configForKey(dbName, config);
@@ -288,13 +288,13 @@ public static void dropDatabases(MilvusClientV2Pool pool) {
288288

289289
public static void main(String[] args) throws InterruptedException {
290290
ConnectConfig defaultConfig = ConnectConfig.builder()
291-
.uri(serverUri)
291+
.uri(ServerUri)
292292
.build();
293293
// read this issue for more details about the pool configurations:
294294
// https://github.com/milvus-io/milvus-sdk-java/issues/1577
295295
PoolConfig poolConfig = PoolConfig.builder()
296-
.maxIdlePerKey(10) // max idle clients per key
297-
.maxTotalPerKey(50) // max total(idle + active) clients per key
296+
.maxIdlePerKey(1) // max idle clients per key
297+
.maxTotalPerKey(5) // max total(idle + active) clients per key
298298
.maxTotal(1000) // max total clients for all keys
299299
.maxBlockWaitDuration(Duration.ofSeconds(5L)) // getClient() will wait 5 seconds if no idle client available
300300
.minEvictableIdleDuration(Duration.ofSeconds(10L)) // if number of idle clients is larger than maxIdlePerKey, redundant idle clients will be evicted after 10 seconds
@@ -340,7 +340,7 @@ public static void main(String[] args) throws InterruptedException {
340340

341341
long end = System.currentTimeMillis();
342342
System.out.printf("%d insert requests and %d search requests finished in %.3f seconds%n",
343-
threadCount * repeatRequests * 3, threadCount * repeatRequests * 3, (end - start) * 0.001);
343+
threadCount * repeatRequests * dbNames.size(), threadCount * repeatRequests * dbNames.size(), (end - start) * 0.001);
344344

345345
printClientNumber(pool);
346346
pool.clear(); // clear idle clients

0 commit comments

Comments
 (0)