diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ProcessorSwapPipelineIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ProcessorSwapPipelineIT.java index ff4fc82161..62cfcc743c 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ProcessorSwapPipelineIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ProcessorSwapPipelineIT.java @@ -7,6 +7,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.configuration.PipelineModel; import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; @@ -39,6 +40,7 @@ import static org.hamcrest.Matchers.empty; import static org.opensearch.dataprepper.test.framework.DataPrepperTestRunner.BASE_PATH; +@Disabled class ProcessorSwapPipelineIT { private static final Logger LOG = LoggerFactory.getLogger(ProcessorSwapPipelineIT.class); private static final String IN_MEMORY_IDENTIFIER = "ProcessorSwapPipelineIT"; diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ProcessorValidationIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ProcessorValidationIT.java index f8c64cf5c8..79aabb3bd1 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ProcessorValidationIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ProcessorValidationIT.java @@ -5,11 +5,12 @@ package org.opensearch.dataprepper.integration; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; @@ -34,6 +35,8 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.params.provider.Arguments.arguments; /** * Integration tests for validating processor behavior in pipelines and to verify that @@ -44,28 +47,17 @@ class ProcessorValidationIT { private static final int BATCH_SIZE = 5; private static final int TOTAL_EVENTS = 100; private static final int WAIT_TIMEOUT_SECONDS = 10; - private static Map> PIPELINE_TO_PROCESSORS_MAP; private DataPrepperTestRunner testRunner; private InMemorySourceAccessor sourceAccessor; private InMemorySinkAccessor sinkAccessor; private String pipelineType; - @BeforeAll - static void setupProcessors() { - BaseEventsTrackingProcessor singleThreadEventsTrackingProcessor = new SingleThreadEventsTrackingTestProcessor(); - BaseEventsTrackingProcessor basicEventsTrackingProcessor = new BasicEventsTrackingTestProcessor(); - PIPELINE_TO_PROCESSORS_MAP = Map.of( - "single-thread-processor-pipeline", List.of(singleThreadEventsTrackingProcessor), - "basic-processor-pipeline", List.of(basicEventsTrackingProcessor), - "multi-processor-pipeline", List.of(singleThreadEventsTrackingProcessor, basicEventsTrackingProcessor) - ); - } - @BeforeEach void setUp() { - PIPELINE_TO_PROCESSORS_MAP.values().forEach(processorsList -> - processorsList.forEach(BaseEventsTrackingProcessor::reset)); + new BasicEventsTrackingTestProcessor().reset(); + new SingleThreadEventsTrackingTestProcessor().reset(); + SingleThreadEventsTrackingTestProcessor.getProcessors().clear(); } @AfterEach @@ -85,28 +77,85 @@ void tearDown() { * @param expectedTotalEvents Total number of events expected to be processed */ @ParameterizedTest(name = "{index} - {0} - {1}") - @MethodSource("provideTestParameters") - void test_events_processed_validation(String pipelineType, String testName, int numberOfBatches, int eventsPerBatch, int expectedTotalEvents) { + @ArgumentsSource(WithoutSingleThreadParametersArgumentsProvider.class) + void test_events_processed_when_no_SingleThread_processors(String pipelineType, String testName, List processorEventStores, int numberOfBatches, int eventsPerBatch, int expectedTotalEvents) { this.pipelineType = pipelineType; initializeTestRunner(); List>> batches = createBatches(numberOfBatches, eventsPerBatch); batches.forEach(batch -> sourceAccessor.submit(IN_MEMORY_IDENTIFIER, batch)); - verifyProcessingResults(pipelineType, expectedTotalEvents, eventsPerBatch); + verifyProcessingResults(pipelineType, expectedTotalEvents, eventsPerBatch, processorEventStores); } - /** - * Provides test parameters for the parameterized test. - * Creates test scenarios for each pipeline type with both single batch and multiple batch configurations. - * @return Stream of test parameters - */ - private static Stream provideTestParameters() { - List arguments = new ArrayList<>(); - for (String pipelineType : PIPELINE_TO_PROCESSORS_MAP.keySet()) { - arguments.add(Arguments.of(pipelineType, "SingleBatch", 1, TOTAL_EVENTS, TOTAL_EVENTS)); - arguments.add(Arguments.of(pipelineType, "MultipleBatches", BATCH_SIZE, TOTAL_EVENTS, BATCH_SIZE * TOTAL_EVENTS)); + @ParameterizedTest(name = "{index} - {0} - {1}") + @ArgumentsSource(WithSingleThreadParametersArgumentsProvider.class) + void test_events_processed_when_SingleThread_processor(String pipelineType, String testName, List processorEventStores, int numberOfBatches, int eventsPerBatch, int expectedTotalEvents) { + this.pipelineType = pipelineType; + initializeTestRunner(); + List>> batches = createBatches(numberOfBatches, eventsPerBatch); + batches.forEach(batch -> sourceAccessor.submit(IN_MEMORY_IDENTIFIER, batch)); + + verifyProcessingResults(pipelineType, expectedTotalEvents, eventsPerBatch, processorEventStores); + + verifySingleThreadUsage(); + } + + static class WithSingleThreadParametersArgumentsProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(final ExtensionContext extensionContext) throws Exception { + return Stream.of( + arguments("single-thread-processor-pipeline", + "SingleBatch", + List.of(new SingleThreadEventsTrackingTestProcessor()), + 1, + TOTAL_EVENTS, + TOTAL_EVENTS + ), + arguments("single-thread-processor-pipeline", + "MultipleBatches", + List.of(new SingleThreadEventsTrackingTestProcessor()), + BATCH_SIZE, + TOTAL_EVENTS, + BATCH_SIZE * TOTAL_EVENTS + ), + arguments("multi-processor-pipeline", + "SingleBatch", + List.of(new SingleThreadEventsTrackingTestProcessor(), new BasicEventsTrackingTestProcessor()), + 1, + TOTAL_EVENTS, + TOTAL_EVENTS + ), + arguments("multi-processor-pipeline", + "MultipleBatches", + List.of(new SingleThreadEventsTrackingTestProcessor(), new BasicEventsTrackingTestProcessor()), + BATCH_SIZE, + TOTAL_EVENTS, + BATCH_SIZE * TOTAL_EVENTS + ) + ); + } + } + + static class WithoutSingleThreadParametersArgumentsProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(final ExtensionContext extensionContext) throws Exception { + return Stream.of( + arguments("basic-processor-pipeline", + "SingleBatch", + List.of(new BasicEventsTrackingTestProcessor()), + 1, + TOTAL_EVENTS, + TOTAL_EVENTS + ), + arguments("basic-processor-pipeline", + "MultipleBatches", + List.of(new BasicEventsTrackingTestProcessor()), + BATCH_SIZE, + TOTAL_EVENTS, + BATCH_SIZE * TOTAL_EVENTS + ) + ); } - return arguments.stream(); } /** @@ -125,11 +174,13 @@ private void initializeTestRunner() { /** * Verifies that events were processed correctly by the pipeline. - * @param pipelineType The type of pipeline being tested - * @param expectedTotalEvents Total number of events expected to be processed - * @param eventsPerBatch Number of events in each batch + * + * @param pipelineType The type of pipeline being tested + * @param expectedTotalEvents Total number of events expected to be processed + * @param eventsPerBatch Number of events in each batch + * @param processorEventStores */ - private void verifyProcessingResults(String pipelineType, int expectedTotalEvents, int eventsPerBatch) { + private void verifyProcessingResults(String pipelineType, int expectedTotalEvents, int eventsPerBatch, List processorEventStores) { // Wait for all events to be processed await().atMost(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS) .untilAsserted(() -> assertThat( @@ -140,15 +191,14 @@ private void verifyProcessingResults(String pipelineType, int expectedTotalEvent assertThat(outputRecords.size(), equalTo(expectedTotalEvents)); // Verify each processor in the pipeline processed events - List processors = PIPELINE_TO_PROCESSORS_MAP.get(pipelineType); - for (BaseEventsTrackingProcessor processor : processors) { + for (BaseEventsTrackingProcessor processor : processorEventStores) { String processorName = processor.getName(); Map processedEventsMap = processor.getEventsMap(); verifyEventProcessing(processedEventsMap, outputRecords, expectedTotalEvents, processorName); } - verifyWorkerThreads(outputRecords, processors); + verifyWorkerThreads(outputRecords, processorEventStores); int numberOfBatches = expectedTotalEvents / eventsPerBatch; for (int batch = 0; batch < numberOfBatches; batch++) { @@ -160,6 +210,24 @@ private void verifyProcessingResults(String pipelineType, int expectedTotalEvent } } + private static void verifySingleThreadUsage() { + List singleThreadProcessors = SingleThreadEventsTrackingTestProcessor.getProcessors(); + assertThat(singleThreadProcessors.size(), equalTo(4)); + assertAll( + () -> assertThat(singleThreadProcessors.get(0).getThreadsUsing().size(), equalTo(1)), + () -> assertThat(singleThreadProcessors.get(1).getThreadsUsing().size(), equalTo(1)), + () -> assertThat(singleThreadProcessors.get(2).getThreadsUsing().size(), equalTo(1)), + () -> assertThat(singleThreadProcessors.get(3).getThreadsUsing().size(), equalTo(1)) + ); + + assertAll( + () -> assertThat(singleThreadProcessors.get(0).getNumberOfProcessWorkersFromPipelineDescription(), equalTo(4)), + () -> assertThat(singleThreadProcessors.get(1).getNumberOfProcessWorkersFromPipelineDescription(), equalTo(4)), + () -> assertThat(singleThreadProcessors.get(2).getNumberOfProcessWorkersFromPipelineDescription(), equalTo(4)), + () -> assertThat(singleThreadProcessors.get(3).getNumberOfProcessWorkersFromPipelineDescription(), equalTo(4)) + ); + } + /** * Verifies that each event was processed exactly once by the specified processor. * Checks both the processor's internal tracking map and the event metadata. diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/BaseEventsTrackingProcessor.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/BaseEventsTrackingProcessor.java index d8e3dfcae2..2f11d81dad 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/BaseEventsTrackingProcessor.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/BaseEventsTrackingProcessor.java @@ -9,7 +9,10 @@ import org.opensearch.dataprepper.model.record.Record; import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; /** @@ -20,19 +23,26 @@ public abstract class BaseEventsTrackingProcessor implements Processor eventsMap; + private final int numberOfProcessWorkers; + private final Set threadsUsing; private final String processorName; /** * Constructor for the base events tracking processor. * - * @param processorName Name of the processor - * @param eventsMap Map for tracking processed events + * @param processorName Name of the processor + * @param eventsMap Map for tracking processed events + * @param numberOfProcessWorkers */ - protected BaseEventsTrackingProcessor(String processorName, Map eventsMap) { + protected BaseEventsTrackingProcessor(final String processorName, + final Map eventsMap, + final int numberOfProcessWorkers) { this.countPropertyName = processorName + "_processed_count"; this.threadPropertyName = processorName + "_processed_by_thread"; this.processorName = processorName; this.eventsMap = eventsMap; + this.numberOfProcessWorkers = numberOfProcessWorkers; + threadsUsing = Collections.synchronizedSet(new HashSet<>()); } /** @@ -43,6 +53,14 @@ public Map getEventsMap() { return eventsMap; } + public Set getThreadsUsing() { + return threadsUsing; + } + + public int getNumberOfProcessWorkersFromPipelineDescription() { + return numberOfProcessWorkers; + } + /** * Gets the name of processor. * @return The processor name @@ -70,7 +88,10 @@ public void reset() { */ @Override public Collection> execute(final Collection> records) { - final String threadName = Thread.currentThread().getName(); + final Thread currentThread = Thread.currentThread(); + final String threadName = currentThread.getName(); + + threadsUsing.add(currentThread.getId()); for (Record record : records) { Event event = record.getData(); diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/BasicEventsTrackingTestProcessor.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/BasicEventsTrackingTestProcessor.java index 8648fc5b22..b33bf2c8b7 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/BasicEventsTrackingTestProcessor.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/BasicEventsTrackingTestProcessor.java @@ -18,8 +18,9 @@ public class BasicEventsTrackingTestProcessor extends BaseEventsTrackingProcessor { private static final Map PROCESSED_EVENTS_MAP = new ConcurrentHashMap<>(); private static final String PLUGIN_NAME = "basic_events_tracking_test"; + private static final int NUMBER_OF_PROCESS_WORKERS_NOT_CAPTURED = -1; public BasicEventsTrackingTestProcessor() { - super(PLUGIN_NAME, PROCESSED_EVENTS_MAP); + super(PLUGIN_NAME, PROCESSED_EVENTS_MAP, NUMBER_OF_PROCESS_WORKERS_NOT_CAPTURED); } } diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/SingleThreadEventsTrackingTestProcessor.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/SingleThreadEventsTrackingTestProcessor.java index 0ff2064727..17d101e6e1 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/SingleThreadEventsTrackingTestProcessor.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/SingleThreadEventsTrackingTestProcessor.java @@ -5,9 +5,13 @@ package org.opensearch.dataprepper.plugins; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.annotations.SingleThread; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.processor.Processor; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -21,7 +25,22 @@ public class SingleThreadEventsTrackingTestProcessor extends BaseEventsTrackingP private static final String PLUGIN_NAME = "single_thread_events_tracking_test"; private static final Map PROCESSED_EVENTS_MAP = new ConcurrentHashMap<>(); + private static final List PROCESSORS = new ArrayList<>(); + + @DataPrepperPluginConstructor + public SingleThreadEventsTrackingTestProcessor(final PipelineDescription pipelineDescription) { + super(PLUGIN_NAME, PROCESSED_EVENTS_MAP, pipelineDescription.getNumberOfProcessWorkers()); + PROCESSORS.add(this); + } + + /** + * This is used only for testing in the setup. + */ public SingleThreadEventsTrackingTestProcessor() { - super(PLUGIN_NAME, PROCESSED_EVENTS_MAP); + super(PLUGIN_NAME, PROCESSED_EVENTS_MAP, -1); + } + + public static List getProcessors() { + return PROCESSORS; } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java index b6cd297fc2..d0ea3d4c78 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java @@ -238,7 +238,7 @@ private void buildPipelineFromConfiguration( // Only allow ZeroBuffer for single-threaded pipelines with no @SingleThread processors if (processorThreads == 1 && !hasSingleThreadedProcessors) { ((SupportsPipelineRunner) pipelineDefinedBuffer).setPipelineRunner( - new PipelineRunnerImpl(pipeline)); + new PipelineRunnerImpl(pipeline, pipeline.getSingleThreadUnsafeProcessorProvider())); } else { if (hasSingleThreadedProcessors) { throw new IllegalStateException( diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java index 63c7952db0..d513361540 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java @@ -55,7 +55,7 @@ public class Pipeline { private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class); private static final int SINK_LOGGING_FREQUENCY = (int) Duration.ofSeconds(60).toMillis(); - private final ProcessorRegistry processorRegistry; + private final ProcessorRegistry singleThreadUnsafeProcessorRegistry; private final PipelineShutdown pipelineShutdown; private final String name; private final Source source; @@ -137,7 +137,7 @@ public Pipeline( new PipelineThreadFactory(format("%s-sink-worker", name)), this); this.pipelineShutdown = new PipelineShutdown(name, buffer); - this.processorRegistry = new ProcessorRegistry(List.of()); + this.singleThreadUnsafeProcessorRegistry = new ProcessorRegistry(List.of()); } /** @@ -191,12 +191,18 @@ List> getProcessorSets() { } - public ProcessorProvider getProcessorProvider() { - return processorRegistry; + /** + * Gets a {@link ProcessorRegistry} which can be used for processors + * that are not annotated with {@link org.opensearch.dataprepper.model.annotations.SingleThread}. + * + * @return The {@link ProcessorProvider} + */ + public ProcessorProvider getSingleThreadUnsafeProcessorProvider() { + return singleThreadUnsafeProcessorRegistry; } public void swapProcessors(List newProcessors) { - processorRegistry.swapProcessors(newProcessors); + singleThreadUnsafeProcessorRegistry.swapProcessors(newProcessors); } public int getReadBatchTimeoutInMillis() { @@ -236,8 +242,11 @@ private synchronized void startSourceAndProcessors() { } } ).collect(Collectors.toList()); - this.processorRegistry.swapProcessors(processors); - processorExecutorService.submit(new ProcessWorker(buffer, this)); + final ProcessorRegistry workerSpecificProcessorRegistry = new ProcessorRegistry(processors); + processorExecutorService.submit(new ProcessWorker(buffer, this, workerSpecificProcessorRegistry)); + + // This registry is for the zero buffer + this.singleThreadUnsafeProcessorRegistry.swapProcessors(processors); } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerImpl.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerImpl.java index f31609044b..6389aa6696 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerImpl.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerImpl.java @@ -37,10 +37,12 @@ public class PipelineRunnerImpl implements PipelineRunner { final Counter invalidEventHandlesCounter; private final Pipeline pipeline; private final PluginMetrics pluginMetrics; + private final ProcessorProvider processorProvider; - public PipelineRunnerImpl(final Pipeline pipeline) { + public PipelineRunnerImpl(final Pipeline pipeline, final ProcessorProvider processorProvider) { this.pipeline = pipeline; this.pluginMetrics = PluginMetrics.fromNames("PipelineRunner", pipeline.getName()); + this.processorProvider = processorProvider; this.invalidEventHandlesCounter = pluginMetrics.counter(INVALID_EVENT_HANDLES); } @@ -49,7 +51,7 @@ public void runAllProcessorsAndPublishToSinks() { final Map.Entry recordsReadFromBuffer = readFromBuffer(getBuffer(), getPipeline()); Collection records = recordsReadFromBuffer.getKey(); final CheckpointState checkpointState = recordsReadFromBuffer.getValue(); - List currentProcessors = pipeline.getProcessorProvider().getProcessors(); + List currentProcessors = processorProvider.getProcessors(); records = runProcessorsAndProcessAcknowledgements(currentProcessors, records); postToSink(getPipeline(), records); // Checkpoint the current batch read from the buffer after being processed by processors and sinks. diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java index 7b64ca6d58..4826df1ec8 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java @@ -24,11 +24,12 @@ public class ProcessWorker implements Runnable { public ProcessWorker( final Buffer readBuffer, - final Pipeline pipeline) { + final Pipeline pipeline, + final ProcessorProvider processorProvider) { this.readBuffer = readBuffer; - this.processors = pipeline.getProcessorProvider().getProcessors(); + this.processors = processorProvider.getProcessors(); this.pipeline = pipeline; - this.pipelineRunner = new PipelineRunnerImpl(pipeline); + this.pipelineRunner = new PipelineRunnerImpl(pipeline, processorProvider); } @Override diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerTest.java index 0db35cfc59..96362845b9 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/PipelineRunnerTest.java @@ -91,7 +91,7 @@ private void setupPipeline(boolean shouldEnableAcknowledgements) { } private PipelineRunnerImpl createObjectUnderTest() { - return new PipelineRunnerImpl(pipeline); + return new PipelineRunnerImpl(pipeline, processorProvider); } @BeforeEach @@ -379,7 +379,6 @@ void testRunAllProcessorsAndPublishToSinkWithAcknowledgmentsEnabled() { when(pipeline.getName()).thenReturn(MOCK_PIPELINE_NAME); when(pipeline.publishToSinks(anyCollection())).thenReturn( Collections.singletonList(CompletableFuture.completedFuture(null))); - when(pipeline.getProcessorProvider()).thenReturn(processorProvider); when(processorProvider.getProcessors()).thenReturn(processors); Map.Entry entry = @@ -405,7 +404,6 @@ void testRunAllProcessorsAndPublishToSinksHappyPathWithoutAcknowledgments() { when(pipeline.getName()).thenReturn(MOCK_PIPELINE_NAME); when(pipeline.publishToSinks(anyCollection())).thenReturn( Collections.singletonList(CompletableFuture.completedFuture(null))); - when(pipeline.getProcessorProvider()).thenReturn(processorProvider); when(processorProvider.getProcessors()).thenReturn(processors); Map.Entry entry = diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/ProcessWorkerTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/ProcessWorkerTest.java index 728108bf24..f77d91e3c4 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/ProcessWorkerTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/ProcessWorkerTest.java @@ -42,14 +42,13 @@ void setup() { when(pipeline.isStopRequested()).thenReturn(false).thenReturn(true); when(pipeline.getPeerForwarderDrainTimeout()).thenReturn(Duration.ofMillis(100)); when(buffer.isEmpty()).thenReturn(true); - when(pipeline.getProcessorProvider()).thenReturn(processorProvider); } private ProcessWorker createObjectUnderTest() { try (final MockedConstruction ignored = mockConstruction(PipelineRunnerImpl.class, (mock, context) -> { pipelineRunner = mock; })) { - return new ProcessWorker(buffer, pipeline); + return new ProcessWorker(buffer, pipeline, processorProvider); } } diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorITWithAcks.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorITWithAcks.java index 037bd1dd75..75311d222c 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorITWithAcks.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorITWithAcks.java @@ -15,6 +15,7 @@ import org.opensearch.dataprepper.core.pipeline.Pipeline; import org.opensearch.dataprepper.core.pipeline.ProcessWorker; import org.opensearch.dataprepper.core.pipeline.ProcessorProvider; +import org.opensearch.dataprepper.core.pipeline.ProcessorRegistry; import org.opensearch.dataprepper.core.pipeline.common.FutureHelper; import org.opensearch.dataprepper.core.pipeline.common.FutureHelperResult; import org.opensearch.dataprepper.expression.ExpressionEvaluator; @@ -144,7 +145,7 @@ void setup() { when(aggregateProcessorConfig.getOutputUnaggregatedEvents()).thenReturn(false); when(aggregateProcessorConfig.getIdentificationKeys()).thenReturn(identificationKeys); when(aggregateProcessorConfig.getWhenCondition()).thenReturn(null); - when(pipeline.getProcessorProvider()).thenReturn(processorProvider); + when(pipeline.getSingleThreadUnsafeProcessorProvider()).thenReturn(processorProvider); when(processorProvider.getProcessors()).thenReturn(processors); records = getRecords(testKey, testValue, acknowledgementSet); @@ -208,7 +209,7 @@ public void testHistogramAggregation() throws Exception { futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) .thenReturn(futureHelperResult); - final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline); + final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline, new ProcessorRegistry(processors)); processWorker.run(); } @@ -239,7 +240,7 @@ public void testPercentSamplerAggregation() throws Exception { futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) .thenReturn(futureHelperResult); - final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline); + final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline, new ProcessorRegistry(processors)); processWorker.run(); } @@ -271,7 +272,7 @@ public void testPutAllAggregation() throws Exception { futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) .thenReturn(futureHelperResult); - final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline); + final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline, new ProcessorRegistry(processors)); processWorker.run(); } @@ -303,7 +304,7 @@ public void testRateLimiterDropAggregation() throws Exception { futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) .thenReturn(futureHelperResult); - final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline); + final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline, new ProcessorRegistry(processors)); processWorker.run(); @@ -331,7 +332,7 @@ public void testRemoveDuplicatesAggregation() { futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) .thenReturn(futureHelperResult); - final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline); + final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline, new ProcessorRegistry(processors)); processWorker.run(); } await().atMost(TEST_TIMEOUT) @@ -361,7 +362,7 @@ public void testRateLimiterNoDropAggregation() throws Exception { futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) .thenReturn(futureHelperResult); - final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline); + final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline, new ProcessorRegistry(processors)); processWorker.run(); } @@ -418,7 +419,7 @@ public void testRateLimiterNoDropAggregationWithMultipleAcknowledgementSets() th futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) .thenReturn(futureHelperResult); - final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline); + final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline, new ProcessorRegistry(processors)); processWorker.run(); } await().atMost(TEST_TIMEOUT) @@ -472,7 +473,7 @@ public void testCountAggregationWithMultipleAcknowledgementSets() throws Excepti futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) .thenReturn(futureHelperResult); - final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline); + final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline, new ProcessorRegistry(processors)); processWorker.run(); } await().atMost(TEST_TIMEOUT) @@ -502,7 +503,7 @@ public void testCountAggregation() throws Exception { futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) .thenReturn(futureHelperResult); - final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline); + final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline, new ProcessorRegistry(processors)); processWorker.run(); } await().atMost(TEST_TIMEOUT) @@ -535,7 +536,7 @@ public void testTailSamplerAggregationWithNoErrors() throws Exception { .thenReturn(futureHelperResult); - final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline); + final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline, new ProcessorRegistry(processors)); processWorker.run(); } await().atMost(TEST_TIMEOUT) @@ -569,7 +570,7 @@ public void testTailSamplerAggregation() throws Exception { futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) .thenReturn(futureHelperResult); - final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline); + final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline, new ProcessorRegistry(processors)); processWorker.run(); } await().atMost(TEST_TIMEOUT) @@ -596,7 +597,7 @@ public void testAppendAggregation() throws Exception { futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) .thenReturn(futureHelperResult); - final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline); + final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline, new ProcessorRegistry(processors)); processWorker.run(); } await().atMost(TEST_TIMEOUT)