Skip to content

Commit d31fbe0

Browse files
committed
Support addCollectionField()
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent 50a5560 commit d31fbe0

13 files changed

Lines changed: 317 additions & 198 deletions

File tree

docker-compose.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ services:
3232

3333
standalone:
3434
container_name: milvus-javasdk-test-standalone
35-
image: milvusdb/milvus:v2.5.11
35+
image: milvusdb/milvus:master-20250610-9439eaef-amd64
3636
command: ["milvus", "run", "standalone"]
3737
environment:
3838
ETCD_ENDPOINTS: etcd:2379
@@ -77,7 +77,7 @@ services:
7777

7878
standaloneslave:
7979
container_name: milvus-javasdk-test-slave-standalone
80-
image: milvusdb/milvus:v2.5.11
80+
image: milvusdb/milvus:master-20250610-9439eaef-amd64
8181
command: ["milvus", "run", "standalone"]
8282
environment:
8383
ETCD_ENDPOINTS: etcdslave:2379

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@
102102
<gson.version>2.13.1</gson.version>
103103
<kotlin.version>1.9.10</kotlin.version>
104104
<mockito.version>4.11.0</mockito.version>
105-
<testcontainers.version>1.19.8</testcontainers.version>
105+
<testcontainers.version>1.21.1</testcontainers.version>
106106
<apache.commons.pool2.version>2.12.0</apache.commons.pool2.version>
107107
<guava.version>32.1.3-jre</guava.version>
108108
<errorprone.version>2.38.0</errorprone.version>

sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ private void waitForFlush(FlushResponse flushResponse, long waitingInterval, lon
255255
// If waiting time exceed timeout, exist the circle
256256
long tsBegin = System.currentTimeMillis();
257257
Map<String, LongArray> collectionSegIDs = flushResponse.getCollSegIDsMap();
258+
String dbName = flushResponse.getDbName();
258259
collectionSegIDs.forEach((collectionName, segmentIDs) -> {
259260
while (segmentIDs.getDataCount() > 0) {
260261
long tsNow = System.currentTimeMillis();
@@ -265,6 +266,8 @@ private void waitForFlush(FlushResponse flushResponse, long waitingInterval, lon
265266

266267
GetFlushStateRequest getFlushStateRequest = GetFlushStateRequest.newBuilder()
267268
.addAllSegmentIDs(segmentIDs.getDataList())
269+
.setCollectionName(collectionName)
270+
.setDbName(dbName)
268271
.build();
269272
GetFlushStateResponse response = blockingStub().getFlushState(getFlushStateRequest);
270273
if (response.getFlushed()) {

sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,14 @@ public void alterCollection(AlterCollectionReq request) {
303303
public void alterCollectionProperties(AlterCollectionPropertiesReq request) {
304304
rpcUtils.retry(()-> collectionService.alterCollectionProperties(this.getRpcStub(), request));
305305
}
306+
/**
307+
* Add a new field to collection.
308+
*
309+
* @param request add new field request
310+
*/
311+
public void addCollectionField(AddCollectionFieldReq request) {
312+
rpcUtils.retry(()-> collectionService.addCollectionField(this.getRpcStub(), request));
313+
}
306314
/**
307315
* Alter a field's properties.
308316
*
@@ -920,7 +928,7 @@ public void flush(FlushReq request) {
920928
if (request.getWaitFlushedTimeoutMs() > 0L) {
921929
tempBlockingStub = tempBlockingStub.withDeadlineAfter(request.getWaitFlushedTimeoutMs(), TimeUnit.MILLISECONDS);
922930
}
923-
utilityService.waitFlush(tempBlockingStub, response.getCollectionSegmentIDs(), response.getCollectionFlushTs());
931+
utilityService.waitFlush(tempBlockingStub, response);
924932
}
925933

926934
/**

sdk-core/src/main/java/io/milvus/v2/service/collection/CollectionService.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,24 @@ public Void alterCollectionProperties(MilvusServiceGrpc.MilvusServiceBlockingStu
216216
return null;
217217
}
218218

219+
public Void addCollectionField(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, AddCollectionFieldReq request) {
220+
String title = String.format("AddCollectionFieldReq fieldName:%s", request.getFieldName());
221+
AddCollectionFieldRequest.Builder builder = AddCollectionFieldRequest.newBuilder()
222+
.setCollectionName(request.getCollectionName());
223+
if (StringUtils.isNotEmpty(request.getDatabaseName())) {
224+
builder.setDbName(request.getDatabaseName());
225+
}
226+
227+
CreateCollectionReq.FieldSchema fieldSchema = SchemaUtils.convertFieldReqToFieldSchema(request);
228+
FieldSchema grpcFieldSchema = SchemaUtils.convertToGrpcFieldSchema(fieldSchema);
229+
builder.setSchema(grpcFieldSchema.toByteString());
230+
231+
Status response = blockingStub.addCollectionField(builder.build());
232+
rpcUtils.handleResponse(title, response);
233+
234+
return null;
235+
}
236+
219237
public Void alterCollectionField(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, AlterCollectionFieldReq request) {
220238
String title = String.format("AlterCollectionFieldReq collectionName:%s", request.getCollectionName());
221239
AlterCollectionFieldRequest.Builder builder = AlterCollectionFieldRequest.newBuilder()
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package io.milvus.v2.service.collection.request;
21+
22+
import lombok.Data;
23+
import lombok.experimental.SuperBuilder;
24+
25+
@Data
26+
@SuperBuilder
27+
public class AddCollectionFieldReq extends AddFieldReq{
28+
private String collectionName;
29+
private String databaseName;
30+
}

sdk-core/src/main/java/io/milvus/v2/service/collection/request/CreateCollectionReq.java

Lines changed: 2 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020
package io.milvus.v2.service.collection.request;
2121

2222
import io.milvus.common.clientenum.FunctionType;
23-
import io.milvus.param.ParamUtils;
2423
import io.milvus.v2.common.ConsistencyLevel;
2524
import io.milvus.v2.common.DataType;
2625
import io.milvus.v2.common.IndexParam;
2726
import io.milvus.v2.exception.ErrorCode;
2827
import io.milvus.v2.exception.MilvusClientException;
28+
import io.milvus.v2.utils.SchemaUtils;
2929
import lombok.Builder;
3030
import lombok.Data;
3131
import lombok.NonNull;
@@ -136,44 +136,7 @@ public static class CollectionSchema {
136136
private List<CreateCollectionReq.Function> functionList = new ArrayList<>();
137137

138138
public CollectionSchema addField(AddFieldReq addFieldReq) {
139-
// check the input here to pop error messages earlier
140-
if (addFieldReq.isEnableDefaultValue() && addFieldReq.getDefaultValue() == null
141-
&& addFieldReq.getIsNullable() == Boolean.FALSE) {
142-
String msg = String.format("Default value cannot be null for field '%s' that is defined as nullable == false.", addFieldReq.getFieldName());
143-
throw new MilvusClientException(ErrorCode.INVALID_PARAMS, msg);
144-
}
145-
146-
CreateCollectionReq.FieldSchema fieldSchema = FieldSchema.builder()
147-
.name(addFieldReq.getFieldName())
148-
.dataType(addFieldReq.getDataType())
149-
.description(addFieldReq.getDescription())
150-
.isPrimaryKey(addFieldReq.getIsPrimaryKey())
151-
.isPartitionKey(addFieldReq.getIsPartitionKey())
152-
.isClusteringKey(addFieldReq.getIsClusteringKey())
153-
.autoID(addFieldReq.getAutoID())
154-
.isNullable(addFieldReq.getIsNullable())
155-
.defaultValue(addFieldReq.getDefaultValue())
156-
.enableAnalyzer(addFieldReq.getEnableAnalyzer())
157-
.enableMatch(addFieldReq.getEnableMatch())
158-
.analyzerParams(addFieldReq.getAnalyzerParams())
159-
.typeParams(addFieldReq.getTypeParams())
160-
.multiAnalyzerParams(addFieldReq.getMultiAnalyzerParams())
161-
.build();
162-
if (addFieldReq.getDataType().equals(DataType.Array)) {
163-
if (addFieldReq.getElementType() == null) {
164-
throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "Element type, maxCapacity are required for array field");
165-
}
166-
fieldSchema.setElementType(addFieldReq.getElementType());
167-
fieldSchema.setMaxCapacity(addFieldReq.getMaxCapacity());
168-
} else if (addFieldReq.getDataType().equals(DataType.VarChar)) {
169-
fieldSchema.setMaxLength(addFieldReq.getMaxLength());
170-
} else if (ParamUtils.isDenseVectorDataType(io.milvus.grpc.DataType.valueOf(addFieldReq.getDataType().name()))) {
171-
if (addFieldReq.getDimension() == null) {
172-
throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "Dimension is required for vector field");
173-
}
174-
fieldSchema.setDimension(addFieldReq.getDimension());
175-
}
176-
fieldSchemaList.add(fieldSchema);
139+
fieldSchemaList.add(SchemaUtils.convertFieldReqToFieldSchema(addFieldReq));
177140
return this;
178141
}
179142

sdk-core/src/main/java/io/milvus/v2/service/utility/UtilityService.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,21 +52,23 @@ public FlushResp flush(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
5252
});
5353
Map<String, Long> collectionFlushTs = response.getCollFlushTsMap();
5454
return FlushResp.builder()
55+
.databaseName(response.getDbName())
5556
.collectionSegmentIDs(collectionSegmentIDs)
5657
.collectionFlushTs(collectionFlushTs)
5758
.build();
5859
}
5960

6061
// this method is internal use, not expose to user
61-
public Void waitFlush(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
62-
Map<String, List<Long>> collectionSegmentIDs,
63-
Map<String, Long> collectionFlushTs) {
62+
public Void waitFlush(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, FlushResp flushResp) {
63+
Map<String, List<Long>> collectionSegmentIDs = flushResp.getCollectionSegmentIDs();
64+
Map<String, Long> collectionFlushTs = flushResp.getCollectionFlushTs();
6465
collectionSegmentIDs.forEach((collectionName, segmentIDs)->{
6566
if (collectionFlushTs.containsKey(collectionName)) {
6667
Long flushTs = collectionFlushTs.get(collectionName);
6768
boolean flushed = false;
6869
while (!flushed) {
6970
GetFlushStateResponse flushResponse = blockingStub.getFlushState(GetFlushStateRequest.newBuilder()
71+
.setDbName(flushResp.getDatabaseName())
7072
.addAllSegmentIDs(segmentIDs)
7173
.setFlushTs(flushTs)
7274
.build());

sdk-core/src/main/java/io/milvus/v2/service/utility/response/FlushResp.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
@Data
2929
@SuperBuilder
3030
public class FlushResp {
31+
@Builder.Default
32+
String databaseName = "";
3133
@Builder.Default
3234
Map<String, List<Long>> collectionSegmentIDs = new HashMap<>();
3335
@Builder.Default

sdk-core/src/main/java/io/milvus/v2/utils/SchemaUtils.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.milvus.param.ParamUtils;
3232
import io.milvus.v2.exception.ErrorCode;
3333
import io.milvus.v2.exception.MilvusClientException;
34+
import io.milvus.v2.service.collection.request.AddFieldReq;
3435
import io.milvus.v2.service.collection.request.CreateCollectionReq;
3536
import org.apache.commons.collections4.CollectionUtils;
3637
import org.apache.commons.lang3.StringUtils;
@@ -215,4 +216,46 @@ public static CreateCollectionReq.Function convertFromGrpcFunction(FunctionSchem
215216
.build();
216217
return function;
217218
}
219+
220+
public static CreateCollectionReq.FieldSchema convertFieldReqToFieldSchema(AddFieldReq addFieldReq) {
221+
// check the input here to pop error messages earlier
222+
if (addFieldReq.isEnableDefaultValue() && addFieldReq.getDefaultValue() == null
223+
&& addFieldReq.getIsNullable() == Boolean.FALSE) {
224+
String msg = String.format("Default value cannot be null for field '%s' that is defined as nullable == false.", addFieldReq.getFieldName());
225+
throw new MilvusClientException(ErrorCode.INVALID_PARAMS, msg);
226+
}
227+
228+
CreateCollectionReq.FieldSchema fieldSchema = CreateCollectionReq.FieldSchema.builder()
229+
.name(addFieldReq.getFieldName())
230+
.dataType(addFieldReq.getDataType())
231+
.description(addFieldReq.getDescription())
232+
.isPrimaryKey(addFieldReq.getIsPrimaryKey())
233+
.isPartitionKey(addFieldReq.getIsPartitionKey())
234+
.isClusteringKey(addFieldReq.getIsClusteringKey())
235+
.autoID(addFieldReq.getAutoID())
236+
.isNullable(addFieldReq.getIsNullable())
237+
.defaultValue(addFieldReq.getDefaultValue())
238+
.enableAnalyzer(addFieldReq.getEnableAnalyzer())
239+
.enableMatch(addFieldReq.getEnableMatch())
240+
.analyzerParams(addFieldReq.getAnalyzerParams())
241+
.typeParams(addFieldReq.getTypeParams())
242+
.multiAnalyzerParams(addFieldReq.getMultiAnalyzerParams())
243+
.build();
244+
if (addFieldReq.getDataType().equals(io.milvus.v2.common.DataType.Array)) {
245+
if (addFieldReq.getElementType() == null) {
246+
throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "Element type, maxCapacity are required for array field");
247+
}
248+
fieldSchema.setElementType(addFieldReq.getElementType());
249+
fieldSchema.setMaxCapacity(addFieldReq.getMaxCapacity());
250+
} else if (addFieldReq.getDataType().equals(io.milvus.v2.common.DataType.VarChar)) {
251+
fieldSchema.setMaxLength(addFieldReq.getMaxLength());
252+
} else if (ParamUtils.isDenseVectorDataType(io.milvus.grpc.DataType.valueOf(addFieldReq.getDataType().name()))) {
253+
if (addFieldReq.getDimension() == null) {
254+
throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "Dimension is required for vector field");
255+
}
256+
fieldSchema.setDimension(addFieldReq.getDimension());
257+
}
258+
259+
return fieldSchema;
260+
}
218261
}

0 commit comments

Comments
 (0)