Skip to content

Commit 6ad1e9f

Browse files
committed
Optimize client pool
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent eb10051 commit 6ad1e9f

8 files changed

Lines changed: 833 additions & 68 deletions

File tree

Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
package io.milvus.v2;
2+
3+
import com.google.gson.Gson;
4+
import com.google.gson.JsonObject;
5+
import io.milvus.pool.MilvusClientV2Pool;
6+
import io.milvus.pool.PoolConfig;
7+
import io.milvus.v1.CommonUtils;
8+
import io.milvus.v2.client.ConnectConfig;
9+
import io.milvus.v2.client.MilvusClientV2;
10+
import io.milvus.v2.common.ConsistencyLevel;
11+
import io.milvus.v2.common.DataType;
12+
import io.milvus.v2.common.IndexParam;
13+
import io.milvus.v2.service.collection.request.AddFieldReq;
14+
import io.milvus.v2.service.collection.request.CreateCollectionReq;
15+
import io.milvus.v2.service.collection.request.DropCollectionReq;
16+
import io.milvus.v2.service.collection.request.HasCollectionReq;
17+
import io.milvus.v2.service.vector.request.InsertReq;
18+
import io.milvus.v2.service.vector.request.QueryReq;
19+
import io.milvus.v2.service.vector.request.SearchReq;
20+
import io.milvus.v2.service.vector.request.data.FloatVec;
21+
import io.milvus.v2.service.vector.response.InsertResp;
22+
import io.milvus.v2.service.vector.response.QueryResp;
23+
import io.milvus.v2.service.vector.response.SearchResp;
24+
25+
import java.time.Duration;
26+
import java.util.ArrayList;
27+
import java.util.Collections;
28+
import java.util.List;
29+
import java.util.concurrent.ExecutorService;
30+
import java.util.concurrent.Executors;
31+
import java.util.concurrent.TimeUnit;
32+
33+
public class ClientPoolDemo {
34+
private static final String ServerUri = "http://localhost:19530";
35+
private static final String CollectionName = "java_sdk_example_pool_demo";
36+
private static final String IDFieldName = "id";
37+
private static final String VectorFieldName = "vector";
38+
private static final String TextFieldName = "text";
39+
private static final int DIM = 128;
40+
private static final String DemoKey = "for_demo";
41+
42+
private static final MilvusClientV2Pool pool;
43+
44+
static {
45+
ConnectConfig defaultConnectConfig = ConnectConfig.builder()
46+
.uri(ServerUri)
47+
.build();
48+
// read this issue for more details about the pool configurations:
49+
// https://github.com/milvus-io/milvus-sdk-java/issues/1577
50+
PoolConfig poolConfig = PoolConfig.builder()
51+
.minIdlePerKey(1)
52+
.maxIdlePerKey(1)
53+
.maxTotalPerKey(5)
54+
.maxTotal(1000)
55+
.maxBlockWaitDuration(Duration.ofSeconds(5L)) // getClient() will wait 5 seconds if no idle client available
56+
.build();
57+
try {
58+
pool = new MilvusClientV2Pool(poolConfig, defaultConnectConfig);
59+
// prepare the pool to pre-create some clients according to the minIdlePerKey
60+
// it is like a warmup to reduce the first time cost to call the getClient()
61+
pool.preparePool(DemoKey);
62+
} catch (ClassNotFoundException | NoSuchMethodException e) {
63+
throw new RuntimeException(e);
64+
}
65+
}
66+
67+
private static void createCollection(boolean recreate) {
68+
System.out.println("========== createCollection() ==========");
69+
MilvusClientV2 client = null;
70+
try {
71+
client = pool.getClient(DemoKey);
72+
if (client == null) {
73+
System.out.println("Cannot not get client from key:" + DemoKey);
74+
return;
75+
}
76+
77+
if (recreate) {
78+
client.dropCollection(DropCollectionReq.builder()
79+
.collectionName(CollectionName)
80+
.build());
81+
} else if (client.hasCollection(HasCollectionReq.builder()
82+
.collectionName(CollectionName)
83+
.build())) {
84+
return;
85+
}
86+
87+
CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder()
88+
.build();
89+
collectionSchema.addField(AddFieldReq.builder()
90+
.fieldName("id")
91+
.dataType(DataType.Int64)
92+
.isPrimaryKey(true)
93+
.autoID(true)
94+
.build());
95+
collectionSchema.addField(AddFieldReq.builder()
96+
.fieldName(VectorFieldName)
97+
.dataType(DataType.FloatVector)
98+
.dimension(DIM)
99+
.build());
100+
collectionSchema.addField(AddFieldReq.builder()
101+
.fieldName(TextFieldName)
102+
.dataType(DataType.VarChar)
103+
.maxLength(1024)
104+
.build());
105+
106+
List<IndexParam> indexes = new ArrayList<>();
107+
indexes.add(IndexParam.builder()
108+
.fieldName(VectorFieldName)
109+
.indexType(IndexParam.IndexType.AUTOINDEX)
110+
.metricType(IndexParam.MetricType.COSINE)
111+
.build());
112+
113+
CreateCollectionReq requestCreate = CreateCollectionReq.builder()
114+
.collectionName(CollectionName)
115+
.collectionSchema(collectionSchema)
116+
.indexParams(indexes)
117+
.consistencyLevel(ConsistencyLevel.BOUNDED)
118+
.build();
119+
client.createCollection(requestCreate);
120+
121+
long rowCount = 10000;
122+
insertData(rowCount);
123+
} finally {
124+
pool.returnClient(DemoKey, client);
125+
}
126+
}
127+
128+
private static void insertData(long rowCount) {
129+
System.out.println("========== insertData() ==========");
130+
MilvusClientV2 client = null;
131+
try {
132+
client = pool.getClient(DemoKey);
133+
if (client == null) {
134+
System.out.println("Cannot not get client from key:" + DemoKey);
135+
return;
136+
}
137+
138+
Gson gson = new Gson();
139+
long inserted = 0L;
140+
while (inserted < rowCount) {
141+
long batch = 1000L;
142+
if (rowCount - inserted < batch) {
143+
batch = rowCount - inserted;
144+
}
145+
List<JsonObject> rows = new ArrayList<>();
146+
for (long i = 0; i < batch; i++) {
147+
JsonObject row = new JsonObject();
148+
row.add(VectorFieldName, gson.toJsonTree(CommonUtils.generateFloatVector(DIM)));
149+
row.addProperty(TextFieldName, "text_" + i);
150+
rows.add(row);
151+
}
152+
InsertResp resp = client.insert(InsertReq.builder()
153+
.collectionName(CollectionName)
154+
.data(rows)
155+
.build());
156+
inserted += resp.getInsertCnt();
157+
System.out.println("Inserted count:" + resp.getInsertCnt());
158+
}
159+
160+
QueryResp countR = client.query(QueryReq.builder()
161+
.collectionName(CollectionName)
162+
.outputFields(Collections.singletonList("count(*)"))
163+
.consistencyLevel(ConsistencyLevel.STRONG)
164+
.build());
165+
System.out.printf("%d rows persisted\n", (long) countR.getQueryResults().get(0).getEntity().get("count(*)"));
166+
} finally {
167+
pool.returnClient(DemoKey, client);
168+
}
169+
}
170+
171+
private static void search() {
172+
MilvusClientV2 client = null;
173+
try {
174+
client = pool.getClient(DemoKey);
175+
while (client == null) {
176+
try {
177+
// getClient() might exceeds the borrowMaxWaitMillis and throw exception if the pool is full
178+
// retry to call until it return a client
179+
client = pool.getClient(DemoKey);
180+
} catch (Exception e) {
181+
System.out.printf("Failed to get client, will retry, error: %s%n", e.getMessage());
182+
}
183+
}
184+
185+
// long start = System.currentTimeMillis();
186+
FloatVec vector = new FloatVec(CommonUtils.generateFloatVector(DIM));
187+
SearchResp resp = client.search(SearchReq.builder()
188+
.collectionName(CollectionName)
189+
.limit(10)
190+
.data(Collections.singletonList(vector))
191+
.annsField(VectorFieldName)
192+
.outputFields(Collections.singletonList(TextFieldName))
193+
.build());
194+
// System.out.printf("search time cost: %dms%n", System.currentTimeMillis() - start);
195+
} finally {
196+
pool.returnClient(DemoKey, client);
197+
}
198+
}
199+
200+
private static void printPoolState() {
201+
System.out.println("========== printPoolState() ==========");
202+
System.out.printf("%d idle clients and %d active clients%n",
203+
pool.getIdleClientNumber(DemoKey), pool.getActiveClientNumber(DemoKey));
204+
System.out.printf("QPS: %.2f%n", pool.fetchClientPerSecond(DemoKey));
205+
}
206+
207+
private static void concurrentSearch(int threadCount, int requestCount) {
208+
System.out.println("\n======================================================================");
209+
System.out.println("======================= ConcurrentSearch =============================");
210+
System.out.println("======================================================================");
211+
212+
class Worker implements Runnable {
213+
@Override
214+
public void run() {
215+
search();
216+
}
217+
}
218+
219+
try {
220+
long start = System.currentTimeMillis();
221+
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
222+
for (int i = 0; i < requestCount; i++) {
223+
Runnable worker = new Worker();
224+
executor.execute(worker);
225+
}
226+
executor.shutdown();
227+
228+
// with requests start, more active clients will be created
229+
boolean done = false;
230+
while (!done) {
231+
printPoolState();
232+
done = executor.awaitTermination(1, TimeUnit.SECONDS);
233+
}
234+
235+
long end = System.currentTimeMillis();
236+
System.out.printf("%n%d requests done in %.1f seconds%n%n", requestCount, (float) (end - start) / 1000);
237+
238+
// after all requests are done, the active clients will be retired and eventually only one idle client left
239+
while (pool.getActiveClientNumber(DemoKey) > 1) {
240+
TimeUnit.SECONDS.sleep(1);
241+
printPoolState();
242+
}
243+
} catch (Exception e) {
244+
System.err.println("Failed to create executor: " + e);
245+
}
246+
}
247+
248+
public static void main(String[] args) throws InterruptedException {
249+
createCollection(true);
250+
251+
int threadCount = 50;
252+
int requestCount = 100000;
253+
concurrentSearch(threadCount, requestCount);
254+
255+
// do again
256+
concurrentSearch(threadCount, requestCount);
257+
258+
pool.close();
259+
}
260+
}

