Skip to content

Commit 72ad012

Browse files
authored
Adds additional thread synchronization in the AggregateProcessor to prevent duplicate or orphaned aggregate groups. (#6439)
Signed-off-by: David Venable <dlv@amazon.com>
1 parent bbfbc9c commit 72ad012

1 file changed

Lines changed: 22 additions & 17 deletions

File tree

  • data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate

data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -105,21 +105,23 @@ private AggregateGroup getAggregateGroupForEvent(final IdentificationKeysHasher.
105105
public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
106106
final List<Record<Event>> recordsOut = new LinkedList<>();
107107

108-
final List<Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> groupsToConclude = aggregateGroupManager.getGroupsToConclude(forceConclude);
109-
for (final Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup> groupEntry : groupsToConclude) {
110-
final AggregateActionOutput actionOutput = aggregateActionSynchronizer.concludeGroup(groupEntry.getKey(), groupEntry.getValue(), forceConclude);
111-
112-
final List<Event> concludeGroupEvents = actionOutput != null ? actionOutput.getEvents() : null;
113-
if (concludeGroupEvents != null && !concludeGroupEvents.isEmpty()) {
114-
concludeGroupEvents.stream().forEach((event) -> {
115-
if (aggregatedEventsTag != null) {
116-
event.getMetadata().addTags(List.of(aggregatedEventsTag));
117-
}
118-
recordsOut.add(new Record(event));
119-
actionConcludeGroupEventsOutCounter.increment();
120-
});
121-
} else {
122-
actionConcludeGroupEventsDroppedCounter.increment();
108+
synchronized (this) {
109+
final List<Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> groupsToConclude = aggregateGroupManager.getGroupsToConclude(forceConclude);
110+
for (final Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup> groupEntry : groupsToConclude) {
111+
final AggregateActionOutput actionOutput = aggregateActionSynchronizer.concludeGroup(groupEntry.getKey(), groupEntry.getValue(), forceConclude);
112+
113+
final List<Event> concludeGroupEvents = actionOutput != null ? actionOutput.getEvents() : null;
114+
if (concludeGroupEvents != null && !concludeGroupEvents.isEmpty()) {
115+
concludeGroupEvents.stream().forEach((event) -> {
116+
if (aggregatedEventsTag != null) {
117+
event.getMetadata().addTags(List.of(aggregatedEventsTag));
118+
}
119+
recordsOut.add(new Record(event));
120+
actionConcludeGroupEventsOutCounter.increment();
121+
});
122+
} else {
123+
actionConcludeGroupEventsDroppedCounter.increment();
124+
}
123125
}
124126
}
125127

@@ -132,9 +134,12 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
132134
continue;
133135
}
134136
final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap = identificationKeysHasher.createIdentificationKeysMapFromEvent(event);
135-
final AggregateGroup aggregateGroupForEvent = getAggregateGroupForEvent(identificationKeysMap);
137+
final AggregateActionResponse handleEventResponse;
138+
synchronized (this) {
139+
final AggregateGroup aggregateGroupForEvent = getAggregateGroupForEvent(identificationKeysMap);
136140

137-
final AggregateActionResponse handleEventResponse = aggregateActionSynchronizer.handleEventForGroup(event, identificationKeysMap, aggregateGroupForEvent);
141+
handleEventResponse = aggregateActionSynchronizer.handleEventForGroup(event, identificationKeysMap, aggregateGroupForEvent);
142+
}
138143

139144
final Event aggregateActionResponseEvent = handleEventResponse.getEvent();
140145

0 commit comments

Comments
 (0)