11package io .github .thunderz99 .cosmos ;
22
33import com .azure .cosmos .CosmosClient ;
4- import com .azure .cosmos .models .CosmosPatchOperations ;
4+ import com .azure .cosmos .CosmosContainer ;
5+ import com .azure .cosmos .models .*;
56import com .google .common .base .Preconditions ;
67import com .microsoft .azure .documentdb .*;
8+ import com .microsoft .azure .documentdb .PartitionKey ;
9+ import com .microsoft .azure .documentdb .SqlQuerySpec ;
710import io .github .thunderz99 .cosmos .condition .Aggregate ;
811import io .github .thunderz99 .cosmos .condition .Condition ;
12+ import io .github .thunderz99 .cosmos .dto .CosmosBulkResult ;
913import io .github .thunderz99 .cosmos .dto .PartialUpdateOption ;
1014import io .github .thunderz99 .cosmos .util .Checker ;
1115import io .github .thunderz99 .cosmos .util .JsonUtil ;
1216import io .github .thunderz99 .cosmos .util .RetryUtil ;
1317import io .github .thunderz99 .cosmos .v4 .PatchOperations ;
1418import org .apache .commons .collections4 .MapUtils ;
19+ import org .apache .commons .lang3 .ObjectUtils ;
1520import org .apache .commons .lang3 .StringUtils ;
21+ import org .apache .http .HttpStatus ;
22+ import org .jetbrains .annotations .NotNull ;
1623import org .json .JSONObject ;
1724import org .slf4j .Logger ;
1825import org .slf4j .LoggerFactory ;
@@ -33,6 +40,8 @@ public class CosmosDatabase {
3340
3441 private static Logger log = LoggerFactory .getLogger (CosmosDatabase .class );
3542
43+ static final int MAX_BATCH_NUMBER_OF_OPERATION = 100 ;
44+
3645 String db ;
3746
3847 String account ;
@@ -89,6 +98,294 @@ public CosmosDocument create(String coll, Object data, String partition) throws
8998 return new CosmosDocument (resource .toObject (JSONObject .class ));
9099 }
91100
101+ /**
102+ * Create batch documents in a single transaction.
103+ * Note: the maximum number of operations is 100.
104+ *
105+ * @param coll collection name
106+ * @param data data object
107+ * @param partition partition name
108+ * @return CosmosDocument instances
109+ * @throws Exception CosmosException
110+ */
111+ public List <CosmosDocument > batchCreate (String coll , List <?> data , String partition ) throws Exception {
112+ doCheckBeforeBatch (coll , data , partition );
113+
114+ var partitionKey = new com .azure .cosmos .models .PartitionKey (partition );
115+ var container = this .clientV4 .getDatabase (db ).getContainer (coll );
116+ CosmosBatch batch = CosmosBatch .createCosmosBatch (partitionKey );
117+ data .forEach (it -> {
118+ var map = JsonUtil .toMap (it );
119+ map .put (Cosmos .getDefaultPartitionKey (), partition );
120+ batch .createItemOperation (map );
121+ });
122+
123+ return doBatchWithRetry (container , batch );
124+ }
125+
126+ /**
127+ * Upsert batch documents in a single transaction.
128+ * Note: the maximum number of operations is 100.
129+ *
130+ * @param coll collection name
131+ * @param data data object
132+ * @param partition partition name
133+ * @return CosmosDocument instances
134+ * @throws Exception CosmosException
135+ */
136+ public List <CosmosDocument > batchUpsert (String coll , List <?> data , String partition ) throws Exception {
137+ doCheckBeforeBatch (coll , data , partition );
138+
139+ var partitionKey = new com .azure .cosmos .models .PartitionKey (partition );
140+ var container = this .clientV4 .getDatabase (db ).getContainer (coll );
141+ CosmosBatch batch = CosmosBatch .createCosmosBatch (partitionKey );
142+ data .forEach (it -> {
143+ var map = JsonUtil .toMap (it );
144+ map .put (Cosmos .getDefaultPartitionKey (), partition );
145+ batch .upsertItemOperation (map );
146+ });
147+
148+ return doBatchWithRetry (container , batch );
149+ }
150+
151+ /**
152+ * Delete batch documents in a single transaction.
153+ * Note: the maximum number of operations is 100.
154+ *
155+ * @param coll collection name
156+ * @param data data object
157+ * @param partition partition name
158+ * @return CosmosDocument instances (only id)
159+ * @throws Exception CosmosException
160+ */
161+ public List <CosmosDocument > batchDelete (String coll , List <?> data , String partition ) throws Exception {
162+ doCheckBeforeBatch (coll , data , partition );
163+
164+ var partitionKey = new com .azure .cosmos .models .PartitionKey (partition );
165+ var container = this .clientV4 .getDatabase (db ).getContainer (coll );
166+ CosmosBatch batch = CosmosBatch .createCosmosBatch (partitionKey );
167+
168+ var ids = new ArrayList <String >();
169+ data .stream ().map (CosmosDatabase ::getId ).filter (ObjectUtils ::isNotEmpty ).forEach (it -> {
170+ ids .add (it );
171+ batch .deleteItemOperation (it );
172+ });
173+
174+ doBatchWithRetry (container , batch );
175+
176+ return ids .stream ().map (it -> {
177+ var doc = new Document (JsonUtil .toJson (Map .of ("id" , it )));
178+ return new CosmosDocument (doc .toObject (JSONObject .class ));
179+ }).collect (Collectors .toList ());
180+ }
181+
182+ static String getId (Object object ) {
183+ String id ;
184+ if (object instanceof String ) {
185+ id = (String ) object ;
186+ } else {
187+ var map = JsonUtil .toMap (object );
188+ id = map .get ("id" ).toString ();
189+ }
190+ return id ;
191+ }
192+
193+ static void doCheckBeforeBatch (String coll , List <?> data , String partition ) {
194+ Checker .checkNotBlank (coll , "coll" );
195+ Checker .checkNotBlank (partition , "partition" );
196+ Checker .checkNotEmpty (data , "create data " + coll + " " + partition );
197+
198+ checkBatchMaxOperations (data );
199+ checkValidId (data );
200+ }
201+
202+ static void doCheckBeforeBulk (String coll , List <?> data , String partition ) {
203+ Checker .checkNotBlank (coll , "coll" );
204+ Checker .checkNotBlank (partition , "partition" );
205+ Checker .checkNotEmpty (data , "create data " + coll + " " + partition );
206+
207+ checkValidId (data );
208+ }
209+
210+ private List <CosmosDocument > doBatchWithRetry (CosmosContainer container , CosmosBatch batch ) throws Exception {
211+ var response = RetryUtil .executeBatchWithRetry (() -> container .executeCosmosBatch (batch ));
212+
213+ var successDocuments = new ArrayList <CosmosDocument >();
214+ for (CosmosBatchOperationResult cosmosBatchOperationResult : response .getResults ()) {
215+ var item = cosmosBatchOperationResult .getItem (Object .class );
216+ if (item == null ) continue ;
217+ var doc = new Document (JsonUtil .toJson (item ));
218+ successDocuments .add (new CosmosDocument (doc .toObject (JSONObject .class )));
219+ }
220+
221+ return successDocuments ;
222+ }
223+
224+ /**
225+ * Bulk create documents.
226+ * Note: Non-transaction. Have no number limit in theoretically.
227+ *
228+ * @param coll collection name
229+ * @param data data object
230+ * @param partition partition name
231+ * @return CosmosBulkResult
232+ */
233+ public CosmosBulkResult bulkCreate (String coll , List <?> data , String partition ) {
234+ doCheckBeforeBulk (coll , data , partition );
235+
236+ var partitionKey = new com .azure .cosmos .models .PartitionKey (partition );
237+ var operations = data .stream ().map (it -> {
238+ var map = JsonUtil .toMap (it );
239+ map .put (Cosmos .getDefaultPartitionKey (), partition );
240+ return CosmosBulkOperations .getCreateItemOperation (map , partitionKey );
241+ }
242+ ).collect (Collectors .toList ());
243+
244+ return doBulkWithRetry (coll , operations );
245+ }
246+
247+ /**
248+ * Bulk upsert documents
249+ * Note: Non-transaction. Have no number limit in theoretically.
250+ *
251+ * @param coll collection name
252+ * @param data data object
253+ * @param partition partition name
254+ * @return CosmosBulkResult
255+ */
256+ public CosmosBulkResult bulkUpsert (String coll , List <?> data , String partition ) {
257+ doCheckBeforeBulk (coll , data , partition );
258+
259+ var partitionKey = new com .azure .cosmos .models .PartitionKey (partition );
260+ var operations = data .stream ().map (it -> {
261+ var map = JsonUtil .toMap (it );
262+ map .put (Cosmos .getDefaultPartitionKey (), partition );
263+ return CosmosBulkOperations .getUpsertItemOperation (map , partitionKey );
264+ }
265+ ).collect (Collectors .toList ());
266+
267+ return doBulkWithRetry (coll , operations );
268+ }
269+
270+ /**
271+ * Bulk delete documents
272+ * Note: Non-transaction. Have no number limit in theoretically.
273+ *
274+ * @param coll collection name
275+ * @param data data object
276+ * @param partition partition name
277+ * @return CosmosBulkResult
278+ */
279+ public CosmosBulkResult bulkDelete (String coll , List <?> data , String partition ) {
280+ doCheckBeforeBulk (coll , data , partition );
281+
282+ var ids = new ArrayList <String >();
283+ var partitionKey = new com .azure .cosmos .models .PartitionKey (partition );
284+ var operations = data .stream ()
285+ .map (it -> {
286+ var id = getId (it );
287+ ids .add (id );
288+ return id ;
289+ })
290+ .filter (ObjectUtils ::isNotEmpty )
291+ .map (it -> CosmosBulkOperations .getDeleteItemOperation (it , partitionKey ))
292+ .collect (Collectors .toList ());
293+
294+ var result = doBulkWithRetry (coll , operations );
295+
296+ result .successList = ids .stream ().map (it -> {
297+ var doc = new Document (JsonUtil .toJson (Map .of ("id" , it )));
298+ return new CosmosDocument (doc .toObject (JSONObject .class ));
299+ }).collect (Collectors .toList ());
300+
301+
302+ return result ;
303+ }
304+
305+ @ NotNull
306+ private CosmosBulkResult doBulkWithRetry (String coll , List <CosmosItemOperation > operations ) {
307+ var container = this .clientV4 .getDatabase (db ).getContainer (coll );
308+ var bulkResult = new CosmosBulkResult ();
309+
310+ int maxRetries = 10 ;
311+ long delay = 0 ;
312+ long maxDelay = 16000 ;
313+
314+ var successDocuments = new ArrayList <CosmosDocument >();
315+
316+ for (int attempt = 0 ; attempt < maxRetries ; attempt ++) {
317+
318+ var retryTasks = new ArrayList <CosmosItemOperation >();
319+ var execResult = container .executeBulkOperations (operations );
320+
321+ for (CosmosBulkOperationResponse <?> result : execResult ) {
322+ var operation = result .getOperation ();
323+ var response = result .getResponse ();
324+ if (ObjectUtils .isEmpty (response )) {
325+ continue ;
326+ }
327+
328+ if (RetryUtil .shouldRetry (response .getStatusCode ())) {
329+ delay = Math .max (delay , response .getRetryAfterDuration ().toMillis ());
330+ retryTasks .add (operation );
331+ } else if (response .isSuccessStatusCode ()) {
332+ var item = response .getItem (Object .class );
333+ if (item == null ) continue ;
334+ var doc = new Document (JsonUtil .toJson (item ));
335+ successDocuments .add (new CosmosDocument (doc .toObject (JSONObject .class )));
336+ } else {
337+ var ex = result .getException ();
338+ if (HttpStatus .SC_CONFLICT == response .getStatusCode ()) {
339+ Map <String , String > map = operation .getItem ();
340+ bulkResult .fatalList .add (new CosmosException (response .getStatusCode (), "CONFLICT" , "id already exits: " + map .get ("id" )));
341+ } else {
342+ if (ObjectUtils .isNotEmpty (ex )) {
343+ bulkResult .fatalList .add (new CosmosException (response .getStatusCode (), ex .getMessage (), ex .getMessage ()));
344+ } else {
345+ bulkResult .fatalList .add (new CosmosException (response .getStatusCode (), "UNKNOWN" , "UNKNOWN" ));
346+ }
347+ }
348+ }
349+ }
350+
351+ if (retryTasks .isEmpty ()) {
352+ operations .clear ();
353+ break ;
354+ } else {
355+ operations = retryTasks ;
356+ }
357+
358+ try {
359+ Thread .sleep (delay );
360+ } catch (InterruptedException ignored ) {}
361+ // Exponential Backoff
362+ delay = Math .min (maxDelay , delay * 2 );
363+ }
364+
365+ bulkResult .retryList = operations ;
366+ bulkResult .successList = successDocuments ;
367+ return bulkResult ;
368+ }
369+
370+ static void checkBatchMaxOperations (List <?> data ) {
371+ // There's a current limit of 100 operations per TransactionalBatch to ensure the performance is as expected and within SLAs:
372+ // https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/transactional-batch?tabs=dotnet#limitations
373+ if (data .size () > MAX_BATCH_NUMBER_OF_OPERATION ) {
374+ throw new IllegalArgumentException ("The number of data operations should not exceed 100." );
375+ }
376+ }
377+
378+ static void checkValidId (List <?> data ) {
379+ for (Object datum : data ) {
380+ if (datum instanceof String ) {
381+ checkValidId ((String ) datum );
382+ } else {
383+ Map <String , Object > map = JsonUtil .toMap (datum );
384+ checkValidId (map );
385+ }
386+ }
387+ }
388+
92389 /**
93390 * Id cannot contain "\t", "\r", "\n", or cosmosdb will create invalid data.
94391 *
0 commit comments