|
| 1 | +/* |
| 2 | + * Copyright OpenSearch Contributors |
| 3 | + * SPDX-License-Identifier: Apache-2.0 |
| 4 | + */ |
| 5 | +package org.opensearch.dataprepper.integration; |
| 6 | + |
| 7 | +import org.junit.jupiter.api.AfterEach; |
| 8 | +import org.junit.jupiter.api.BeforeAll; |
| 9 | +import org.junit.jupiter.api.BeforeEach; |
| 10 | +import org.junit.jupiter.params.ParameterizedTest; |
| 11 | +import org.junit.jupiter.params.provider.Arguments; |
| 12 | +import org.junit.jupiter.params.provider.MethodSource; |
| 13 | +import org.opensearch.dataprepper.model.event.Event; |
| 14 | +import org.opensearch.dataprepper.model.event.JacksonEvent; |
| 15 | +import org.opensearch.dataprepper.model.record.Record; |
| 16 | +import org.opensearch.dataprepper.plugins.BaseEventsTrackingProcessor; |
| 17 | +import org.opensearch.dataprepper.plugins.InMemorySinkAccessor; |
| 18 | +import org.opensearch.dataprepper.plugins.InMemorySourceAccessor; |
| 19 | +import org.opensearch.dataprepper.plugins.BasicEventsTrackingTestProcessor; |
| 20 | +import org.opensearch.dataprepper.plugins.SingleThreadEventsTrackingTestProcessor; |
| 21 | +import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner; |
| 22 | + |
| 23 | +import java.util.ArrayList; |
| 24 | +import java.util.List; |
| 25 | +import java.util.Map; |
| 26 | +import java.util.Set; |
| 27 | +import java.util.UUID; |
| 28 | +import java.util.concurrent.TimeUnit; |
| 29 | +import java.util.concurrent.atomic.AtomicInteger; |
| 30 | +import java.util.stream.Collectors; |
| 31 | +import java.util.stream.Stream; |
| 32 | + |
| 33 | +import static org.awaitility.Awaitility.await; |
| 34 | +import static org.hamcrest.CoreMatchers.equalTo; |
| 35 | +import static org.hamcrest.MatcherAssert.assertThat; |
| 36 | +import static org.hamcrest.Matchers.greaterThanOrEqualTo; |
| 37 | + |
| 38 | +/** |
| 39 | + * Integration tests for validating processor behavior in pipelines and to verify that |
| 40 | + * events are processed exactly once even while using multiple workers. |
| 41 | + */ |
| 42 | +class ProcessorValidationIT { |
| 43 | + private static final String IN_MEMORY_IDENTIFIER = "ProcessorValidationIT"; |
| 44 | + private static final int BATCH_SIZE = 5; |
| 45 | + private static final int TOTAL_EVENTS = 100; |
| 46 | + private static final int WAIT_TIMEOUT_SECONDS = 10; |
| 47 | + private static Map<String, List<BaseEventsTrackingProcessor>> PIPELINE_TO_PROCESSORS_MAP; |
| 48 | + |
| 49 | + private DataPrepperTestRunner testRunner; |
| 50 | + private InMemorySourceAccessor sourceAccessor; |
| 51 | + private InMemorySinkAccessor sinkAccessor; |
| 52 | + private String pipelineType; |
| 53 | + |
| 54 | + @BeforeAll |
| 55 | + static void setupProcessors() { |
| 56 | + BaseEventsTrackingProcessor singleThreadEventsTrackingProcessor = new SingleThreadEventsTrackingTestProcessor(); |
| 57 | + BaseEventsTrackingProcessor basicEventsTrackingProcessor = new BasicEventsTrackingTestProcessor(); |
| 58 | + PIPELINE_TO_PROCESSORS_MAP = Map.of( |
| 59 | + "single-thread-processor-pipeline", List.of(singleThreadEventsTrackingProcessor), |
| 60 | + "basic-processor-pipeline", List.of(basicEventsTrackingProcessor), |
| 61 | + "multi-processor-pipeline", List.of(singleThreadEventsTrackingProcessor, basicEventsTrackingProcessor) |
| 62 | + ); |
| 63 | + } |
| 64 | + |
| 65 | + @BeforeEach |
| 66 | + void setUp() { |
| 67 | + PIPELINE_TO_PROCESSORS_MAP.values().forEach(processorsList -> |
| 68 | + processorsList.forEach(BaseEventsTrackingProcessor::reset)); |
| 69 | + } |
| 70 | + |
| 71 | + @AfterEach |
| 72 | + void tearDown() { |
| 73 | + if (testRunner != null) { |
| 74 | + testRunner.stop(); |
| 75 | + } |
| 76 | + } |
| 77 | + |
| 78 | + /** |
| 79 | + * Parameterized test that validates event processing across different pipeline configurations. |
| 80 | + * |
| 81 | + * @param pipelineType The type of pipeline configuration to test |
| 82 | + * @param testName A descriptive name for the test scenario |
| 83 | + * @param numberOfBatches Number of batches to send to the pipeline |
| 84 | + * @param eventsPerBatch Number of events in each batch |
| 85 | + * @param expectedTotalEvents Total number of events expected to be processed |
| 86 | + */ |
| 87 | + @ParameterizedTest(name = "{index} - {0} - {1}") |
| 88 | + @MethodSource("provideTestParameters") |
| 89 | + void test_events_processed_validation(String pipelineType, String testName, int numberOfBatches, int eventsPerBatch, int expectedTotalEvents) { |
| 90 | + this.pipelineType = pipelineType; |
| 91 | + initializeTestRunner(); |
| 92 | + List<List<Record<Event>>> batches = createBatches(numberOfBatches, eventsPerBatch); |
| 93 | + batches.forEach(batch -> sourceAccessor.submit(IN_MEMORY_IDENTIFIER, batch)); |
| 94 | + |
| 95 | + verifyProcessingResults(pipelineType, expectedTotalEvents, eventsPerBatch); |
| 96 | + } |
| 97 | + |
| 98 | + /** |
| 99 | + * Provides test parameters for the parameterized test. |
| 100 | + * Creates test scenarios for each pipeline type with both single batch and multiple batch configurations. |
| 101 | + * @return Stream of test parameters |
| 102 | + */ |
| 103 | + private static Stream<Arguments> provideTestParameters() { |
| 104 | + List<Arguments> arguments = new ArrayList<>(); |
| 105 | + for (String pipelineType : PIPELINE_TO_PROCESSORS_MAP.keySet()) { |
| 106 | + arguments.add(Arguments.of(pipelineType, "SingleBatch", 1, TOTAL_EVENTS, TOTAL_EVENTS)); |
| 107 | + arguments.add(Arguments.of(pipelineType, "MultipleBatches", BATCH_SIZE, TOTAL_EVENTS, BATCH_SIZE * TOTAL_EVENTS)); |
| 108 | + } |
| 109 | + return arguments.stream(); |
| 110 | + } |
| 111 | + |
| 112 | + /** |
| 113 | + * Initializes the DataPrepper test runner with the appropriate pipeline configuration. |
| 114 | + * Sets up source and sink accessors for test data input and output. |
| 115 | + */ |
| 116 | + private void initializeTestRunner() { |
| 117 | + String pipelineFile = pipelineType + ".yaml"; |
| 118 | + testRunner = DataPrepperTestRunner.builder() |
| 119 | + .withPipelinesDirectoryOrFile(pipelineFile) |
| 120 | + .build(); |
| 121 | + sourceAccessor = testRunner.getInMemorySourceAccessor(); |
| 122 | + sinkAccessor = testRunner.getInMemorySinkAccessor(); |
| 123 | + testRunner.start(); |
| 124 | + } |
| 125 | + |
| 126 | + /** |
| 127 | + * Verifies that events were processed correctly by the pipeline. |
| 128 | + * @param pipelineType The type of pipeline being tested |
| 129 | + * @param expectedTotalEvents Total number of events expected to be processed |
| 130 | + * @param eventsPerBatch Number of events in each batch |
| 131 | + */ |
| 132 | + private void verifyProcessingResults(String pipelineType, int expectedTotalEvents, int eventsPerBatch) { |
| 133 | + // Wait for all events to be processed |
| 134 | + await().atMost(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS) |
| 135 | + .untilAsserted(() -> assertThat( |
| 136 | + sinkAccessor.get(IN_MEMORY_IDENTIFIER).size(), |
| 137 | + equalTo(expectedTotalEvents))); |
| 138 | + |
| 139 | + List<Record<Event>> outputRecords = sinkAccessor.get(IN_MEMORY_IDENTIFIER); |
| 140 | + assertThat(outputRecords.size(), equalTo(expectedTotalEvents)); |
| 141 | + |
| 142 | + // Verify each processor in the pipeline processed events |
| 143 | + List<BaseEventsTrackingProcessor> processors = PIPELINE_TO_PROCESSORS_MAP.get(pipelineType); |
| 144 | + for (BaseEventsTrackingProcessor processor : processors) { |
| 145 | + String processorName = processor.getName(); |
| 146 | + Map<String, AtomicInteger> processedEventsMap = processor.getEventsMap(); |
| 147 | + |
| 148 | + verifyEventProcessing(processedEventsMap, outputRecords, expectedTotalEvents, processorName); |
| 149 | + } |
| 150 | + |
| 151 | + verifyWorkerThreads(outputRecords, processors); |
| 152 | + |
| 153 | + int numberOfBatches = expectedTotalEvents / eventsPerBatch; |
| 154 | + for (int batch = 0; batch < numberOfBatches; batch++) { |
| 155 | + int finalBatch = batch; |
| 156 | + List<Record<Event>> batchRecords = outputRecords.stream() |
| 157 | + .filter(record -> record.getData().get("batch", Integer.class) == finalBatch) |
| 158 | + .collect(Collectors.toList()); |
| 159 | + assertThat(batchRecords.size(), equalTo(eventsPerBatch)); |
| 160 | + } |
| 161 | + } |
| 162 | + |
| 163 | + /** |
| 164 | + * Verifies that each event was processed exactly once by the specified processor. |
| 165 | + * Checks both the processor's internal tracking map and the event metadata. |
| 166 | + * |
| 167 | + * @param processedEventsMap Map tracking which events were processed by the processor |
| 168 | + * @param outputRecords List of output records from the pipeline |
| 169 | + * @param expectedTotalEvents Total number of events expected to be processed |
| 170 | + * @param processorType Name of the processor being verified |
| 171 | + */ |
| 172 | + private void verifyEventProcessing(Map<String, AtomicInteger> processedEventsMap, |
| 173 | + List<Record<Event>> outputRecords, |
| 174 | + int expectedTotalEvents, |
| 175 | + String processorType) { |
| 176 | + assertThat("Output records list should not be empty", outputRecords.size(), greaterThanOrEqualTo(1)); |
| 177 | + String countField = processorType + "_processed_count"; |
| 178 | + for (Record<Event> record : outputRecords) { |
| 179 | + String id = record.getData().get("id", String.class); |
| 180 | + AtomicInteger count = processedEventsMap.get(id); |
| 181 | + |
| 182 | + assertThat("Event with ID " + id + " should be processed exactly once by " + processorType, |
| 183 | + count.get(), equalTo(1)); |
| 184 | + assertThat("Event processing count should be 1 for " + processorType, |
| 185 | + record.getData().get(countField, Integer.class), equalTo(1)); |
| 186 | + } |
| 187 | + |
| 188 | + assertThat("All events should be processed by " + processorType, |
| 189 | + processedEventsMap.size(), equalTo(expectedTotalEvents)); |
| 190 | + } |
| 191 | + |
| 192 | + /** |
| 193 | + * Verifies that worker threads were assigned correctly based on processor configuration. |
| 194 | + * Ensures that at least one worker thread was used for processing. |
| 195 | + * |
| 196 | + * @param outputRecords List of output records from the pipeline |
| 197 | + * @param processors List of processors in the pipeline |
| 198 | + */ |
| 199 | + private void verifyWorkerThreads(List<Record<Event>> outputRecords, List<BaseEventsTrackingProcessor> processors) { |
| 200 | + Set<String> threadNames = outputRecords.stream() |
| 201 | + .map(Record::getData) |
| 202 | + .flatMap(event -> processors.stream() |
| 203 | + .map(processor -> { |
| 204 | + String processorName = processor.getName(); |
| 205 | + return event.get(processorName + "_processed_by_thread", String.class); |
| 206 | + }) |
| 207 | + .filter(threadName -> threadName != null)) |
| 208 | + .collect(Collectors.toSet()); |
| 209 | + |
| 210 | + assertThat("There should be at least one worker thread", |
| 211 | + threadNames.size(), greaterThanOrEqualTo(1)); |
| 212 | + } |
| 213 | + |
| 214 | + /** |
| 215 | + * Creates a list of event records with unique IDs and sequential numbering. |
| 216 | + * |
| 217 | + * @param count Number of records to create |
| 218 | + * @return List of event records |
| 219 | + */ |
| 220 | + private List<Record<Event>> createRecords(int count) { |
| 221 | + List<Record<Event>> records = new ArrayList<>(); |
| 222 | + for (int i = 0; i < count; i++) { |
| 223 | + records.add(new Record<>(createEvent(i))); |
| 224 | + } |
| 225 | + return records; |
| 226 | + } |
| 227 | + |
| 228 | + /** |
| 229 | + * Creates multiple batches of event records. |
| 230 | + * Each batch is assigned a batch number, and events within each batch |
| 231 | + * are given sequential numbers across all batches. |
| 232 | + * |
| 233 | + * @param batchSize Number of batches to create |
| 234 | + * @param eventsPerBatch Number of events in each batch |
| 235 | + * @return List of batches, where each batch is a list of event records |
| 236 | + */ |
| 237 | + private List<List<Record<Event>>> createBatches(int batchSize, int eventsPerBatch) { |
| 238 | + List<List<Record<Event>>> batches = new ArrayList<>(); |
| 239 | + for (int batch = 0; batch < batchSize; batch++) { |
| 240 | + int batchOffset = batch * eventsPerBatch; |
| 241 | + int currentBatch = batch; |
| 242 | + List<Record<Event>> batchRecords = createRecords(eventsPerBatch).stream() |
| 243 | + .map(record -> { |
| 244 | + Event event = record.getData(); |
| 245 | + event.put("sequence", batchOffset + event.get("sequence", Integer.class)); |
| 246 | + event.put("batch", currentBatch); |
| 247 | + return new Record<>(event); |
| 248 | + }) |
| 249 | + .collect(Collectors.toList()); |
| 250 | + batches.add(batchRecords); |
| 251 | + } |
| 252 | + return batches; |
| 253 | + } |
| 254 | + |
| 255 | + /** |
| 256 | + * Creates a single event with a unique ID and sequence number. |
| 257 | + * |
| 258 | + * @param sequence Sequence number to assign to the event |
| 259 | + * @return The created event |
| 260 | + */ |
| 261 | + private Event createEvent(int sequence) { |
| 262 | + String eventId = UUID.randomUUID().toString(); |
| 263 | + Event event = JacksonEvent.fromMessage(eventId); |
| 264 | + event.put("id", eventId); |
| 265 | + event.put("sequence", sequence); |
| 266 | + return event; |
| 267 | + } |
| 268 | +} |
0 commit comments