Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 11 additions & 36 deletions examples/src/main/java/io/milvus/v1/BulkWriterExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@
*/
package io.milvus.v1;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import io.milvus.bulkwriter.BulkImport;
import io.milvus.bulkwriter.BulkWriter;
import io.milvus.bulkwriter.LocalBulkWriter;
import io.milvus.bulkwriter.LocalBulkWriterParam;
Expand All @@ -46,6 +43,7 @@
import io.milvus.bulkwriter.request.import_.MilvusImportRequest;
import io.milvus.bulkwriter.request.list.CloudListImportJobsRequest;
import io.milvus.bulkwriter.request.list.MilvusListImportJobsRequest;
import io.milvus.bulkwriter.restful.BulkImportUtils;
import io.milvus.client.MilvusClient;
import io.milvus.client.MilvusServiceClient;
import io.milvus.common.utils.ExceptionUtils;
Expand All @@ -69,13 +67,13 @@
import io.milvus.param.index.CreateIndexParam;
import io.milvus.response.GetCollStatResponseWrapper;
import io.milvus.response.QueryResultsWrapper;
import io.milvus.v2.bulkwriter.CsvDataObject;
import org.apache.avro.generic.GenericData;
import org.apache.http.util.Asserts;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -491,29 +489,6 @@ private static void readCsvSampleData(String filePath, BulkWriter writer) throws
}
}

/**
 * Row model for the sample CSV file. Jackson binds the three string columns
 * ("vector", "path", "label") onto the annotated fields by name.
 */
private static class CsvDataObject {
    @JsonProperty
    private String vector;
    @JsonProperty
    private String path;
    @JsonProperty
    private String label;

    public String getVector() {
        return vector;
    }

    public String getPath() {
        return path;
    }

    public String getLabel() {
        return label;
    }

    /** Parses the JSON-encoded "vector" column into a list of floats. */
    public List<Float> toFloatArray() {
        java.lang.reflect.Type floatListType = new TypeToken<List<Float>>() {}.getType();
        return GSON_INSTANCE.fromJson(vector, floatListType);
    }
}

private void callBulkInsert(CollectionSchemaParam collectionSchema, List<List<String>> batchFiles) throws InterruptedException {
createCollection(ALL_TYPES_COLLECTION_NAME, collectionSchema, true);

Expand All @@ -524,7 +499,7 @@ private void callBulkInsert(CollectionSchemaParam collectionSchema, List<List<St
.partitionName("")
.files(batchFiles)
.build();
String bulkImportResult = BulkImport.bulkImport(url, milvusImportRequest);
String bulkImportResult = BulkImportUtils.bulkImport(url, milvusImportRequest);
System.out.println(bulkImportResult);

JsonObject bulkImportObject = convertJsonObject(bulkImportResult);
Expand All @@ -533,7 +508,7 @@ private void callBulkInsert(CollectionSchemaParam collectionSchema, List<List<St

System.out.println("\n===================== listBulkInsertJobs() ====================");
MilvusListImportJobsRequest listImportJobsRequest = MilvusListImportJobsRequest.builder().collectionName(ALL_TYPES_COLLECTION_NAME).build();
String listImportJobsResult = BulkImport.listImportJobs(url, listImportJobsRequest);
String listImportJobsResult = BulkImportUtils.listImportJobs(url, listImportJobsRequest);
System.out.println(listImportJobsResult);
while (true) {
System.out.println("Wait 5 second to check bulkInsert job state...");
Expand All @@ -543,7 +518,7 @@ private void callBulkInsert(CollectionSchemaParam collectionSchema, List<List<St
MilvusDescribeImportRequest request = MilvusDescribeImportRequest.builder()
.jobId(jobId)
.build();
String getImportProgressResult = BulkImport.getImportProgress(url, request);
String getImportProgressResult = BulkImportUtils.getImportProgress(url, request);
System.out.println(getImportProgressResult);

JsonObject getImportProgressObject = convertJsonObject(getImportProgressResult);
Expand Down Expand Up @@ -577,23 +552,23 @@ private void callCloudImport(List<List<String>> batchFiles, String collectionNam
.clusterId(CloudImportConsts.CLUSTER_ID).collectionName(collectionName).partitionName(partitionName)
.apiKey(CloudImportConsts.API_KEY)
.build();
String bulkImportResult = BulkImport.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, bulkImportRequest);
String bulkImportResult = BulkImportUtils.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, bulkImportRequest);
JsonObject bulkImportObject = convertJsonObject(bulkImportResult);

String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
System.out.println("Create a cloudImport job, job id: " + jobId);

System.out.println("\n===================== call cloudListImportJobs ====================");
CloudListImportJobsRequest listImportJobsRequest = CloudListImportJobsRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).currentPage(1).pageSize(10).apiKey(CloudImportConsts.API_KEY).build();
String listImportJobsResult = BulkImport.listImportJobs(CloudImportConsts.CLOUD_ENDPOINT, listImportJobsRequest);
String listImportJobsResult = BulkImportUtils.listImportJobs(CloudImportConsts.CLOUD_ENDPOINT, listImportJobsRequest);
System.out.println(listImportJobsResult);
while (true) {
System.out.println("Wait 5 second to check bulkInsert job state...");
TimeUnit.SECONDS.sleep(5);

System.out.println("\n===================== call cloudGetProgress ====================");
CloudDescribeImportRequest request = CloudDescribeImportRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).jobId(jobId).apiKey(CloudImportConsts.API_KEY).build();
String getImportProgressResult = BulkImport.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, request);
String getImportProgressResult = BulkImportUtils.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, request);
JsonObject getImportProgressObject = convertJsonObject(getImportProgressResult);
String importProgressState = getImportProgressObject.getAsJsonObject("data").get("state").getAsString();
String progress = getImportProgressObject.getAsJsonObject("data").get("progress").getAsString();
Expand Down Expand Up @@ -740,20 +715,20 @@ private static void exampleCloudImport() {
.clusterId(CloudImportConsts.CLUSTER_ID).collectionName(CloudImportConsts.COLLECTION_NAME).partitionName(CloudImportConsts.PARTITION_NAME)
.apiKey(CloudImportConsts.API_KEY)
.build();
String bulkImportResult = BulkImport.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, request);
String bulkImportResult = BulkImportUtils.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, request);
System.out.println(bulkImportResult);

