Skip to content

Commit b6c0237

Browse files
committed
WIP
1 parent ad828f4 commit b6c0237

1 file changed

Lines changed: 2 additions & 249 deletions

File tree

document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java

Lines changed: 2 additions & 249 deletions
Original file line numberDiff line numberDiff line change
@@ -228,255 +228,8 @@ public boolean updateSubDoc(Key key, String subDocPath, Document subDocument) {
228228
}
229229

230230
@Override
231-
public BulkUpdateResult bulkUpdateSubDocs(Map<Key, Map<String, Document>> documents)
232-
throws Exception {
233-
if (documents == null || documents.isEmpty()) {
234-
return new BulkUpdateResult(0);
235-
}
236-
237-
String tableName = tableIdentifier.getTableName();
238-
String pkColumn = getPKForTable(tableName);
239-
String quotedPkColumn = PostgresUtils.wrapFieldNamesWithDoubleQuotes(pkColumn);
240-
241-
Set<Key> updatedKeys = new HashSet<>();
242-
243-
try (Connection connection = client.getTransactionalConnection()) {
244-
try {
245-
// Lock all rows upfront with SELECT ... FOR UPDATE to ensure atomicity
246-
String lockSql =
247-
String.format(
248-
"SELECT %s FROM %s WHERE %s = ANY(?) FOR UPDATE",
249-
quotedPkColumn, tableIdentifier, quotedPkColumn);
250-
251-
String[] keyArray = documents.keySet().stream().map(Key::toString).toArray(String[]::new);
252-
Array sqlArray = connection.createArrayOf("text", keyArray);
253-
254-
try (PreparedStatement lockPs = connection.prepareStatement(lockSql)) {
255-
lockPs.setArray(1, sqlArray);
256-
lockPs.executeQuery(); // Acquire locks on all rows
257-
}
258-
259-
// Now execute updates for each key
260-
for (Map.Entry<Key, Map<String, Document>> entry : documents.entrySet()) {
261-
Key key = entry.getKey();
262-
Map<String, Document> subDocUpdates = entry.getValue();
263-
264-
if (subDocUpdates == null || subDocUpdates.isEmpty()) {
265-
continue;
266-
}
267-
268-
boolean updated =
269-
executeSubDocUpdatesForKey(connection, key, subDocUpdates, tableName, quotedPkColumn);
270-
if (updated) {
271-
updatedKeys.add(key);
272-
}
273-
}
274-
275-
// Update lastUpdatedTime within the same transaction
276-
updateLastUpdatedTimeForKeys(connection, updatedKeys, tableName, quotedPkColumn);
277-
278-
connection.commit();
279-
} catch (Exception e) {
280-
connection.rollback();
281-
throw e;
282-
}
283-
} catch (SQLException e) {
284-
LOGGER.error("SQLException in bulkUpdateSubDocs", e);
285-
throw e;
286-
}
287-
288-
return new BulkUpdateResult(updatedKeys.size());
289-
}
290-
291-
/**
292-
* Executes sub-document updates for a single key.
293-
*
294-
* @return true if at least one update was successful
295-
*/
296-
private boolean executeSubDocUpdatesForKey(
297-
Connection connection,
298-
Key key,
299-
Map<String, Document> subDocUpdates,
300-
String tableName,
301-
String quotedPkColumn)
302-
throws SQLException {
303-
304-
// Group updates by resolved column
305-
Map<String, List<SubDocUpdateEntry>> updatesByColumn = new LinkedHashMap<>();
306-
307-
for (Map.Entry<String, Document> subDocEntry : subDocUpdates.entrySet()) {
308-
String subDocPath = subDocEntry.getKey();
309-
Document subDocument = subDocEntry.getValue();
310-
311-
Optional<String> columnName = resolveColumnName(subDocPath, tableName);
312-
if (columnName.isEmpty()) {
313-
if (missingColumnStrategy == MissingColumnStrategy.THROW) {
314-
throw new IllegalArgumentException(
315-
"Column not found in schema for path: "
316-
+ subDocPath
317-
+ " and missing column strategy is configured to: "
318-
+ missingColumnStrategy);
319-
}
320-
LOGGER.warn("Skipping update for unresolved path: {}", subDocPath);
321-
continue;
322-
}
323-
324-
updatesByColumn
325-
.computeIfAbsent(columnName.get(), k -> new ArrayList<>())
326-
.add(new SubDocUpdateEntry(subDocPath, subDocument));
327-
}
328-
329-
if (updatesByColumn.isEmpty()) {
330-
return false;
331-
}
332-
333-
// Build SET clause fragments - one per column
334-
List<String> setFragments = new ArrayList<>();
335-
List<Object> params = new ArrayList<>();
336-
337-
for (Map.Entry<String, List<SubDocUpdateEntry>> columnEntry : updatesByColumn.entrySet()) {
338-
String columnName = columnEntry.getKey();
339-
List<SubDocUpdateEntry> columnUpdates = columnEntry.getValue();
340-
341-
PostgresColumnMetadata colMeta =
342-
schemaRegistry.getColumnOrRefresh(tableName, columnName).orElseThrow();
343-
344-
// Track the current expression for chaining nested JSONB updates
345-
String currentExpr = String.format("\"%s\"", columnName);
346-
String fragment = null;
347-
348-
for (SubDocUpdateEntry updateEntry : columnUpdates) {
349-
String fullPath = updateEntry.path;
350-
String[] nestedPath = getNestedPath(fullPath, columnName);
351-
boolean isTopLevel = nestedPath.length == 0;
352-
353-
if (isTopLevel) {
354-
// Top-level column update - set the entire column value
355-
PostgresDataType colType = colMeta.getPostgresType();
356-
if (colType == PostgresDataType.JSONB) {
357-
// JSONB columns need explicit cast
358-
fragment = String.format("\"%s\" = ?::jsonb", columnName);
359-
params.add(updateEntry.document.toJson());
360-
} else {
361-
fragment = String.format("\"%s\" = ?", columnName);
362-
params.add(prepareValueForColumn(updateEntry.document, colMeta));
363-
}
364-
} else {
365-
// Nested JSONB path update using jsonb_set
366-
String jsonPath = formatJsonbPath(nestedPath);
367-
String jsonValue = updateEntry.document.toJson();
368-
String valueExpr =
369-
String.format("jsonb_set(%s, '%s'::text[], ?::jsonb)", currentExpr, jsonPath);
370-
currentExpr = valueExpr;
371-
fragment = String.format("\"%s\" = %s", columnName, valueExpr);
372-
params.add(jsonValue);
373-
}
374-
}
375-
376-
if (fragment != null) {
377-
setFragments.add(fragment);
378-
}
379-
}
380-
381-
if (setFragments.isEmpty()) {
382-
return false;
383-
}
384-
385-
// Build and execute UPDATE SQL
386-
String sql =
387-
String.format(
388-
"UPDATE %s SET %s WHERE %s = ?",
389-
tableIdentifier, String.join(", ", setFragments), quotedPkColumn);
390-
391-
params.add(key.toString());
392-
393-
LOGGER.debug("Executing sub-doc update SQL: {}", sql);
394-
395-
try (PreparedStatement ps = connection.prepareStatement(sql)) {
396-
int idx = 1;
397-
for (Object param : params) {
398-
ps.setObject(idx++, param);
399-
}
400-
int rowsUpdated = ps.executeUpdate();
401-
return rowsUpdated > 0;
402-
}
403-
}
404-
405-
/**
406-
* Prepares a Document value for insertion into a column based on its type. For JSONB columns,
407-
* returns the JSON string. For other types, extracts the appropriate scalar value.
408-
*/
409-
private Object prepareValueForColumn(Document document, PostgresColumnMetadata colMeta)
410-
throws SQLException {
411-
try {
412-
JsonNode jsonNode = MAPPER.readTree(document.toJson());
413-
PostgresDataType type = colMeta.getPostgresType();
414-
415-
if (type == PostgresDataType.JSONB) {
416-
return document.toJson();
417-
}
418-
419-
// For non-JSONB columns, extract the scalar value
420-
return extractValue(jsonNode, type, colMeta.isArray());
421-
} catch (IOException e) {
422-
throw new SQLException("Failed to parse document for column: " + colMeta.getName(), e);
423-
}
424-
}
425-
426-
/** Formats a nested path array into PostgreSQL jsonb path format: {path1,path2,path3} */
427-
private String formatJsonbPath(String[] nestedPath) {
428-
return "{" + String.join(",", nestedPath) + "}";
429-
}
430-
431-
/**
432-
* Updates the lastUpdatedTime column for the given keys using the provided connection. This
433-
* allows the update to be part of an existing transaction.
434-
*/
435-
private void updateLastUpdatedTimeForKeys(
436-
Connection connection, Set<Key> keys, String tableName, String quotedPkColumn)
437-
throws SQLException {
438-
if (keys.isEmpty() || lastUpdatedTsColumn == null) {
439-
return;
440-
}
441-
442-
String quotedTsColumn = PostgresUtils.wrapFieldNamesWithDoubleQuotes(lastUpdatedTsColumn);
443-
444-
Optional<PostgresColumnMetadata> tsColMeta =
445-
schemaRegistry.getColumnOrRefresh(tableName, lastUpdatedTsColumn);
446-
if (tsColMeta.isEmpty()) {
447-
LOGGER.warn(
448-
"lastUpdatedTsColumn '{}' not found in schema, skipping timestamp update",
449-
lastUpdatedTsColumn);
450-
return;
451-
}
452-
453-
long now = System.currentTimeMillis();
454-
Object tsValue = convertTimestampForType(now, tsColMeta.get().getPostgresType());
455-
456-
String sql =
457-
String.format(
458-
"UPDATE %s SET %s = ? WHERE %s = ?", tableIdentifier, quotedTsColumn, quotedPkColumn);
459-
460-
try (PreparedStatement ps = connection.prepareStatement(sql)) {
461-
for (Key key : keys) {
462-
ps.setObject(1, tsValue);
463-
ps.setString(2, key.toString());
464-
ps.addBatch();
465-
}
466-
ps.executeBatch();
467-
}
468-
}
469-
470-
/** Helper class to hold sub-document update information. */
471-
private static class SubDocUpdateEntry {
472-
473-
final String path;
474-
final Document document;
475-
476-
SubDocUpdateEntry(String path, Document document) {
477-
this.path = path;
478-
this.document = document;
479-
}
231+
public BulkUpdateResult bulkUpdateSubDocs(Map<Key, Map<String, Document>> documents) {
232+
throw new UnsupportedOperationException(WRITE_NOT_SUPPORTED);
480233
}
481234

482235
@Override

0 commit comments

Comments
 (0)