Skip to content

Commit 73ff2ba

Browse files
Fixes a regression in core where @SingleThread annotated processors are only running the last instance. (#5902) (#5904)
Fixes a regression in core where @SingleThread annotated processors are only running the last instance. Also, disable the ProcessorSwapPipelineIT test since this feature is not yet completed. Fixes #5901 (cherry picked from commit 53f16d7) Signed-off-by: David Venable <dlv@amazon.com> Co-authored-by: David Venable <dlv@amazon.com>
1 parent d3fa0ab commit 73ff2ba

12 files changed

Lines changed: 195 additions & 74 deletions

File tree

data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ProcessorSwapPipelineIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.junit.jupiter.api.AfterEach;
99
import org.junit.jupiter.api.BeforeEach;
10+
import org.junit.jupiter.api.Disabled;
1011
import org.junit.jupiter.api.Test;
1112
import org.opensearch.dataprepper.model.configuration.PipelineModel;
1213
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
@@ -39,6 +40,7 @@
3940
import static org.hamcrest.Matchers.empty;
4041
import static org.opensearch.dataprepper.test.framework.DataPrepperTestRunner.BASE_PATH;
4142

43+
@Disabled
4244
class ProcessorSwapPipelineIT {
4345
private static final Logger LOG = LoggerFactory.getLogger(ProcessorSwapPipelineIT.class);
4446
private static final String IN_MEMORY_IDENTIFIER = "ProcessorSwapPipelineIT";

data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ProcessorValidationIT.java

Lines changed: 105 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@
55
package org.opensearch.dataprepper.integration;
66

77
import org.junit.jupiter.api.AfterEach;
8-
import org.junit.jupiter.api.BeforeAll;
98
import org.junit.jupiter.api.BeforeEach;
9+
import org.junit.jupiter.api.extension.ExtensionContext;
1010
import org.junit.jupiter.params.ParameterizedTest;
1111
import org.junit.jupiter.params.provider.Arguments;
12-
import org.junit.jupiter.params.provider.MethodSource;
12+
import org.junit.jupiter.params.provider.ArgumentsProvider;
13+
import org.junit.jupiter.params.provider.ArgumentsSource;
1314
import org.opensearch.dataprepper.model.event.Event;
1415
import org.opensearch.dataprepper.model.event.JacksonEvent;
1516
import org.opensearch.dataprepper.model.record.Record;
@@ -34,6 +35,8 @@
3435
import static org.hamcrest.CoreMatchers.equalTo;
3536
import static org.hamcrest.MatcherAssert.assertThat;
3637
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
38+
import static org.junit.jupiter.api.Assertions.assertAll;
39+
import static org.junit.jupiter.params.provider.Arguments.arguments;
3740

3841
/**
3942
* Integration tests for validating processor behavior in pipelines and to verify that
@@ -44,28 +47,17 @@ class ProcessorValidationIT {
4447
private static final int BATCH_SIZE = 5;
4548
private static final int TOTAL_EVENTS = 100;
4649
private static final int WAIT_TIMEOUT_SECONDS = 10;
47-
private static Map<String, List<BaseEventsTrackingProcessor>> PIPELINE_TO_PROCESSORS_MAP;
4850

4951
private DataPrepperTestRunner testRunner;
5052
private InMemorySourceAccessor sourceAccessor;
5153
private InMemorySinkAccessor sinkAccessor;
5254
private String pipelineType;
5355

54-
@BeforeAll
55-
static void setupProcessors() {
56-
BaseEventsTrackingProcessor singleThreadEventsTrackingProcessor = new SingleThreadEventsTrackingTestProcessor();
57-
BaseEventsTrackingProcessor basicEventsTrackingProcessor = new BasicEventsTrackingTestProcessor();
58-
PIPELINE_TO_PROCESSORS_MAP = Map.of(
59-
"single-thread-processor-pipeline", List.of(singleThreadEventsTrackingProcessor),
60-
"basic-processor-pipeline", List.of(basicEventsTrackingProcessor),
61-
"multi-processor-pipeline", List.of(singleThreadEventsTrackingProcessor, basicEventsTrackingProcessor)
62-
);
63-
}
64-
6556
@BeforeEach
6657
void setUp() {
67-
PIPELINE_TO_PROCESSORS_MAP.values().forEach(processorsList ->
68-
processorsList.forEach(BaseEventsTrackingProcessor::reset));
58+
new BasicEventsTrackingTestProcessor().reset();
59+
new SingleThreadEventsTrackingTestProcessor().reset();
60+
SingleThreadEventsTrackingTestProcessor.getProcessors().clear();
6961
}
7062

7163
@AfterEach
@@ -85,28 +77,85 @@ void tearDown() {
8577
* @param expectedTotalEvents Total number of events expected to be processed
8678
*/
8779
@ParameterizedTest(name = "{index} - {0} - {1}")
88-
@MethodSource("provideTestParameters")
89-
void test_events_processed_validation(String pipelineType, String testName, int numberOfBatches, int eventsPerBatch, int expectedTotalEvents) {
80+
@ArgumentsSource(WithoutSingleThreadParametersArgumentsProvider.class)
81+
void test_events_processed_when_no_SingleThread_processors(String pipelineType, String testName, List<BaseEventsTrackingProcessor> processorEventStores, int numberOfBatches, int eventsPerBatch, int expectedTotalEvents) {
9082
this.pipelineType = pipelineType;
9183
initializeTestRunner();
9284
List<List<Record<Event>>> batches = createBatches(numberOfBatches, eventsPerBatch);
9385
batches.forEach(batch -> sourceAccessor.submit(IN_MEMORY_IDENTIFIER, batch));
9486

95-
verifyProcessingResults(pipelineType, expectedTotalEvents, eventsPerBatch);
87+
verifyProcessingResults(pipelineType, expectedTotalEvents, eventsPerBatch, processorEventStores);
9688
}
9789

98-
/**
99-
* Provides test parameters for the parameterized test.
100-
* Creates test scenarios for each pipeline type with both single batch and multiple batch configurations.
101-
* @return Stream of test parameters
102-
*/
103-
private static Stream<Arguments> provideTestParameters() {
104-
List<Arguments> arguments = new ArrayList<>();
105-
for (String pipelineType : PIPELINE_TO_PROCESSORS_MAP.keySet()) {
106-
arguments.add(Arguments.of(pipelineType, "SingleBatch", 1, TOTAL_EVENTS, TOTAL_EVENTS));
107-
arguments.add(Arguments.of(pipelineType, "MultipleBatches", BATCH_SIZE, TOTAL_EVENTS, BATCH_SIZE * TOTAL_EVENTS));
90+
@ParameterizedTest(name = "{index} - {0} - {1}")
91+
@ArgumentsSource(WithSingleThreadParametersArgumentsProvider.class)
92+
void test_events_processed_when_SingleThread_processor(String pipelineType, String testName, List<BaseEventsTrackingProcessor> processorEventStores, int numberOfBatches, int eventsPerBatch, int expectedTotalEvents) {
93+
this.pipelineType = pipelineType;
94+
initializeTestRunner();
95+
List<List<Record<Event>>> batches = createBatches(numberOfBatches, eventsPerBatch);
96+
batches.forEach(batch -> sourceAccessor.submit(IN_MEMORY_IDENTIFIER, batch));
97+
98+
verifyProcessingResults(pipelineType, expectedTotalEvents, eventsPerBatch, processorEventStores);
99+
100+
verifySingleThreadUsage();
101+
}
102+
103+
static class WithSingleThreadParametersArgumentsProvider implements ArgumentsProvider {
104+
@Override
105+
public Stream<? extends Arguments> provideArguments(final ExtensionContext extensionContext) throws Exception {
106+
return Stream.of(
107+
arguments("single-thread-processor-pipeline",
108+
"SingleBatch",
109+
List.of(new SingleThreadEventsTrackingTestProcessor()),
110+
1,
111+
TOTAL_EVENTS,
112+
TOTAL_EVENTS
113+
),
114+
arguments("single-thread-processor-pipeline",
115+
"MultipleBatches",
116+
List.of(new SingleThreadEventsTrackingTestProcessor()),
117+
BATCH_SIZE,
118+
TOTAL_EVENTS,
119+
BATCH_SIZE * TOTAL_EVENTS
120+
),
121+
arguments("multi-processor-pipeline",
122+
"SingleBatch",
123+
List.of(new SingleThreadEventsTrackingTestProcessor(), new BasicEventsTrackingTestProcessor()),
124+
1,
125+
TOTAL_EVENTS,
126+
TOTAL_EVENTS
127+
),
128+
arguments("multi-processor-pipeline",
129+
"MultipleBatches",
130+
List.of(new SingleThreadEventsTrackingTestProcessor(), new BasicEventsTrackingTestProcessor()),
131+
BATCH_SIZE,
132+
TOTAL_EVENTS,
133+
BATCH_SIZE * TOTAL_EVENTS
134+
)
135+
);
136+
}
137+
}
138+
139+
static class WithoutSingleThreadParametersArgumentsProvider implements ArgumentsProvider {
140+
@Override
141+
public Stream<? extends Arguments> provideArguments(final ExtensionContext extensionContext) throws Exception {
142+
return Stream.of(
143+
arguments("basic-processor-pipeline",
144+
"SingleBatch",
145+
List.of(new BasicEventsTrackingTestProcessor()),
146+
1,
147+
TOTAL_EVENTS,
148+
TOTAL_EVENTS
149+
),
150+
arguments("basic-processor-pipeline",
151+
"MultipleBatches",
152+
List.of(new BasicEventsTrackingTestProcessor()),
153+
BATCH_SIZE,
154+
TOTAL_EVENTS,
155+
BATCH_SIZE * TOTAL_EVENTS
156+
)
157+
);
108158
}
109-
return arguments.stream();
110159
}
111160

112161
/**
@@ -125,11 +174,13 @@ private void initializeTestRunner() {
125174

126175
/**
127176
* Verifies that events were processed correctly by the pipeline.
128-
* @param pipelineType The type of pipeline being tested
129-
* @param expectedTotalEvents Total number of events expected to be processed
130-
* @param eventsPerBatch Number of events in each batch
177+
*
178+
* @param pipelineType The type of pipeline being tested
179+
* @param expectedTotalEvents Total number of events expected to be processed
180+
* @param eventsPerBatch Number of events in each batch
181+
* @param processorEventStores
131182
*/
132-
private void verifyProcessingResults(String pipelineType, int expectedTotalEvents, int eventsPerBatch) {
183+
private void verifyProcessingResults(String pipelineType, int expectedTotalEvents, int eventsPerBatch, List<BaseEventsTrackingProcessor> processorEventStores) {
133184
// Wait for all events to be processed
134185
await().atMost(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
135186
.untilAsserted(() -> assertThat(
@@ -140,15 +191,14 @@ private void verifyProcessingResults(String pipelineType, int expectedTotalEvent
140191
assertThat(outputRecords.size(), equalTo(expectedTotalEvents));
141192

142193
// Verify each processor in the pipeline processed events
143-
List<BaseEventsTrackingProcessor> processors = PIPELINE_TO_PROCESSORS_MAP.get(pipelineType);
144-
for (BaseEventsTrackingProcessor processor : processors) {
194+
for (BaseEventsTrackingProcessor processor : processorEventStores) {
145195
String processorName = processor.getName();
146196
Map<String, AtomicInteger> processedEventsMap = processor.getEventsMap();
147197

148198
verifyEventProcessing(processedEventsMap, outputRecords, expectedTotalEvents, processorName);
149199
}
150200

151-
verifyWorkerThreads(outputRecords, processors);
201+
verifyWorkerThreads(outputRecords, processorEventStores);
152202

153203
int numberOfBatches = expectedTotalEvents / eventsPerBatch;
154204
for (int batch = 0; batch < numberOfBatches; batch++) {
@@ -160,6 +210,24 @@ private void verifyProcessingResults(String pipelineType, int expectedTotalEvent
160210
}
161211
}
162212

213+
private static void verifySingleThreadUsage() {
214+
List<SingleThreadEventsTrackingTestProcessor> singleThreadProcessors = SingleThreadEventsTrackingTestProcessor.getProcessors();
215+
assertThat(singleThreadProcessors.size(), equalTo(4));
216+
assertAll(
217+
() -> assertThat(singleThreadProcessors.get(0).getThreadsUsing().size(), equalTo(1)),
218+
() -> assertThat(singleThreadProcessors.get(1).getThreadsUsing().size(), equalTo(1)),
219+
() -> assertThat(singleThreadProcessors.get(2).getThreadsUsing().size(), equalTo(1)),
220+
() -> assertThat(singleThreadProcessors.get(3).getThreadsUsing().size(), equalTo(1))
221+
);
222+
223+
assertAll(
224+
() -> assertThat(singleThreadProcessors.get(0).getNumberOfProcessWorkersFromPipelineDescription(), equalTo(4)),
225+
() -> assertThat(singleThreadProcessors.get(1).getNumberOfProcessWorkersFromPipelineDescription(), equalTo(4)),
226+
() -> assertThat(singleThreadProcessors.get(2).getNumberOfProcessWorkersFromPipelineDescription(), equalTo(4)),
227+
() -> assertThat(singleThreadProcessors.get(3).getNumberOfProcessWorkersFromPipelineDescription(), equalTo(4))
228+
);
229+
}
230+
163231
/**
164232
* Verifies that each event was processed exactly once by the specified processor.
165233
* Checks both the processor's internal tracking map and the event metadata.

data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/BaseEventsTrackingProcessor.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@
99
import org.opensearch.dataprepper.model.record.Record;
1010

1111
import java.util.Collection;
12+
import java.util.Collections;
13+
import java.util.HashSet;
1214
import java.util.Map;
15+
import java.util.Set;
1316
import java.util.concurrent.atomic.AtomicInteger;
1417

1518
/**
@@ -20,19 +23,26 @@ public abstract class BaseEventsTrackingProcessor implements Processor<Record<Ev
2023
private final String countPropertyName;
2124
private final String threadPropertyName;
2225
private final Map<String, AtomicInteger> eventsMap;
26+
private final int numberOfProcessWorkers;
27+
private final Set<Long> threadsUsing;
2328
private final String processorName;
2429

2530
/**
2631
* Constructor for the base events tracking processor.
2732
*
28-
* @param processorName Name of the processor
29-
* @param eventsMap Map for tracking processed events
33+
* @param processorName Name of the processor
34+
* @param eventsMap Map for tracking processed events
35+
* @param numberOfProcessWorkers
3036
*/
31-
protected BaseEventsTrackingProcessor(String processorName, Map<String, AtomicInteger> eventsMap) {
37+
protected BaseEventsTrackingProcessor(final String processorName,
38+
final Map<String, AtomicInteger> eventsMap,
39+
final int numberOfProcessWorkers) {
3240
this.countPropertyName = processorName + "_processed_count";
3341
this.threadPropertyName = processorName + "_processed_by_thread";
3442
this.processorName = processorName;
3543
this.eventsMap = eventsMap;
44+
this.numberOfProcessWorkers = numberOfProcessWorkers;
45+
threadsUsing = Collections.synchronizedSet(new HashSet<>());
3646
}
3747

3848
/**
@@ -43,6 +53,14 @@ public Map<String, AtomicInteger> getEventsMap() {
4353
return eventsMap;
4454
}
4555

56+
public Set<?> getThreadsUsing() {
57+
return threadsUsing;
58+
}
59+
60+
public int getNumberOfProcessWorkersFromPipelineDescription() {
61+
return numberOfProcessWorkers;
62+
}
63+
4664
/**
4765
* Gets the name of processor.
4866
* @return The processor name
@@ -70,7 +88,10 @@ public void reset() {
7088
*/
7189
@Override
7290
public Collection<Record<Event>> execute(final Collection<Record<Event>> records) {
73-
final String threadName = Thread.currentThread().getName();
91+
final Thread currentThread = Thread.currentThread();
92+
final String threadName = currentThread.getName();
93+
94+
threadsUsing.add(currentThread.getId());
7495

7596
for (Record<Event> record : records) {
7697
Event event = record.getData();

data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/BasicEventsTrackingTestProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
public class BasicEventsTrackingTestProcessor extends BaseEventsTrackingProcessor {
1919
private static final Map<String, AtomicInteger> PROCESSED_EVENTS_MAP = new ConcurrentHashMap<>();
2020
private static final String PLUGIN_NAME = "basic_events_tracking_test";
21+
private static final int NUMBER_OF_PROCESS_WORKERS_NOT_CAPTURED = -1;
2122

2223
public BasicEventsTrackingTestProcessor() {
23-
super(PLUGIN_NAME, PROCESSED_EVENTS_MAP);
24+
super(PLUGIN_NAME, PROCESSED_EVENTS_MAP, NUMBER_OF_PROCESS_WORKERS_NOT_CAPTURED);
2425
}
2526
}

data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/SingleThreadEventsTrackingTestProcessor.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,13 @@
55
package org.opensearch.dataprepper.plugins;
66

77
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
8+
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
89
import org.opensearch.dataprepper.model.annotations.SingleThread;
10+
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
911
import org.opensearch.dataprepper.model.processor.Processor;
1012

13+
import java.util.ArrayList;
14+
import java.util.List;
1115
import java.util.Map;
1216
import java.util.concurrent.ConcurrentHashMap;
1317
import java.util.concurrent.atomic.AtomicInteger;
@@ -21,7 +25,22 @@ public class SingleThreadEventsTrackingTestProcessor extends BaseEventsTrackingP
2125
private static final String PLUGIN_NAME = "single_thread_events_tracking_test";
2226
private static final Map<String, AtomicInteger> PROCESSED_EVENTS_MAP = new ConcurrentHashMap<>();
2327

28+
private static final List<SingleThreadEventsTrackingTestProcessor> PROCESSORS = new ArrayList<>();
29+
30+
@DataPrepperPluginConstructor
31+
public SingleThreadEventsTrackingTestProcessor(final PipelineDescription pipelineDescription) {
32+
super(PLUGIN_NAME, PROCESSED_EVENTS_MAP, pipelineDescription.getNumberOfProcessWorkers());
33+
PROCESSORS.add(this);
34+
}
35+
36+
/**
37+
* This is used only for testing in the setup.
38+
*/
2439
public SingleThreadEventsTrackingTestProcessor() {
25-
super(PLUGIN_NAME, PROCESSED_EVENTS_MAP);
40+
super(PLUGIN_NAME, PROCESSED_EVENTS_MAP, -1);
41+
}
42+
43+
public static List<SingleThreadEventsTrackingTestProcessor> getProcessors() {
44+
return PROCESSORS;
2645
}
2746
}

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ private void buildPipelineFromConfiguration(
238238
// Only allow ZeroBuffer for single-threaded pipelines with no @SingleThread processors
239239
if (processorThreads == 1 && !hasSingleThreadedProcessors) {
240240
((SupportsPipelineRunner) pipelineDefinedBuffer).setPipelineRunner(
241-
new PipelineRunnerImpl(pipeline));
241+
new PipelineRunnerImpl(pipeline, pipeline.getSingleThreadUnsafeProcessorProvider()));
242242
} else {
243243
if (hasSingleThreadedProcessors) {
244244
throw new IllegalStateException(

0 commit comments

Comments
 (0)