Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions examples/src/main/java/io/milvus/v1/IteratorExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,16 @@
import io.milvus.client.MilvusServiceClient;
import io.milvus.common.clientenum.ConsistencyLevelEnum;
import io.milvus.grpc.DataType;
import io.milvus.grpc.FlushResponse;
import io.milvus.grpc.GetCollectionStatisticsResponse;
import io.milvus.grpc.MutationResult;
import io.milvus.param.ConnectParam;
import io.milvus.param.IndexType;
import io.milvus.param.MetricType;
import io.milvus.param.R;
import io.milvus.param.RetryParam;
import io.milvus.param.RpcStatus;
import io.milvus.orm.iterator.QueryIterator;
import io.milvus.orm.iterator.SearchIterator;
import io.milvus.param.*;
import io.milvus.param.collection.*;
import io.milvus.param.dml.InsertParam;
import io.milvus.param.dml.QueryIteratorParam;
import io.milvus.param.dml.SearchIteratorParam;
import io.milvus.param.index.CreateIndexParam;
import io.milvus.orm.iterator.QueryIterator;
import io.milvus.orm.iterator.SearchIterator;
import io.milvus.response.GetCollStatResponseWrapper;
import io.milvus.response.QueryResultsWrapper;

Expand Down Expand Up @@ -156,7 +150,7 @@ private void insertColumns() {
List<Long> ages = new ArrayList<>();
List<Long> ids = new ArrayList<>();
for (long i = 0L; i < NUM_ENTITIES; ++i) {
ages.add((long) batch * NUM_ENTITIES + i);
ages.add(((long) batch * NUM_ENTITIES + i) % 100);
ids.add((long) batch * NUM_ENTITIES + i);
}

Expand Down Expand Up @@ -205,20 +199,20 @@ private void prepareData() {
}

private void queryIterateCollectionNoOffset() {
String expr = String.format("10 <= %s <= 100", AGE_FIELD);
String expr = String.format("10 <= %s <= 30", AGE_FIELD);

QueryIterator queryIterator = getQueryIterator(expr, 0L, 5L, null);
QueryIterator queryIterator = getQueryIterator(expr, 0L, 1L, null);
iterateQueryResult(queryIterator);
}

private void queryIterateCollectionWithOffset() {
String expr = String.format("10 <= %s <= 100", AGE_FIELD);
String expr = String.format("10 <= %s <= 100", ID_FIELD);
QueryIterator queryIterator = getQueryIterator(expr, 10L, 50L, null);
iterateQueryResult(queryIterator);
}

private void queryIterateCollectionWithLimit() {
String expr = String.format("10 <= %s <= 100", AGE_FIELD);
String expr = String.format("10 <= %s <= 100", ID_FIELD);
QueryIterator queryIterator = getQueryIterator(expr, null, 80L, 530L);
iterateQueryResult(queryIterator);
}
Expand All @@ -238,6 +232,7 @@ private void searchIteratorCollectionWithLimit() {
}

private void iterateQueryResult(QueryIterator queryIterator) {
System.out.println("\n========== queryIterator() ==========");
int pageIdx = 0;
int iterateCount = 0;
while (true) {
Expand All @@ -258,6 +253,7 @@ private void iterateQueryResult(QueryIterator queryIterator) {
}

private void iterateSearchResult(SearchIterator searchIterator) {
System.out.println("\n========== searchIterator() ==========");
int pageIdx = 0;
int iterateCount = 0;
while (true) {
Expand Down Expand Up @@ -327,6 +323,10 @@ public static void main(String[] args) {
example.prepareData();
}

// set rpcTimeoutMs, just to verify it works for each call of query/search inside the iterator
// in versions older than 2.5.16/2.6.11, iterator.next() will timeout after several calls if the rpcTimeoutMs is greater than 0
milvusClient.withTimeout(500, TimeUnit.MILLISECONDS);

example.queryIterateCollectionNoOffset();
example.queryIterateCollectionWithOffset();
example.queryIterateCollectionWithLimit();
Expand Down
26 changes: 17 additions & 9 deletions examples/src/main/java/io/milvus/v2/IteratorExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,18 @@
import org.apache.commons.lang3.StringUtils;

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class IteratorExample {
private static final MilvusClientV2 client;

static {
client = new MilvusClientV2(ConnectConfig.builder()
.uri("http://localhost:19530")
.build());
}

private static final String COLLECTION_NAME = "java_sdk_example_iterator_v2";
private static final String ID_FIELD = "userID";
private static final String AGE_FIELD = "userAge";
Expand Down Expand Up @@ -200,15 +203,15 @@ private static void searchIteratorV2(String filter, Map<String, Object> params,
Function<List<SearchResp.SearchResult>, List<SearchResp.SearchResult>> externalFilterFunc) {
System.out.println("\n========== searchIteratorV2() ==========");
System.out.println(String.format("expr='%s', params='%s', batchSize=%d, topK=%d",
filter, params==null ? "" : params.toString(), batchSize, topK));
filter, params == null ? "" : params.toString(), batchSize, topK));
SearchIteratorV2 searchIterator = client.searchIteratorV2(SearchIteratorReqV2.builder()
.collectionName(COLLECTION_NAME)
.outputFields(Lists.newArrayList(AGE_FIELD))
.batchSize(batchSize)
.vectorFieldName(VECTOR_FIELD)
.vectors(Collections.singletonList(new FloatVec(CommonUtils.generateFloatVector(VECTOR_DIM))))
.filter(filter)
.searchParams(params==null ? new HashMap<>() : params)
.searchParams(params == null ? new HashMap<>() : params)
.limit(topK)
.metricType(IndexParam.MetricType.L2)
.consistencyLevel(ConsistencyLevel.BOUNDED)
Expand All @@ -235,21 +238,26 @@ private static void searchIteratorV2(String filter, Map<String, Object> params,

public static void main(String[] args) {
buildCollection();
queryIterator("userID < 300",50, 5,400);

// set rpcTimeoutMs, just to verify it works for each call of query/search inside the iterator
// in versions older than 2.5.16/2.6.11, iterator.next() will timeout after several calls if the rpcTimeoutMs is greater than 0
client.withTimeout(500, TimeUnit.MILLISECONDS);

queryIterator("userID < 3000", 1, 5, 10000);
searchIteratorV1("userAge > 50 &&userAge < 100", "{\"range_filter\": 15.0, \"radius\": 20.0}", 100, 500);
searchIteratorV1("", "", 10, 99);
searchIteratorV1("", "", 1, 3000);
searchIteratorV2("userAge > 10 &&userAge < 20", null, 50, 120, null);

Map<String,Object> extraParams = new HashMap<>();
extraParams.put("radius",15.0);
Map<String, Object> extraParams = new HashMap<>();
extraParams.put("radius", 15.0);
searchIteratorV2("", extraParams, 50, 100, null);

// use external function to filter the result
Function<List<SearchResp.SearchResult>, List<SearchResp.SearchResult>> externalFilterFunc = (List<SearchResp.SearchResult> src)->{
Function<List<SearchResp.SearchResult>, List<SearchResp.SearchResult>> externalFilterFunc = (List<SearchResp.SearchResult> src) -> {
List<SearchResp.SearchResult> newRes = new ArrayList<>();
for (SearchResp.SearchResult res : src) {
long id = (long)res.getId();
if (id%2 == 0) {
long id = (long) res.getId();
if (id % 2 == 0) {
newRes.add(res);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,36 @@
package io.milvus.client;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.*;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.reflect.TypeToken;
import io.grpc.StatusRuntimeException;
import io.milvus.common.utils.GTsDict;
import io.milvus.common.utils.JsonUtils;
import io.milvus.common.utils.VectorUtils;
import io.milvus.exception.*;
import io.milvus.exception.ClientNotConnectedException;
import io.milvus.exception.IllegalResponseException;
import io.milvus.exception.ServerException;
import io.milvus.grpc.*;
import io.milvus.orm.iterator.QueryIterator;
import io.milvus.orm.iterator.RpcStubWrapper;
import io.milvus.orm.iterator.SearchIterator;
import io.milvus.param.*;
import io.milvus.param.alias.*;
import io.milvus.param.bulkinsert.*;
import io.milvus.param.alias.AlterAliasParam;
import io.milvus.param.alias.CreateAliasParam;
import io.milvus.param.alias.DropAliasParam;
import io.milvus.param.alias.ListAliasesParam;
import io.milvus.param.bulkinsert.BulkInsertParam;
import io.milvus.param.bulkinsert.GetBulkInsertStateParam;
import io.milvus.param.bulkinsert.ListBulkInsertTasksParam;
import io.milvus.param.collection.*;
import io.milvus.param.control.*;
import io.milvus.param.credential.*;
import io.milvus.param.credential.CreateCredentialParam;
import io.milvus.param.credential.DeleteCredentialParam;
import io.milvus.param.credential.ListCredUsersParam;
import io.milvus.param.credential.UpdateCredentialParam;
import io.milvus.param.dml.*;
import io.milvus.param.highlevel.collection.CreateSimpleCollectionParam;
import io.milvus.param.highlevel.collection.ListCollectionsParam;
Expand Down Expand Up @@ -413,7 +427,7 @@ private void handleResponse(String requestInfo, io.milvus.grpc.Status status) {
logDebug("{} successfully!", requestInfo);
}

///////////////////// API implementation //////////////////////
/// ////////////////// API implementation //////////////////////
@Override
public R<Boolean> hasCollection(@NonNull HasCollectionParam requestParam) {
if (!clientIsReady()) {
Expand Down Expand Up @@ -541,9 +555,9 @@ public R<RpcStatus> alterDatabase(AlterDatabaseParam requestParam) {
try {
List<KeyValuePair> propertiesList = ParamUtils.AssembleKvPair(requestParam.getProperties());
AlterDatabaseRequest alterDatabaseRequest = AlterDatabaseRequest.newBuilder()
.setDbName(requestParam.getDatabaseName())
.addAllProperties(propertiesList)
.build();
.setDbName(requestParam.getDatabaseName())
.addAllProperties(propertiesList)
.build();

Status response = blockingStub().alterDatabase(alterDatabaseRequest);
handleResponse(title, response);
Expand All @@ -567,8 +581,8 @@ public R<DescribeDatabaseResponse> describeDatabase(DescribeDatabaseParam reques
String title = String.format("DescribeDatabaseRequest databaseName:%s", requestParam.getDatabaseName());
try {
DescribeDatabaseRequest describeDatabaseRequest = DescribeDatabaseRequest.newBuilder()
.setDbName(requestParam.getDatabaseName())
.build();
.setDbName(requestParam.getDatabaseName())
.build();

DescribeDatabaseResponse response = blockingStub().describeDatabase(describeDatabaseRequest);
handleResponse(title, response.getStatus());
Expand Down Expand Up @@ -1351,7 +1365,8 @@ public R<RpcStatus> createIndex(@NonNull CreateIndexParam requestParam) {
Map<String, String> extraParams = requestParam.getExtraParam();
for (Map.Entry<String, String> entry : extraParams.entrySet()) {
if (entry.getKey().equals(Constant.PARAMS)) {
Map<String, String> tempParams = JsonUtils.fromJson(entry.getValue(), new TypeToken<Map<String, String>>() {}.getType());
Map<String, String> tempParams = JsonUtils.fromJson(entry.getValue(), new TypeToken<Map<String, String>>() {
}.getType());
for (String key : tempParams.keySet()) {
createIndexRequestBuilder.addExtraParams(KeyValuePair.newBuilder()
.setKey(key)
Expand Down Expand Up @@ -3185,7 +3200,7 @@ public R<RpcStatus> updateResourceGroups(UpdateResourceGroupsParam requestParam)
}
}

///////////////////// High Level API//////////////////////
/// ////////////////// High Level API//////////////////////
@Override
public R<RpcStatus> createCollection(CreateSimpleCollectionParam requestParam) {
if (!clientIsReady()) {
Expand All @@ -3197,21 +3212,21 @@ public R<RpcStatus> createCollection(CreateSimpleCollectionParam requestParam) {
try {
// step1: create collection
R<RpcStatus> createCollectionStatus = createCollection(requestParam.getCreateCollectionParam());
if(!Objects.equals(createCollectionStatus.getStatus(), R.success().getStatus())){
if (!Objects.equals(createCollectionStatus.getStatus(), R.success().getStatus())) {
logError("CreateCollection failed: {}", createCollectionStatus.getException().getMessage());
return R.failed(createCollectionStatus.getException());
}

// step2: create index
R<RpcStatus> createIndexStatus = createIndex(requestParam.getCreateIndexParam());
if(!Objects.equals(createIndexStatus.getStatus(), R.success().getStatus())){
if (!Objects.equals(createIndexStatus.getStatus(), R.success().getStatus())) {
logError("CreateIndex failed: {}", createIndexStatus.getException().getMessage());
return R.failed(createIndexStatus.getException());
}

// step3: load collection
R<RpcStatus> loadCollectionStatus = loadCollection(requestParam.getLoadCollectionParam());
if(!Objects.equals(loadCollectionStatus.getStatus(), R.success().getStatus())){
if (!Objects.equals(loadCollectionStatus.getStatus(), R.success().getStatus())) {
logError("LoadCollection failed: {}", loadCollectionStatus.getException().getMessage());
return R.failed(loadCollectionStatus.getException());
}
Expand All @@ -3234,10 +3249,10 @@ public R<ListCollectionsResponse> listCollections(ListCollectionsParam requestPa
}
logDebug(requestParam.toString());
String title = "ListCollectionsRequest";

try {
R<ShowCollectionsResponse> response = showCollections(requestParam.getShowCollectionsParam());
if(!Objects.equals(response.getStatus(), R.success().getStatus())){
if (!Objects.equals(response.getStatus(), R.success().getStatus())) {
logError("ListCollections failed: {}", response.getException().getMessage());
return R.failed(response.getException());
}
Expand All @@ -3261,10 +3276,10 @@ public R<InsertResponse> insert(InsertRowsParam requestParam) {
}
logDebug(requestParam.toString());
String title = "InsertRowsRequest";

try {
R<MutationResult> response = insert(requestParam.getInsertParam());
if(!Objects.equals(response.getStatus(), R.success().getStatus())){
if (!Objects.equals(response.getStatus(), R.success().getStatus())) {
logError("Insert failed: {}", response.getException().getMessage());
return R.failed(response.getException());
}
Expand Down Expand Up @@ -3394,7 +3409,7 @@ public R<QueryResponse> query(QuerySimpleParam requestParam) {
.withConsistencyLevel(requestParam.getConsistencyLevel())
.build();
R<QueryResults> response = query(queryParam);
if(!Objects.equals(response.getStatus(), R.success().getStatus())){
if (!Objects.equals(response.getStatus(), R.success().getStatus())) {
logError("Query failed: {}", response.getException().getMessage());
return R.failed(response.getException());
}
Expand Down Expand Up @@ -3452,7 +3467,7 @@ public R<SearchResponse> search(SearchSimpleParam requestParam) {

// search
R<SearchResults> response = search(searchParam);
if(!Objects.equals(response.getStatus(), R.success().getStatus())){
if (!Objects.equals(response.getStatus(), R.success().getStatus())) {
logError("Search failed: {}", response.getException().getMessage());
return R.failed(response.getException());
}
Expand Down Expand Up @@ -3483,7 +3498,8 @@ public R<QueryIterator> queryIterator(QueryIteratorParam requestParam) {
return R.failed(descResp.getException());
}
DescCollResponseWrapper descCollResponseWrapper = new DescCollResponseWrapper(descResp.getData());
QueryIterator queryIterator = new QueryIterator(requestParam, this.blockingStub(), descCollResponseWrapper.getPrimaryField());
// for MilvusClientV1, we don't support to set rpcDeadlineMs for iterator, rpcDeadlineMs is always 0(no deadline)
QueryIterator queryIterator = new QueryIterator(requestParam, new RpcStubWrapper(this.blockingStub(), 0L), descCollResponseWrapper.getPrimaryField());
return R.success(queryIterator);
}

Expand All @@ -3498,11 +3514,12 @@ public R<SearchIterator> searchIterator(SearchIteratorParam requestParam) {
return R.failed(descResp.getException());
}
DescCollResponseWrapper descCollResponseWrapper = new DescCollResponseWrapper(descResp.getData());
SearchIterator searchIterator = new SearchIterator(requestParam, this.blockingStub(), descCollResponseWrapper.getPrimaryField());
// for MilvusClientV1, we don't support to set rpcDeadlineMs for iterator, rpcDeadlineMs is always 0(no deadline)
SearchIterator searchIterator = new SearchIterator(requestParam, new RpcStubWrapper(this.blockingStub(), 0L), descCollResponseWrapper.getPrimaryField());
return R.success(searchIterator);
}

///////////////////// Log Functions//////////////////////
/// ////////////////// Log Functions//////////////////////
protected void logDebug(String msg, Object... params) {
if (logLevel.ordinal() <= LogLevel.Debug.ordinal()) {
logger.debug(msg, params);
Expand Down
Loading
Loading