Skip to content

Commit ef10e36

Browse files
committed
support stageManager & stageFileManager
Signed-off-by: lentitude2tk <xushuang.hu@zilliz.com>
1 parent 0dd7a71 commit ef10e36

24 files changed

Lines changed: 908 additions & 304 deletions

examples/src/main/java/io/milvus/v2/StageExample.java

Lines changed: 0 additions & 66 deletions
This file was deleted.
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package io.milvus.v2;
20+
21+
import com.google.gson.Gson;
22+
import io.milvus.bulkwriter.StageFileManager;
23+
import io.milvus.bulkwriter.StageFileManagerParam;
24+
import io.milvus.bulkwriter.model.UploadFilesResult;
25+
import io.milvus.bulkwriter.request.stage.UploadFilesRequest;
26+
27+
28+
/**
29+
* This is currently a private preview feature. If you need to use it, please submit a request and contact us.
30+
*/
31+
public class StageFileManagerExample {
32+
private static final StageFileManager stageFileManager;
33+
static {
34+
StageFileManagerParam stageFileManagerParam = StageFileManagerParam.newBuilder()
35+
.withCloudEndpoint("https://api.cloud.zilliz.com")
36+
.withApiKey("_api_key_for_cluster_org_")
37+
.withStageName("_stage_name_for_project_")
38+
.build();
39+
stageFileManager = new StageFileManager(stageFileManagerParam);
40+
}
41+
42+
public static void main(String[] args) throws Exception {
43+
uploadFiles();
44+
shutdown();
45+
}
46+
47+
private static void uploadFiles() throws Exception {
48+
UploadFilesRequest request = UploadFilesRequest.builder()
49+
.sourceFilePath("/Users/zilliz/data/")
50+
.targetStagePath("data/")
51+
.build();
52+
UploadFilesResult result = stageFileManager.uploadFilesAsync(request).get();
53+
System.out.println("\nuploadFiles results: " + new Gson().toJson(result));
54+
}
55+
56+
private static void shutdown() {
57+
stageFileManager.shutdownGracefully();
58+
}
59+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package io.milvus.v2;
20+
21+
import com.google.gson.Gson;
22+
import io.milvus.bulkwriter.StageManager;
23+
import io.milvus.bulkwriter.StageManagerParam;
24+
import io.milvus.bulkwriter.request.stage.CreateStageRequest;
25+
import io.milvus.bulkwriter.request.stage.DeleteStageRequest;
26+
import io.milvus.bulkwriter.request.stage.ListStagesRequest;
27+
import io.milvus.bulkwriter.response.stage.ListStagesResponse;
28+
29+
30+
/**
31+
* This is currently a private preview feature. If you need to use it, please submit a request and contact us.
32+
*/
33+
public class StageManagerExample {
34+
private static final StageManager stageManager;
35+
static {
36+
StageManagerParam stageManagerParam = StageManagerParam.newBuilder()
37+
.withCloudEndpoint("https://api.cloud.zilliz.com")
38+
.withApiKey("_api_key_for_cluster_org_")
39+
.build();
40+
stageManager = new StageManager(stageManagerParam);
41+
}
42+
43+
private static final String PROJECT_ID = "_id_for_project_";
44+
private static final String REGION_ID = "_id_for_region_";
45+
private static final String STAGE_NAME = "_stage_name_for_project_";
46+
47+
public static void main(String[] args) throws Exception {
48+
createStage();
49+
listStages();
50+
deleteStage();
51+
}
52+
53+
private static void createStage() {
54+
CreateStageRequest request = CreateStageRequest.builder()
55+
.projectId(PROJECT_ID).regionId(REGION_ID).stageName(STAGE_NAME)
56+
.build();
57+
stageManager.createStage(request);
58+
System.out.printf("\nStage %s created%n", STAGE_NAME);
59+
}
60+
61+
private static void listStages() {
62+
ListStagesRequest request = ListStagesRequest.builder()
63+
.projectId(PROJECT_ID).currentPage(1).pageSize(10)
64+
.build();
65+
ListStagesResponse listStagesResponse = stageManager.listStages(request);
66+
System.out.println("\nlistStages results: " + new Gson().toJson(listStagesResponse));
67+
}
68+
69+
private static void deleteStage() {
70+
DeleteStageRequest request = DeleteStageRequest.builder()
71+
.stageName(STAGE_NAME)
72+
.build();
73+
stageManager.deleteStage(request);
74+
System.out.printf("\nStage %s deleted%n", STAGE_NAME);
75+
}
76+
}

examples/src/main/java/io/milvus/v2/bulkwriter/BulkWriterStageExample.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import io.milvus.bulkwriter.StageBulkWriterParam;
2828
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
2929
import io.milvus.bulkwriter.common.utils.GeneratorUtils;
30-
import io.milvus.bulkwriter.model.StageUploadResult;
30+
import io.milvus.bulkwriter.model.UploadFilesResult;
3131
import io.milvus.bulkwriter.request.describe.CloudDescribeImportRequest;
3232
import io.milvus.bulkwriter.request.import_.StageImportRequest;
3333
import io.milvus.bulkwriter.request.list.CloudListImportJobsRequest;
@@ -118,7 +118,7 @@ private static void exampleCollectionRemoteStage(BulkFileType fileType) throws E
118118
CreateCollectionReq.CollectionSchema collectionSchema = buildAllTypesSchema();
119119
createCollection(COLLECTION_NAME, collectionSchema, false);
120120

121-
StageUploadResult stageUploadResult = stageRemoteWriter(collectionSchema, fileType, rows);
121+
UploadFilesResult stageUploadResult = stageRemoteWriter(collectionSchema, fileType, rows);
122122
callStageImport(stageUploadResult.getStageName(), stageUploadResult.getPath());
123123
verifyImportData(collectionSchema, originalData);
124124
}
@@ -284,7 +284,7 @@ private static List<JsonObject> genImportData(List<Map<String, Object>> original
284284
return data;
285285
}
286286

