99import com .zilliz .milvustestv2 .utils .JsonObjectUtil ;
1010import com .zilliz .milvustestv2 .utils .MathUtil ;
1111import com .zilliz .milvustestv2 .utils .PropertyFilesUtil ;
12+ import io .milvus .bulkwriter .LocalBulkWriter ;
13+ import io .milvus .bulkwriter .LocalBulkWriterParam ;
1214import io .milvus .bulkwriter .RemoteBulkWriter ;
1315import io .milvus .bulkwriter .RemoteBulkWriterParam ;
1416import io .milvus .bulkwriter .common .clientenum .BulkFileType ;
3234import io .milvus .v2 .service .vector .request .data .*;
3335import io .milvus .v2 .service .vector .response .InsertResp ;
3436import io .milvus .v2 .service .vector .response .SearchResp ;
37+ import io .minio .BucketExistsArgs ;
38+ import io .minio .MakeBucketArgs ;
39+ import io .minio .MinioClient ;
40+ import io .minio .UploadObjectArgs ;
41+ import io .minio .errors .*;
3542import lombok .NonNull ;
3643import lombok .extern .slf4j .Slf4j ;
3744
3845import javax .annotation .Nullable ;
3946import java .io .IOException ;
4047import java .nio .ByteBuffer ;
48+ import java .security .InvalidKeyException ;
49+ import java .security .NoSuchAlgorithmException ;
4150import java .util .*;
4251import java .util .stream .Collectors ;
4352
@@ -266,6 +275,114 @@ public static String createNewCollection(int dim, String collectionName, DataTyp
266275 log .info ("create collection:" + collectionName );
267276 return collectionName ;
268277 }
278+ public static String createNewCollectionWithDynamic (int dim , String collectionName , DataType vectorType ) {
279+ if (collectionName == null || collectionName .equals ("" )) {
280+ collectionName = "Collection_" + GenerateUtil .getRandomString (10 );
281+ }
282+ CreateCollectionReq .FieldSchema fieldInt64 = CreateCollectionReq .FieldSchema .builder ()
283+ .autoID (false )
284+ .dataType (io .milvus .v2 .common .DataType .Int64 )
285+ .isPrimaryKey (true )
286+ .name (CommonData .fieldInt64 )
287+ .build ();
288+ CreateCollectionReq .FieldSchema fieldInt32 = CreateCollectionReq .FieldSchema .builder ()
289+ .dataType (DataType .Int32 )
290+ .name (CommonData .fieldInt32 )
291+ .isPrimaryKey (false )
292+ .build ();
293+ CreateCollectionReq .FieldSchema fieldInt16 = CreateCollectionReq .FieldSchema .builder ()
294+ .dataType (DataType .Int16 )
295+ .name (CommonData .fieldInt16 )
296+ .isPrimaryKey (false )
297+ .build ();
298+ CreateCollectionReq .FieldSchema fieldInt8 = CreateCollectionReq .FieldSchema .builder ()
299+ .dataType (DataType .Int8 )
300+ .name (CommonData .fieldInt8 )
301+ .isPrimaryKey (false )
302+ .build ();
303+ CreateCollectionReq .FieldSchema fieldDouble = CreateCollectionReq .FieldSchema .builder ()
304+ .dataType (DataType .Double )
305+ .name (CommonData .fieldDouble )
306+ .isPrimaryKey (false )
307+ .build ();
308+ CreateCollectionReq .FieldSchema fieldArray = CreateCollectionReq .FieldSchema .builder ()
309+ .dataType (DataType .Array )
310+ .name (CommonData .fieldArray )
311+ .elementType (DataType .Int64 )
312+ .maxCapacity (100 )
313+ .isPrimaryKey (false )
314+ .build ();
315+ CreateCollectionReq .FieldSchema fieldBool = CreateCollectionReq .FieldSchema .builder ()
316+ .dataType (DataType .Bool )
317+ .name (CommonData .fieldBool )
318+ .isPrimaryKey (false )
319+ .build ();
320+ CreateCollectionReq .FieldSchema fieldVarchar = CreateCollectionReq .FieldSchema .builder ()
321+ .dataType (DataType .VarChar )
322+ .name (CommonData .fieldVarchar )
323+ .isPrimaryKey (false )
324+ .maxLength (100 )
325+ .build ();
326+ CreateCollectionReq .FieldSchema fieldFloat = CreateCollectionReq .FieldSchema .builder ()
327+ .dataType (DataType .Float )
328+ .name (CommonData .fieldFloat )
329+ .isPrimaryKey (false )
330+ .build ();
331+ CreateCollectionReq .FieldSchema fieldJson = CreateCollectionReq .FieldSchema .builder ()
332+ .dataType (DataType .JSON )
333+ .name (CommonData .fieldJson )
334+ .isPrimaryKey (false )
335+ .build ();
336+ CreateCollectionReq .FieldSchema fieldVector = CreateCollectionReq .FieldSchema .builder ()
337+ .dataType (vectorType )
338+ .isPrimaryKey (false )
339+ .build ();
340+ if (vectorType == DataType .FloatVector ) {
341+ fieldVector .setDimension (dim );
342+ fieldVector .setName (CommonData .fieldFloatVector );
343+ }
344+ if (vectorType == DataType .BinaryVector ) {
345+ fieldVector .setDimension (dim );
346+ fieldVector .setName (CommonData .fieldBinaryVector );
347+ }
348+ if (vectorType == DataType .Float16Vector ) {
349+ fieldVector .setDimension (dim );
350+ fieldVector .setName (CommonData .fieldFloat16Vector );
351+ }
352+ if (vectorType == DataType .BFloat16Vector ) {
353+ fieldVector .setDimension (dim );
354+ fieldVector .setName (CommonData .fieldBF16Vector );
355+ }
356+ if (vectorType == DataType .SparseFloatVector ) {
357+ fieldVector .setName (CommonData .fieldSparseVector );
358+ }
359+ List <CreateCollectionReq .FieldSchema > fieldSchemaList = new ArrayList <>();
360+ fieldSchemaList .add (fieldInt64 );
361+ fieldSchemaList .add (fieldInt32 );
362+ fieldSchemaList .add (fieldInt16 );
363+ fieldSchemaList .add (fieldInt8 );
364+ fieldSchemaList .add (fieldFloat );
365+ fieldSchemaList .add (fieldDouble );
366+ fieldSchemaList .add (fieldArray );
367+ fieldSchemaList .add (fieldBool );
368+ fieldSchemaList .add (fieldJson );
369+ fieldSchemaList .add (fieldVarchar );
370+ fieldSchemaList .add (fieldVector );
371+ CreateCollectionReq .CollectionSchema collectionSchema = CreateCollectionReq .CollectionSchema .builder ()
372+ .fieldSchemaList (fieldSchemaList )
373+ .enableDynamicField (true )
374+ .build ();
375+ CreateCollectionReq createCollectionReq = CreateCollectionReq .builder ()
376+ .collectionSchema (collectionSchema )
377+ .collectionName (collectionName )
378+ .enableDynamicField (true )
379+ .description ("collection desc" )
380+ .numShards (1 )
381+ .build ();
382+ milvusClientV2 .createCollection (createCollectionReq );
383+ log .info ("create collection with dynamic field:" + collectionName );
384+ return collectionName ;
385+ }
269386
270387 /**
271388 * 创建包含nullable列的collection
@@ -704,6 +821,67 @@ public static List<JsonObject> generateDefaultData(long startId, long num, int d
704821 return jsonList ;
705822 }
706823
824+ public static List <JsonObject > generateDefaultDataWithDynamic (long startId , long num , int dim , DataType vectorType ) {
825+ List <JsonObject > jsonList = new ArrayList <>();
826+ Random ran = new Random ();
827+ Gson gson = new Gson ();
828+ for (long i = startId ; i < (num + startId ); i ++) {
829+ JsonObject row = new JsonObject ();
830+ row .addProperty (CommonData .fieldInt64 , i );
831+ row .addProperty (CommonData .fieldInt32 , (int ) i % 32767 );
832+ row .addProperty (CommonData .fieldInt16 , (int ) i % 32767 );
833+ row .addProperty (CommonData .fieldInt8 , (short ) i % 127 );
834+ row .addProperty (CommonData .fieldDouble , (double ) i );
835+ row .add (CommonData .fieldArray , gson .toJsonTree (Arrays .asList (i , i + 1 , i + 2 )));
836+ row .addProperty (CommonData .fieldBool , i % 2 == 0 );
837+ row .addProperty (CommonData .fieldVarchar , "Str" + i );
838+ row .addProperty (CommonData .fieldFloat , (float ) i );
839+ // 判断vectorType
840+ if (vectorType == DataType .FloatVector ) {
841+ List <Float > vector = new ArrayList <>();
842+ for (int k = 0 ; k < dim ; ++k ) {
843+ vector .add (ran .nextFloat ());
844+ }
845+ row .add (CommonData .fieldFloatVector , gson .toJsonTree (vector ));
846+ }
847+ if (vectorType == DataType .BinaryVector ) {
848+ row .add (CommonData .fieldBinaryVector , gson .toJsonTree (generateBinaryVector (dim ).array ()));
849+ }
850+ if (vectorType == DataType .Float16Vector ) {
851+ row .add (CommonData .fieldFloat16Vector , gson .toJsonTree (generateFloat16Vector (dim ).array ()));
852+ }
853+ if (vectorType == DataType .BFloat16Vector ) {
854+ row .add (CommonData .fieldBF16Vector , gson .toJsonTree (generateBF16Vector (dim ).array ()));
855+ }
856+ if (vectorType == DataType .SparseFloatVector ) {
857+ row .add (CommonData .fieldSparseVector , gson .toJsonTree (generateSparseVector (dim )));
858+ }
859+
860+ JsonObject json = new JsonObject ();
861+ json .addProperty (CommonData .fieldInt64 , (int ) i % 32767 );
862+ json .addProperty (CommonData .fieldInt32 , (int ) i % 32767 );
863+ json .addProperty (CommonData .fieldDouble , (double ) i );
864+ json .add (CommonData .fieldArray , gson .toJsonTree (Arrays .asList (i , i + 1 , i + 2 )));
865+ json .addProperty (CommonData .fieldBool , i % 2 == 0 );
866+ json .addProperty (CommonData .fieldVarchar , "Str" + i );
867+ json .addProperty (CommonData .fieldFloat , (float ) i );
868+ row .add (CommonData .fieldJson , json );
869+ // dynamic field
870+ JsonObject jsonDynamic = new JsonObject ();
871+ json .addProperty (CommonData .fieldInt64 , (int ) i % 32767 );
872+ json .addProperty (CommonData .fieldInt32 , (int ) i % 32767 );
873+ json .addProperty (CommonData .fieldDouble , (double ) i );
874+ json .add (CommonData .fieldArray , gson .toJsonTree (Arrays .asList (i , i + 1 , i + 2 )));
875+ json .addProperty (CommonData .fieldBool , i % 2 == 0 );
876+ json .addProperty (CommonData .fieldVarchar , "Str" + i );
877+ json .addProperty (CommonData .fieldFloat , (float ) i );
878+ row .add (CommonData .fieldDynamic , json );
879+
880+ jsonList .add (row );
881+ }
882+ return jsonList ;
883+ }
884+
707885 /**
708886 * 为快速生成的collection提供导入数据
709887 *
@@ -1550,7 +1728,7 @@ public static List<List<String>> providerBatchFiles(String collection, BulkFileT
15501728 }
15511729 });
15521730 System .out .printf ("%s rows appends%n" , remoteBulkWriter .getTotalRowCount ());
1553- System .out .printf ("%s rows in buffer not flushed%n" , remoteBulkWriter .getBufferRowCount ());
1731+ System .out .printf ("%s rows in buffer not flushed%n" , remoteBulkWriter .getTotalRowCount ());
15541732 try {
15551733 remoteBulkWriter .commit (false );
15561734 } catch (InterruptedException e ) {
@@ -1562,7 +1740,8 @@ public static List<List<String>> providerBatchFiles(String collection, BulkFileT
15621740 }
15631741
15641742 /**
1565- * 为开源提供bulk writer
1743+ * 为开源提供 remote bulk writer
1744+ * RemoteBulkWriterParam = LocalBulkWriterParam + uploadObject + clearData
15661745 *
15671746 * @param collectionSchema
15681747 * @param bulkFileType
@@ -1581,7 +1760,7 @@ private static RemoteBulkWriter buildRemoteBulkWriter(CreateCollectionReq.Collec
15811760 .withCollectionSchema (collectionSchema )
15821761 .withRemotePath ("bulk_data" )
15831762 .withFileType (bulkFileType )
1584- .withChunkSize (512 * 1024 * 1024 )
1763+ .withChunkSize (5 * 1024 * 1024 * 1024L )
15851764 .withConnectParam (connectParam )
15861765 .build ();
15871766 RemoteBulkWriter remoteBulkWriter = null ;
@@ -1593,6 +1772,93 @@ private static RemoteBulkWriter buildRemoteBulkWriter(CreateCollectionReq.Collec
15931772 return remoteBulkWriter ;
15941773 }
15951774
1775+
1776+ private static LocalBulkWriter buildLocalBulkWriter (CreateCollectionReq .CollectionSchema collectionSchema , BulkFileType bulkFileType ) {
1777+ LocalBulkWriterParam bulkWriterParam = LocalBulkWriterParam .newBuilder ()
1778+ .withCollectionSchema (collectionSchema )
1779+ .withLocalPath ("/tmp/bulk_writer" )
1780+ .withFileType (bulkFileType )
1781+ .withChunkSize (5 * 1024 * 1024 * 1024L )
1782+ .build ();
1783+ LocalBulkWriter localBulkWriter ;
1784+ try {
1785+ localBulkWriter = new LocalBulkWriter (bulkWriterParam );
1786+ } catch (IOException e ) {
1787+ throw new RuntimeException (e );
1788+ }
1789+ return localBulkWriter ;
1790+ }
1791+
1792+ public static List <List <String >> providerLocalBatchFiles (String collection , BulkFileType bulkFileType , long count ) {
1793+ DescribeCollectionResp describeCollectionResp = milvusClientV2 .describeCollection (DescribeCollectionReq .builder ().collectionName (collection ).build ());
1794+ CreateCollectionReq .CollectionSchema collectionSchema = describeCollectionResp .getCollectionSchema ();
1795+ LocalBulkWriter localBulkWriter = buildLocalBulkWriter (collectionSchema , bulkFileType );
1796+ List <JsonObject > jsonObjects = CommonFunction .genCommonData (collection , count );
1797+ for (JsonObject jsonObject : jsonObjects ) {
1798+ try {
1799+ localBulkWriter .appendRow (jsonObject );
1800+ } catch (IOException | InterruptedException e ) {
1801+ throw new RuntimeException (e );
1802+ }
1803+ }
1804+ System .out .printf ("%s rows appends%n" , localBulkWriter .getTotalRowCount ());
1805+ System .out .printf ("%s rows in buffer not flushed%n" , localBulkWriter .getTotalRowCount ());
1806+
1807+ try {
1808+ localBulkWriter .commit (false );
1809+ } catch (InterruptedException e ) {
1810+ throw new RuntimeException (e );
1811+ }
1812+ List <List <String >> batchFiles = localBulkWriter .getBatchFiles ();
1813+ System .out .printf ("Local writer done! output remote files: %s%n" , batchFiles );
1814+ return batchFiles ;
1815+ }
1816+
1817+ // minio上传--copy from v1
1818+ public static void multiFilesUpload (String path , List <List <String >> batchFiles ) {
1819+
1820+ MinioClient minioClient =
1821+ MinioClient .builder ()
1822+ .endpoint (System .getProperty ("minio" ) == null ? PropertyFilesUtil .getRunValue ("minio" ) : System .getProperty ("minio" ))
1823+ .credentials ("minioadmin" , "minioadmin" )
1824+ .build ();
1825+ // Make 'jsonBucket' bucket if not exist.
1826+ boolean found = false ;
1827+ try {
1828+ found = minioClient .bucketExists (BucketExistsArgs .builder ().bucket ("milvus-bucket" ).build ());
1829+ if (!found ) {
1830+ // Make a new bucket called 'jsonBucket'.
1831+ minioClient .makeBucket (MakeBucketArgs .builder ().bucket ("milvus-bucket" ).build ());
1832+ } else {
1833+ System .out .println ("Bucket 'milvus-bucket' already exists." );
1834+ }
1835+
1836+ List <String > fileNameList =new ArrayList <>();
1837+ for (List <String > batchFileList : batchFiles ) {
1838+ fileNameList .addAll (batchFileList );
1839+ }
1840+ for (String fileName : fileNameList ) {
1841+ minioClient .uploadObject (
1842+ UploadObjectArgs .builder ()
1843+ .bucket ("milvus-bucket" )
1844+ .object ( fileName )
1845+ .filename ( fileName )
1846+ .build ());
1847+ System .out .println (
1848+ "'"
1849+ + path
1850+ + fileName
1851+ + "' is successfully uploaded as "
1852+ + "object '"
1853+ + fileName
1854+ + "' to bucket 'milvus-bucket'." );
1855+ }
1856+ } catch (Exception e ) {
1857+ System .out .println (e .getMessage ());
1858+ }
1859+
1860+ }
1861+
15961862}
15971863
15981864
0 commit comments