Skip to content

Commit f5388aa

Browse files
committed
Optimize client pool
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent eb10051 commit f5388aa

File tree

8 files changed

+826
-68
lines changed

8 files changed

+826
-68
lines changed
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
package io.milvus.v2;
2+
3+
import com.google.gson.Gson;
4+
import com.google.gson.JsonObject;
5+
import io.milvus.pool.MilvusClientV2Pool;
6+
import io.milvus.pool.PoolConfig;
7+
import io.milvus.v1.CommonUtils;
8+
import io.milvus.v2.client.ConnectConfig;
9+
import io.milvus.v2.client.MilvusClientV2;
10+
import io.milvus.v2.common.ConsistencyLevel;
11+
import io.milvus.v2.common.DataType;
12+
import io.milvus.v2.common.IndexParam;
13+
import io.milvus.v2.service.collection.request.AddFieldReq;
14+
import io.milvus.v2.service.collection.request.CreateCollectionReq;
15+
import io.milvus.v2.service.collection.request.DropCollectionReq;
16+
import io.milvus.v2.service.collection.request.HasCollectionReq;
17+
import io.milvus.v2.service.vector.request.InsertReq;
18+
import io.milvus.v2.service.vector.request.QueryReq;
19+
import io.milvus.v2.service.vector.request.SearchReq;
20+
import io.milvus.v2.service.vector.request.data.FloatVec;
21+
import io.milvus.v2.service.vector.response.InsertResp;
22+
import io.milvus.v2.service.vector.response.QueryResp;
23+
import io.milvus.v2.service.vector.response.SearchResp;
24+
25+
import java.time.Duration;
26+
import java.util.ArrayList;
27+
import java.util.Collections;
28+
import java.util.List;
29+
import java.util.concurrent.ExecutorService;
30+
import java.util.concurrent.Executors;
31+
import java.util.concurrent.TimeUnit;
32+
33+
public class ClientPoolDemo {
34+
private static final String ServerUri = "http://localhost:19530";
35+
private static final String CollectionName = "java_sdk_example_pool_demo";
36+
private static final String IDFieldName = "id";
37+
private static final String VectorFieldName = "vector";
38+
private static final String TextFieldName = "text";
39+
private static final int DIM = 128;
40+
private static final String DemoKey = "for_demo";
41+
42+
private static final MilvusClientV2Pool pool;
43+
44+
static {
45+
ConnectConfig defaultConnectConfig = ConnectConfig.builder()
46+
.uri(ServerUri)
47+
.build();
48+
// read this issue for more details about the pool configurations:
49+
// https://github.com/milvus-io/milvus-sdk-java/issues/1577
50+
PoolConfig poolConfig = PoolConfig.builder()
51+
.minIdlePerKey(1)
52+
.maxIdlePerKey(1)
53+
.maxTotalPerKey(5)
54+
.maxTotal(1000)
55+
.maxBlockWaitDuration(Duration.ofSeconds(5L)) // getClient() will wait 5 seconds if no idle client available
56+
.build();
57+
try {
58+
pool = new MilvusClientV2Pool(poolConfig, defaultConnectConfig);
59+
} catch (ClassNotFoundException | NoSuchMethodException e) {
60+
throw new RuntimeException(e);
61+
}
62+
}
63+
64+
private static void createCollection(boolean recreate) {
65+
System.out.println("========== createCollection() ==========");
66+
MilvusClientV2 client = null;
67+
try {
68+
client = pool.getClient(DemoKey);
69+
if (client == null) {
70+
System.out.println("Cannot not get client from key:" + DemoKey);
71+
return;
72+
}
73+
74+
if (recreate) {
75+
client.dropCollection(DropCollectionReq.builder()
76+
.collectionName(CollectionName)
77+
.build());
78+
} else if (client.hasCollection(HasCollectionReq.builder()
79+
.collectionName(CollectionName)
80+
.build())) {
81+
return;
82+
}
83+
84+
CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder()
85+
.build();
86+
collectionSchema.addField(AddFieldReq.builder()
87+
.fieldName("id")
88+
.dataType(DataType.Int64)
89+
.isPrimaryKey(true)
90+
.autoID(true)
91+
.build());
92+
collectionSchema.addField(AddFieldReq.builder()
93+
.fieldName(VectorFieldName)
94+
.dataType(DataType.FloatVector)
95+
.dimension(DIM)
96+
.build());
97+
collectionSchema.addField(AddFieldReq.builder()
98+
.fieldName(TextFieldName)
99+
.dataType(DataType.VarChar)
100+
.maxLength(1024)
101+
.build());
102+
103+
List<IndexParam> indexes = new ArrayList<>();
104+
indexes.add(IndexParam.builder()
105+
.fieldName(VectorFieldName)
106+
.indexType(IndexParam.IndexType.AUTOINDEX)
107+
.metricType(IndexParam.MetricType.COSINE)
108+
.build());
109+
110+
CreateCollectionReq requestCreate = CreateCollectionReq.builder()
111+
.collectionName(CollectionName)
112+
.collectionSchema(collectionSchema)
113+
.indexParams(indexes)
114+
.consistencyLevel(ConsistencyLevel.BOUNDED)
115+
.build();
116+
client.createCollection(requestCreate);
117+
118+
long rowCount = 10000;
119+
insertData(rowCount);
120+
} finally {
121+
pool.returnClient(DemoKey, client);
122+
}
123+
}
124+
125+
private static void insertData(long rowCount) {
126+
System.out.println("========== insertData() ==========");
127+
MilvusClientV2 client = null;
128+
try {
129+
client = pool.getClient(DemoKey);
130+
if (client == null) {
131+
System.out.println("Cannot not get client from key:" + DemoKey);
132+
return;
133+
}
134+
135+
Gson gson = new Gson();
136+
long inserted = 0L;
137+
while (inserted < rowCount) {
138+
long batch = 1000L;
139+
if (rowCount - inserted < batch) {
140+
batch = rowCount - inserted;
141+
}
142+
List<JsonObject> rows = new ArrayList<>();
143+
for (long i = 0; i < batch; i++) {
144+
JsonObject row = new JsonObject();
145+
row.add(VectorFieldName, gson.toJsonTree(CommonUtils.generateFloatVector(DIM)));
146+
row.addProperty(TextFieldName, "text_" + i);
147+
rows.add(row);
148+
}
149+
InsertResp resp = client.insert(InsertReq.builder()
150+
.collectionName(CollectionName)
151+
.data(rows)
152+
.build());
153+
inserted += resp.getInsertCnt();
154+
System.out.println("Inserted count:" + resp.getInsertCnt());
155+
}
156+
157+
QueryResp countR = client.query(QueryReq.builder()
158+
.collectionName(CollectionName)
159+
.outputFields(Collections.singletonList("count(*)"))
160+
.consistencyLevel(ConsistencyLevel.STRONG)
161+
.build());
162+
System.out.printf("%d rows persisted\n", (long) countR.getQueryResults().get(0).getEntity().get("count(*)"));
163+
} finally {
164+
pool.returnClient(DemoKey, client);
165+
}
166+
}
167+
168+
private static void search() {
169+
MilvusClientV2 client = null;
170+
try {
171+
client = pool.getClient(DemoKey);
172+
while (client == null) {
173+
try {
174+
// getClient() might exceeds the borrowMaxWaitMillis and throw exception if the pool is full
175+
// retry to call until it return a client
176+
client = pool.getClient(DemoKey);
177+
} catch (Exception e) {
178+
System.out.printf("Failed to get client, will retry, error: %s%n", e.getMessage());
179+
}
180+
}
181+
182+
// long start = System.currentTimeMillis();
183+
FloatVec vector = new FloatVec(CommonUtils.generateFloatVector(DIM));
184+
SearchResp resp = client.search(SearchReq.builder()
185+
.collectionName(CollectionName)
186+
.limit(10)
187+
.data(Collections.singletonList(vector))
188+
.annsField(VectorFieldName)
189+
.outputFields(Collections.singletonList(TextFieldName))
190+
.build());
191+
// System.out.printf("search time cost: %dms%n", System.currentTimeMillis() - start);
192+
} finally {
193+
pool.returnClient(DemoKey, client);
194+
}
195+
}
196+
197+
private static void printPoolState() {
198+
System.out.println("========== printPoolState() ==========");
199+
System.out.printf("%d idle clients and %d active clients%n",
200+
pool.getIdleClientNumber(DemoKey), pool.getActiveClientNumber(DemoKey));
201+
System.out.printf("QPS: %.2f%n", pool.fetchClientPerSecond(DemoKey));
202+
}
203+
204+
private static void concurrentSearch(int threadCount, int requestCount) {
205+
System.out.println("\n======================================================================");
206+
System.out.println("======================= ConcurrentSearch =============================");
207+
System.out.println("======================================================================");
208+
209+
class Worker implements Runnable {
210+
@Override
211+
public void run() {
212+
search();
213+
}
214+
}
215+
216+
try {
217+
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
218+
for (int i = 0; i < requestCount; i++) {
219+
Runnable worker = new Worker();
220+
executor.execute(worker);
221+
}
222+
executor.shutdown();
223+
224+
// with requests start, more active clients will be created
225+
boolean done = false;
226+
while (!done) {
227+
printPoolState();
228+
done = executor.awaitTermination(1, TimeUnit.SECONDS);
229+
}
230+
231+
// after all requests are done, the active clients will be retired and eventually only one idle client left
232+
while (pool.getActiveClientNumber(DemoKey) > 1) {
233+
TimeUnit.SECONDS.sleep(1);
234+
printPoolState();
235+
}
236+
} catch (Exception e) {
237+
System.err.println("Failed to create executor: " + e);
238+
}
239+
}
240+
241+
public static void main(String[] args) throws InterruptedException {
242+
createCollection(true);
243+
244+
int threadCount = 50;
245+
int requestCount = 100000;
246+
concurrentSearch(threadCount, requestCount);
247+
248+
// do again
249+
concurrentSearch(threadCount, requestCount);
250+
251+
pool.close();
252+
}
253+
}

