Skip to content

Commit c18ded7

Browse files
dlvenablesan81
authored andcommitted
Synchronization fix for aggregate processor and aggregate event handles when attaching events to the aggregate group. (opensearch-project#6431)
There is a possible synchronization issue in the aggregate processor. It currently calls attachToEventAcknowledgementSet on the aggregate group outside of any locks. It is possible that one thread gets this group. Then thread two gets the closes the group. If thread 1 then attaches the event to that group, thread 2 may still reset it. The solution is to move attachToEventAcknowledgementSet into the locks. Signed-off-by: David Venable <dlv@amazon.com>
1 parent 096d3f4 commit c18ded7

4 files changed

Lines changed: 13 additions & 6 deletions

File tree

data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionSynchronizer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ AggregateActionResponse handleEventForGroup(final Event event, final Identificat
9292
handleEventForGroupLock.lock();
9393
try {
9494
LOG.debug("Start critical section in handleEventForGroup");
95+
aggregateGroup.attachToEventAcknowledgementSet(event);
9596
handleEventResponse = aggregateAction.handleEvent(event, aggregateGroup);
9697
aggregateGroupManager.putGroupWithHash(hash, aggregateGroup);
9798
} catch (final Exception e) {

data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,8 @@ private AggregateAction loadAggregateAction(final PluginFactory pluginFactory) {
9797
return pluginFactory.loadPlugin(AggregateAction.class, actionPluginSetting);
9898
}
9999

100-
AggregateGroup getAggregateGroupForEvent(final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap, final Event event) {
101-
AggregateGroup aggregateGroup = aggregateGroupManager.getAggregateGroup(identificationKeysMap);
102-
aggregateGroup.attachToEventAcknowledgementSet(event);
103-
return aggregateGroup;
100+
private AggregateGroup getAggregateGroupForEvent(final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap) {
101+
return aggregateGroupManager.getAggregateGroup(identificationKeysMap);
104102
}
105103

106104
@Override
@@ -134,7 +132,7 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
134132
continue;
135133
}
136134
final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap = identificationKeysHasher.createIdentificationKeysMapFromEvent(event);
137-
final AggregateGroup aggregateGroupForEvent = getAggregateGroupForEvent(identificationKeysMap, event);
135+
final AggregateGroup aggregateGroupForEvent = getAggregateGroupForEvent(identificationKeysMap);
138136

139137
final AggregateActionResponse handleEventResponse = aggregateActionSynchronizer.handleEventForGroup(event, identificationKeysMap, aggregateGroupForEvent);
140138

data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionSynchronizerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,10 +158,11 @@ void handleEventForGroup_calls_expected_functions_and_returns_correct_AggregateA
158158

159159
final AggregateActionResponse handleEventResponse = objectUnderTest.handleEventForGroup(event, identificationKeysMap, aggregateGroup);
160160

161-
final InOrder inOrder = Mockito.inOrder(concludeGroupLock, handleEventForGroupLock, aggregateAction, aggregateGroupManager);
161+
final InOrder inOrder = Mockito.inOrder(concludeGroupLock, handleEventForGroupLock, aggregateGroup, aggregateAction, aggregateGroupManager);
162162
inOrder.verify(concludeGroupLock).lock();
163163
inOrder.verify(concludeGroupLock).unlock();
164164
inOrder.verify(handleEventForGroupLock).lock();
165+
inOrder.verify(aggregateGroup).attachToEventAcknowledgementSet(event);
165166
inOrder.verify(aggregateAction).handleEvent(event, aggregateGroup);
166167
inOrder.verify(aggregateGroupManager).putGroupWithHash(identificationKeysMap, aggregateGroup);
167168
inOrder.verify(handleEventForGroupLock).unlock();

data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.opensearch.dataprepper.plugins.processor.aggregate;
1111

12+
import org.junit.jupiter.api.AfterEach;
1213
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
1314
import org.opensearch.dataprepper.metrics.MetricNames;
1415
import org.opensearch.dataprepper.metrics.PluginMetrics;
@@ -50,6 +51,7 @@
5051
import static org.junit.jupiter.api.Assertions.assertTrue;
5152
import static org.mockito.ArgumentMatchers.any;
5253
import static org.mockito.ArgumentMatchers.eq;
54+
import static org.mockito.Mockito.never;
5355
import static org.mockito.Mockito.verify;
5456
import static org.mockito.Mockito.verifyNoInteractions;
5557
import static org.mockito.Mockito.when;
@@ -158,6 +160,11 @@ void setUp() {
158160
when(pluginMetrics.timer(MetricNames.TIME_ELAPSED)).thenReturn(timeElapsed);
159161
}
160162

163+
@AfterEach
164+
void processorDoesNotAttachEventsDirectly() {
165+
verify(aggregateGroup, never()).attachToEventAcknowledgementSet(any());
166+
}
167+
161168
@Test
162169
void invalid_aggregate_when_statement_throws_InvalidPluginConfigurationException() {
163170
final String whenCondition = UUID.randomUUID().toString();

0 commit comments

Comments
 (0)