diff --git a/docker-compose.yml b/docker-compose.yml index d882236ba..baaede566 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -32,7 +32,7 @@ services: standalone: container_name: milvus-javasdk-test-standalone - image: milvusdb/milvus:master-20250610-9439eaef-amd64 + image: milvusdb/milvus:master-20250706-d0976450 command: ["milvus", "run", "standalone"] environment: ETCD_ENDPOINTS: etcd:2379 @@ -77,7 +77,7 @@ services: standaloneslave: container_name: milvus-javasdk-test-slave-standalone - image: milvusdb/milvus:master-20250610-9439eaef-amd64 + image: milvusdb/milvus:master-20250706-d0976450 command: ["milvus", "run", "standalone"] environment: ETCD_ENDPOINTS: etcdslave:2379 diff --git a/sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java b/sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java index 865b663c5..5064210b6 100644 --- a/sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java +++ b/sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java @@ -47,6 +47,9 @@ import io.milvus.param.resourcegroup.*; import io.milvus.param.role.*; import io.milvus.response.*; +import io.milvus.v2.service.collection.response.DescribeCollectionResp; +import io.milvus.v2.service.vector.request.InsertReq; +import io.milvus.v2.utils.DataUtils; import lombok.NonNull; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -79,10 +82,10 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient { * If the cache doesn't have the collection info, call describeCollection() and cache it. * If insert/upsert get server error, remove the cached collection info. */ - private DescribeCollectionResponse getCollectionInfo(String databaseName, String collectionName) { + private DescribeCollectionResponse getCollectionInfo(String databaseName, String collectionName, boolean forceUpdate) { String key = combineCacheKey(databaseName, collectionName); DescribeCollectionResponse info = cacheCollectionInfo.get(key); - if (info == null) { + if (info == null || forceUpdate) { String msg = String.format("Fail to describe collection '%s'", collectionName); DescribeCollectionRequest.Builder builder = DescribeCollectionRequest.newBuilder() .setCollectionName(collectionName); @@ -119,10 +122,14 @@ private String combineCacheKey(String databaseName, String collectionName) { private void cleanCacheIfFailed(Status status, String databaseName, String collectionName) { if ((status.getCode() != 0 && status.getCode() != 8) || (!status.getErrorCode().equals(ErrorCode.Success) && status.getErrorCode() != ErrorCode.RateLimit)) { - cacheCollectionInfo.remove(combineCacheKey(databaseName, collectionName)); + removeCollectionCache(databaseName, collectionName); } } + private void removeCollectionCache(String databaseName, String collectionName) { + cacheCollectionInfo.remove(combineCacheKey(databaseName, collectionName)); + } + private void waitForLoadingCollection(String databaseName, String collectionName, List partitionNames, long waitingInterval, long timeout) throws IllegalResponseException { long tsBegin = System.currentTimeMillis(); @@ -637,19 +644,21 @@ public R dropCollection(@NonNull DropCollectionParam requestParam) { } logDebug(requestParam.toString()); - String title = String.format("DropCollectionRequest collectionName:%s", requestParam.getCollectionName()); + String dbName = requestParam.getDatabaseName(); + String collectionName = requestParam.getCollectionName(); + String title = String.format("DropCollectionRequest collectionName:%s", collectionName); try { DropCollectionRequest.Builder builder = DropCollectionRequest.newBuilder() - .setCollectionName(requestParam.getCollectionName()); - if (StringUtils.isNotEmpty(requestParam.getDatabaseName())) { - builder.setDbName(requestParam.getDatabaseName()); + .setCollectionName(collectionName); + if (StringUtils.isNotEmpty(dbName)) { + builder.setDbName(dbName); } DropCollectionRequest dropCollectionRequest = builder.build(); Status response = blockingStub().dropCollection(dropCollectionRequest); handleResponse(title, response); - cacheCollectionInfo.remove(combineCacheKey(requestParam.getDatabaseName(), requestParam.getCollectionName())); + removeCollectionCache(dbName, collectionName); return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG)); } catch (StatusRuntimeException e) { logError("{} RPC failed! Exception:{}", title, e); @@ -1587,6 +1596,13 @@ public R delete(@NonNull DeleteParam requestParam) { } } + private InsertRequest buildInsertRequest(InsertParam requestParam, DescribeCollectionResponse descResp) { + DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp); + ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper); + InsertRequest rpcRequest = builderWraper.buildInsertRequest(); + return rpcRequest.toBuilder().setSchemaTimestamp(descResp.getUpdateTimestamp()).build(); + } + @Override public R insert(@NonNull InsertParam requestParam) { if (!clientIsReady()) { @@ -1594,23 +1610,46 @@ public R insert(@NonNull InsertParam requestParam) { } logDebug(requestParam.toString()); - String title = String.format("InsertRequest collectionName:%s", requestParam.getCollectionName()); + String dbName = requestParam.getDatabaseName(); + String collectionName = requestParam.getCollectionName(); + String title = String.format("InsertRequest collectionName:%s", collectionName); try { - DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(), - requestParam.getCollectionName()); - DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp); - ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper); - MutationResult response = blockingStub().insert(builderWraper.buildInsertRequest()); - cleanCacheIfFailed(response.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName()); + DescribeCollectionResponse descResp = getCollectionInfo(dbName, collectionName, false); + + // To handle this bug: https://github.com/milvus-io/milvus/issues/41688 + // if the collection is already recreated, some schema might be changed, the buildInsertRequest() + // could not convert the InsertRequest with the old collectionDesc, we need to update the + // collectionDesc and call buildInsertRequest() again. + InsertRequest rpcRequest; + try { + rpcRequest = buildInsertRequest(requestParam, descResp); + } catch (Exception ignored) { + descResp = getCollectionInfo(dbName, collectionName, true); + rpcRequest = buildInsertRequest(requestParam, descResp); + } + + // If there are multiple clients, the client_A repeatedly do insert, the client_B changes + // the collection schema. The server might return a special error code "SchemaMismatch". + // If the client_A gets this special error code, it needs to update the collectionDesc and + // call insert() again. + MutationResult response = blockingStub().insert(rpcRequest); + if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) { + getCollectionInfo(dbName, collectionName, true); + return this.insert(requestParam); + } + + // if illegal data, server fails to process insert, else succeed + cleanCacheIfFailed(response.getStatus(), dbName, collectionName); handleResponse(title, response.getStatus()); - GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), response.getTimestamp()); + GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp()); return R.success(response); } catch (StatusRuntimeException e) { logError("{} RPC failed! Exception:{}", title, e); return R.failed(e); } catch (Exception e) { logError("{} failed! Exception:{}", title, e); + removeCollectionCache(dbName, collectionName); return R.failed(e); } } @@ -1624,23 +1663,35 @@ public ListenableFuture> insertAsync(InsertParam requestParam) } logDebug(requestParam.toString()); - String title = String.format("InsertAsyncRequest collectionName:%s", requestParam.getCollectionName()); + String dbName = requestParam.getDatabaseName(); + String collectionName = requestParam.getCollectionName(); + String title = String.format("InsertAsyncRequest collectionName:%s", collectionName); - DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(), - requestParam.getCollectionName()); - DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp); - ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper); - ListenableFuture response = futureStub().insert(builderWraper.buildInsertRequest()); + DescribeCollectionResponse descResp = getCollectionInfo(dbName, collectionName, false); + + // To handle this bug: https://github.com/milvus-io/milvus/issues/41688 + // if the collection is already recreated, some schema might be changed, the buildInsertRequest() + // could not convert the InsertRequest with the old collectionDesc, we need to update the + // collectionDesc and call buildInsertRequest() again. + InsertRequest rpcRequest; + try { + rpcRequest = buildInsertRequest(requestParam, descResp); + } catch (Exception ignored) { + descResp = getCollectionInfo(dbName, collectionName, true); + rpcRequest = buildInsertRequest(requestParam, descResp); + } + ListenableFuture response = futureStub().insert(rpcRequest); Futures.addCallback( response, new FutureCallback() { @Override public void onSuccess(MutationResult result) { - cleanCacheIfFailed(result.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName()); + // if illegal data, server fails to process insert, else succeed + cleanCacheIfFailed(result.getStatus(), dbName, collectionName); if (result.getStatus().getErrorCode() == ErrorCode.Success) { logDebug("{} successfully!", title); - GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), result.getTimestamp()); + GTsDict.getInstance().updateCollectionTs(collectionName, result.getTimestamp()); } else { logError("{} failed:\n{}", title, result.getStatus().getReason()); } @@ -1666,6 +1717,13 @@ public void onFailure(@Nonnull Throwable t) { return Futures.transform(response, transformFunc::apply, MoreExecutors.directExecutor()); } + private UpsertRequest buildUpsertRequest(UpsertParam requestParam, DescribeCollectionResponse descResp) { + DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp); + ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper); + UpsertRequest rpcRequest = builderWraper.buildUpsertRequest(); + return rpcRequest.toBuilder().setSchemaTimestamp(descResp.getUpdateTimestamp()).build(); + } + @Override public R upsert(UpsertParam requestParam) { if (!clientIsReady()) { @@ -1673,23 +1731,46 @@ public R upsert(UpsertParam requestParam) { } logDebug(requestParam.toString()); - String title = String.format("UpsertRequest collectionName:%s", requestParam.getCollectionName()); + String dbName = requestParam.getDatabaseName(); + String collectionName = requestParam.getCollectionName(); + String title = String.format("UpsertRequest collectionName:%s", collectionName); try { - DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(), - requestParam.getCollectionName()); - DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp); - ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper); - MutationResult response = blockingStub().upsert(builderWraper.buildUpsertRequest()); - cleanCacheIfFailed(response.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName()); + DescribeCollectionResponse descResp = getCollectionInfo(dbName, collectionName, false); + + // To handle this bug: https://github.com/milvus-io/milvus/issues/41688 + // if the collection is already recreated, some schema might be changed, the buildUpsertRequest() + // could not convert the UpsertRequest with the old collectionDesc, we need to update the + // collectionDesc and call buildUpsertRequest() again. + UpsertRequest rpcRequest; + try { + rpcRequest = buildUpsertRequest(requestParam, descResp); + } catch (Exception ignored) { + descResp = getCollectionInfo(dbName, collectionName, true); + rpcRequest = buildUpsertRequest(requestParam, descResp); + } + + // If there are multiple clients, the client_A repeatedly do upsert, the client_B changes + // the collection schema. The server might return a special error code "SchemaMismatch". + // If the client_A gets this special error code, it needs to update the collectionDesc and + // call upsert() again. + MutationResult response = blockingStub().upsert(rpcRequest); + if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) { + getCollectionInfo(dbName, collectionName, true); + return this.upsert(requestParam); + } + + // if illegal data, server fails to process upsert, else succeed + cleanCacheIfFailed(response.getStatus(), dbName, collectionName); handleResponse(title, response.getStatus()); - GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), response.getTimestamp()); + GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp()); return R.success(response); } catch (StatusRuntimeException e) { logError("{} RPC failed! Exception:{}", title, e); return R.failed(e); } catch (Exception e) { logError("{} failed! Exception:{}", title, e); + removeCollectionCache(dbName, collectionName); return R.failed(e); } } @@ -1702,23 +1783,35 @@ public ListenableFuture> upsertAsync(UpsertParam requestParam) } logDebug(requestParam.toString()); - String title = String.format("UpsertAsyncRequest collectionName:%s", requestParam.getCollectionName()); + String dbName = requestParam.getDatabaseName(); + String collectionName = requestParam.getCollectionName(); + String title = String.format("UpsertAsyncRequest collectionName:%s", collectionName); - DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(), - requestParam.getCollectionName()); - DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp); - ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper); - ListenableFuture response = futureStub().upsert(builderWraper.buildUpsertRequest()); + DescribeCollectionResponse descResp = getCollectionInfo(dbName, collectionName, false); + + // To handle this bug: https://github.com/milvus-io/milvus/issues/41688 + // if the collection is already recreated, some schema might be changed, the buildInsertRequest() + // could not convert the InsertRequest with the old collectionDesc, we need to update the + // collectionDesc and call buildInsertRequest() again. + UpsertRequest rpcRequest; + try { + rpcRequest = buildUpsertRequest(requestParam, descResp); + } catch (Exception ignored) { + descResp = getCollectionInfo(dbName, collectionName, true); + rpcRequest = buildUpsertRequest(requestParam, descResp); + } + ListenableFuture response = futureStub().upsert(rpcRequest); Futures.addCallback( response, new FutureCallback() { @Override public void onSuccess(MutationResult result) { - cleanCacheIfFailed(result.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName()); + // if illegal data, server fails to process upsert, else succeed + cleanCacheIfFailed(result.getStatus(), dbName, collectionName); if (result.getStatus().getErrorCode() == ErrorCode.Success) { logDebug("{} successfully!", title); - GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), result.getTimestamp()); + GTsDict.getInstance().updateCollectionTs(collectionName, result.getTimestamp()); } else { logError("{} failed:\n{}", title, result.getStatus().getReason()); } @@ -3161,15 +3254,16 @@ public R delete(DeleteIdsParam requestParam) { return R.failed(new ClientNotConnectedException("Client rpc channel is not ready")); } logDebug(requestParam.toString()); - String title = String.format("DeleteIdsRequest collectionName:%s", requestParam.getCollectionName()); + String collectionName = requestParam.getCollectionName(); + String title = String.format("DeleteIdsRequest collectionName:%s", collectionName); try { - DescribeCollectionResponse descResp = getCollectionInfo("", requestParam.getCollectionName()); + DescribeCollectionResponse descResp = getCollectionInfo("", collectionName, false); DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp); String expr = VectorUtils.convertPksExpr(requestParam.getPrimaryIds(), wrapper); DeleteParam deleteParam = DeleteParam.newBuilder() - .withCollectionName(requestParam.getCollectionName()) + .withCollectionName(collectionName) .withPartitionName(requestParam.getPartitionName()) .withExpr(expr) .build(); diff --git a/sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java b/sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java index 5728e1154..2e0b5d53a 100644 --- a/sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java +++ b/sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java @@ -107,10 +107,14 @@ private void cleanCacheIfFailed(Status status, String databaseName, String colle if ((status.getCode() != 0 && status.getCode() != 8) || (!status.getErrorCode().equals(io.milvus.grpc.ErrorCode.Success) && status.getErrorCode() != io.milvus.grpc.ErrorCode.RateLimit)) { - cacheCollectionInfo.remove(combineCacheKey(databaseName, collectionName)); + removeCollectionCache(databaseName, collectionName); } } + private void removeCollectionCache(String databaseName, String collectionName) { + cacheCollectionInfo.remove(combineCacheKey(databaseName, collectionName)); + } + private InsertRequest buildInsertRequest(InsertReq request, DescribeCollectionResponse descResp) { DataUtils.InsertBuilderWrapper requestBuilder = new DataUtils.InsertBuilderWrapper(); DescribeCollectionResp descColl = convertUtils.convertDescCollectionResp(descResp); @@ -119,21 +123,38 @@ private InsertRequest buildInsertRequest(InsertReq request, DescribeCollectionRe } public InsertResp insert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, InsertReq request) { - String title = String.format("InsertRequest collectionName:%s", request.getCollectionName()); + String collectionName = request.getCollectionName(); + String title = String.format("InsertRequest collectionName:%s", collectionName); // TODO: set the database name - DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), false); - InsertRequest rpcRequest = buildInsertRequest(request, descResp); + DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", collectionName, false); + + // To handle this bug: https://github.com/milvus-io/milvus/issues/41688 + // if the collection is already recreated, some schema might be changed, the buildInsertRequest() + // could not convert the InsertRequest with the old collectionDesc, we need to update the + // collectionDesc and call buildInsertRequest() again. + InsertRequest rpcRequest; + try { + rpcRequest = buildInsertRequest(request, descResp); + } catch (Exception ignored) { + descResp = getCollectionInfo(blockingStub, "", collectionName, true); + rpcRequest = buildInsertRequest(request, descResp); + } + + // If there are multiple clients, the client_A repeatedly do insert, the client_B changes + // the collection schema. The server might return a special error code "SchemaMismatch". + // If the client_A gets this special error code, it needs to update the collectionDesc and + // call insert() again. MutationResult response = blockingStub.insert(rpcRequest); if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) { - descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), true); - rpcRequest = buildInsertRequest(request, descResp); - response = blockingStub.insert(rpcRequest); + getCollectionInfo(blockingStub, "", collectionName, true); + return this.insert(blockingStub, request); } - cleanCacheIfFailed(response.getStatus(), "", request.getCollectionName()); + // if illegal data, server fails to process insert, else succeed + cleanCacheIfFailed(response.getStatus(), "", collectionName); rpcUtils.handleResponse(title, response.getStatus()); - GTsDict.getInstance().updateCollectionTs(request.getCollectionName(), response.getTimestamp()); + GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp()); if (response.getIDs().hasIntId()) { List ids = new ArrayList<>(response.getIDs().getIntId().getDataList()); @@ -158,23 +179,40 @@ private UpsertRequest buildUpsertRequest(UpsertReq request, DescribeCollectionRe } public UpsertResp upsert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, UpsertReq request) { - String title = String.format("UpsertRequest collectionName:%s", request.getCollectionName()); + String collectionName = request.getCollectionName(); + String title = String.format("UpsertRequest collectionName:%s", collectionName); // TODO: set the database name - DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), false); - UpsertRequest rpcRequest = buildUpsertRequest(request, descResp); + DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", collectionName, false); + + // To handle this bug: https://github.com/milvus-io/milvus/issues/41688 + // if the collection is already recreated, some schema might be changed, the buildUpsertRequest() + // could not convert the UpsertRequest with the old collectionDesc, we need to update the + // collectionDesc and call buildUpsertRequest() again. + UpsertRequest rpcRequest; + try { + rpcRequest = buildUpsertRequest(request, descResp); + } catch (Exception ignored) { + descResp = getCollectionInfo(blockingStub, "", collectionName, true); + rpcRequest = buildUpsertRequest(request, descResp); + } + + // If there are multiple clients, the client_A repeatedly do upsert, the client_B changes + // the collection schema. The server might return a special error code "SchemaMismatch". + // If the client_A gets this special error code, it needs to update the collectionDesc and + // call upsert() again. MutationResult response = blockingStub.upsert(rpcRequest); if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) { - descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), true); - rpcRequest = buildUpsertRequest(request, descResp); - response = blockingStub.upsert(rpcRequest); + getCollectionInfo(blockingStub, "", collectionName, true); + return this.upsert(blockingStub, request); } - cleanCacheIfFailed(response.getStatus(), "", request.getCollectionName()); + // if illegal data, server fails to process upsert, else succeed + cleanCacheIfFailed(response.getStatus(), "", collectionName); rpcUtils.handleResponse(title, response.getStatus()); - GTsDict.getInstance().updateCollectionTs(request.getCollectionName(), response.getTimestamp()); + GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp()); return UpsertResp.builder() - .upsertCnt(response.getInsertCnt()) + .upsertCnt(response.getUpsertCnt()) .build(); } diff --git a/sdk-core/src/test/java/io/milvus/TestUtils.java b/sdk-core/src/test/java/io/milvus/TestUtils.java index 4edf5d30c..deb56fcf9 100644 --- a/sdk-core/src/test/java/io/milvus/TestUtils.java +++ b/sdk-core/src/test/java/io/milvus/TestUtils.java @@ -11,7 +11,7 @@ public class TestUtils { private int dimension = 256; private static final Random RANDOM = new Random(); - public static final String MilvusDockerImageID = "milvusdb/milvus:master-20250610-9439eaef-amd64"; + public static final String MilvusDockerImageID = "milvusdb/milvus:master-20250706-d0976450"; public TestUtils(int dimension) { this.dimension = dimension; diff --git a/sdk-core/src/test/java/io/milvus/client/MilvusClientDockerTest.java b/sdk-core/src/test/java/io/milvus/client/MilvusClientDockerTest.java index 98262aebe..fd575ae86 100644 --- a/sdk-core/src/test/java/io/milvus/client/MilvusClientDockerTest.java +++ b/sdk-core/src/test/java/io/milvus/client/MilvusClientDockerTest.java @@ -68,6 +68,7 @@ class MilvusClientDockerTest { private static MilvusClient client; private static RandomStringGenerator generator; private static final int DIMENSION = 256; + private static final Random RANDOM = new Random(); private static final int ARRAY_CAPACITY = 100; private static final float FLOAT16_PRECISION = 0.001f; private static final float BFLOAT16_PRECISION = 0.01f; @@ -2866,73 +2867,125 @@ void testDatabase() { Assertions.assertEquals(R.Status.Success.getCode(), dropResponse.getStatus().intValue()); } - @Test - void testCacheCollectionSchema() { - String randomCollectionName = generator.generate(10); + private static void createSimpleCollection(String collName, String pkName, boolean autoID, int dimension) { + client.dropCollection(DropCollectionParam.newBuilder() + .withCollectionName(collName) + .build()); // collection schema List fieldsSchema = new ArrayList<>(); fieldsSchema.add(FieldType.newBuilder() .withPrimaryKey(true) - .withAutoID(true) + .withAutoID(autoID) .withDataType(DataType.Int64) - .withName("id") + .withName(pkName) .build()); fieldsSchema.add(FieldType.newBuilder() .withDataType(DataType.FloatVector) .withName("vector") - .withDimension(DIMENSION) + .withDimension(dimension) .build()); // create collection R createR = client.createCollection(CreateCollectionParam.newBuilder() - .withCollectionName(randomCollectionName) + .withCollectionName(collName) .withFieldTypes(fieldsSchema) .build()); Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue()); + } + + @Test + void testCacheCollectionSchema() { + String randomCollectionName = generator.generate(10); + + createSimpleCollection(randomCollectionName, "aaa", false, DIMENSION); - // insert + // insert/upsert correct data JsonObject row = new JsonObject(); + row.addProperty("aaa", 8); row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVectors(1).get(0))); R insertR = client.insert(InsertParam.newBuilder() .withCollectionName(randomCollectionName) .withRows(Collections.singletonList(row)) .build()); Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); + Assertions.assertEquals(1, insertR.getData().getInsertCnt()); - // drop collection - client.dropCollection(DropCollectionParam.newBuilder() + insertR = client.upsert(UpsertParam.newBuilder() .withCollectionName(randomCollectionName) + .withRows(Collections.singletonList(row)) .build()); + Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); + Assertions.assertEquals(1, insertR.getData().getUpsertCnt()); - // create a new collection with the same name, different schema - fieldsSchema.add(FieldType.newBuilder() - .withDataType(DataType.VarChar) - .withName("title") - .withMaxLength(100) + // create a new collection with the same name, different dimension + createSimpleCollection(randomCollectionName, "aaa", false, 100); + + // insert/upsert wrong data, dimension mismatch + insertR = client.insert(InsertParam.newBuilder() + .withCollectionName(randomCollectionName) + .withRows(Collections.singletonList(row)) .build()); + Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); - createR = client.createCollection(CreateCollectionParam.newBuilder() + insertR = client.upsert(UpsertParam.newBuilder() .withCollectionName(randomCollectionName) - .withFieldTypes(fieldsSchema) + .withRows(Collections.singletonList(row)) .build()); - Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue()); + Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); + + // insert/upsert correct data + List vector = new ArrayList<>(); + for (int i = 0; i < 100; ++i) { + vector.add(RANDOM.nextFloat()); + } + row.add("vector", JsonUtils.toJsonTree(vector)); + insertR = client.insert(InsertParam.newBuilder() + .withCollectionName(randomCollectionName) + .withRows(Collections.singletonList(row)) + .build()); + Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); + Assertions.assertEquals(1, insertR.getData().getInsertCnt()); + + insertR = client.upsert(UpsertParam.newBuilder() + .withCollectionName(randomCollectionName) + .withRows(Collections.singletonList(row)) + .build()); + Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); + Assertions.assertEquals(1, insertR.getData().getUpsertCnt()); - // insert wrong data + // create a new collection with the same name, different primary key + createSimpleCollection(randomCollectionName, "bbb", false, 100); + + // insert/upsert wrong data, primary key name mismatch insertR = client.insert(InsertParam.newBuilder() .withCollectionName(randomCollectionName) .withRows(Collections.singletonList(row)) .build()); Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); - // insert correct data - row.addProperty("title", "hello world"); + insertR = client.upsert(UpsertParam.newBuilder() + .withCollectionName(randomCollectionName) + .withRows(Collections.singletonList(row)) + .build()); + Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); + + // insert/upsert correct data + row.addProperty("bbb", 5); insertR = client.insert(InsertParam.newBuilder() .withCollectionName(randomCollectionName) .withRows(Collections.singletonList(row)) .build()); Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); + Assertions.assertEquals(1, insertR.getData().getInsertCnt()); + + insertR = client.upsert(UpsertParam.newBuilder() + .withCollectionName(randomCollectionName) + .withRows(Collections.singletonList(row)) + .build()); + Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); + Assertions.assertEquals(1, insertR.getData().getUpsertCnt()); } @Test diff --git a/sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java b/sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java index 3bf71952a..ccb6e80d6 100644 --- a/sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java +++ b/sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java @@ -1479,18 +1479,29 @@ void testIndex() { Assertions.assertEquals("64", extraParams.get("efConstruction")); } + private static void createSimpleCollection(String collName, String pkName, boolean autoID, int dimension) { + client.dropCollection(DropCollectionReq.builder() + .collectionName(collName) + .build()); + + client.createCollection(CreateCollectionReq.builder() + .collectionName(collName) + .autoID(autoID) + .primaryFieldName(pkName) + .dimension(dimension) + .enableDynamicField(false) + .build()); + } + @Test void testCacheCollectionSchema() { String randomCollectionName = generator.generate(10); - client.createCollection(CreateCollectionReq.builder() - .collectionName(randomCollectionName) - .autoID(true) - .dimension(DIMENSION) - .build()); + createSimpleCollection(randomCollectionName, "aaa", false, DIMENSION); - // insert + // insert/upsert correct data JsonObject row = new JsonObject(); + row.addProperty("aaa", 8); row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVectors(1).get(0))); InsertResp insertResp = client.insert(InsertReq.builder() .collectionName(randomCollectionName) @@ -1498,25 +1509,26 @@ void testCacheCollectionSchema() { .build()); Assertions.assertEquals(1L, insertResp.getInsertCnt()); - // drop collection - client.dropCollection(DropCollectionReq.builder() + UpsertResp upsertResp = client.upsert(UpsertReq.builder() .collectionName(randomCollectionName) + .data(Collections.singletonList(row)) .build()); + Assertions.assertEquals(1L, upsertResp.getUpsertCnt()); - // create a new collection with the same name, different schema - client.createCollection(CreateCollectionReq.builder() - .collectionName(randomCollectionName) - .autoID(true) - .dimension(100) - .build()); + // create a new collection with the same name, different dimension + createSimpleCollection(randomCollectionName, "aaa", false, 100); - // insert wrong data + // insert/upsert wrong data, dimension mismatch Assertions.assertThrows(MilvusClientException.class, ()->client.insert(InsertReq.builder() .collectionName(randomCollectionName) .data(Collections.singletonList(row)) .build())); + Assertions.assertThrows(MilvusClientException.class, ()->client.upsert(UpsertReq.builder() + .collectionName(randomCollectionName) + .data(Collections.singletonList(row)) + .build())); - // insert correct data + // insert/upsert correct data List vector = new ArrayList<>(); for (int i = 0; i < 100; ++i) { vector.add(RANDOM.nextFloat()); @@ -1527,6 +1539,39 @@ void testCacheCollectionSchema() { .data(Collections.singletonList(row)) .build()); Assertions.assertEquals(1L, insertResp.getInsertCnt()); + + upsertResp = client.upsert(UpsertReq.builder() + .collectionName(randomCollectionName) + .data(Collections.singletonList(row)) + .build()); + Assertions.assertEquals(1L, upsertResp.getUpsertCnt()); + + // create a new collection with the same name, different primary key + createSimpleCollection(randomCollectionName, "bbb", false, 100); + + // insert/upsert wrong data, primary key name mismatch + Assertions.assertThrows(MilvusClientException.class, ()->client.insert(InsertReq.builder() + .collectionName(randomCollectionName) + .data(Collections.singletonList(row)) + .build())); + Assertions.assertThrows(MilvusClientException.class, ()->client.upsert(UpsertReq.builder() + .collectionName(randomCollectionName) + .data(Collections.singletonList(row)) + .build())); + + // insert/upsert correct data + row.addProperty("bbb", 5); + insertResp = client.insert(InsertReq.builder() + .collectionName(randomCollectionName) + .data(Collections.singletonList(row)) + .build()); + Assertions.assertEquals(1L, insertResp.getInsertCnt()); + + upsertResp = client.upsert(UpsertReq.builder() + .collectionName(randomCollectionName) + .data(Collections.singletonList(row)) + .build()); + Assertions.assertEquals(1L, upsertResp.getUpsertCnt()); } @Test