Skip to content

Commit cedd844

Browse files
committed
support stage operation
Signed-off-by: lentitude2tk <xushuang.hu@zilliz.com>
1 parent 278dd08 commit cedd844

30 files changed

Lines changed: 2238 additions & 277 deletions

examples/src/main/java/io/milvus/v1/BulkWriterExample.java

Lines changed: 11 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,12 @@
1818
*/
1919
package io.milvus.v1;
2020

21-
import com.fasterxml.jackson.annotation.JsonProperty;
2221
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
2322
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
2423
import com.google.common.collect.Lists;
2524
import com.google.gson.Gson;
2625
import com.google.gson.JsonElement;
2726
import com.google.gson.JsonObject;
28-
import com.google.gson.reflect.TypeToken;
29-
import io.milvus.bulkwriter.BulkImport;
3027
import io.milvus.bulkwriter.BulkWriter;
3128
import io.milvus.bulkwriter.LocalBulkWriter;
3229
import io.milvus.bulkwriter.LocalBulkWriterParam;
@@ -46,6 +43,7 @@
4643
import io.milvus.bulkwriter.request.import_.MilvusImportRequest;
4744
import io.milvus.bulkwriter.request.list.CloudListImportJobsRequest;
4845
import io.milvus.bulkwriter.request.list.MilvusListImportJobsRequest;
46+
import io.milvus.bulkwriter.restful.BulkImportUtils;
4947
import io.milvus.client.MilvusClient;
5048
import io.milvus.client.MilvusServiceClient;
5149
import io.milvus.common.utils.ExceptionUtils;
@@ -69,13 +67,13 @@
6967
import io.milvus.param.index.CreateIndexParam;
7068
import io.milvus.response.GetCollStatResponseWrapper;
7169
import io.milvus.response.QueryResultsWrapper;
70+
import io.milvus.v2.bulkwriter.CsvDataObject;
7271
import org.apache.avro.generic.GenericData;
7372
import org.apache.http.util.Asserts;
7473

7574
import java.io.File;
7675
import java.io.IOException;
7776
import java.net.URL;
78-
import java.nio.ByteBuffer;
7977
import java.util.ArrayList;
8078
import java.util.Iterator;
8179
import java.util.List;
@@ -491,29 +489,6 @@ private static void readCsvSampleData(String filePath, BulkWriter writer) throws
491489
}
492490
}
493491

494-
private static class CsvDataObject {
495-
@JsonProperty
496-
private String vector;
497-
@JsonProperty
498-
private String path;
499-
@JsonProperty
500-
private String label;
501-
502-
public String getVector() {
503-
return vector;
504-
}
505-
public String getPath() {
506-
return path;
507-
}
508-
public String getLabel() {
509-
return label;
510-
}
511-
public List<Float> toFloatArray() {
512-
return GSON_INSTANCE.fromJson(vector, new TypeToken<List<Float>>() {
513-
}.getType());
514-
}
515-
}
516-
517492
private void callBulkInsert(CollectionSchemaParam collectionSchema, List<List<String>> batchFiles) throws InterruptedException {
518493
createCollection(ALL_TYPES_COLLECTION_NAME, collectionSchema, true);
519494

@@ -524,7 +499,7 @@ private void callBulkInsert(CollectionSchemaParam collectionSchema, List<List<St
524499
.partitionName("")
525500
.files(batchFiles)
526501
.build();
527-
String bulkImportResult = BulkImport.bulkImport(url, milvusImportRequest);
502+
String bulkImportResult = BulkImportUtils.bulkImport(url, milvusImportRequest);
528503
System.out.println(bulkImportResult);
529504

530505
JsonObject bulkImportObject = convertJsonObject(bulkImportResult);
@@ -533,7 +508,7 @@ private void callBulkInsert(CollectionSchemaParam collectionSchema, List<List<St
533508

534509
System.out.println("\n===================== listBulkInsertJobs() ====================");
535510
MilvusListImportJobsRequest listImportJobsRequest = MilvusListImportJobsRequest.builder().collectionName(ALL_TYPES_COLLECTION_NAME).build();
536-
String listImportJobsResult = BulkImport.listImportJobs(url, listImportJobsRequest);
511+
String listImportJobsResult = BulkImportUtils.listImportJobs(url, listImportJobsRequest);
537512
System.out.println(listImportJobsResult);
538513
while (true) {
539514
System.out.println("Wait 5 second to check bulkInsert job state...");
@@ -543,7 +518,7 @@ private void callBulkInsert(CollectionSchemaParam collectionSchema, List<List<St
543518
MilvusDescribeImportRequest request = MilvusDescribeImportRequest.builder()
544519
.jobId(jobId)
545520
.build();
546-
String getImportProgressResult = BulkImport.getImportProgress(url, request);
521+
String getImportProgressResult = BulkImportUtils.getImportProgress(url, request);
547522
System.out.println(getImportProgressResult);
548523

549524
JsonObject getImportProgressObject = convertJsonObject(getImportProgressResult);
@@ -577,23 +552,23 @@ private void callCloudImport(List<List<String>> batchFiles, String collectionNam
577552
.clusterId(CloudImportConsts.CLUSTER_ID).collectionName(collectionName).partitionName(partitionName)
578553
.apiKey(CloudImportConsts.API_KEY)
579554
.build();
580-
String bulkImportResult = BulkImport.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, bulkImportRequest);
555+
String bulkImportResult = BulkImportUtils.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, bulkImportRequest);
581556
JsonObject bulkImportObject = convertJsonObject(bulkImportResult);
582557

583558
String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
584559
System.out.println("Create a cloudImport job, job id: " + jobId);
585560

586561
System.out.println("\n===================== call cloudListImportJobs ====================");
587562
CloudListImportJobsRequest listImportJobsRequest = CloudListImportJobsRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).currentPage(1).pageSize(10).apiKey(CloudImportConsts.API_KEY).build();
588-
String listImportJobsResult = BulkImport.listImportJobs(CloudImportConsts.CLOUD_ENDPOINT, listImportJobsRequest);
563+
String listImportJobsResult = BulkImportUtils.listImportJobs(CloudImportConsts.CLOUD_ENDPOINT, listImportJobsRequest);
589564
System.out.println(listImportJobsResult);
590565
while (true) {
591566
System.out.println("Wait 5 second to check bulkInsert job state...");
592567
TimeUnit.SECONDS.sleep(5);
593568

594569
System.out.println("\n===================== call cloudGetProgress ====================");
595570
CloudDescribeImportRequest request = CloudDescribeImportRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).jobId(jobId).apiKey(CloudImportConsts.API_KEY).build();
596-
String getImportProgressResult = BulkImport.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, request);
571+
String getImportProgressResult = BulkImportUtils.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, request);
597572
JsonObject getImportProgressObject = convertJsonObject(getImportProgressResult);
598573
String importProgressState = getImportProgressObject.getAsJsonObject("data").get("state").getAsString();
599574
String progress = getImportProgressObject.getAsJsonObject("data").get("progress").getAsString();
@@ -740,20 +715,20 @@ private static void exampleCloudImport() {
740715
.clusterId(CloudImportConsts.CLUSTER_ID).collectionName(CloudImportConsts.COLLECTION_NAME).partitionName(CloudImportConsts.PARTITION_NAME)
741716
.apiKey(CloudImportConsts.API_KEY)
742717
.build();
743-
String bulkImportResult = BulkImport.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, request);
718+
String bulkImportResult = BulkImportUtils.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, request);
744719
System.out.println(bulkImportResult);
745720

