Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ services:

standalone:
container_name: milvus-javasdk-test-standalone
image: milvusdb/milvus:v2.5.11
image: milvusdb/milvus:master-20250610-9439eaef-amd64
command: ["milvus", "run", "standalone"]
environment:
ETCD_ENDPOINTS: etcd:2379
Expand Down Expand Up @@ -77,7 +77,7 @@ services:

standaloneslave:
container_name: milvus-javasdk-test-slave-standalone
image: milvusdb/milvus:v2.5.11
image: milvusdb/milvus:master-20250610-9439eaef-amd64
command: ["milvus", "run", "standalone"]
environment:
ETCD_ENDPOINTS: etcdslave:2379
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ private void waitForFlush(FlushResponse flushResponse, long waitingInterval, lon
// If waiting time exceed timeout, exist the circle
long tsBegin = System.currentTimeMillis();
Map<String, LongArray> collectionSegIDs = flushResponse.getCollSegIDsMap();
Map<String, Long> flushTsMap = flushResponse.getCollFlushTsMap();
String dbName = flushResponse.getDbName();
collectionSegIDs.forEach((collectionName, segmentIDs) -> {
while (segmentIDs.getDataCount() > 0) {
long tsNow = System.currentTimeMillis();
Expand All @@ -263,10 +265,15 @@ private void waitForFlush(FlushResponse flushResponse, long waitingInterval, lon
break;
}

GetFlushStateRequest getFlushStateRequest = GetFlushStateRequest.newBuilder()
GetFlushStateRequest.Builder builder = GetFlushStateRequest.newBuilder()
.addAllSegmentIDs(segmentIDs.getDataList())
.build();
GetFlushStateResponse response = blockingStub().getFlushState(getFlushStateRequest);
.setCollectionName(collectionName)
.setFlushTs(flushTsMap.get(collectionName));
if (StringUtils.isNotEmpty(dbName)) {
builder.setDbName(dbName);
}

GetFlushStateResponse response = blockingStub().getFlushState(builder.build());
if (response.getFlushed()) {
// if all segment of this collection has been flushed, break this circle and check next collection
String msg = segmentIDs.getDataCount() + " segments of " + collectionName + " has been flushed";
Expand Down
10 changes: 9 additions & 1 deletion sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,14 @@ public void alterCollection(AlterCollectionReq request) {
public void alterCollectionProperties(AlterCollectionPropertiesReq request) {
rpcUtils.retry(()-> collectionService.alterCollectionProperties(this.getRpcStub(), request));
}
/**
* Add a new field to collection.
*
* @param request add new field request
*/
public void addCollectionField(AddCollectionFieldReq request) {
rpcUtils.retry(()-> collectionService.addCollectionField(this.getRpcStub(), request));
}
/**
* Alter a field's properties.
*
Expand Down Expand Up @@ -920,7 +928,7 @@ public void flush(FlushReq request) {
if (request.getWaitFlushedTimeoutMs() > 0L) {
tempBlockingStub = tempBlockingStub.withDeadlineAfter(request.getWaitFlushedTimeoutMs(), TimeUnit.MILLISECONDS);
}
utilityService.waitFlush(tempBlockingStub, response.getCollectionSegmentIDs(), response.getCollectionFlushTs());
utilityService.waitFlush(tempBlockingStub, response);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,24 @@ public Void alterCollectionProperties(MilvusServiceGrpc.MilvusServiceBlockingStu
return null;
}

public Void addCollectionField(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, AddCollectionFieldReq request) {
String title = String.format("AddCollectionFieldReq fieldName:%s", request.getFieldName());
AddCollectionFieldRequest.Builder builder = AddCollectionFieldRequest.newBuilder()
.setCollectionName(request.getCollectionName());
if (StringUtils.isNotEmpty(request.getDatabaseName())) {
builder.setDbName(request.getDatabaseName());
}

CreateCollectionReq.FieldSchema fieldSchema = SchemaUtils.convertFieldReqToFieldSchema(request);
FieldSchema grpcFieldSchema = SchemaUtils.convertToGrpcFieldSchema(fieldSchema);
builder.setSchema(grpcFieldSchema.toByteString());

Status response = blockingStub.addCollectionField(builder.build());
rpcUtils.handleResponse(title, response);

return null;
}

public Void alterCollectionField(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, AlterCollectionFieldReq request) {
String title = String.format("AlterCollectionFieldReq collectionName:%s", request.getCollectionName());
AlterCollectionFieldRequest.Builder builder = AlterCollectionFieldRequest.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.milvus.v2.service.collection.request;

import lombok.Data;
import lombok.experimental.SuperBuilder;

@Data
@SuperBuilder
public class AddCollectionFieldReq extends AddFieldReq{
private String collectionName;
private String databaseName;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
package io.milvus.v2.service.collection.request;

import io.milvus.common.clientenum.FunctionType;
import io.milvus.param.ParamUtils;
import io.milvus.v2.common.ConsistencyLevel;
import io.milvus.v2.common.DataType;
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.exception.ErrorCode;
import io.milvus.v2.exception.MilvusClientException;
import io.milvus.v2.utils.SchemaUtils;
import lombok.Builder;
import lombok.Data;
import lombok.NonNull;
Expand Down Expand Up @@ -136,44 +136,7 @@ public static class CollectionSchema {
private List<CreateCollectionReq.Function> functionList = new ArrayList<>();

public CollectionSchema addField(AddFieldReq addFieldReq) {
// check the input here to pop error messages earlier
if (addFieldReq.isEnableDefaultValue() && addFieldReq.getDefaultValue() == null
&& addFieldReq.getIsNullable() == Boolean.FALSE) {
String msg = String.format("Default value cannot be null for field '%s' that is defined as nullable == false.", addFieldReq.getFieldName());
throw new MilvusClientException(ErrorCode.INVALID_PARAMS, msg);
}

CreateCollectionReq.FieldSchema fieldSchema = FieldSchema.builder()
.name(addFieldReq.getFieldName())
.dataType(addFieldReq.getDataType())
.description(addFieldReq.getDescription())
.isPrimaryKey(addFieldReq.getIsPrimaryKey())
.isPartitionKey(addFieldReq.getIsPartitionKey())
.isClusteringKey(addFieldReq.getIsClusteringKey())
.autoID(addFieldReq.getAutoID())
.isNullable(addFieldReq.getIsNullable())
.defaultValue(addFieldReq.getDefaultValue())
.enableAnalyzer(addFieldReq.getEnableAnalyzer())
.enableMatch(addFieldReq.getEnableMatch())
.analyzerParams(addFieldReq.getAnalyzerParams())
.typeParams(addFieldReq.getTypeParams())
.multiAnalyzerParams(addFieldReq.getMultiAnalyzerParams())
.build();
if (addFieldReq.getDataType().equals(DataType.Array)) {
if (addFieldReq.getElementType() == null) {
throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "Element type, maxCapacity are required for array field");
}
fieldSchema.setElementType(addFieldReq.getElementType());
fieldSchema.setMaxCapacity(addFieldReq.getMaxCapacity());
} else if (addFieldReq.getDataType().equals(DataType.VarChar)) {
fieldSchema.setMaxLength(addFieldReq.getMaxLength());
} else if (ParamUtils.isDenseVectorDataType(io.milvus.grpc.DataType.valueOf(addFieldReq.getDataType().name()))) {
if (addFieldReq.getDimension() == null) {
throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "Dimension is required for vector field");
}
fieldSchema.setDimension(addFieldReq.getDimension());
}
fieldSchemaList.add(fieldSchema);
fieldSchemaList.add(SchemaUtils.convertFieldReqToFieldSchema(addFieldReq));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,23 @@ public FlushResp flush(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
});
Map<String, Long> collectionFlushTs = response.getCollFlushTsMap();
return FlushResp.builder()
.databaseName(response.getDbName())
.collectionSegmentIDs(collectionSegmentIDs)
.collectionFlushTs(collectionFlushTs)
.build();
}

// this method is internal use, not expose to user
public Void waitFlush(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
Map<String, List<Long>> collectionSegmentIDs,
Map<String, Long> collectionFlushTs) {
public Void waitFlush(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, FlushResp flushResp) {
Map<String, List<Long>> collectionSegmentIDs = flushResp.getCollectionSegmentIDs();
Map<String, Long> collectionFlushTs = flushResp.getCollectionFlushTs();
collectionSegmentIDs.forEach((collectionName, segmentIDs)->{
if (collectionFlushTs.containsKey(collectionName)) {
Long flushTs = collectionFlushTs.get(collectionName);
boolean flushed = false;
while (!flushed) {
GetFlushStateResponse flushResponse = blockingStub.getFlushState(GetFlushStateRequest.newBuilder()
.setDbName(flushResp.getDatabaseName())
.addAllSegmentIDs(segmentIDs)
.setFlushTs(flushTs)
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
@Data
@SuperBuilder
public class FlushResp {
@Builder.Default
String databaseName = "";
@Builder.Default
Map<String, List<Long>> collectionSegmentIDs = new HashMap<>();
@Builder.Default
Expand Down
43 changes: 43 additions & 0 deletions sdk-core/src/main/java/io/milvus/v2/utils/SchemaUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.milvus.param.ParamUtils;
import io.milvus.v2.exception.ErrorCode;
import io.milvus.v2.exception.MilvusClientException;
import io.milvus.v2.service.collection.request.AddFieldReq;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -215,4 +216,46 @@ public static CreateCollectionReq.Function convertFromGrpcFunction(FunctionSchem
.build();
return function;
}

public static CreateCollectionReq.FieldSchema convertFieldReqToFieldSchema(AddFieldReq addFieldReq) {
// check the input here to pop error messages earlier
if (addFieldReq.isEnableDefaultValue() && addFieldReq.getDefaultValue() == null
&& addFieldReq.getIsNullable() == Boolean.FALSE) {
String msg = String.format("Default value cannot be null for field '%s' that is defined as nullable == false.", addFieldReq.getFieldName());
throw new MilvusClientException(ErrorCode.INVALID_PARAMS, msg);
}

CreateCollectionReq.FieldSchema fieldSchema = CreateCollectionReq.FieldSchema.builder()
.name(addFieldReq.getFieldName())
.dataType(addFieldReq.getDataType())
.description(addFieldReq.getDescription())
.isPrimaryKey(addFieldReq.getIsPrimaryKey())
.isPartitionKey(addFieldReq.getIsPartitionKey())
.isClusteringKey(addFieldReq.getIsClusteringKey())
.autoID(addFieldReq.getAutoID())
.isNullable(addFieldReq.getIsNullable())
.defaultValue(addFieldReq.getDefaultValue())
.enableAnalyzer(addFieldReq.getEnableAnalyzer())
.enableMatch(addFieldReq.getEnableMatch())
.analyzerParams(addFieldReq.getAnalyzerParams())
.typeParams(addFieldReq.getTypeParams())
.multiAnalyzerParams(addFieldReq.getMultiAnalyzerParams())
.build();
if (addFieldReq.getDataType().equals(io.milvus.v2.common.DataType.Array)) {
if (addFieldReq.getElementType() == null) {
throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "Element type, maxCapacity are required for array field");
}
fieldSchema.setElementType(addFieldReq.getElementType());
fieldSchema.setMaxCapacity(addFieldReq.getMaxCapacity());
} else if (addFieldReq.getDataType().equals(io.milvus.v2.common.DataType.VarChar)) {
fieldSchema.setMaxLength(addFieldReq.getMaxLength());
} else if (ParamUtils.isDenseVectorDataType(io.milvus.grpc.DataType.valueOf(addFieldReq.getDataType().name()))) {
if (addFieldReq.getDimension() == null) {
throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "Dimension is required for vector field");
}
fieldSchema.setDimension(addFieldReq.getDimension());
}

return fieldSchema;
}
}
2 changes: 1 addition & 1 deletion sdk-core/src/test/java/io/milvus/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class TestUtils {
private int dimension = 256;
private static final Random RANDOM = new Random();

public static final String MilvusDockerImageID = "milvusdb/milvus:v2.5.11";
public static final String MilvusDockerImageID = "milvusdb/milvus:master-20250610-9439eaef-amd64";

public TestUtils(int dimension) {
this.dimension = dimension;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,16 @@ class MilvusClientDockerTest {
private static final TestUtils utils = new TestUtils(DIMENSION);

@Container
private static final MilvusContainer milvus = new MilvusContainer(TestUtils.MilvusDockerImageID);
private static final MilvusContainer milvus = new MilvusContainer(TestUtils.MilvusDockerImageID)
.withEnv("DEPLOY_MODE", "STANDALONE");

@BeforeAll
public static void setUp() {
try {
Thread.sleep(3000); // Sleep for few seconds since the master branch milvus healthz check is bug
} catch (InterruptedException ignored) {
}

ConnectParam connectParam = connectParamBuilder()
.withAuthorization("root", "Milvus")
.build();
Expand Down Expand Up @@ -2021,6 +2027,7 @@ void testDynamicField() {
for (int i = 0; i < targetVectors.size(); ++i) {
List<SearchResultsWrapper.IDScore> scores = results.getIDScore(i);
System.out.println("The result of No." + i + " target vector:");
Assertions.assertFalse(scores.isEmpty());
SearchResultsWrapper.IDScore score = scores.get(0);
System.out.println(score);
Object extraMeta = score.get("dynamic");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,7 @@ void getCollectionStatistics() {
final long segmentID = 2021L;
mockServerImpl.setFlushResponse(FlushResponse.newBuilder()
.putCollSegIDs(collectionName, LongArray.newBuilder().addData(segmentID).build())
.putCollFlushTs(collectionName, 200L)
.build());
mockServerImpl.setGetFlushStateResponse(GetFlushStateResponse.newBuilder()
.setFlushed(false)
Expand Down
Loading
Loading