Skip to content

Commit 63017e5

Browse files
authored
fix(alerts): dedup successful change events to prevent Postgres ON CONFLICT abort (#28827)
* fix(alerts): dedup successful change events to prevent Postgres ON CONFLICT abort * refactor(alerts): publishEvent returns delivery result; add consumer-level recipient-dedup tests
1 parent 8f5ef35 commit 63017e5

5 files changed

Lines changed: 517 additions & 73 deletions

File tree

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2025 Collate
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*/
13+
package org.openmetadata.it.tests;
14+
15+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
16+
import static org.junit.jupiter.api.Assertions.assertEquals;
17+
18+
import java.util.List;
19+
import java.util.UUID;
20+
import org.junit.jupiter.api.Test;
21+
import org.junit.jupiter.api.parallel.Execution;
22+
import org.junit.jupiter.api.parallel.ExecutionMode;
23+
import org.openmetadata.service.Entity;
24+
import org.openmetadata.service.jdbi3.CollectionDAO.EventSubscriptionDAO;
25+
26+
/**
27+
* Regression test for {@code EventSubscriptionDAO.batchUpsertSuccessfulChangeEvents}.
28+
*
29+
* <p>When an alert fans an event out to multiple destination types, the same
30+
* {@code (change_event_id, event_subscription_id)} pair was added to the batch more than once. With
31+
* the Postgres JDBC driver's {@code reWriteBatchedInserts=true}, the batch collapses into a single
32+
* multi-row {@code INSERT ... ON CONFLICT (...) DO UPDATE}, and Postgres aborts the whole statement
33+
* with "ON CONFLICT DO UPDATE command cannot affect row a second time" — so no success records were
34+
* persisted. This guard must run under a Postgres profile to exercise that path; on MySQL it still
35+
* verifies the dedup invariant.
36+
*/
37+
@Execution(ExecutionMode.CONCURRENT)
38+
public class EventSubscriptionBatchUpsertIT {
39+
40+
private static EventSubscriptionDAO dao() {
41+
return Entity.getCollectionDAO().eventSubscriptionDAO();
42+
}
43+
44+
private static String json(String changeEventId) {
45+
return "{\"id\":\"" + changeEventId + "\"}";
46+
}
47+
48+
@Test
49+
void duplicateKeysInBatchDoNotFailAndCollapseToOneRow() {
50+
String subscriptionId = UUID.randomUUID().toString();
51+
String changeEventId = UUID.randomUUID().toString();
52+
long timestamp = System.currentTimeMillis();
53+
try {
54+
assertDoesNotThrow(
55+
() ->
56+
dao()
57+
.batchUpsertSuccessfulChangeEvents(
58+
List.of(changeEventId, changeEventId),
59+
List.of(subscriptionId, subscriptionId),
60+
List.of(json(changeEventId), json(changeEventId)),
61+
List.of(timestamp, timestamp)));
62+
assertEquals(1L, dao().getSuccessfulRecordCount(subscriptionId));
63+
} finally {
64+
dao().deleteSuccessfulChangeEventBySubscriptionId(subscriptionId);
65+
}
66+
}
67+
68+
@Test
69+
void mixedBatchKeepsOneRowPerDistinctKey() {
70+
String subscriptionId = UUID.randomUUID().toString();
71+
String changeEventA = UUID.randomUUID().toString();
72+
String changeEventB = UUID.randomUUID().toString();
73+
long timestamp = System.currentTimeMillis();
74+
try {
75+
dao()
76+
.batchUpsertSuccessfulChangeEvents(
77+
List.of(changeEventA, changeEventB, changeEventA),
78+
List.of(subscriptionId, subscriptionId, subscriptionId),
79+
List.of(json(changeEventA), json(changeEventB), json(changeEventA)),
80+
List.of(timestamp, timestamp, timestamp));
81+
assertEquals(2L, dao().getSuccessfulRecordCount(subscriptionId));
82+
} finally {
83+
dao().deleteSuccessfulChangeEventBySubscriptionId(subscriptionId);
84+
}
85+
}
86+
}

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java

Lines changed: 56 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -170,16 +170,6 @@ public void handleFailedEvent(EventPublisherException ex, boolean errorOnSub) {
170170
source.toString());
171171
}
172172

