diff --git a/examples/src/main/java/io/milvus/v1/IteratorExample.java b/examples/src/main/java/io/milvus/v1/IteratorExample.java index 3615dd2e7..2515be5d5 100644 --- a/examples/src/main/java/io/milvus/v1/IteratorExample.java +++ b/examples/src/main/java/io/milvus/v1/IteratorExample.java @@ -24,22 +24,16 @@ import io.milvus.client.MilvusServiceClient; import io.milvus.common.clientenum.ConsistencyLevelEnum; import io.milvus.grpc.DataType; -import io.milvus.grpc.FlushResponse; import io.milvus.grpc.GetCollectionStatisticsResponse; import io.milvus.grpc.MutationResult; -import io.milvus.param.ConnectParam; -import io.milvus.param.IndexType; -import io.milvus.param.MetricType; -import io.milvus.param.R; -import io.milvus.param.RetryParam; -import io.milvus.param.RpcStatus; +import io.milvus.orm.iterator.QueryIterator; +import io.milvus.orm.iterator.SearchIterator; +import io.milvus.param.*; import io.milvus.param.collection.*; import io.milvus.param.dml.InsertParam; import io.milvus.param.dml.QueryIteratorParam; import io.milvus.param.dml.SearchIteratorParam; import io.milvus.param.index.CreateIndexParam; -import io.milvus.orm.iterator.QueryIterator; -import io.milvus.orm.iterator.SearchIterator; import io.milvus.response.GetCollStatResponseWrapper; import io.milvus.response.QueryResultsWrapper; @@ -156,7 +150,7 @@ private void insertColumns() { List ages = new ArrayList<>(); List ids = new ArrayList<>(); for (long i = 0L; i < NUM_ENTITIES; ++i) { - ages.add((long) batch * NUM_ENTITIES + i); + ages.add(((long) batch * NUM_ENTITIES + i) % 100); ids.add((long) batch * NUM_ENTITIES + i); } @@ -205,20 +199,20 @@ private void prepareData() { } private void queryIterateCollectionNoOffset() { - String expr = String.format("10 <= %s <= 100", AGE_FIELD); + String expr = String.format("10 <= %s <= 30", AGE_FIELD); - QueryIterator queryIterator = getQueryIterator(expr, 0L, 5L, null); + QueryIterator queryIterator = getQueryIterator(expr, 0L, 1L, null); iterateQueryResult(queryIterator); } private void queryIterateCollectionWithOffset() { - String expr = String.format("10 <= %s <= 100", AGE_FIELD); + String expr = String.format("10 <= %s <= 100", ID_FIELD); QueryIterator queryIterator = getQueryIterator(expr, 10L, 50L, null); iterateQueryResult(queryIterator); } private void queryIterateCollectionWithLimit() { - String expr = String.format("10 <= %s <= 100", AGE_FIELD); + String expr = String.format("10 <= %s <= 100", ID_FIELD); QueryIterator queryIterator = getQueryIterator(expr, null, 80L, 530L); iterateQueryResult(queryIterator); } @@ -238,6 +232,7 @@ private void searchIteratorCollectionWithLimit() { } private void iterateQueryResult(QueryIterator queryIterator) { + System.out.println("\n========== queryIterator() =========="); int pageIdx = 0; int iterateCount = 0; while (true) { @@ -258,6 +253,7 @@ private void iterateQueryResult(QueryIterator queryIterator) { } private void iterateSearchResult(SearchIterator searchIterator) { + System.out.println("\n========== searchIterator() =========="); int pageIdx = 0; int iterateCount = 0; while (true) { @@ -327,6 +323,10 @@ public static void main(String[] args) { example.prepareData(); } + // set rpcTimeoutMs, just to verify it works for each call of query/search inside the iterator + // in versions older than 2.5.16/2.6.11, iterator.next() will timeout after several calls if the rpcTimeoutMs is greater than 0 + milvusClient.withTimeout(500, TimeUnit.MILLISECONDS); + example.queryIterateCollectionNoOffset(); example.queryIterateCollectionWithOffset(); example.queryIterateCollectionWithLimit(); diff --git a/examples/src/main/java/io/milvus/v2/IteratorExample.java b/examples/src/main/java/io/milvus/v2/IteratorExample.java index 7fae70b5e..74313a9ff 100644 --- a/examples/src/main/java/io/milvus/v2/IteratorExample.java +++ b/examples/src/main/java/io/milvus/v2/IteratorExample.java @@ -43,15 +43,18 @@ import org.apache.commons.lang3.StringUtils; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.function.Function; public class IteratorExample { private static final MilvusClientV2 client; + static { client = new MilvusClientV2(ConnectConfig.builder() .uri("http://localhost:19530") .build()); } + private static final String COLLECTION_NAME = "java_sdk_example_iterator_v2"; private static final String ID_FIELD = "userID"; private static final String AGE_FIELD = "userAge"; @@ -200,7 +203,7 @@ private static void searchIteratorV2(String filter, Map params, Function, List> externalFilterFunc) { System.out.println("\n========== searchIteratorV2() =========="); System.out.println(String.format("expr='%s', params='%s', batchSize=%d, topK=%d", - filter, params==null ? "" : params.toString(), batchSize, topK)); + filter, params == null ? "" : params.toString(), batchSize, topK)); SearchIteratorV2 searchIterator = client.searchIteratorV2(SearchIteratorReqV2.builder() .collectionName(COLLECTION_NAME) .outputFields(Lists.newArrayList(AGE_FIELD)) @@ -208,7 +211,7 @@ private static void searchIteratorV2(String filter, Map params, .vectorFieldName(VECTOR_FIELD) .vectors(Collections.singletonList(new FloatVec(CommonUtils.generateFloatVector(VECTOR_DIM)))) .filter(filter) - .searchParams(params==null ? new HashMap<>() : params) + .searchParams(params == null ? new HashMap<>() : params) .limit(topK) .metricType(IndexParam.MetricType.L2) .consistencyLevel(ConsistencyLevel.BOUNDED) @@ -235,21 +238,26 @@ private static void searchIteratorV2(String filter, Map params, public static void main(String[] args) { buildCollection(); - queryIterator("userID < 300",50, 5,400); + + // set rpcTimeoutMs, just to verify it works for each call of query/search inside the iterator + // in versions older than 2.5.16/2.6.11, iterator.next() will timeout after several calls if the rpcTimeoutMs is greater than 0 + client.withTimeout(500, TimeUnit.MILLISECONDS); + + queryIterator("userID < 3000", 1, 5, 10000); searchIteratorV1("userAge > 50 &&userAge < 100", "{\"range_filter\": 15.0, \"radius\": 20.0}", 100, 500); - searchIteratorV1("", "", 10, 99); + searchIteratorV1("", "", 1, 3000); searchIteratorV2("userAge > 10 &&userAge < 20", null, 50, 120, null); - Map extraParams = new HashMap<>(); - extraParams.put("radius",15.0); + Map extraParams = new HashMap<>(); + extraParams.put("radius", 15.0); searchIteratorV2("", extraParams, 50, 100, null); // use external function to filter the result - Function, List> externalFilterFunc = (List src)->{ + Function, List> externalFilterFunc = (List src) -> { List newRes = new ArrayList<>(); for (SearchResp.SearchResult res : src) { - long id = (long)res.getId(); - if (id%2 == 0) { + long id = (long) res.getId(); + if (id % 2 == 0) { newRes.add(res); } } diff --git a/sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java b/sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java index e3bf60599..2d08dfc40 100644 --- a/sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java +++ b/sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java @@ -20,22 +20,36 @@ package io.milvus.client; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.*; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.reflect.TypeToken; import io.grpc.StatusRuntimeException; import io.milvus.common.utils.GTsDict; import io.milvus.common.utils.JsonUtils; import io.milvus.common.utils.VectorUtils; -import io.milvus.exception.*; +import io.milvus.exception.ClientNotConnectedException; +import io.milvus.exception.IllegalResponseException; +import io.milvus.exception.ServerException; import io.milvus.grpc.*; import io.milvus.orm.iterator.QueryIterator; +import io.milvus.orm.iterator.RpcStubWrapper; import io.milvus.orm.iterator.SearchIterator; import io.milvus.param.*; -import io.milvus.param.alias.*; -import io.milvus.param.bulkinsert.*; +import io.milvus.param.alias.AlterAliasParam; +import io.milvus.param.alias.CreateAliasParam; +import io.milvus.param.alias.DropAliasParam; +import io.milvus.param.alias.ListAliasesParam; +import io.milvus.param.bulkinsert.BulkInsertParam; +import io.milvus.param.bulkinsert.GetBulkInsertStateParam; +import io.milvus.param.bulkinsert.ListBulkInsertTasksParam; import io.milvus.param.collection.*; import io.milvus.param.control.*; -import io.milvus.param.credential.*; +import io.milvus.param.credential.CreateCredentialParam; +import io.milvus.param.credential.DeleteCredentialParam; +import io.milvus.param.credential.ListCredUsersParam; +import io.milvus.param.credential.UpdateCredentialParam; import io.milvus.param.dml.*; import io.milvus.param.highlevel.collection.CreateSimpleCollectionParam; import io.milvus.param.highlevel.collection.ListCollectionsParam; @@ -413,7 +427,7 @@ private void handleResponse(String requestInfo, io.milvus.grpc.Status status) { logDebug("{} successfully!", requestInfo); } - ///////////////////// API implementation ////////////////////// + /// ////////////////// API implementation ////////////////////// @Override public R hasCollection(@NonNull HasCollectionParam requestParam) { if (!clientIsReady()) { @@ -541,9 +555,9 @@ public R alterDatabase(AlterDatabaseParam requestParam) { try { List propertiesList = ParamUtils.AssembleKvPair(requestParam.getProperties()); AlterDatabaseRequest alterDatabaseRequest = AlterDatabaseRequest.newBuilder() - .setDbName(requestParam.getDatabaseName()) - .addAllProperties(propertiesList) - .build(); + .setDbName(requestParam.getDatabaseName()) + .addAllProperties(propertiesList) + .build(); Status response = blockingStub().alterDatabase(alterDatabaseRequest); handleResponse(title, response); @@ -567,8 +581,8 @@ public R describeDatabase(DescribeDatabaseParam reques String title = String.format("DescribeDatabaseRequest databaseName:%s", requestParam.getDatabaseName()); try { DescribeDatabaseRequest describeDatabaseRequest = DescribeDatabaseRequest.newBuilder() - .setDbName(requestParam.getDatabaseName()) - .build(); + .setDbName(requestParam.getDatabaseName()) + .build(); DescribeDatabaseResponse response = blockingStub().describeDatabase(describeDatabaseRequest); handleResponse(title, response.getStatus()); @@ -1351,7 +1365,8 @@ public R createIndex(@NonNull CreateIndexParam requestParam) { Map extraParams = requestParam.getExtraParam(); for (Map.Entry entry : extraParams.entrySet()) { if (entry.getKey().equals(Constant.PARAMS)) { - Map tempParams = JsonUtils.fromJson(entry.getValue(), new TypeToken>() {}.getType()); + Map tempParams = JsonUtils.fromJson(entry.getValue(), new TypeToken>() { + }.getType()); for (String key : tempParams.keySet()) { createIndexRequestBuilder.addExtraParams(KeyValuePair.newBuilder() .setKey(key) @@ -3185,7 +3200,7 @@ public R updateResourceGroups(UpdateResourceGroupsParam requestParam) } } - ///////////////////// High Level API////////////////////// + /// ////////////////// High Level API////////////////////// @Override public R createCollection(CreateSimpleCollectionParam requestParam) { if (!clientIsReady()) { @@ -3197,21 +3212,21 @@ public R createCollection(CreateSimpleCollectionParam requestParam) { try { // step1: create collection R createCollectionStatus = createCollection(requestParam.getCreateCollectionParam()); - if(!Objects.equals(createCollectionStatus.getStatus(), R.success().getStatus())){ + if (!Objects.equals(createCollectionStatus.getStatus(), R.success().getStatus())) { logError("CreateCollection failed: {}", createCollectionStatus.getException().getMessage()); return R.failed(createCollectionStatus.getException()); } // step2: create index R createIndexStatus = createIndex(requestParam.getCreateIndexParam()); - if(!Objects.equals(createIndexStatus.getStatus(), R.success().getStatus())){ + if (!Objects.equals(createIndexStatus.getStatus(), R.success().getStatus())) { logError("CreateIndex failed: {}", createIndexStatus.getException().getMessage()); return R.failed(createIndexStatus.getException()); } // step3: load collection R loadCollectionStatus = loadCollection(requestParam.getLoadCollectionParam()); - if(!Objects.equals(loadCollectionStatus.getStatus(), R.success().getStatus())){ + if (!Objects.equals(loadCollectionStatus.getStatus(), R.success().getStatus())) { logError("LoadCollection failed: {}", loadCollectionStatus.getException().getMessage()); return R.failed(loadCollectionStatus.getException()); } @@ -3234,10 +3249,10 @@ public R listCollections(ListCollectionsParam requestPa } logDebug(requestParam.toString()); String title = "ListCollectionsRequest"; - + try { R response = showCollections(requestParam.getShowCollectionsParam()); - if(!Objects.equals(response.getStatus(), R.success().getStatus())){ + if (!Objects.equals(response.getStatus(), R.success().getStatus())) { logError("ListCollections failed: {}", response.getException().getMessage()); return R.failed(response.getException()); } @@ -3261,10 +3276,10 @@ public R insert(InsertRowsParam requestParam) { } logDebug(requestParam.toString()); String title = "InsertRowsRequest"; - + try { R response = insert(requestParam.getInsertParam()); - if(!Objects.equals(response.getStatus(), R.success().getStatus())){ + if (!Objects.equals(response.getStatus(), R.success().getStatus())) { logError("Insert failed: {}", response.getException().getMessage()); return R.failed(response.getException()); } @@ -3394,7 +3409,7 @@ public R query(QuerySimpleParam requestParam) { .withConsistencyLevel(requestParam.getConsistencyLevel()) .build(); R response = query(queryParam); - if(!Objects.equals(response.getStatus(), R.success().getStatus())){ + if (!Objects.equals(response.getStatus(), R.success().getStatus())) { logError("Query failed: {}", response.getException().getMessage()); return R.failed(response.getException()); } @@ -3452,7 +3467,7 @@ public R search(SearchSimpleParam requestParam) { // search R response = search(searchParam); - if(!Objects.equals(response.getStatus(), R.success().getStatus())){ + if (!Objects.equals(response.getStatus(), R.success().getStatus())) { logError("Search failed: {}", response.getException().getMessage()); return R.failed(response.getException()); } @@ -3483,7 +3498,8 @@ public R queryIterator(QueryIteratorParam requestParam) { return R.failed(descResp.getException()); } DescCollResponseWrapper descCollResponseWrapper = new DescCollResponseWrapper(descResp.getData()); - QueryIterator queryIterator = new QueryIterator(requestParam, this.blockingStub(), descCollResponseWrapper.getPrimaryField()); + // for MilvusClientV1, we don't support to set rpcDeadlineMs for iterator, rpcDeadlineMs is always 0(no deadline) + QueryIterator queryIterator = new QueryIterator(requestParam, new RpcStubWrapper(this.blockingStub(), 0L), descCollResponseWrapper.getPrimaryField()); return R.success(queryIterator); } @@ -3498,11 +3514,12 @@ public R searchIterator(SearchIteratorParam requestParam) { return R.failed(descResp.getException()); } DescCollResponseWrapper descCollResponseWrapper = new DescCollResponseWrapper(descResp.getData()); - SearchIterator searchIterator = new SearchIterator(requestParam, this.blockingStub(), descCollResponseWrapper.getPrimaryField()); + // for MilvusClientV1, we don't support to set rpcDeadlineMs for iterator, rpcDeadlineMs is always 0(no deadline) + SearchIterator searchIterator = new SearchIterator(requestParam, new RpcStubWrapper(this.blockingStub(), 0L), descCollResponseWrapper.getPrimaryField()); return R.success(searchIterator); } - ///////////////////// Log Functions////////////////////// + /// ////////////////// Log Functions////////////////////// protected void logDebug(String msg, Object... params) { if (logLevel.ordinal() <= LogLevel.Debug.ordinal()) { logger.debug(msg, params); diff --git a/sdk-core/src/main/java/io/milvus/orm/iterator/QueryIterator.java b/sdk-core/src/main/java/io/milvus/orm/iterator/QueryIterator.java index 8286529d6..06f6487f9 100644 --- a/sdk-core/src/main/java/io/milvus/orm/iterator/QueryIterator.java +++ b/sdk-core/src/main/java/io/milvus/orm/iterator/QueryIterator.java @@ -19,7 +19,10 @@ package io.milvus.orm.iterator; -import io.milvus.grpc.*; +import io.milvus.grpc.DataType; +import io.milvus.grpc.KeyValuePair; +import io.milvus.grpc.QueryRequest; +import io.milvus.grpc.QueryResults; import io.milvus.param.Constant; import io.milvus.param.ParamUtils; import io.milvus.param.collection.FieldType; @@ -36,14 +39,12 @@ import java.util.ArrayList; import java.util.List; -import static io.milvus.param.Constant.NO_CACHE_ID; -import static io.milvus.param.Constant.MAX_BATCH_SIZE; -import static io.milvus.param.Constant.UNLIMITED; +import static io.milvus.param.Constant.*; public class QueryIterator { protected static final Logger logger = LoggerFactory.getLogger(RpcUtils.class); private final IteratorCache iteratorCache; - private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub; + private final RpcStubWrapper blockingStub; private final FieldType primaryField; private final QueryIteratorParam queryIteratorParam; @@ -58,7 +59,7 @@ public class QueryIterator { private long sessionTs = 0; public QueryIterator(QueryIteratorParam queryIteratorParam, - MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, + RpcStubWrapper blockingStub, FieldType primaryField) { this.iteratorCache = new IteratorCache(); this.blockingStub = blockingStub; @@ -76,7 +77,7 @@ public QueryIterator(QueryIteratorParam queryIteratorParam, } public QueryIterator(QueryIteratorReq queryIteratorReq, - MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, + RpcStubWrapper blockingStub, CreateCollectionReq.FieldSchema primaryField) { this.iteratorCache = new IteratorCache(); this.blockingStub = blockingStub; @@ -200,7 +201,7 @@ private String setupNextExpr() { if (StringUtils.isEmpty(currentExpr)) { return filteredPKStr; } - return " ( "+currentExpr+" ) " + " and " + filteredPKStr; + return " ( " + currentExpr + " ) " + " and " + filteredPKStr; } private boolean isResSufficient(List ret) { @@ -245,7 +246,7 @@ private QueryResults executeQuery(String expr, long offset, long limit, long ts, // set default consistency level builder.setUseDefaultConsistency(true); - QueryResults response = rpcUtils.retry(()->blockingStub.query(builder.build())); + QueryResults response = rpcUtils.retry(() -> blockingStub.get().query(builder.build())); String title = String.format("QueryRequest collectionName:%s", queryIteratorParam.getCollectionName()); rpcUtils.handleResponse(title, response.getStatus()); return response; diff --git a/sdk-core/src/main/java/io/milvus/orm/iterator/RpcStubWrapper.java b/sdk-core/src/main/java/io/milvus/orm/iterator/RpcStubWrapper.java new file mode 100644 index 000000000..fa0d9c8b5 --- /dev/null +++ b/sdk-core/src/main/java/io/milvus/orm/iterator/RpcStubWrapper.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.milvus.orm.iterator; + +import io.milvus.grpc.MilvusServiceGrpc; + +import java.util.concurrent.TimeUnit; + +public class RpcStubWrapper { + private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub; + + // rpcTimeoutMs of MilvusServiceBlockingStub.withDeadlineAfter() is "end of using time", not "timeout of per call", + // we have to reset this value for each time QueryIterator calls the query() interface. + // the rpcDeadlineMs value is passed from MilvusClient + private long rpcDeadlineMs = 0L; + + public RpcStubWrapper(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, + long rpcDeadlineMs) { + this.blockingStub = blockingStub; + this.rpcDeadlineMs = rpcDeadlineMs; + } + + public MilvusServiceGrpc.MilvusServiceBlockingStub get() { + if (rpcDeadlineMs > 0) { + return blockingStub.withDeadlineAfter(rpcDeadlineMs, TimeUnit.MILLISECONDS); + } else { + return blockingStub; + } + } +} \ No newline at end of file diff --git a/sdk-core/src/main/java/io/milvus/orm/iterator/SearchIterator.java b/sdk-core/src/main/java/io/milvus/orm/iterator/SearchIterator.java index 829b51a03..bf385dc98 100644 --- a/sdk-core/src/main/java/io/milvus/orm/iterator/SearchIterator.java +++ b/sdk-core/src/main/java/io/milvus/orm/iterator/SearchIterator.java @@ -25,7 +25,10 @@ import io.milvus.common.utils.ExceptionUtils; import io.milvus.common.utils.JsonUtils; import io.milvus.exception.ParamException; -import io.milvus.grpc.*; +import io.milvus.grpc.DataType; +import io.milvus.grpc.KeyValuePair; +import io.milvus.grpc.SearchRequest; +import io.milvus.grpc.SearchResults; import io.milvus.param.Constant; import io.milvus.param.MetricType; import io.milvus.param.ParamUtils; @@ -44,22 +47,17 @@ import java.nio.ByteBuffer; import java.text.DecimalFormat; -import java.util.*; - -import static io.milvus.param.Constant.DEFAULT_SEARCH_EXTENSION_RATE; -import static io.milvus.param.Constant.EF; -import static io.milvus.param.Constant.MAX_BATCH_SIZE; -import static io.milvus.param.Constant.MAX_FILTERED_IDS_COUNT_ITERATION; -import static io.milvus.param.Constant.MAX_TRY_TIME; -import static io.milvus.param.Constant.NO_CACHE_ID; -import static io.milvus.param.Constant.RADIUS; -import static io.milvus.param.Constant.RANGE_FILTER; -import static io.milvus.param.Constant.UNLIMITED; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; + +import static io.milvus.param.Constant.*; public class SearchIterator { private static final Logger logger = LoggerFactory.getLogger(SearchIterator.class); private final IteratorCache iteratorCache; - private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub; + private final RpcStubWrapper blockingStub; private final FieldType primaryField; private final SearchIteratorParam searchIteratorParam; @@ -81,7 +79,7 @@ public class SearchIterator { private long sessionTs = 0; public SearchIterator(SearchIteratorParam searchIteratorParam, - MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, + RpcStubWrapper blockingStub, FieldType primaryField) { this.iteratorCache = new IteratorCache(); this.searchIteratorParam = searchIteratorParam; @@ -102,7 +100,7 @@ public SearchIterator(SearchIteratorParam searchIteratorParam, // to support V2 public SearchIterator(SearchIteratorReq searchIteratorReq, - MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, + RpcStubWrapper blockingStub, CreateCollectionReq.FieldSchema primaryField) { this.iteratorCache = new IteratorCache(); this.blockingStub = blockingStub; @@ -168,7 +166,8 @@ private void initParams() { if (null != searchIteratorParam.getParams() && !searchIteratorParam.getParams().isEmpty()) { params = new HashMap<>(); } - params = JsonUtils.fromJson(searchIteratorParam.getParams(), new TypeToken>() {}.getType()); + params = JsonUtils.fromJson(searchIteratorParam.getParams(), new TypeToken>() { + }.getType()); } private void checkForSpecialIndexParam() { @@ -300,7 +299,7 @@ private SearchResults executeSearch(Map params, String nextExpr, // set default consistency level builder.setUseDefaultConsistency(true); - SearchResults response = rpcUtils.retry(()->blockingStub.search(builder.build())); + SearchResults response = rpcUtils.retry(() -> blockingStub.get().search(builder.build())); String title = String.format("SearchRequest collectionName:%s", searchIteratorParam.getCollectionName()); rpcUtils.handleResponse(title, response.getStatus()); return response; @@ -399,8 +398,8 @@ private List extractPageFromCache(long count) { throw new ParamException(msg); } - List retPageRes = cachedPage.subList(0, (int)count); - List leftCachePage = cachedPage.subList((int)count, cachedPage.size()); + List retPageRes = cachedPage.subList(0, (int) count); + List leftCachePage = cachedPage.subList((int) count, cachedPage.size()); iteratorCache.cache(cacheId, leftCachePage); return retPageRes; @@ -448,7 +447,8 @@ private Map nextParams(int coefficient) { // here we make a new object nextParams, to ensure is it a deep copy, we convert it to JSON string and // convert back to a Map. Map nextParams = JsonUtils.fromJson(JsonUtils.toJson(params), - new TypeToken>() {}.getType()); + new TypeToken>() { + }.getType()); if (metricsPositiveRelated(metricType)) { float nextRadius = tailBand + width * coefficient; diff --git a/sdk-core/src/main/java/io/milvus/orm/iterator/SearchIteratorV2.java b/sdk-core/src/main/java/io/milvus/orm/iterator/SearchIteratorV2.java index 65b5d8937..a11776548 100644 --- a/sdk-core/src/main/java/io/milvus/orm/iterator/SearchIteratorV2.java +++ b/sdk-core/src/main/java/io/milvus/orm/iterator/SearchIteratorV2.java @@ -33,7 +33,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.function.Function; import static io.milvus.param.Constant.MAX_BATCH_SIZE; @@ -41,7 +43,7 @@ public class SearchIteratorV2 { private static final Logger logger = LoggerFactory.getLogger(SearchIterator.class); - private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub; + private final RpcStubWrapper blockingStub; private final SearchIteratorReqV2 searchIteratorReq; private final int batchSize; @@ -56,7 +58,7 @@ public class SearchIteratorV2 { // to support V2 public SearchIteratorV2(SearchIteratorReqV2 searchIteratorReq, - MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub) { + RpcStubWrapper blockingStub) { this.blockingStub = blockingStub; this.searchIteratorReq = searchIteratorReq; @@ -77,7 +79,7 @@ private void checkParams() { } searchParams = searchIteratorReq.getSearchParams(); - if (searchParams.containsKey(Constant.OFFSET) && (int)searchParams.get(Constant.OFFSET) > 0) { + if (searchParams.containsKey(Constant.OFFSET) && (int) searchParams.get(Constant.OFFSET) > 0) { ExceptionUtils.throwUnExpectedException("Offset is not supported for SearchIterator"); } @@ -99,7 +101,7 @@ private void setupCollectionID() { if (StringUtils.isNotEmpty(searchIteratorReq.getDatabaseName())) { builder.setDbName(searchIteratorReq.getDatabaseName()); } - DescribeCollectionResponse response = rpcUtils.retry(()->this.blockingStub.describeCollection(builder.build())); + DescribeCollectionResponse response = rpcUtils.retry(() -> this.blockingStub.get().describeCollection(builder.build())); String title = String.format("DescribeCollectionRequest collectionName:%s", searchIteratorReq.getCollectionName()); rpcUtils.handleResponse(title, response.getStatus()); @@ -126,7 +128,7 @@ private SearchResults executeSearch(int limit) { .groupByFieldName(searchIteratorReq.getGroupByFieldName()) .build(); SearchRequest searchRequest = new VectorUtils().ConvertToGrpcSearchRequest(request); - SearchResults response = rpcUtils.retry(()->this.blockingStub.search(searchRequest)); + SearchResults response = rpcUtils.retry(() -> this.blockingStub.get().search(searchRequest)); String title = String.format("SearchRequest collectionName:%s", searchIteratorReq.getCollectionName()); rpcUtils.handleResponse(title, response.getStatus()); @@ -200,7 +202,7 @@ private List _next() { searchParams.put("search_iter_id", iterInfo.getToken()); } - long ts = (long)searchParams.get("guarantee_timestamp"); + long ts = (long) searchParams.get("guarantee_timestamp"); if (ts <= 0) { if (response.getSessionTs() > 0) { searchParams.put("guarantee_timestamp", response.getSessionTs()); diff --git a/sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java b/sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java index d611b6f2a..7eeafb9a2 100644 --- a/sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java +++ b/sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java @@ -20,29 +20,39 @@ package io.milvus.v2.client; import io.grpc.ManagedChannel; -import io.milvus.grpc.*; +import io.milvus.grpc.ClientInfo; +import io.milvus.grpc.ConnectRequest; +import io.milvus.grpc.ConnectResponse; +import io.milvus.grpc.MilvusServiceGrpc; import io.milvus.orm.iterator.QueryIterator; +import io.milvus.orm.iterator.RpcStubWrapper; import io.milvus.orm.iterator.SearchIterator; import io.milvus.orm.iterator.SearchIteratorV2; - -import io.milvus.v2.service.database.DatabaseService; -import io.milvus.v2.service.database.request.*; -import io.milvus.v2.service.database.response.*; import io.milvus.v2.service.collection.CollectionService; import io.milvus.v2.service.collection.request.*; -import io.milvus.v2.service.collection.response.*; +import io.milvus.v2.service.collection.response.DescribeCollectionResp; +import io.milvus.v2.service.collection.response.DescribeReplicasResp; +import io.milvus.v2.service.collection.response.GetCollectionStatsResp; +import io.milvus.v2.service.collection.response.ListCollectionsResp; +import io.milvus.v2.service.database.DatabaseService; +import io.milvus.v2.service.database.request.*; +import io.milvus.v2.service.database.response.DescribeDatabaseResp; +import io.milvus.v2.service.database.response.ListDatabasesResp; import io.milvus.v2.service.index.IndexService; import io.milvus.v2.service.index.request.*; -import io.milvus.v2.service.index.response.*; +import io.milvus.v2.service.index.response.DescribeIndexResp; import io.milvus.v2.service.partition.PartitionService; import io.milvus.v2.service.partition.request.*; -import io.milvus.v2.service.partition.response.*; +import io.milvus.v2.service.partition.response.GetPartitionStatsResp; import io.milvus.v2.service.rbac.RBACService; import io.milvus.v2.service.rbac.request.*; -import io.milvus.v2.service.rbac.response.*; +import io.milvus.v2.service.rbac.response.DescribeRoleResp; +import io.milvus.v2.service.rbac.response.DescribeUserResp; +import io.milvus.v2.service.rbac.response.ListPrivilegeGroupsResp; import io.milvus.v2.service.resourcegroup.ResourceGroupService; import io.milvus.v2.service.resourcegroup.request.*; -import io.milvus.v2.service.resourcegroup.response.*; +import io.milvus.v2.service.resourcegroup.response.DescribeResourceGroupResp; +import io.milvus.v2.service.resourcegroup.response.ListResourceGroupsResp; import io.milvus.v2.service.utility.UtilityService; import io.milvus.v2.service.utility.request.*; import io.milvus.v2.service.utility.response.*; @@ -79,6 +89,7 @@ public class MilvusClientV2 { /** * Creates a Milvus client instance. + * * @param connectConfig Milvus server connection configuration */ public MilvusClientV2(ConnectConfig connectConfig) { @@ -106,10 +117,10 @@ private void initServices(String dbName) { * * @param connectConfig Milvus server connection configuration */ - private void connect(ConnectConfig connectConfig){ + private void connect(ConnectConfig connectConfig) { this.connectConfig = connectConfig; try { - if(this.channel != null) { + if (this.channel != null) { // close channel first close(3); } @@ -143,7 +154,7 @@ private MilvusServiceGrpc.MilvusServiceBlockingStub getRpcStub() { * This method is internal used, it calls a RPC Connect() to the remote server, * and sends the client info to the server so that the server knows which client is interacting, * especially for accesses log. - * + *

* The info includes: * 1. username(if Authentication is enabled) * 2. the client computer's name @@ -186,7 +197,7 @@ public MilvusClientV2 withTimeout(long timeout, TimeUnit timeoutUnit) { // if the input timeout value is not zero and less than 1ms, it will be treated as 1ms // if the input timeout value is larger than 1ms, it will be converted to an integer ms value long nn = timeoutUnit.toNanos(timeout); - long ms = (nn == 0) ? 0 : (nn < 1000000 ? 1 : nn/1000000); + long ms = (nn == 0) ? 0 : (nn < 1000000 ? 1 : nn / 1000000); connectConfig.setRpcDeadlineMs(ms); return this; } @@ -204,6 +215,7 @@ public String currentUsedDatabase() { ///////////////////////////////////////////////////////////////////////////////////////////// /** * use Database + * * @param dbName databaseName */ public void useDatabase(@NonNull String dbName) throws InterruptedException { @@ -214,7 +226,7 @@ public void useDatabase(@NonNull String dbName) throws InterruptedException { this.close(3); this.connect(this.connectConfig); this.initServices(dbName); - } catch (InterruptedException e){ + } catch (InterruptedException e) { logger.error("close connect error"); throw new RuntimeException(e); } @@ -222,28 +234,35 @@ public void useDatabase(@NonNull String dbName) throws InterruptedException { /** * Creates a database in Milvus. + * * @param request create database request */ public void createDatabase(CreateDatabaseReq request) { - rpcUtils.retry(()-> databaseService.createDatabase(this.getRpcStub(), request)); + rpcUtils.retry(() -> databaseService.createDatabase(this.getRpcStub(), request)); } + /** * Drops a database. Note that this method drops all data in the database. + * * @param request drop database request */ public void dropDatabase(DropDatabaseReq request) { - rpcUtils.retry(()-> databaseService.dropDatabase(this.getRpcStub(), request)); + rpcUtils.retry(() -> databaseService.dropDatabase(this.getRpcStub(), request)); } + /** * List all databases. + * * @return List of String database names */ public ListDatabasesResp listDatabases() { - return rpcUtils.retry(()-> databaseService.listDatabases(this.getRpcStub())); + return rpcUtils.retry(() -> databaseService.listDatabases(this.getRpcStub())); } + /** * Alter database with key value pair. (Available from Milvus v2.4.4) * Deprecated, replaced by alterDatabaseProperties from SDK v2.5.3, to keep consistence with other SDKs + * * @param request alter database request */ @Deprecated @@ -253,28 +272,33 @@ public void alterDatabase(AlterDatabaseReq request) { .properties(request.getProperties()) .build()); } + /** * Alter a database's properties. + * * @param request alter database properties request */ public void alterDatabaseProperties(AlterDatabasePropertiesReq request) { - rpcUtils.retry(()-> databaseService.alterDatabaseProperties(this.getRpcStub(), request)); + rpcUtils.retry(() -> databaseService.alterDatabaseProperties(this.getRpcStub(), request)); } + /** * drop a database's properties. + * * @param request alter database properties request */ public void dropDatabaseProperties(DropDatabasePropertiesReq request) { - rpcUtils.retry(()-> databaseService.dropDatabaseProperties(this.getRpcStub(), request)); + rpcUtils.retry(() -> databaseService.dropDatabaseProperties(this.getRpcStub(), request)); } + /** * Show detail of database base, such as replica number and resource groups. (Available from Milvus v2.4.4) - * @param request describe database request * + * @param request describe database request * @return DescribeDatabaseResp */ public DescribeDatabaseResp describeDatabase(DescribeDatabaseReq request) { - return rpcUtils.retry(()-> databaseService.describeDatabase(this.getRpcStub(), request)); + return rpcUtils.retry(() -> databaseService.describeDatabase(this.getRpcStub(), request)); } ///////////////////////////////////////////////////////////////////////////////////////////// @@ -282,14 +306,16 @@ public DescribeDatabaseResp describeDatabase(DescribeDatabaseReq request) { ///////////////////////////////////////////////////////////////////////////////////////////// /** * Creates a collection in Milvus. + * * @param request create collection request */ public void createCollection(CreateCollectionReq request) { - rpcUtils.retry(()-> collectionService.createCollection(this.getRpcStub(), request)); + rpcUtils.retry(() -> collectionService.createCollection(this.getRpcStub(), request)); } /** * Creates a collection schema. This method is deprecated from v2.5.9, replaced by CreateSchema() + * * @return CreateCollectionReq.CollectionSchema */ @Deprecated @@ -299,6 +325,7 @@ public CreateCollectionReq.CollectionSchema createSchema() { /** * Creates a collection schema. + * * @return CreateCollectionReq.CollectionSchema */ public static CreateCollectionReq.CollectionSchema CreateSchema() { @@ -311,8 +338,9 @@ public static CreateCollectionReq.CollectionSchema CreateSchema() { * @return List of String collection names */ public ListCollectionsResp listCollections() { - return rpcUtils.retry(()-> collectionService.listCollections(this.getRpcStub(), "")); + return rpcUtils.retry(() -> collectionService.listCollections(this.getRpcStub(), "")); } + /** * List milvus collections, can specify the target database * Note: the old API listCollections() doesn't have a ListCollectionsReq argument, we have to create @@ -321,16 +349,18 @@ public ListCollectionsResp listCollections() { * @return List of String collection names */ public ListCollectionsResp listCollectionsV2(ListCollectionsReq request) { - return rpcUtils.retry(()-> collectionService.listCollections(this.getRpcStub(), request.getDatabaseName())); + return rpcUtils.retry(() -> collectionService.listCollections(this.getRpcStub(), request.getDatabaseName())); } + /** * Drops a collection in Milvus. * * @param request drop collection request */ public void dropCollection(DropCollectionReq request) { - rpcUtils.retry(()-> collectionService.dropCollection(this.getRpcStub(), request)); + rpcUtils.retry(() -> collectionService.dropCollection(this.getRpcStub(), request)); } + /** * Alter a collection in Milvus. * Deprecated, replaced by alterCollectionProperties from SDK v2.5.3, to keep consistence with other SDKs @@ -345,36 +375,43 @@ public void alterCollection(AlterCollectionReq request) { .properties(request.getProperties()) .build()); } + /** * Alter a collection's properties. * * @param request alter collection properties request */ public void alterCollectionProperties(AlterCollectionPropertiesReq request) { - rpcUtils.retry(()-> collectionService.alterCollectionProperties(this.getRpcStub(), request)); + rpcUtils.retry(() -> collectionService.alterCollectionProperties(this.getRpcStub(), request)); } + /** * Alter a field's properties. * * @param request alter field properties request */ public void alterCollectionField(AlterCollectionFieldReq request) { - rpcUtils.retry(()-> collectionService.alterCollectionField(this.getRpcStub(), request)); + rpcUtils.retry(() -> collectionService.alterCollectionField(this.getRpcStub(), request)); } + /** * drop a collection's properties. + * * @param request drop collection properties request */ public void dropCollectionProperties(DropCollectionPropertiesReq request) { - rpcUtils.retry(()-> collectionService.dropCollectionProperties(this.getRpcStub(), request)); + rpcUtils.retry(() -> collectionService.dropCollectionProperties(this.getRpcStub(), request)); } + /** * drop a field's properties. + * * @param request drop field properties request */ public void dropCollectionFieldProperties(DropCollectionFieldPropertiesReq request) { - rpcUtils.retry(()-> collectionService.dropCollectionFieldProperties(this.getRpcStub(), request)); + rpcUtils.retry(() -> collectionService.dropCollectionFieldProperties(this.getRpcStub(), request)); } + /** * Checks whether a collection exists in Milvus. * @@ -382,8 +419,9 @@ public void dropCollectionFieldProperties(DropCollectionFieldPropertiesReq reque * @return Boolean */ public Boolean hasCollection(HasCollectionReq request) { - return rpcUtils.retry(()-> collectionService.hasCollection(this.getRpcStub(), request)); + return rpcUtils.retry(() -> collectionService.hasCollection(this.getRpcStub(), request)); } + /** * Gets the collection info in Milvus. * @@ -391,8 +429,9 @@ public Boolean hasCollection(HasCollectionReq request) { * @return DescribeCollectionResp */ public DescribeCollectionResp describeCollection(DescribeCollectionReq request) { - return rpcUtils.retry(()-> collectionService.describeCollection(this.getRpcStub(), request)); + return rpcUtils.retry(() -> collectionService.describeCollection(this.getRpcStub(), request)); } + /** * get collection stats for a collection in Milvus. * @@ -400,24 +439,27 @@ public DescribeCollectionResp describeCollection(DescribeCollectionReq request) * @return GetCollectionStatsResp */ public GetCollectionStatsResp getCollectionStats(GetCollectionStatsReq request) { - return rpcUtils.retry(()-> collectionService.getCollectionStats(this.getRpcStub(), request)); + return rpcUtils.retry(() -> collectionService.getCollectionStats(this.getRpcStub(), request)); } + /** * rename collection in a collection in Milvus. * * @param request rename collection request */ public void renameCollection(RenameCollectionReq request) { - rpcUtils.retry(()-> collectionService.renameCollection(this.getRpcStub(), request)); + rpcUtils.retry(() -> collectionService.renameCollection(this.getRpcStub(), request)); } + /** * Loads a collection into memory in Milvus. * * @param request load collection request */ public void loadCollection(LoadCollectionReq request) { - rpcUtils.retry(()-> collectionService.loadCollection(this.getRpcStub(), request)); + rpcUtils.retry(() -> collectionService.loadCollection(this.getRpcStub(), request)); } + /** * Refresh loads a collection. Mainly used when there are new segments generated by bulkinsert request. * Force the new segments to be loaded into memory. @@ -426,16 +468,18 @@ public void loadCollection(LoadCollectionReq request) { * @param request refresh load collection request */ public void refreshLoad(RefreshLoadReq request) { - rpcUtils.retry(()-> collectionService.refreshLoad(this.getRpcStub(), request)); + rpcUtils.retry(() -> collectionService.refreshLoad(this.getRpcStub(), request)); } + /** * Releases a collection from memory in Milvus. * * @param request release collection request */ public void releaseCollection(ReleaseCollectionReq request) { - rpcUtils.retry(()-> collectionService.releaseCollection(this.getRpcStub(), request)); + rpcUtils.retry(() -> collectionService.releaseCollection(this.getRpcStub(), request)); } + /** * Checks whether a collection is loaded in Milvus. * @@ -443,7 +487,7 @@ public void releaseCollection(ReleaseCollectionReq request) { * @return Boolean */ public Boolean getLoadState(GetLoadStateReq request) { - return rpcUtils.retry(()->collectionService.getLoadState(this.getRpcStub(), request)); + return rpcUtils.retry(() -> collectionService.getLoadState(this.getRpcStub(), request)); } /** @@ -452,7 +496,7 @@ public Boolean getLoadState(GetLoadStateReq request) { * @param request describe replicas request */ public DescribeReplicasResp describeReplicas(DescribeReplicasReq request) { - return rpcUtils.retry(()->collectionService.describeReplicas(this.getRpcStub(), request)); + return rpcUtils.retry(() -> collectionService.describeReplicas(this.getRpcStub(), request)); } ///////////////////////////////////////////////////////////////////////////////////////////// @@ -464,16 +508,18 @@ public DescribeReplicasResp describeReplicas(DescribeReplicasReq request) { * @param request create index request */ public void createIndex(CreateIndexReq request) { - rpcUtils.retry(()->indexService.createIndex(this.getRpcStub(), request)); + rpcUtils.retry(() -> indexService.createIndex(this.getRpcStub(), request)); } + /** * Drops an index for a specified field in a collection in Milvus. * * @param request drop index request */ public void dropIndex(DropIndexReq request) { - rpcUtils.retry(()->indexService.dropIndex(this.getRpcStub(), request)); + rpcUtils.retry(() -> indexService.dropIndex(this.getRpcStub(), request)); } + /** * Alter an index in Milvus. * Deprecated, replaced by alterIndexProperties from SDK v2.5.3, to keep consistence with other SDKs @@ -489,21 +535,25 @@ public void alterIndex(AlterIndexReq request) { .properties(request.getProperties()) .build()); } + /** * Alter an index's properties. * * @param request alter index request */ public void alterIndexProperties(AlterIndexPropertiesReq request) { - rpcUtils.retry(()->indexService.alterIndexProperties(this.getRpcStub(), request)); + rpcUtils.retry(() -> indexService.alterIndexProperties(this.getRpcStub(), request)); } + /** * drop an index's properties. + * * @param request drop index properties request */ public void dropIndexProperties(DropIndexPropertiesReq request) { - rpcUtils.retry(()-> indexService.dropIndexProperties(this.getRpcStub(), request)); + rpcUtils.retry(() -> indexService.dropIndexProperties(this.getRpcStub(), request)); } + /** * Checks whether an index exists for a specified field in a collection in Milvus. * @@ -511,8 +561,9 @@ public void dropIndexProperties(DropIndexPropertiesReq request) { * @return DescribeIndexResp */ public DescribeIndexResp describeIndex(DescribeIndexReq request) { - return rpcUtils.retry(()->indexService.describeIndex(this.getRpcStub(), request)); + return rpcUtils.retry(() -> indexService.describeIndex(this.getRpcStub(), request)); } + /** * Lists all indexes in a collection in Milvus. * @@ -520,7 +571,7 @@ public DescribeIndexResp describeIndex(DescribeIndexReq request) { * @return List of String names of the indexes */ public List listIndexes(ListIndexesReq request) { - return rpcUtils.retry(()->indexService.listIndexes(this.getRpcStub(), request)); + return rpcUtils.retry(() -> indexService.listIndexes(this.getRpcStub(), request)); } ///////////////////////////////////////////////////////////////////////////////////////////// @@ -533,8 +584,9 @@ public List listIndexes(ListIndexesReq request) { * @return InsertResp */ public InsertResp insert(InsertReq request) { - return rpcUtils.retry(()->vectorService.insert(this.getRpcStub(), request)); + return rpcUtils.retry(() -> vectorService.insert(this.getRpcStub(), request)); } + /** * Upsert vectors into a collection in Milvus. * @@ -542,8 +594,9 @@ public InsertResp insert(InsertReq request) { * @return UpsertResp */ public UpsertResp upsert(UpsertReq request) { - return rpcUtils.retry(()->vectorService.upsert(this.getRpcStub(), request)); + return rpcUtils.retry(() -> vectorService.upsert(this.getRpcStub(), request)); } + /** * Deletes vectors in a collection in Milvus. * @@ -551,8 +604,9 @@ public UpsertResp upsert(UpsertReq request) { * @return DeleteResp */ public DeleteResp delete(DeleteReq request) { - return rpcUtils.retry(()->vectorService.delete(this.getRpcStub(), request)); + return rpcUtils.retry(() -> vectorService.delete(this.getRpcStub(), request)); } + /** * Gets vectors in a collection in Milvus. * @@ -560,7 +614,7 @@ public DeleteResp delete(DeleteReq request) { * @return GetResp */ public GetResp get(GetReq request) { - return rpcUtils.retry(()->vectorService.get(this.getRpcStub(), request)); + return rpcUtils.retry(() -> vectorService.get(this.getRpcStub(), request)); } /** @@ -570,8 +624,9 @@ public GetResp get(GetReq request) { * @return QueryResp */ public QueryResp query(QueryReq request) { - return rpcUtils.retry(()->vectorService.query(this.getRpcStub(), request)); + return rpcUtils.retry(() -> vectorService.query(this.getRpcStub(), request)); } + /** * Searches vectors in a collection in Milvus. * @@ -579,8 +634,9 @@ public QueryResp query(QueryReq request) { * @return SearchResp */ public SearchResp search(SearchReq request) { - return rpcUtils.retry(()->vectorService.search(this.getRpcStub(), request)); + return rpcUtils.retry(() -> vectorService.search(this.getRpcStub(), request)); } + /** * Conducts multi vector similarity search with a ranker for rearrangement. * @@ -588,7 +644,7 @@ public SearchResp search(SearchReq request) { * @return SearchResp */ public SearchResp hybridSearch(HybridSearchReq request) { - return rpcUtils.retry(()->vectorService.hybridSearch(this.getRpcStub(), request)); + return rpcUtils.retry(() -> vectorService.hybridSearch(this.getRpcStub(), request)); } /** @@ -599,7 +655,7 @@ public SearchResp hybridSearch(HybridSearchReq request) { * @return QueryIterator */ public QueryIterator queryIterator(QueryIteratorReq request) { - return rpcUtils.retry(()->vectorService.queryIterator(this.getRpcStub(), request)); + return rpcUtils.retry(() -> vectorService.queryIterator(new RpcStubWrapper(this.getRpcStub(), connectConfig.getRpcDeadlineMs()), request)); } /** @@ -609,7 +665,7 @@ public QueryIterator queryIterator(QueryIteratorReq request) { * @return SearchIterator */ public SearchIterator searchIterator(SearchIteratorReq request) { - return rpcUtils.retry(()->vectorService.searchIterator(this.getRpcStub(), request)); + return rpcUtils.retry(() -> vectorService.searchIterator(new RpcStubWrapper(this.getRpcStub(), connectConfig.getRpcDeadlineMs()), request)); } /** @@ -619,7 +675,7 @@ public SearchIterator searchIterator(SearchIteratorReq request) { * @return SearchIteratorV2 */ public SearchIteratorV2 searchIteratorV2(SearchIteratorReqV2 request) { - return rpcUtils.retry(()->vectorService.searchIteratorV2(this.getRpcStub(), request)); + return rpcUtils.retry(() -> vectorService.searchIteratorV2(new RpcStubWrapper(this.getRpcStub(), connectConfig.getRpcDeadlineMs()), request)); } /** @@ -630,7 +686,7 @@ public SearchIteratorV2 searchIteratorV2(SearchIteratorReqV2 request) { * @return RunAnalyzerResp */ public RunAnalyzerResp runAnalyzer(RunAnalyzerReq request) { - return rpcUtils.retry(()->vectorService.runAnalyzer(this.getRpcStub(), request)); + return rpcUtils.retry(() -> vectorService.runAnalyzer(this.getRpcStub(), request)); } ///////////////////////////////////////////////////////////////////////////////////////////// @@ -642,7 +698,7 @@ public RunAnalyzerResp runAnalyzer(RunAnalyzerReq request) { * @param request create partition request */ public void createPartition(CreatePartitionReq request) { - rpcUtils.retry(()->partitionService.createPartition(this.getRpcStub(), request)); + rpcUtils.retry(() -> partitionService.createPartition(this.getRpcStub(), request)); } /** @@ -651,7 +707,7 @@ public void createPartition(CreatePartitionReq request) { * @param request drop partition request */ public void dropPartition(DropPartitionReq request) { - rpcUtils.retry(()->partitionService.dropPartition(this.getRpcStub(), request)); + rpcUtils.retry(() -> partitionService.dropPartition(this.getRpcStub(), request)); } /** @@ -661,7 +717,7 @@ public void dropPartition(DropPartitionReq request) { * @return Boolean */ public Boolean hasPartition(HasPartitionReq request) { - return rpcUtils.retry(()->partitionService.hasPartition(this.getRpcStub(), request)); + return rpcUtils.retry(() -> partitionService.hasPartition(this.getRpcStub(), request)); } /** @@ -671,7 +727,7 @@ public Boolean hasPartition(HasPartitionReq request) { * @return List of String partition names */ public List listPartitions(ListPartitionsReq request) { - return rpcUtils.retry(()->partitionService.listPartitions(this.getRpcStub(), request)); + return rpcUtils.retry(() -> partitionService.listPartitions(this.getRpcStub(), request)); } /** @@ -681,7 +737,7 @@ public List listPartitions(ListPartitionsReq request) { * @return GetPartitionStatsResp */ public GetPartitionStatsResp getPartitionStats(GetPartitionStatsReq request) { - return rpcUtils.retry(()-> partitionService.getPartitionStats(this.getRpcStub(), request)); + return rpcUtils.retry(() -> partitionService.getPartitionStats(this.getRpcStub(), request)); } /** @@ -690,15 +746,16 @@ public GetPartitionStatsResp getPartitionStats(GetPartitionStatsReq request) { * @param request load partitions request */ public void loadPartitions(LoadPartitionsReq request) { - rpcUtils.retry(()->partitionService.loadPartitions(this.getRpcStub(), request)); + rpcUtils.retry(() -> partitionService.loadPartitions(this.getRpcStub(), request)); } + /** * Releases partitions in a collection in Milvus. * * @param request release partitions request */ public void releasePartitions(ReleasePartitionsReq request) { - rpcUtils.retry(()->partitionService.releasePartitions(this.getRpcStub(), request)); + rpcUtils.retry(() -> partitionService.releasePartitions(this.getRpcStub(), request)); } ///////////////////////////////////////////////////////////////////////////////////////////// @@ -710,8 +767,9 @@ public void releasePartitions(ReleasePartitionsReq request) { * @return List of String usernames */ public List listUsers() { - return rpcUtils.retry(()->rbacService.listUsers(this.getRpcStub())); + return rpcUtils.retry(() -> rbacService.listUsers(this.getRpcStub())); } + /** * describe user * @@ -719,41 +777,46 @@ public List listUsers() { * @return DescribeUserResp */ public DescribeUserResp describeUser(DescribeUserReq request) { - return rpcUtils.retry(()->rbacService.describeUser(this.getRpcStub(), request)); + return rpcUtils.retry(() -> rbacService.describeUser(this.getRpcStub(), request)); } + /** * create user * * @param request create user request */ public void createUser(CreateUserReq request) { - rpcUtils.retry(()->rbacService.createUser(this.getRpcStub(), request)); + rpcUtils.retry(() -> rbacService.createUser(this.getRpcStub(), request)); } + /** * change password * * @param request change password request */ public void updatePassword(UpdatePasswordReq request) { - rpcUtils.retry(()->rbacService.updatePassword(this.getRpcStub(), request)); + rpcUtils.retry(() -> rbacService.updatePassword(this.getRpcStub(), request)); } + /** * drop user * * @param request drop user request */ public void dropUser(DropUserReq request) { - rpcUtils.retry(()->rbacService.dropUser(this.getRpcStub(), request)); + rpcUtils.retry(() -> rbacService.dropUser(this.getRpcStub(), request)); } // role operations + /** * list roles * * @return List of String role names */ public List listRoles() { - return rpcUtils.retry(()->rbacService.listRoles(this.getRpcStub())); + return rpcUtils.retry(() -> rbacService.listRoles(this.getRpcStub())); } + /** * describe role * @@ -761,83 +824,89 @@ public List listRoles() { * @return DescribeRoleResp */ public DescribeRoleResp describeRole(DescribeRoleReq request) { - return rpcUtils.retry(()->rbacService.describeRole(this.getRpcStub(), request)); + return rpcUtils.retry(() -> rbacService.describeRole(this.getRpcStub(), request)); } + /** * create role * * @param request create role request */ public void createRole(CreateRoleReq request) { - rpcUtils.retry(()->rbacService.createRole(this.getRpcStub(), request)); + rpcUtils.retry(() -> rbacService.createRole(this.getRpcStub(), request)); } + /** * drop role * * @param request drop role request */ public void dropRole(DropRoleReq request) { - rpcUtils.retry(()->rbacService.dropRole(this.getRpcStub(), request)); + rpcUtils.retry(() -> rbacService.dropRole(this.getRpcStub(), request)); } + /** * grant privilege * * @param request grant privilege request */ public void grantPrivilege(GrantPrivilegeReq request) { - rpcUtils.retry(()->rbacService.grantPrivilege(this.getRpcStub(), request)); + rpcUtils.retry(() -> rbacService.grantPrivilege(this.getRpcStub(), request)); } + /** * revoke privilege * * @param request revoke privilege request */ public void revokePrivilege(RevokePrivilegeReq request) { - rpcUtils.retry(()->rbacService.revokePrivilege(this.getRpcStub(), request)); + rpcUtils.retry(() -> rbacService.revokePrivilege(this.getRpcStub(), request)); } + /** * grant role * * @param request grant role request */ public void grantRole(GrantRoleReq request) { - rpcUtils.retry(()->rbacService.grantRole(this.getRpcStub(), request)); + rpcUtils.retry(() -> rbacService.grantRole(this.getRpcStub(), request)); } + /** * revoke role * * @param request revoke role request */ public void revokeRole(RevokeRoleReq request) { - rpcUtils.retry(()->rbacService.revokeRole(this.getRpcStub(), request)); + rpcUtils.retry(() -> rbacService.revokeRole(this.getRpcStub(), request)); } public void createPrivilegeGroup(CreatePrivilegeGroupReq request) { - rpcUtils.retry(()->rbacService.createPrivilegeGroup(this.getRpcStub(), request)); + rpcUtils.retry(() -> rbacService.createPrivilegeGroup(this.getRpcStub(), request)); } public void dropPrivilegeGroup(DropPrivilegeGroupReq request) { - rpcUtils.retry(()->rbacService.dropPrivilegeGroup(this.getRpcStub(), request)); + rpcUtils.retry(() -> rbacService.dropPrivilegeGroup(this.getRpcStub(), request)); } public ListPrivilegeGroupsResp listPrivilegeGroups(ListPrivilegeGroupsReq request) { - return rpcUtils.retry(()->rbacService.listPrivilegeGroups(this.getRpcStub(), request)); + return rpcUtils.retry(() -> rbacService.listPrivilegeGroups(this.getRpcStub(), request)); } public void addPrivilegesToGroup(AddPrivilegesToGroupReq request) { - rpcUtils.retry(()->rbacService.addPrivilegesToGroup(this.getRpcStub(), request)); + rpcUtils.retry(() -> rbacService.addPrivilegesToGroup(this.getRpcStub(), request)); } public void removePrivilegesFromGroup(RemovePrivilegesFromGroupReq request) { - rpcUtils.retry(()->rbacService.removePrivilegesFromGroup(this.getRpcStub(), request)); + rpcUtils.retry(() -> rbacService.removePrivilegesFromGroup(this.getRpcStub(), request)); } public void grantPrivilegeV2(GrantPrivilegeReqV2 request) { - rpcUtils.retry(()->rbacService.grantPrivilegeV2(this.getRpcStub(), request)); + rpcUtils.retry(() -> rbacService.grantPrivilegeV2(this.getRpcStub(), request)); } public void revokePrivilegeV2(RevokePrivilegeReqV2 request) { - rpcUtils.retry(()->rbacService.revokePrivilegeV2(this.getRpcStub(), request)); + rpcUtils.retry(() -> rbacService.revokePrivilegeV2(this.getRpcStub(), request)); } ///////////////////////////////////////////////////////////////////////////////////////////// @@ -848,8 +917,8 @@ public void revokePrivilegeV2(RevokePrivilegeReqV2 request) { * * @param request {@link CreateResourceGroupReq} */ - public void createResourceGroup(CreateResourceGroupReq request){ - rpcUtils.retry(()->rgroupService.createResourceGroup(this.getRpcStub(), request)); + public void createResourceGroup(CreateResourceGroupReq request) { + rpcUtils.retry(() -> rgroupService.createResourceGroup(this.getRpcStub(), request)); } /** @@ -858,7 +927,7 @@ public void createResourceGroup(CreateResourceGroupReq request){ * @param request {@link UpdateResourceGroupsReq} */ public void updateResourceGroups(UpdateResourceGroupsReq request) { - rpcUtils.retry(()->rgroupService.updateResourceGroups(this.getRpcStub(), request)); + rpcUtils.retry(() -> rgroupService.updateResourceGroups(this.getRpcStub(), request)); } /** @@ -867,7 +936,7 @@ public void updateResourceGroups(UpdateResourceGroupsReq request) { * @param request {@link DropResourceGroupReq} */ public void dropResourceGroup(DropResourceGroupReq request) { - rpcUtils.retry(()->rgroupService.dropResourceGroup(this.getRpcStub(), request)); + rpcUtils.retry(() -> rgroupService.dropResourceGroup(this.getRpcStub(), request)); } /** @@ -877,7 +946,7 @@ public void dropResourceGroup(DropResourceGroupReq request) { * @return ListResourceGroupsResp */ public ListResourceGroupsResp listResourceGroups(ListResourceGroupsReq request) { - return rpcUtils.retry(()->rgroupService.listResourceGroups(this.getRpcStub(), request)); + return rpcUtils.retry(() -> rgroupService.listResourceGroups(this.getRpcStub(), request)); } /** @@ -887,7 +956,7 @@ public ListResourceGroupsResp listResourceGroups(ListResourceGroupsReq request) * @return DescribeResourceGroupResp */ public DescribeResourceGroupResp describeResourceGroup(DescribeResourceGroupReq request) { - return rpcUtils.retry(()->rgroupService.describeResourceGroup(this.getRpcStub(), request)); + return rpcUtils.retry(() -> rgroupService.describeResourceGroup(this.getRpcStub(), request)); } /** @@ -896,7 +965,7 @@ public DescribeResourceGroupResp describeResourceGroup(DescribeResourceGroupReq * @param request {@link TransferNodeReq} */ public void transferNode(TransferNodeReq request) { - rpcUtils.retry(()->rgroupService.transferNode(this.getRpcStub(), request)); + rpcUtils.retry(() -> rgroupService.transferNode(this.getRpcStub(), request)); } /** @@ -905,7 +974,7 @@ public void transferNode(TransferNodeReq request) { * @param request {@link TransferReplicaReq} */ public void transferReplica(TransferReplicaReq request) { - rpcUtils.retry(()->rgroupService.transferReplica(this.getRpcStub(), request)); + rpcUtils.retry(() -> rgroupService.transferReplica(this.getRpcStub(), request)); } ///////////////////////////////////////////////////////////////////////////////////////////// @@ -917,24 +986,27 @@ public void transferReplica(TransferReplicaReq request) { * @param request create alias request */ public void createAlias(CreateAliasReq request) { - rpcUtils.retry(()->utilityService.createAlias(this.getRpcStub(), request)); + rpcUtils.retry(() -> utilityService.createAlias(this.getRpcStub(), request)); } + /** * drop aliases * * @param request drop alias request */ public void dropAlias(DropAliasReq request) { - rpcUtils.retry(()->utilityService.dropAlias(this.getRpcStub(), request)); + rpcUtils.retry(() -> utilityService.dropAlias(this.getRpcStub(), request)); } + /** * alter aliases * * @param request alter alias request */ public void alterAlias(AlterAliasReq request) { - rpcUtils.retry(()->utilityService.alterAlias(this.getRpcStub(), request)); + rpcUtils.retry(() -> utilityService.alterAlias(this.getRpcStub(), request)); } + /** * list aliases * @@ -942,8 +1014,9 @@ public void alterAlias(AlterAliasReq request) { * @return List of String alias names */ public ListAliasResp listAliases(ListAliasesReq request) { - return rpcUtils.retry(()->utilityService.listAliases(this.getRpcStub(), request)); + return rpcUtils.retry(() -> utilityService.listAliases(this.getRpcStub(), request)); } + /** * describe aliases * @@ -951,7 +1024,7 @@ public ListAliasResp listAliases(ListAliasesReq request) { * @return DescribeAliasResp */ public DescribeAliasResp describeAlias(DescribeAliasReq request) { - return rpcUtils.retry(()->utilityService.describeAlias(this.getRpcStub(), request)); + return rpcUtils.retry(() -> utilityService.describeAlias(this.getRpcStub(), request)); } /** @@ -960,7 +1033,7 @@ public DescribeAliasResp describeAlias(DescribeAliasReq request) { * @param request flush request */ public void flush(FlushReq request) { - FlushResp response = rpcUtils.retry(()->utilityService.flush(this.getRpcStub(), request)); + FlushResp response = rpcUtils.retry(() -> utilityService.flush(this.getRpcStub(), request)); // The BlockingStub.flush() api returns immediately after the datanode set all growing segments to be "sealed". // The flush state becomes "Completed" after the datanode uploading them to S3 asynchronously. @@ -981,7 +1054,7 @@ public void flush(FlushReq request) { * @return GetPersistentSegmentInfoResp */ public GetPersistentSegmentInfoResp getPersistentSegmentInfo(GetPersistentSegmentInfoReq request) { - return rpcUtils.retry(()->utilityService.getPersistentSegmentInfo(this.getRpcStub(), request)); + return rpcUtils.retry(() -> utilityService.getPersistentSegmentInfo(this.getRpcStub(), request)); } /** @@ -991,8 +1064,8 @@ public GetPersistentSegmentInfoResp getPersistentSegmentInfo(GetPersistentSegmen * @param request get request * @return GetQuerySegmentInfoResp */ - public GetQuerySegmentInfoResp getQuerySegmentInfo(GetQuerySegmentInfoReq request){ - return rpcUtils.retry(()->utilityService.getQuerySegmentInfo(this.getRpcStub(), request)); + public GetQuerySegmentInfoResp getQuerySegmentInfo(GetQuerySegmentInfoReq request) { + return rpcUtils.retry(() -> utilityService.getQuerySegmentInfo(this.getRpcStub(), request)); } /** @@ -1002,7 +1075,7 @@ public GetQuerySegmentInfoResp getQuerySegmentInfo(GetQuerySegmentInfoReq reques * @return CompactResp */ public CompactResp compact(CompactReq request) { - return rpcUtils.retry(()->utilityService.compact(this.getRpcStub(), request)); + return rpcUtils.retry(() -> utilityService.compact(this.getRpcStub(), request)); } /** @@ -1012,7 +1085,7 @@ public CompactResp compact(CompactReq request) { * @return GetCompactStateResp */ public GetCompactionStateResp getCompactionState(GetCompactionStateReq request) { - return rpcUtils.retry(()->utilityService.getCompactionState(this.getRpcStub(), request)); + return rpcUtils.retry(() -> utilityService.getCompactionState(this.getRpcStub(), request)); } /** @@ -1021,7 +1094,7 @@ public GetCompactionStateResp getCompactionState(GetCompactionStateReq request) * @return String */ public String getServerVersion() { - return rpcUtils.retry(()->clientUtils.getServerVersion(this.getRpcStub())); + return rpcUtils.retry(() -> clientUtils.getServerVersion(this.getRpcStub())); } /** @@ -1030,7 +1103,7 @@ public String getServerVersion() { * @return CheckHealthResp */ public CheckHealthResp checkHealth() { - return rpcUtils.retry(()->utilityService.checkHealth(this.getRpcStub())); + return rpcUtils.retry(() -> utilityService.checkHealth(this.getRpcStub())); } /** @@ -1040,7 +1113,7 @@ public CheckHealthResp checkHealth() { * @throws InterruptedException throws InterruptedException if the client failed to close connection */ public void close(long maxWaitSeconds) throws InterruptedException { - if(channel!= null){ + if (channel != null) { channel.shutdownNow(); channel.awaitTermination(maxWaitSeconds, TimeUnit.SECONDS); } diff --git a/sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java b/sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java index acad1b307..4b3a7ea3d 100644 --- a/sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java +++ b/sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java @@ -23,7 +23,10 @@ import io.milvus.common.utils.GTsDict; import io.milvus.common.utils.JsonUtils; import io.milvus.grpc.*; -import io.milvus.orm.iterator.*; +import io.milvus.orm.iterator.QueryIterator; +import io.milvus.orm.iterator.RpcStubWrapper; +import io.milvus.orm.iterator.SearchIterator; +import io.milvus.orm.iterator.SearchIteratorV2; import io.milvus.v2.exception.ErrorCode; import io.milvus.v2.exception.MilvusClientException; import io.milvus.v2.service.BaseService; @@ -289,25 +292,25 @@ public SearchResp hybridSearch(MilvusServiceGrpc.MilvusServiceBlockingStub block .build(); } - public QueryIterator queryIterator(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, - QueryIteratorReq request) { - DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, request.getDatabaseName(), + public QueryIterator queryIterator(RpcStubWrapper blockingStub, + QueryIteratorReq request) { + DescribeCollectionResponse descResp = getCollectionInfo(blockingStub.get(), request.getDatabaseName(), request.getCollectionName(), false); DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp); CreateCollectionReq.FieldSchema pkField = respR.getCollectionSchema().getField(respR.getPrimaryFieldName()); return new QueryIterator(request, blockingStub, pkField); } - public SearchIterator searchIterator(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, - SearchIteratorReq request) { - DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, request.getDatabaseName(), + public SearchIterator searchIterator(RpcStubWrapper blockingStub, + SearchIteratorReq request) { + DescribeCollectionResponse descResp = getCollectionInfo(blockingStub.get(), request.getDatabaseName(), request.getCollectionName(), false); DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp); CreateCollectionReq.FieldSchema pkField = respR.getCollectionSchema().getField(respR.getPrimaryFieldName()); return new SearchIterator(request, blockingStub, pkField); } - public SearchIteratorV2 searchIteratorV2(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, + public SearchIteratorV2 searchIteratorV2(RpcStubWrapper blockingStub, SearchIteratorReqV2 request) { return new SearchIteratorV2(request, blockingStub); } @@ -393,10 +396,10 @@ public RunAnalyzerResp runAnalyzer(MilvusServiceGrpc.MilvusServiceBlockingStub b List toResults = new ArrayList<>(); List results = response.getResultsList(); - results.forEach((item)->{ + results.forEach((item) -> { List toTokens = new ArrayList<>(); List tokens = item.getTokensList(); - tokens.forEach((token)->{ + tokens.forEach((token) -> { toTokens.add(RunAnalyzerResp.AnalyzerToken.builder() .token(token.getToken()) .startOffset(token.getStartOffset()) diff --git a/sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java b/sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java index fb4e912dd..dacd478f0 100644 --- a/sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java +++ b/sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java @@ -20,12 +20,17 @@ package io.milvus.v2.client; import com.google.common.collect.Lists; -import com.google.gson.*; - +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonNull; +import com.google.gson.JsonObject; import com.google.gson.reflect.TypeToken; import io.milvus.TestUtils; import io.milvus.common.clientenum.FunctionType; -import io.milvus.common.resourcegroup.*; +import io.milvus.common.resourcegroup.NodeInfo; +import io.milvus.common.resourcegroup.ResourceGroupConfig; +import io.milvus.common.resourcegroup.ResourceGroupLimit; +import io.milvus.common.resourcegroup.ResourceGroupTransfer; import io.milvus.common.utils.Float16Utils; import io.milvus.common.utils.GTsDict; import io.milvus.common.utils.JsonUtils; @@ -41,29 +46,37 @@ import io.milvus.v2.common.IndexParam; import io.milvus.v2.exception.MilvusClientException; import io.milvus.v2.service.collection.request.*; -import io.milvus.v2.service.collection.response.*; +import io.milvus.v2.service.collection.response.DescribeCollectionResp; +import io.milvus.v2.service.collection.response.DescribeReplicasResp; +import io.milvus.v2.service.collection.response.ListCollectionsResp; import io.milvus.v2.service.database.request.*; -import io.milvus.v2.service.database.response.*; +import io.milvus.v2.service.database.response.DescribeDatabaseResp; +import io.milvus.v2.service.database.response.ListDatabasesResp; import io.milvus.v2.service.index.request.*; -import io.milvus.v2.service.index.response.*; +import io.milvus.v2.service.index.response.DescribeIndexResp; import io.milvus.v2.service.partition.request.*; import io.milvus.v2.service.rbac.PrivilegeGroup; -import io.milvus.v2.service.rbac.request.*; -import io.milvus.v2.service.rbac.response.*; +import io.milvus.v2.service.rbac.request.AddPrivilegesToGroupReq; +import io.milvus.v2.service.rbac.request.CreatePrivilegeGroupReq; +import io.milvus.v2.service.rbac.request.ListPrivilegeGroupsReq; +import io.milvus.v2.service.rbac.response.ListPrivilegeGroupsResp; import io.milvus.v2.service.resourcegroup.request.*; -import io.milvus.v2.service.resourcegroup.response.*; +import io.milvus.v2.service.resourcegroup.response.DescribeResourceGroupResp; +import io.milvus.v2.service.resourcegroup.response.ListResourceGroupsResp; import io.milvus.v2.service.utility.request.*; -import io.milvus.v2.service.utility.response.*; +import io.milvus.v2.service.utility.response.CheckHealthResp; +import io.milvus.v2.service.utility.response.CompactResp; +import io.milvus.v2.service.utility.response.GetPersistentSegmentInfoResp; +import io.milvus.v2.service.utility.response.GetQuerySegmentInfoResp; import io.milvus.v2.service.vector.request.*; import io.milvus.v2.service.vector.request.data.*; -import io.milvus.v2.service.vector.request.ranker.*; +import io.milvus.v2.service.vector.request.ranker.RRFRanker; import io.milvus.v2.service.vector.response.*; import org.apache.commons.lang3.StringUtils; import org.apache.commons.text.RandomStringGenerator; - +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -181,25 +194,25 @@ private List generateRandomData(CreateCollectionReq.CollectionSchema DataType dataType = field.getDataType(); switch (dataType) { case Bool: - row.addProperty(field.getName(), i%3==0); + row.addProperty(field.getName(), i % 3 == 0); break; case Int8: - row.addProperty(field.getName(), i%128); + row.addProperty(field.getName(), i % 128); break; case Int16: - row.addProperty(field.getName(), i%32768); + row.addProperty(field.getName(), i % 32768); break; case Int32: - row.addProperty(field.getName(), i%65536); + row.addProperty(field.getName(), i % 65536); break; case Int64: row.addProperty(field.getName(), i); break; case Float: - row.addProperty(field.getName(), i/8); + row.addProperty(field.getName(), i / 8); break; case Double: - row.addProperty(field.getName(), i/3); + row.addProperty(field.getName(), i / 3); break; case VarChar: row.addProperty(field.getName(), String.format("varchar_%d", i)); @@ -207,7 +220,7 @@ private List generateRandomData(CreateCollectionReq.CollectionSchema case JSON: { JsonObject jsonObj = new JsonObject(); jsonObj.addProperty(String.format("JSON_%d", i), i); - jsonObj.add("flags", JsonUtils.toJsonTree(new long[]{i, i+1, i + 2})); + jsonObj.add("flags", JsonUtils.toJsonTree(new long[]{i, i + 1, i + 2})); row.add(field.getName(), jsonObj); break; } @@ -273,13 +286,16 @@ private void verifyOutput(JsonObject row, Map entity) { Assertions.assertEquals(row.get("json_field").toString(), jsn.toString()); List arrInt = (List) entity.get("arr_int_field"); - List arrIntOri = JsonUtils.fromJson(row.get("arr_int_field"), new TypeToken>() {}.getType()); + List arrIntOri = JsonUtils.fromJson(row.get("arr_int_field"), new TypeToken>() { + }.getType()); Assertions.assertEquals(arrIntOri, arrInt); List arrFloat = (List) entity.get("arr_float_field"); - List arrFloatOri = JsonUtils.fromJson(row.get("arr_float_field"), new TypeToken>() {}.getType()); + List arrFloatOri = JsonUtils.fromJson(row.get("arr_float_field"), new TypeToken>() { + }.getType()); Assertions.assertEquals(arrFloatOri, arrFloat); List arrStr = (List) entity.get("arr_varchar_field"); - List arrStrOri = JsonUtils.fromJson(row.get("arr_varchar_field"), new TypeToken>() {}.getType()); + List arrStrOri = JsonUtils.fromJson(row.get("arr_varchar_field"), new TypeToken>() { + }.getType()); Assertions.assertEquals(arrStrOri, arrStr); } @@ -292,7 +308,7 @@ private long getRowCount(String dbName, String collectionName) { .build()); List queryResults = queryResp.getQueryResults(); Assertions.assertEquals(1, queryResults.size()); - return (long)queryResults.get(0).getEntity().get("count(*)"); + return (long) queryResults.get(0).getEntity().get("count(*)"); } @@ -333,7 +349,7 @@ void testFloatVectors() { .build()); // master branch, getPersistentSegmentInfo cannot ensure the segment is returned after flush() - while(true) { + while (true) { // get persistent segment info GetPersistentSegmentInfoResp pSegInfo = client.getPersistentSegmentInfo(GetPersistentSegmentInfoReq.builder() .collectionName(randomCollectionName) @@ -361,7 +377,7 @@ void testFloatVectors() { Assertions.assertTrue(compactResp.getCompactionID() == -1L || compactResp.getCompactionID() > 0L); // create index - Map extraParams = new HashMap<>(); + Map extraParams = new HashMap<>(); extraParams.put("M", 64); extraParams.put("efConstruction", 200); IndexParam indexParam = IndexParam.builder() @@ -470,9 +486,10 @@ void testFloatVectors() { List targetIDs = new ArrayList<>(); List targetVectors = new ArrayList<>(); for (int i = 0; i < nq; i++) { - JsonObject row = data.get(RANDOM.nextInt((int)count)); + JsonObject row = data.get(RANDOM.nextInt((int) count)); targetIDs.add(row.get("id").getAsLong()); - List vector = JsonUtils.fromJson(row.get(vectorFieldName), new TypeToken>() {}.getType()); + List vector = JsonUtils.fromJson(row.get(vectorFieldName), new TypeToken>() { + }.getType()); targetVectors.add(new FloatVec(vector)); } @@ -521,7 +538,7 @@ void testFloatVectors() { { // query with template - Map template = new HashMap<>(); + Map template = new HashMap<>(); template.put("id_arr", Arrays.asList(5, 6, 7)); QueryResp queryResp = client.query(QueryReq.builder() .collectionName(randomCollectionName) @@ -613,8 +630,8 @@ void testBinaryVectors() throws InterruptedException { .dimension(DIMENSION) .build()); - Map extraParams = new HashMap<>(); - extraParams.put("nlist",64); + Map extraParams = new HashMap<>(); + extraParams.put("nlist", 64); IndexParam indexParam = IndexParam.builder() .fieldName(vectorFieldName) .indexType(IndexParam.IndexType.BIN_IVF_FLAT) @@ -649,9 +666,10 @@ void testBinaryVectors() throws InterruptedException { List targetVectors = new ArrayList<>(); List targetOriginVectors = new ArrayList<>(); for (int i = 0; i < nq; i++) { - JsonObject row = data.get(RANDOM.nextInt((int)count)); + JsonObject row = data.get(RANDOM.nextInt((int) count)); targetIDs.add(row.get("id").getAsLong()); - byte[] vector = JsonUtils.fromJson(row.get(vectorFieldName), new TypeToken() {}.getType()); + byte[] vector = JsonUtils.fromJson(row.get(vectorFieldName), new TypeToken() { + }.getType()); targetOriginVectors.add(ByteBuffer.wrap(vector)); targetVectors.add(new BinaryVec(vector)); } @@ -696,8 +714,8 @@ void testFloat16Vectors() { .build()); List indexes = new ArrayList<>(); - Map extraParams = new HashMap<>(); - extraParams.put("nlist",64); + Map extraParams = new HashMap<>(); + extraParams.put("nlist", 64); indexes.add(IndexParam.builder() .fieldName(float16Field) .indexType(IndexParam.IndexType.IVF_FLAT) @@ -739,10 +757,10 @@ void testFloat16Vectors() { // update one row long targetID = 99; - JsonObject row = data.get((int)targetID); + JsonObject row = data.get((int) targetID); List originVector = new ArrayList<>(); for (int i = 0; i < DIMENSION; ++i) { - originVector.add((float)1/(i+1)); + originVector.add((float) 1 / (i + 1)); } // System.out.println("Original float32 vector: " + originVector); row.add(float16Field, JsonUtils.toJsonTree(Float16Utils.f32VectorToFp16Buffer(originVector).array())); @@ -826,8 +844,8 @@ void testSparseVectors() { .dimension(DIMENSION) .build()); - Map extraParams = new HashMap<>(); - extraParams.put("drop_ratio_build",0.2); + Map extraParams = new HashMap<>(); + extraParams.put("drop_ratio_build", 0.2); IndexParam indexParam = IndexParam.builder() .fieldName(vectorFieldName) .indexType(IndexParam.IndexType.SPARSE_INVERTED_INDEX) @@ -861,9 +879,10 @@ void testSparseVectors() { List targetIDs = new ArrayList<>(); List targetVectors = new ArrayList<>(); for (int i = 0; i < nq; i++) { - JsonObject row = data.get(RANDOM.nextInt((int)count)); + JsonObject row = data.get(RANDOM.nextInt((int) count)); targetIDs.add(row.get("id").getAsLong()); - SortedMap vector = JsonUtils.fromJson(row.get(vectorFieldName), new TypeToken>() {}.getType()); + SortedMap vector = JsonUtils.fromJson(row.get(vectorFieldName), new TypeToken>() { + }.getType()); targetVectors.add(new SparseFloatVec(vector)); } SearchResp searchResp = client.search(SearchReq.builder() @@ -908,7 +927,9 @@ void testHybridSearch() { .fieldName("float_vector") .indexType(IndexParam.IndexType.IVF_FLAT) .metricType(IndexParam.MetricType.L2) - .extraParams(new HashMap(){{put("nlist", 64);}}) + .extraParams(new HashMap() {{ + put("nlist", 64); + }}) .build()); indexParams.add(IndexParam.builder() .fieldName("binary_vector") @@ -919,7 +940,9 @@ void testHybridSearch() { .fieldName("sparse_vector") .indexType(IndexParam.IndexType.SPARSE_INVERTED_INDEX) .metricType(IndexParam.MetricType.IP) - .extraParams(new HashMap(){{put("drop_ratio_build", 0.1);}}) + .extraParams(new HashMap() {{ + put("drop_ratio_build", 0.1); + }}) .build()); CreateCollectionReq requestCreate = CreateCollectionReq.builder() @@ -976,13 +999,13 @@ void testHybridSearch() { .limit(topk) .consistencyLevel(ConsistencyLevel.BOUNDED) .build(); - }; + }; // search with an empty nq, return error - Assertions.assertThrows(MilvusClientException.class, ()->client.hybridSearch(genRequestFunc.apply(0))); + Assertions.assertThrows(MilvusClientException.class, () -> client.hybridSearch(genRequestFunc.apply(0))); // unequal nq, return error - Assertions.assertThrows(MilvusClientException.class, ()->client.hybridSearch(genRequestFunc.apply(1))); + Assertions.assertThrows(MilvusClientException.class, () -> client.hybridSearch(genRequestFunc.apply(1))); // search on empty collection, no result returned SearchResp searchResp = client.hybridSearch(genRequestFunc.apply(nq)); @@ -1045,7 +1068,9 @@ void testDeleteUpsert() { .fieldName("float_vector") .indexType(IndexParam.IndexType.IVF_FLAT) .metricType(IndexParam.MetricType.L2) - .extraParams(new HashMap(){{put("nlist", 64);}}) + .extraParams(new HashMap() {{ + put("nlist", 64); + }}) .build()); // create collection in the test db CreateCollectionReq requestCreate = CreateCollectionReq.builder() @@ -1061,7 +1086,7 @@ void testDeleteUpsert() { for (int i = 0; i < 10; i++) { JsonObject row = new JsonObject(); row.addProperty("pk", String.format("pk_%d", i)); - row.add("float_vector", JsonUtils.toJsonTree(new float[]{(float)i, (float)(i + 1), (float)(i + 2), (float)(i + 3)})); + row.add("float_vector", JsonUtils.toJsonTree(new float[]{(float) i, (float) (i + 1), (float) (i + 2), (float) (i + 3)})); data.add(row); } @@ -1187,7 +1212,7 @@ void testAlias() { .alias("CCC") .build()); - Assertions.assertThrows(MilvusClientException.class, ()->client.describeCollection(DescribeCollectionReq.builder() + Assertions.assertThrows(MilvusClientException.class, () -> client.describeCollection(DescribeCollectionReq.builder() .collectionName("CCC") .build())); } @@ -1270,9 +1295,9 @@ void testIndex() { .build()); List indexes = new ArrayList<>(); - Map extra = new HashMap<>(); - extra.put("M",8); - extra.put("efConstruction",64); + Map extra = new HashMap<>(); + extra.put("M", 8); + extra.put("efConstruction", 64); indexes.add(IndexParam.builder() .fieldName("vector") .indexName("abc") @@ -1511,7 +1536,7 @@ void testCacheCollectionSchema() throws InterruptedException { // insert wrong data, the schema cache will be removed row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(7))); - Assertions.assertThrows(MilvusClientException.class, ()->client.insert(InsertReq.builder() + Assertions.assertThrows(MilvusClientException.class, () -> client.insert(InsertReq.builder() .databaseName("default") .collectionName(randomCollectionName) .data(Collections.singletonList(row)) @@ -1538,7 +1563,7 @@ void testCacheCollectionSchema() throws InterruptedException { // use the temp client to insert wrong data, wrong dimension row.addProperty("aaa", 22); row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(7))); - Assertions.assertThrows(MilvusClientException.class, ()->tempClient.insert(InsertReq.builder() + Assertions.assertThrows(MilvusClientException.class, () -> tempClient.insert(InsertReq.builder() .collectionName(randomCollectionName) .data(Collections.singletonList(row)) .build())); @@ -1624,7 +1649,9 @@ public void testIterator() { .fieldName("sparse_vector") .indexType(IndexParam.IndexType.SPARSE_INVERTED_INDEX) .metricType(IndexParam.MetricType.IP) - .extraParams(new HashMap(){{put("drop_ratio_build", 0.1);}}) + .extraParams(new HashMap() {{ + put("drop_ratio_build", 0.1); + }}) .build()); indexParams.add(IndexParam.builder() .fieldName("bfloat16_vector") @@ -1652,11 +1679,14 @@ public void testIterator() { long rowCount = getRowCount("", randomCollectionName); Assertions.assertEquals(count, rowCount); + // set rpc timeout for each call + client.withTimeout(1000, TimeUnit.MILLISECONDS); + // search iterator SearchIterator searchIterator = client.searchIterator(SearchIteratorReq.builder() .collectionName(randomCollectionName) .outputFields(Lists.newArrayList("*")) - .batchSize(20L) + .batchSize(1L) .vectorFieldName("float_vector") .vectors(Collections.singletonList(new FloatVec(utils.generateFloatVector()))) .expr("int64_field > 500 && int64_field < 1000") @@ -1670,15 +1700,15 @@ public void testIterator() { while (true) { List res = searchIterator.next(); if (res.isEmpty()) { - System.out.println("search iteration finished, close"); + System.out.printf("search iteration finished, close, %d items fetched%n", counter); searchIterator.close(); break; } for (QueryResultsWrapper.RowRecord record : res) { Assertions.assertInstanceOf(Float.class, record.get("score")); - Assertions.assertTrue((float)record.get("score") >= 5.0); - Assertions.assertTrue((float)record.get("score") <= 50.0); + Assertions.assertTrue((float) record.get("score") >= 5.0); + Assertions.assertTrue((float) record.get("score") <= 50.0); Assertions.assertInstanceOf(Boolean.class, record.get("bool_field")); Assertions.assertInstanceOf(Integer.class, record.get("int8_field")); @@ -1695,28 +1725,28 @@ public void testIterator() { Assertions.assertInstanceOf(ByteBuffer.class, record.get("bfloat16_vector")); Assertions.assertInstanceOf(SortedMap.class, record.get("sparse_vector")); - long int64Val = (long)record.get("int64_field"); + long int64Val = (long) record.get("int64_field"); Assertions.assertTrue(int64Val > 500L && int64Val < 1000L); - String varcharVal = (String)record.get("varchar_field"); + String varcharVal = (String) record.get("varchar_field"); Assertions.assertTrue(varcharVal.startsWith("varchar_")); - JsonObject jsonObj = (JsonObject)record.get("json_field"); + JsonObject jsonObj = (JsonObject) record.get("json_field"); Assertions.assertTrue(jsonObj.has(String.format("JSON_%d", int64Val))); - List intArr = (List)record.get("arr_int_field"); + List intArr = (List) record.get("arr_int_field"); Assertions.assertTrue(intArr.size() <= 50); // max capacity 50 is defined in the baseSchema() - List floatVector = (List)record.get("float_vector"); + List floatVector = (List) record.get("float_vector"); Assertions.assertEquals(DIMENSION, floatVector.size()); - ByteBuffer binaryVector = (ByteBuffer)record.get("binary_vector"); - Assertions.assertEquals(DIMENSION, binaryVector.limit()*8); + ByteBuffer binaryVector = (ByteBuffer) record.get("binary_vector"); + Assertions.assertEquals(DIMENSION, binaryVector.limit() * 8); - ByteBuffer bfloat16Vector = (ByteBuffer)record.get("bfloat16_vector"); - Assertions.assertEquals(DIMENSION*2, bfloat16Vector.limit()); + ByteBuffer bfloat16Vector = (ByteBuffer) record.get("bfloat16_vector"); + Assertions.assertEquals(DIMENSION * 2, bfloat16Vector.limit()); - SortedMap sparseVector = (SortedMap)record.get("sparse_vector"); + SortedMap sparseVector = (SortedMap) record.get("sparse_vector"); Assertions.assertTrue(sparseVector.size() >= 10 && sparseVector.size() < 20); // defined in generateSparseVector() counter++; @@ -1732,7 +1762,7 @@ public void testIterator() { .collectionName(randomCollectionName) .expr("int64_field < " + String.valueOf(to)) .outputFields(Lists.newArrayList("*")) - .batchSize(50L) + .batchSize(1L) .offset(from) .limit(4000) .consistencyLevel(ConsistencyLevel.EVENTUALLY) @@ -1742,7 +1772,7 @@ public void testIterator() { while (true) { List res = queryIterator.next(); if (res.isEmpty()) { - System.out.println("query iteration finished, close"); + System.out.printf("query iteration finished, close, %d items fetched%n", counter); queryIterator.close(); break; } @@ -1764,29 +1794,29 @@ public void testIterator() { Assertions.assertInstanceOf(ByteBuffer.class, record.get("bfloat16_vector")); Assertions.assertInstanceOf(SortedMap.class, record.get("sparse_vector")); - long int64Val = (long)record.get("id"); + long int64Val = (long) record.get("id"); Assertions.assertTrue(int64Val >= from); Assertions.assertTrue(int64Val < to); - String varcharVal = (String)record.get("varchar_field"); + String varcharVal = (String) record.get("varchar_field"); Assertions.assertTrue(varcharVal.startsWith("varchar_")); - JsonObject jsonObj = (JsonObject)record.get("json_field"); + JsonObject jsonObj = (JsonObject) record.get("json_field"); Assertions.assertTrue(jsonObj.has(String.format("JSON_%d", int64Val))); - List intArr = (List)record.get("arr_int_field"); + List intArr = (List) record.get("arr_int_field"); Assertions.assertTrue(intArr.size() <= 50); // max capacity 50 is defined in the baseSchema() - List floatVector = (List)record.get("float_vector"); + List floatVector = (List) record.get("float_vector"); Assertions.assertEquals(DIMENSION, floatVector.size()); - ByteBuffer binaryVector = (ByteBuffer)record.get("binary_vector"); - Assertions.assertEquals(DIMENSION, binaryVector.limit()*8); + ByteBuffer binaryVector = (ByteBuffer) record.get("binary_vector"); + Assertions.assertEquals(DIMENSION, binaryVector.limit() * 8); - ByteBuffer bfloat16Vector = (ByteBuffer)record.get("bfloat16_vector"); - Assertions.assertEquals(DIMENSION*2, bfloat16Vector.limit()); + ByteBuffer bfloat16Vector = (ByteBuffer) record.get("bfloat16_vector"); + Assertions.assertEquals(DIMENSION * 2, bfloat16Vector.limit()); - SortedMap sparseVector = (SortedMap)record.get("sparse_vector"); + SortedMap sparseVector = (SortedMap) record.get("sparse_vector"); Assertions.assertTrue(sparseVector.size() >= 10 && sparseVector.size() < 20); // defined in generateSparseVector() counter++; @@ -1798,7 +1828,7 @@ public void testIterator() { SearchIteratorV2 searchIteratorV2 = client.searchIteratorV2(SearchIteratorReqV2.builder() .collectionName(randomCollectionName) .outputFields(Lists.newArrayList("*")) - .batchSize(1000L) + .batchSize(100L) .vectorFieldName("float_vector") .filter("id >= 50") .vectors(Collections.singletonList(new FloatVec(utils.generateFloatVector()))) @@ -1809,7 +1839,7 @@ public void testIterator() { while (true) { List res = searchIteratorV2.next(); if (res.isEmpty()) { - System.out.println("search iteration finished, close"); + System.out.printf("search iteration finished, close, %d items fetched%n", counter); searchIteratorV2.close(); break; } @@ -1831,27 +1861,27 @@ public void testIterator() { Assertions.assertInstanceOf(ByteBuffer.class, entity.get("bfloat16_vector")); Assertions.assertInstanceOf(SortedMap.class, entity.get("sparse_vector")); - String varcharVal = (String)entity.get("varchar_field"); + String varcharVal = (String) entity.get("varchar_field"); Assertions.assertTrue(varcharVal.startsWith("varchar_")); - long int64Val = (long)entity.get("int64_field"); - Assertions.assertEquals(int64Val, (long)record.getId()); - JsonObject jsonObj = (JsonObject)entity.get("json_field"); + long int64Val = (long) entity.get("int64_field"); + Assertions.assertEquals(int64Val, (long) record.getId()); + JsonObject jsonObj = (JsonObject) entity.get("json_field"); Assertions.assertTrue(jsonObj.has(String.format("JSON_%d", int64Val))); - List intArr = (List)entity.get("arr_int_field"); + List intArr = (List) entity.get("arr_int_field"); Assertions.assertTrue(intArr.size() <= 50); // max capacity 50 is defined in the baseSchema() - List floatVector = (List)entity.get("float_vector"); + List floatVector = (List) entity.get("float_vector"); Assertions.assertEquals(DIMENSION, floatVector.size()); - ByteBuffer binaryVector = (ByteBuffer)entity.get("binary_vector"); - Assertions.assertEquals(DIMENSION, binaryVector.limit()*8); + ByteBuffer binaryVector = (ByteBuffer) entity.get("binary_vector"); + Assertions.assertEquals(DIMENSION, binaryVector.limit() * 8); - ByteBuffer bfloat16Vector = (ByteBuffer)entity.get("bfloat16_vector"); - Assertions.assertEquals(DIMENSION*2, bfloat16Vector.limit()); + ByteBuffer bfloat16Vector = (ByteBuffer) entity.get("bfloat16_vector"); + Assertions.assertEquals(DIMENSION * 2, bfloat16Vector.limit()); - SortedMap sparseVector = (SortedMap)entity.get("sparse_vector"); + SortedMap sparseVector = (SortedMap) entity.get("sparse_vector"); Assertions.assertTrue(sparseVector.size() >= 10 && sparseVector.size() < 20); // defined in generateSparseVector() counter++; @@ -1859,7 +1889,10 @@ public void testIterator() { } // search iterator could not ensure that all the entities can be retrieved // expect count is 9950, but sometimes it returns 9949 or 9948 - Assertions.assertTrue(counter > ((int)count - 55) && counter <= ((int)count - 50)); + Assertions.assertTrue(counter > ((int) count - 55) && counter <= ((int) count - 50)); + + // reset rpc timeout to unlimited + client.withTimeout(0, TimeUnit.MILLISECONDS); client.dropCollection(DropCollectionReq.builder().collectionName(randomCollectionName).build()); } @@ -1889,7 +1922,7 @@ void testDatabase() { DescribeDatabaseResp descDBResp = client.describeDatabase(DescribeDatabaseReq.builder() .databaseName(tempDatabaseName) .build()); - Map propertiesResp = descDBResp.getProperties(); + Map propertiesResp = descDBResp.getProperties(); Assertions.assertTrue(propertiesResp.containsKey(Constant.DATABASE_REPLICA_NUMBER)); Assertions.assertEquals("5", propertiesResp.get(Constant.DATABASE_REPLICA_NUMBER)); @@ -1921,7 +1954,7 @@ void testDatabase() { Assertions.assertFalse(propertiesResp.containsKey("prop")); // switch to the temp database - Assertions.assertDoesNotThrow(()->client.useDatabase(tempDatabaseName)); + Assertions.assertDoesNotThrow(() -> client.useDatabase(tempDatabaseName)); // create a collection in the temp database String randomCollectionName = generator.generate(10); @@ -1946,7 +1979,7 @@ void testDatabase() { client.createCollection(requestCreate); // switch to the default database - Assertions.assertDoesNotThrow(()->client.useDatabase(currentDbName)); + Assertions.assertDoesNotThrow(() -> client.useDatabase(currentDbName)); // list collections in the temp database ListCollectionsResp listCollectionsResp = client.listCollectionsV2(ListCollectionsReq.builder() @@ -2272,7 +2305,7 @@ void testMultiThreadsInsert() { int cnt = rand.nextInt(100) + 100; for (int j = 0; j < cnt; j++) { JsonObject obj = new JsonObject(); - obj.addProperty("id", String.format("%d", i*cnt + j)); + obj.addProperty("id", String.format("%d", i * cnt + j)); List vector = utils.generateFloatVector(dim); obj.add("vector", JsonUtils.toJsonTree(vector)); obj.addProperty("dataTime", System.currentTimeMillis()); @@ -2316,7 +2349,7 @@ void testMultiThreadsInsert() { int cnt = rand.nextInt(100) + 100; for (int j = 0; j < cnt; j++) { JsonObject obj = new JsonObject(); - obj.addProperty("id", String.format("%d", i*cnt + j)); + obj.addProperty("id", String.format("%d", i * cnt + j)); List vector = utils.generateFloatVector(dim); obj.add("vector", JsonUtils.toJsonTree(vector)); obj.addProperty("dataTime", System.currentTimeMillis()); @@ -2374,7 +2407,7 @@ void testNullableAndDefaultValue() { .fieldName("flag") .dataType(DataType.Int32) .isNullable(true) - .defaultValue((int)10) + .defaultValue((int) 10) .build()); collectionSchema.addField(AddFieldReq.builder() .fieldName("desc") @@ -2420,7 +2453,7 @@ void testNullableAndDefaultValue() { List vector = utils.generateFloatVector(dim); row.addProperty("id", i); row.add("vector", JsonUtils.toJsonTree(vector)); - if (i%2 == 0) { + if (i % 2 == 0) { row.addProperty("flag", i); row.add("desc", JsonNull.INSTANCE); } else { @@ -2444,9 +2477,9 @@ void testNullableAndDefaultValue() { Function, Void> checkFunc = entity -> { - long id = (long)entity.get("id"); - if (id%2 == 0) { - Assertions.assertEquals((int)id, entity.get("flag")); + long id = (long) entity.get("id"); + if (id % 2 == 0) { + Assertions.assertEquals((int) id, entity.get("flag")); Assertions.assertNull(entity.get("desc")); Assertions.assertNull(entity.get("arr")); } else { @@ -2454,7 +2487,7 @@ void testNullableAndDefaultValue() { Assertions.assertEquals("AAA", entity.get("desc")); Object obj = entity.get("arr"); Assertions.assertInstanceOf(List.class, obj); - List arr = (List)obj; + List arr = (List) obj; Assertions.assertEquals(2, arr.size()); Assertions.assertEquals(5, arr.get(0)); Assertions.assertEquals(6, arr.get(1)); @@ -2492,7 +2525,7 @@ void testNullableAndDefaultValue() { Assertions.assertEquals(10, firstResults.size()); // System.out.println("Search results:"); for (SearchResp.SearchResult result : firstResults) { - long id = (long)result.getId(); + long id = (long) result.getId(); Map entity = result.getEntity(); checkFunc.apply(entity); // System.out.println(result); @@ -2549,7 +2582,9 @@ void testDocInOut() { .fieldName("sparse") .indexType(IndexParam.IndexType.SPARSE_INVERTED_INDEX) .metricType(IndexParam.MetricType.BM25) - .extraParams(new HashMap(){{put("drop_ratio_build", 0.1);}}) + .extraParams(new HashMap() {{ + put("drop_ratio_build", 0.1); + }}) .build()); CreateCollectionReq requestCreate = CreateCollectionReq.builder() .collectionName(randomCollectionName) @@ -2660,7 +2695,7 @@ void testDynamicField() { .outputFields(Collections.singletonList("count(*)")) .consistencyLevel(ConsistencyLevel.STRONG) .build()); - Assertions.assertEquals(100L, (long)countR.getQueryResults().get(0).getEntity().get("count(*)")); + Assertions.assertEquals(100L, (long) countR.getQueryResults().get(0).getEntity().get("count(*)")); GetResp getR = client.get(GetReq.builder() .collectionName(collectionName) @@ -2987,8 +3022,8 @@ void testConsistencyLevel() throws InterruptedException { Assertions.assertEquals(1, oneResult.get(0).size()); } } - return null; - }; + return null; + }; // test SESSION level createSimpleCollection(client, "", randomCollectionName, pkName, false, dim, ConsistencyLevel.SESSION);