diff --git a/CHANGELOG.md b/CHANGELOG.md index db2ecd7ff..9d3189d1e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## milvus-sdk-java 2.6.1 (2025-07-14) +## milvus-sdk-java 2.6.1 (2025-07-15) ### Bug - Fix a bug of SearchResultsWrapper.getRowRecords() that returns wrong data for output fields @@ -10,7 +10,7 @@ - Avoid exception when search result is empty - BulkWriter supports Int8Vector -## milvus-sdk-java 2.5.11 (2025-07-14) +## milvus-sdk-java 2.5.11 (2025-07-15) ### Bug - Fix a bug of SearchResultsWrapper.getRowRecords() that returns wrong data for output fields - Fix a bug of flush that timestamp is not correctly passed diff --git a/sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java b/sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java index a9aa40b23..e3bf60599 100644 --- a/sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java +++ b/sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java @@ -464,8 +464,6 @@ public R createDatabase(CreateDatabaseParam requestParam) { .addAllProperties(propertiesList) .build(); - System.out.println(requestParam.getProperties()); - Status response = blockingStub().createDatabase(createDatabaseRequest); handleResponse(title, response); return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG)); @@ -1870,6 +1868,8 @@ public R search(@NonNull SearchParam requestParam) { String title = String.format("SearchRequest collectionName:%s", requestParam.getCollectionName()); try { + // reset the db name so that the timestamp cache can set correct key for this collection + requestParam.setDatabaseName(actualDbName(requestParam.getDatabaseName())); SearchRequest searchRequest = ParamUtils.convertSearchParam(requestParam); SearchResults response = this.blockingStub().search(searchRequest); @@ -1897,6 +1897,8 @@ public ListenableFuture> searchAsync(SearchParam requestParam) logDebug(requestParam.toString()); String title = String.format("SearchAsyncRequest collectionName:%s", requestParam.getCollectionName()); + // reset the db name so that the timestamp cache can set correct key for this collection + requestParam.setDatabaseName(actualDbName(requestParam.getDatabaseName())); SearchRequest searchRequest = ParamUtils.convertSearchParam(requestParam); ListenableFuture response = this.futureStub().search(searchRequest); @@ -1942,6 +1944,8 @@ public R hybridSearch(HybridSearchParam requestParam) { String title = String.format("HybridSearchRequest collectionName:%s", requestParam.getCollectionName()); try { + // reset the db name so that the timestamp cache can set correct key for this collection + requestParam.setDatabaseName(actualDbName(requestParam.getDatabaseName())); HybridSearchRequest searchRequest = ParamUtils.convertHybridSearchParam(requestParam); SearchResults response = this.blockingStub().hybridSearch(searchRequest); handleResponse(title, response.getStatus()); @@ -1965,6 +1969,8 @@ public ListenableFuture> hybridSearchAsync(HybridSearchParam re logDebug(requestParam.toString()); String title = String.format("HybridSearchAsyncRequest collectionName:%s", requestParam.getCollectionName()); + // reset the db name so that the timestamp cache can set correct key for this collection + requestParam.setDatabaseName(actualDbName(requestParam.getDatabaseName())); HybridSearchRequest searchRequest = ParamUtils.convertHybridSearchParam(requestParam); ListenableFuture response = this.futureStub().hybridSearch(searchRequest); @@ -2011,6 +2017,8 @@ public R query(@NonNull QueryParam requestParam) { requestParam.getCollectionName(), requestParam.getExpr()); try { + // reset the db name so that the timestamp cache can set correct key for this collection + requestParam.setDatabaseName(actualDbName(requestParam.getDatabaseName())); QueryRequest queryRequest = ParamUtils.convertQueryParam(requestParam); QueryResults response = this.blockingStub().query(queryRequest); @@ -2046,6 +2054,8 @@ public ListenableFuture> queryAsync(QueryParam requestParam) { String title = String.format("QueryAsyncRequest collectionName:%s, expr:%s", requestParam.getCollectionName(), requestParam.getExpr()); + // reset the db name so that the timestamp cache can set correct key for this collection + requestParam.setDatabaseName(actualDbName(requestParam.getDatabaseName())); QueryRequest queryRequest = ParamUtils.convertQueryParam(requestParam); ListenableFuture response = this.futureStub().query(queryRequest); diff --git a/sdk-core/src/main/java/io/milvus/common/utils/GTsDict.java b/sdk-core/src/main/java/io/milvus/common/utils/GTsDict.java index e890190ec..60773b588 100644 --- a/sdk-core/src/main/java/io/milvus/common/utils/GTsDict.java +++ b/sdk-core/src/main/java/io/milvus/common/utils/GTsDict.java @@ -52,19 +52,19 @@ public static String CombineCollectionName(String databaseName, String collectio private ConcurrentMap tsDict = new ConcurrentHashMap<>(); - public void updateCollectionTs(String collectionName, long ts) { + public void updateCollectionTs(String name, long ts) { // If the collection name exists, use its value to compare to the input ts, // only when the input ts is larger than the existing value, replace it with the input ts. // If the collection name doesn't exist, directly set the input value. - tsDict.compute(collectionName, (key, value) -> (value == null) ? ts : ((ts > value) ? ts : value)); + tsDict.compute(name, (key, value) -> (value == null) ? ts : ((ts > value) ? ts : value)); } - public Long getCollectionTs(String collectionName) { - return tsDict.get(collectionName); + public Long getCollectionTs(String name) { + return tsDict.get(name); } - public void removeCollectionTs(String collectionName) { - tsDict.remove(collectionName); + public void removeCollectionTs(String name) { + tsDict.remove(name); } public void cleanAllCollectionTs() { diff --git a/sdk-core/src/main/java/io/milvus/param/ParamUtils.java b/sdk-core/src/main/java/io/milvus/param/ParamUtils.java index 4cc0ad876..c88f2579b 100644 --- a/sdk-core/src/main/java/io/milvus/param/ParamUtils.java +++ b/sdk-core/src/main/java/io/milvus/param/ParamUtils.java @@ -849,14 +849,16 @@ public static void compatibleSearchParams(Map searchParams, Sear @SuppressWarnings("unchecked") public static SearchRequest convertSearchParam(@NonNull SearchParam requestParam) throws ParamException { + String dbName = requestParam.getDatabaseName(); + String collectionName = requestParam.getCollectionName(); SearchRequest.Builder builder = SearchRequest.newBuilder() - .setCollectionName(requestParam.getCollectionName()); + .setCollectionName(collectionName); if (!requestParam.getPartitionNames().isEmpty()) { requestParam.getPartitionNames().forEach(builder::addPartitionNames); } - if (StringUtils.isNotEmpty(requestParam.getDatabaseName())) { - builder.setDbName(requestParam.getDatabaseName()); + if (StringUtils.isNotEmpty(dbName)) { + builder.setDbName(dbName); } // prepare target vectors @@ -946,7 +948,7 @@ public static SearchRequest convertSearchParam(@NonNull SearchParam requestParam builder.setDsl(requestParam.getExpr()); } - long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), requestParam.getCollectionName()); + long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), dbName, collectionName); builder.setTravelTimestamp(requestParam.getTravelTimestamp()); // deprecated builder.setGuaranteeTimestamp(guaranteeTimestamp); @@ -1010,14 +1012,16 @@ public static SearchRequest convertAnnSearchParam(@NonNull AnnSearchParam annSea } public static HybridSearchRequest convertHybridSearchParam(@NonNull HybridSearchParam requestParam) throws ParamException { + String dbName = requestParam.getDatabaseName(); + String collectionName = requestParam.getCollectionName(); HybridSearchRequest.Builder builder = HybridSearchRequest.newBuilder() - .setCollectionName(requestParam.getCollectionName()); + .setCollectionName(collectionName); if (!requestParam.getPartitionNames().isEmpty()) { requestParam.getPartitionNames().forEach(builder::addPartitionNames); } - if (StringUtils.isNotEmpty(requestParam.getDatabaseName())) { - builder.setDbName(requestParam.getDatabaseName()); + if (StringUtils.isNotEmpty(dbName)) { + builder.setDbName(dbName); } for (AnnSearchParam req : requestParam.getSearchRequests()) { @@ -1063,7 +1067,7 @@ public static HybridSearchRequest convertHybridSearchParam(@NonNull HybridSearch requestParam.getOutFields().forEach(builder::addOutputFields); } - long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), requestParam.getCollectionName()); + long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), dbName, collectionName); builder.setGuaranteeTimestamp(guaranteeTimestamp); if (requestParam.getConsistencyLevel() == null) { @@ -1076,18 +1080,20 @@ public static HybridSearchRequest convertHybridSearchParam(@NonNull HybridSearch } public static QueryRequest convertQueryParam(@NonNull QueryParam requestParam) { + String dbName = requestParam.getDatabaseName(); + String collectionName = requestParam.getCollectionName(); boolean useDefaultConsistency = (requestParam.getConsistencyLevel() == null); - long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), requestParam.getCollectionName()); + long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), dbName, collectionName); QueryRequest.Builder builder = QueryRequest.newBuilder() - .setCollectionName(requestParam.getCollectionName()) + .setCollectionName(collectionName) .addAllPartitionNames(requestParam.getPartitionNames()) .addAllOutputFields(requestParam.getOutFields()) .setExpr(requestParam.getExpr()) .setTravelTimestamp(requestParam.getTravelTimestamp()) .setGuaranteeTimestamp(guaranteeTimestamp); - if (StringUtils.isNotEmpty(requestParam.getDatabaseName())) { - builder.setDbName(requestParam.getDatabaseName()); + if (StringUtils.isNotEmpty(dbName)) { + builder.setDbName(dbName); } // a new parameter from v2.2.9, if user didn't specify consistency level, set this parameter to true @@ -1124,17 +1130,20 @@ public static QueryRequest convertQueryParam(@NonNull QueryParam requestParam) { return builder.build(); } - private static long getGuaranteeTimestamp(ConsistencyLevelEnum consistencyLevel, String collectionName){ + private static long getGuaranteeTimestamp(ConsistencyLevelEnum consistencyLevel, String dbName, String collectionName){ if(consistencyLevel == null){ - Long ts = GTsDict.getInstance().getCollectionTs(collectionName); + String key = GTsDict.CombineCollectionName(dbName, collectionName); + Long ts = GTsDict.getInstance().getCollectionTs(key); return (ts == null) ? 1L : ts; } switch (consistencyLevel){ case STRONG: return 0L; - case SESSION: - Long ts = GTsDict.getInstance().getCollectionTs(collectionName); + case SESSION: { + String key = GTsDict.CombineCollectionName(dbName, collectionName); + Long ts = GTsDict.getInstance().getCollectionTs(key); return (ts == null) ? 1L : ts; + } case BOUNDED: return 2L; // let server side to determine the bounded time default: diff --git a/sdk-core/src/main/java/io/milvus/param/dml/HybridSearchParam.java b/sdk-core/src/main/java/io/milvus/param/dml/HybridSearchParam.java index af79ac5ad..0f68c6d39 100644 --- a/sdk-core/src/main/java/io/milvus/param/dml/HybridSearchParam.java +++ b/sdk-core/src/main/java/io/milvus/param/dml/HybridSearchParam.java @@ -27,6 +27,7 @@ import io.milvus.param.dml.ranker.BaseRanker; import lombok.Getter; import lombok.NonNull; +import lombok.Setter; import lombok.ToString; import java.util.List; @@ -37,7 +38,8 @@ @Getter @ToString public class HybridSearchParam { - private final String databaseName; + @Setter + private String databaseName; private final String collectionName; private final List partitionNames; private final List searchRequests; diff --git a/sdk-core/src/main/java/io/milvus/param/dml/QueryParam.java b/sdk-core/src/main/java/io/milvus/param/dml/QueryParam.java index db153c4ab..39584451d 100644 --- a/sdk-core/src/main/java/io/milvus/param/dml/QueryParam.java +++ b/sdk-core/src/main/java/io/milvus/param/dml/QueryParam.java @@ -26,6 +26,7 @@ import io.milvus.param.ParamUtils; import lombok.Getter; import lombok.NonNull; +import lombok.Setter; import lombok.ToString; import java.util.ArrayList; @@ -37,7 +38,8 @@ @Getter @ToString public class QueryParam { - private final String databaseName; + @Setter + private String databaseName; private final String collectionName; private final List partitionNames; private final List outFields; diff --git a/sdk-core/src/main/java/io/milvus/param/dml/SearchParam.java b/sdk-core/src/main/java/io/milvus/param/dml/SearchParam.java index 7e527aa92..df80781c8 100644 --- a/sdk-core/src/main/java/io/milvus/param/dml/SearchParam.java +++ b/sdk-core/src/main/java/io/milvus/param/dml/SearchParam.java @@ -28,6 +28,7 @@ import io.milvus.param.ParamUtils; import lombok.Getter; import lombok.NonNull; +import lombok.Setter; import java.nio.ByteBuffer; import java.util.List; @@ -38,7 +39,8 @@ */ @Getter public class SearchParam { - private final String databaseName; + @Setter + private String databaseName; private final String collectionName; private final List partitionNames; private final String metricType; diff --git a/sdk-core/src/main/java/io/milvus/v2/service/collection/CollectionService.java b/sdk-core/src/main/java/io/milvus/v2/service/collection/CollectionService.java index 3bbe55d04..23eb334f3 100644 --- a/sdk-core/src/main/java/io/milvus/v2/service/collection/CollectionService.java +++ b/sdk-core/src/main/java/io/milvus/v2/service/collection/CollectionService.java @@ -97,17 +97,18 @@ public Void createCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockin .fieldName(request.getVectorFieldName()) .build(); CreateIndexReq createIndexReq = CreateIndexReq.builder() + .databaseName(request.getDatabaseName()) + .collectionName(request.getCollectionName()) .indexParams(Collections.singletonList(indexParam)) - .collectionName(request.getCollectionName()) .sync(false) .build(); indexService.createIndex(blockingStub, createIndexReq); - //load collection, set async to true since no need to wait loading progress + //load collection, set sync to false since no need to wait loading progress try { - //TimeUnit.MILLISECONDS.sleep(1000); loadCollection(blockingStub, LoadCollectionReq.builder() - .sync(false) + .databaseName(request.getDatabaseName()) .collectionName(request.getCollectionName()) + .sync(false) .build()); } catch (Exception e) { throw new MilvusClientException(ErrorCode.SERVER_ERROR, "Load collection failed: " + e); @@ -160,16 +161,18 @@ public Void createCollectionWithSchema(MilvusServiceGrpc.MilvusServiceBlockingSt if(request.getIndexParams() != null && !request.getIndexParams().isEmpty()) { for(IndexParam indexParam : request.getIndexParams()) { CreateIndexReq createIndexReq = CreateIndexReq.builder() - .indexParams(Collections.singletonList(indexParam)) + .databaseName(request.getDatabaseName()) .collectionName(request.getCollectionName()) + .indexParams(Collections.singletonList(indexParam)) .sync(false) .build(); indexService.createIndex(blockingStub, createIndexReq); } - //load collection, set async to true since no need to wait loading progress + //load collection, set sync to true since no need to wait loading progress loadCollection(blockingStub, LoadCollectionReq.builder() - .sync(false) + .databaseName(request.getDatabaseName()) .collectionName(request.getCollectionName()) + .sync(false) .build()); } @@ -329,15 +332,17 @@ public Void renameCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockin public Void loadCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, LoadCollectionReq request) { String title = String.format("LoadCollectionRequest collectionName:%s", request.getCollectionName()); - LoadCollectionRequest loadCollectionRequest = LoadCollectionRequest.newBuilder() + LoadCollectionRequest.Builder builder = LoadCollectionRequest.newBuilder() .setCollectionName(request.getCollectionName()) .setReplicaNumber(request.getNumReplicas()) .setRefresh(request.getRefresh()) .addAllLoadFields(request.getLoadFields()) .setSkipLoadDynamicField(request.getSkipLoadDynamicField()) - .addAllResourceGroups(request.getResourceGroups()) - .build(); - Status status = blockingStub.loadCollection(loadCollectionRequest); + .addAllResourceGroups(request.getResourceGroups()); + if (StringUtils.isNotEmpty(request.getDatabaseName())) { + builder.setDbName(request.getDatabaseName()); + } + Status status = blockingStub.loadCollection(builder.build()); rpcUtils.handleResponse(title, status); if (request.getSync()) { WaitForLoadCollection(blockingStub, request.getCollectionName(), request.getTimeout()); @@ -348,11 +353,13 @@ public Void loadCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingS public Void refreshLoad(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, RefreshLoadReq request) { String title = String.format("RefreshLoadRequest collectionName:%s", request.getCollectionName()); - LoadCollectionRequest loadCollectionRequest = LoadCollectionRequest.newBuilder() + LoadCollectionRequest.Builder builder = LoadCollectionRequest.newBuilder() .setCollectionName(request.getCollectionName()) - .setRefresh(true) - .build(); - Status status = blockingStub.loadCollection(loadCollectionRequest); + .setRefresh(true); + if (StringUtils.isNotEmpty(request.getDatabaseName())) { + builder.setDbName(request.getDatabaseName()); + } + Status status = blockingStub.loadCollection(builder.build()); rpcUtils.handleResponse(title, status); if (request.getSync()) { WaitForLoadCollection(blockingStub, request.getCollectionName(), request.getTimeout()); diff --git a/sdk-core/src/main/java/io/milvus/v2/service/collection/request/CreateCollectionReq.java b/sdk-core/src/main/java/io/milvus/v2/service/collection/request/CreateCollectionReq.java index 1ccf5a69f..f0ef9ae1d 100644 --- a/sdk-core/src/main/java/io/milvus/v2/service/collection/request/CreateCollectionReq.java +++ b/sdk-core/src/main/java/io/milvus/v2/service/collection/request/CreateCollectionReq.java @@ -84,6 +84,9 @@ public class CreateCollectionReq { public static abstract class CreateCollectionReqBuilder> { public B indexParam(IndexParam indexParam) { + if(null == this.indexParams$value ){ + this.indexParams$value = new ArrayList<>(); + } try { this.indexParams$value.add(indexParam); }catch (UnsupportedOperationException _e){ diff --git a/sdk-core/src/main/java/io/milvus/v2/service/collection/request/LoadCollectionReq.java b/sdk-core/src/main/java/io/milvus/v2/service/collection/request/LoadCollectionReq.java index 82761cbb8..e8aacc4f9 100644 --- a/sdk-core/src/main/java/io/milvus/v2/service/collection/request/LoadCollectionReq.java +++ b/sdk-core/src/main/java/io/milvus/v2/service/collection/request/LoadCollectionReq.java @@ -29,6 +29,7 @@ @Data @SuperBuilder public class LoadCollectionReq { + private String databaseName; private String collectionName; @Builder.Default private Integer numReplicas = 1; diff --git a/sdk-core/src/main/java/io/milvus/v2/service/collection/request/RefreshLoadReq.java b/sdk-core/src/main/java/io/milvus/v2/service/collection/request/RefreshLoadReq.java index 15ebe46df..e2b3946af 100644 --- a/sdk-core/src/main/java/io/milvus/v2/service/collection/request/RefreshLoadReq.java +++ b/sdk-core/src/main/java/io/milvus/v2/service/collection/request/RefreshLoadReq.java @@ -26,6 +26,7 @@ @Data @SuperBuilder public class RefreshLoadReq { + private String databaseName; private String collectionName; @Builder.Default private Boolean async = Boolean.TRUE; diff --git a/sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java b/sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java index 86ef4656e..8ae0dec46 100644 --- a/sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java +++ b/sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java @@ -226,6 +226,9 @@ public QueryResp query(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, DescribeCollectionResp descResp = collectionService.describeCollection(blockingStub, descReq); request.setFilter(vectorUtils.getExprById(descResp.getPrimaryFieldName(), request.getIds())); } + + // reset the db name so that the timestamp cache can set correct key for this collection + request.setDatabaseName(actualDbName(request.getDatabaseName())); QueryResults response = blockingStub.query(vectorUtils.ConvertToGrpcQueryRequest(request)); rpcUtils.handleResponse(title, response.getStatus()); @@ -241,6 +244,8 @@ public SearchResp search(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStu //checkCollectionExist(blockingStub, request.getCollectionName()); + // reset the db name so that the timestamp cache can set correct key for this collection + request.setDatabaseName(actualDbName(request.getDatabaseName())); SearchRequest searchRequest = vectorUtils.ConvertToGrpcSearchRequest(request); SearchResults response = blockingStub.search(searchRequest); @@ -259,6 +264,8 @@ public SearchResp hybridSearch(MilvusServiceGrpc.MilvusServiceBlockingStub block //checkCollectionExist(blockingStub, request.getCollectionName()); + // reset the db name so that the timestamp cache can set correct key for this collection + request.setDatabaseName(actualDbName(request.getDatabaseName())); HybridSearchRequest searchRequest = vectorUtils.ConvertToGrpcHybridSearchRequest(request); SearchResults response = blockingStub.hybridSearch(searchRequest); diff --git a/sdk-core/src/main/java/io/milvus/v2/service/vector/request/AnnSearchReq.java b/sdk-core/src/main/java/io/milvus/v2/service/vector/request/AnnSearchReq.java index 92bd978a9..7c9d4ca3e 100644 --- a/sdk-core/src/main/java/io/milvus/v2/service/vector/request/AnnSearchReq.java +++ b/sdk-core/src/main/java/io/milvus/v2/service/vector/request/AnnSearchReq.java @@ -35,9 +35,12 @@ public class AnnSearchReq { @Deprecated private int topK = 0; @Builder.Default - private long limit = 0L; + private long limit = 0L; // deprecated, replaced by limit @Builder.Default - private String expr = ""; + @Deprecated + private String expr = ""; // deprecated, replaced by filter + @Builder.Default + private String filter = ""; private List vectors; private String params; @@ -45,7 +48,7 @@ public class AnnSearchReq { private IndexParam.MetricType metricType = null; public static abstract class AnnSearchReqBuilder> { - // topK is deprecated, topK and limit must be the same value + // topK is deprecated replaced by limit, topK and limit must be the same value public B topK(int val) { this.topK$value = val; this.topK$set = true; @@ -61,5 +64,22 @@ public B limit(long val) { this.limit$set = true; return self(); } + + // expr is deprecated replaced by filter, expr and filter must be the same value + public B expr(String val) { + this.expr$value = val; + this.expr$set = true; + this.filter$value = val; + this.filter$set = true; + return self(); + } + + public B filter(String val) { + this.expr$value = val; + this.expr$set = true; + this.filter$value = val; + this.filter$set = true; + return self(); + } } } diff --git a/sdk-core/src/main/java/io/milvus/v2/utils/VectorUtils.java b/sdk-core/src/main/java/io/milvus/v2/utils/VectorUtils.java index 19f5fee74..588150ce5 100644 --- a/sdk-core/src/main/java/io/milvus/v2/utils/VectorUtils.java +++ b/sdk-core/src/main/java/io/milvus/v2/utils/VectorUtils.java @@ -43,13 +43,17 @@ public class VectorUtils { public QueryRequest ConvertToGrpcQueryRequest(QueryReq request){ + String dbName = request.getDatabaseName(); + String collectionName = request.getCollectionName(); + long guaranteeTimestamp = getGuaranteeTimestamp(request.getConsistencyLevel(), dbName, collectionName); QueryRequest.Builder builder = QueryRequest.newBuilder() - .setCollectionName(request.getCollectionName()) + .setCollectionName(collectionName) .addAllPartitionNames(request.getPartitionNames()) .addAllOutputFields(request.getOutputFields()) + .setGuaranteeTimestamp(guaranteeTimestamp) .setExpr(request.getFilter()); - if (StringUtils.isNotEmpty(request.getDatabaseName())) { - builder.setDbName(request.getDatabaseName()); + if (StringUtils.isNotEmpty(dbName)) { + builder.setDbName(dbName); } if (request.getFilter() != null && !request.getFilter().isEmpty()) { @@ -94,17 +98,20 @@ public QueryRequest ConvertToGrpcQueryRequest(QueryReq request){ } - private static long getGuaranteeTimestamp(ConsistencyLevel consistencyLevel, String collectionName){ + private static long getGuaranteeTimestamp(ConsistencyLevel consistencyLevel, String dbName, String collectionName){ if(consistencyLevel == null){ - Long ts = GTsDict.getInstance().getCollectionTs(collectionName); + String key = GTsDict.CombineCollectionName(dbName, collectionName); + Long ts = GTsDict.getInstance().getCollectionTs(key); return (ts == null) ? 1L : ts; } switch (consistencyLevel){ case STRONG: return 0L; - case SESSION: - Long ts = GTsDict.getInstance().getCollectionTs(collectionName); - return (ts == null) ? 1L : ts; + case SESSION: { + String key = GTsDict.CombineCollectionName(dbName, collectionName); + Long ts = GTsDict.getInstance().getCollectionTs(key); + return (ts == null) ? 1L : ts; + } case BOUNDED: return 2L; // let server side to determine the bounded time default: @@ -138,14 +145,16 @@ private static ByteString convertPlaceholder(List data, PlaceholderType } public SearchRequest ConvertToGrpcSearchRequest(SearchReq request) { + String dbName = request.getDatabaseName(); + String collectionName = request.getCollectionName(); SearchRequest.Builder builder = SearchRequest.newBuilder() - .setCollectionName(request.getCollectionName()); + .setCollectionName(collectionName); if (!request.getPartitionNames().isEmpty()) { request.getPartitionNames().forEach(builder::addPartitionNames); } - if (StringUtils.isNotEmpty(request.getDatabaseName())) { - builder.setDbName(request.getDatabaseName()); + if (StringUtils.isNotEmpty(dbName)) { + builder.setDbName(dbName); } // prepare target, the input could be vectors or string list for doc-in-doc-out @@ -259,7 +268,7 @@ public SearchRequest ConvertToGrpcSearchRequest(SearchReq request) { } builder.setGuaranteeTimestamp(guaranteeTimestamp); } else { - long guaranteeTimestamp = getGuaranteeTimestamp(request.getConsistencyLevel(), request.getCollectionName()); + long guaranteeTimestamp = getGuaranteeTimestamp(request.getConsistencyLevel(), dbName, collectionName); builder.setGuaranteeTimestamp(guaranteeTimestamp); } @@ -442,14 +451,16 @@ public static SearchRequest convertAnnSearchParam(@NonNull AnnSearchReq annSearc } public HybridSearchRequest ConvertToGrpcHybridSearchRequest(HybridSearchReq request) { + String dbName = request.getDatabaseName(); + String collectionName = request.getCollectionName(); HybridSearchRequest.Builder builder = HybridSearchRequest.newBuilder() - .setCollectionName(request.getCollectionName()); + .setCollectionName(collectionName); if (request.getPartitionNames() != null && !request.getPartitionNames().isEmpty()) { request.getPartitionNames().forEach(builder::addPartitionNames); } - if (StringUtils.isNotEmpty(request.getDatabaseName())) { - builder.setDbName(request.getDatabaseName()); + if (StringUtils.isNotEmpty(dbName)) { + builder.setDbName(dbName); } if (request.getSearchRequests() == null || request.getSearchRequests().isEmpty()) { @@ -505,6 +516,9 @@ public HybridSearchRequest ConvertToGrpcHybridSearchRequest(HybridSearchReq requ request.getOutFields().forEach(builder::addOutputFields); } + long guaranteeTimestamp = getGuaranteeTimestamp(request.getConsistencyLevel(), dbName, collectionName); + builder.setGuaranteeTimestamp(guaranteeTimestamp); + if (request.getConsistencyLevel() == null) { builder.setUseDefaultConsistency(true); } else { diff --git a/sdk-core/src/test/java/io/milvus/client/MilvusClientDockerTest.java b/sdk-core/src/test/java/io/milvus/client/MilvusClientDockerTest.java index 2a4b16016..0efab7bbf 100644 --- a/sdk-core/src/test/java/io/milvus/client/MilvusClientDockerTest.java +++ b/sdk-core/src/test/java/io/milvus/client/MilvusClientDockerTest.java @@ -2916,8 +2916,10 @@ void testDatabase() { Assertions.assertEquals(R.Status.Success.getCode(), dropResponse.getStatus().intValue()); } - private static void createSimpleCollection(MilvusClient client, String collName, String pkName, boolean autoID, int dimension) { + private static void createSimpleCollection(MilvusClient client, String dbName, String collName, String pkName, + boolean autoID, int dimension, ConsistencyLevelEnum level) { client.dropCollection(DropCollectionParam.newBuilder() + .withDatabaseName(dbName) .withCollectionName(collName) .build()); @@ -2938,10 +2940,29 @@ private static void createSimpleCollection(MilvusClient client, String collName, // create collection R createR = client.createCollection(CreateCollectionParam.newBuilder() + .withDatabaseName(dbName) .withCollectionName(collName) .withFieldTypes(fieldsSchema) + .withConsistencyLevel(level) .build()); Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue()); + + CreateIndexParam indexParam = CreateIndexParam.newBuilder() + .withDatabaseName(dbName) + .withCollectionName(collName) + .withFieldName("vector") + .withIndexType(IndexType.FLAT) + .withMetricType(MetricType.L2) + .build(); + + R createIndexR = client.createIndex(indexParam); + Assertions.assertEquals(R.Status.Success.getCode(), createIndexR.getStatus().intValue()); + + R loadR = client.loadCollection(LoadCollectionParam.newBuilder() + .withDatabaseName(dbName) + .withCollectionName(collName) + .build()); + Assertions.assertEquals(R.Status.Success.getCode(), loadR.getStatus().intValue()); } @Test @@ -2958,7 +2979,7 @@ void testCacheCollectionSchema() throws InterruptedException { Assertions.assertEquals(R.Status.Success.getCode(), dbResponse.getStatus().intValue()); // create a collection in the default db - createSimpleCollection(client, randomCollectionName, "pk", false, DIMENSION); + createSimpleCollection(client, "", randomCollectionName, "pk", false, DIMENSION, ConsistencyLevelEnum.BOUNDED); // a temp client connect to the new db ConnectParam connectParam = connectParamBuilder() @@ -3019,7 +3040,7 @@ void testCacheCollectionSchema() throws InterruptedException { Assertions.assertTrue(ts12 > ts11); // create a new collection with the same name, different schema, in the test db - createSimpleCollection(tempClient, randomCollectionName, "aaa", false, 4); + createSimpleCollection(tempClient, "", randomCollectionName, "aaa", false, 4, ConsistencyLevelEnum.BOUNDED); // use the temp client to insert wrong data, wrong dimension row.addProperty("aaa", 22); @@ -3315,4 +3336,109 @@ void testNullableAndDefaultValue() { System.out.println(score); } } + + @Test + void testConsistencyLevel() { + String randomCollectionName = generator.generate(10); + String pkName = "pk"; + String vectorName = "vector"; + int dim = 4; + String defaultDbName = "default"; + String tempDbName = "db_for_level"; + + // create a temp database + CreateDatabaseParam createDatabaseParam = CreateDatabaseParam.newBuilder() + .withDatabaseName(tempDbName) + .build(); + R createResponse = client.createDatabase(createDatabaseParam); + Assertions.assertEquals(R.Status.Success.getCode(), createResponse.getStatus().intValue()); + + Function runTestFunc = + dbName -> { + // a client use the temp database + ConnectParam connectParam = connectParamBuilder() + .withDatabaseName(tempDbName) + .build(); + MilvusClientForTest tempClient = new MilvusClientForTest(connectParam); + + for (int i = 0; i < 20; i++) { + JsonObject row = new JsonObject(); + row.addProperty(pkName, i); + row.add(vectorName, JsonUtils.toJsonTree(utils.generateFloatVector(dim))); + tempClient.insert(InsertParam.newBuilder() + .withDatabaseName(dbName) + .withCollectionName(randomCollectionName) + .withRows(Collections.singletonList(row)) + .build()); + + // query/search/hybridSearch immediately after insert, data must be visible + String expr = String.format("%s == %d", pkName, i); + if (i%3 == 0) { + R fetchR = tempClient.query(QueryParam.newBuilder() + .withDatabaseName(dbName) + .withCollectionName(randomCollectionName) + .withExpr(expr) + .withLimit(5L) + .addOutField(pkName) + .build()); + Assertions.assertEquals(R.Status.Success.getCode(), fetchR.getStatus().intValue()); + QueryResultsWrapper oneResult = new QueryResultsWrapper(fetchR.getData()); + List records = oneResult.getRowRecords(); + Assertions.assertEquals(1L, records.size()); + } else if (i%2 == 0) { + R searchOne = tempClient.search(SearchParam.newBuilder() + .withDatabaseName(dbName) + .withCollectionName(randomCollectionName) + .withVectorFieldName(vectorName) + .withLimit(5L) + .withExpr(expr) + .withFloatVectors(Collections.singletonList(utils.generateFloatVector(dim))) + .addOutField(pkName) + .build()); + Assertions.assertEquals(R.Status.Success.getCode(), searchOne.getStatus().intValue()); + + SearchResultsWrapper oneResult = new SearchResultsWrapper(searchOne.getData().getResults()); + List scores = oneResult.getIDScore(0); + Assertions.assertEquals(1, scores.size()); + } else { + AnnSearchParam subReq = AnnSearchParam.newBuilder() + .withVectorFieldName(vectorName) + .withExpr(expr) + .withFloatVectors(Collections.singletonList(utils.generateFloatVector(dim))) + .withLimit(5L) + .build(); + + R searchR = tempClient.hybridSearch(HybridSearchParam.newBuilder() + .withDatabaseName(dbName) + .withCollectionName(randomCollectionName) + .addSearchRequest(subReq) + .withLimit(5L) + .withRanker(WeightedRanker.newBuilder() + .withWeights(Collections.singletonList(1.0f)) + .build()) + .withOutFields(Collections.singletonList(pkName)) + .build()); + Assertions.assertEquals(R.Status.Success.getCode(), searchR.getStatus().intValue()); + SearchResultsWrapper oneResult = new SearchResultsWrapper(searchR.getData().getResults()); + List scores = oneResult.getIDScore(0); + Assertions.assertEquals(1, scores.size()); + } + } + return null; + }; + + // test SESSION level + createSimpleCollection(client, "", randomCollectionName, pkName, false, dim, ConsistencyLevelEnum.SESSION); + runTestFunc.apply(defaultDbName); + + createSimpleCollection(client, tempDbName, randomCollectionName, pkName, false, dim, ConsistencyLevelEnum.SESSION); + runTestFunc.apply(tempDbName); + + // test STRONG level + createSimpleCollection(client, "", randomCollectionName, pkName, false, dim, ConsistencyLevelEnum.STRONG); + runTestFunc.apply(defaultDbName); + + createSimpleCollection(client, tempDbName, randomCollectionName, pkName, false, dim, ConsistencyLevelEnum.STRONG); + runTestFunc.apply(tempDbName); + } } diff --git a/sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java b/sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java index e760429ff..2ab4ee7ec 100644 --- a/sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java +++ b/sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java @@ -33,7 +33,6 @@ import io.milvus.orm.iterator.SearchIterator; import io.milvus.orm.iterator.SearchIteratorV2; import io.milvus.param.Constant; -import io.milvus.param.dml.HybridSearchParam; import io.milvus.pool.MilvusClientV2Pool; import io.milvus.pool.PoolConfig; import io.milvus.response.QueryResultsWrapper; @@ -1512,16 +1511,20 @@ void testIndex() { Assertions.assertEquals("64", extraParams.get("efConstruction")); } - private static void createSimpleCollection(MilvusClientV2 client, String collName, String pkName, boolean autoID, int dimension) { + private static void createSimpleCollection(MilvusClientV2 client, String dbName, String collName, String pkName, boolean autoID, + int dimension, ConsistencyLevel level) { client.dropCollection(DropCollectionReq.builder() + .databaseName(dbName) .collectionName(collName) .build()); client.createCollection(CreateCollectionReq.builder() + .databaseName(dbName) .collectionName(collName) .autoID(autoID) .primaryFieldName(pkName) .dimension(dimension) + .consistencyLevel(level) .enableDynamicField(false) .build()); } @@ -1537,7 +1540,7 @@ void testCacheCollectionSchema() throws InterruptedException { .build()); // create a collection in the default db - createSimpleCollection(client, randomCollectionName, "pk", false, DIMENSION); + createSimpleCollection(client, "", randomCollectionName, "pk", false, DIMENSION, ConsistencyLevel.BOUNDED); // a temp client connect to the new db ConnectConfig config = ConnectConfig.builder() @@ -1589,7 +1592,7 @@ void testCacheCollectionSchema() throws InterruptedException { Assertions.assertTrue(ts12 > ts11); // create a new collection with the same name, different schema, in the test db - createSimpleCollection(tempClient, randomCollectionName, "aaa", false, 4); + createSimpleCollection(tempClient, "", randomCollectionName, "aaa", false, 4, ConsistencyLevel.BOUNDED); // use the temp client to insert wrong data, wrong dimension row.addProperty("aaa", 22); @@ -2780,4 +2783,98 @@ void testRunAnalyzer() { } } } + + @Test + void testConsistencyLevel() throws InterruptedException { + String randomCollectionName = generator.generate(10); + String pkName = "pk"; + String vectorName = "vector"; + int dim = 4; + String defaultDbName = "default"; + String tempDbName = "db_for_level"; + + // create a temp database + client.createDatabase(CreateDatabaseReq.builder() + .databaseName(tempDbName) + .build()); + + Function runTestFunc = + dbName -> { + // a client use the temp database + ConnectConfig config = ConnectConfig.builder() + .uri(milvus.getEndpoint()) + .dbName(tempDbName) + .build(); + MilvusClientV2 tempClient = new MilvusClientV2(config); + + for (int i = 0; i < 20; i++) { + JsonObject row = new JsonObject(); + row.addProperty(pkName, i); + row.add(vectorName, JsonUtils.toJsonTree(utils.generateFloatVector(dim))); + tempClient.insert(InsertReq.builder() + .databaseName(dbName) + .collectionName(randomCollectionName) + .data(Collections.singletonList(row)) + .build()); + + // query/search/hybridSearch immediately after insert, data must be visible + String filter = String.format("%s == %d", pkName, i); + if (i % 3 == 0) { + QueryResp queryResp = client.query(QueryReq.builder() + .databaseName(dbName) + .collectionName(randomCollectionName) + .filter(filter) + .outputFields(Collections.singletonList(pkName)) + .build()); + List oneResult = queryResp.getQueryResults(); + Assertions.assertEquals(1, oneResult.size()); + } else if (i % 2 == 0) { + SearchResp searchResp = client.search(SearchReq.builder() + .databaseName(dbName) + .collectionName(randomCollectionName) + .annsField(vectorName) + .filter(filter) + .data(Collections.singletonList(new FloatVec(utils.generateFloatVector(dim)))) + .limit(10) + .build()); + List> oneResult = searchResp.getSearchResults(); + Assertions.assertEquals(1, oneResult.size()); + Assertions.assertEquals(1, oneResult.get(0).size()); + } else { + AnnSearchReq subReq = AnnSearchReq.builder() + .vectorFieldName(vectorName) + .filter(filter) + .vectors(Collections.singletonList(new FloatVec(utils.generateFloatVector(dim)))) + .limit(7) + .build(); + + SearchResp searchResp = client.hybridSearch(HybridSearchReq.builder() + .databaseName(dbName) + .collectionName(randomCollectionName) + .searchRequests(Collections.singletonList(subReq)) + .ranker(new RRFRanker(20)) + .limit(5) + .build()); + List> oneResult = searchResp.getSearchResults(); + Assertions.assertEquals(1, oneResult.size()); + Assertions.assertEquals(1, oneResult.get(0).size()); + } + } + return null; + }; + + // test SESSION level + createSimpleCollection(client, "", randomCollectionName, pkName, false, dim, ConsistencyLevel.SESSION); + runTestFunc.apply(defaultDbName); + + createSimpleCollection(client, tempDbName, randomCollectionName, pkName, false, dim, ConsistencyLevel.SESSION); + runTestFunc.apply(tempDbName); + + // test STRONG level + createSimpleCollection(client, "", randomCollectionName, pkName, false, dim, ConsistencyLevel.STRONG); + runTestFunc.apply(defaultDbName); + + createSimpleCollection(client, tempDbName, randomCollectionName, pkName, false, dim, ConsistencyLevel.STRONG); + runTestFunc.apply(tempDbName); + } }