diff --git a/sdk-core/src/main/java/io/milvus/common/utils/GTsDict.java b/sdk-core/src/main/java/io/milvus/common/utils/GTsDict.java index e890190ec..60773b588 100644 --- a/sdk-core/src/main/java/io/milvus/common/utils/GTsDict.java +++ b/sdk-core/src/main/java/io/milvus/common/utils/GTsDict.java @@ -52,19 +52,19 @@ public static String CombineCollectionName(String databaseName, String collectio private ConcurrentMap tsDict = new ConcurrentHashMap<>(); - public void updateCollectionTs(String collectionName, long ts) { + public void updateCollectionTs(String name, long ts) { // If the collection name exists, use its value to compare to the input ts, // only when the input ts is larger than the existing value, replace it with the input ts. // If the collection name doesn't exist, directly set the input value. - tsDict.compute(collectionName, (key, value) -> (value == null) ? ts : ((ts > value) ? ts : value)); + tsDict.compute(name, (key, value) -> (value == null) ? ts : ((ts > value) ? ts : value)); } - public Long getCollectionTs(String collectionName) { - return tsDict.get(collectionName); + public Long getCollectionTs(String name) { + return tsDict.get(name); } - public void removeCollectionTs(String collectionName) { - tsDict.remove(collectionName); + public void removeCollectionTs(String name) { + tsDict.remove(name); } public void cleanAllCollectionTs() { diff --git a/sdk-core/src/main/java/io/milvus/param/ParamUtils.java b/sdk-core/src/main/java/io/milvus/param/ParamUtils.java index 4cc0ad876..c88f2579b 100644 --- a/sdk-core/src/main/java/io/milvus/param/ParamUtils.java +++ b/sdk-core/src/main/java/io/milvus/param/ParamUtils.java @@ -849,14 +849,16 @@ public static void compatibleSearchParams(Map searchParams, Sear @SuppressWarnings("unchecked") public static SearchRequest convertSearchParam(@NonNull SearchParam requestParam) throws ParamException { + String dbName = requestParam.getDatabaseName(); + String collectionName = requestParam.getCollectionName(); SearchRequest.Builder builder = SearchRequest.newBuilder() - .setCollectionName(requestParam.getCollectionName()); + .setCollectionName(collectionName); if (!requestParam.getPartitionNames().isEmpty()) { requestParam.getPartitionNames().forEach(builder::addPartitionNames); } - if (StringUtils.isNotEmpty(requestParam.getDatabaseName())) { - builder.setDbName(requestParam.getDatabaseName()); + if (StringUtils.isNotEmpty(dbName)) { + builder.setDbName(dbName); } // prepare target vectors @@ -946,7 +948,7 @@ public static SearchRequest convertSearchParam(@NonNull SearchParam requestParam builder.setDsl(requestParam.getExpr()); } - long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), requestParam.getCollectionName()); + long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), dbName, collectionName); builder.setTravelTimestamp(requestParam.getTravelTimestamp()); // deprecated builder.setGuaranteeTimestamp(guaranteeTimestamp); @@ -1010,14 +1012,16 @@ public static SearchRequest convertAnnSearchParam(@NonNull AnnSearchParam annSea } public static HybridSearchRequest convertHybridSearchParam(@NonNull HybridSearchParam requestParam) throws ParamException { + String dbName = requestParam.getDatabaseName(); + String collectionName = requestParam.getCollectionName(); HybridSearchRequest.Builder builder = HybridSearchRequest.newBuilder() - .setCollectionName(requestParam.getCollectionName()); + .setCollectionName(collectionName); if (!requestParam.getPartitionNames().isEmpty()) { requestParam.getPartitionNames().forEach(builder::addPartitionNames); } - if (StringUtils.isNotEmpty(requestParam.getDatabaseName())) { - builder.setDbName(requestParam.getDatabaseName()); + if (StringUtils.isNotEmpty(dbName)) { + builder.setDbName(dbName); } for (AnnSearchParam req : requestParam.getSearchRequests()) { @@ -1063,7 +1067,7 @@ public static HybridSearchRequest convertHybridSearchParam(@NonNull HybridSearch requestParam.getOutFields().forEach(builder::addOutputFields); } - long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), requestParam.getCollectionName()); + long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), dbName, collectionName); builder.setGuaranteeTimestamp(guaranteeTimestamp); if (requestParam.getConsistencyLevel() == null) { @@ -1076,18 +1080,20 @@ public static HybridSearchRequest convertHybridSearchParam(@NonNull HybridSearch } public static QueryRequest convertQueryParam(@NonNull QueryParam requestParam) { + String dbName = requestParam.getDatabaseName(); + String collectionName = requestParam.getCollectionName(); boolean useDefaultConsistency = (requestParam.getConsistencyLevel() == null); - long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), requestParam.getCollectionName()); + long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), dbName, collectionName); QueryRequest.Builder builder = QueryRequest.newBuilder() - .setCollectionName(requestParam.getCollectionName()) + .setCollectionName(collectionName) .addAllPartitionNames(requestParam.getPartitionNames()) .addAllOutputFields(requestParam.getOutFields()) .setExpr(requestParam.getExpr()) .setTravelTimestamp(requestParam.getTravelTimestamp()) .setGuaranteeTimestamp(guaranteeTimestamp); - if (StringUtils.isNotEmpty(requestParam.getDatabaseName())) { - builder.setDbName(requestParam.getDatabaseName()); + if (StringUtils.isNotEmpty(dbName)) { + builder.setDbName(dbName); } // a new parameter from v2.2.9, if user didn't specify consistency level, set this parameter to true @@ -1124,17 +1130,20 @@ public static QueryRequest convertQueryParam(@NonNull QueryParam requestParam) { return builder.build(); } - private static long getGuaranteeTimestamp(ConsistencyLevelEnum consistencyLevel, String collectionName){ + private static long getGuaranteeTimestamp(ConsistencyLevelEnum consistencyLevel, String dbName, String collectionName){ if(consistencyLevel == null){ - Long ts = GTsDict.getInstance().getCollectionTs(collectionName); + String key = GTsDict.CombineCollectionName(dbName, collectionName); + Long ts = GTsDict.getInstance().getCollectionTs(key); return (ts == null) ? 1L : ts; } switch (consistencyLevel){ case STRONG: return 0L; - case SESSION: - Long ts = GTsDict.getInstance().getCollectionTs(collectionName); + case SESSION: { + String key = GTsDict.CombineCollectionName(dbName, collectionName); + Long ts = GTsDict.getInstance().getCollectionTs(key); return (ts == null) ? 1L : ts; + } case BOUNDED: return 2L; // let server side to determine the bounded time default: diff --git a/sdk-core/src/main/java/io/milvus/v2/service/vector/request/AnnSearchReq.java b/sdk-core/src/main/java/io/milvus/v2/service/vector/request/AnnSearchReq.java index 92bd978a9..7c9d4ca3e 100644 --- a/sdk-core/src/main/java/io/milvus/v2/service/vector/request/AnnSearchReq.java +++ b/sdk-core/src/main/java/io/milvus/v2/service/vector/request/AnnSearchReq.java @@ -35,9 +35,12 @@ public class AnnSearchReq { @Deprecated private int topK = 0; @Builder.Default - private long limit = 0L; + private long limit = 0L; // deprecated, replaced by limit @Builder.Default - private String expr = ""; + @Deprecated + private String expr = ""; // deprecated, replaced by filter + @Builder.Default + private String filter = ""; private List vectors; private String params; @@ -45,7 +48,7 @@ public class AnnSearchReq { private IndexParam.MetricType metricType = null; public static abstract class AnnSearchReqBuilder> { - // topK is deprecated, topK and limit must be the same value + // topK is deprecated replaced by limit, topK and limit must be the same value public B topK(int val) { this.topK$value = val; this.topK$set = true; @@ -61,5 +64,22 @@ public B limit(long val) { this.limit$set = true; return self(); } + + // expr is deprecated replaced by filter, expr and filter must be the same value + public B expr(String val) { + this.expr$value = val; + this.expr$set = true; + this.filter$value = val; + this.filter$set = true; + return self(); + } + + public B filter(String val) { + this.expr$value = val; + this.expr$set = true; + this.filter$value = val; + this.filter$set = true; + return self(); + } } } diff --git a/sdk-core/src/main/java/io/milvus/v2/utils/VectorUtils.java b/sdk-core/src/main/java/io/milvus/v2/utils/VectorUtils.java index 19f5fee74..588150ce5 100644 --- a/sdk-core/src/main/java/io/milvus/v2/utils/VectorUtils.java +++ b/sdk-core/src/main/java/io/milvus/v2/utils/VectorUtils.java @@ -43,13 +43,17 @@ public class VectorUtils { public QueryRequest ConvertToGrpcQueryRequest(QueryReq request){ + String dbName = request.getDatabaseName(); + String collectionName = request.getCollectionName(); + long guaranteeTimestamp = getGuaranteeTimestamp(request.getConsistencyLevel(), dbName, collectionName); QueryRequest.Builder builder = QueryRequest.newBuilder() - .setCollectionName(request.getCollectionName()) + .setCollectionName(collectionName) .addAllPartitionNames(request.getPartitionNames()) .addAllOutputFields(request.getOutputFields()) + .setGuaranteeTimestamp(guaranteeTimestamp) .setExpr(request.getFilter()); - if (StringUtils.isNotEmpty(request.getDatabaseName())) { - builder.setDbName(request.getDatabaseName()); + if (StringUtils.isNotEmpty(dbName)) { + builder.setDbName(dbName); } if (request.getFilter() != null && !request.getFilter().isEmpty()) { @@ -94,17 +98,20 @@ public QueryRequest ConvertToGrpcQueryRequest(QueryReq request){ } - private static long getGuaranteeTimestamp(ConsistencyLevel consistencyLevel, String collectionName){ + private static long getGuaranteeTimestamp(ConsistencyLevel consistencyLevel, String dbName, String collectionName){ if(consistencyLevel == null){ - Long ts = GTsDict.getInstance().getCollectionTs(collectionName); + String key = GTsDict.CombineCollectionName(dbName, collectionName); + Long ts = GTsDict.getInstance().getCollectionTs(key); return (ts == null) ? 1L : ts; } switch (consistencyLevel){ case STRONG: return 0L; - case SESSION: - Long ts = GTsDict.getInstance().getCollectionTs(collectionName); - return (ts == null) ? 1L : ts; + case SESSION: { + String key = GTsDict.CombineCollectionName(dbName, collectionName); + Long ts = GTsDict.getInstance().getCollectionTs(key); + return (ts == null) ? 1L : ts; + } case BOUNDED: return 2L; // let server side to determine the bounded time default: @@ -138,14 +145,16 @@ private static ByteString convertPlaceholder(List data, PlaceholderType } public SearchRequest ConvertToGrpcSearchRequest(SearchReq request) { + String dbName = request.getDatabaseName(); + String collectionName = request.getCollectionName(); SearchRequest.Builder builder = SearchRequest.newBuilder() - .setCollectionName(request.getCollectionName()); + .setCollectionName(collectionName); if (!request.getPartitionNames().isEmpty()) { request.getPartitionNames().forEach(builder::addPartitionNames); } - if (StringUtils.isNotEmpty(request.getDatabaseName())) { - builder.setDbName(request.getDatabaseName()); + if (StringUtils.isNotEmpty(dbName)) { + builder.setDbName(dbName); } // prepare target, the input could be vectors or string list for doc-in-doc-out @@ -259,7 +268,7 @@ public SearchRequest ConvertToGrpcSearchRequest(SearchReq request) { } builder.setGuaranteeTimestamp(guaranteeTimestamp); } else { - long guaranteeTimestamp = getGuaranteeTimestamp(request.getConsistencyLevel(), request.getCollectionName()); + long guaranteeTimestamp = getGuaranteeTimestamp(request.getConsistencyLevel(), dbName, collectionName); builder.setGuaranteeTimestamp(guaranteeTimestamp); } @@ -442,14 +451,16 @@ public static SearchRequest convertAnnSearchParam(@NonNull AnnSearchReq annSearc } public HybridSearchRequest ConvertToGrpcHybridSearchRequest(HybridSearchReq request) { + String dbName = request.getDatabaseName(); + String collectionName = request.getCollectionName(); HybridSearchRequest.Builder builder = HybridSearchRequest.newBuilder() - .setCollectionName(request.getCollectionName()); + .setCollectionName(collectionName); if (request.getPartitionNames() != null && !request.getPartitionNames().isEmpty()) { request.getPartitionNames().forEach(builder::addPartitionNames); } - if (StringUtils.isNotEmpty(request.getDatabaseName())) { - builder.setDbName(request.getDatabaseName()); + if (StringUtils.isNotEmpty(dbName)) { + builder.setDbName(dbName); } if (request.getSearchRequests() == null || request.getSearchRequests().isEmpty()) { @@ -505,6 +516,9 @@ public HybridSearchRequest ConvertToGrpcHybridSearchRequest(HybridSearchReq requ request.getOutFields().forEach(builder::addOutputFields); } + long guaranteeTimestamp = getGuaranteeTimestamp(request.getConsistencyLevel(), dbName, collectionName); + builder.setGuaranteeTimestamp(guaranteeTimestamp); + if (request.getConsistencyLevel() == null) { builder.setUseDefaultConsistency(true); } else { diff --git a/sdk-core/src/test/java/io/milvus/client/MilvusClientDockerTest.java b/sdk-core/src/test/java/io/milvus/client/MilvusClientDockerTest.java index 2a4b16016..4d8ba3748 100644 --- a/sdk-core/src/test/java/io/milvus/client/MilvusClientDockerTest.java +++ b/sdk-core/src/test/java/io/milvus/client/MilvusClientDockerTest.java @@ -2916,7 +2916,8 @@ void testDatabase() { Assertions.assertEquals(R.Status.Success.getCode(), dropResponse.getStatus().intValue()); } - private static void createSimpleCollection(MilvusClient client, String collName, String pkName, boolean autoID, int dimension) { + private static void createSimpleCollection(MilvusClient client, String collName, String pkName, boolean autoID, + int dimension, ConsistencyLevelEnum level) { client.dropCollection(DropCollectionParam.newBuilder() .withCollectionName(collName) .build()); @@ -2940,8 +2941,24 @@ private static void createSimpleCollection(MilvusClient client, String collName, R createR = client.createCollection(CreateCollectionParam.newBuilder() .withCollectionName(collName) .withFieldTypes(fieldsSchema) + .withConsistencyLevel(level) .build()); Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue()); + + CreateIndexParam indexParam = CreateIndexParam.newBuilder() + .withCollectionName(collName) + .withFieldName("vector") + .withIndexType(IndexType.FLAT) + .withMetricType(MetricType.L2) + .build(); + + R createIndexR = client.createIndex(indexParam); + Assertions.assertEquals(R.Status.Success.getCode(), createIndexR.getStatus().intValue()); + + R loadR = client.loadCollection(LoadCollectionParam.newBuilder() + .withCollectionName(collName) + .build()); + Assertions.assertEquals(R.Status.Success.getCode(), loadR.getStatus().intValue()); } @Test @@ -2958,7 +2975,7 @@ void testCacheCollectionSchema() throws InterruptedException { Assertions.assertEquals(R.Status.Success.getCode(), dbResponse.getStatus().intValue()); // create a collection in the default db - createSimpleCollection(client, randomCollectionName, "pk", false, DIMENSION); + createSimpleCollection(client, randomCollectionName, "pk", false, DIMENSION, ConsistencyLevelEnum.BOUNDED); // a temp client connect to the new db ConnectParam connectParam = connectParamBuilder() @@ -3019,7 +3036,7 @@ void testCacheCollectionSchema() throws InterruptedException { Assertions.assertTrue(ts12 > ts11); // create a new collection with the same name, different schema, in the test db - createSimpleCollection(tempClient, randomCollectionName, "aaa", false, 4); + createSimpleCollection(tempClient, randomCollectionName, "aaa", false, 4, ConsistencyLevelEnum.BOUNDED); // use the temp client to insert wrong data, wrong dimension row.addProperty("aaa", 22); @@ -3315,4 +3332,82 @@ void testNullableAndDefaultValue() { System.out.println(score); } } + + @Test + void testConsistencyLevel() { + String randomCollectionName = generator.generate(10); + int dim = 4; + + Function runTestFunc = + rowCount -> { + for (int i = 0; i < rowCount; i++) { + JsonObject row = new JsonObject(); + row.addProperty("pk", i); + row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(dim))); + client.insert(InsertParam.newBuilder() + .withCollectionName(randomCollectionName) + .withRows(Collections.singletonList(row)) + .build()); + // query/search/hybridSearch test + String expr = String.format("pk == %d", i); + if (i%3 == 0) { + R fetchR = client.query(QueryParam.newBuilder() + .withCollectionName(randomCollectionName) + .withExpr(expr) + .withLimit(5L) + .addOutField("pk") + .build()); + Assertions.assertEquals(R.Status.Success.getCode(), fetchR.getStatus().intValue()); + QueryResultsWrapper oneResult = new QueryResultsWrapper(fetchR.getData()); + List records = oneResult.getRowRecords(); + Assertions.assertEquals(1L, records.size()); + } else if (i%2 == 0) { + R searchOne = client.search(SearchParam.newBuilder() + .withCollectionName(randomCollectionName) + .withVectorFieldName("vector") + .withLimit(5L) + .withExpr(expr) + .withFloatVectors(Collections.singletonList(utils.generateFloatVector(dim))) + .addOutField("pk") + .build()); + Assertions.assertEquals(R.Status.Success.getCode(), searchOne.getStatus().intValue()); + + SearchResultsWrapper oneResult = new SearchResultsWrapper(searchOne.getData().getResults()); + List scores = oneResult.getIDScore(0); + Assertions.assertEquals(1, scores.size()); + } else { + AnnSearchParam subReq = AnnSearchParam.newBuilder() + .withVectorFieldName("vector") + .withExpr(expr) + .withFloatVectors(Collections.singletonList(utils.generateFloatVector(dim))) + .withLimit(5L) + .build(); + + R searchR = client.hybridSearch(HybridSearchParam.newBuilder() + .withCollectionName(randomCollectionName) + .addSearchRequest(subReq) + .withLimit(5L) + .withRanker(WeightedRanker.newBuilder() + .withWeights(Collections.singletonList(1.0f)) + .build()) + .withOutFields(Collections.singletonList("pk")) + .build()); + Assertions.assertEquals(R.Status.Success.getCode(), searchR.getStatus().intValue()); + SearchResultsWrapper oneResult = new SearchResultsWrapper(searchR.getData().getResults()); + List scores = oneResult.getIDScore(0); + Assertions.assertEquals(1, scores.size()); + } + } + return null; + }; + + // immediately search/query after insert, the data must be visible + // test STRONG level + createSimpleCollection(client, randomCollectionName, "pk", false, dim, ConsistencyLevelEnum.STRONG); + runTestFunc.apply(10); + + // test SESSION level + createSimpleCollection(client, randomCollectionName, "pk", false, dim, ConsistencyLevelEnum.SESSION); + runTestFunc.apply(100); + } } diff --git a/sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java b/sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java index e760429ff..f999baa5d 100644 --- a/sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java +++ b/sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java @@ -33,7 +33,6 @@ import io.milvus.orm.iterator.SearchIterator; import io.milvus.orm.iterator.SearchIteratorV2; import io.milvus.param.Constant; -import io.milvus.param.dml.HybridSearchParam; import io.milvus.pool.MilvusClientV2Pool; import io.milvus.pool.PoolConfig; import io.milvus.response.QueryResultsWrapper; @@ -1512,7 +1511,8 @@ void testIndex() { Assertions.assertEquals("64", extraParams.get("efConstruction")); } - private static void createSimpleCollection(MilvusClientV2 client, String collName, String pkName, boolean autoID, int dimension) { + private static void createSimpleCollection(MilvusClientV2 client, String collName, String pkName, boolean autoID, + int dimension, ConsistencyLevel level) { client.dropCollection(DropCollectionReq.builder() .collectionName(collName) .build()); @@ -1522,6 +1522,7 @@ private static void createSimpleCollection(MilvusClientV2 client, String collNam .autoID(autoID) .primaryFieldName(pkName) .dimension(dimension) + .consistencyLevel(level) .enableDynamicField(false) .build()); } @@ -1537,7 +1538,7 @@ void testCacheCollectionSchema() throws InterruptedException { .build()); // create a collection in the default db - createSimpleCollection(client, randomCollectionName, "pk", false, DIMENSION); + createSimpleCollection(client, randomCollectionName, "pk", false, DIMENSION, ConsistencyLevel.BOUNDED); // a temp client connect to the new db ConnectConfig config = ConnectConfig.builder() @@ -1589,7 +1590,7 @@ void testCacheCollectionSchema() throws InterruptedException { Assertions.assertTrue(ts12 > ts11); // create a new collection with the same name, different schema, in the test db - createSimpleCollection(tempClient, randomCollectionName, "aaa", false, 4); + createSimpleCollection(tempClient, randomCollectionName, "aaa", false, 4, ConsistencyLevel.BOUNDED); // use the temp client to insert wrong data, wrong dimension row.addProperty("aaa", 22); @@ -2780,4 +2781,73 @@ void testRunAnalyzer() { } } } + + @Test + void testConsistencyLevel() { + String randomCollectionName = generator.generate(10); + int dim = 4; + + Function runTestFunc = + rowCount -> { + for (int i = 0; i < rowCount; i++) { + JsonObject row = new JsonObject(); + row.addProperty("pk", i); + row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(dim))); + + client.insert(InsertReq.builder() + .collectionName(randomCollectionName) + .data(Collections.singletonList(row)) + .build()); + // query/search/hybridSearch test + String filter = String.format("pk == %d", i); + if (i % 3 == 0) { + QueryResp queryResp = client.query(QueryReq.builder() + .collectionName(randomCollectionName) + .filter(filter) + .outputFields(Collections.singletonList("pk")) + .build()); + List oneResult = queryResp.getQueryResults(); + Assertions.assertEquals(1, oneResult.size()); + } else if (i % 2 == 0) { + SearchResp searchResp = client.search(SearchReq.builder() + .collectionName(randomCollectionName) + .annsField("vector") + .filter(filter) + .data(Collections.singletonList(new FloatVec(utils.generateFloatVector(dim)))) + .limit(10) + .build()); + List> oneResult = searchResp.getSearchResults(); + Assertions.assertEquals(1, oneResult.size()); + Assertions.assertEquals(1, oneResult.get(0).size()); + } else { + AnnSearchReq subReq = AnnSearchReq.builder() + .vectorFieldName("vector") + .filter(filter) + .vectors(Collections.singletonList(new FloatVec(utils.generateFloatVector(dim)))) + .limit(7) + .build(); + + SearchResp searchResp = client.hybridSearch(HybridSearchReq.builder() + .collectionName(randomCollectionName) + .searchRequests(Collections.singletonList(subReq)) + .ranker(new RRFRanker(20)) + .limit(5) + .build()); + List> oneResult = searchResp.getSearchResults(); + Assertions.assertEquals(1, oneResult.size()); + Assertions.assertEquals(1, oneResult.get(0).size()); + } + } + return null; + }; + + // immediately search/query after insert, the data must be visible + // test STRONG level +// createSimpleCollection(client, randomCollectionName, "pk", false, dim, ConsistencyLevel.STRONG); +// runTestFunc.apply(10); + + // test SESSION level + createSimpleCollection(client, randomCollectionName, "pk", false, dim, ConsistencyLevel.SESSION); + runTestFunc.apply(100); + } }