Skip to content

Commit e276509

Browse files
committed
Add sync parameter for loadCollection/loadPartitions/createIndex
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent 9e35eb9 commit e276509

14 files changed

Lines changed: 219 additions & 88 deletions

File tree

sdk-core/src/main/java/io/milvus/v2/service/collection/CollectionService.java

Lines changed: 27 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,16 @@ public Void createCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockin
9898
CreateIndexReq createIndexReq = CreateIndexReq.builder()
9999
.indexParams(Collections.singletonList(indexParam))
100100
.collectionName(request.getCollectionName())
101+
.sync(false)
101102
.build();
102103
indexService.createIndex(blockingStub, createIndexReq);
103104
//load collection, set async to true since no need to wait loading progress
104105
try {
105106
//TimeUnit.MILLISECONDS.sleep(1000);
106-
loadCollection(blockingStub, LoadCollectionReq.builder().async(true).collectionName(request.getCollectionName()).build());
107+
loadCollection(blockingStub, LoadCollectionReq.builder()
108+
.sync(false)
109+
.collectionName(request.getCollectionName())
110+
.build());
107111
} catch (Exception e) {
108112
throw new MilvusClientException(ErrorCode.SERVER_ERROR, "Load collection failed: " + e);
109113
}
@@ -157,11 +161,15 @@ public Void createCollectionWithSchema(MilvusServiceGrpc.MilvusServiceBlockingSt
157161
CreateIndexReq createIndexReq = CreateIndexReq.builder()
158162
.indexParams(Collections.singletonList(indexParam))
159163
.collectionName(request.getCollectionName())
164+
.sync(false)
160165
.build();
161166
indexService.createIndex(blockingStub, createIndexReq);
162167
}
163168
//load collection, set async to true since no need to wait loading progress
164-
loadCollection(blockingStub, LoadCollectionReq.builder().async(true).collectionName(request.getCollectionName()).build());
169+
loadCollection(blockingStub, LoadCollectionReq.builder()
170+
.sync(false)
171+
.collectionName(request.getCollectionName())
172+
.build());
165173
}
166174

167175
return null;
@@ -187,10 +195,6 @@ public Void dropCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingS
187195
Status status = blockingStub.dropCollection(dropCollectionRequest);
188196
rpcUtils.handleResponse(title, status);
189197

190-
if (request.getAsync()) {
191-
WaitForDropCollection(blockingStub, request);
192-
}
193-
194198
return null;
195199
}
196200

