Skip to content

Commit 9cb5eeb

Browse files
committed
Fix a bug that rpcDeadline incorrectly work for iterator
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent a1bac33 commit 9cb5eeb

File tree

9 files changed

+69
-30
lines changed

9 files changed

+69
-30
lines changed

examples/src/main/java/io/milvus/v2/IteratorExample.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public class IteratorExample {
5151
static {
5252
client = new MilvusClientV2(ConnectConfig.builder()
5353
.uri("http://localhost:19530")
54+
.rpcDeadlineMs(1000L)
5455
.build());
5556
}
5657

@@ -315,11 +316,11 @@ private static void searchIteratorV2WithTemplate(int batchSize) {
315316

316317
public static void main(String[] args) {
317318
buildCollection();
318-
queryIterator("userID < 300", 50, 5, 400);
319+
queryIterator("userID < 3000", 1, 5, 10000);
319320
queryIteratorWithTemplate(80);
320321

321322
searchIteratorV1("userAge > 50 &&userAge < 100", "{\"range_filter\": 15.0, \"radius\": 20.0}", 100, 500);
322-
searchIteratorV1("", "", 10, 99);
323+
searchIteratorV1("", "", 1, 3000);
323324
searchIteratorV2("userAge > 10 &&userAge < 20", null, 50, 120, null);
324325

325326
Map<String, Object> extraParams = new HashMap<>();

sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.milvus.exception.ServerException;
3636
import io.milvus.grpc.*;
3737
import io.milvus.orm.iterator.QueryIterator;
38+
import io.milvus.orm.iterator.RpcStubWrapper;
3839
import io.milvus.orm.iterator.SearchIterator;
3940
import io.milvus.param.*;
4041
import io.milvus.param.alias.AlterAliasParam;
@@ -3528,7 +3529,8 @@ public R<QueryIterator> queryIterator(QueryIteratorParam requestParam) {
35283529
return R.failed(descResp.getException());
35293530
}
35303531
DescCollResponseWrapper descCollResponseWrapper = new DescCollResponseWrapper(descResp.getData());
3531-
QueryIterator queryIterator = new QueryIterator(requestParam, this.blockingStub(), descCollResponseWrapper.getPrimaryField());
3532+
// for MilvusClientV1, we don't support to set rpcDeadlineMs for iterator, rpcDeadlineMs is always 0(no deadline)
3533+
QueryIterator queryIterator = new QueryIterator(requestParam, new RpcStubWrapper(this.blockingStub(), 0L), descCollResponseWrapper.getPrimaryField());
35323534
return R.success(queryIterator);
35333535
}
35343536

@@ -3543,7 +3545,8 @@ public R<SearchIterator> searchIterator(SearchIteratorParam requestParam) {
35433545
return R.failed(descResp.getException());
35443546
}
35453547
DescCollResponseWrapper descCollResponseWrapper = new DescCollResponseWrapper(descResp.getData());
3546-
SearchIterator searchIterator = new SearchIterator(requestParam, this.blockingStub(), descCollResponseWrapper.getPrimaryField());
3548+
// for MilvusClientV1, we don't support to set rpcDeadlineMs for iterator, rpcDeadlineMs is always 0(no deadline)
3549+
SearchIterator searchIterator = new SearchIterator(requestParam, new RpcStubWrapper(this.blockingStub(), 0L), descCollResponseWrapper.getPrimaryField());
35473550
return R.success(searchIterator);
35483551
}
35493552

sdk-core/src/main/java/io/milvus/orm/iterator/QueryIterator.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
public class QueryIterator {
4242
protected static final Logger logger = LoggerFactory.getLogger(RpcUtils.class);
4343
private final IteratorCache iteratorCache;
44-
private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub;
44+
private final RpcStubWrapper blockingStub;
4545
private final FieldType primaryField;
4646

4747
private final QueryIteratorReq queryIteratorReq;
@@ -56,7 +56,7 @@ public class QueryIterator {
5656
private long sessionTs = 0;
5757

5858
public QueryIterator(QueryIteratorParam queryIteratorParam,
59-
MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
59+
RpcStubWrapper blockingStub,
6060
FieldType primaryField) {
6161
this.iteratorCache = new IteratorCache();
6262
this.blockingStub = blockingStub;
@@ -74,14 +74,13 @@ public QueryIterator(QueryIteratorParam queryIteratorParam,
7474
}
7575

7676
public QueryIterator(QueryIteratorReq queryIteratorReq,
77-
MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
77+
RpcStubWrapper blockingStub,
7878
CreateCollectionReq.FieldSchema primaryField) {
7979
this.iteratorCache = new IteratorCache();
8080
this.blockingStub = blockingStub;
8181
this.queryIteratorReq = queryIteratorReq;
8282
this.primaryField = IteratorAdapterV2.convertV2Field(primaryField);
8383

84-
8584
this.batchSize = (int) queryIteratorReq.getBatchSize();
8685
this.expr = queryIteratorReq.getExpr();
8786
this.limit = queryIteratorReq.getLimit();
@@ -247,7 +246,7 @@ private QueryResults executeQuery(String expr, long offset, long limit, long ts,
247246
// set default consistency level
248247
builder.setUseDefaultConsistency(true);
249248

250-
QueryResults response = rpcUtils.retry(() -> blockingStub.query(builder.build()));
249+
QueryResults response = rpcUtils.retry(() -> blockingStub.get().query(builder.build()));
251250
String title = String.format("QueryRequest collectionName:%s", queryIteratorReq.getCollectionName());
252251
rpcUtils.handleResponse(title, response.getStatus());
253252
return response;
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package io.milvus.orm.iterator;
2+
3+
import io.milvus.grpc.MilvusServiceGrpc;
4+
5+
import java.util.concurrent.TimeUnit;
6+
7+
public class RpcStubWrapper {
8+
private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub;
9+
10+
// rpcTimeoutMs of MilvusServiceBlockingStub.withDeadlineAfter() is "end of using time", not "timeout of per call",
11+
// we have to reset this value for each time QueryIterator calls the query() interface.
12+
// the rpcDeadlineMs value is passed from MilvusClient
13+
private long rpcDeadlineMs = 0L;
14+
15+
public RpcStubWrapper(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
16+
long rpcDeadlineMs) {
17+
this.blockingStub = blockingStub;
18+
this.rpcDeadlineMs = rpcDeadlineMs;
19+
}
20+
21+
public MilvusServiceGrpc.MilvusServiceBlockingStub get() {
22+
if (rpcDeadlineMs > 0) {
23+
return blockingStub.withDeadlineAfter(rpcDeadlineMs, TimeUnit.MILLISECONDS);
24+
} else {
25+
return blockingStub;
26+
}
27+
}
28+
}

sdk-core/src/main/java/io/milvus/orm/iterator/SearchIterator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
public class SearchIterator {
5555
private static final Logger logger = LoggerFactory.getLogger(SearchIterator.class);
5656
private final IteratorCache iteratorCache;
57-
private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub;
57+
private final RpcStubWrapper blockingStub;
5858
private final FieldType primaryField;
5959

6060
private final SearchIteratorParam searchIteratorParam;
@@ -76,7 +76,7 @@ public class SearchIterator {
7676
private long sessionTs = 0;
7777

7878
public SearchIterator(SearchIteratorParam searchIteratorParam,
79-
MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
79+
RpcStubWrapper blockingStub,
8080
FieldType primaryField) {
8181
this.iteratorCache = new IteratorCache();
8282
this.searchIteratorParam = searchIteratorParam;
@@ -97,7 +97,7 @@ public SearchIterator(SearchIteratorParam searchIteratorParam,
9797

9898
// to support V2
9999
public SearchIterator(SearchIteratorReq searchIteratorReq,
100-
MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
100+
RpcStubWrapper blockingStub,
101101
CreateCollectionReq.FieldSchema primaryField) {
102102
this.iteratorCache = new IteratorCache();
103103
this.blockingStub = blockingStub;
@@ -296,7 +296,7 @@ private SearchResults executeSearch(Map<String, Object> params, String nextExpr,
296296
// set default consistency level
297297
builder.setUseDefaultConsistency(true);
298298

299-
SearchResults response = rpcUtils.retry(() -> blockingStub.search(builder.build()));
299+
SearchResults response = rpcUtils.retry(() -> blockingStub.get().search(builder.build()));
300300
String title = String.format("SearchRequest collectionName:%s", searchIteratorParam.getCollectionName());
301301
rpcUtils.handleResponse(title, response.getStatus());
302302
return response;

sdk-core/src/main/java/io/milvus/orm/iterator/SearchIteratorV2.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343

4444
public class SearchIteratorV2 {
4545
private static final Logger logger = LoggerFactory.getLogger(SearchIteratorV2.class);
46-
private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub;
46+
private final RpcStubWrapper blockingStub;
4747

4848
private final SearchIteratorReqV2 searchIteratorReq;
4949
private final int batchSize;
@@ -58,7 +58,7 @@ public class SearchIteratorV2 {
5858

5959
// to support V2
6060
public SearchIteratorV2(SearchIteratorReqV2 searchIteratorReq,
61-
MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub) {
61+
RpcStubWrapper blockingStub) {
6262
this.blockingStub = blockingStub;
6363
this.searchIteratorReq = searchIteratorReq;
6464

@@ -101,7 +101,7 @@ private void setupCollectionID() {
101101
if (StringUtils.isNotEmpty(searchIteratorReq.getDatabaseName())) {
102102
builder.setDbName(searchIteratorReq.getDatabaseName());
103103
}
104-
DescribeCollectionResponse response = rpcUtils.retry(() -> this.blockingStub.describeCollection(builder.build()));
104+
DescribeCollectionResponse response = rpcUtils.retry(() -> blockingStub.get().describeCollection(builder.build()));
105105
String title = String.format("DescribeCollectionRequest collectionName:%s", searchIteratorReq.getCollectionName());
106106
rpcUtils.handleResponse(title, response.getStatus());
107107

@@ -130,7 +130,7 @@ private SearchResults executeSearch(int limit) {
130130
.filterTemplateValues(searchIteratorReq.getFilterTemplateValues())
131131
.build();
132132
SearchRequest searchRequest = new VectorUtils().ConvertToGrpcSearchRequest(request);
133-
SearchResults response = rpcUtils.retry(() -> this.blockingStub.search(searchRequest));
133+
SearchResults response = rpcUtils.retry(() -> blockingStub.get().search(searchRequest));
134134
String title = String.format("SearchRequest collectionName:%s", searchIteratorReq.getCollectionName());
135135
rpcUtils.handleResponse(title, response.getStatus());
136136

sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.milvus.grpc.ConnectResponse;
2626
import io.milvus.grpc.MilvusServiceGrpc;
2727
import io.milvus.orm.iterator.QueryIterator;
28+
import io.milvus.orm.iterator.RpcStubWrapper;
2829
import io.milvus.orm.iterator.SearchIterator;
2930
import io.milvus.orm.iterator.SearchIteratorV2;
3031
import io.milvus.v2.service.cdc.CDCService;
@@ -716,7 +717,7 @@ public SearchResp hybridSearch(HybridSearchReq request) {
716717
* @return QueryIterator
717718
*/
718719
public QueryIterator queryIterator(QueryIteratorReq request) {
719-
return rpcUtils.retry(() -> vectorService.queryIterator(this.getRpcStub(), request));
720+
return rpcUtils.retry(() -> vectorService.queryIterator(new RpcStubWrapper(this.getRpcStub(), connectConfig.getRpcDeadlineMs()), request));
720721
}
721722

722723
/**
@@ -726,7 +727,7 @@ public QueryIterator queryIterator(QueryIteratorReq request) {
726727
* @return SearchIterator
727728
*/
728729
public SearchIterator searchIterator(SearchIteratorReq request) {
729-
return rpcUtils.retry(() -> vectorService.searchIterator(this.getRpcStub(), request));
730+
return rpcUtils.retry(() -> vectorService.searchIterator(new RpcStubWrapper(this.getRpcStub(), connectConfig.getRpcDeadlineMs()), request));
730731
}
731732

732733
/**
@@ -736,7 +737,7 @@ public SearchIterator searchIterator(SearchIteratorReq request) {
736737
* @return SearchIteratorV2
737738
*/
738739
public SearchIteratorV2 searchIteratorV2(SearchIteratorReqV2 request) {
739-
return rpcUtils.retry(() -> vectorService.searchIteratorV2(this.getRpcStub(), request));
740+
return rpcUtils.retry(() -> vectorService.searchIteratorV2(new RpcStubWrapper(this.getRpcStub(), connectConfig.getRpcDeadlineMs()), request));
740741
}
741742

742743
/**

sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.milvus.common.utils.JsonUtils;
2525
import io.milvus.grpc.*;
2626
import io.milvus.orm.iterator.QueryIterator;
27+
import io.milvus.orm.iterator.RpcStubWrapper;
2728
import io.milvus.orm.iterator.SearchIterator;
2829
import io.milvus.orm.iterator.SearchIteratorV2;
2930
import io.milvus.v2.exception.ErrorCode;
@@ -291,25 +292,25 @@ public SearchResp hybridSearch(MilvusServiceGrpc.MilvusServiceBlockingStub block
291292
.build();
292293
}
293294

294-
public QueryIterator queryIterator(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
295+
public QueryIterator queryIterator(RpcStubWrapper blockingStub,
295296
QueryIteratorReq request) {
296-
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, request.getDatabaseName(),
297+
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub.get(), request.getDatabaseName(),
297298
request.getCollectionName(), false);
298299
DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp);
299300
CreateCollectionReq.FieldSchema pkField = respR.getCollectionSchema().getField(respR.getPrimaryFieldName());
300301
return new QueryIterator(request, blockingStub, pkField);
301302
}
302303

303-
public SearchIterator searchIterator(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
304+
public SearchIterator searchIterator(RpcStubWrapper blockingStub,
304305
SearchIteratorReq request) {
305-
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, request.getDatabaseName(),
306+
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub.get(), request.getDatabaseName(),
306307
request.getCollectionName(), false);
307308
DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp);
308309
CreateCollectionReq.FieldSchema pkField = respR.getCollectionSchema().getField(respR.getPrimaryFieldName());
309310
return new SearchIterator(request, blockingStub, pkField);
310311
}
311312

312-
public SearchIteratorV2 searchIteratorV2(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
313+
public SearchIteratorV2 searchIteratorV2(RpcStubWrapper blockingStub,
313314
SearchIteratorReqV2 request) {
314315
return new SearchIteratorV2(request, blockingStub);
315316
}

sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2456,11 +2456,14 @@ public void testIterator() {
24562456
long rowCount = getRowCount("", randomCollectionName);
24572457
Assertions.assertEquals(count, rowCount);
24582458

2459+
// set rpc timeout for each call
2460+
client.withTimeout(300, TimeUnit.MILLISECONDS);
2461+
24592462
// search iterator
24602463
SearchIterator searchIterator = client.searchIterator(SearchIteratorReq.builder()
24612464
.collectionName(randomCollectionName)
24622465
.outputFields(Lists.newArrayList("*"))
2463-
.batchSize(20L)
2466+
.batchSize(1L)
24642467
.vectorFieldName("float_vector")
24652468
.vectors(Collections.singletonList(new FloatVec(utils.generateFloatVector())))
24662469
.expr("int64_field > 500 && int64_field < 1000")
@@ -2536,7 +2539,7 @@ public void testIterator() {
25362539
.collectionName(randomCollectionName)
25372540
.expr("int64_field < " + to)
25382541
.outputFields(Lists.newArrayList("*"))
2539-
.batchSize(50L)
2542+
.batchSize(1L)
25402543
.offset(from)
25412544
.limit(4000)
25422545
.consistencyLevel(ConsistencyLevel.EVENTUALLY)
@@ -2546,7 +2549,7 @@ public void testIterator() {
25462549
while (true) {
25472550
List<QueryResultsWrapper.RowRecord> res = queryIterator.next();
25482551
if (res.isEmpty()) {
2549-
System.out.println("query iteration finished, close");
2552+
System.out.printf("query iteration finished, close, %d items fetched%n", counter);
25502553
queryIterator.close();
25512554
break;
25522555
}
@@ -2602,7 +2605,7 @@ public void testIterator() {
26022605
SearchIteratorV2 searchIteratorV2 = client.searchIteratorV2(SearchIteratorReqV2.builder()
26032606
.collectionName(randomCollectionName)
26042607
.outputFields(Lists.newArrayList("*"))
2605-
.batchSize(1000L)
2608+
.batchSize(100L)
26062609
.vectorFieldName("float_vector")
26072610
.filter("id >= 50")
26082611
.vectors(Collections.singletonList(new FloatVec(utils.generateFloatVector())))
@@ -2613,7 +2616,7 @@ public void testIterator() {
26132616
while (true) {
26142617
List<SearchResp.SearchResult> res = searchIteratorV2.next();
26152618
if (res.isEmpty()) {
2616-
System.out.println("search iteration finished, close");
2619+
System.out.printf("search iteration finished, close, %d items fetched%n", counter);
26172620
searchIteratorV2.close();
26182621
break;
26192622
}
@@ -2665,6 +2668,9 @@ public void testIterator() {
26652668
// expect count is 9950, but sometimes it returns 9949 or 9948
26662669
Assertions.assertTrue(counter > ((int) count - 55) && counter <= ((int) count - 50));
26672670

2671+
// reset rpc timeout to unlimited
2672+
client.withTimeout(0, TimeUnit.MILLISECONDS);
2673+
26682674
client.dropCollection(DropCollectionReq.builder().collectionName(randomCollectionName).build());
26692675
}
26702676

0 commit comments

Comments
 (0)