Skip to content

Commit 6114bf3

Browse files
thunderz99 and zhang.lei authored
feat: add bulkPatch support for CosmosDB/PostgreSQL/Mongo with per-item result classification (#210)
* feat: add bulk patch methods * feat: add bulk patch methods. add comments and tests --------- Co-authored-by: zhang.lei <zhang.lei@onehr.jp>
1 parent 00fcdcf commit 6114bf3

16 files changed

Lines changed: 1251 additions & 11 deletions

README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,33 @@ The main difference between Partial update and Patch is that:
240240
* Partial update focuses on inserting/replacing field values, and is able to update more than 10 fields in one call.
241241
* Patch focuses on implementing a method similar to JSON Patch, which supports more complicated ops like "Add, Set, Replace, Remove, Increment", but cannot handle more than 10 ops in one patch call.
242242

243+
### Bulk Patch
244+
245+
Bulk patch is a non-transactional bulk operation for patch updates.
246+
247+
```java
248+
// 1) Apply the same patch operations to multiple ids
249+
var ids = List.of("id001", "id002", "id003");
250+
var operations = PatchOperations.create()
251+
.set("/status", "ENABLED")
252+
.increment("/version", 1);
253+
254+
var result1 = db.bulkPatch("Collection1", ids, operations, "Users");
255+
256+
// 2) Apply different patch operations per id
257+
var patchList = List.of(
258+
BulkPatchOperation.of("id001", PatchOperations.create().set("/status", "ENABLED")),
259+
BulkPatchOperation.of("id002", PatchOperations.create().set("/status", "DISABLED"))
260+
);
261+
262+
var result2 = db.bulkPatch("Collection1", patchList, "Users");
263+
```
264+
265+
Return value is `CosmosBulkResult`:
266+
267+
* `successList`: patched documents (or document ids depending on implementation)
268+
* `fatalList`: failed items with `CosmosException` (for example, target id not found)
269+
243270
### Complex queries
244271

245272
```java

src/main/java/io/github/thunderz99/cosmos/CosmosDatabase.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import io.github.thunderz99.cosmos.condition.Aggregate;
66
import io.github.thunderz99.cosmos.condition.Condition;
7+
import io.github.thunderz99.cosmos.dto.BulkPatchOperation;
78
import io.github.thunderz99.cosmos.dto.CosmosBulkResult;
89
import io.github.thunderz99.cosmos.dto.PartialUpdateOption;
910
import io.github.thunderz99.cosmos.v4.PatchOperations;
@@ -512,6 +513,67 @@ default public CosmosDocumentList aggregate(String coll, Aggregate aggregate, Co
512513
*/
513514
public CosmosBulkResult bulkDelete(String coll, List<?> data, String partition) throws Exception;
514515

516+
/**
517+
* Bulk patch documents with the same patch operations.
518+
*
519+
* <p>
520+
* Default implementation converts ids to BulkPatchOperation list and delegates to
521+
* {@link #bulkPatch(String, List, String)}.
522+
* </p>
523+
*
524+
* @param coll collection name
525+
* @param ids target document ids
526+
* @param operations patch operations applied to each target id
527+
* @param partition partition name
528+
* @return CosmosBulkResult
529+
* @throws Exception cosmos exception
530+
*/
531+
default CosmosBulkResult bulkPatch(String coll, List<String> ids, PatchOperations operations, String partition) throws Exception {
532+
if (ids == null) {
533+
return this.bulkPatch(coll, (List<BulkPatchOperation>) null, partition);
534+
}
535+
536+
var data = ids.stream()
537+
.map(id -> BulkPatchOperation.of(id, operations == null ? null : operations.copy()))
538+
.toList();
539+
return this.bulkPatch(coll, data, partition);
540+
}
541+
542+
/**
543+
* Bulk patch documents.
544+
* Note: Non-transaction. Have no number limit in theoretically.
545+
*
546+
* <p>
547+
* Default implementation executes one-by-one patch for compatibility.
548+
* </p>
549+
*
550+
* @param coll collection name
551+
* @param data bulk patch operations (id + PatchOperations)
552+
* @param partition partition name
553+
* @return CosmosBulkResult
554+
*/
555+
default CosmosBulkResult bulkPatch(String coll, List<BulkPatchOperation> data, String partition) throws Exception {
556+
var ret = new CosmosBulkResult();
557+
if (data == null) {
558+
return ret;
559+
}
560+
561+
for (var operation : data) {
562+
try {
563+
if (operation == null) {
564+
throw new CosmosException(400, "BAD_REQUEST", "bulkPatch operation should not be null");
565+
}
566+
ret.successList.add(this.patch(coll, operation.id, operation.operations, partition));
567+
} catch (CosmosException ce) {
568+
ret.fatalList.add(ce);
569+
} catch (Exception e) {
570+
ret.fatalList.add(new CosmosException(500, "UNKNOWN", e.getMessage(), e));
571+
}
572+
}
573+
574+
return ret;
575+
}
576+
515577
/**
516578
* Ping a collection to test whether it is accessible.
517579
*
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package io.github.thunderz99.cosmos.dto;
2+
3+
import io.github.thunderz99.cosmos.v4.PatchOperations;
4+
5+
/**
6+
* A patch task used by bulkPatch.
7+
*/
8+
public class BulkPatchOperation {
9+
10+
/**
11+
* Target document id.
12+
*/
13+
public String id;
14+
15+
/**
16+
* Patch operations for the target document.
17+
*/
18+
public PatchOperations operations;
19+
20+
/**
21+
* Create an empty operation.
22+
*/
23+
public BulkPatchOperation() {
24+
}
25+
26+
/**
27+
* Create an operation with id and patch operations.
28+
*
29+
* @param id target document id
30+
* @param operations patch operations
31+
*/
32+
public BulkPatchOperation(String id, PatchOperations operations) {
33+
this.id = id;
34+
this.operations = operations;
35+
}
36+
37+
/**
38+
* Build a bulk patch operation.
39+
*
40+
* @param id target document id
41+
* @param operations patch operations
42+
* @return BulkPatchOperation
43+
*/
44+
public static BulkPatchOperation of(String id, PatchOperations operations) {
45+
return new BulkPatchOperation(id, operations);
46+
}
47+
}

src/main/java/io/github/thunderz99/cosmos/impl/cosmosdb/CosmosDatabaseImpl.java

Lines changed: 110 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.github.thunderz99.cosmos.*;
1111
import io.github.thunderz99.cosmos.condition.Aggregate;
1212
import io.github.thunderz99.cosmos.condition.Condition;
13+
import io.github.thunderz99.cosmos.dto.BulkPatchOperation;
1314
import io.github.thunderz99.cosmos.dto.CosmosBatchResponseWrapper;
1415
import io.github.thunderz99.cosmos.dto.CosmosBulkResult;
1516
import io.github.thunderz99.cosmos.dto.CosmosSqlQuerySpec;
@@ -36,7 +37,8 @@ public class CosmosDatabaseImpl implements CosmosDatabase {
3637

3738
private static Logger log = LoggerFactory.getLogger(CosmosDatabaseImpl.class);
3839

39-
static final int MAX_BATCH_NUMBER_OF_OPERATION = 100;
40+
static final int MAX_BATCH_NUMBER_OF_OPERATION = CosmosLimits.BATCH_OPERATION_LIMIT;
41+
static final int BULK_PATCH_CHUNK_SIZE = CosmosLimits.BULK_CHUNK_SIZE;
4042

4143
static final int FIND_PREFERRED_PAGE_SIZE = 10;
4244

@@ -1279,6 +1281,85 @@ public CosmosBulkResult bulkDelete(String coll, List<?> data, String partition)
12791281
return result;
12801282
}
12811283

1284+
/**
1285+
* Bulk patch documents with the same patch operations.
1286+
*
1287+
* @param coll collection name
1288+
* @param ids target document ids
1289+
* @param operations patch operations
1290+
* @param partition partition name
1291+
* @return CosmosBulkResult
1292+
* @throws Exception cosmos exception
1293+
*/
1294+
public CosmosBulkResult bulkPatch(String coll, List<String> ids, PatchOperations operations, String partition) throws Exception {
1295+
doCheckBeforeBulkPatch(coll, ids, operations, partition);
1296+
1297+
var partitionKey = new PartitionKey(partition);
1298+
var container = this.clientV4.getDatabase(db).getContainer(coll);
1299+
1300+
log.info("begin bulkPatch(same operations) coll:{}, partition:{}, account:{}", coll, partition, getAccount());
1301+
1302+
var ret = new CosmosBulkResult();
1303+
var retryList = new ArrayList<Object>();
1304+
for (int from = 0; from < ids.size(); from += BULK_PATCH_CHUNK_SIZE) {
1305+
var to = Math.min(from + BULK_PATCH_CHUNK_SIZE, ids.size());
1306+
var chunkIds = ids.subList(from, to);
1307+
var itemOperations = chunkIds.stream()
1308+
.map(id -> CosmosBulkOperations.getPatchItemOperation(id, partitionKey, operations.getCosmosPatchOperations()))
1309+
.collect(Collectors.toList());
1310+
1311+
var chunkResult = RetryUtil.executeBulkWithRetry(coll, itemOperations,
1312+
(ops) -> container.executeBulkOperations(ops));
1313+
1314+
ret.successList.addAll(chunkResult.successList);
1315+
ret.fatalList.addAll(chunkResult.fatalList);
1316+
retryList.addAll(chunkResult.retryList);
1317+
}
1318+
ret.retryList = retryList;
1319+
1320+
log.info("end bulkPatch(same operations) coll:{}, partition:{}, account:{}", coll, partition, getAccount());
1321+
return ret;
1322+
}
1323+
1324+
/**
1325+
* Bulk patch documents.
1326+
* Note: Non-transaction. Have no number limit in theoretically.
1327+
*
1328+
* @param coll collection name
1329+
* @param data bulk patch operations (id + PatchOperations)
1330+
* @param partition partition name
1331+
* @return CosmosBulkResult
1332+
*/
1333+
public CosmosBulkResult bulkPatch(String coll, List<BulkPatchOperation> data, String partition) throws Exception {
1334+
doCheckBeforeBulkPatch(coll, data, partition);
1335+
1336+
var partitionKey = new PartitionKey(partition);
1337+
var container = this.clientV4.getDatabase(db).getContainer(coll);
1338+
1339+
log.info("begin bulkPatch coll:{}, partition:{}, account:{}", coll, partition, getAccount());
1340+
1341+
var ret = new CosmosBulkResult();
1342+
var retryList = new ArrayList<Object>();
1343+
for (int from = 0; from < data.size(); from += BULK_PATCH_CHUNK_SIZE) {
1344+
var to = Math.min(from + BULK_PATCH_CHUNK_SIZE, data.size());
1345+
var chunkData = data.subList(from, to);
1346+
var operations = chunkData.stream()
1347+
.map(it -> CosmosBulkOperations.getPatchItemOperation(it.id, partitionKey, it.operations.getCosmosPatchOperations()))
1348+
.collect(Collectors.toList());
1349+
1350+
var chunkResult = RetryUtil.executeBulkWithRetry(coll, operations,
1351+
(ops) -> container.executeBulkOperations(ops));
1352+
1353+
ret.successList.addAll(chunkResult.successList);
1354+
ret.fatalList.addAll(chunkResult.fatalList);
1355+
retryList.addAll(chunkResult.retryList);
1356+
}
1357+
ret.retryList = retryList;
1358+
1359+
log.info("end bulkPatch coll:{}, partition:{}, account:{}", coll, partition, getAccount());
1360+
return ret;
1361+
}
1362+
12821363
@Override
12831364
public boolean ping(String coll) throws Exception {
12841365
var docs = this.find(coll, Condition.filter().limit(1), "_ping");
@@ -1290,10 +1371,37 @@ static void checkBatchMaxOperations(List<?> data) {
12901371
// There's a current limit of 100 operations per TransactionalBatch to ensure the performance is as expected and within SLAs:
12911372
// https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/transactional-batch?tabs=dotnet#limitations
12921373
if (data.size() > MAX_BATCH_NUMBER_OF_OPERATION) {
1293-
throw new IllegalArgumentException("The number of data operations should not exceed 100.");
1374+
throw new IllegalArgumentException("The number of data operations should not exceed %d.".formatted(MAX_BATCH_NUMBER_OF_OPERATION));
12941375
}
12951376
}
12961377

1378+
static void doCheckBeforeBulkPatch(String coll, List<BulkPatchOperation> data, String partition) {
1379+
Checker.checkNotBlank(coll, "coll");
1380+
Checker.checkNotBlank(partition, "partition");
1381+
Checker.checkNotEmpty(data, "bulkPatch data " + coll + " " + partition);
1382+
1383+
for (var operation : data) {
1384+
Checker.checkNotNull(operation, "bulkPatch operation");
1385+
Checker.checkNotBlank(operation.id, "bulkPatch operation id");
1386+
checkValidId(operation.id);
1387+
Checker.checkNotNull(operation.operations, "bulkPatch operation patch operations");
1388+
Preconditions.checkArgument(operation.operations.size() <= PatchOperations.LIMIT,
1389+
"Size of operations should be less or equal to 10. We got: %d, which exceed the limit 10",
1390+
operation.operations.size());
1391+
}
1392+
}
1393+
1394+
static void doCheckBeforeBulkPatch(String coll, List<String> ids, PatchOperations operations, String partition) {
1395+
Checker.checkNotBlank(coll, "coll");
1396+
Checker.checkNotBlank(partition, "partition");
1397+
Checker.checkNotEmpty(ids, "bulkPatch ids " + coll + " " + partition);
1398+
Checker.checkNotNull(operations, "bulkPatch operations");
1399+
Preconditions.checkArgument(operations.size() <= PatchOperations.LIMIT,
1400+
"Size of operations should be less or equal to 10. We got: %d, which exceed the limit 10",
1401+
operations.size());
1402+
checkValidId(ids);
1403+
}
1404+
12971405
/**
12981406
* Get cosmos db account id associated with this instance.
12991407
*

0 commit comments

Comments
 (0)