diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java index b0abac031b..9833a8bfa0 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java @@ -105,21 +105,23 @@ private AggregateGroup getAggregateGroupForEvent(final IdentificationKeysHasher. public Collection> doExecute(Collection> records) { final List> recordsOut = new LinkedList<>(); - final List> groupsToConclude = aggregateGroupManager.getGroupsToConclude(forceConclude); - for (final Map.Entry groupEntry : groupsToConclude) { - final AggregateActionOutput actionOutput = aggregateActionSynchronizer.concludeGroup(groupEntry.getKey(), groupEntry.getValue(), forceConclude); - - final List concludeGroupEvents = actionOutput != null ? actionOutput.getEvents() : null; - if (concludeGroupEvents != null && !concludeGroupEvents.isEmpty()) { - concludeGroupEvents.stream().forEach((event) -> { - if (aggregatedEventsTag != null) { - event.getMetadata().addTags(List.of(aggregatedEventsTag)); - } - recordsOut.add(new Record(event)); - actionConcludeGroupEventsOutCounter.increment(); - }); - } else { - actionConcludeGroupEventsDroppedCounter.increment(); + synchronized (this) { + final List> groupsToConclude = aggregateGroupManager.getGroupsToConclude(forceConclude); + for (final Map.Entry groupEntry : groupsToConclude) { + final AggregateActionOutput actionOutput = aggregateActionSynchronizer.concludeGroup(groupEntry.getKey(), groupEntry.getValue(), forceConclude); + + final List concludeGroupEvents = actionOutput != null ? actionOutput.getEvents() : null; + if (concludeGroupEvents != null && !concludeGroupEvents.isEmpty()) { + concludeGroupEvents.stream().forEach((event) -> { + if (aggregatedEventsTag != null) { + event.getMetadata().addTags(List.of(aggregatedEventsTag)); + } + recordsOut.add(new Record(event)); + actionConcludeGroupEventsOutCounter.increment(); + }); + } else { + actionConcludeGroupEventsDroppedCounter.increment(); + } } } @@ -132,9 +134,12 @@ public Collection> doExecute(Collection> records) { continue; } final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap = identificationKeysHasher.createIdentificationKeysMapFromEvent(event); - final AggregateGroup aggregateGroupForEvent = getAggregateGroupForEvent(identificationKeysMap); + final AggregateActionResponse handleEventResponse; + synchronized (this) { + final AggregateGroup aggregateGroupForEvent = getAggregateGroupForEvent(identificationKeysMap); - final AggregateActionResponse handleEventResponse = aggregateActionSynchronizer.handleEventForGroup(event, identificationKeysMap, aggregateGroupForEvent); + handleEventResponse = aggregateActionSynchronizer.handleEventForGroup(event, identificationKeysMap, aggregateGroupForEvent); + } final Event aggregateActionResponseEvent = handleEventResponse.getEvent();