Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions examples/src/main/java/io/milvus/v1/IteratorExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ private void insertColumns() {
List<Long> ages = new ArrayList<>();
List<Long> ids = new ArrayList<>();
for (long i = 0L; i < NUM_ENTITIES; ++i) {
ages.add((long) batch * NUM_ENTITIES + i);
ages.add(((long) batch * NUM_ENTITIES + i) % 100);
ids.add((long) batch * NUM_ENTITIES + i);
}

Expand Down Expand Up @@ -199,20 +199,20 @@ private void prepareData() {
}

private void queryIterateCollectionNoOffset() {
String expr = String.format("10 <= %s <= 100", AGE_FIELD);
String expr = String.format("10 <= %s <= 30", AGE_FIELD);

QueryIterator queryIterator = getQueryIterator(expr, 0L, 5L, null);
QueryIterator queryIterator = getQueryIterator(expr, 0L, 1L, null);
iterateQueryResult(queryIterator);
}

private void queryIterateCollectionWithOffset() {
String expr = String.format("10 <= %s <= 100", AGE_FIELD);
String expr = String.format("10 <= %s <= 100", ID_FIELD);
QueryIterator queryIterator = getQueryIterator(expr, 10L, 50L, null);
iterateQueryResult(queryIterator);
}

private void queryIterateCollectionWithLimit() {
String expr = String.format("10 <= %s <= 100", AGE_FIELD);
String expr = String.format("10 <= %s <= 100", ID_FIELD);
QueryIterator queryIterator = getQueryIterator(expr, null, 80L, 530L);
iterateQueryResult(queryIterator);
}
Expand All @@ -232,6 +232,7 @@ private void searchIteratorCollectionWithLimit() {
}

private void iterateQueryResult(QueryIterator queryIterator) {
System.out.println("\n========== queryIterator() ==========");
int pageIdx = 0;
int iterateCount = 0;
while (true) {
Expand All @@ -252,6 +253,7 @@ private void iterateQueryResult(QueryIterator queryIterator) {
}

private void iterateSearchResult(SearchIterator searchIterator) {
System.out.println("\n========== searchIterator() ==========");
int pageIdx = 0;
int iterateCount = 0;
while (true) {
Expand Down Expand Up @@ -321,6 +323,10 @@ public static void main(String[] args) {
example.prepareData();
}

// set rpcTimeoutMs, just to verify it works for each call of query/search inside the iterator
// in versions older than 2.5.16/2.6.11, iterator.next() will timeout after several calls if the rpcTimeoutMs is greater than 0
milvusClient.withTimeout(200, TimeUnit.MILLISECONDS);

example.queryIterateCollectionNoOffset();
example.queryIterateCollectionWithOffset();
example.queryIterateCollectionWithLimit();
Expand Down
10 changes: 8 additions & 2 deletions examples/src/main/java/io/milvus/v2/IteratorExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.commons.lang3.StringUtils;

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class IteratorExample {
Expand Down Expand Up @@ -315,11 +316,16 @@ private static void searchIteratorV2WithTemplate(int batchSize) {

public static void main(String[] args) {
buildCollection();
queryIterator("userID < 300", 50, 5, 400);

// set rpcTimeoutMs, just to verify it works for each call of query/search inside the iterator
// in versions older than 2.5.16/2.6.11, iterator.next() will timeout after several calls if the rpcTimeoutMs is greater than 0
client.withTimeout(200, TimeUnit.MILLISECONDS);

queryIterator("userID < 3000", 1, 5, 10000);
queryIteratorWithTemplate(80);

searchIteratorV1("userAge > 50 &&userAge < 100", "{\"range_filter\": 15.0, \"radius\": 20.0}", 100, 500);
searchIteratorV1("", "", 10, 99);
searchIteratorV1("", "", 1, 3000);
searchIteratorV2("userAge > 10 &&userAge < 20", null, 50, 120, null);

Map<String, Object> extraParams = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.milvus.exception.ServerException;
import io.milvus.grpc.*;
import io.milvus.orm.iterator.QueryIterator;
import io.milvus.orm.iterator.RpcStubWrapper;
import io.milvus.orm.iterator.SearchIterator;
import io.milvus.param.*;
import io.milvus.param.alias.AlterAliasParam;
Expand Down Expand Up @@ -3528,7 +3529,8 @@ public R<QueryIterator> queryIterator(QueryIteratorParam requestParam) {
return R.failed(descResp.getException());
}
DescCollResponseWrapper descCollResponseWrapper = new DescCollResponseWrapper(descResp.getData());
QueryIterator queryIterator = new QueryIterator(requestParam, this.blockingStub(), descCollResponseWrapper.getPrimaryField());
// for MilvusClientV1, we don't support to set rpcDeadlineMs for iterator, rpcDeadlineMs is always 0(no deadline)
QueryIterator queryIterator = new QueryIterator(requestParam, new RpcStubWrapper(this.blockingStub(), 0L), descCollResponseWrapper.getPrimaryField());
return R.success(queryIterator);
}

Expand All @@ -3543,7 +3545,8 @@ public R<SearchIterator> searchIterator(SearchIteratorParam requestParam) {
return R.failed(descResp.getException());
}
DescCollResponseWrapper descCollResponseWrapper = new DescCollResponseWrapper(descResp.getData());
SearchIterator searchIterator = new SearchIterator(requestParam, this.blockingStub(), descCollResponseWrapper.getPrimaryField());
// for MilvusClientV1, we don't support to set rpcDeadlineMs for iterator, rpcDeadlineMs is always 0(no deadline)
SearchIterator searchIterator = new SearchIterator(requestParam, new RpcStubWrapper(this.blockingStub(), 0L), descCollResponseWrapper.getPrimaryField());
return R.success(searchIterator);
}

Expand Down
14 changes: 8 additions & 6 deletions sdk-core/src/main/java/io/milvus/orm/iterator/QueryIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@

package io.milvus.orm.iterator;

import io.milvus.grpc.*;
import io.milvus.grpc.DataType;
import io.milvus.grpc.KeyValuePair;
import io.milvus.grpc.QueryRequest;
import io.milvus.grpc.QueryResults;
import io.milvus.param.Constant;
import io.milvus.param.collection.FieldType;
import io.milvus.param.dml.QueryIteratorParam;
Expand All @@ -41,7 +44,7 @@
public class QueryIterator {
protected static final Logger logger = LoggerFactory.getLogger(RpcUtils.class);
private final IteratorCache iteratorCache;
private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub;
private final RpcStubWrapper blockingStub;
private final FieldType primaryField;

private final QueryIteratorReq queryIteratorReq;
Expand All @@ -56,7 +59,7 @@ public class QueryIterator {
private long sessionTs = 0;

public QueryIterator(QueryIteratorParam queryIteratorParam,
MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
RpcStubWrapper blockingStub,
FieldType primaryField) {
this.iteratorCache = new IteratorCache();
this.blockingStub = blockingStub;
Expand All @@ -74,14 +77,13 @@ public QueryIterator(QueryIteratorParam queryIteratorParam,
}

public QueryIterator(QueryIteratorReq queryIteratorReq,
MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
RpcStubWrapper blockingStub,
CreateCollectionReq.FieldSchema primaryField) {
this.iteratorCache = new IteratorCache();
this.blockingStub = blockingStub;
this.queryIteratorReq = queryIteratorReq;
this.primaryField = IteratorAdapterV2.convertV2Field(primaryField);


this.batchSize = (int) queryIteratorReq.getBatchSize();
this.expr = queryIteratorReq.getExpr();
this.limit = queryIteratorReq.getLimit();
Expand Down Expand Up @@ -247,7 +249,7 @@ private QueryResults executeQuery(String expr, long offset, long limit, long ts,
// set default consistency level
builder.setUseDefaultConsistency(true);

QueryResults response = rpcUtils.retry(() -> blockingStub.query(builder.build()));
QueryResults response = rpcUtils.retry(() -> blockingStub.get().query(builder.build()));
String title = String.format("QueryRequest collectionName:%s", queryIteratorReq.getCollectionName());
rpcUtils.handleResponse(title, response.getStatus());
return response;
Expand Down
47 changes: 47 additions & 0 deletions sdk-core/src/main/java/io/milvus/orm/iterator/RpcStubWrapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.milvus.orm.iterator;

import io.milvus.grpc.MilvusServiceGrpc;

import java.util.concurrent.TimeUnit;

public class RpcStubWrapper {
private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub;

// rpcTimeoutMs of MilvusServiceBlockingStub.withDeadlineAfter() is "end of using time", not "timeout of per call",
// we have to reset this value for each time QueryIterator calls the query() interface.
// the rpcDeadlineMs value is passed from MilvusClient
private long rpcDeadlineMs = 0L;

public RpcStubWrapper(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
long rpcDeadlineMs) {
this.blockingStub = blockingStub;
this.rpcDeadlineMs = rpcDeadlineMs;
}

public MilvusServiceGrpc.MilvusServiceBlockingStub get() {
if (rpcDeadlineMs > 0) {
return blockingStub.withDeadlineAfter(rpcDeadlineMs, TimeUnit.MILLISECONDS);
} else {
return blockingStub;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
import io.milvus.common.utils.ExceptionUtils;
import io.milvus.common.utils.JsonUtils;
import io.milvus.exception.ParamException;
import io.milvus.grpc.*;
import io.milvus.grpc.DataType;
import io.milvus.grpc.KeyValuePair;
import io.milvus.grpc.SearchRequest;
import io.milvus.grpc.SearchResults;
import io.milvus.param.Constant;
import io.milvus.param.MetricType;
import io.milvus.param.ParamUtils;
Expand Down Expand Up @@ -54,7 +57,7 @@
public class SearchIterator {
private static final Logger logger = LoggerFactory.getLogger(SearchIterator.class);
private final IteratorCache iteratorCache;
private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub;
private final RpcStubWrapper blockingStub;
private final FieldType primaryField;

private final SearchIteratorParam searchIteratorParam;
Expand All @@ -76,7 +79,7 @@ public class SearchIterator {
private long sessionTs = 0;

public SearchIterator(SearchIteratorParam searchIteratorParam,
MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
RpcStubWrapper blockingStub,
FieldType primaryField) {
this.iteratorCache = new IteratorCache();
this.searchIteratorParam = searchIteratorParam;
Expand All @@ -97,7 +100,7 @@ public SearchIterator(SearchIteratorParam searchIteratorParam,

// to support V2
public SearchIterator(SearchIteratorReq searchIteratorReq,
MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
RpcStubWrapper blockingStub,
CreateCollectionReq.FieldSchema primaryField) {
this.iteratorCache = new IteratorCache();
this.blockingStub = blockingStub;
Expand Down Expand Up @@ -296,7 +299,7 @@ private SearchResults executeSearch(Map<String, Object> params, String nextExpr,
// set default consistency level
builder.setUseDefaultConsistency(true);

SearchResults response = rpcUtils.retry(() -> blockingStub.search(builder.build()));
SearchResults response = rpcUtils.retry(() -> blockingStub.get().search(builder.build()));
String title = String.format("SearchRequest collectionName:%s", searchIteratorParam.getCollectionName());
rpcUtils.handleResponse(title, response.getStatus());
return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@

public class SearchIteratorV2 {
private static final Logger logger = LoggerFactory.getLogger(SearchIteratorV2.class);
private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub;
private final RpcStubWrapper blockingStub;

private final SearchIteratorReqV2 searchIteratorReq;
private final int batchSize;
Expand All @@ -58,7 +58,7 @@ public class SearchIteratorV2 {

// to support V2
public SearchIteratorV2(SearchIteratorReqV2 searchIteratorReq,
MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub) {
RpcStubWrapper blockingStub) {
this.blockingStub = blockingStub;
this.searchIteratorReq = searchIteratorReq;

Expand Down Expand Up @@ -101,7 +101,7 @@ private void setupCollectionID() {
if (StringUtils.isNotEmpty(searchIteratorReq.getDatabaseName())) {
builder.setDbName(searchIteratorReq.getDatabaseName());
}
DescribeCollectionResponse response = rpcUtils.retry(() -> this.blockingStub.describeCollection(builder.build()));
DescribeCollectionResponse response = rpcUtils.retry(() -> blockingStub.get().describeCollection(builder.build()));
String title = String.format("DescribeCollectionRequest collectionName:%s", searchIteratorReq.getCollectionName());
rpcUtils.handleResponse(title, response.getStatus());

Expand Down Expand Up @@ -130,7 +130,7 @@ private SearchResults executeSearch(int limit) {
.filterTemplateValues(searchIteratorReq.getFilterTemplateValues())
.build();
SearchRequest searchRequest = new VectorUtils().ConvertToGrpcSearchRequest(request);
SearchResults response = rpcUtils.retry(() -> this.blockingStub.search(searchRequest));
SearchResults response = rpcUtils.retry(() -> blockingStub.get().search(searchRequest));
String title = String.format("SearchRequest collectionName:%s", searchIteratorReq.getCollectionName());
rpcUtils.handleResponse(title, response.getStatus());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.milvus.grpc.ConnectResponse;
import io.milvus.grpc.MilvusServiceGrpc;
import io.milvus.orm.iterator.QueryIterator;
import io.milvus.orm.iterator.RpcStubWrapper;
import io.milvus.orm.iterator.SearchIterator;
import io.milvus.orm.iterator.SearchIteratorV2;
import io.milvus.v2.service.cdc.CDCService;
Expand Down Expand Up @@ -716,7 +717,7 @@ public SearchResp hybridSearch(HybridSearchReq request) {
* @return QueryIterator
*/
public QueryIterator queryIterator(QueryIteratorReq request) {
return rpcUtils.retry(() -> vectorService.queryIterator(this.getRpcStub(), request));
return rpcUtils.retry(() -> vectorService.queryIterator(new RpcStubWrapper(this.getRpcStub(), connectConfig.getRpcDeadlineMs()), request));
}

/**
Expand All @@ -726,7 +727,7 @@ public QueryIterator queryIterator(QueryIteratorReq request) {
* @return SearchIterator
*/
public SearchIterator searchIterator(SearchIteratorReq request) {
return rpcUtils.retry(() -> vectorService.searchIterator(this.getRpcStub(), request));
return rpcUtils.retry(() -> vectorService.searchIterator(new RpcStubWrapper(this.getRpcStub(), connectConfig.getRpcDeadlineMs()), request));
}

/**
Expand All @@ -736,7 +737,7 @@ public SearchIterator searchIterator(SearchIteratorReq request) {
* @return SearchIteratorV2
*/
public SearchIteratorV2 searchIteratorV2(SearchIteratorReqV2 request) {
return rpcUtils.retry(() -> vectorService.searchIteratorV2(this.getRpcStub(), request));
return rpcUtils.retry(() -> vectorService.searchIteratorV2(new RpcStubWrapper(this.getRpcStub(), connectConfig.getRpcDeadlineMs()), request));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.milvus.common.utils.JsonUtils;
import io.milvus.grpc.*;
import io.milvus.orm.iterator.QueryIterator;
import io.milvus.orm.iterator.RpcStubWrapper;
import io.milvus.orm.iterator.SearchIterator;
import io.milvus.orm.iterator.SearchIteratorV2;
import io.milvus.v2.exception.ErrorCode;
Expand Down Expand Up @@ -291,25 +292,25 @@ public SearchResp hybridSearch(MilvusServiceGrpc.MilvusServiceBlockingStub block
.build();
}

public QueryIterator queryIterator(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
public QueryIterator queryIterator(RpcStubWrapper blockingStub,
QueryIteratorReq request) {
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, request.getDatabaseName(),
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub.get(), request.getDatabaseName(),
request.getCollectionName(), false);
DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp);
CreateCollectionReq.FieldSchema pkField = respR.getCollectionSchema().getField(respR.getPrimaryFieldName());
return new QueryIterator(request, blockingStub, pkField);
}

public SearchIterator searchIterator(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
public SearchIterator searchIterator(RpcStubWrapper blockingStub,
SearchIteratorReq request) {
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, request.getDatabaseName(),
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub.get(), request.getDatabaseName(),
request.getCollectionName(), false);
DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp);
CreateCollectionReq.FieldSchema pkField = respR.getCollectionSchema().getField(respR.getPrimaryFieldName());
return new SearchIterator(request, blockingStub, pkField);
}

public SearchIteratorV2 searchIteratorV2(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
public SearchIteratorV2 searchIteratorV2(RpcStubWrapper blockingStub,
SearchIteratorReqV2 request) {
return new SearchIteratorV2(request, blockingStub);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ public static class SearchReqBuilder {
private int topK = 0; // default value
private String filter;
private List<String> outputFields = new ArrayList<>(); // default value
private List<BaseVector> data;
private List<BaseVector> data = new ArrayList<>(); // default value
private long offset;
private long limit = 0L; // default value
private int roundDecimal = -1; // default value
Expand Down
2 changes: 1 addition & 1 deletion sdk-core/src/main/java/io/milvus/v2/utils/VectorUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public SearchRequest ConvertToGrpcSearchRequest(SearchReq request) {

// prepare target, the input could be vectors or string list for doc-in-doc-out
List<BaseVector> vectors = request.getData();
if (vectors.isEmpty()) {
if (vectors == null || vectors.isEmpty()) {
throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "Target data list of search request is empty.");
}

Expand Down
Loading
Loading