Skip to content

Commit 7a3f7fd

Browse files
authored
Fix a bug that rpcDeadline incorrectly work for iterator (milvus-io#1720)
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent bd6a476 commit 7a3f7fd

10 files changed

Lines changed: 496 additions & 310 deletions

File tree

examples/src/main/java/io/milvus/v1/IteratorExample.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,22 +24,16 @@
2424
import io.milvus.client.MilvusServiceClient;
2525
import io.milvus.common.clientenum.ConsistencyLevelEnum;
2626
import io.milvus.grpc.DataType;
27-
import io.milvus.grpc.FlushResponse;
2827
import io.milvus.grpc.GetCollectionStatisticsResponse;
2928
import io.milvus.grpc.MutationResult;
30-
import io.milvus.param.ConnectParam;
31-
import io.milvus.param.IndexType;
32-
import io.milvus.param.MetricType;
33-
import io.milvus.param.R;
34-
import io.milvus.param.RetryParam;
35-
import io.milvus.param.RpcStatus;
29+
import io.milvus.orm.iterator.QueryIterator;
30+
import io.milvus.orm.iterator.SearchIterator;
31+
import io.milvus.param.*;
3632
import io.milvus.param.collection.*;
3733
import io.milvus.param.dml.InsertParam;
3834
import io.milvus.param.dml.QueryIteratorParam;
3935
import io.milvus.param.dml.SearchIteratorParam;
4036
import io.milvus.param.index.CreateIndexParam;
41-
import io.milvus.orm.iterator.QueryIterator;
42-
import io.milvus.orm.iterator.SearchIterator;
4337
import io.milvus.response.GetCollStatResponseWrapper;
4438
import io.milvus.response.QueryResultsWrapper;
4539

@@ -156,7 +150,7 @@ private void insertColumns() {
156150
List<Long> ages = new ArrayList<>();
157151
List<Long> ids = new ArrayList<>();
158152
for (long i = 0L; i < NUM_ENTITIES; ++i) {
159-
ages.add((long) batch * NUM_ENTITIES + i);
153+
ages.add(((long) batch * NUM_ENTITIES + i) % 100);
160154
ids.add((long) batch * NUM_ENTITIES + i);
161155
}
162156

@@ -205,20 +199,20 @@ private void prepareData() {
205199
}
206200

207201
private void queryIterateCollectionNoOffset() {
208-
String expr = String.format("10 <= %s <= 100", AGE_FIELD);
202+
String expr = String.format("10 <= %s <= 30", AGE_FIELD);
209203

210-
QueryIterator queryIterator = getQueryIterator(expr, 0L, 5L, null);
204+
QueryIterator queryIterator = getQueryIterator(expr, 0L, 1L, null);
211205
iterateQueryResult(queryIterator);
212206
}
213207

214208
private void queryIterateCollectionWithOffset() {
215-
String expr = String.format("10 <= %s <= 100", AGE_FIELD);
209+
String expr = String.format("10 <= %s <= 100", ID_FIELD);
216210
QueryIterator queryIterator = getQueryIterator(expr, 10L, 50L, null);
217211
iterateQueryResult(queryIterator);
218212
}
219213

220214
private void queryIterateCollectionWithLimit() {
221-
String expr = String.format("10 <= %s <= 100", AGE_FIELD);
215+
String expr = String.format("10 <= %s <= 100", ID_FIELD);
222216
QueryIterator queryIterator = getQueryIterator(expr, null, 80L, 530L);
223217
iterateQueryResult(queryIterator);
224218
}
@@ -238,6 +232,7 @@ private void searchIteratorCollectionWithLimit() {
238232
}
239233

240234
private void iterateQueryResult(QueryIterator queryIterator) {
235+
System.out.println("\n========== queryIterator() ==========");
241236
int pageIdx = 0;
242237
int iterateCount = 0;
243238
while (true) {
@@ -258,6 +253,7 @@ private void iterateQueryResult(QueryIterator queryIterator) {
258253
}
259254

260255
private void iterateSearchResult(SearchIterator searchIterator) {
256+
System.out.println("\n========== searchIterator() ==========");
261257
int pageIdx = 0;
262258
int iterateCount = 0;
263259
while (true) {
@@ -327,6 +323,10 @@ public static void main(String[] args) {
327323
example.prepareData();
328324
}
329325

326+
// set rpcTimeoutMs, just to verify it works for each call of query/search inside the iterator
327+
// in versions older than 2.5.16/2.6.11, iterator.next() will timeout after several calls if the rpcTimeoutMs is greater than 0
328+
milvusClient.withTimeout(500, TimeUnit.MILLISECONDS);
329+
330330
example.queryIterateCollectionNoOffset();
331331
example.queryIterateCollectionWithOffset();
332332
example.queryIterateCollectionWithLimit();

examples/src/main/java/io/milvus/v2/IteratorExample.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,18 @@
4343
import org.apache.commons.lang3.StringUtils;
4444

4545
import java.util.*;
46+
import java.util.concurrent.TimeUnit;
4647
import java.util.function.Function;
4748

4849
public class IteratorExample {
4950
private static final MilvusClientV2 client;
51+
5052
static {
5153
client = new MilvusClientV2(ConnectConfig.builder()
5254
.uri("http://localhost:19530")
5355
.build());
5456
}
57+
5558
private static final String COLLECTION_NAME = "java_sdk_example_iterator_v2";
5659
private static final String ID_FIELD = "userID";
5760
private static final String AGE_FIELD = "userAge";
@@ -200,15 +203,15 @@ private static void searchIteratorV2(String filter, Map<String, Object> params,
200203
Function<List<SearchResp.SearchResult>, List<SearchResp.SearchResult>> externalFilterFunc) {
201204
System.out.println("\n========== searchIteratorV2() ==========");
202205
System.out.println(String.format("expr='%s', params='%s', batchSize=%d, topK=%d",
203-
filter, params==null ? "" : params.toString(), batchSize, topK));
206+
filter, params == null ? "" : params.toString(), batchSize, topK));
204207
SearchIteratorV2 searchIterator = client.searchIteratorV2(SearchIteratorReqV2.builder()
205208
.collectionName(COLLECTION_NAME)
206209
.outputFields(Lists.newArrayList(AGE_FIELD))
207210
.batchSize(batchSize)
208211
.vectorFieldName(VECTOR_FIELD)
209212
.vectors(Collections.singletonList(new FloatVec(CommonUtils.generateFloatVector(VECTOR_DIM))))
210213
.filter(filter)
211-
.searchParams(params==null ? new HashMap<>() : params)
214+
.searchParams(params == null ? new HashMap<>() : params)
212215
.limit(topK)
213216
.metricType(IndexParam.MetricType.L2)
214217
.consistencyLevel(ConsistencyLevel.BOUNDED)
@@ -235,21 +238,26 @@ private static void searchIteratorV2(String filter, Map<String, Object> params,
235238

236239
public static void main(String[] args) {
237240
buildCollection();
238-
queryIterator("userID < 300",50, 5,400);
241+
242+
// set rpcTimeoutMs, just to verify it works for each call of query/search inside the iterator
243+
// in versions older than 2.5.16/2.6.11, iterator.next() will timeout after several calls if the rpcTimeoutMs is greater than 0
244+
client.withTimeout(500, TimeUnit.MILLISECONDS);
245+
246+
queryIterator("userID < 3000", 1, 5, 10000);
239247
searchIteratorV1("userAge > 50 &&userAge < 100", "{\"range_filter\": 15.0, \"radius\": 20.0}", 100, 500);
240-
searchIteratorV1("", "", 10, 99);
248+
searchIteratorV1("", "", 1, 3000);
241249
searchIteratorV2("userAge > 10 &&userAge < 20", null, 50, 120, null);
242250

243-
Map<String,Object> extraParams = new HashMap<>();
244-
extraParams.put("radius",15.0);
251+
Map<String, Object> extraParams = new HashMap<>();
252+
extraParams.put("radius", 15.0);
245253
searchIteratorV2("", extraParams, 50, 100, null);
246254

247255
// use external function to filter the result
248-
Function<List<SearchResp.SearchResult>, List<SearchResp.SearchResult>> externalFilterFunc = (List<SearchResp.SearchResult> src)->{
256+
Function<List<SearchResp.SearchResult>, List<SearchResp.SearchResult>> externalFilterFunc = (List<SearchResp.SearchResult> src) -> {
249257
List<SearchResp.SearchResult> newRes = new ArrayList<>();
250258
for (SearchResp.SearchResult res : src) {
251-
long id = (long)res.getId();
252-
if (id%2 == 0) {
259+
long id = (long) res.getId();
260+
if (id % 2 == 0) {
253261
newRes.add(res);
254262
}
255263
}

sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java

Lines changed: 42 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,36 @@
2020
package io.milvus.client;
2121

2222
import com.google.common.collect.Lists;
23-
import com.google.common.util.concurrent.*;
23+
import com.google.common.util.concurrent.FutureCallback;
24+
import com.google.common.util.concurrent.Futures;
25+
import com.google.common.util.concurrent.ListenableFuture;
26+
import com.google.common.util.concurrent.MoreExecutors;
2427
import com.google.gson.reflect.TypeToken;
2528
import io.grpc.StatusRuntimeException;
2629
import io.milvus.common.utils.GTsDict;
2730
import io.milvus.common.utils.JsonUtils;
2831
import io.milvus.common.utils.VectorUtils;
29-
import io.milvus.exception.*;
32+
import io.milvus.exception.ClientNotConnectedException;
33+
import io.milvus.exception.IllegalResponseException;
34+
import io.milvus.exception.ServerException;
3035
import io.milvus.grpc.*;
3136
import io.milvus.orm.iterator.QueryIterator;
37+
import io.milvus.orm.iterator.RpcStubWrapper;
3238
import io.milvus.orm.iterator.SearchIterator;
3339
import io.milvus.param.*;
34-
import io.milvus.param.alias.*;
35-
import io.milvus.param.bulkinsert.*;
40+
import io.milvus.param.alias.AlterAliasParam;
41+
import io.milvus.param.alias.CreateAliasParam;
42+
import io.milvus.param.alias.DropAliasParam;
43+
import io.milvus.param.alias.ListAliasesParam;
44+
import io.milvus.param.bulkinsert.BulkInsertParam;
45+
import io.milvus.param.bulkinsert.GetBulkInsertStateParam;
46+
import io.milvus.param.bulkinsert.ListBulkInsertTasksParam;
3647
import io.milvus.param.collection.*;
3748
import io.milvus.param.control.*;
38-
import io.milvus.param.credential.*;
49+
import io.milvus.param.credential.CreateCredentialParam;
50+
import io.milvus.param.credential.DeleteCredentialParam;
51+
import io.milvus.param.credential.ListCredUsersParam;
52+
import io.milvus.param.credential.UpdateCredentialParam;
3953
import io.milvus.param.dml.*;
4054
import io.milvus.param.highlevel.collection.CreateSimpleCollectionParam;
4155
import io.milvus.param.highlevel.collection.ListCollectionsParam;
@@ -413,7 +427,7 @@ private void handleResponse(String requestInfo, io.milvus.grpc.Status status) {
413427
logDebug("{} successfully!", requestInfo);
414428
}
415429

416-
///////////////////// API implementation //////////////////////
430+
/// ////////////////// API implementation //////////////////////
417431
@Override
418432
public R<Boolean> hasCollection(@NonNull HasCollectionParam requestParam) {
419433
if (!clientIsReady()) {
@@ -541,9 +555,9 @@ public R<RpcStatus> alterDatabase(AlterDatabaseParam requestParam) {
541555
try {
542556
List<KeyValuePair> propertiesList = ParamUtils.AssembleKvPair(requestParam.getProperties());
543557
AlterDatabaseRequest alterDatabaseRequest = AlterDatabaseRequest.newBuilder()
544-
.setDbName(requestParam.getDatabaseName())
545-
.addAllProperties(propertiesList)
546-
.build();
558+
.setDbName(requestParam.getDatabaseName())
559+
.addAllProperties(propertiesList)
560+
.build();
547561

548562
Status response = blockingStub().alterDatabase(alterDatabaseRequest);
549563
handleResponse(title, response);
@@ -567,8 +581,8 @@ public R<DescribeDatabaseResponse> describeDatabase(DescribeDatabaseParam reques
567581
String title = String.format("DescribeDatabaseRequest databaseName:%s", requestParam.getDatabaseName());
568582
try {
569583
DescribeDatabaseRequest describeDatabaseRequest = DescribeDatabaseRequest.newBuilder()
570-
.setDbName(requestParam.getDatabaseName())
571-
.build();
584+
.setDbName(requestParam.getDatabaseName())
585+
.build();
572586

573587
DescribeDatabaseResponse response = blockingStub().describeDatabase(describeDatabaseRequest);
574588
handleResponse(title, response.getStatus());
@@ -1351,7 +1365,8 @@ public R<RpcStatus> createIndex(@NonNull CreateIndexParam requestParam) {
13511365
Map<String, String> extraParams = requestParam.getExtraParam();
13521366
for (Map.Entry<String, String> entry : extraParams.entrySet()) {
13531367
if (entry.getKey().equals(Constant.PARAMS)) {
1354-
Map<String, String> tempParams = JsonUtils.fromJson(entry.getValue(), new TypeToken<Map<String, String>>() {}.getType());
1368+
Map<String, String> tempParams = JsonUtils.fromJson(entry.getValue(), new TypeToken<Map<String, String>>() {
1369+
}.getType());
13551370
for (String key : tempParams.keySet()) {
13561371
createIndexRequestBuilder.addExtraParams(KeyValuePair.newBuilder()
13571372
.setKey(key)
@@ -3185,7 +3200,7 @@ public R<RpcStatus> updateResourceGroups(UpdateResourceGroupsParam requestParam)
31853200
}
31863201
}
31873202

3188-
///////////////////// High Level API//////////////////////
3203+
/// ////////////////// High Level API//////////////////////
31893204
@Override
31903205
public R<RpcStatus> createCollection(CreateSimpleCollectionParam requestParam) {
31913206
if (!clientIsReady()) {
@@ -3197,21 +3212,21 @@ public R<RpcStatus> createCollection(CreateSimpleCollectionParam requestParam) {
31973212
try {
31983213
// step1: create collection
31993214
R<RpcStatus> createCollectionStatus = createCollection(requestParam.getCreateCollectionParam());
3200-
if(!Objects.equals(createCollectionStatus.getStatus(), R.success().getStatus())){
3215+
if (!Objects.equals(createCollectionStatus.getStatus(), R.success().getStatus())) {
32013216
logError("CreateCollection failed: {}", createCollectionStatus.getException().getMessage());
32023217
return R.failed(createCollectionStatus.getException());
32033218
}
32043219

32053220
// step2: create index
32063221
R<RpcStatus> createIndexStatus = createIndex(requestParam.getCreateIndexParam());
3207-
if(!Objects.equals(createIndexStatus.getStatus(), R.success().getStatus())){
3222+
if (!Objects.equals(createIndexStatus.getStatus(), R.success().getStatus())) {
32083223
logError("CreateIndex failed: {}", createIndexStatus.getException().getMessage());
32093224
return R.failed(createIndexStatus.getException());
32103225
}
32113226

32123227
// step3: load collection
32133228
R<RpcStatus> loadCollectionStatus = loadCollection(requestParam.getLoadCollectionParam());
3214-
if(!Objects.equals(loadCollectionStatus.getStatus(), R.success().getStatus())){
3229+
if (!Objects.equals(loadCollectionStatus.getStatus(), R.success().getStatus())) {
32153230
logError("LoadCollection failed: {}", loadCollectionStatus.getException().getMessage());
32163231
return R.failed(loadCollectionStatus.getException());
32173232
}
@@ -3234,10 +3249,10 @@ public R<ListCollectionsResponse> listCollections(ListCollectionsParam requestPa
32343249
}
32353250
logDebug(requestParam.toString());
32363251
String title = "ListCollectionsRequest";
3237-
3252+
32383253
try {
32393254
R<ShowCollectionsResponse> response = showCollections(requestParam.getShowCollectionsParam());
3240-
if(!Objects.equals(response.getStatus(), R.success().getStatus())){
3255+
if (!Objects.equals(response.getStatus(), R.success().getStatus())) {
32413256
logError("ListCollections failed: {}", response.getException().getMessage());
32423257
return R.failed(response.getException());
32433258
}
@@ -3261,10 +3276,10 @@ public R<InsertResponse> insert(InsertRowsParam requestParam) {
32613276
}
32623277
logDebug(requestParam.toString());
32633278
String title = "InsertRowsRequest";
3264-
3279+
32653280
try {
32663281
R<MutationResult> response = insert(requestParam.getInsertParam());
3267-
if(!Objects.equals(response.getStatus(), R.success().getStatus())){
3282+
if (!Objects.equals(response.getStatus(), R.success().getStatus())) {
32683283
logError("Insert failed: {}", response.getException().getMessage());
32693284
return R.failed(response.getException());
32703285
}
@@ -3394,7 +3409,7 @@ public R<QueryResponse> query(QuerySimpleParam requestParam) {
33943409
.withConsistencyLevel(requestParam.getConsistencyLevel())
33953410
.build();
33963411
R<QueryResults> response = query(queryParam);
3397-
if(!Objects.equals(response.getStatus(), R.success().getStatus())){
3412+
if (!Objects.equals(response.getStatus(), R.success().getStatus())) {
33983413
logError("Query failed: {}", response.getException().getMessage());
33993414
return R.failed(response.getException());
34003415
}
@@ -3452,7 +3467,7 @@ public R<SearchResponse> search(SearchSimpleParam requestParam) {
34523467

34533468
// search
34543469
R<SearchResults> response = search(searchParam);
3455-
if(!Objects.equals(response.getStatus(), R.success().getStatus())){
3470+
if (!Objects.equals(response.getStatus(), R.success().getStatus())) {
34563471
logError("Search failed: {}", response.getException().getMessage());
34573472
return R.failed(response.getException());
34583473
}
@@ -3483,7 +3498,8 @@ public R<QueryIterator> queryIterator(QueryIteratorParam requestParam) {
34833498
return R.failed(descResp.getException());
34843499
}
34853500
DescCollResponseWrapper descCollResponseWrapper = new DescCollResponseWrapper(descResp.getData());
3486-
QueryIterator queryIterator = new QueryIterator(requestParam, this.blockingStub(), descCollResponseWrapper.getPrimaryField());
3501+
// for MilvusClientV1, we don't support to set rpcDeadlineMs for iterator, rpcDeadlineMs is always 0(no deadline)
3502+
QueryIterator queryIterator = new QueryIterator(requestParam, new RpcStubWrapper(this.blockingStub(), 0L), descCollResponseWrapper.getPrimaryField());
34873503
return R.success(queryIterator);
34883504
}
34893505

@@ -3498,11 +3514,12 @@ public R<SearchIterator> searchIterator(SearchIteratorParam requestParam) {
34983514
return R.failed(descResp.getException());
34993515
}
35003516
DescCollResponseWrapper descCollResponseWrapper = new DescCollResponseWrapper(descResp.getData());
3501-
SearchIterator searchIterator = new SearchIterator(requestParam, this.blockingStub(), descCollResponseWrapper.getPrimaryField());
3517+
// for MilvusClientV1, we don't support to set rpcDeadlineMs for iterator, rpcDeadlineMs is always 0(no deadline)
3518+
SearchIterator searchIterator = new SearchIterator(requestParam, new RpcStubWrapper(this.blockingStub(), 0L), descCollResponseWrapper.getPrimaryField());
35023519
return R.success(searchIterator);
35033520
}
35043521

3505-
///////////////////// Log Functions//////////////////////
3522+
/// ////////////////// Log Functions//////////////////////
35063523
protected void logDebug(String msg, Object... params) {
35073524
if (logLevel.ordinal() <= LogLevel.Debug.ordinal()) {
35083525
logger.debug(msg, params);

0 commit comments

Comments
 (0)