System.out.println("\n===================== get import job progress ====================");

JsonObject bulkImportObject = convertJsonObject(bulkImportResult);
String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
CloudDescribeImportRequest getImportProgressRequest = CloudDescribeImportRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).jobId(jobId).apiKey(CloudImportConsts.API_KEY).build();
String getImportProgressResult = BulkImport.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, getImportProgressRequest);
String getImportProgressResult = BulkImportUtils.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, getImportProgressRequest);
System.out.println(getImportProgressResult);

System.out.println("\n===================== list import jobs ====================");
CloudListImportJobsRequest listImportJobsRequest = CloudListImportJobsRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).currentPage(1).pageSize(10).apiKey(CloudImportConsts.API_KEY).build();
String listImportJobsResult = BulkImport.listImportJobs(CloudImportConsts.CLOUD_ENDPOINT, listImportJobsRequest);
String listImportJobsResult = BulkImportUtils.listImportJobs(CloudImportConsts.CLOUD_ENDPOINT, listImportJobsRequest);
System.out.println(listImportJobsResult);
}

Expand Down
2 changes: 1 addition & 1 deletion examples/src/main/java/io/milvus/v1/GeneralExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class GeneralExample {

private static final Long SEARCH_K = 5L;
private static final String SEARCH_PARAM = "{\"nprobe\":10}";


private R<RpcStatus> createCollection(long timeoutMilliseconds) {
System.out.println("========== createCollection() ==========");
Expand Down
2 changes: 1 addition & 1 deletion examples/src/main/java/io/milvus/v2/GeneralExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,4 @@ public static void main(String[] args) {

releaseCollection();
}
}
}
66 changes: 66 additions & 0 deletions examples/src/main/java/io/milvus/v2/StageExample.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.milvus.v2;

import io.milvus.bulkwriter.StageOperation;
import io.milvus.bulkwriter.StageOperationParam;
import io.milvus.bulkwriter.model.StageUploadResult;


/**
 * Demonstrates how to upload data to a Milvus-managed stage (useful when you do
 * not have your own bucket) so it can later be imported into Milvus.
 */
public class StageExample {
    /**
     * Local file or directory whose contents will be uploaded for import.
     */
    public static final String LOCAL_DIR_OR_FILE_PATH = "/Users/zilliz/Desktop/1.parquet";

    /**
     * Fixed endpoint URL.
     * Overseas regions: https://api.cloud.zilliz.com
     * Regions in China: https://api.cloud.zilliz.com.cn
     */
    public static final String CLOUD_ENDPOINT = "https://api.cloud.zilliz.com";
    public static final String API_KEY = "_api_key_for_cluster_org_";
    /**
     * Currently a private-preview feature; submit a request and contact us to use it.
     * A stage must already exist (created through the stage API) before running this.
     */
    public static final String STAGE_NAME = "_stage_name_for_project_";
    public static final String PATH = "_path_for_stage";

    public static void main(String[] args) throws Exception {
        uploadFileToStage();
    }

    /**
     * Uploads a local file (or folder) into the stage; the cloud import API can
     * then merge the staged data into a collection.
     */
    private static void uploadFileToStage() throws Exception {
        StageOperationParam params = StageOperationParam.newBuilder()
                .withCloudEndpoint(CLOUD_ENDPOINT)
                .withApiKey(API_KEY)
                .withStageName(STAGE_NAME)
                .withPath(PATH)
                .build();
        StageOperation operation = new StageOperation(params);
        StageUploadResult uploadResult = operation.uploadFileToStage(LOCAL_DIR_OR_FILE_PATH);
        System.out.println("\nuploadFileToStage results: " + uploadResult);
    }
}
Loading
Loading