Skip to content

Commit 080d894

Browse files
authored
Add Collection#bulkCreateOrReplace (#284)
1 parent f3bc54c commit 080d894

File tree

3 files changed

+373
-14
lines changed

3 files changed

+373
-14
lines changed

document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1465,6 +1465,245 @@ void testBulkUpsertAndReturnOlderDocumentsUpsertFailure() throws Exception {
14651465
}
14661466
}
14671467
}
1468+
1469+
@Test
1470+
@DisplayName("Should bulk createOrReplace multiple new documents")
1471+
void testBulkCreateOrReplaceNewDocuments() throws Exception {
1472+
Map<Key, Document> bulkMap = new LinkedHashMap<>();
1473+
1474+
ObjectNode node1 = OBJECT_MAPPER.createObjectNode();
1475+
node1.put("item", "BulkReplaceItem1");
1476+
node1.put("price", 201);
1477+
node1.put("quantity", 10);
1478+
bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-replace-1"), new JSONDocument(node1));
1479+
1480+
ObjectNode node2 = OBJECT_MAPPER.createObjectNode();
1481+
node2.put("item", "BulkReplaceItem2");
1482+
node2.put("price", 202);
1483+
node2.put("quantity", 20);
1484+
bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-replace-2"), new JSONDocument(node2));
1485+
1486+
boolean result = flatCollection.bulkCreateOrReplace(bulkMap);
1487+
1488+
assertTrue(result);
1489+
1490+
queryAndAssert(
1491+
new SingleValueKey(DEFAULT_TENANT, "bulk-replace-1"),
1492+
rs -> {
1493+
assertTrue(rs.next());
1494+
assertEquals("BulkReplaceItem1", rs.getString("item"));
1495+
assertEquals(201, rs.getInt("price"));
1496+
assertEquals(10, rs.getInt("quantity"));
1497+
});
1498+
1499+
queryAndAssert(
1500+
new SingleValueKey(DEFAULT_TENANT, "bulk-replace-2"),
1501+
rs -> {
1502+
assertTrue(rs.next());
1503+
assertEquals("BulkReplaceItem2", rs.getString("item"));
1504+
assertEquals(202, rs.getInt("price"));
1505+
assertEquals(20, rs.getInt("quantity"));
1506+
});
1507+
}
1508+
1509+
@Test
1510+
@DisplayName(
1511+
"Should bulk createOrReplace replacing existing documents and reset missing cols to default")
1512+
void testBulkCreateOrReplaceResetsUnspecifiedColumnsToDefault() throws Exception {
1513+
// First create documents with multiple fields
1514+
String docId1 = "bulk-replace-reset-1";
1515+
String docId2 = "bulk-replace-reset-2";
1516+
1517+
ObjectNode initial1 = OBJECT_MAPPER.createObjectNode();
1518+
initial1.put("item", "Original1");
1519+
initial1.put("price", 100);
1520+
initial1.put("quantity", 50);
1521+
initial1.put("in_stock", true);
1522+
flatCollection.createOrReplace(
1523+
new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(initial1));
1524+
1525+
ObjectNode initial2 = OBJECT_MAPPER.createObjectNode();
1526+
initial2.put("item", "Original2");
1527+
initial2.put("price", 200);
1528+
initial2.put("quantity", 75);
1529+
initial2.put("in_stock", false);
1530+
flatCollection.createOrReplace(
1531+
new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(initial2));
1532+
1533+
// Now bulk createOrReplace with only some fields - others should be RESET to default
1534+
Map<Key, Document> bulkMap = new LinkedHashMap<>();
1535+
1536+
ObjectNode updated1 = OBJECT_MAPPER.createObjectNode();
1537+
updated1.put("item", "Updated1");
1538+
// price, quantity, in_stock are NOT specified - should be reset to NULL
1539+
bulkMap.put(new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(updated1));
1540+
1541+
ObjectNode updated2 = OBJECT_MAPPER.createObjectNode();
1542+
updated2.put("item", "Updated2");
1543+
updated2.put("price", 999);
1544+
// quantity, in_stock are NOT specified - should be reset to NULL
1545+
bulkMap.put(new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(updated2));
1546+
1547+
boolean result = flatCollection.bulkCreateOrReplace(bulkMap);
1548+
1549+
assertTrue(result);
1550+
1551+
// Verify doc1: item updated, other fields reset to NULL
1552+
queryAndAssert(
1553+
new SingleValueKey(DEFAULT_TENANT, docId1),
1554+
rs -> {
1555+
assertTrue(rs.next());
1556+
assertEquals("Updated1", rs.getString("item"));
1557+
assertNull(rs.getObject("price")); // RESET to NULL
1558+
assertNull(rs.getObject("quantity")); // RESET to NULL
1559+
assertNull(rs.getObject("in_stock")); // RESET to NULL
1560+
});
1561+
1562+
// Verify doc2: item and price updated, other fields reset to NULL
1563+
queryAndAssert(
1564+
new SingleValueKey(DEFAULT_TENANT, docId2),
1565+
rs -> {
1566+
assertTrue(rs.next());
1567+
assertEquals("Updated2", rs.getString("item"));
1568+
assertEquals(999, rs.getInt("price"));
1569+
assertNull(rs.getObject("quantity")); // RESET to NULL
1570+
assertNull(rs.getObject("in_stock")); // RESET to NULL
1571+
});
1572+
}
1573+
1574+
@Test
1575+
@DisplayName("bulkUpsert vs bulkCreateOrReplace: upsert preserves, createOrReplace resets")
1576+
void testBulkUpsertVsBulkCreateOrReplaceBehavior() throws Exception {
1577+
// Setup: Create two identical documents
1578+
String docId1 = "bulk-compare-upsert";
1579+
String docId2 = "bulk-compare-replace";
1580+
1581+
ObjectNode initial = OBJECT_MAPPER.createObjectNode();
1582+
initial.put("item", "Original Item");
1583+
initial.put("price", 100);
1584+
initial.put("quantity", 50);
1585+
1586+
flatCollection.createOrReplace(
1587+
new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(initial.deepCopy()));
1588+
flatCollection.createOrReplace(
1589+
new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(initial.deepCopy()));
1590+
1591+
// Now update both with partial documents (only item field)
1592+
ObjectNode partialUpdate = OBJECT_MAPPER.createObjectNode();
1593+
partialUpdate.put("item", "Updated Item");
1594+
1595+
// Use bulkUpsert for doc1 - should PRESERVE price and quantity
1596+
Map<Key, Document> upsertMap = new LinkedHashMap<>();
1597+
upsertMap.put(
1598+
new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(partialUpdate.deepCopy()));
1599+
flatCollection.bulkUpsert(upsertMap);
1600+
1601+
// Use bulkCreateOrReplace for doc2 - should RESET price and quantity to NULL
1602+
Map<Key, Document> replaceMap = new LinkedHashMap<>();
1603+
replaceMap.put(
1604+
new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(partialUpdate.deepCopy()));
1605+
flatCollection.bulkCreateOrReplace(replaceMap);
1606+
1607+
// Verify bulkUpsert preserved original values
1608+
queryAndAssert(
1609+
new SingleValueKey(DEFAULT_TENANT, docId1),
1610+
rs -> {
1611+
assertTrue(rs.next());
1612+
assertEquals("Updated Item", rs.getString("item"));
1613+
assertEquals(100, rs.getInt("price")); // PRESERVED
1614+
assertEquals(50, rs.getInt("quantity")); // PRESERVED
1615+
});
1616+
1617+
// Verify bulkCreateOrReplace reset to defaults
1618+
queryAndAssert(
1619+
new SingleValueKey(DEFAULT_TENANT, docId2),
1620+
rs -> {
1621+
assertTrue(rs.next());
1622+
assertEquals("Updated Item", rs.getString("item"));
1623+
assertNull(rs.getObject("price")); // RESET to NULL
1624+
assertNull(rs.getObject("quantity")); // RESET to NULL
1625+
});
1626+
}
1627+
1628+
@Test
1629+
@DisplayName("Should handle empty document map for bulkCreateOrReplace")
1630+
void testBulkCreateOrReplaceEmptyMap() {
1631+
Map<Key, Document> emptyMap = Collections.emptyMap();
1632+
boolean result = flatCollection.bulkCreateOrReplace(emptyMap);
1633+
assertTrue(result);
1634+
}
1635+
1636+
@Test
1637+
@DisplayName("Should handle null document map for bulkCreateOrReplace")
1638+
void testBulkCreateOrReplaceNullMap() {
1639+
boolean result = flatCollection.bulkCreateOrReplace(null);
1640+
assertTrue(result);
1641+
}
1642+
1643+
@Test
1644+
@DisplayName("Should skip unknown fields in bulkCreateOrReplace")
1645+
void testBulkCreateOrReplaceSkipsUnknownFields() throws Exception {
1646+
Map<Key, Document> bulkMap = new LinkedHashMap<>();
1647+
1648+
ObjectNode node = OBJECT_MAPPER.createObjectNode();
1649+
node.put("item", "ItemWithUnknown");
1650+
node.put("price", 300);
1651+
node.put("unknown_field", "should be skipped");
1652+
bulkMap.put(
1653+
new SingleValueKey(DEFAULT_TENANT, "bulk-replace-unknown"), new JSONDocument(node));
1654+
1655+
boolean result = flatCollection.bulkCreateOrReplace(bulkMap);
1656+
1657+
assertTrue(result);
1658+
1659+
queryAndAssert(
1660+
new SingleValueKey(DEFAULT_TENANT, "bulk-replace-unknown"),
1661+
rs -> {
1662+
assertTrue(rs.next());
1663+
assertEquals("ItemWithUnknown", rs.getString("item"));
1664+
assertEquals(300, rs.getInt("price"));
1665+
});
1666+
}
1667+
1668+
@Test
1669+
@DisplayName(
1670+
"Should ignore documents with unknown fields when IGNORE_DOCUMENT strategy for bulkCreateOrReplace")
1671+
void testBulkCreateOrReplaceIgnoreDocumentStrategy() throws Exception {
1672+
Collection collectionWithIgnoreStrategy =
1673+
getFlatCollectionWithStrategy(MissingColumnStrategy.IGNORE_DOCUMENT.toString());
1674+
1675+
Map<Key, Document> bulkMap = new LinkedHashMap<>();
1676+
1677+
// Document with unknown field - should be ignored
1678+
ObjectNode nodeWithUnknown = OBJECT_MAPPER.createObjectNode();
1679+
nodeWithUnknown.put("item", "ItemWithUnknown");
1680+
nodeWithUnknown.put("unknown_field", "should cause document to be ignored");
1681+
bulkMap.put(
1682+
new SingleValueKey(DEFAULT_TENANT, "ignore-replace-1"),
1683+
new JSONDocument(nodeWithUnknown));
1684+
1685+
// Valid document - should be inserted
1686+
ObjectNode validNode = OBJECT_MAPPER.createObjectNode();
1687+
validNode.put("item", "ValidItem");
1688+
validNode.put("price", 200);
1689+
bulkMap.put(
1690+
new SingleValueKey(DEFAULT_TENANT, "ignore-replace-2"), new JSONDocument(validNode));
1691+
1692+
boolean result = collectionWithIgnoreStrategy.bulkCreateOrReplace(bulkMap);
1693+
1694+
assertTrue(result);
1695+
1696+
queryAndAssert(
1697+
new SingleValueKey(DEFAULT_TENANT, "ignore-replace-1"), rs -> assertFalse(rs.next()));
1698+
1699+
queryAndAssert(
1700+
new SingleValueKey(DEFAULT_TENANT, "ignore-replace-2"),
1701+
rs -> {
1702+
assertTrue(rs.next());
1703+
assertEquals("ValidItem", rs.getString("item"));
1704+
assertEquals(200, rs.getInt("price"));
1705+
});
1706+
}
14681707
}
14691708