746721
System.out.println("\n===================== get import job progress ====================");
747722

748723
JsonObject bulkImportObject = convertJsonObject(bulkImportResult);
749724
String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
750725
CloudDescribeImportRequest getImportProgressRequest = CloudDescribeImportRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).jobId(jobId).apiKey(CloudImportConsts.API_KEY).build();
751-
String getImportProgressResult = BulkImport.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, getImportProgressRequest);
726+
String getImportProgressResult = BulkImportUtils.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, getImportProgressRequest);
752727
System.out.println(getImportProgressResult);
753728

754729
System.out.println("\n===================== list import jobs ====================");
755730
CloudListImportJobsRequest listImportJobsRequest = CloudListImportJobsRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).currentPage(1).pageSize(10).apiKey(CloudImportConsts.API_KEY).build();
756-
String listImportJobsResult = BulkImport.listImportJobs(CloudImportConsts.CLOUD_ENDPOINT, listImportJobsRequest);
731+
String listImportJobsResult = BulkImportUtils.listImportJobs(CloudImportConsts.CLOUD_ENDPOINT, listImportJobsRequest);
757732
System.out.println(listImportJobsResult);
758733
}
759734

examples/src/main/java/io/milvus/v1/GeneralExample.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public class GeneralExample {
6666

6767
private static final Long SEARCH_K = 5L;
6868
private static final String SEARCH_PARAM = "{\"nprobe\":10}";
69-
69+
7070

7171
private R<RpcStatus> createCollection(long timeoutMilliseconds) {
7272
System.out.println("========== createCollection() ==========");

examples/src/main/java/io/milvus/v2/GeneralExample.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,4 +231,4 @@ public static void main(String[] args) {
231231

232232
releaseCollection();
233233
}
234-
}
234+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package io.milvus.v2;
20+
21+
import io.milvus.bulkwriter.StageOperation;
22+
import io.milvus.bulkwriter.StageOperationParam;
23+
import io.milvus.bulkwriter.model.StageUploadResult;
24+
25+
26+
/**
27+
* if you don't have bucket, but you want to upload data to bucket and import to milvus
28+
* you can use this function
29+
*/
30+
public class StageExample {
31+
/**
32+
* You need to upload the local file path or folder path for import.
33+
*/
34+
public static final String LOCAL_DIR_OR_FILE_PATH = "/Users/zilliz/Desktop/1.parquet";
35+
36+
/**
37+
* The value of the URL is fixed.
38+
* For overseas regions, it is: https://api.cloud.zilliz.com
39+
* For regions in China, it is: https://api.cloud.zilliz.com.cn
40+
*/
41+
public static final String CLOUD_ENDPOINT = "https://api.cloud.zilliz.com";
42+
public static final String API_KEY = "_api_key_for_cluster_org_";
43+
/**
44+
* This is currently a private preview feature. If you need to use it, please submit a request and contact us.
45+
* Before using this feature, you need to create a stage using the stage API.
46+
*/
47+
public static final String STAGE_NAME = "_stage_name_for_project_";
48+
public static final String PATH = "_path_for_stage";
49+
50+
public static void main(String[] args) throws Exception {
51+
uploadFileToStage();
52+
}
53+
54+
/**
55+
* If you want to upload file to stage, and then use cloud interface merge the data to collection
56+
*/
57+
private static void uploadFileToStage() throws Exception {
58+
StageOperationParam stageOperationParam = StageOperationParam.newBuilder()
59+
.withCloudEndpoint(CLOUD_ENDPOINT).withApiKey(API_KEY)
60+
.withStageName(STAGE_NAME).withPath(PATH)
61+
.build();
62+
StageOperation stageOperation = new StageOperation(stageOperationParam);
63+
StageUploadResult result = stageOperation.uploadFileToStage(LOCAL_DIR_OR_FILE_PATH);
64+
System.out.println("\nuploadFileToStage results: " + result);
65+
}
66+
}

0 commit comments

Comments (0)