Skip to content

Commit fcfcc04

Browse files
Added integration tests for validating that events are processed exactly once (#5691)
* Added integration tests for validating that events are processed exactly once by any processor - Added additional validation for BasicEndToEndTests Signed-off-by: Mohammed Aghil Puthiyottil <57040494+MohammedAghil@users.noreply.github.com> * Moved getEventsMap() and getName() methods to base class Signed-off-by: Mohammed Aghil Puthiyottil <57040494+MohammedAghil@users.noreply.github.com> * Addressed comments on PIPELINE_TO_PROCESSORS_MAP Signed-off-by: Mohammed Aghil Puthiyottil <57040494+MohammedAghil@users.noreply.github.com> --------- Signed-off-by: Mohammed Aghil Puthiyottil <57040494+MohammedAghil@users.noreply.github.com>
1 parent a08cd7b commit fcfcc04

9 files changed

Lines changed: 481 additions & 0 deletions

File tree

Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package org.opensearch.dataprepper.integration;
6+
7+
import org.junit.jupiter.api.AfterEach;
8+
import org.junit.jupiter.api.BeforeAll;
9+
import org.junit.jupiter.api.BeforeEach;
10+
import org.junit.jupiter.params.ParameterizedTest;
11+
import org.junit.jupiter.params.provider.Arguments;
12+
import org.junit.jupiter.params.provider.MethodSource;
13+
import org.opensearch.dataprepper.model.event.Event;
14+
import org.opensearch.dataprepper.model.event.JacksonEvent;
15+
import org.opensearch.dataprepper.model.record.Record;
16+
import org.opensearch.dataprepper.plugins.BaseEventsTrackingProcessor;
17+
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
18+
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
19+
import org.opensearch.dataprepper.plugins.BasicEventsTrackingTestProcessor;
20+
import org.opensearch.dataprepper.plugins.SingleThreadEventsTrackingTestProcessor;
21+
import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner;
22+
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.Set;
27+
import java.util.UUID;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicInteger;
30+
import java.util.stream.Collectors;
31+
import java.util.stream.Stream;
32+
33+
import static org.awaitility.Awaitility.await;
34+
import static org.hamcrest.CoreMatchers.equalTo;
35+
import static org.hamcrest.MatcherAssert.assertThat;
36+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
37+
38+
/**
39+
* Integration tests for validating processor behavior in pipelines and to verify that
40+
* events are processed exactly once even while using multiple workers.
41+
*/
42+
class ProcessorValidationIT {
43+
private static final String IN_MEMORY_IDENTIFIER = "ProcessorValidationIT";
44+
private static final int BATCH_SIZE = 5;
45+
private static final int TOTAL_EVENTS = 100;
46+
private static final int WAIT_TIMEOUT_SECONDS = 10;
47+
private static Map<String, List<BaseEventsTrackingProcessor>> PIPELINE_TO_PROCESSORS_MAP;
48+
49+
private DataPrepperTestRunner testRunner;
50+
private InMemorySourceAccessor sourceAccessor;
51+
private InMemorySinkAccessor sinkAccessor;
52+
private String pipelineType;
53+
54+
@BeforeAll
55+
static void setupProcessors() {
56+
BaseEventsTrackingProcessor singleThreadEventsTrackingProcessor = new SingleThreadEventsTrackingTestProcessor();
57+
BaseEventsTrackingProcessor basicEventsTrackingProcessor = new BasicEventsTrackingTestProcessor();
58+
PIPELINE_TO_PROCESSORS_MAP = Map.of(
59+
"single-thread-processor-pipeline", List.of(singleThreadEventsTrackingProcessor),
60+
"basic-processor-pipeline", List.of(basicEventsTrackingProcessor),
61+
"multi-processor-pipeline", List.of(singleThreadEventsTrackingProcessor, basicEventsTrackingProcessor)
62+
);
63+
}
64+
65+
@BeforeEach
66+
void setUp() {
67+
PIPELINE_TO_PROCESSORS_MAP.values().forEach(processorsList ->
68+
processorsList.forEach(BaseEventsTrackingProcessor::reset));
69+
}
70+
71+
@AfterEach
72+
void tearDown() {
73+
if (testRunner != null) {
74+
testRunner.stop();
75+
}
76+
}
77+
78+
/**
79+
* Parameterized test that validates event processing across different pipeline configurations.
80+
*
81+
* @param pipelineType The type of pipeline configuration to test
82+
* @param testName A descriptive name for the test scenario
83+
* @param numberOfBatches Number of batches to send to the pipeline
84+
* @param eventsPerBatch Number of events in each batch
85+
* @param expectedTotalEvents Total number of events expected to be processed
86+
*/
87+
@ParameterizedTest(name = "{index} - {0} - {1}")
88+
@MethodSource("provideTestParameters")
89+
void test_events_processed_validation(String pipelineType, String testName, int numberOfBatches, int eventsPerBatch, int expectedTotalEvents) {
90+
this.pipelineType = pipelineType;
91+
initializeTestRunner();
92+
List<List<Record<Event>>> batches = createBatches(numberOfBatches, eventsPerBatch);
93+
batches.forEach(batch -> sourceAccessor.submit(IN_MEMORY_IDENTIFIER, batch));
94+
95+
verifyProcessingResults(pipelineType, expectedTotalEvents, eventsPerBatch);
96+
}
97+
98+
/**
99+
* Provides test parameters for the parameterized test.
100+
* Creates test scenarios for each pipeline type with both single batch and multiple batch configurations.
101+
* @return Stream of test parameters
102+
*/
103+
private static Stream<Arguments> provideTestParameters() {
104+
List<Arguments> arguments = new ArrayList<>();
105+
for (String pipelineType : PIPELINE_TO_PROCESSORS_MAP.keySet()) {
106+
arguments.add(Arguments.of(pipelineType, "SingleBatch", 1, TOTAL_EVENTS, TOTAL_EVENTS));
107+
arguments.add(Arguments.of(pipelineType, "MultipleBatches", BATCH_SIZE, TOTAL_EVENTS, BATCH_SIZE * TOTAL_EVENTS));
108+
}
109+
return arguments.stream();
110+
}
111+
112+
/**
113+
* Initializes the DataPrepper test runner with the appropriate pipeline configuration.
114+
* Sets up source and sink accessors for test data input and output.
115+
*/
116+
private void initializeTestRunner() {
117+
String pipelineFile = pipelineType + ".yaml";
118+
testRunner = DataPrepperTestRunner.builder()
119+
.withPipelinesDirectoryOrFile(pipelineFile)
120+
.build();
121+
sourceAccessor = testRunner.getInMemorySourceAccessor();
122+
sinkAccessor = testRunner.getInMemorySinkAccessor();
123+
testRunner.start();
124+
}
125+
126+
/**
127+
* Verifies that events were processed correctly by the pipeline.
128+
* @param pipelineType The type of pipeline being tested
129+
* @param expectedTotalEvents Total number of events expected to be processed
130+
* @param eventsPerBatch Number of events in each batch
131+
*/
132+
private void verifyProcessingResults(String pipelineType, int expectedTotalEvents, int eventsPerBatch) {
133+
// Wait for all events to be processed
134+
await().atMost(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
135+
.untilAsserted(() -> assertThat(
136+
sinkAccessor.get(IN_MEMORY_IDENTIFIER).size(),
137+
equalTo(expectedTotalEvents)));
138+
139+
List<Record<Event>> outputRecords = sinkAccessor.get(IN_MEMORY_IDENTIFIER);
140+
assertThat(outputRecords.size(), equalTo(expectedTotalEvents));
141+
142+
// Verify each processor in the pipeline processed events
143+
List<BaseEventsTrackingProcessor> processors = PIPELINE_TO_PROCESSORS_MAP.get(pipelineType);
144+
for (BaseEventsTrackingProcessor processor : processors) {
145+
String processorName = processor.getName();
146+
Map<String, AtomicInteger> processedEventsMap = processor.getEventsMap();
147+
148+
verifyEventProcessing(processedEventsMap, outputRecords, expectedTotalEvents, processorName);
149+
}
150+
151+
verifyWorkerThreads(outputRecords, processors);
152+
153+
int numberOfBatches = expectedTotalEvents / eventsPerBatch;
154+
for (int batch = 0; batch < numberOfBatches; batch++) {
155+
int finalBatch = batch;
156+
List<Record<Event>> batchRecords = outputRecords.stream()
157+
.filter(record -> record.getData().get("batch", Integer.class) == finalBatch)
158+
.collect(Collectors.toList());
159+
assertThat(batchRecords.size(), equalTo(eventsPerBatch));
160+
}
161+
}
162+
163+
/**
164+
* Verifies that each event was processed exactly once by the specified processor.
165+
* Checks both the processor's internal tracking map and the event metadata.
166+
*
167+
* @param processedEventsMap Map tracking which events were processed by the processor
168+
* @param outputRecords List of output records from the pipeline
169+
* @param expectedTotalEvents Total number of events expected to be processed
170+
* @param processorType Name of the processor being verified
171+
*/
172+
private void verifyEventProcessing(Map<String, AtomicInteger> processedEventsMap,
173+
List<Record<Event>> outputRecords,
174+
int expectedTotalEvents,
175+
String processorType) {
176+
assertThat("Output records list should not be empty", outputRecords.size(), greaterThanOrEqualTo(1));
177+
String countField = processorType + "_processed_count";
178+
for (Record<Event> record : outputRecords) {
179+
String id = record.getData().get("id", String.class);
180+
AtomicInteger count = processedEventsMap.get(id);
181+
182+
assertThat("Event with ID " + id + " should be processed exactly once by " + processorType,
183+
count.get(), equalTo(1));
184+
assertThat("Event processing count should be 1 for " + processorType,
185+
record.getData().get(countField, Integer.class), equalTo(1));
186+
}
187+
188+
assertThat("All events should be processed by " + processorType,
189+
processedEventsMap.size(), equalTo(expectedTotalEvents));
190+
}
191+
192+
/**
193+
* Verifies that worker threads were assigned correctly based on processor configuration.
194+
* Ensures that at least one worker thread was used for processing.
195+
*
196+
* @param outputRecords List of output records from the pipeline
197+
* @param processors List of processors in the pipeline
198+
*/
199+
private void verifyWorkerThreads(List<Record<Event>> outputRecords, List<BaseEventsTrackingProcessor> processors) {
200+
Set<String> threadNames = outputRecords.stream()
201+
.map(Record::getData)
202+
.flatMap(event -> processors.stream()
203+
.map(processor -> {
204+
String processorName = processor.getName();
205+
return event.get(processorName + "_processed_by_thread", String.class);
206+
})
207+
.filter(threadName -> threadName != null))
208+
.collect(Collectors.toSet());
209+
210+
assertThat("There should be at least one worker thread",
211+
threadNames.size(), greaterThanOrEqualTo(1));
212+
}
213+
214+
/**
215+
* Creates a list of event records with unique IDs and sequential numbering.
216+
*
217+
* @param count Number of records to create
218+
* @return List of event records
219+
*/
220+
private List<Record<Event>> createRecords(int count) {
221+
List<Record<Event>> records = new ArrayList<>();
222+
for (int i = 0; i < count; i++) {
223+
records.add(new Record<>(createEvent(i)));
224+
}
225+
return records;
226+
}
227+
228+
/**
229+
* Creates multiple batches of event records.
230+
* Each batch is assigned a batch number, and events within each batch
231+
* are given sequential numbers across all batches.
232+
*
233+
* @param batchSize Number of batches to create
234+
* @param eventsPerBatch Number of events in each batch
235+
* @return List of batches, where each batch is a list of event records
236+
*/
237+
private List<List<Record<Event>>> createBatches(int batchSize, int eventsPerBatch) {
238+
List<List<Record<Event>>> batches = new ArrayList<>();
239+
for (int batch = 0; batch < batchSize; batch++) {
240+
int batchOffset = batch * eventsPerBatch;
241+
int currentBatch = batch;
242+
List<Record<Event>> batchRecords = createRecords(eventsPerBatch).stream()
243+
.map(record -> {
244+
Event event = record.getData();
245+
event.put("sequence", batchOffset + event.get("sequence", Integer.class));
246+
event.put("batch", currentBatch);
247+
return new Record<>(event);
248+
})
249+
.collect(Collectors.toList());
250+
batches.add(batchRecords);
251+
}
252+
return batches;
253+
}
254+
255+
/**
256+
* Creates a single event with a unique ID and sequence number.
257+
*
258+
* @param sequence Sequence number to assign to the event
259+
* @return The created event
260+
*/
261+
private Event createEvent(int sequence) {
262+
String eventId = UUID.randomUUID().toString();
263+
Event event = JacksonEvent.fromMessage(eventId);
264+
event.put("id", eventId);
265+
event.put("sequence", sequence);
266+
return event;
267+
}
268+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package org.opensearch.dataprepper.plugins;
6+
7+
import org.opensearch.dataprepper.model.event.Event;
8+
import org.opensearch.dataprepper.model.processor.Processor;
9+
import org.opensearch.dataprepper.model.record.Record;
10+
11+
import java.util.Collection;
12+
import java.util.Map;
13+
import java.util.concurrent.atomic.AtomicInteger;
14+
15+
/**
16+
* Abstract base class for test processors that track event processing.
17+
* This class provides common functionality for tracking all the events that are processed
18+
*/
19+
public abstract class BaseEventsTrackingProcessor implements Processor<Record<Event>, Record<Event>> {
20+
private final String countPropertyName;
21+
private final String threadPropertyName;
22+
private final Map<String, AtomicInteger> eventsMap;
23+
private final String processorName;
24+
25+
/**
26+
* Constructor for the base events tracking processor.
27+
*
28+
* @param processorName Name of the processor
29+
* @param eventsMap Map for tracking processed events
30+
*/
31+
protected BaseEventsTrackingProcessor(String processorName, Map<String, AtomicInteger> eventsMap) {
32+
this.countPropertyName = processorName + "_processed_count";
33+
this.threadPropertyName = processorName + "_processed_by_thread";
34+
this.processorName = processorName;
35+
this.eventsMap = eventsMap;
36+
}
37+
38+
/**
39+
* Gets the map of processed events.
40+
* @return Map of event IDs to processing counts
41+
*/
42+
public Map<String, AtomicInteger> getEventsMap() {
43+
return eventsMap;
44+
}
45+
46+
/**
47+
* Gets the name of processor.
48+
* @return The processor name
49+
*/
50+
public String getName() {
51+
return processorName;
52+
}
53+
54+
/**
55+
* Resets the processor's state by clearing the events map.
56+
*/
57+
public void reset() {
58+
eventsMap.clear();
59+
}
60+
61+
/**
62+
* Processes a collection of event records.
63+
* For each event:
64+
* 1. Stores the event which was provided for processing in the events map
65+
* 2. Records the processing count in the event metadata
66+
* 3. Records which thread processed the event in the event metadata
67+
*
68+
* @param records Collection of event records to process
69+
* @return records with added metadata
70+
*/
71+
@Override
72+
public Collection<Record<Event>> execute(final Collection<Record<Event>> records) {
73+
final String threadName = Thread.currentThread().getName();
74+
75+
for (Record<Event> record : records) {
76+
Event event = record.getData();
77+
String eventId = event.get("id", String.class);
78+
79+
if (eventId != null) {
80+
AtomicInteger counter = eventsMap.computeIfAbsent(eventId, id -> new AtomicInteger(0));
81+
int count = counter.incrementAndGet();
82+
83+
event.put(countPropertyName, count);
84+
event.put(threadPropertyName, threadName);
85+
}
86+
}
87+
return records;
88+
}
89+
90+
@Override
91+
public void prepareForShutdown() {
92+
}
93+
94+
@Override
95+
public boolean isReadyForShutdown() {
96+
return true;
97+
}
98+
99+
@Override
100+
public void shutdown() {
101+
}
102+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package org.opensearch.dataprepper.plugins;
6+
7+
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
8+
import org.opensearch.dataprepper.model.processor.Processor;
9+
10+
import java.util.Map;
11+
import java.util.concurrent.ConcurrentHashMap;
12+
import java.util.concurrent.atomic.AtomicInteger;
13+
14+
/**
15+
* A basic test processor implementation which tracks the list of all events processed by it
16+
*/
17+
@DataPrepperPlugin(name = "basic_events_tracking_test", pluginType = Processor.class)
18+
public class BasicEventsTrackingTestProcessor extends BaseEventsTrackingProcessor {
19+
private static final Map<String, AtomicInteger> PROCESSED_EVENTS_MAP = new ConcurrentHashMap<>();
20+
private static final String PLUGIN_NAME = "basic_events_tracking_test";
21+
22+
public BasicEventsTrackingTestProcessor() {
23+
super(PLUGIN_NAME, PROCESSED_EVENTS_MAP);
24+
}
25+
}

0 commit comments

Comments
 (0)