Skip to content

Commit ef0d310

Browse files
authored
Add support for upserts in FlatPostgresCollection (#277)
1 parent 3504ce4 commit ef0d310

3 files changed

Lines changed: 598 additions & 18 deletions

File tree

document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java

Lines changed: 143 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -204,16 +204,154 @@ public static void shutdown() {
204204
class UpsertTests {
205205

206206
@Test
207-
@DisplayName("Should throw UnsupportedOperationException for upsert")
208-
void testUpsertNewDocument() {
207+
@DisplayName("Should create new document when key doesn't exist and return true")
208+
void testUpsertNewDocument() throws Exception {
209+
String docId = getRandomDocId(4);
210+
209211
ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
210-
objectNode.put("_id", 100);
212+
objectNode.put("id", docId);
211213
objectNode.put("item", "NewItem");
212214
objectNode.put("price", 99);
213215
Document document = new JSONDocument(objectNode);
214-
Key key = new SingleValueKey("default", "100");
216+
Key key = new SingleValueKey(DEFAULT_TENANT, docId);
217+
218+
boolean isNew = flatCollection.upsert(key, document);
219+
220+
assertTrue(isNew, "Should return true for new document");
221+
222+
queryAndAssert(
223+
key,
224+
rs -> {
225+
assertTrue(rs.next());
226+
assertEquals("NewItem", rs.getString("item"));
227+
assertEquals(99, rs.getInt("price"));
228+
});
229+
}
230+
231+
@Test
232+
@DisplayName("Should merge with existing document preserving unspecified fields")
233+
void testUpsertMergesWithExistingDocument() throws Exception {
234+
String docId = getRandomDocId(4);
235+
236+
// First, create a document with multiple fields
237+
ObjectNode initialNode = OBJECT_MAPPER.createObjectNode();
238+
initialNode.put("id", docId);
239+
initialNode.put("item", "Original Item");
240+
initialNode.put("price", 100);
241+
initialNode.put("quantity", 50);
242+
initialNode.put("in_stock", true);
243+
Document initialDoc = new JSONDocument(initialNode);
244+
Key key = new SingleValueKey(DEFAULT_TENANT, docId);
245+
246+
boolean firstResult = flatCollection.upsert(key, initialDoc);
247+
assertTrue(firstResult, "First upsert should create new document");
248+
249+
// Now upsert with only some fields - others should be PRESERVED (merge behavior)
250+
ObjectNode mergeNode = OBJECT_MAPPER.createObjectNode();
251+
mergeNode.put("id", docId);
252+
mergeNode.put("item", "Updated Item");
253+
// price and quantity are NOT specified - they should retain their original values
254+
Document mergeDoc = new JSONDocument(mergeNode);
255+
256+
boolean secondResult = flatCollection.upsert(key, mergeDoc);
257+
assertFalse(secondResult, "Second upsert should update existing document");
258+
259+
// Verify merge behavior: item updated, price/quantity/in_stock preserved
260+
queryAndAssert(
261+
key,
262+
rs -> {
263+
assertTrue(rs.next());
264+
assertEquals("Updated Item", rs.getString("item"));
265+
// These should be PRESERVED from the original document (merge semantics)
266+
assertEquals(100, rs.getInt("price"));
267+
assertEquals(50, rs.getInt("quantity"));
268+
assertTrue(rs.getBoolean("in_stock"));
269+
});
270+
}
271+
272+
@Test
273+
@DisplayName("Upsert vs CreateOrReplace: upsert preserves, createOrReplace resets to default")
274+
void testUpsertVsCreateOrReplaceBehavior() throws Exception {
275+
String docId1 = getRandomDocId(4);
276+
String docId2 = getRandomDocId(4);
215277

216-
assertThrows(UnsupportedOperationException.class, () -> flatCollection.upsert(key, document));
278+
// Setup: Create two identical documents
279+
ObjectNode initialNode = OBJECT_MAPPER.createObjectNode();
280+
initialNode.put("item", "Original Item");
281+
initialNode.put("price", 100);
282+
initialNode.put("quantity", 50);
283+
284+
ObjectNode doc1 = initialNode.deepCopy();
285+
doc1.put("id", docId1);
286+
ObjectNode doc2 = initialNode.deepCopy();
287+
doc2.put("id", docId2);
288+
289+
Key key1 = new SingleValueKey(DEFAULT_TENANT, docId1);
290+
Key key2 = new SingleValueKey(DEFAULT_TENANT, docId2);
291+
292+
flatCollection.upsert(key1, new JSONDocument(doc1));
293+
flatCollection.upsert(key2, new JSONDocument(doc2));
294+
295+
// Now update both with partial documents (only item field)
296+
ObjectNode partialUpdate = OBJECT_MAPPER.createObjectNode();
297+
partialUpdate.put("item", "Updated Item");
298+
299+
ObjectNode partial1 = partialUpdate.deepCopy();
300+
partial1.put("id", docId1);
301+
ObjectNode partial2 = partialUpdate.deepCopy();
302+
partial2.put("id", docId2);
303+
304+
// Use upsert for doc1 - should PRESERVE price and quantity
305+
flatCollection.upsert(key1, new JSONDocument(partial1));
306+
307+
// Use createOrReplace for doc2 - should RESET price and quantity to NULL (default)
308+
flatCollection.createOrReplace(key2, new JSONDocument(partial2));
309+
310+
// Verify upsert preserved original values
311+
queryAndAssert(
312+
key1,
313+
rs -> {
314+
assertTrue(rs.next());
315+
assertEquals("Updated Item", rs.getString("item"));
316+
assertEquals(100, rs.getInt("price")); // PRESERVED
317+
assertEquals(50, rs.getInt("quantity")); // PRESERVED
318+
});
319+
320+
// Verify createOrReplace reset to defaults
321+
queryAndAssert(
322+
key2,
323+
rs -> {
324+
assertTrue(rs.next());
325+
assertEquals("Updated Item", rs.getString("item"));
326+
assertNull(rs.getObject("price")); // RESET to NULL
327+
assertNull(rs.getObject("quantity")); // RESET to NULL
328+
});
329+
}
330+
331+
@Test
332+
@DisplayName("Should skip unknown fields in upsert (default SKIP strategy)")
333+
void testUpsertSkipsUnknownFields() throws Exception {
334+
String docId = getRandomDocId(4);
335+
336+
ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
337+
objectNode.put("id", docId);
338+
objectNode.put("item", "Item with unknown");
339+
objectNode.put("price", 200);
340+
objectNode.put("unknown_field", "should be skipped");
341+
Document document = new JSONDocument(objectNode);
342+
Key key = new SingleValueKey(DEFAULT_TENANT, docId);
343+
344+
boolean isNew = flatCollection.upsert(key, document);
345+
assertTrue(isNew);
346+
347+
// Verify only known fields were inserted
348+
queryAndAssert(
349+
key,
350+
rs -> {
351+
assertTrue(rs.next());
352+
assertEquals("Item with unknown", rs.getString("item"));
353+
assertEquals(200, rs.getInt("price"));
354+
});
217355
}
218356

219357
@Test

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java

Lines changed: 103 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ private PostgresQueryParser createParser(Query query) {
148148

149149
@Override
150150
public boolean upsert(Key key, Document document) throws IOException {
151-
throw new UnsupportedOperationException(WRITE_NOT_SUPPORTED);
151+
return upsertWithRetry(key, document, false);
152152
}
153153

154154
@Override
@@ -325,7 +325,7 @@ public boolean bulkUpsert(Map<Key, Document> documents) {
325325

326326
// Build the bulk upsert SQL with all columns
327327
List<String> columnList = new ArrayList<>(allColumns);
328-
String sql = buildBulkUpsertSql(columnList, quotedPkColumn);
328+
String sql = buildMergeUpsertSql(columnList, quotedPkColumn, false);
329329
LOGGER.debug("Bulk upsert SQL: {}", sql);
330330

331331
try (Connection conn = client.getPooledConnection();
@@ -374,27 +374,34 @@ public boolean bulkUpsert(Map<Key, Document> documents) {
374374
}
375375

376376
/**
377-
* Builds a PostgreSQL bulk upsert SQL statement for batch execution.
377+
* Builds a PostgreSQL upsert SQL statement with merge semantics.
378+
*
379+
* <p>Generates: INSERT ... ON CONFLICT DO UPDATE SET col = EXCLUDED.col for each column. Only
380+
* columns in the provided list are updated on conflict (merge behavior).
378381
*
379-
* @param columns List of quoted column names (PK should be first)
382+
* @param columns List of quoted column names to include
380383
* @param pkColumn The quoted primary key column name
384+
* @param includeReturning If true, adds RETURNING clause to detect insert vs update
381385
* @return The upsert SQL statement
382386
*/
383-
private String buildBulkUpsertSql(List<String> columns, String pkColumn) {
387+
private String buildMergeUpsertSql(
388+
List<String> columns, String pkColumn, boolean includeReturning) {
384389
String columnList = String.join(", ", columns);
385390
String placeholders = String.join(", ", columns.stream().map(c -> "?").toArray(String[]::new));
386391

387-
// Build SET clause for non-PK columns: col = EXCLUDED.col (this ensures that on conflict, the
388-
// new value is picked)
392+
// Build SET clause for non-PK columns: col = EXCLUDED.col
389393
String setClause =
390394
columns.stream()
391395
.filter(col -> !col.equals(pkColumn))
392396
.map(col -> col + " = EXCLUDED." + col)
393397
.collect(Collectors.joining(", "));
394398

395-
return String.format(
396-
"INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO UPDATE SET %s",
397-
tableIdentifier, columnList, placeholders, pkColumn, setClause);
399+
String sql =
400+
String.format(
401+
"INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO UPDATE SET %s",
402+
tableIdentifier, columnList, placeholders, pkColumn, setClause);
403+
404+
return includeReturning ? sql + " RETURNING (xmax = 0) AS is_insert" : sql;
398405
}
399406

400407
@Override
@@ -880,13 +887,81 @@ private boolean createOrReplaceWithRetry(Key key, Document document, boolean isR
880887
return executeUpsert(sql, parsed);
881888

882889
} catch (PSQLException e) {
883-
return handlePSQLExceptionForUpsert(e, key, document, tableName, isRetry);
890+
return handlePSQLExceptionForCreateOrReplace(e, key, document, tableName, isRetry);
884891
} catch (SQLException e) {
885892
LOGGER.error("SQLException in createOrReplace. key: {} content: {}", key, document, e);
886893
throw new IOException(e);
887894
}
888895
}
889896

897+
/**
898+
* Upserts a document with merge semantics - only updates columns present in the document,
899+
* preserving existing values for columns not in the document.
900+
*
901+
* <p>Unlike {@link #createOrReplaceWithRetry}, this method does NOT reset missing columns to
902+
* their default values.
903+
*
904+
* @param key The document key
905+
* @param document The document to upsert
906+
* @param isRetry Whether this is a retry attempt after schema refresh
907+
* @return true if a new document was created, false if an existing document was updated
908+
*/
909+
private boolean upsertWithRetry(Key key, Document document, boolean isRetry) throws IOException {
910+
String tableName = tableIdentifier.getTableName();
911+
List<String> skippedFields = new ArrayList<>();
912+
913+
try {
914+
TypedDocument parsed = parseDocument(document, tableName, skippedFields);
915+
916+
// Add the key as the primary key column
917+
String pkColumn = getPKForTable(tableName);
918+
String quotedPkColumn = PostgresUtils.wrapFieldNamesWithDoubleQuotes(pkColumn);
919+
PostgresDataType pkType = getPrimaryKeyType(tableName, pkColumn);
920+
parsed.add(quotedPkColumn, key.toString(), pkType, false);
921+
922+
List<String> docColumns = parsed.getColumns();
923+
924+
String sql = buildUpsertSql(docColumns, quotedPkColumn);
925+
LOGGER.debug("Upsert (merge) SQL: {}", sql);
926+
927+
return executeUpsert(sql, parsed);
928+
929+
} catch (PSQLException e) {
930+
return handlePSQLExceptionForUpsert(e, key, document, tableName, isRetry);
931+
} catch (SQLException e) {
932+
LOGGER.error("SQLException in upsert. key: {} content: {}", key, document, e);
933+
throw new IOException(e);
934+
}
935+
}
936+
937+
/**
938+
* Builds a PostgreSQL upsert SQL statement with merge semantics.
939+
*
940+
* <p>This method constructs an atomic upsert query that:
941+
*
942+
* <ul>
943+
* <li>Inserts a new row if no conflict on the primary key
944+
* <li>If the row with that PK already exists, only updates columns present in the document
945+
* <li>Columns NOT in the document retain their existing values (merge behavior)
946+
* </ul>
947+
*
948+
* <p><b>Generated SQL pattern:</b>
949+
*
950+
* <pre>{@code
951+
* INSERT INTO table (col1, col2, pk_col)
952+
* VALUES (?, ?, ?)
953+
* ON CONFLICT (pk_col) DO UPDATE SET col1 = EXCLUDED.col1, col2 = EXCLUDED.col2
954+
* RETURNING (xmax = 0) AS is_insert
955+
* }</pre>
956+
*
957+
* @param docColumns columns present in the document
958+
* @param pkColumn The quoted primary key column name used for conflict detection
959+
* @return The complete upsert SQL statement with placeholders for values
960+
*/
961+
private String buildUpsertSql(List<String> docColumns, String pkColumn) {
962+
return buildMergeUpsertSql(docColumns, pkColumn, true);
963+
}
964+
890965
/**
891966
* Builds a PostgreSQL upsert (INSERT ... ON CONFLICT DO UPDATE) SQL statement.
892967
*
@@ -977,12 +1052,12 @@ private boolean executeUpsert(String sql, TypedDocument parsed) throws SQLExcept
9771052
}
9781053
}
9791054

980-
private boolean handlePSQLExceptionForUpsert(
1055+
private boolean handlePSQLExceptionForCreateOrReplace(
9811056
PSQLException e, Key key, Document document, String tableName, boolean isRetry)
9821057
throws IOException {
9831058
if (!isRetry && shouldRefreshSchemaAndRetry(e.getSQLState())) {
9841059
LOGGER.info(
985-
"Schema mismatch detected during upsert (SQLState: {}), refreshing schema and retrying. key: {}",
1060+
"Schema mismatch detected during createOrReplace (SQLState: {}), refreshing schema and retrying. key: {}",
9861061
e.getSQLState(),
9871062
key);
9881063
schemaRegistry.invalidate(tableName);
@@ -992,6 +1067,21 @@ private boolean handlePSQLExceptionForUpsert(
9921067
throw new IOException(e);
9931068
}
9941069

1070+
private boolean handlePSQLExceptionForUpsert(
1071+
PSQLException e, Key key, Document document, String tableName, boolean isRetry)
1072+
throws IOException {
1073+
if (!isRetry && shouldRefreshSchemaAndRetry(e.getSQLState())) {
1074+
LOGGER.info(
1075+
"Schema mismatch detected during upsert (SQLState: {}), refreshing schema and retrying. key: {}",
1076+
e.getSQLState(),
1077+
key);
1078+
schemaRegistry.invalidate(tableName);
1079+
return upsertWithRetry(key, document, true);
1080+
}
1081+
LOGGER.error("SQLException in upsert. key: {} content: {}", key, document, e);
1082+
throw new IOException(e);
1083+
}
1084+
9951085
private CreateResult handlePSQLExceptionForCreate(
9961086
PSQLException e, Key key, Document document, String tableName, boolean isRetry)
9971087
throws IOException {

0 commit comments

Comments
 (0)