Skip to content

Commit 045fac2

Browse files
authored
Add Collection#bulkUpdate(Map<Key, Collection<SubDocumentUpdate>> updates, ..) (#288)
1 parent c445cc6 commit 045fac2

File tree

3 files changed

+347
-31
lines changed

3 files changed

+347
-31
lines changed

document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3403,6 +3403,135 @@ void testCreateOrReplaceRefreshesSchemaOnDroppedColumn() throws Exception {
34033403
}
34043404
}
34053405

3406+
@Nested
3407+
@DisplayName("Key-Specific Bulk Update Operations")
3408+
class KeySpecificBulkUpdateTests {
3409+
3410+
@Test
3411+
@DisplayName("Should update multiple keys with all operator types in a single batch")
3412+
void testBulkUpdateAllOperatorTypes() throws Exception {
3413+
Map<Key, java.util.Collection<SubDocumentUpdate>> updates = new LinkedHashMap<>();
3414+
updates.put(
3415+
rawKey("1"),
3416+
List.of(
3417+
SubDocumentUpdate.of("item", "UpdatedSoap"),
3418+
SubDocumentUpdate.builder()
3419+
.subDocument("price")
3420+
.operator(UpdateOperator.ADD)
3421+
.subDocumentValue(SubDocumentValue.of(5))
3422+
.build(),
3423+
SubDocumentUpdate.builder()
3424+
.subDocument("props.brand")
3425+
.operator(UpdateOperator.SET)
3426+
.subDocumentValue(SubDocumentValue.of("NewBrand"))
3427+
.build()));
3428+
3429+
updates.put(
3430+
rawKey("3"),
3431+
List.of(
3432+
SubDocumentUpdate.builder()
3433+
.subDocument("props.brand")
3434+
.operator(UpdateOperator.UNSET)
3435+
.build(),
3436+
SubDocumentUpdate.builder()
3437+
.subDocument("tags")
3438+
.operator(UpdateOperator.APPEND_TO_LIST)
3439+
.subDocumentValue(SubDocumentValue.of(new String[] {"newTag1", "newTag2"}))
3440+
.build()));
3441+
3442+
updates.put(
3443+
rawKey("5"),
3444+
List.of(
3445+
SubDocumentUpdate.builder()
3446+
.subDocument("tags")
3447+
.operator(UpdateOperator.ADD_TO_LIST_IF_ABSENT)
3448+
.subDocumentValue(SubDocumentValue.of(new String[] {"hygiene", "uniqueTag"}))
3449+
.build()));
3450+
3451+
updates.put(
3452+
rawKey("6"),
3453+
List.of(
3454+
SubDocumentUpdate.builder()
3455+
.subDocument("tags")
3456+
.operator(UpdateOperator.REMOVE_ALL_FROM_LIST)
3457+
.subDocumentValue(SubDocumentValue.of(new String[] {"plastic"}))
3458+
.build()));
3459+
3460+
BulkUpdateResult result = flatCollection.bulkUpdate(updates, UpdateOptions.builder().build());
3461+
3462+
assertEquals(4, result.getUpdatedCount());
3463+
3464+
try (CloseableIterator<Document> iter = flatCollection.find(queryById("1"))) {
3465+
assertTrue(iter.hasNext());
3466+
JsonNode json = OBJECT_MAPPER.readTree(iter.next().toJson());
3467+
assertEquals("UpdatedSoap", json.get("item").asText());
3468+
assertEquals(15, json.get("price").asInt()); // 10 + 5
3469+
assertEquals("NewBrand", json.get("props").get("brand").asText());
3470+
assertEquals("M", json.get("props").get("size").asText()); // preserved
3471+
}
3472+
3473+
try (CloseableIterator<Document> iter = flatCollection.find(queryById("3"))) {
3474+
assertTrue(iter.hasNext());
3475+
JsonNode json = OBJECT_MAPPER.readTree(iter.next().toJson());
3476+
assertFalse(json.get("props").has("brand"));
3477+
assertEquals("L", json.get("props").get("size").asText()); // preserved
3478+
JsonNode tagsNode = json.get("tags");
3479+
assertEquals(6, tagsNode.size()); // Original 4 + 2 new
3480+
}
3481+
3482+
try (CloseableIterator<Document> iter = flatCollection.find(queryById("5"))) {
3483+
assertTrue(iter.hasNext());
3484+
JsonNode json = OBJECT_MAPPER.readTree(iter.next().toJson());
3485+
JsonNode tagsNode = json.get("tags");
3486+
assertEquals(4, tagsNode.size()); // Original 3 + 1 new unique
3487+
Set<String> tags = new HashSet<>();
3488+
tagsNode.forEach(n -> tags.add(n.asText()));
3489+
assertTrue(tags.contains("uniqueTag"));
3490+
}
3491+
3492+
try (CloseableIterator<Document> iter = flatCollection.find(queryById("6"))) {
3493+
assertTrue(iter.hasNext());
3494+
JsonNode json = OBJECT_MAPPER.readTree(iter.next().toJson());
3495+
JsonNode tagsNode = json.get("tags");
3496+
assertEquals(2, tagsNode.size()); // grooming, essential remain
3497+
Set<String> tags = new HashSet<>();
3498+
tagsNode.forEach(n -> tags.add(n.asText()));
3499+
assertFalse(tags.contains("plastic"));
3500+
}
3501+
}
3502+
3503+
@Test
3504+
@DisplayName("Should handle edge cases: empty map, null map, non-existent keys")
3505+
void testBulkUpdateEdgeCases() throws Exception {
3506+
UpdateOptions options = UpdateOptions.builder().build();
3507+
3508+
// Empty map
3509+
assertEquals(0, flatCollection.bulkUpdate(new HashMap<>(), options).getUpdatedCount());
3510+
3511+
// Null map
3512+
Map<Key, java.util.Collection<SubDocumentUpdate>> nullUpdates = null;
3513+
assertEquals(0, flatCollection.bulkUpdate(nullUpdates, options).getUpdatedCount());
3514+
3515+
// Non-existent key
3516+
Map<Key, java.util.Collection<SubDocumentUpdate>> nonExistent = new LinkedHashMap<>();
3517+
nonExistent.put(rawKey("non-existent"), List.of(SubDocumentUpdate.of("item", "X")));
3518+
assertEquals(0, flatCollection.bulkUpdate(nonExistent, options).getUpdatedCount());
3519+
}
3520+
3521+
// Creates a key with raw ID (matching test data format)
3522+
private Key rawKey(String id) {
3523+
return Key.from(id);
3524+
}
3525+
3526+
private Query queryById(String id) {
3527+
return Query.builder()
3528+
.setFilter(
3529+
RelationalExpression.of(
3530+
IdentifierExpression.of("id"), RelationalOperator.EQ, ConstantExpression.of(id)))
3531+
.build();
3532+
}
3533+
}
3534+
34063535
private static void executeInsertStatements() {
34073536
PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
34083537
try {

document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,5 +398,54 @@ CloseableIterator<Document> bulkUpdate(
398398
final UpdateOptions updateOptions)
399399
throws IOException;
400400

401+
/**
402+
* Bulk update sub-documents with key-specific updates. Each key can have its own set of
403+
* SubDocumentUpdate operations, allowing different updates per document.
404+
*
405+
* <p>This method supports all update operators (SET, UNSET, ADD, APPEND_TO_LIST,
406+
* ADD_TO_LIST_IF_ABSENT, REMOVE_ALL_FROM_LIST). Updates for each individual key are applied
407+
* atomically, but there is no atomicity guarantee across different keys - some keys may be
408+
* updated while others fail. Batch-level atomicity is not guaranteed, while per-key update
409+
* atomicity is guaranteed.
410+
*
411+
* <p>Example usage:
412+
*
413+
* <pre>{@code
414+
* Map<Key, Collection<SubDocumentUpdate>> updates = new HashMap<>();
415+
*
416+
* // Key 1: SET a field and ADD to a number
417+
* updates.put(key1, List.of(
418+
* SubDocumentUpdate.of("name", "NewName"),
419+
* SubDocumentUpdate.builder()
420+
* .subDocument("count")
421+
* .operator(UpdateOperator.ADD)
422+
* .subDocumentValue(SubDocumentValue.of(5))
423+
* .build()
424+
* ));
425+
*
426+
* // Key 2: APPEND to an array
427+
* updates.put(key2, List.of(
428+
* SubDocumentUpdate.builder()
429+
* .subDocument("tags")
430+
* .operator(UpdateOperator.APPEND_TO_LIST)
431+
* .subDocumentValue(SubDocumentValue.of(new String[]{"newTag"}))
432+
* .build()
433+
* ));
434+
*
435+
* BulkUpdateResult result = collection.bulkUpdate(updates, UpdateOptions.builder().build());
436+
* }</pre>
437+
*
438+
* @param updates Map of Key to Collection of SubDocumentUpdate operations. Each key's updates are
439+
* applied atomically, but no cross-key atomicity is guaranteed.
440+
* @param updateOptions Options for the update operation
441+
* @return BulkUpdateResult containing the count of successfully updated documents
442+
* @throws IOException if the update operation fails
443+
*/
444+
default BulkUpdateResult bulkUpdate(
445+
Map<Key, java.util.Collection<SubDocumentUpdate>> updates, UpdateOptions updateOptions)
446+
throws IOException {
447+
throw new UnsupportedOperationException("bulkUpdate is not supported!");
448+
}
449+
401450
String UNSUPPORTED_QUERY_OPERATION = "Query operation is not supported";
402451
}

0 commit comments

Comments
 (0)