173-
private void recordSuccessfulChangeEvent(UUID eventSubscriptionId, ChangeEvent event) {
174-
Entity.getCollectionDAO()
175-
.eventSubscriptionDAO()
176-
.upsertSuccessfulChangeEvent(
177-
event.getId().toString(),
178-
eventSubscriptionId.toString(),
179-
JsonUtils.pojoToJson(event),
180-
System.currentTimeMillis());
181-
}
182-
183173
private EventSubscriptionOffset loadInitialOffset(JobExecutionContext context) {
184174
Object offsetValue = jobDetail.getJobDataMap().get(ALERT_OFFSET_KEY);
185175
if (offsetValue != null) {
@@ -232,54 +222,68 @@ public void publishEvents(Map<ChangeEvent, Set<UUID>> events) {
232222
if (events.isEmpty()) {
233223
return;
234224
}
235-
236-
// Filter events based on subscription configuration (entity type, conditions, etc.)
237225
Map<ChangeEvent, Set<UUID>> filteredEvents = getFilteredEvents(eventSubscription, events);
238226
RecipientResolver resolver = new RecipientResolver();
227+
int successDeliveries = 0;
228+
int failedDeliveries = 0;
229+
for (Map.Entry<ChangeEvent, Set<UUID>> eventWithReceivers : filteredEvents.entrySet()) {
230+
EventDeliveryResult result =
231+
publishEvent(eventWithReceivers.getKey(), eventWithReceivers.getValue(), resolver);
232+
// Record once per (event, subscription): the table has no destination dimension, so
233+
// recording per type would duplicate rows and break Postgres ON CONFLICT.
234+
if (result.delivered()) {
235+
successfulEvents.add(eventWithReceivers.getKey());
236+
}
237+
successDeliveries += result.successCount();
238+
failedDeliveries += result.failedCount();
239+
}
240+
alertMetrics.withSuccessEvents(alertMetrics.getSuccessEvents() + successDeliveries);
241+
alertMetrics.withFailedEvents(alertMetrics.getFailedEvents() + failedDeliveries);
242+
}
239243

240-
for (var eventWithReceivers : filteredEvents.entrySet()) {
241-
ChangeEvent event = eventWithReceivers.getKey();
242-
Set<UUID> destinationIds = eventWithReceivers.getValue();
243-
244-
// Group destinations by type to enable cross-destination recipient deduplication
245-
Map<SubscriptionType, List<Destination<ChangeEvent>>> destinationsByType =
246-
groupDestinationsByType(destinationIds);
247-
248-
for (var entry : destinationsByType.entrySet()) {
249-
List<Destination<ChangeEvent>> destinations = entry.getValue();
250-
Destination<ChangeEvent> publisher = destinations.getFirst();
251-
252-
// Resolve recipients from all destinations of this type for deduplication
253-
Set<Recipient> recipients = Set.of();
254-
if (publisher.requiresRecipients()) {
255-
List<SubscriptionDestination> subDestinations =
256-
destinations.stream().map(Destination::getSubscriptionDestination).toList();
257-
recipients = resolver.resolveRecipients(event, subDestinations);
258-
}
259-
260-
// Send via primary destination only, with deduplicated recipients (one send per type)
261-
boolean status = true;
262-
if (!publisher.requiresRecipients() || !recipients.isEmpty()) {
263-
try {
264-
publisher.sendMessage(event, recipients);
265-
} catch (EventPublisherException e) {
266-
LOG.error("Failed to send alert: {}", e.getMessage());
267-
handleFailedEvent(e, true);
268-
status = false;
269-
}
270-
}
244+
private EventDeliveryResult publishEvent(
245+
ChangeEvent event, Set<UUID> destinationIds, RecipientResolver resolver) {
246+
// Group destinations by type to enable cross-destination recipient deduplication
247+
Map<SubscriptionType, List<Destination<ChangeEvent>>> destinationsByType =
248+
groupDestinationsByType(destinationIds);
249+
int successCount = 0;
250+
int failedCount = 0;
251+
for (Map.Entry<SubscriptionType, List<Destination<ChangeEvent>>> entry :
252+
destinationsByType.entrySet()) {
253+
if (sendToDestinationType(event, entry.getValue(), resolver)) {
254+
successCount++;
255+
} else {
256+
failedCount++;
257+
}
258+
}
259+
return new EventDeliveryResult(successCount > 0, successCount, failedCount);
260+
}
271261

272-
if (status) {
273-
// Collect successful events instead of writing immediately
274-
// Batch write happens in commit() to reduce connection pool contention
275-
// Note: Empty recipients is treated as successful (no-op send)
276-
successfulEvents.add(eventWithReceivers.getKey());
277-
alertMetrics.withSuccessEvents(alertMetrics.getSuccessEvents() + 1);
278-
} else {
279-
alertMetrics.withFailedEvents(alertMetrics.getFailedEvents() + 1);
280-
}
262+
/**
 * Summary of publishing a single change event across its destination types.
 *
 * @param delivered true when at least one destination type was sent successfully
 * @param successCount number of destination types whose send succeeded
 * @param failedCount number of destination types whose send failed
 */
private record EventDeliveryResult(boolean delivered, int successCount, int failedCount) {}
263+
264+
private boolean sendToDestinationType(
265+
ChangeEvent event, List<Destination<ChangeEvent>> destinations, RecipientResolver resolver) {
266+
Destination<ChangeEvent> publisher = destinations.getFirst();
267+
// Resolve recipients from all destinations of this type for deduplication
268+
Set<Recipient> recipients = Set.of();
269+
if (publisher.requiresRecipients()) {
270+
List<SubscriptionDestination> subDestinations =
271+
destinations.stream().map(Destination::getSubscriptionDestination).toList();
272+
recipients = resolver.resolveRecipients(event, subDestinations);
273+
}
274+
// Send via primary destination only, with deduplicated recipients (one send per type).
275+
// Empty recipients is treated as successful (no-op send).
276+
boolean status = true;
277+
if (!publisher.requiresRecipients() || !recipients.isEmpty()) {
278+
try {
279+
publisher.sendMessage(event, recipients);
280+
} catch (EventPublisherException e) {
281+
LOG.error("Failed to send alert: {}", e.getMessage());
282+
handleFailedEvent(e, true);
283+
status = false;
281284
}
282285
}
286+
return status;
283287
}
284288

285289
private Map<SubscriptionType, List<Destination<ChangeEvent>>> groupDestinationsByType(

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5333,27 +5333,47 @@ void upsertFailedEvent(
53335333
@Bind("json") String json,
53345334
@Bind("source") String source);
53355335

5336-
@ConnectionAwareSqlUpdate(
5337-
value =
5338-
"INSERT INTO successful_sent_change_events (change_event_id, event_subscription_id, json, timestamp) "
5339-
+ "VALUES (:change_event_id, :event_subscription_id, :json, :timestamp) "
5340-
+ "ON DUPLICATE KEY UPDATE json = :json, timestamp = :timestamp",
5341-
connectionType = MYSQL)
5342-
@ConnectionAwareSqlUpdate(
5343-
value =
5344-
"INSERT INTO successful_sent_change_events (change_event_id, event_subscription_id, json, timestamp) "
5345-
+ "VALUES (:change_event_id, :event_subscription_id, CAST(:json AS jsonb), :timestamp) "
5346-
+ "ON CONFLICT (change_event_id, event_subscription_id) "
5347-
+ "DO UPDATE SET json = EXCLUDED.json, timestamp = EXCLUDED.timestamp",
5348-
connectionType = POSTGRES)
5349-
void upsertSuccessfulChangeEvent(
5350-
@Bind("change_event_id") String changeEventId,
5351-
@Bind("event_subscription_id") String eventSubscriptionId,
5352-
@Bind("json") String json,
5353-
@Bind("timestamp") long timestamp);
5354-
53555336
// Batch insert for successful events - reduces connection pool contention
5356-
// from N connections to 1 when processing multiple events
5337+
// from N connections to 1 when processing multiple events.
5338+
// Deduplicates by (change_event_id, event_subscription_id) first: the primary key has no
5339+
// destination dimension, and Postgres rejects a rewritten multi-row INSERT whose own rows
5340+
// conflict on the arbiter key ("ON CONFLICT DO UPDATE command cannot affect row a second
5341+
// time").
5342+
default void batchUpsertSuccessfulChangeEvents(
5343+
List<String> changeEventIds,
5344+
List<String> eventSubscriptionIds,
5345+
List<String> jsonList,
5346+
List<Long> timestamps) {
5347+
List<Integer> keep = distinctLastIndexes(changeEventIds, eventSubscriptionIds);
5348+
if (keep.size() == changeEventIds.size()) {
5349+
batchUpsertSuccessfulChangeEventsInternal(
5350+
changeEventIds, eventSubscriptionIds, jsonList, timestamps);
5351+
} else {
5352+
batchUpsertSuccessfulChangeEventsInternal(
5353+
pickByIndex(changeEventIds, keep),
5354+
pickByIndex(eventSubscriptionIds, keep),
5355+
pickByIndex(jsonList, keep),
5356+
pickByIndex(timestamps, keep));
5357+
}
5358+
}
5359+
5360+
static List<Integer> distinctLastIndexes(
5361+
List<String> changeEventIds, List<String> eventSubscriptionIds) {
5362+
LinkedHashMap<String, Integer> lastIndexByKey = new LinkedHashMap<>();
5363+
for (int i = 0; i < changeEventIds.size(); i++) {
5364+
lastIndexByKey.put(changeEventIds.get(i) + "|" + eventSubscriptionIds.get(i), i);
5365+
}
5366+
return new ArrayList<>(lastIndexByKey.values());
5367+
}
5368+
5369+
static <T> List<T> pickByIndex(List<T> source, List<Integer> indexes) {
5370+
List<T> result = new ArrayList<>(indexes.size());
5371+
for (int index : indexes) {
5372+
result.add(source.get(index));
5373+
}
5374+
return result;
5375+
}
5376+
53575377
@Transaction
53585378
@ConnectionAwareSqlBatch(
53595379
value =
@@ -5368,7 +5388,7 @@ void upsertSuccessfulChangeEvent(
53685388
+ "ON CONFLICT (change_event_id, event_subscription_id) "
53695389
+ "DO UPDATE SET json = EXCLUDED.json, timestamp = EXCLUDED.timestamp",
53705390
connectionType = POSTGRES)
5371-
void batchUpsertSuccessfulChangeEvents(
5391+
void batchUpsertSuccessfulChangeEventsInternal(
53725392
@Bind("change_event_id") List<String> changeEventIds,
53735393
@Bind("event_subscription_id") List<String> eventSubscriptionIds,
53745394
@Bind("json") List<String> jsonList,

0 commit comments

Comments
 (0)