@@ -289,7 +293,7 @@ public Void loadCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingS
289293
.build();
290294
Status status = blockingStub.loadCollection(loadCollectionRequest);
291295
rpcUtils.handleResponse(title, status);
292-
if (!request.getAsync()) {
296+
if (request.getSync()) {
293297
WaitForLoadCollection(blockingStub, request.getCollectionName(), request.getTimeout());
294298
}
295299

@@ -304,7 +308,7 @@ public Void refreshLoad(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub
304308
.build();
305309
Status status = blockingStub.loadCollection(loadCollectionRequest);
306310
rpcUtils.handleResponse(title, status);
307-
if (request.getAsync()) {
311+
if (request.getSync()) {
308312
WaitForLoadCollection(blockingStub, request.getCollectionName(), request.getTimeout());
309313
}
310314

@@ -318,9 +322,6 @@ public Void releaseCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blocki
318322
.build();
319323
Status status = blockingStub.releaseCollection(releaseCollectionRequest);
320324
rpcUtils.handleResponse(title, status);
321-
if (request.getAsync()) {
322-
waitForCollectionRelease(blockingStub, request);
323-
}
324325

325326
return null;
326327
}
@@ -407,75 +408,28 @@ public DescribeReplicasResp describeReplicas(MilvusServiceGrpc.MilvusServiceBloc
407408
.build();
408409
}
409410

410-
public void waitForCollectionRelease(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, ReleaseCollectionReq request) {
411-
boolean isLoaded = true;
412-
long startTime = System.currentTimeMillis(); // Capture start time/ Timeout in milliseconds (60 seconds)
413-
414-
while (isLoaded) {
415-
// Call the getLoadState method
416-
isLoaded = getLoadState(blockingStub, GetLoadStateReq.builder().collectionName(request.getCollectionName()).build());
417-
if (isLoaded) {
418-
// Check if timeout is exceeded
419-
if (System.currentTimeMillis() - startTime > request.getTimeout()) {
420-
throw new MilvusClientException(ErrorCode.SERVER_ERROR, "Load collection timeout");
421-
}
422-
// Wait for a certain period before checking again
423-
try {
424-
Thread.sleep(500); // Sleep for 0.5 second. Adjust this value as needed.
425-
} catch (InterruptedException e) {
426-
Thread.currentThread().interrupt();
427-
System.out.println("Thread was interrupted, Failed to complete operation");
428-
return; // or handle interruption appropriately
429-
}
430-
}
431-
}
432-
}
433-
434411
private void WaitForLoadCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
435412
String collectionName, long timeoutMs) {
436-
boolean isLoaded = false;
437413
long startTime = System.currentTimeMillis(); // Capture start time/ Timeout in milliseconds (60 seconds)
438414

439-
while (!isLoaded) {
415+
while (true) {
440416
// Call the getLoadState method
441-
isLoaded = getLoadState(blockingStub, GetLoadStateReq.builder().collectionName(collectionName).build());
442-
if (!isLoaded) {
443-
// Check if timeout is exceeded
444-
if (System.currentTimeMillis() - startTime > timeoutMs) {
445-
throw new MilvusClientException(ErrorCode.SERVER_ERROR, "Load collection timeout");
446-
}
447-
// Wait for a certain period before checking again
448-
try {
449-
Thread.sleep(500); // Sleep for 0.5 second. Adjust this value as needed.
450-
} catch (InterruptedException e) {
451-
Thread.currentThread().interrupt();
452-
System.out.println("Thread was interrupted, Failed to complete operation");
453-
return; // or handle interruption appropriately
454-
}
417+
boolean isLoaded = getLoadState(blockingStub, GetLoadStateReq.builder().collectionName(collectionName).build());
418+
if (isLoaded) {
419+
return;
455420
}
456-
}
457-
}
458-
459-
private void WaitForDropCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, DropCollectionReq request) {
460-
boolean hasCollection = true;
461-
long startTime = System.currentTimeMillis(); // Capture start time/ Timeout in milliseconds (60 seconds)
462421

463-
while (hasCollection) {
464-
// Call the getLoadState method
465-
hasCollection = hasCollection(blockingStub, HasCollectionReq.builder().collectionName(request.getCollectionName()).build());
466-
if (hasCollection) {
467-
// Check if timeout is exceeded
468-
if (System.currentTimeMillis() - startTime > request.getTimeout()) {
469-
throw new MilvusClientException(ErrorCode.SERVER_ERROR, "drop collection timeout");
470-
}
471-
// Wait for a certain period before checking again
472-
try {
473-
Thread.sleep(500); // Sleep for 0.5 second. Adjust this value as needed.
474-
} catch (InterruptedException e) {
475-
Thread.currentThread().interrupt();
476-
System.out.println("Thread was interrupted, Failed to complete operation");
477-
return; // or handle interruption appropriately
478-
}
422+
// Check if timeout is exceeded
423+
if (System.currentTimeMillis() - startTime > timeoutMs) {
424+
throw new MilvusClientException(ErrorCode.SERVER_ERROR, "Load collection timeout");
425+
}
426+
// Wait for a certain period before checking again
427+
try {
428+
Thread.sleep(500); // Sleep for 0.5 second. Adjust this value as needed.
429+
} catch (InterruptedException e) {
430+
Thread.currentThread().interrupt();
431+
System.out.println("Thread was interrupted, Failed to complete operation");
432+
return; // or handle interruption appropriately
479433
}
480434
}
481435
}

sdk-core/src/main/java/io/milvus/v2/service/collection/request/DropCollectionReq.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
@SuperBuilder
2828
public class DropCollectionReq {
2929
private String collectionName;
30+
@Deprecated
3031
@Builder.Default
3132
private Boolean async = Boolean.TRUE;
3233
@Builder.Default

sdk-core/src/main/java/io/milvus/v2/service/collection/request/LoadCollectionReq.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,13 @@ public class LoadCollectionReq {
3232
private String collectionName;
3333
@Builder.Default
3434
private Integer numReplicas = 1;
35+
@Deprecated
3536
@Builder.Default
3637
private Boolean async = Boolean.FALSE;
3738
@Builder.Default
38-
private Long timeout = 60000L;
39+
private Boolean sync = Boolean.TRUE; // wait the collection to be fully loaded. "async" is deprecated, use "sync" instead
40+
@Builder.Default
41+
private Long timeout = 60000L; // timeout value for waiting the collection to be fully loaded
3942
@Builder.Default
4043
private Boolean refresh = Boolean.FALSE;
4144
@Builder.Default
@@ -44,4 +47,22 @@ public class LoadCollectionReq {
4447
private Boolean skipLoadDynamicField = Boolean.FALSE;
4548
@Builder.Default
4649
private List<String> resourceGroups = new ArrayList<>();
50+
51+
public static abstract class LoadCollectionReqBuilder<C extends LoadCollectionReq, B extends LoadCollectionReq.LoadCollectionReqBuilder<C, B>> {
52+
public B async(Boolean async) {
53+
this.async$value = async;
54+
this.async$set = true;
55+
this.sync$value = !async;
56+
this.sync$set = true;
57+
return self();
58+
}
59+
60+
public B sync(Boolean sync) {
61+
this.sync$value = sync;
62+
this.sync$set = true;
63+
this.async$value = !sync;
64+
this.async$set = true;
65+
return self();
66+
}
67+
}
4768
}

sdk-core/src/main/java/io/milvus/v2/service/collection/request/RefreshLoadReq.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,25 @@ public class RefreshLoadReq {
3030
@Builder.Default
3131
private Boolean async = Boolean.TRUE;
3232
@Builder.Default
33-
private Long timeout = 60000L;
33+
private Boolean sync = Boolean.TRUE; // wait the collection to be fully loaded. "async" is deprecated, use "sync" instead
34+
@Builder.Default
35+
private Long timeout = 60000L; // timeout value for waiting the collection to be fully loaded
36+
37+
public static abstract class RefreshLoadReqBuilder<C extends RefreshLoadReq, B extends RefreshLoadReq.RefreshLoadReqBuilder<C, B>> {
38+
public B async(Boolean async) {
39+
this.async$value = async;
40+
this.async$set = true;
41+
this.sync$value = !async;
42+
this.sync$set = true;
43+
return self();
44+
}
45+
46+
public B sync(Boolean sync) {
47+
this.sync$value = sync;
48+
this.sync$set = true;
49+
this.async$value = !sync;
50+
this.async$set = true;
51+
return self();
52+
}
53+
}
3454
}

sdk-core/src/main/java/io/milvus/v2/service/collection/request/ReleaseCollectionReq.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
@SuperBuilder
2828
public class ReleaseCollectionReq {
2929
private String collectionName;
30+
@Deprecated
3031
@Builder.Default
3132
private Boolean async = Boolean.TRUE;
3233
@Builder.Default

sdk-core/src/main/java/io/milvus/v2/service/index/IndexService.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.milvus.grpc.*;
2424
import io.milvus.param.Constant;
2525
import io.milvus.param.ParamUtils;
26+
import io.milvus.v2.common.IndexBuildState;
2627
import io.milvus.v2.common.IndexParam;
2728
import io.milvus.v2.exception.ErrorCode;
2829
import io.milvus.v2.exception.MilvusClientException;
@@ -78,6 +79,10 @@ public Void createIndex(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub
7879

7980
Status status = blockingStub.createIndex(builder.build());
8081
rpcUtils.handleResponse(title, status);
82+
if (request.getSync()) {
83+
WaitForIndexComplete(blockingStub, request.getCollectionName(), indexParam.getFieldName(),
84+
indexParam.getIndexName(), request.getTimeout());
85+
}
8186
}
8287

8388
return null;
@@ -180,4 +185,61 @@ public List<String> listIndexes(MilvusServiceGrpc.MilvusServiceBlockingStub bloc
180185
});
181186
return indexNames;
182187
}
188+
189+
private void WaitForIndexComplete(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
190+
String collectionName, String fieldName, String indexName, long timeoutMs) {
191+
long startTime = System.currentTimeMillis(); // Capture start time/ Timeout in milliseconds (60 seconds)
192+
193+
// alloc a timestamp from the server, the DescribeIndex() will use this timestamp to check the segments
194+
// which are generated before this timestamp.
195+
AllocTimestampResponse allocTsResp = blockingStub.allocTimestamp(AllocTimestampRequest.newBuilder().build());
196+
rpcUtils.handleResponse("AllocTimestampRequest", allocTsResp.getStatus());
197+
long serverTs = allocTsResp.getTimestamp();
198+
199+
while (true) {
200+
DescribeIndexResp response = describeIndex(blockingStub, DescribeIndexReq.builder()
201+
.collectionName(collectionName)
202+
.fieldName(fieldName)
203+
.indexName(indexName)
204+
.timestamp(serverTs)
205+
.build());
206+
List<DescribeIndexResp.IndexDesc> indices = response.getIndexDescriptions();
207+
DescribeIndexResp.IndexDesc desc = null;
208+
if (indices.size() == 1) {
209+
desc = indices.get(0);
210+
} else {
211+
for (DescribeIndexResp.IndexDesc index : indices) {
212+
if (fieldName.equals(index.getFieldName())) {
213+
desc = index;
214+
break;
215+
}
216+
}
217+
}
218+
219+
if (desc == null) {
220+
String msg = String.format("Failed to describe the index '%s' of field '%s' from serv side", fieldName, indexName);
221+
throw new MilvusClientException(ErrorCode.SERVER_ERROR, msg);
222+
}
223+
224+
if (desc.getIndexState() == IndexBuildState.Finished) {
225+
return;
226+
} else if (desc.getIndexState() == IndexBuildState.Failed) {
227+
String msg = "Index is failed, reason: " + desc.getIndexFailedReason();
228+
throw new MilvusClientException(ErrorCode.SERVER_ERROR, msg);
229+
}
230+
231+
// Check if timeout is exceeded
232+
if (System.currentTimeMillis() - startTime > timeoutMs) {
233+
throw new MilvusClientException(ErrorCode.SERVER_ERROR, "Create index timeout");
234+
}
235+
// Wait for a certain period before checking again
236+
try {
237+
Thread.sleep(500); // Sleep for 0.5 second. Adjust this value as needed.
238+
} catch (InterruptedException e) {
239+
Thread.currentThread().interrupt();
240+
System.out.println("Thread was interrupted, failed to complete operation");
241+
return; // or handle interruption appropriately
242+
}
243+
}
244+
}
183245
}

sdk-core/src/main/java/io/milvus/v2/service/index/request/CreateIndexReq.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package io.milvus.v2.service.index.request;
2121

2222
import io.milvus.v2.common.IndexParam;
23+
import lombok.Builder;
2324
import lombok.Data;
2425
import lombok.NonNull;
2526
import lombok.experimental.SuperBuilder;
@@ -33,4 +34,8 @@ public class CreateIndexReq {
3334
@NonNull
3435
private String collectionName;
3536
private List<IndexParam> indexParams;
37+
@Builder.Default
38+
private Boolean sync = Boolean.TRUE; // wait the index to complete
39+
@Builder.Default
40+
private Long timeout = 60000L; // timeout value for waiting the index to complete
3641
}

sdk-core/src/main/java/io/milvus/v2/service/index/request/DescribeIndexReq.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package io.milvus.v2.service.index.request;
2121

22+
import lombok.Builder;
2223
import lombok.Data;
2324
import lombok.NonNull;
2425
import lombok.experimental.SuperBuilder;
@@ -30,4 +31,6 @@ public class DescribeIndexReq {
3031
private String collectionName;
3132
private String fieldName;
3233
private String indexName;
34+
@Builder.Default
35+
private Long timestamp = 0L; // only check segments generated before this timestamp. all the segments will be checked if this value is zero.
3336
}

0 commit comments

Comments
 (0)