287-
private static StageUploadResult stageRemoteWriter(CreateCollectionReq.CollectionSchema collectionSchema,
287+
private static UploadFilesResult stageRemoteWriter(CreateCollectionReq.CollectionSchema collectionSchema,
288288
BulkFileType fileType,
289289
List<JsonObject> data) throws Exception {
290290
System.out.printf("\n===================== all field types (%s) ====================%n", fileType.name());
@@ -297,7 +297,7 @@ private static StageUploadResult stageRemoteWriter(CreateCollectionReq.Collectio
297297
System.out.println("Generate data files...");
298298
stageBulkWriter.commit(false);
299299

300-
StageUploadResult stageUploadResult = stageBulkWriter.getStageUploadResult();
300+
UploadFilesResult stageUploadResult = stageBulkWriter.getStageUploadResult();
301301
System.out.printf("Data files have been uploaded: %s%n", stageUploadResult);
302302
return stageUploadResult;
303303
} catch (Exception e) {

sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageBulkWriter.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121

2222
import com.google.common.collect.Lists;
2323
import com.google.gson.JsonObject;
24-
import io.milvus.bulkwriter.model.StageUploadResult;
24+
import io.milvus.bulkwriter.model.UploadFilesResult;
25+
import io.milvus.bulkwriter.request.stage.UploadFilesRequest;
2526
import io.milvus.common.utils.ExceptionUtils;
2627
import org.slf4j.Logger;
2728
import org.slf4j.LoggerFactory;
@@ -39,7 +40,7 @@ public class StageBulkWriter extends LocalBulkWriter {
3940

4041
private String remotePath;
4142
private List<List<String>> remoteFiles;
42-
private StageOperation stageWriter;
43+
private StageFileManager stageFileManager;
4344
private StageBulkWriterParam stageBulkWriterParam;
4445

4546
public StageBulkWriter(StageBulkWriterParam bulkWriterParam) throws IOException {
@@ -51,20 +52,20 @@ public StageBulkWriter(StageBulkWriterParam bulkWriterParam) throws IOException
5152
Path path = Paths.get(bulkWriterParam.getRemotePath());
5253
Path remoteDirPath = path.resolve(getUUID());
5354
this.remotePath = remoteDirPath + "/";
54-
this.stageWriter = initStageWriterParams(bulkWriterParam);
55+
this.stageFileManager = initStageFileManagerParams(bulkWriterParam);
5556
this.stageBulkWriterParam = bulkWriterParam;
5657

5758
this.remoteFiles = Lists.newArrayList();
5859
logger.info("Remote buffer writer initialized, target path: {}", remotePath);
5960

6061
}
6162

62-
private StageOperation initStageWriterParams(StageBulkWriterParam bulkWriterParam) throws IOException {
63-
StageOperationParam stageWriterParam = StageOperationParam.newBuilder()
63+
private StageFileManager initStageFileManagerParams(StageBulkWriterParam bulkWriterParam) throws IOException {
64+
StageFileManagerParam stageFileManagerParam = StageFileManagerParam.newBuilder()
6465
.withCloudEndpoint(bulkWriterParam.getCloudEndpoint()).withApiKey(bulkWriterParam.getApiKey())
65-
.withStageName(bulkWriterParam.getStageName()).withPath(remotePath)
66+
.withStageName(bulkWriterParam.getStageName())
6667
.build();
67-
return new StageOperation(stageWriterParam);
68+
return new StageFileManager(stageFileManagerParam);
6869
}
6970

7071
@Override
@@ -87,8 +88,8 @@ public List<List<String>> getBatchFiles() {
8788
return remoteFiles;
8889
}
8990

90-
public StageUploadResult getStageUploadResult() {
91-
return StageUploadResult.builder()
91+
public UploadFilesResult getStageUploadResult() {
92+
return UploadFilesResult.builder()
9293
.stageName(stageBulkWriterParam.getStageName())
9394
.path(remotePath)
9495
.build();
@@ -175,8 +176,13 @@ private void serialImportData(List<String> fileList) {
175176
private void uploadObject(String filePath, String objectName) throws Exception {
176177
logger.info(String.format("Prepare to upload %s to %s", filePath, objectName));
177178

178-
stageWriter.uploadFileToStage(filePath);
179+
UploadFilesRequest uploadFilesRequest = UploadFilesRequest.builder()
180+
.sourceFilePath(filePath).targetStagePath(remotePath)
181+
.build();
182+
183+
stageFileManager.uploadFilesAsync(uploadFilesRequest).get();
179184
logger.info(String.format("Upload file %s to %s", filePath, objectName));
185+
180186
}
181187

182188
private static String generatorLocalPath() {

0 commit comments

Comments
 (0)