Skip to content

Commit f2512e0

Browse files
authored
Fixes a false reporting bug for the invalidEventHandles counter (#6420)
Fixes a bug with the invalidEventHandles counter in the PipelineRunner. This metric was being counted for any event that is not a default event (ie. for aggregate events). This would happen even if there is no need to discard the event. This change should count this when aggregate events should be released but are not. We probably need some deeper investigation into how we can properly release aggregate events. But, for now this metric will be more accurate. Also improves some code to reduce unnecessary variables, use final modifiers, and better legibility. Signed-off-by: David Venable <dlv@amazon.com>
1 parent 48e7d5d commit f2512e0

4 files changed

Lines changed: 65 additions & 32 deletions

File tree

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerImpl.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
48
*/
59

610
package org.opensearch.dataprepper.core.pipeline;
@@ -15,7 +19,6 @@
1519
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
1620
import org.opensearch.dataprepper.model.event.Event;
1721
import org.opensearch.dataprepper.model.event.EventHandle;
18-
import org.opensearch.dataprepper.model.event.InternalEventHandle;
1922
import org.opensearch.dataprepper.model.processor.Processor;
2023
import org.opensearch.dataprepper.model.record.Record;
2124
import org.slf4j.Logger;
@@ -33,8 +36,7 @@ public class PipelineRunnerImpl implements PipelineRunner {
3336
private static final Logger LOG = LoggerFactory.getLogger(PipelineRunnerImpl.class);
3437
private static final String INVALID_EVENT_HANDLES = "invalidEventHandles";
3538
private boolean isEmptyRecordsLogged = false;
36-
@VisibleForTesting
37-
final Counter invalidEventHandlesCounter;
39+
private final Counter invalidEventHandlesCounter;
3840
private final Pipeline pipeline;
3941
private final PluginMetrics pluginMetrics;
4042
private final ProcessorProvider processorProvider;
@@ -75,18 +77,19 @@ Map.Entry<Collection, CheckpointState> readFromBuffer(Buffer buffer, Pipeline pi
7577
}
7678

7779
@VisibleForTesting
78-
void processAcknowledgements(List<Event> inputEvents, Collection<Record<Event>> outputRecords) {
79-
Set<Event> outputEventsSet = outputRecords.stream().map(Record::getData).collect(Collectors.toSet());
80+
void processAcknowledgements(final List<Event> inputEvents, final Collection<Record<Event>> outputRecords) {
81+
final Set<Event> outputEventsSet = outputRecords.stream().map(Record::getData).collect(Collectors.toSet());
8082
// For each event in the input events list that is not present in the output events, send positive acknowledgement, if acknowledgements are enabled for it
8183
inputEvents.forEach(event -> {
82-
EventHandle eventHandle = event.getEventHandle();
83-
if (eventHandle != null && eventHandle instanceof DefaultEventHandle) {
84-
InternalEventHandle internalEventHandle = (InternalEventHandle) eventHandle;
84+
final EventHandle eventHandle = event.getEventHandle();
85+
if (eventHandle != null) {
8586
if (!outputEventsSet.contains(event)) {
86-
eventHandle.release(true);
87+
if (eventHandle instanceof DefaultEventHandle) {
88+
eventHandle.release(true);
89+
} else {
90+
invalidEventHandlesCounter.increment();
91+
}
8792
}
88-
} else if (eventHandle != null) {
89-
invalidEventHandlesCounter.increment();
9093
}
9194
});
9295
}

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/router/RouterFactory.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,24 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
48
*/
59

610
package org.opensearch.dataprepper.core.pipeline.router;
711

812
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
913
import org.opensearch.dataprepper.model.configuration.ConditionalRoute;
14+
import org.opensearch.dataprepper.model.event.Event;
1015

1116
import java.util.Objects;
1217
import java.util.Set;
18+
import java.util.function.Consumer;
1319

1420
public class RouterFactory {
21+
private static final Consumer<Event> RELEASE_EVENT_ON_NO_ROUTE = event -> event.getEventHandle().release(true);
1522
private final ExpressionEvaluator expressionEvaluator;
1623
private final DataFlowComponentRouter dataFlowComponentRouter;
1724

@@ -23,6 +30,6 @@ public class RouterFactory {
2330
public Router createRouter(final Set<ConditionalRoute> routes) {
2431
final RouteEventEvaluator routeEventEvaluator = new RouteEventEvaluator(expressionEvaluator, routes);
2532
return new Router(routeEventEvaluator, dataFlowComponentRouter,
26-
event -> event.getEventHandle().release(true));
33+
RELEASE_EVENT_ON_NO_ROUTE);
2734
}
2835
}

data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerTest.java

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
48
*/
59

610
package org.opensearch.dataprepper.core.pipeline;
@@ -55,13 +59,18 @@
5559
import static org.mockito.Mockito.mockStatic;
5660
import static org.mockito.Mockito.never;
5761
import static org.mockito.Mockito.verify;
62+
import static org.mockito.Mockito.verifyNoInteractions;
5863
import static org.mockito.Mockito.when;
5964

6065
@ExtendWith(MockitoExtension.class)
6166
class PipelineRunnerTest {
6267
private static final int BUFFER_READ_TIMEOUT_MILLIS = 1000;
6368
private static final String MOCK_PIPELINE_NAME = "Test-Pipeline";
6469

70+
@Mock
71+
private PluginMetrics pluginMetrics;
72+
@Mock
73+
private Counter counter;
6574
@Mock
6675
Pipeline pipeline;
6776
@Mock
@@ -91,12 +100,19 @@ private void setupPipeline(boolean shouldEnableAcknowledgements) {
91100
}
92101

93102
private PipelineRunnerImpl createObjectUnderTest() {
94-
return new PipelineRunnerImpl(pipeline, processorProvider);
103+
try (final MockedStatic<PluginMetrics> pluginMetricsStatic = mockStatic(PluginMetrics.class)) {
104+
pluginMetricsStatic.when(() -> PluginMetrics.fromNames("PipelineRunner", pipeline.getName()))
105+
.thenReturn(pluginMetrics);
106+
107+
return new PipelineRunnerImpl(pipeline, processorProvider);
108+
}
95109
}
96110

97111
@BeforeEach
98112
void setUp() {
99113
processors = List.of(processor);
114+
115+
when(pluginMetrics.counter(any())).thenReturn(counter);
100116
}
101117

102118
@Nested
@@ -118,6 +134,8 @@ public void testProcessAcknowledgementsSuccess() {
118134
PipelineRunnerImpl pipelineRunner = createObjectUnderTest();
119135
pipelineRunner.processAcknowledgements(inputEvents, outputRecords);
120136
verify(defaultEventHandle).release(true);
137+
138+
verifyNoInteractions(counter);
121139
}
122140

123141
@Test
@@ -134,6 +152,8 @@ void testProcessAcknowledgementsReleasesMissingEvents() {
134152
pipelineRunner.processAcknowledgements(inputEvents, outputRecords);
135153
verify(defaultEventHandle).release(true);
136154
assertNotSame(event, differentEvent);
155+
156+
verifyNoInteractions(counter);
137157
}
138158

139159
@Test
@@ -148,6 +168,8 @@ void testProcessAcknowledgementsDoesNotReleaseWhenEventsPresent() {
148168
PipelineRunnerImpl pipelineRunner = createObjectUnderTest();
149169
pipelineRunner.processAcknowledgements(inputEvents, outputRecords);
150170
verify(defaultEventHandle, never()).release(true);
171+
172+
verifyNoInteractions(counter);
151173
}
152174

153175
@Test
@@ -156,18 +178,20 @@ void testProcessAcknowledgementsInvalidEventHandleIncrementsCounter() {
156178
Collection<Record<Event>> outputRecords = List.of(record);
157179
when(event.getEventHandle()).thenReturn(eventHandle);
158180

159-
try (MockedStatic<PluginMetrics> pluginMetricsStatic = mockStatic(PluginMetrics.class)) {
160-
final PluginMetrics pluginMetrics = mock(PluginMetrics.class);
161-
final Counter counter = mock(Counter.class);
162-
when(pluginMetrics.counter(any())).thenReturn(counter);
163-
pluginMetricsStatic.when(() -> PluginMetrics.fromNames("PipelineRunner", pipeline.getName()))
164-
.thenReturn(pluginMetrics);
181+
assertDoesNotThrow(() -> createObjectUnderTest().processAcknowledgements(inputEvents, outputRecords));
182+
verify(counter, atLeastOnce()).increment();
183+
}
165184

166-
PipelineRunnerImpl pipelineRunner = createObjectUnderTest();
167-
assertDoesNotThrow(() -> pipelineRunner.processAcknowledgements(inputEvents, outputRecords));
185+
@Test
186+
void testProcessAcknowledgementsInvalidEventHandleIncrementsCounter2() {
187+
Record<Event> eventRecord = mock(Record.class);
188+
when(eventRecord.getData()).thenReturn(event);
189+
List<Event> inputEvents = List.of(event);
190+
Collection<Record<Event>> outputRecords = List.of(record, eventRecord);
191+
when(event.getEventHandle()).thenReturn(eventHandle);
168192

169-
verify(counter, atLeastOnce()).increment();
170-
}
193+
assertDoesNotThrow(() -> createObjectUnderTest().processAcknowledgements(inputEvents, outputRecords));
194+
verifyNoInteractions(counter);
171195
}
172196
}
173197

data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroup.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,27 +28,26 @@ class AggregateGroup implements AggregateActionInput {
2828
private final Lock handleEventForGroupLock;
2929
private final Map<Object, Object> identificationKeys;
3030
private Function<Duration, Boolean> customShouldConclude;
31-
private EventHandle eventHandle;
31+
private EventHandle groupEventHandle;
3232

3333
AggregateGroup(final Map<Object, Object> identificationKeys) {
3434
this.groupState = new DefaultGroupState();
3535
this.identificationKeys = identificationKeys;
3636
this.groupStart = Instant.now();
3737
this.concludeGroupLock = new ReentrantLock();
3838
this.handleEventForGroupLock = new ReentrantLock();
39-
this.eventHandle = new AggregateEventHandle(Instant.now());
39+
this.groupEventHandle = new AggregateEventHandle(Instant.now());
4040
}
4141

4242
@Override
4343
public EventHandle getEventHandle() {
44-
return eventHandle;
44+
return groupEventHandle;
4545
}
4646

47-
public void attachToEventAcknowledgementSet(Event event) {
48-
InternalEventHandle internalEventHandle;
49-
EventHandle handle = event.getEventHandle();
50-
internalEventHandle = (InternalEventHandle)(handle);
51-
internalEventHandle.addEventHandle(eventHandle);
47+
public void attachToEventAcknowledgementSet(final Event event) {
48+
final EventHandle handle = event.getEventHandle();
49+
final InternalEventHandle internalEventHandle = (InternalEventHandle) (handle);
50+
internalEventHandle.addEventHandle(groupEventHandle);
5251
}
5352

5453
public GroupState getGroupState() {
@@ -86,6 +85,6 @@ boolean shouldConcludeGroup(final Duration groupDuration) {
8685
void resetGroup() {
8786
groupStart = Instant.now();
8887
groupState.clear();
89-
this.eventHandle = new AggregateEventHandle(groupStart);
88+
this.groupEventHandle = new AggregateEventHandle(groupStart);
9089
}
9190
}

0 commit comments

Comments
 (0)