examples/src/main/java/io/milvus/v2/ClientPoolExample.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
import java.util.List;
4848

4949
public class ClientPoolExample {
50-
public static String serverUri = "http://localhost:19530";
50+
public static String ServerUri = "http://localhost:19530";
5151
public static String CollectionName = "java_sdk_example_pool_v2";
5252
public static String VectorFieldName = "vector";
5353
public static int DIM = 128;
@@ -95,7 +95,7 @@ public static void createDatabases(MilvusClientV2Pool pool) {
9595
// the ClientPool will use different config to create client to connect to specific database
9696
for (String dbName : dbNames) {
9797
ConnectConfig config = ConnectConfig.builder()
98-
.uri(serverUri)
98+
.uri(ServerUri)
9999
.dbName(dbName)
100100
.build();
101101
pool.configForKey(dbName, config);
@@ -288,13 +288,13 @@ public static void dropDatabases(MilvusClientV2Pool pool) {
288288

289289
public static void main(String[] args) throws InterruptedException {
290290
ConnectConfig defaultConfig = ConnectConfig.builder()
291-
.uri(serverUri)
291+
.uri(ServerUri)
292292
.build();
293293
// read this issue for more details about the pool configurations:
294294
// https://github.com/milvus-io/milvus-sdk-java/issues/1577
295295
PoolConfig poolConfig = PoolConfig.builder()
296-
.maxIdlePerKey(10) // max idle clients per key
297-
.maxTotalPerKey(50) // max total(idle + active) clients per key
296+
.maxIdlePerKey(1) // max idle clients per key
297+
.maxTotalPerKey(5) // max total(idle + active) clients per key
298298
.maxTotal(1000) // max total clients for all keys
299299
.maxBlockWaitDuration(Duration.ofSeconds(5L)) // getClient() will wait 5 seconds if no idle client available
300300
.minEvictableIdleDuration(Duration.ofSeconds(10L)) // if number of idle clients is larger than maxIdlePerKey, redundant idle clients will be evicted after 10 seconds
@@ -340,7 +340,7 @@ public static void main(String[] args) throws InterruptedException {
340340

341341
long end = System.currentTimeMillis();
342342
System.out.printf("%d insert requests and %d search requests finished in %.3f seconds%n",
343-
threadCount * repeatRequests * 3, threadCount * repeatRequests * 3, (end - start) * 0.001);
343+
threadCount * repeatRequests * dbNames.size(), threadCount * repeatRequests * dbNames.size(), (end - start) * 0.001);
344344

345345
printClientNumber(pool);
346346
pool.clear(); // clear idle clients

0 commit comments

Comments
 (0)