diff --git a/examples/src/main/java/io/milvus/v2/StageExample.java b/examples/src/main/java/io/milvus/v2/StageExample.java deleted file mode 100644 index 5b1d15593..000000000 --- a/examples/src/main/java/io/milvus/v2/StageExample.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package io.milvus.v2; - -import io.milvus.bulkwriter.StageOperation; -import io.milvus.bulkwriter.StageOperationParam; -import io.milvus.bulkwriter.model.StageUploadResult; - - -/** - * if you don't have bucket, but you want to upload data to bucket and import to milvus - * you can use this function - */ -public class StageExample { - /** - * You need to upload the local file path or folder path for import. - */ - public static final String LOCAL_DIR_OR_FILE_PATH = "/Users/zilliz/Desktop/1.parquet"; - - /** - * The value of the URL is fixed. - * For overseas regions, it is: https://api.cloud.zilliz.com - * For regions in China, it is: https://api.cloud.zilliz.com.cn - */ - public static final String CLOUD_ENDPOINT = "https://api.cloud.zilliz.com"; - public static final String API_KEY = "_api_key_for_cluster_org_"; - /** - * This is currently a private preview feature. If you need to use it, please submit a request and contact us. - * Before using this feature, you need to create a stage using the stage API. - */ - public static final String STAGE_NAME = "_stage_name_for_project_"; - public static final String PATH = "_path_for_stage"; - - public static void main(String[] args) throws Exception { - uploadFileToStage(); - } - - /** - * If you want to upload file to stage, and then use cloud interface merge the data to collection - */ - private static void uploadFileToStage() throws Exception { - StageOperationParam stageOperationParam = StageOperationParam.newBuilder() - .withCloudEndpoint(CLOUD_ENDPOINT).withApiKey(API_KEY) - .withStageName(STAGE_NAME).withPath(PATH) - .build(); - StageOperation stageOperation = new StageOperation(stageOperationParam); - StageUploadResult result = stageOperation.uploadFileToStage(LOCAL_DIR_OR_FILE_PATH); - System.out.println("\nuploadFileToStage results: " + result); - } -} diff --git a/examples/src/main/java/io/milvus/v2/StageFileManagerExample.java b/examples/src/main/java/io/milvus/v2/StageFileManagerExample.java new file mode 100644 index 000000000..44c89bda6 --- /dev/null +++ b/examples/src/main/java/io/milvus/v2/StageFileManagerExample.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.milvus.v2; + +import com.google.gson.Gson; +import io.milvus.bulkwriter.StageFileManager; +import io.milvus.bulkwriter.StageFileManagerParam; +import io.milvus.bulkwriter.model.UploadFilesResult; +import io.milvus.bulkwriter.request.stage.UploadFilesRequest; + + +/** + * This is currently a private preview feature. If you need to use it, please submit a request and contact us. + */ +public class StageFileManagerExample { + private static final StageFileManager stageFileManager; + static { + StageFileManagerParam stageFileManagerParam = StageFileManagerParam.newBuilder() + .withCloudEndpoint("https://api.cloud.zilliz.com") + .withApiKey("_api_key_for_cluster_org_") + .withStageName("_stage_name_for_project_") + .build(); + stageFileManager = new StageFileManager(stageFileManagerParam); + } + + public static void main(String[] args) throws Exception { + uploadFiles(); + shutdown(); + } + + private static void uploadFiles() throws Exception { + UploadFilesRequest request = UploadFilesRequest.builder() + .sourceFilePath("/Users/zilliz/data/") + .targetStagePath("data/") + .build(); + UploadFilesResult result = stageFileManager.uploadFilesAsync(request).get(); + System.out.println("\nuploadFiles results: " + new Gson().toJson(result)); + } + + private static void shutdown() { + stageFileManager.shutdownGracefully(); + } +} diff --git a/examples/src/main/java/io/milvus/v2/StageManagerExample.java b/examples/src/main/java/io/milvus/v2/StageManagerExample.java new file mode 100644 index 000000000..70c10444b --- /dev/null +++ b/examples/src/main/java/io/milvus/v2/StageManagerExample.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.milvus.v2; + +import com.google.gson.Gson; +import io.milvus.bulkwriter.StageManager; +import io.milvus.bulkwriter.StageManagerParam; +import io.milvus.bulkwriter.request.stage.CreateStageRequest; +import io.milvus.bulkwriter.request.stage.DeleteStageRequest; +import io.milvus.bulkwriter.request.stage.ListStagesRequest; +import io.milvus.bulkwriter.response.stage.ListStagesResponse; + + +/** + * This is currently a private preview feature. If you need to use it, please submit a request and contact us. + */ +public class StageManagerExample { + private static final StageManager stageManager; + static { + StageManagerParam stageManagerParam = StageManagerParam.newBuilder() + .withCloudEndpoint("https://api.cloud.zilliz.com") + .withApiKey("_api_key_for_cluster_org_") + .build(); + stageManager = new StageManager(stageManagerParam); + } + + private static final String PROJECT_ID = "_id_for_project_"; + private static final String REGION_ID = "_id_for_region_"; + private static final String STAGE_NAME = "_stage_name_for_project_"; + + public static void main(String[] args) throws Exception { + createStage(); + listStages(); + deleteStage(); + } + + private static void createStage() { + CreateStageRequest request = CreateStageRequest.builder() + .projectId(PROJECT_ID).regionId(REGION_ID).stageName(STAGE_NAME) + .build(); + stageManager.createStage(request); + System.out.printf("\nStage %s created%n", STAGE_NAME); + } + + private static void listStages() { + ListStagesRequest request = ListStagesRequest.builder() + .projectId(PROJECT_ID).currentPage(1).pageSize(10) + .build(); + ListStagesResponse listStagesResponse = stageManager.listStages(request); + System.out.println("\nlistStages results: " + new Gson().toJson(listStagesResponse)); + } + + private static void deleteStage() { + DeleteStageRequest request = DeleteStageRequest.builder() + .stageName(STAGE_NAME) + .build(); + stageManager.deleteStage(request); + System.out.printf("\nStage %s deleted%n", STAGE_NAME); + } +} diff --git a/examples/src/main/java/io/milvus/v2/bulkwriter/BulkWriterStageExample.java b/examples/src/main/java/io/milvus/v2/bulkwriter/BulkWriterStageExample.java index 546fc249c..7f6b5c213 100644 --- a/examples/src/main/java/io/milvus/v2/bulkwriter/BulkWriterStageExample.java +++ b/examples/src/main/java/io/milvus/v2/bulkwriter/BulkWriterStageExample.java @@ -27,7 +27,7 @@ import io.milvus.bulkwriter.StageBulkWriterParam; import io.milvus.bulkwriter.common.clientenum.BulkFileType; import io.milvus.bulkwriter.common.utils.GeneratorUtils; -import io.milvus.bulkwriter.model.StageUploadResult; +import io.milvus.bulkwriter.model.UploadFilesResult; import io.milvus.bulkwriter.request.describe.CloudDescribeImportRequest; import io.milvus.bulkwriter.request.import_.StageImportRequest; import io.milvus.bulkwriter.request.list.CloudListImportJobsRequest; @@ -118,7 +118,7 @@ private static void exampleCollectionRemoteStage(BulkFileType fileType) throws E CreateCollectionReq.CollectionSchema collectionSchema = buildAllTypesSchema(); createCollection(COLLECTION_NAME, collectionSchema, false); - StageUploadResult stageUploadResult = stageRemoteWriter(collectionSchema, fileType, rows); + UploadFilesResult stageUploadResult = stageRemoteWriter(collectionSchema, fileType, rows); callStageImport(stageUploadResult.getStageName(), stageUploadResult.getPath()); verifyImportData(collectionSchema, originalData); } @@ -284,7 +284,7 @@ private static List genImportData(List> original return data; } - private static StageUploadResult stageRemoteWriter(CreateCollectionReq.CollectionSchema collectionSchema, + private static UploadFilesResult stageRemoteWriter(CreateCollectionReq.CollectionSchema collectionSchema, BulkFileType fileType, List data) throws Exception { System.out.printf("\n===================== all field types (%s) ====================%n", fileType.name()); @@ -297,7 +297,7 @@ private static StageUploadResult stageRemoteWriter(CreateCollectionReq.Collectio System.out.println("Generate data files..."); stageBulkWriter.commit(false); - StageUploadResult stageUploadResult = stageBulkWriter.getStageUploadResult(); + UploadFilesResult stageUploadResult = stageBulkWriter.getStageUploadResult(); System.out.printf("Data files have been uploaded: %s%n", stageUploadResult); return stageUploadResult; } catch (Exception e) { diff --git a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageBulkWriter.java b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageBulkWriter.java index ee8d01e09..8bce60da1 100644 --- a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageBulkWriter.java +++ b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageBulkWriter.java @@ -21,7 +21,8 @@ import com.google.common.collect.Lists; import com.google.gson.JsonObject; -import io.milvus.bulkwriter.model.StageUploadResult; +import io.milvus.bulkwriter.model.UploadFilesResult; +import io.milvus.bulkwriter.request.stage.UploadFilesRequest; import io.milvus.common.utils.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +40,7 @@ public class StageBulkWriter extends LocalBulkWriter { private String remotePath; private List> remoteFiles; - private StageOperation stageWriter; + private StageFileManager stageFileManager; private StageBulkWriterParam stageBulkWriterParam; public StageBulkWriter(StageBulkWriterParam bulkWriterParam) throws IOException { @@ -51,7 +52,7 @@ public StageBulkWriter(StageBulkWriterParam bulkWriterParam) throws IOException Path path = Paths.get(bulkWriterParam.getRemotePath()); Path remoteDirPath = path.resolve(getUUID()); this.remotePath = remoteDirPath + "/"; - this.stageWriter = initStageWriterParams(bulkWriterParam); + this.stageFileManager = initStageFileManagerParams(bulkWriterParam); this.stageBulkWriterParam = bulkWriterParam; this.remoteFiles = Lists.newArrayList(); @@ -59,12 +60,12 @@ public StageBulkWriter(StageBulkWriterParam bulkWriterParam) throws IOException } - private StageOperation initStageWriterParams(StageBulkWriterParam bulkWriterParam) throws IOException { - StageOperationParam stageWriterParam = StageOperationParam.newBuilder() + private StageFileManager initStageFileManagerParams(StageBulkWriterParam bulkWriterParam) throws IOException { + StageFileManagerParam stageFileManagerParam = StageFileManagerParam.newBuilder() .withCloudEndpoint(bulkWriterParam.getCloudEndpoint()).withApiKey(bulkWriterParam.getApiKey()) - .withStageName(bulkWriterParam.getStageName()).withPath(remotePath) + .withStageName(bulkWriterParam.getStageName()) .build(); - return new StageOperation(stageWriterParam); + return new StageFileManager(stageFileManagerParam); } @Override @@ -87,8 +88,8 @@ public List> getBatchFiles() { return remoteFiles; } - public StageUploadResult getStageUploadResult() { - return StageUploadResult.builder() + public UploadFilesResult getStageUploadResult() { + return UploadFilesResult.builder() .stageName(stageBulkWriterParam.getStageName()) .path(remotePath) .build(); @@ -175,8 +176,13 @@ private void serialImportData(List fileList) { private void uploadObject(String filePath, String objectName) throws Exception { logger.info(String.format("Prepare to upload %s to %s", filePath, objectName)); - stageWriter.uploadFileToStage(filePath); + UploadFilesRequest uploadFilesRequest = UploadFilesRequest.builder() + .sourceFilePath(filePath).targetStagePath(remotePath) + .build(); + + stageFileManager.uploadFilesAsync(uploadFilesRequest).get(); logger.info(String.format("Upload file %s to %s", filePath, objectName)); + } private static String generatorLocalPath() { diff --git a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageFileManager.java b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageFileManager.java new file mode 100644 index 000000000..016fb16f1 --- /dev/null +++ b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageFileManager.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.milvus.bulkwriter; + +import com.google.gson.Gson; +import io.milvus.bulkwriter.common.utils.FileUtils; +import io.milvus.bulkwriter.model.UploadFilesResult; +import io.milvus.bulkwriter.request.stage.ApplyStageRequest; +import io.milvus.bulkwriter.request.stage.UploadFilesRequest; +import io.milvus.bulkwriter.response.ApplyStageResponse; +import io.milvus.bulkwriter.restful.DataStageUtils; +import io.milvus.bulkwriter.storage.StorageClient; +import io.milvus.bulkwriter.storage.client.MinioStorageClient; +import io.milvus.exception.ParamException; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Instant; +import java.util.Date; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +public class StageFileManager { + private static final Logger logger = LoggerFactory.getLogger(StageFileManager.class); + private final String cloudEndpoint; + private final String apiKey; + private final String stageName; + private final ExecutorService executor; + + private StorageClient storageClient; + private ApplyStageResponse applyStageResponse; + + public StageFileManager(StageFileManagerParam stageWriterParam) { + this.cloudEndpoint = stageWriterParam.getCloudEndpoint(); + this.apiKey = stageWriterParam.getApiKey(); + this.stageName = stageWriterParam.getStageName(); + this.executor = Executors.newFixedThreadPool(20); + } + + /** + * Asynchronously uploads a local file or directory to the specified path within the Stage. + * + * @param request the upload request containing the source local file or directory path + * and the target directory path in the Stage {@link UploadFilesRequest} + * @return a {@link CompletableFuture} that completes with an {@link UploadFilesResult} + * once all files have been uploaded successfully + * @throws CompletionException if an error occurs during the upload process + */ + public CompletableFuture uploadFilesAsync(UploadFilesRequest request) { + String localDirOrFilePath = request.getSourceFilePath(); + Pair, Long> localPathPair = FileUtils.processLocalPath(localDirOrFilePath); + String stagePath = convertDirPath(request.getTargetStagePath()); + + refreshStageAndClient(stagePath); + initValidator(localPathPair); + + AtomicInteger currentFileCount = new AtomicInteger(0); + AtomicLong processedBytes = new AtomicLong(0); + long totalBytes = localPathPair.getValue(); + long totalFilesCount = localPathPair.getKey().size(); + long startTime = System.currentTimeMillis(); + + return CompletableFuture.allOf(localPathPair.getKey().stream() + .map(localFilePath -> CompletableFuture.runAsync(() -> { + File file = new File(localFilePath); + long fileStartTime = System.currentTimeMillis(); + + try { + uploadLocalFileToStage(localFilePath, localDirOrFilePath, stagePath); + long bytes = processedBytes.addAndGet(file.length()); + int completeCount = currentFileCount.incrementAndGet(); + long elapsed = System.currentTimeMillis() - fileStartTime; + double percent = totalBytes == 0 ? 100.0 : (bytes * 100.0 / totalBytes); + logger.info("Uploaded file {}/{}: {} ({} bytes) elapsed:{} ms, progress(total bytes): {}/{} bytes, progress(total percentage):{}%", + completeCount, totalFilesCount, localFilePath, file.length(), elapsed, bytes, totalBytes, String.format("%.2f", percent)); + } catch (Exception e) { + logger.error("Upload failed: {}", localFilePath, e); + throw new CompletionException(e); + } + }, executor)).toArray(CompletableFuture[]::new)) + .whenComplete((v, t) -> { + }) + .thenApply(v -> { + long totalElapsed = (System.currentTimeMillis() - startTime) / 1000; + logger.info("all files in {} has been async uploaded to stage, stageName:{}, stagePath:{}, totalFileCount:{}, totalFileSize:{}, cost times:{} s", + localDirOrFilePath, applyStageResponse.getStageName(), stagePath, localPathPair.getKey().size(), localPathPair.getValue(), totalElapsed); + return UploadFilesResult.builder() + .stageName(applyStageResponse.getStageName()) + .path(stagePath) + .build(); + }); + } + + /** + * Gracefully shuts down the internal thread pool executor. + *

+ * This method attempts to stop accepting new tasks and waits for existing + * tasks to complete within a timeout period. If tasks do not finish within + * the timeout, it will forcefully shut down the executor. + *

+ * + * Usage recommendation: + *
    + *
  • Call this method when the StageFileManager is no longer needed.
  • + *
+ * + * Thread interruption is respected, and the interrupt status is restored if interrupted during shutdown. + */ + public void shutdownGracefully() { + executor.shutdown(); + try { + if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { + logger.warn("Executor didn't terminate in time, forcing shutdown..."); + executor.shutdownNow(); + } + } catch (InterruptedException e) { + logger.error("Interrupted while waiting for executor to shutdown", e); + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + private void initValidator(Pair, Long> localPathPair) { + if (localPathPair.getValue() > applyStageResponse.getCondition().getMaxContentLength()) { + String msg = String.format("localFileTotalSize %s exceeds the maximum contentLength limit %s defined in the condition. If you want to upload larger files, please contact us to lift the restriction", localPathPair.getValue(), applyStageResponse.getCondition().getMaxContentLength()); + logger.error(msg); + throw new ParamException(msg); + } + } + + private void refreshStageAndClient(String path) { + logger.info("refreshing Stage info..."); + ApplyStageRequest applyStageRequest = ApplyStageRequest.builder() + .apiKey(apiKey) + .stageName(stageName) + .path(path) + .build(); + String result = DataStageUtils.applyStage(cloudEndpoint, applyStageRequest); + applyStageResponse = new Gson().fromJson(result, ApplyStageResponse.class); + logger.info("stage info refreshed"); + + storageClient = MinioStorageClient.getStorageClient( + applyStageResponse.getCloud(), + applyStageResponse.getEndpoint(), + applyStageResponse.getCredentials().getTmpAK(), + applyStageResponse.getCredentials().getTmpSK(), + applyStageResponse.getCredentials().getSessionToken(), + applyStageResponse.getRegion(), null); + logger.info("storage client refreshed"); + } + + private String convertDirPath(String inputPath) { + if (StringUtils.isEmpty(inputPath) || inputPath.equals("/")) { + return ""; + } + if (inputPath.endsWith("/")) { + return inputPath; + } + return inputPath + "/"; + } + + private void uploadLocalFileToStage(String localFilePath, String rootPath, String stagePath) { + File file = new File(localFilePath); + Path filePath = file.toPath().toAbsolutePath(); + Path root = Paths.get(rootPath).toAbsolutePath(); + + String relativePath; + if (root.toFile().isFile()) { + relativePath = file.getName(); + } else { + relativePath = root.relativize(filePath).toString().replace("\\", "/"); + } + + String remoteFilePath = applyStageResponse.getStagePrefix() + stagePath + relativePath; + putObjectWithRetry(file, remoteFilePath, stagePath); + } + + private void putObjectWithRetry(File file, String remoteFilePath, String stagePath) { + refreshIfExpire(stagePath); + String msg = "upload " + file.getAbsolutePath(); + withRetry(msg, () -> { + try { + storageClient.putObject(file, applyStageResponse.getBucketName(), remoteFilePath); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, stagePath); + + } + + private void refreshIfExpire(String stagePath) { + Instant instant = Instant.parse(applyStageResponse.getCredentials().getExpireTime()); + Date expireTime = Date.from(instant); + if (new Date().after(expireTime)) { + synchronized (this) { + if (new Date().after(expireTime)) { + refreshStageAndClient(stagePath); + } + } + } + } + + private T withRetry(String actionName, Callable callable, String stagePath) { + final int maxRetries = 5; + int attempt = 0; + while (attempt < maxRetries) { + try { + return callable.call(); + } catch (Exception e) { + attempt++; + refreshStageAndClient(stagePath); + logger.warn("Attempt {} failed to {}", attempt, actionName, e); + if (attempt == maxRetries) { + throw new RuntimeException(actionName + " failed after " + maxRetries + " attempts", e); + } + try { + Thread.sleep(5000L); + } catch (InterruptedException ignored) { + } + } + } + throw new RuntimeException(actionName + " failed unexpectedly."); + } + + private void withRetry(String actionName, Runnable runnable, String stagePath) { + withRetry(actionName, () -> { + runnable.run(); + return null; + }, stagePath); + } + + +} \ No newline at end of file diff --git a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageOperationParam.java b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageFileManagerParam.java similarity index 75% rename from sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageOperationParam.java rename to sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageFileManagerParam.java index 2d390d899..927e5d8da 100644 --- a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageOperationParam.java +++ b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageFileManagerParam.java @@ -27,21 +27,19 @@ import org.jetbrains.annotations.NotNull; /** - * Parameters for bulkWriter interface. + * Parameters for stageFileManager interface. */ @Getter @ToString -public class StageOperationParam { +public class StageFileManagerParam { private final String cloudEndpoint; private final String apiKey; private final String stageName; - private final String path; - private StageOperationParam(@NonNull Builder builder) { + private StageFileManagerParam(@NonNull Builder builder) { this.cloudEndpoint = builder.cloudEndpoint; this.apiKey = builder.apiKey; this.stageName = builder.stageName; - this.path = builder.path; } public static Builder newBuilder() { @@ -49,7 +47,7 @@ public static Builder newBuilder() { } /** - * Builder for {@link StageOperationParam} class. + * Builder for {@link StageFileManagerParam} class. */ public static final class Builder { private String cloudEndpoint; @@ -58,11 +56,14 @@ public static final class Builder { private String stageName; - private String path; - private Builder() { } + /** + * The value of the URL is fixed. + * For overseas regions, it is: https://api.cloud.zilliz.com + * For regions in China, it is: https://api.cloud.zilliz.com.cn + */ public Builder withCloudEndpoint(@NotNull String cloudEndpoint) { this.cloudEndpoint = cloudEndpoint; return this; @@ -79,25 +80,16 @@ public Builder withStageName(@NotNull String stageName) { } /** - * Sets the path - * If specify the value, will use the path of the upload file - */ - public Builder withPath(String path) { - this.path = path; - return this; - } - - /** - * Verifies parameters and creates a new {@link StageOperationParam} instance. + * Verifies parameters and creates a new {@link StageFileManagerParam} instance. * - * @return {@link StageOperationParam} + * @return {@link StageFileManagerParam} */ - public StageOperationParam build() throws ParamException { + public StageFileManagerParam build() throws ParamException { ParamUtils.CheckNullEmptyString(cloudEndpoint, "cloudEndpoint"); ParamUtils.CheckNullEmptyString(apiKey, "apiKey"); ParamUtils.CheckNullEmptyString(stageName, "stageName"); - return new StageOperationParam(this); + return new StageFileManagerParam(this); } } diff --git a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageManager.java b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageManager.java new file mode 100644 index 000000000..afbeae951 --- /dev/null +++ b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageManager.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.milvus.bulkwriter; + +import com.google.gson.Gson; +import io.milvus.bulkwriter.request.stage.CreateStageRequest; +import io.milvus.bulkwriter.request.stage.DeleteStageRequest; +import io.milvus.bulkwriter.request.stage.ListStagesRequest; +import io.milvus.bulkwriter.response.stage.ListStagesResponse; +import io.milvus.bulkwriter.restful.DataStageUtils; + +public class StageManager { + private final String cloudEndpoint; + private final String apiKey; + + public StageManager(StageManagerParam stageWriterParam) { + cloudEndpoint = stageWriterParam.getCloudEndpoint(); + apiKey = stageWriterParam.getApiKey(); + } + + /** + * Create a stage under the specified project and regionId. + */ + public void createStage(CreateStageRequest request) { + DataStageUtils.createStage(cloudEndpoint, apiKey, request); + } + + /** + * Delete a stage. + */ + public void deleteStage(DeleteStageRequest request) { + DataStageUtils.deleteStage(cloudEndpoint, apiKey, request); + } + + /** + * Paginated query of the stage list under a specified projectId. + */ + public ListStagesResponse listStages(ListStagesRequest request) { + String result = DataStageUtils.listStages(cloudEndpoint, apiKey, request); + return new Gson().fromJson(result, ListStagesResponse.class); + } +} \ No newline at end of file diff --git a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageManagerParam.java b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageManagerParam.java new file mode 100644 index 000000000..0145499d7 --- /dev/null +++ b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageManagerParam.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.milvus.bulkwriter; + +import io.milvus.exception.ParamException; +import io.milvus.param.ParamUtils; +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; +import org.jetbrains.annotations.NotNull; + +/** + * Parameters for stageManager interface. + */ +@Getter +@ToString +public class StageManagerParam { + private final String cloudEndpoint; + private final String apiKey; + + private StageManagerParam(@NonNull Builder builder) { + this.cloudEndpoint = builder.cloudEndpoint; + this.apiKey = builder.apiKey; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder for {@link StageManagerParam} class. + */ + public static final class Builder { + private String cloudEndpoint; + + private String apiKey; + + private Builder() { + } + + /** + * The value of the URL is fixed. + * For overseas regions, it is: https://api.cloud.zilliz.com + * For regions in China, it is: https://api.cloud.zilliz.com.cn + */ + public Builder withCloudEndpoint(@NotNull String cloudEndpoint) { + this.cloudEndpoint = cloudEndpoint; + return this; + } + + public Builder withApiKey(@NotNull String apiKey) { + this.apiKey = apiKey; + return this; + } + + /** + * Verifies parameters and creates a new {@link StageManagerParam} instance. + * + * @return {@link StageManagerParam} + */ + public StageManagerParam build() throws ParamException { + ParamUtils.CheckNullEmptyString(cloudEndpoint, "cloudEndpoint"); + ParamUtils.CheckNullEmptyString(apiKey, "apiKey"); + + return new StageManagerParam(this); + } + } + +} diff --git a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageOperation.java b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageOperation.java deleted file mode 100644 index 4fd5edb60..000000000 --- a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/StageOperation.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.milvus.bulkwriter; - -import com.google.gson.Gson; -import io.milvus.bulkwriter.common.utils.FileUtils; -import io.milvus.bulkwriter.model.StageUploadResult; -import io.milvus.bulkwriter.request.stage.ApplyStageRequest; -import io.milvus.bulkwriter.response.ApplyStageResponse; -import io.milvus.bulkwriter.restful.DataStageUtils; -import io.milvus.bulkwriter.storage.StorageClient; -import io.milvus.bulkwriter.storage.client.MinioStorageClient; -import io.milvus.exception.ParamException; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.time.Instant; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -public class StageOperation { - private static final Logger logger = LoggerFactory.getLogger(StageOperation.class); - private final String cloudEndpoint; - private final String apiKey; - private final String stageName; - private Pair, Long> localPathPair; - private final String path; - - private StorageClient storageClient; - private ApplyStageResponse applyStageResponse; - - public StageOperation(StageOperationParam stageWriterParam) throws IOException { - cloudEndpoint = stageWriterParam.getCloudEndpoint(); - apiKey = stageWriterParam.getApiKey(); - stageName = stageWriterParam.getStageName(); - path = convertDirPath(stageWriterParam.getPath()); - - refreshStageAndClient(); - } - - public StageUploadResult uploadFileToStage(String localDirOrFilePath) throws Exception { - localPathPair = FileUtils.processLocalPath(localDirOrFilePath); - initValidator(); - - logger.info("begin to upload file to stage, localDirOrFilePath:{}, fileCount:{} to stageName:{}, stagePath:{}", localDirOrFilePath, localPathPair.getKey().size(), applyStageResponse.getStageName(), path); - long startTime = System.currentTimeMillis(); - - int concurrency = 20; // 并发线程数 - ExecutorService executor = Executors.newFixedThreadPool(concurrency); - AtomicInteger currentFileCount = new AtomicInteger(0); - long totalFiles = localPathPair.getKey().size(); - AtomicLong processedBytes = new AtomicLong(0); - long totalBytes = localPathPair.getValue(); - - List> futures = new ArrayList<>(); - for (String localFilePath : localPathPair.getKey()) { - futures.add(executor.submit(() -> { - long current = currentFileCount.incrementAndGet(); - File file = new File(localFilePath); - long fileStartTime = System.currentTimeMillis(); - try { - uploadLocalFileToStage(localFilePath, localDirOrFilePath); - long bytes = processedBytes.addAndGet(file.length()); - long elapsed = System.currentTimeMillis() - fileStartTime; - double percent = totalBytes == 0 ? 100.0 : (bytes * 100.0 / totalBytes); - logger.info("Uploaded file {}/{}: {} ({} bytes) elapsed:{} ms, progress(total bytes): {}/{} bytes, progress(total percentage):{}%", - current, totalFiles, localFilePath, file.length(), elapsed, bytes, totalBytes, String.format("%.2f", percent)); - } catch (Exception e) { - logger.error("Upload failed for file: {}", localFilePath, e); - } - })); - } - - for (Future f : futures) { - f.get(); - } - executor.shutdown(); - - long totalElapsed = (System.currentTimeMillis() - startTime) / 1000; - logger.info("all files in {} has been uploaded to stage, stageName:{}, stagePath:{}, totalFileCount:{}, totalFileSize:{}, cost times:{} s", - localDirOrFilePath, applyStageResponse.getStageName(), path, localPathPair.getKey().size(), localPathPair.getValue(), totalElapsed); - return StageUploadResult.builder().stageName(applyStageResponse.getStageName()).path(path).build(); - } - - private void initValidator() { - if (localPathPair.getValue() > applyStageResponse.getCondition().getMaxContentLength()) { - String msg = String.format("localFileTotalSize %s exceeds the maximum contentLength limit %s defined in the condition. If you want to upload larger files, please contact us to lift the restriction", localPathPair.getValue(), applyStageResponse.getCondition().getMaxContentLength()); - logger.error(msg); - throw new ParamException(msg); - } - } - - private void refreshStageAndClient() { - logger.info("refreshing Stage info..."); - ApplyStageRequest applyStageRequest = ApplyStageRequest.builder() - .apiKey(apiKey) - .stageName(stageName) - .path(path) - .build(); - String result = DataStageUtils.applyStage(cloudEndpoint, applyStageRequest); - applyStageResponse = new Gson().fromJson(result, ApplyStageResponse.class); - logger.info("stage info refreshed"); - - storageClient = MinioStorageClient.getStorageClient( - applyStageResponse.getCloud(), - applyStageResponse.getEndpoint(), - applyStageResponse.getCredentials().getTmpAK(), - applyStageResponse.getCredentials().getTmpSK(), - applyStageResponse.getCredentials().getSessionToken(), - applyStageResponse.getRegion(), null); - logger.info("storage client refreshed"); - } - - private String convertDirPath(String inputPath) { - if (StringUtils.isEmpty(inputPath) || inputPath.endsWith("/")) { - return inputPath; - } - return inputPath + "/"; - } - - private String uploadLocalFileToStage(String localFilePath, String rootPath) throws Exception { - File file = new File(localFilePath); - Path filePath = file.toPath().toAbsolutePath(); - Path root = Paths.get(rootPath).toAbsolutePath(); - - String relativePath; - if (root.toFile().isFile()) { - relativePath = file.getName(); - } else { - relativePath = root.relativize(filePath).toString().replace("\\", "/"); - } - - String remoteFilePath = applyStageResponse.getUploadPath() + relativePath; - putObject(file, remoteFilePath); - return remoteFilePath; - } - - private void putObject(File file, String remoteFilePath) throws Exception { - Instant instant = Instant.parse(applyStageResponse.getCredentials().getExpireTime()); - Date expireTime = Date.from(instant); - if (new Date().after(expireTime)) { - synchronized (this) { - if (new Date().after(expireTime)) { - refreshStageAndClient(); - } - } - } - uploadWithRetry(file, remoteFilePath); - } - - private void uploadWithRetry(File file, String remoteFilePath) { - final int maxRetries = 3; - int attempt = 0; - while (attempt < maxRetries) { - try { - storageClient.putObject(file, applyStageResponse.getBucketName(), remoteFilePath); - return; - } catch (Exception e) { - attempt++; - refreshStageAndClient(); - logger.warn("Attempt {} failed to upload {}", attempt, file.getAbsolutePath(), e); - if (attempt == maxRetries) { - throw new RuntimeException("Upload failed after " + maxRetries + " attempts", e); - } - try { - Thread.sleep(5000L); - } catch (InterruptedException ignored) { - } - } - } - } -} \ No newline at end of file diff --git a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/model/StageUploadResult.java b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/model/UploadFilesResult.java similarity index 88% rename from sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/model/StageUploadResult.java rename to sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/model/UploadFilesResult.java index 006a42e06..43fe1de0f 100644 --- a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/model/StageUploadResult.java +++ b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/model/UploadFilesResult.java @@ -9,7 +9,7 @@ @Builder @AllArgsConstructor @NoArgsConstructor -public class StageUploadResult { +public class UploadFilesResult { private String stageName; private String path; } diff --git a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/request/stage/ApplyStageRequest.java b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/request/stage/ApplyStageRequest.java index e036a8eca..8ddeba45c 100644 --- a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/request/stage/ApplyStageRequest.java +++ b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/request/stage/ApplyStageRequest.java @@ -30,6 +30,5 @@ @NoArgsConstructor public class ApplyStageRequest extends BaseStageRequest { private String stageName; - private String path; } diff --git a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/request/stage/CreateStageRequest.java b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/request/stage/CreateStageRequest.java new file mode 100644 index 000000000..9420ea13b --- /dev/null +++ b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/request/stage/CreateStageRequest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.milvus.bulkwriter.request.stage; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +@Data +@SuperBuilder +@AllArgsConstructor +@NoArgsConstructor +public class CreateStageRequest { + private String projectId; + private String regionId; + private String stageName; +} diff --git a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/request/stage/DeleteStageRequest.java b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/request/stage/DeleteStageRequest.java new file mode 100644 index 000000000..f90d3d19c --- /dev/null +++ b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/request/stage/DeleteStageRequest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.milvus.bulkwriter.request.stage; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +@Data +@SuperBuilder +@AllArgsConstructor +@NoArgsConstructor +public class DeleteStageRequest { + private String stageName; +} diff --git a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/request/stage/ListStagesRequest.java b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/request/stage/ListStagesRequest.java new file mode 100644 index 000000000..6b24cd60e --- /dev/null +++ b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/request/stage/ListStagesRequest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.milvus.bulkwriter.request.stage; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +@Data +@SuperBuilder +@AllArgsConstructor +@NoArgsConstructor +public class ListStagesRequest { + private String projectId; + private Integer pageSize; + private Integer currentPage; +} diff --git a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/request/stage/UploadFilesRequest.java b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/request/stage/UploadFilesRequest.java new file mode 100644 index 000000000..d749d12d4 --- /dev/null +++ b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/request/stage/UploadFilesRequest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.milvus.bulkwriter.request.stage; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +@Data +@SuperBuilder +@AllArgsConstructor +@NoArgsConstructor +public class UploadFilesRequest { + /** + * The full path of a local file or directory: + * If it is a file, please include the file name, e.g., /Users/zilliz/data/1.parquet + * If it is a directory, please end the path with a /, e.g., /Users/zilliz/data/ + */ + private String sourceFilePath; + + /** + * The target stage directory path: + * Leave it empty to upload to the root directory. + * To upload to a specific folder, end the path with a /, e.g., data/ + */ + private String targetStagePath; +} diff --git a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/response/ApplyStageResponse.java b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/response/ApplyStageResponse.java index 532c84200..de13d981f 100644 --- a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/response/ApplyStageResponse.java +++ b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/response/ApplyStageResponse.java @@ -30,6 +30,8 @@ public class ApplyStageResponse implements Serializable { private String stageName; + private String stagePrefix; + @AllArgsConstructor @NoArgsConstructor @Data diff --git a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/response/stage/ListStagesResponse.java b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/response/stage/ListStagesResponse.java new file mode 100644 index 000000000..ca045ebb1 --- /dev/null +++ b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/response/stage/ListStagesResponse.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.milvus.bulkwriter.response.stage; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class ListStagesResponse { + + private Integer count; + + private Integer currentPage; + + private Integer pageSize; + + private List stages; +} diff --git a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/response/stage/StageInfo.java b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/response/stage/StageInfo.java new file mode 100644 index 000000000..84899f7f4 --- /dev/null +++ b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/response/stage/StageInfo.java @@ -0,0 +1,14 @@ +package io.milvus.bulkwriter.response.stage; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class StageInfo { + private String stageName; +} diff --git a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/restful/BaseRestful.java b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/restful/BaseRestful.java index 32efd3f50..50db290fb 100644 --- a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/restful/BaseRestful.java +++ b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/restful/BaseRestful.java @@ -46,6 +46,24 @@ protected static String postRequest(String url, String apiKey, Map params, int timeout) { + try { + setDefaultOptionsIfCallCloud(params, apiKey); + kong.unirest.HttpResponse response = Unirest.delete(url) + .connectTimeout(timeout) + .headers(httpHeaders(apiKey)) + .asString(); + if (response.getStatus() != 200) { + ExceptionUtils.throwUnExpectedException(String.format("Failed to delete url: %s, status code: %s, msg: %s", url, response.getStatus(), response.getBody())); + } else { + return response.getBody(); + } + } catch (Exception e) { + ExceptionUtils.throwUnExpectedException(String.format("Failed to delete url: %s, error: %s", url, e)); + } + return null; + } + protected static String getRequest(String url, String apiKey, Map params, int timeout) { try { kong.unirest.HttpResponse response = Unirest.get(url) diff --git a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/restful/DataStageUtils.java b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/restful/DataStageUtils.java index f92e8ecb6..fde3d67d7 100644 --- a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/restful/DataStageUtils.java +++ b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/restful/DataStageUtils.java @@ -22,6 +22,9 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import io.milvus.bulkwriter.request.stage.BaseStageRequest; +import io.milvus.bulkwriter.request.stage.CreateStageRequest; +import io.milvus.bulkwriter.request.stage.DeleteStageRequest; +import io.milvus.bulkwriter.request.stage.ListStagesRequest; import io.milvus.bulkwriter.response.RestfulResponse; import io.milvus.common.utils.JsonUtils; @@ -37,4 +40,32 @@ public static String applyStage(String url, BaseStageRequest request) { handleResponse(requestURL, response); return new Gson().toJson(response.getData()); } + + public static String listStages(String url, String apiKey, ListStagesRequest request) { + String requestURL = url + "/v2/stages"; + + Map params = JsonUtils.fromJson(JsonUtils.toJson(request), new TypeToken>() {}.getType()); + String body = getRequest(requestURL, apiKey, params, 60 * 1000); + RestfulResponse response = JsonUtils.fromJson(body, new TypeToken>(){}.getType()); + handleResponse(requestURL, response); + return new Gson().toJson(response.getData()); + } + + public static void createStage(String url, String apiKey, CreateStageRequest request) { + String requestURL = url + "/v2/stages/create"; + + Map params = JsonUtils.fromJson(JsonUtils.toJson(request), new TypeToken>() {}.getType()); + String body = postRequest(requestURL, apiKey, params, 60 * 1000); + RestfulResponse response = JsonUtils.fromJson(body, new TypeToken>(){}.getType()); + handleResponse(requestURL, response); + } + + public static void deleteStage(String url, String apiKey, DeleteStageRequest request) { + String requestURL = url + "/v2/stages/" + request.getStageName(); + + Map params = JsonUtils.fromJson(JsonUtils.toJson(request), new TypeToken>() {}.getType()); + String body = deleteRequest(requestURL, apiKey, params, 60 * 1000); + RestfulResponse response = JsonUtils.fromJson(body, new TypeToken>(){}.getType()); + handleResponse(requestURL, response); + } } diff --git a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/storage/client/AzureStorageClient.java b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/storage/client/AzureStorageClient.java index 82e9dc93f..b95131a49 100644 --- a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/storage/client/AzureStorageClient.java +++ b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/storage/client/AzureStorageClient.java @@ -78,4 +78,5 @@ public boolean checkBucketExist(String bucketName) { BlobContainerClient blobContainerClient = blobServiceClient.getBlobContainerClient(bucketName); return blobContainerClient.exists(); } + }