From aa8b71e6feb63f184a170a1cc0b79902e9e5760e Mon Sep 17 00:00:00 2001 From: yhmo Date: Mon, 7 Apr 2025 18:58:55 +0800 Subject: [PATCH] Add options parameter for bulk_import interface Signed-off-by: yhmo --- .../src/main/java/io/milvus/v2/BulkWriterExample.java | 11 +++++++++-- .../bulkwriter/request/import_/BaseImportRequest.java | 3 +++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/examples/src/main/java/io/milvus/v2/BulkWriterExample.java b/examples/src/main/java/io/milvus/v2/BulkWriterExample.java index c389a771d..15d1a1f17 100644 --- a/examples/src/main/java/io/milvus/v2/BulkWriterExample.java +++ b/examples/src/main/java/io/milvus/v2/BulkWriterExample.java @@ -198,7 +198,7 @@ private static void exampleAllTypesCollectionRemote(List fileTypes CreateCollectionReq.CollectionSchema collectionSchema = buildAllTypesSchema(); List> batchFiles = allTypesRemoteWriter(collectionSchema, fileType, rows); createCollection(ALL_TYPES_COLLECTION_NAME, collectionSchema, true); - callBulkInsert(collectionSchema, batchFiles); + callBulkInsert(batchFiles); verifyImportData(collectionSchema, originalData); } @@ -502,6 +502,7 @@ private static RemoteBulkWriter buildRemoteBulkWriter(CreateCollectionReq.Collec .withFileType(fileType) .withChunkSize(512 * 1024 * 1024) .withConnectParam(connectParam) + .withConfig("sep", "|") // only take effect for CSV file .build(); return new RemoteBulkWriter(bulkWriterParam); } @@ -573,12 +574,15 @@ public List toFloatArray() { } } - private static void callBulkInsert(CreateCollectionReq.CollectionSchema collectionSchema, List> batchFiles) throws InterruptedException { + private static void callBulkInsert(List> batchFiles) throws InterruptedException { String url = String.format("http://%s:%s", HOST, PORT); System.out.println("\n===================== import files to milvus ===================="); + Map options = new HashMap<>(); + options.put("sep", "|"); // this option only take effect for CSV MilvusImportRequest milvusImportRequest = MilvusImportRequest.builder() .collectionName(ALL_TYPES_COLLECTION_NAME) .files(batchFiles) + .options(options) .build(); String bulkImportResult = BulkImport.bulkImport(url, milvusImportRequest); System.out.println(bulkImportResult); @@ -760,6 +764,9 @@ private static void verifyImportData(CreateCollectionReq.CollectionSchema collec List results = query(expr, Lists.newArrayList("*")); System.out.println("Verify data..."); + if (results.size() != QUERY_IDS.size()) { + throw new RuntimeException("Result count is incorrect"); + } for (QueryResp.QueryResult result : results) { Map fetchedEntity = result.getEntity(); long id = (Long)fetchedEntity.get("id"); diff --git a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/request/import_/BaseImportRequest.java b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/request/import_/BaseImportRequest.java index 5ece0b4aa..481cc08dc 100644 --- a/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/request/import_/BaseImportRequest.java +++ b/sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/request/import_/BaseImportRequest.java @@ -25,6 +25,7 @@ import lombok.experimental.SuperBuilder; import java.io.Serializable; +import java.util.Map; @Data @SuperBuilder(toBuilder = true) @@ -36,4 +37,6 @@ public class BaseImportRequest implements Serializable { * If you are calling the cloud API, this parameter needs to be filled in; otherwise, you can ignore it. */ private String apiKey; + + private Map options; }