14701709
@Nested

document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,16 @@
1515

1616
/** Interface spec for common operations on a collection of documents */
1717
public interface Collection {
18+
1819
/**
1920
* Upsert (create a new doc or update if one already exists) the given document into the doc
2021
* store.
2122
*
2223
* <p>Note: This method ensures that all the fields defined in the `Document` are set/created. How
2324
* the existing fields are modified is implementation specific. For example, upserting <code>
24-
* {
25-
* "foo2": "bar2"
26-
* }
25+
* { "foo2": "bar2" }
2726
* </code> if a document <code>
28-
* {
29-
* "foo1": "bar1"
30-
* }
27+
* { "foo1": "bar1" }
3128
* </code> already exists would ensure that "foo2" is set the value of "bar2" and what happens to
3229
* the "foo1" field is implementation specific
3330
*
@@ -46,13 +43,9 @@ public interface Collection {
4643
*
4744
* <p>Note: This method ensures that all the fields defined in the `Document` are set/created. How
4845
* the existing fields are modified is implementation specific. For example, upserting <code>
49-
* {
50-
* "foo2": "bar2"
51-
* }
46+
* { "foo2": "bar2" }
5247
* </code> if a document <code>
53-
* {
54-
* "foo1": "bar1"
55-
* }
48+
* { "foo1": "bar1" }
5649
* </code> already exists would ensure that "foo2" is set the value of "bar2" and what happens to
5750
* the "foo1" field is implementation specific
5851
*
@@ -104,10 +97,10 @@ public interface Collection {
10497
/**
10598
* Search for documents matching the query.
10699
*
107-
* @deprecated Use {@link #query(org.hypertrace.core.documentstore.query.Query, QueryOptions)}
108-
* instead
109100
* @param query filter to query matching documents
110101
* @return {@link CloseableIterator} of matching documents
102+
* @deprecated Use {@link #query(org.hypertrace.core.documentstore.query.Query, QueryOptions)}
103+
* instead
111104
*/
112105
@Deprecated(forRemoval = true)
113106
CloseableIterator<Document> search(Query query);
@@ -268,6 +261,30 @@ CloseableIterator<Document> bulkUpsertAndReturnOlderDocuments(Map<Key, Document>
268261
*/
269262
boolean createOrReplace(final Key key, final Document document) throws IOException;
270263

264+
/**
265+
* Bulk createOrReplace with no atomicity guarantee. If only some documents succeed, the operation
266+
* is not rolled back. It's possible that certain documents are ignored if they contain columns
267+
* that are not present in the table's schema. This happens when the missingColumnStrategy is
268+
* configured to {@link
269+
* org.hypertrace.core.documentstore.model.options.MissingColumnStrategy#IGNORE_DOCUMENT}. If it's
270+
* configured to {@link
271+
* org.hypertrace.core.documentstore.model.options.MissingColumnStrategy#SKIP}, then that column
272+
* is skipped (but the document is still created/replaced). If it's configured to be {@link
273+
* org.hypertrace.core.documentstore.model.options.MissingColumnStrategy#THROW}, the entire batch
274+
* fails.
275+
*
276+
* <p>Semantically, if the document already exists, each column is replaced with its new value (or
277+
* reset to its default value if not specified). Note that no merge happens. For example, if the
278+
* original row contains "tag" : {"k1": "v1"} and the new row contains "tag" : {"k2": "v2"}, then
279+
* the final row will be "tag" : {"k2": "v2"}
280+
*
281+
* @param documents the batch
282+
* @return true if the operation succeeded, even partially.
283+
*/
284+
default boolean bulkCreateOrReplace(Map<Key, Document> documents) {
285+
throw new UnsupportedOperationException("bulkCreateOrReplace is not supported");
286+
}
287+
271288
/**
272289
* Atomically create a new document if the key does not exist in the collection or, replace the
273290
* existing document if the key exists in the collection and return the created/replaced document

0 commit comments

Comments
 (0)