examples/src/main/java/io/milvus/v2/ClientPoolExample.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
import java.util.List;
4848

4949
public class ClientPoolExample {
50-
public static String serverUri = "http://localhost:19530";
50+
public static String ServerUri = "http://localhost:19530";
5151
public static String CollectionName = "java_sdk_example_pool_v2";
5252
public static String VectorFieldName = "vector";
5353
public static int DIM = 128;
@@ -95,7 +95,7 @@ public static void createDatabases(MilvusClientV2Pool pool) {
9595
// the ClientPool will use different config to create client to connect to specific database
9696
for (String dbName : dbNames) {
9797
ConnectConfig config = ConnectConfig.builder()
98-
.uri(serverUri)
98+
.uri(ServerUri)
9999
.dbName(dbName)
100100
.build();
101101
pool.configForKey(dbName, config);
@@ -288,13 +288,13 @@ public static void dropDatabases(MilvusClientV2Pool pool) {
288288

289289
public static void main(String[] args) throws InterruptedException {
290290
ConnectConfig defaultConfig = ConnectConfig.builder()
291-
.uri(serverUri)
291+
.uri(ServerUri)
292292
.build();
293293
// read this issue for more details about the pool configurations:
294294
// https://github.com/milvus-io/milvus-sdk-java/issues/1577
295295
PoolConfig poolConfig = PoolConfig.builder()
296-
.maxIdlePerKey(10) // max idle clients per key
297-
.maxTotalPerKey(50) // max total(idle + active) clients per key
296+
.maxIdlePerKey(1) // max idle clients per key
297+
.maxTotalPerKey(5) // max total(idle + active) clients per key
298298
.maxTotal(1000) // max total clients for all keys
299299
.maxBlockWaitDuration(Duration.ofSeconds(5L)) // getClient() will wait 5 seconds if no idle client available
300300
.minEvictableIdleDuration(Duration.ofSeconds(10L)) // if number of idle clients is larger than maxIdlePerKey, redundant idle clients will be evicted after 10 seconds
@@ -340,7 +340,7 @@ public static void main(String[] args) throws InterruptedException {
340340

341341
long end = System.currentTimeMillis();
342342
System.out.printf("%d insert requests and %d search requests finished in %.3f seconds%n",
343-
threadCount * repeatRequests * 3, threadCount * repeatRequests * 3, (end - start) * 0.001);
343+
threadCount * repeatRequests * dbNames.size(), threadCount * repeatRequests * dbNames.size(), (end - start) * 0.001);
344344

345345
printClientNumber(pool);
346346
pool.clear(); // clear idle clients

0 commit comments

Comments
 (0)