Skip to content

Commit ad828f4

Browse files
committed
WIP
1 parent c445cc6 commit ad828f4

3 files changed

Lines changed: 594 additions & 37 deletions

File tree

document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3403,6 +3403,135 @@ void testCreateOrReplaceRefreshesSchemaOnDroppedColumn() throws Exception {
34033403
}
34043404
}
34053405

3406+
@Nested
3407+
@DisplayName("Key-Specific Bulk Update Operations")
3408+
class KeySpecificBulkUpdateTests {
3409+
3410+
@Test
3411+
@DisplayName("Should update multiple keys with all operator types in a single batch")
3412+
void testBulkUpdateAllOperatorTypes() throws Exception {
3413+
Map<Key, java.util.Collection<SubDocumentUpdate>> updates = new LinkedHashMap<>();
3414+
updates.put(
3415+
rawKey("1"),
3416+
List.of(
3417+
SubDocumentUpdate.of("item", "UpdatedSoap"),
3418+
SubDocumentUpdate.builder()
3419+
.subDocument("price")
3420+
.operator(UpdateOperator.ADD)
3421+
.subDocumentValue(SubDocumentValue.of(5))
3422+
.build(),
3423+
SubDocumentUpdate.builder()
3424+
.subDocument("props.brand")
3425+
.operator(UpdateOperator.SET)
3426+
.subDocumentValue(SubDocumentValue.of("NewBrand"))
3427+
.build()));
3428+
3429+
updates.put(
3430+
rawKey("3"),
3431+
List.of(
3432+
SubDocumentUpdate.builder()
3433+
.subDocument("props.brand")
3434+
.operator(UpdateOperator.UNSET)
3435+
.build(),
3436+
SubDocumentUpdate.builder()
3437+
.subDocument("tags")
3438+
.operator(UpdateOperator.APPEND_TO_LIST)
3439+
.subDocumentValue(SubDocumentValue.of(new String[] {"newTag1", "newTag2"}))
3440+
.build()));
3441+
3442+
updates.put(
3443+
rawKey("5"),
3444+
List.of(
3445+
SubDocumentUpdate.builder()
3446+
.subDocument("tags")
3447+
.operator(UpdateOperator.ADD_TO_LIST_IF_ABSENT)
3448+
.subDocumentValue(SubDocumentValue.of(new String[] {"hygiene", "uniqueTag"}))
3449+
.build()));
3450+
3451+
updates.put(
3452+
rawKey("6"),
3453+
List.of(
3454+
SubDocumentUpdate.builder()
3455+
.subDocument("tags")
3456+
.operator(UpdateOperator.REMOVE_ALL_FROM_LIST)
3457+
.subDocumentValue(SubDocumentValue.of(new String[] {"plastic"}))
3458+
.build()));
3459+
3460+
BulkUpdateResult result = flatCollection.bulkUpdate(updates, UpdateOptions.builder().build());
3461+
3462+
assertEquals(4, result.getUpdatedCount());
3463+
3464+
try (CloseableIterator<Document> iter = flatCollection.find(queryById("1"))) {
3465+
assertTrue(iter.hasNext());
3466+
JsonNode json = OBJECT_MAPPER.readTree(iter.next().toJson());
3467+
assertEquals("UpdatedSoap", json.get("item").asText());
3468+
assertEquals(15, json.get("price").asInt()); // 10 + 5
3469+
assertEquals("NewBrand", json.get("props").get("brand").asText());
3470+
assertEquals("M", json.get("props").get("size").asText()); // preserved
3471+
}
3472+
3473+
try (CloseableIterator<Document> iter = flatCollection.find(queryById("3"))) {
3474+
assertTrue(iter.hasNext());
3475+
JsonNode json = OBJECT_MAPPER.readTree(iter.next().toJson());
3476+
assertFalse(json.get("props").has("brand"));
3477+
assertEquals("L", json.get("props").get("size").asText()); // preserved
3478+
JsonNode tagsNode = json.get("tags");
3479+
assertEquals(6, tagsNode.size()); // Original 4 + 2 new
3480+
}
3481+
3482+
try (CloseableIterator<Document> iter = flatCollection.find(queryById("5"))) {
3483+
assertTrue(iter.hasNext());
3484+
JsonNode json = OBJECT_MAPPER.readTree(iter.next().toJson());
3485+
JsonNode tagsNode = json.get("tags");
3486+
assertEquals(4, tagsNode.size()); // Original 3 + 1 new unique
3487+
Set<String> tags = new HashSet<>();
3488+
tagsNode.forEach(n -> tags.add(n.asText()));
3489+
assertTrue(tags.contains("uniqueTag"));
3490+
}
3491+
3492+
try (CloseableIterator<Document> iter = flatCollection.find(queryById("6"))) {
3493+
assertTrue(iter.hasNext());
3494+
JsonNode json = OBJECT_MAPPER.readTree(iter.next().toJson());
3495+
JsonNode tagsNode = json.get("tags");
3496+
assertEquals(2, tagsNode.size()); // grooming, essential remain
3497+
Set<String> tags = new HashSet<>();
3498+
tagsNode.forEach(n -> tags.add(n.asText()));
3499+
assertFalse(tags.contains("plastic"));
3500+
}
3501+
}
3502+
3503+
@Test
3504+
@DisplayName("Should handle edge cases: empty map, null map, non-existent keys")
3505+
void testBulkUpdateEdgeCases() throws Exception {
3506+
UpdateOptions options = UpdateOptions.builder().build();
3507+
3508+
// Empty map
3509+
assertEquals(0, flatCollection.bulkUpdate(new HashMap<>(), options).getUpdatedCount());
3510+
3511+
// Null map
3512+
Map<Key, java.util.Collection<SubDocumentUpdate>> nullUpdates = null;
3513+
assertEquals(0, flatCollection.bulkUpdate(nullUpdates, options).getUpdatedCount());
3514+
3515+
// Non-existent key
3516+
Map<Key, java.util.Collection<SubDocumentUpdate>> nonExistent = new LinkedHashMap<>();
3517+
nonExistent.put(rawKey("non-existent"), List.of(SubDocumentUpdate.of("item", "X")));
3518+
assertEquals(0, flatCollection.bulkUpdate(nonExistent, options).getUpdatedCount());
3519+
}
3520+
3521+
// Creates a key with raw ID (matching test data format)
3522+
private Key rawKey(String id) {
3523+
return Key.from(id);
3524+
}
3525+
3526+
private Query queryById(String id) {
3527+
return Query.builder()
3528+
.setFilter(
3529+
RelationalExpression.of(
3530+
IdentifierExpression.of("id"), RelationalOperator.EQ, ConstantExpression.of(id)))
3531+
.build();
3532+
}
3533+
}
3534+
34063535
private static void executeInsertStatements() {
34073536
PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
34083537
try {

document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ public interface Collection {
2121
* store.
2222
*
2323
* <p>Note: This method ensures that all the fields defined in the `Document` are set/created. How
24-
* the existing fields are modified is implementation specific. For example, upserting <code> {
25-
* "foo2": "bar2" }
24+
* the existing fields are modified is implementation specific. For example, upserting <code>
25+
* { "foo2": "bar2" }
2626
* </code> if a document <code>
2727
* { "foo1": "bar1" }
2828
* </code> already exists would ensure that "foo2" is set the value of "bar2" and what happens to
@@ -42,8 +42,8 @@ public interface Collection {
4242
* store.
4343
*
4444
* <p>Note: This method ensures that all the fields defined in the `Document` are set/created. How
45-
* the existing fields are modified is implementation specific. For example, upserting <code> {
46-
* "foo2": "bar2" }
45+
* the existing fields are modified is implementation specific. For example, upserting <code>
46+
* { "foo2": "bar2" }
4747
* </code> if a document <code>
4848
* { "foo1": "bar1" }
4949
* </code> already exists would ensure that "foo2" is set the value of "bar2" and what happens to
@@ -398,5 +398,53 @@ CloseableIterator<Document> bulkUpdate(
398398
final UpdateOptions updateOptions)
399399
throws IOException;
400400

401+
/**
402+
* Bulk update sub-documents with key-specific updates. Each key can have its own set of
403+
* SubDocumentUpdate operations, allowing different updates per document.
404+
*
405+
* <p>This method supports all update operators (SET, UNSET, ADD, APPEND_TO_LIST,
406+
* ADD_TO_LIST_IF_ABSENT, REMOVE_ALL_FROM_LIST). Updates for each individual key are applied
407+
* atomically, but there is no atomicity guarantee across different keys - some keys may be
408+
* updated while others fail. Any atomicity guarantees are implementation-specific.
409+
*
410+
* <p>Example usage:
411+
*
412+
* <pre>{@code
413+
* Map<Key, Collection<SubDocumentUpdate>> updates = new HashMap<>();
414+
*
415+
* // Key 1: SET a field and ADD to a number
416+
* updates.put(key1, List.of(
417+
* SubDocumentUpdate.of("name", "NewName"),
418+
* SubDocumentUpdate.builder()
419+
* .subDocument("count")
420+
* .operator(UpdateOperator.ADD)
421+
* .subDocumentValue(SubDocumentValue.of(5))
422+
* .build()
423+
* ));
424+
*
425+
* // Key 2: APPEND to an array
426+
* updates.put(key2, List.of(
427+
* SubDocumentUpdate.builder()
428+
* .subDocument("tags")
429+
* .operator(UpdateOperator.APPEND_TO_LIST)
430+
* .subDocumentValue(SubDocumentValue.of(new String[]{"newTag"}))
431+
* .build()
432+
* ));
433+
*
434+
* BulkUpdateResult result = collection.bulkUpdate(updates, UpdateOptions.builder().build());
435+
* }</pre>
436+
*
437+
* @param updates Map of Key to Collection of SubDocumentUpdate operations. Each key's updates are
438+
* applied atomically, but no cross-key atomicity is guaranteed.
439+
* @param updateOptions Options for the update operation
440+
* @return BulkUpdateResult containing the count of successfully updated documents
441+
* @throws IOException if the update operation fails
442+
*/
443+
default BulkUpdateResult bulkUpdate(
444+
Map<Key, java.util.Collection<SubDocumentUpdate>> updates, UpdateOptions updateOptions)
445+
throws IOException {
446+
throw new UnsupportedOperationException("bulkUpdate is not supported!");
447+
}
448+
401449
String UNSUPPORTED_QUERY_OPERATION = "Query operation is not supported";
402450
}

0 commit comments

Comments
 (0)