diff --git a/sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java b/sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java index 5064210b6..a9aa40b23 100644 --- a/sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java +++ b/sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java @@ -47,9 +47,6 @@ import io.milvus.param.resourcegroup.*; import io.milvus.param.role.*; import io.milvus.response.*; -import io.milvus.v2.service.collection.response.DescribeCollectionResp; -import io.milvus.v2.service.vector.request.InsertReq; -import io.milvus.v2.utils.DataUtils; import lombok.NonNull; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -68,7 +65,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient { protected static final Logger logger = LoggerFactory.getLogger(AbstractMilvusGrpcClient.class); protected LogLevel logLevel = LogLevel.Info; - private ConcurrentHashMap cacheCollectionInfo = new ConcurrentHashMap<>(); + protected ConcurrentHashMap cacheCollectionInfo = new ConcurrentHashMap<>(); protected abstract MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub(); @@ -76,6 +73,15 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient { protected abstract boolean clientIsReady(); + protected abstract String currentDbName(); + + private String actualDbName(String overwriteName) { + if (StringUtils.isNotEmpty(overwriteName)) { + return overwriteName; + } + return currentDbName(); + } + /** * This method is for insert/upsert requests to reduce the rpc call of describeCollection() * Always try to get the collection info from cache. @@ -83,7 +89,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient { * If insert/upsert get server error, remove the cached collection info. */ private DescribeCollectionResponse getCollectionInfo(String databaseName, String collectionName, boolean forceUpdate) { - String key = combineCacheKey(databaseName, collectionName); + String key = GTsDict.CombineCollectionName(actualDbName(databaseName), collectionName); DescribeCollectionResponse info = cacheCollectionInfo.get(key); if (info == null || forceUpdate) { String msg = String.format("Fail to describe collection '%s'", collectionName); @@ -104,17 +110,6 @@ private DescribeCollectionResponse getCollectionInfo(String databaseName, String return info; } - private String combineCacheKey(String databaseName, String collectionName) { - if (collectionName == null || StringUtils.isBlank(collectionName)) { - throw new ParamException("Collection name is empty, not able to get collection info."); - } - String key = collectionName; - if (StringUtils.isNotEmpty(databaseName)) { - key = String.format("%s|%s", databaseName, collectionName); - } - return key; - } - /** * insert/upsert return an error, but is not a RateLimit error, * clean the cache so that the next insert will call describeCollection() to get the latest info. @@ -127,7 +122,8 @@ private void cleanCacheIfFailed(Status status, String databaseName, String colle } private void removeCollectionCache(String databaseName, String collectionName) { - cacheCollectionInfo.remove(combineCacheKey(databaseName, collectionName)); + String key = GTsDict.CombineCollectionName(actualDbName(databaseName), collectionName); + cacheCollectionInfo.remove(key); } private void waitForLoadingCollection(String databaseName, String collectionName, List partitionNames, @@ -658,7 +654,13 @@ public R dropCollection(@NonNull DropCollectionParam requestParam) { Status response = blockingStub().dropCollection(dropCollectionRequest); handleResponse(title, response); + + // remove the collection schema cache removeCollectionCache(dbName, collectionName); + + // remove the last write timestamp for this collection + String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName); + GTsDict.getInstance().removeCollectionTs(key); return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG)); } catch (StatusRuntimeException e) { logError("{} RPC failed! Exception:{}", title, e); @@ -1570,22 +1572,27 @@ public R delete(@NonNull DeleteParam requestParam) { } logDebug(requestParam.toString()); - String title = String.format("DeleteRequest collectionName:%s", requestParam.getCollectionName()); + String dbName = requestParam.getDatabaseName(); + String collectionName = requestParam.getCollectionName(); + String title = String.format("DeleteRequest collectionName:%s", collectionName); try { DeleteRequest.Builder builder = DeleteRequest.newBuilder() .setBase(MsgBase.newBuilder().setMsgType(MsgType.Delete).build()) - .setCollectionName(requestParam.getCollectionName()) + .setCollectionName(collectionName) .setPartitionName(requestParam.getPartitionName()) .setExpr(requestParam.getExpr()); - if (StringUtils.isNotEmpty(requestParam.getDatabaseName())) { - builder.setDbName(requestParam.getDatabaseName()); + if (StringUtils.isNotEmpty(dbName)) { + builder.setDbName(dbName); } MutationResult response = blockingStub().delete(builder.build()); handleResponse(title, response.getStatus()); - GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), response.getTimestamp()); + + // update the last write timestamp for SESSION consistency + String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName); + GTsDict.getInstance().updateCollectionTs(key, response.getTimestamp()); return R.success(response); } catch (StatusRuntimeException e) { logError("{} RPC failed! Exception:{}", title, e); @@ -1639,10 +1646,14 @@ public R insert(@NonNull InsertParam requestParam) { return this.insert(requestParam); } - // if illegal data, server fails to process insert, else succeed + // if illegal data, server fails to process insert, , clean the schema cache + // so that the next call of dml can update the cache cleanCacheIfFailed(response.getStatus(), dbName, collectionName); handleResponse(title, response.getStatus()); - GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp()); + + // update the last write timestamp for SESSION consistency + String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName); + GTsDict.getInstance().updateCollectionTs(key, response.getTimestamp()); return R.success(response); } catch (StatusRuntimeException e) { logError("{} RPC failed! Exception:{}", title, e); @@ -1687,11 +1698,15 @@ public ListenableFuture> insertAsync(InsertParam requestParam) new FutureCallback() { @Override public void onSuccess(MutationResult result) { - // if illegal data, server fails to process insert, else succeed + // if illegal data, server fails to process insert, clean the schema cache + // so that the next call of dml can update the cache cleanCacheIfFailed(result.getStatus(), dbName, collectionName); if (result.getStatus().getErrorCode() == ErrorCode.Success) { logDebug("{} successfully!", title); - GTsDict.getInstance().updateCollectionTs(collectionName, result.getTimestamp()); + + // update the last write timestamp for SESSION consistency + String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName); + GTsDict.getInstance().updateCollectionTs(key, result.getTimestamp()); } else { logError("{} failed:\n{}", title, result.getStatus().getReason()); } @@ -1760,10 +1775,14 @@ public R upsert(UpsertParam requestParam) { return this.upsert(requestParam); } - // if illegal data, server fails to process upsert, else succeed + // if illegal data, server fails to process upsert, clean the schema cache + // so that the next call of dml can update the cache cleanCacheIfFailed(response.getStatus(), dbName, collectionName); handleResponse(title, response.getStatus()); - GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp()); + + // update the last write timestamp for SESSION consistency + String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName); + GTsDict.getInstance().updateCollectionTs(key, response.getTimestamp()); return R.success(response); } catch (StatusRuntimeException e) { logError("{} RPC failed! Exception:{}", title, e); @@ -1807,11 +1826,15 @@ public ListenableFuture> upsertAsync(UpsertParam requestParam) new FutureCallback() { @Override public void onSuccess(MutationResult result) { - // if illegal data, server fails to process upsert, else succeed + // if illegal data, server fails to process upsert, clean the schema cache + // so that the next call of dml can update the cache cleanCacheIfFailed(result.getStatus(), dbName, collectionName); if (result.getStatus().getErrorCode() == ErrorCode.Success) { logDebug("{} successfully!", title); - GTsDict.getInstance().updateCollectionTs(collectionName, result.getTimestamp()); + + // update the last write timestamp for SESSION consistency + String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName); + GTsDict.getInstance().updateCollectionTs(key, result.getTimestamp()); } else { logError("{} failed:\n{}", title, result.getStatus().getReason()); } diff --git a/sdk-core/src/main/java/io/milvus/client/MilvusServiceClient.java b/sdk-core/src/main/java/io/milvus/client/MilvusServiceClient.java index 7b8404125..1402a2463 100644 --- a/sdk-core/src/main/java/io/milvus/client/MilvusServiceClient.java +++ b/sdk-core/src/main/java/io/milvus/client/MilvusServiceClient.java @@ -73,6 +73,7 @@ public class MilvusServiceClient extends AbstractMilvusGrpcClient { private final long rpcDeadlineMs; private long timeoutMs = 0; private RetryParam retryParam = RetryParam.newBuilder().build(); + private String currentDatabaseName; public MilvusServiceClient(@NonNull ConnectParam connectParam) { this.rpcDeadlineMs = connectParam.getRpcDeadlineMs(); @@ -80,6 +81,7 @@ public MilvusServiceClient(@NonNull ConnectParam connectParam) { Metadata metadata = new Metadata(); metadata.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), connectParam.getAuthorization()); if (StringUtils.isNotEmpty(connectParam.getDatabaseName())) { + currentDatabaseName = connectParam.getDatabaseName(); metadata.put(Metadata.Key.of("dbname", Metadata.ASCII_STRING_MARSHALLER), connectParam.getDatabaseName()); } @@ -201,6 +203,7 @@ protected MilvusServiceClient(MilvusServiceClient src) { this.timeoutMs = src.timeoutMs; this.logLevel = src.logLevel; this.retryParam = src.retryParam; + this.currentDatabaseName = src.currentDatabaseName; } @Override @@ -222,6 +225,11 @@ public boolean clientIsReady() { return channel != null && !channel.isShutdown() && !channel.isTerminated(); } + @Override + protected String currentDbName() { + return currentDatabaseName; + } + @Override public void close(long maxWaitSeconds) throws InterruptedException { channel.shutdownNow(); diff --git a/sdk-core/src/main/java/io/milvus/common/utils/GTsDict.java b/sdk-core/src/main/java/io/milvus/common/utils/GTsDict.java index de0f8adf8..e890190ec 100644 --- a/sdk-core/src/main/java/io/milvus/common/utils/GTsDict.java +++ b/sdk-core/src/main/java/io/milvus/common/utils/GTsDict.java @@ -19,6 +19,9 @@ package io.milvus.common.utils; +import io.milvus.exception.ParamException; +import org.apache.commons.lang3.StringUtils; + import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -37,6 +40,16 @@ public static GTsDict getInstance() { return TS_DICT; } + public static String CombineCollectionName(String databaseName, String collectionName) { + if (collectionName == null || StringUtils.isBlank(collectionName)) { + throw new ParamException("Collection name is empty, not able to get collection info."); + } + if (StringUtils.isEmpty(databaseName)) { + databaseName = "default"; + } + return String.format("%s_%s", databaseName, collectionName); + } + private ConcurrentMap tsDict = new ConcurrentHashMap<>(); public void updateCollectionTs(String collectionName, long ts) { @@ -49,4 +62,12 @@ public void updateCollectionTs(String collectionName, long ts) { public Long getCollectionTs(String collectionName) { return tsDict.get(collectionName); } + + public void removeCollectionTs(String collectionName) { + tsDict.remove(collectionName); + } + + public void cleanAllCollectionTs() { + tsDict.clear(); + } } diff --git a/sdk-core/src/main/java/io/milvus/v2/client/ConnectConfig.java b/sdk-core/src/main/java/io/milvus/v2/client/ConnectConfig.java index 8e1f1e971..3178d7157 100644 --- a/sdk-core/src/main/java/io/milvus/v2/client/ConnectConfig.java +++ b/sdk-core/src/main/java/io/milvus/v2/client/ConnectConfig.java @@ -23,16 +23,14 @@ import lombok.Builder; import lombok.Data; import lombok.NonNull; -import lombok.experimental.SuperBuilder; import org.apache.commons.lang3.StringUtils; import javax.net.ssl.SSLContext; -import java.net.URI; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @Data -@SuperBuilder +@Builder public class ConnectConfig { @NonNull private String uri; diff --git a/sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java b/sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java index 97a388011..6de5f1904 100644 --- a/sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java +++ b/sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java @@ -83,8 +83,24 @@ public class MilvusClientV2 { public MilvusClientV2(ConnectConfig connectConfig) { if (connectConfig != null) { connect(connectConfig); + + initServices(connectConfig.getDbName()); + } } + + private void initServices(String dbName) { + this.databaseService.setCurrentDbName(dbName); + this.collectionService.setCurrentDbName(dbName); + this.indexService.setCurrentDbName(dbName); + this.vectorService.setCurrentDbName(dbName); + this.vectorService.cleanCollectionCache(); + this.partitionService.setCurrentDbName(dbName); + this.rbacService.setCurrentDbName(dbName); + this.rgroupService.setCurrentDbName(dbName); + this.utilityService.setCurrentDbName(dbName); + } + /** * connect to Milvus server * @@ -159,6 +175,22 @@ public void retryConfig(RetryConfig retryConfig) { rpcUtils.retryConfig(retryConfig); } + public MilvusClientV2 withRetry(RetryConfig retryConfig) { + rpcUtils.retryConfig(retryConfig); + return this; + } + + public MilvusClientV2 withTimeout(long timeout, TimeUnit timeoutUnit) { + // the unit of rpcDeadlineMs is millisecond + // if the input timeout value is zero, rpcDeadlineMs is zero + // if the input timeout value is not zero and less than 1ms, it will be treated as 1ms + // if the input timeout value is larger than 1ms, it will be converted to an integer ms value + long nn = timeoutUnit.toNanos(timeout); + long ms = (nn == 0) ? 0 : (nn < 1000000 ? 1 : nn/1000000); + connectConfig.setRpcDeadlineMs(ms); + return this; + } + ///////////////////////////////////////////////////////////////////////////////////////////// // Database Operations ///////////////////////////////////////////////////////////////////////////////////////////// @@ -170,10 +202,10 @@ public void useDatabase(@NonNull String dbName) throws InterruptedException { // check if database exists clientUtils.checkDatabaseExist(this.getRpcStub(), dbName); try { - this.vectorService.cleanCollectionCache(); this.connectConfig.setDbName(dbName); this.close(3); this.connect(this.connectConfig); + this.initServices(dbName); } catch (InterruptedException e){ logger.error("close connect error"); throw new RuntimeException(e); diff --git a/sdk-core/src/main/java/io/milvus/v2/service/BaseService.java b/sdk-core/src/main/java/io/milvus/v2/service/BaseService.java index d36a0625d..c77c1d294 100644 --- a/sdk-core/src/main/java/io/milvus/v2/service/BaseService.java +++ b/sdk-core/src/main/java/io/milvus/v2/service/BaseService.java @@ -28,6 +28,7 @@ import io.milvus.v2.utils.DataUtils; import io.milvus.v2.utils.RpcUtils; import io.milvus.v2.utils.VectorUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +38,18 @@ public class BaseService { public DataUtils dataUtils = new DataUtils(); public VectorUtils vectorUtils = new VectorUtils(); public ConvertUtils convertUtils = new ConvertUtils(); + private String currentDbName; + + public void setCurrentDbName(String dbName) { + currentDbName = dbName; + } + + protected String actualDbName(String overwriteName) { + if (StringUtils.isNotEmpty(overwriteName)) { + return overwriteName; + } + return currentDbName; + } protected void checkCollectionExist(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, String collectionName) { HasCollectionRequest request = HasCollectionRequest.newBuilder().setCollectionName(collectionName).build(); diff --git a/sdk-core/src/main/java/io/milvus/v2/service/collection/CollectionService.java b/sdk-core/src/main/java/io/milvus/v2/service/collection/CollectionService.java index 61b0d8cc6..3bbe55d04 100644 --- a/sdk-core/src/main/java/io/milvus/v2/service/collection/CollectionService.java +++ b/sdk-core/src/main/java/io/milvus/v2/service/collection/CollectionService.java @@ -19,6 +19,7 @@ package io.milvus.v2.service.collection; +import io.milvus.common.utils.GTsDict; import io.milvus.grpc.*; import io.milvus.param.ParamUtils; import io.milvus.v2.common.IndexParam; @@ -187,14 +188,20 @@ public ListCollectionsResp listCollections(MilvusServiceGrpc.MilvusServiceBlocki } public Void dropCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, DropCollectionReq request) { - - String title = String.format("DropCollectionRequest collectionName:%s", request.getCollectionName()); - DropCollectionRequest dropCollectionRequest = DropCollectionRequest.newBuilder() - .setCollectionName(request.getCollectionName()) - .build(); - Status status = blockingStub.dropCollection(dropCollectionRequest); + String dbName = request.getDatabaseName(); + String collectionName = request.getCollectionName(); + String title = String.format("DropCollectionRequest collectionName:%s", collectionName); + DropCollectionRequest.Builder builder = DropCollectionRequest.newBuilder() + .setCollectionName(collectionName); + if (StringUtils.isNotEmpty(dbName)) { + builder.setDbName(dbName); + } + Status status = blockingStub.dropCollection(builder.build()); rpcUtils.handleResponse(title, status); + // remove the last write timestamp for this collection + String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName); + GTsDict.getInstance().removeCollectionTs(key); return null; } diff --git a/sdk-core/src/main/java/io/milvus/v2/service/collection/request/DropCollectionReq.java b/sdk-core/src/main/java/io/milvus/v2/service/collection/request/DropCollectionReq.java index 74fd56325..9c8870d02 100644 --- a/sdk-core/src/main/java/io/milvus/v2/service/collection/request/DropCollectionReq.java +++ b/sdk-core/src/main/java/io/milvus/v2/service/collection/request/DropCollectionReq.java @@ -26,6 +26,7 @@ @Data @SuperBuilder public class DropCollectionReq { + private String databaseName; private String collectionName; @Deprecated @Builder.Default diff --git a/sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java b/sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java index 2e0b5d53a..86ef4656e 100644 --- a/sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java +++ b/sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java @@ -22,7 +22,6 @@ import com.google.protobuf.ByteString; import io.milvus.common.utils.GTsDict; import io.milvus.common.utils.JsonUtils; -import io.milvus.exception.ParamException; import io.milvus.grpc.*; import io.milvus.orm.iterator.*; import io.milvus.v2.exception.ErrorCode; @@ -74,7 +73,7 @@ private DescribeCollectionResponse describeCollection(MilvusServiceGrpc.MilvusSe */ private DescribeCollectionResponse getCollectionInfo(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, String databaseName, String collectionName, boolean forceUpdate) { - String key = combineCacheKey(databaseName, collectionName); + String key = GTsDict.CombineCollectionName(actualDbName(databaseName), collectionName); DescribeCollectionResponse info = cacheCollectionInfo.get(key); if (info == null || forceUpdate) { info = describeCollection(blockingStub, databaseName, collectionName); @@ -88,17 +87,6 @@ public void cleanCollectionCache() { cacheCollectionInfo.clear(); } - private String combineCacheKey(String databaseName, String collectionName) { - if (collectionName == null || StringUtils.isBlank(collectionName)) { - throw new ParamException("Collection name is empty, not able to get collection info."); - } - String key = collectionName; - if (StringUtils.isNotEmpty(databaseName)) { - key = String.format("%s|%s", databaseName, collectionName); - } - return key; - } - /** * insert/upsert return an error, but is not a RateLimit error, * clean the cache so that the next insert will call describeCollection() to get the latest info. @@ -112,7 +100,8 @@ private void cleanCacheIfFailed(Status status, String databaseName, String colle } private void removeCollectionCache(String databaseName, String collectionName) { - cacheCollectionInfo.remove(combineCacheKey(databaseName, collectionName)); + String key = GTsDict.CombineCollectionName(actualDbName(databaseName), collectionName); + cacheCollectionInfo.remove(key); } private InsertRequest buildInsertRequest(InsertReq request, DescribeCollectionResponse descResp) { @@ -123,11 +112,11 @@ private InsertRequest buildInsertRequest(InsertReq request, DescribeCollectionRe } public InsertResp insert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, InsertReq request) { + String dbName = request.getDatabaseName(); String collectionName = request.getCollectionName(); String title = String.format("InsertRequest collectionName:%s", collectionName); - // TODO: set the database name - DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", collectionName, false); + DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, dbName, collectionName, false); // To handle this bug: https://github.com/milvus-io/milvus/issues/41688 // if the collection is already recreated, some schema might be changed, the buildInsertRequest() @@ -137,7 +126,7 @@ public InsertResp insert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStu try { rpcRequest = buildInsertRequest(request, descResp); } catch (Exception ignored) { - descResp = getCollectionInfo(blockingStub, "", collectionName, true); + descResp = getCollectionInfo(blockingStub, dbName, collectionName, true); rpcRequest = buildInsertRequest(request, descResp); } @@ -147,14 +136,17 @@ public InsertResp insert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStu // call insert() again. MutationResult response = blockingStub.insert(rpcRequest); if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) { - getCollectionInfo(blockingStub, "", collectionName, true); + getCollectionInfo(blockingStub, dbName, collectionName, true); return this.insert(blockingStub, request); } // if illegal data, server fails to process insert, else succeed - cleanCacheIfFailed(response.getStatus(), "", collectionName); + cleanCacheIfFailed(response.getStatus(), dbName, collectionName); rpcUtils.handleResponse(title, response.getStatus()); - GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp()); + + // update the last write timestamp for SESSION consistency + String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName); + GTsDict.getInstance().updateCollectionTs(key, response.getTimestamp()); if (response.getIDs().hasIntId()) { List ids = new ArrayList<>(response.getIDs().getIntId().getDataList()); @@ -179,11 +171,11 @@ private UpsertRequest buildUpsertRequest(UpsertReq request, DescribeCollectionRe } public UpsertResp upsert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, UpsertReq request) { + String dbName = request.getDatabaseName(); String collectionName = request.getCollectionName(); String title = String.format("UpsertRequest collectionName:%s", collectionName); - // TODO: set the database name - DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", collectionName, false); + DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, dbName, collectionName, false); // To handle this bug: https://github.com/milvus-io/milvus/issues/41688 // if the collection is already recreated, some schema might be changed, the buildUpsertRequest() @@ -193,7 +185,7 @@ public UpsertResp upsert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStu try { rpcRequest = buildUpsertRequest(request, descResp); } catch (Exception ignored) { - descResp = getCollectionInfo(blockingStub, "", collectionName, true); + descResp = getCollectionInfo(blockingStub, dbName, collectionName, true); rpcRequest = buildUpsertRequest(request, descResp); } @@ -203,14 +195,18 @@ public UpsertResp upsert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStu // call upsert() again. MutationResult response = blockingStub.upsert(rpcRequest); if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) { - getCollectionInfo(blockingStub, "", collectionName, true); + getCollectionInfo(blockingStub, dbName, collectionName, true); return this.upsert(blockingStub, request); } - // if illegal data, server fails to process upsert, else succeed - cleanCacheIfFailed(response.getStatus(), "", collectionName); + // if illegal data, server fails to process upsert, clean the schema cache + // so that the next call of dml can update the cache + cleanCacheIfFailed(response.getStatus(), dbName, collectionName); rpcUtils.handleResponse(title, response.getStatus()); - GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp()); + + // update the last write timestamp for SESSION consistency + String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName); + GTsDict.getInstance().updateCollectionTs(key, response.getTimestamp()); return UpsertResp.builder() .upsertCnt(response.getUpsertCnt()) .build(); @@ -297,19 +293,21 @@ public SearchIteratorV2 searchIteratorV2(MilvusServiceGrpc.MilvusServiceBlocking } public DeleteResp delete(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, DeleteReq request) { - String title = String.format("DeleteRequest collectionName:%s", request.getCollectionName()); + String dbName = request.getDatabaseName(); + String collectionName = request.getCollectionName(); + String title = String.format("DeleteRequest collectionName:%s", collectionName); if (request.getFilter() != null && request.getIds() != null) { throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "filter and ids can't be set at the same time"); } if (request.getFilter() == null) { - DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), false); + DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, dbName, collectionName, false); DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp); request.setFilter(vectorUtils.getExprById(respR.getPrimaryFieldName(), request.getIds())); } DeleteRequest.Builder builder = DeleteRequest.newBuilder() - .setCollectionName(request.getCollectionName()) + .setCollectionName(collectionName) .setPartitionName(request.getPartitionName()) .setExpr(request.getFilter()); if (request.getFilter() != null && !request.getFilter().isEmpty()) { @@ -319,8 +317,15 @@ public DeleteResp delete(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStu }); } MutationResult response = blockingStub.delete(builder.build()); + + // if illegal data, server fails to process delete, clean the schema cache + // so that the next call of dml can update the cache + cleanCacheIfFailed(response.getStatus(), dbName, collectionName); rpcUtils.handleResponse(title, response.getStatus()); - GTsDict.getInstance().updateCollectionTs(request.getCollectionName(), response.getTimestamp()); + + // update the last write timestamp for SESSION consistency + String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName); + GTsDict.getInstance().updateCollectionTs(key, response.getTimestamp()); return DeleteResp.builder() .deleteCnt(response.getDeleteCnt()) .build(); diff --git a/sdk-core/src/main/java/io/milvus/v2/service/vector/request/DeleteReq.java b/sdk-core/src/main/java/io/milvus/v2/service/vector/request/DeleteReq.java index ed6c16af7..39132b336 100644 --- a/sdk-core/src/main/java/io/milvus/v2/service/vector/request/DeleteReq.java +++ b/sdk-core/src/main/java/io/milvus/v2/service/vector/request/DeleteReq.java @@ -30,6 +30,8 @@ @Data @SuperBuilder public class DeleteReq { + @Builder.Default + private String databaseName = ""; private String collectionName; @Builder.Default private String partitionName = ""; diff --git a/sdk-core/src/main/java/io/milvus/v2/service/vector/request/InsertReq.java b/sdk-core/src/main/java/io/milvus/v2/service/vector/request/InsertReq.java index cef0fde74..535ead438 100644 --- a/sdk-core/src/main/java/io/milvus/v2/service/vector/request/InsertReq.java +++ b/sdk-core/src/main/java/io/milvus/v2/service/vector/request/InsertReq.java @@ -55,6 +55,8 @@ public class InsertReq { * */ private List data; + @Builder.Default + private String databaseName = ""; private String collectionName; @Builder.Default private String partitionName = ""; diff --git a/sdk-core/src/main/java/io/milvus/v2/service/vector/request/UpsertReq.java b/sdk-core/src/main/java/io/milvus/v2/service/vector/request/UpsertReq.java index 58508df94..2f527e960 100644 --- a/sdk-core/src/main/java/io/milvus/v2/service/vector/request/UpsertReq.java +++ b/sdk-core/src/main/java/io/milvus/v2/service/vector/request/UpsertReq.java @@ -53,6 +53,8 @@ public class UpsertReq { * */ private List data; + @Builder.Default + private String databaseName = ""; private String collectionName; @Builder.Default private String partitionName = ""; diff --git a/sdk-core/src/main/java/io/milvus/v2/utils/DataUtils.java b/sdk-core/src/main/java/io/milvus/v2/utils/DataUtils.java index b5e8e1ea7..cc8e6eae0 100644 --- a/sdk-core/src/main/java/io/milvus/v2/utils/DataUtils.java +++ b/sdk-core/src/main/java/io/milvus/v2/utils/DataUtils.java @@ -32,6 +32,7 @@ import lombok.Builder; import lombok.Getter; import lombok.NonNull; +import org.apache.commons.lang3.StringUtils; import java.util.*; @@ -43,6 +44,7 @@ public static class InsertBuilderWrapper { public InsertRequest convertGrpcInsertRequest(@NonNull InsertReq requestParam, DescribeCollectionResp descColl) { + String dbName = requestParam.getDatabaseName(); String collectionName = requestParam.getCollectionName(); // generate insert request builder @@ -51,6 +53,9 @@ public InsertRequest convertGrpcInsertRequest(@NonNull InsertReq requestParam, .setCollectionName(collectionName) .setBase(msgBase) .setNumRows(requestParam.getData().size()); + if (StringUtils.isNotEmpty(dbName)) { + insertBuilder.setDbName(dbName); + } upsertBuilder = null; fillFieldsData(requestParam, descColl); return insertBuilder.build(); @@ -58,14 +63,18 @@ public InsertRequest convertGrpcInsertRequest(@NonNull InsertReq requestParam, public UpsertRequest convertGrpcUpsertRequest(@NonNull UpsertReq requestParam, DescribeCollectionResp descColl) { + String dbName = requestParam.getDatabaseName(); String collectionName = requestParam.getCollectionName(); // generate upsert request builder - MsgBase msgBase = MsgBase.newBuilder().setMsgType(MsgType.Insert).build(); + MsgBase msgBase = MsgBase.newBuilder().setMsgType(MsgType.Upsert).build(); upsertBuilder = UpsertRequest.newBuilder() .setCollectionName(collectionName) .setBase(msgBase) .setNumRows(requestParam.getData().size()); + if (StringUtils.isNotEmpty(dbName)) { + upsertBuilder.setDbName(dbName); + } insertBuilder = null; fillFieldsData(requestParam, descColl); return upsertBuilder.build(); diff --git a/sdk-core/src/test/java/io/milvus/client/MilvusClientDockerTest.java b/sdk-core/src/test/java/io/milvus/client/MilvusClientDockerTest.java index fd575ae86..166491d11 100644 --- a/sdk-core/src/test/java/io/milvus/client/MilvusClientDockerTest.java +++ b/sdk-core/src/test/java/io/milvus/client/MilvusClientDockerTest.java @@ -25,6 +25,7 @@ import io.milvus.TestUtils; import io.milvus.common.clientenum.ConsistencyLevelEnum; import io.milvus.common.utils.Float16Utils; +import io.milvus.common.utils.GTsDict; import io.milvus.common.utils.JsonUtils; import io.milvus.grpc.*; import io.milvus.orm.iterator.QueryIterator; @@ -75,6 +76,19 @@ class MilvusClientDockerTest { private static final TestUtils utils = new TestUtils(DIMENSION); + // this class is for testing the behavior of AbstractMilvusGrpcClient + // to expose some internal methods + private static class MilvusClientForTest extends MilvusServiceClient { + public MilvusClientForTest(ConnectParam connectParam) { + super(connectParam); + } + + public DescribeCollectionResponse getCollectionInfo(String databaseName, String collectionName) { + String key = GTsDict.CombineCollectionName(databaseName, collectionName); + return cacheCollectionInfo.get(key); + } + } + @Container private static final MilvusContainer milvus = new MilvusContainer(TestUtils.MilvusDockerImageID) .withEnv("DEPLOY_MODE", "STANDALONE"); @@ -2835,7 +2849,11 @@ public void testIterator() { @Test void testDatabase() { String dbName = "test_database"; - CreateDatabaseParam createDatabaseParam = CreateDatabaseParam.newBuilder().withDatabaseName(dbName).withReplicaNumber(1).withResourceGroups(Arrays.asList("rg1")).build(); + CreateDatabaseParam createDatabaseParam = CreateDatabaseParam.newBuilder() + .withDatabaseName(dbName) + .withReplicaNumber(1) + .withResourceGroups(Arrays.asList("rg1")) + .build(); R createResponse = client.createDatabase(createDatabaseParam); Assertions.assertEquals(R.Status.Success.getCode(), createResponse.getStatus().intValue()); @@ -2849,7 +2867,11 @@ void testDatabase() { Assertions.assertEquals(1, describeDBWrapper.getResourceGroups().size()); // alter database props - AlterDatabaseParam alterDatabaseParam = AlterDatabaseParam.newBuilder().withDatabaseName(dbName).withReplicaNumber(3).WithResourceGroups(Arrays.asList("rg1", "rg2", "rg3")).build(); + AlterDatabaseParam alterDatabaseParam = AlterDatabaseParam.newBuilder() + .withDatabaseName(dbName) + .withReplicaNumber(3) + .WithResourceGroups(Arrays.asList("rg1", "rg2", "rg3")) + .build(); R alterDatabaseResponse = client.alterDatabase(alterDatabaseParam); Assertions.assertEquals(R.Status.Success.getCode(), alterDatabaseResponse.getStatus().intValue()); @@ -2867,7 +2889,7 @@ void testDatabase() { Assertions.assertEquals(R.Status.Success.getCode(), dropResponse.getStatus().intValue()); } - private static void createSimpleCollection(String collName, String pkName, boolean autoID, int dimension) { + private static void createSimpleCollection(MilvusClient client, String collName, String pkName, boolean autoID, int dimension) { client.dropCollection(DropCollectionParam.newBuilder() .withCollectionName(collName) .build()); @@ -2896,96 +2918,169 @@ private static void createSimpleCollection(String collName, String pkName, boole } @Test - void testCacheCollectionSchema() { + void testCacheCollectionSchema() throws InterruptedException { String randomCollectionName = generator.generate(10); - createSimpleCollection(randomCollectionName, "aaa", false, DIMENSION); + // create a new db + String testDbName = "test_database"; + CreateDatabaseParam createDatabaseParam = CreateDatabaseParam.newBuilder() + .withDatabaseName(testDbName) + .withReplicaNumber(1) + .build(); + R dbResponse = client.createDatabase(createDatabaseParam); + Assertions.assertEquals(R.Status.Success.getCode(), dbResponse.getStatus().intValue()); + + // create a collection in the default db + createSimpleCollection(client, randomCollectionName, "pk", false, DIMENSION); - // insert/upsert correct data + // a temp client connect to the new db + ConnectParam connectParam = connectParamBuilder() + .withAuthorization("root", "Milvus") + .withDatabaseName(testDbName) + .build(); + MilvusClientForTest tempClient = new MilvusClientForTest(connectParam); + + // use the temp client to insert correct data into the default collection + // there will be a schema cache for this collection in the temp client + // there will be timestamp for this collection in the global GTsDict JsonObject row = new JsonObject(); - row.addProperty("aaa", 8); - row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVectors(1).get(0))); - R insertR = client.insert(InsertParam.newBuilder() + row.addProperty("pk", 8); + row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(DIMENSION))); + R insertR = tempClient.insert(InsertParam.newBuilder() + .withDatabaseName("default") .withCollectionName(randomCollectionName) .withRows(Collections.singletonList(row)) .build()); Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); Assertions.assertEquals(1, insertR.getData().getInsertCnt()); - insertR = client.upsert(UpsertParam.newBuilder() - .withCollectionName(randomCollectionName) - .withRows(Collections.singletonList(row)) - .build()); - Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); - Assertions.assertEquals(1, insertR.getData().getUpsertCnt()); + // check the schema cache of this collection, must be not null + DescribeCollectionResponse descResp = tempClient.getCollectionInfo("default", randomCollectionName); + Assertions.assertNotNull(descResp); - // create a new collection with the same name, different dimension - createSimpleCollection(randomCollectionName, "aaa", false, 100); + // check the timestamp of this collection, must be positive + String key1 = GTsDict.CombineCollectionName("default", randomCollectionName); + Long ts11 = GTsDict.getInstance().getCollectionTs(key1); + Assertions.assertNotNull(ts11); + Assertions.assertTrue(ts11 > 0L); - // insert/upsert wrong data, dimension mismatch - insertR = client.insert(InsertParam.newBuilder() + // insert wrong data, the schema cache will be removed + row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(7))); + insertR = tempClient.insert(InsertParam.newBuilder() + .withDatabaseName("default") .withCollectionName(randomCollectionName) .withRows(Collections.singletonList(row)) .build()); Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); + descResp = tempClient.getCollectionInfo("default", randomCollectionName); + Assertions.assertNull(descResp); + // use the default client to do upsert correct data + TimeUnit.MILLISECONDS.sleep(100); + row.addProperty("pk", 999); + row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(DIMENSION))); insertR = client.upsert(UpsertParam.newBuilder() .withCollectionName(randomCollectionName) .withRows(Collections.singletonList(row)) .build()); - Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); + Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); + Assertions.assertEquals(1, insertR.getData().getUpsertCnt()); - // insert/upsert correct data - List vector = new ArrayList<>(); - for (int i = 0; i < 100; ++i) { - vector.add(RANDOM.nextFloat()); - } - row.add("vector", JsonUtils.toJsonTree(vector)); - insertR = client.insert(InsertParam.newBuilder() + // check the timestamp of this collection, must be a new positive + Long ts12 = GTsDict.getInstance().getCollectionTs(key1); + Assertions.assertNotNull(ts12); + Assertions.assertTrue(ts12 > ts11); + + // create a new collection with the same name, different schema, in the test db + createSimpleCollection(tempClient, randomCollectionName, "aaa", false, 4); + + // use the temp client to insert wrong data, wrong dimension + row.addProperty("aaa", 22); + row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(7))); + insertR = tempClient.insert(InsertParam.newBuilder() .withCollectionName(randomCollectionName) .withRows(Collections.singletonList(row)) .build()); - Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); - Assertions.assertEquals(1, insertR.getData().getInsertCnt()); + Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); - insertR = client.upsert(UpsertParam.newBuilder() + // check the timestamp of this collection, must be null + String key2 = GTsDict.CombineCollectionName(testDbName, randomCollectionName); + Long ts21 = GTsDict.getInstance().getCollectionTs(key2); + Assertions.assertNull(ts21); + + // use the temp client to do upsert correct data + TimeUnit.MILLISECONDS.sleep(100); + row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(4))); + insertR = tempClient.upsert(UpsertParam.newBuilder() .withCollectionName(randomCollectionName) .withRows(Collections.singletonList(row)) .build()); Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); Assertions.assertEquals(1, insertR.getData().getUpsertCnt()); - // create a new collection with the same name, different primary key - createSimpleCollection(randomCollectionName, "bbb", false, 100); + // check the schema cache of this collection, must be not null + descResp = tempClient.getCollectionInfo(testDbName, randomCollectionName); + Assertions.assertNotNull(descResp); + + // check the timestamp of this collection, must be positive + Long ts22 = GTsDict.getInstance().getCollectionTs(key2); + Assertions.assertNotNull(ts22); + Assertions.assertTrue(ts22 > 0L); - // insert/upsert wrong data, primary key name mismatch - insertR = client.insert(InsertParam.newBuilder() + // tempClient upsert wrong data + row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(7))); + insertR = tempClient.upsert(UpsertParam.newBuilder() .withCollectionName(randomCollectionName) .withRows(Collections.singletonList(row)) .build()); Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); - insertR = client.upsert(UpsertParam.newBuilder() + // check the schema cache of this collection, must be null + descResp = tempClient.getCollectionInfo(testDbName, randomCollectionName); + Assertions.assertNull(descResp); + + // tempClient delete data + R delResp = tempClient.delete(DeleteIdsParam.newBuilder() .withCollectionName(randomCollectionName) - .withRows(Collections.singletonList(row)) + .addPrimaryId(22L) .build()); - Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); + Assertions.assertEquals(R.Status.Success.getCode(), delResp.getStatus().intValue()); - // insert/upsert correct data - row.addProperty("bbb", 5); - insertR = client.insert(InsertParam.newBuilder() + // check the schema cache of this collection, must be not null + descResp = tempClient.getCollectionInfo(testDbName, randomCollectionName); + Assertions.assertNotNull(descResp); + + // check the timestamp of this collection, must be greater than previous + Long ts23 = GTsDict.getInstance().getCollectionTs(key2); + Assertions.assertNotNull(ts23); + Assertions.assertTrue(ts23 > ts22); + + // use the default client to drop the collection in the new db + R dropResp = client.dropCollection(DropCollectionParam.newBuilder() .withCollectionName(randomCollectionName) - .withRows(Collections.singletonList(row)) + .withDatabaseName(testDbName) .build()); - Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); - Assertions.assertEquals(1, insertR.getData().getInsertCnt()); + Assertions.assertEquals(R.Status.Success.getCode(), dropResp.getStatus().intValue()); - insertR = client.upsert(UpsertParam.newBuilder() + // check the timestamp of this collection, must be deleted + Long ts31 = GTsDict.getInstance().getCollectionTs(key2); + Assertions.assertNull(ts31); + + // use the temp client to insert correct data into the collection + row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(4))); + insertR = tempClient.insert(InsertParam.newBuilder() .withCollectionName(randomCollectionName) .withRows(Collections.singletonList(row)) .build()); - Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); - Assertions.assertEquals(1, insertR.getData().getUpsertCnt()); + Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); + + // check the timestamp of this collection, must be null + Long ts32 = GTsDict.getInstance().getCollectionTs(key2); + Assertions.assertNull(ts32); + + // check the schema cache of this collection, must be null + descResp = tempClient.getCollectionInfo(testDbName, randomCollectionName); + Assertions.assertNull(descResp); } @Test diff --git a/sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java b/sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java index ccb6e80d6..9eeb7fb0f 100644 --- a/sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java +++ b/sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java @@ -27,6 +27,7 @@ import io.milvus.common.clientenum.FunctionType; import io.milvus.common.resourcegroup.*; import io.milvus.common.utils.Float16Utils; +import io.milvus.common.utils.GTsDict; import io.milvus.common.utils.JsonUtils; import io.milvus.orm.iterator.QueryIterator; import io.milvus.orm.iterator.SearchIterator; @@ -70,6 +71,7 @@ import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.function.Function; @Testcontainers(disabledWithoutDocker = true) @@ -525,6 +527,15 @@ void testFloatVectors() { List queryResults = queryResp.getQueryResults(); Assertions.assertEquals(6, queryResults.size()); + // test the withTimeout works well + client.withTimeout(1, TimeUnit.NANOSECONDS); + Assertions.assertThrows(MilvusClientException.class, ()->client.query(QueryReq.builder() + .collectionName(randomCollectionName) + .filter("JSON_CONTAINS_ANY(json_field[\"flags\"], [4, 100])") + .consistencyLevel(ConsistencyLevel.STRONG) + .build())); + + client.withTimeout(0, TimeUnit.SECONDS); client.dropCollection(DropCollectionReq.builder().collectionName(randomCollectionName).build()); } @@ -1479,7 +1490,7 @@ void testIndex() { Assertions.assertEquals("64", extraParams.get("efConstruction")); } - private static void createSimpleCollection(String collName, String pkName, boolean autoID, int dimension) { + private static void createSimpleCollection(MilvusClientV2 client, String collName, String pkName, boolean autoID, int dimension) { client.dropCollection(DropCollectionReq.builder() .collectionName(collName) .build()); @@ -1494,84 +1505,117 @@ private static void createSimpleCollection(String collName, String pkName, boole } @Test - void testCacheCollectionSchema() { + void testCacheCollectionSchema() throws InterruptedException { String randomCollectionName = generator.generate(10); - createSimpleCollection(randomCollectionName, "aaa", false, DIMENSION); + // create a new db + String testDbName = "test_database"; + client.createDatabase(CreateDatabaseReq.builder() + .databaseName(testDbName) + .build()); + + // create a collection in the default db + createSimpleCollection(client, randomCollectionName, "pk", false, DIMENSION); + + // a temp client connect to the new db + ConnectConfig config = ConnectConfig.builder() + .uri(milvus.getEndpoint()) + .dbName(testDbName) + .build(); + MilvusClientV2 tempClient = new MilvusClientV2(config); - // insert/upsert correct data + // use the temp client to insert correct data into the default collection + // there will be a schema cache for this collection in the temp client + // there will be timestamp for this collection in the global GTsDict JsonObject row = new JsonObject(); - row.addProperty("aaa", 8); - row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVectors(1).get(0))); - InsertResp insertResp = client.insert(InsertReq.builder() + row.addProperty("pk", 8); + row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(DIMENSION))); + InsertResp insertResp = tempClient.insert(InsertReq.builder() + .databaseName("default") .collectionName(randomCollectionName) .data(Collections.singletonList(row)) .build()); Assertions.assertEquals(1L, insertResp.getInsertCnt()); - UpsertResp upsertResp = client.upsert(UpsertReq.builder() - .collectionName(randomCollectionName) - .data(Collections.singletonList(row)) - .build()); - Assertions.assertEquals(1L, upsertResp.getUpsertCnt()); + // check the timestamp of this collection, must be positive + String key1 = GTsDict.CombineCollectionName("default", randomCollectionName); + Long ts11 = GTsDict.getInstance().getCollectionTs(key1); + Assertions.assertNotNull(ts11); + Assertions.assertTrue(ts11 > 0L); - // create a new collection with the same name, different dimension - createSimpleCollection(randomCollectionName, "aaa", false, 100); - - // insert/upsert wrong data, dimension mismatch + // insert wrong data, the schema cache will be removed + row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(7))); Assertions.assertThrows(MilvusClientException.class, ()->client.insert(InsertReq.builder() - .collectionName(randomCollectionName) - .data(Collections.singletonList(row)) - .build())); - Assertions.assertThrows(MilvusClientException.class, ()->client.upsert(UpsertReq.builder() + .databaseName("default") .collectionName(randomCollectionName) .data(Collections.singletonList(row)) .build())); - // insert/upsert correct data - List vector = new ArrayList<>(); - for (int i = 0; i < 100; ++i) { - vector.add(RANDOM.nextFloat()); - } - row.add("vector", JsonUtils.toJsonTree(vector)); - insertResp = client.insert(InsertReq.builder() - .collectionName(randomCollectionName) - .data(Collections.singletonList(row)) - .build()); - Assertions.assertEquals(1L, insertResp.getInsertCnt()); - - upsertResp = client.upsert(UpsertReq.builder() + // use the default client to do upsert correct data + TimeUnit.MILLISECONDS.sleep(100); + row.addProperty("pk", 999); + row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(DIMENSION))); + UpsertResp upsertResp = client.upsert(UpsertReq.builder() .collectionName(randomCollectionName) .data(Collections.singletonList(row)) .build()); Assertions.assertEquals(1L, upsertResp.getUpsertCnt()); - // create a new collection with the same name, different primary key - createSimpleCollection(randomCollectionName, "bbb", false, 100); + // check the timestamp of this collection, must be a new positive + Long ts12 = GTsDict.getInstance().getCollectionTs(key1); + Assertions.assertNotNull(ts12); + Assertions.assertTrue(ts12 > ts11); - // insert/upsert wrong data, primary key name mismatch - Assertions.assertThrows(MilvusClientException.class, ()->client.insert(InsertReq.builder() + // create a new collection with the same name, different schema, in the test db + createSimpleCollection(tempClient, randomCollectionName, "aaa", false, 4); + + // use the temp client to insert wrong data, wrong dimension + row.addProperty("aaa", 22); + row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(7))); + Assertions.assertThrows(MilvusClientException.class, ()->tempClient.insert(InsertReq.builder() .collectionName(randomCollectionName) .data(Collections.singletonList(row)) .build())); - Assertions.assertThrows(MilvusClientException.class, ()->client.upsert(UpsertReq.builder() + + // check the timestamp of this collection, must be null + String key2 = GTsDict.CombineCollectionName(testDbName, randomCollectionName); + Long ts21 = GTsDict.getInstance().getCollectionTs(key2); + Assertions.assertNull(ts21); + + // use the temp client to do upsert correct data + TimeUnit.MILLISECONDS.sleep(100); + row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(4))); + upsertResp = tempClient.upsert(UpsertReq.builder() .collectionName(randomCollectionName) .data(Collections.singletonList(row)) - .build())); + .build()); + Assertions.assertEquals(1L, upsertResp.getUpsertCnt()); + + // check the timestamp of this collection, must be positive + Long ts22 = GTsDict.getInstance().getCollectionTs(key2); + Assertions.assertNotNull(ts22); + Assertions.assertTrue(ts22 > 0L); - // insert/upsert correct data - row.addProperty("bbb", 5); - insertResp = client.insert(InsertReq.builder() + // tempClient delete data + tempClient.delete(DeleteReq.builder() .collectionName(randomCollectionName) - .data(Collections.singletonList(row)) + .ids(Collections.singletonList(22L)) .build()); - Assertions.assertEquals(1L, insertResp.getInsertCnt()); - upsertResp = client.upsert(UpsertReq.builder() + // check the timestamp of this collection, must be greater than previous + Long ts23 = GTsDict.getInstance().getCollectionTs(key2); + Assertions.assertNotNull(ts23); + Assertions.assertTrue(ts23 > ts22); + + // use the default client to drop the collection in the new db + client.dropCollection(DropCollectionReq.builder() + .databaseName(testDbName) .collectionName(randomCollectionName) - .data(Collections.singletonList(row)) .build()); - Assertions.assertEquals(1L, upsertResp.getUpsertCnt()); + + // check the timestamp of this collection, must be deleted + Long ts31 = GTsDict.getInstance().getCollectionTs(key2); + Assertions.assertNull(ts31); } @Test