Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.configuration.PipelineModel;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
Expand Down Expand Up @@ -39,6 +40,7 @@
import static org.hamcrest.Matchers.empty;
import static org.opensearch.dataprepper.test.framework.DataPrepperTestRunner.BASE_PATH;

@Disabled
class ProcessorSwapPipelineIT {
private static final Logger LOG = LoggerFactory.getLogger(ProcessorSwapPipelineIT.class);
private static final String IN_MEMORY_IDENTIFIER = "ProcessorSwapPipelineIT";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
package org.opensearch.dataprepper.integration;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
Expand All @@ -34,6 +35,8 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.params.provider.Arguments.arguments;

/**
* Integration tests for validating processor behavior in pipelines and to verify that
Expand All @@ -44,28 +47,17 @@ class ProcessorValidationIT {
private static final int BATCH_SIZE = 5;
private static final int TOTAL_EVENTS = 100;
private static final int WAIT_TIMEOUT_SECONDS = 10;
private static Map<String, List<BaseEventsTrackingProcessor>> PIPELINE_TO_PROCESSORS_MAP;

private DataPrepperTestRunner testRunner;
private InMemorySourceAccessor sourceAccessor;
private InMemorySinkAccessor sinkAccessor;
private String pipelineType;

/**
 * Builds the lookup from pipeline name to the tracking processors used by that pipeline.
 * Runs once for the test class; the multi-processor entry reuses the same two instances
 * that back the single-processor entries.
 */
@BeforeAll
static void setupProcessors() {
    final BaseEventsTrackingProcessor singleThread = new SingleThreadEventsTrackingTestProcessor();
    final BaseEventsTrackingProcessor basic = new BasicEventsTrackingTestProcessor();

    final List<BaseEventsTrackingProcessor> singleThreadOnly = List.of(singleThread);
    final List<BaseEventsTrackingProcessor> basicOnly = List.of(basic);
    final List<BaseEventsTrackingProcessor> singleThreadThenBasic = List.of(singleThread, basic);

    PIPELINE_TO_PROCESSORS_MAP = Map.of(
            "single-thread-processor-pipeline", singleThreadOnly,
            "basic-processor-pipeline", basicOnly,
            "multi-processor-pipeline", singleThreadThenBasic);
}

/**
 * Clears processor tracking state before each test.
 */
@BeforeEach
void setUp() {
// Reset every tracking processor registered in the pipeline-to-processors map.
PIPELINE_TO_PROCESSORS_MAP.values().forEach(processorsList ->
processorsList.forEach(BaseEventsTrackingProcessor::reset));
// reset() is called on throwaway instances; presumably it clears the static events map
// that each processor type passes to its base class - confirm against reset().
new BasicEventsTrackingTestProcessor().reset();
new SingleThreadEventsTrackingTestProcessor().reset();
// Empty the static list of single-thread processor instances so each test starts from zero.
SingleThreadEventsTrackingTestProcessor.getProcessors().clear();
}

@AfterEach
Expand All @@ -85,28 +77,85 @@ void tearDown() {
* @param expectedTotalEvents Total number of events expected to be processed
*/
@ParameterizedTest(name = "{index} - {0} - {1}")
@MethodSource("provideTestParameters")
void test_events_processed_validation(String pipelineType, String testName, int numberOfBatches, int eventsPerBatch, int expectedTotalEvents) {
@ArgumentsSource(WithoutSingleThreadParametersArgumentsProvider.class)
void test_events_processed_when_no_SingleThread_processors(String pipelineType, String testName, List<BaseEventsTrackingProcessor> processorEventStores, int numberOfBatches, int eventsPerBatch, int expectedTotalEvents) {
this.pipelineType = pipelineType;
initializeTestRunner();
List<List<Record<Event>>> batches = createBatches(numberOfBatches, eventsPerBatch);
batches.forEach(batch -> sourceAccessor.submit(IN_MEMORY_IDENTIFIER, batch));

verifyProcessingResults(pipelineType, expectedTotalEvents, eventsPerBatch);
verifyProcessingResults(pipelineType, expectedTotalEvents, eventsPerBatch, processorEventStores);
}

/**
* Provides test parameters for the parameterized test.
* Creates test scenarios for each pipeline type with both single batch and multiple batch configurations.
* @return Stream of test parameters
*/
private static Stream<Arguments> provideTestParameters() {
List<Arguments> arguments = new ArrayList<>();
for (String pipelineType : PIPELINE_TO_PROCESSORS_MAP.keySet()) {
arguments.add(Arguments.of(pipelineType, "SingleBatch", 1, TOTAL_EVENTS, TOTAL_EVENTS));
arguments.add(Arguments.of(pipelineType, "MultipleBatches", BATCH_SIZE, TOTAL_EVENTS, BATCH_SIZE * TOTAL_EVENTS));
@ParameterizedTest(name = "{index} - {0} - {1}")
@ArgumentsSource(WithSingleThreadParametersArgumentsProvider.class)
void test_events_processed_when_SingleThread_processor(String pipelineType, String testName, List<BaseEventsTrackingProcessor> processorEventStores, int numberOfBatches, int eventsPerBatch, int expectedTotalEvents) {
this.pipelineType = pipelineType;
initializeTestRunner();
List<List<Record<Event>>> batches = createBatches(numberOfBatches, eventsPerBatch);
batches.forEach(batch -> sourceAccessor.submit(IN_MEMORY_IDENTIFIER, batch));

verifyProcessingResults(pipelineType, expectedTotalEvents, eventsPerBatch, processorEventStores);

verifySingleThreadUsage();
}

/**
 * Supplies the scenarios for the pipelines that include a
 * {@code SingleThreadEventsTrackingTestProcessor}.
 * Each scenario is: pipeline name, test name, tracking processors to inspect,
 * number of batches, events per batch, and expected total events.
 */
static class WithSingleThreadParametersArgumentsProvider implements ArgumentsProvider {
    @Override
    public Stream<? extends Arguments> provideArguments(final ExtensionContext extensionContext) throws Exception {
        final String singleThreadPipeline = "single-thread-processor-pipeline";
        final String multiProcessorPipeline = "multi-processor-pipeline";
        final String singleBatchName = "SingleBatch";
        final String multipleBatchesName = "MultipleBatches";
        final int multipleBatchesTotal = BATCH_SIZE * TOTAL_EVENTS;

        final Arguments singleThreadSingleBatch = arguments(
                singleThreadPipeline,
                singleBatchName,
                List.of(new SingleThreadEventsTrackingTestProcessor()),
                1,
                TOTAL_EVENTS,
                TOTAL_EVENTS);

        final Arguments singleThreadMultipleBatches = arguments(
                singleThreadPipeline,
                multipleBatchesName,
                List.of(new SingleThreadEventsTrackingTestProcessor()),
                BATCH_SIZE,
                TOTAL_EVENTS,
                multipleBatchesTotal);

        final Arguments multiProcessorSingleBatch = arguments(
                multiProcessorPipeline,
                singleBatchName,
                List.of(new SingleThreadEventsTrackingTestProcessor(), new BasicEventsTrackingTestProcessor()),
                1,
                TOTAL_EVENTS,
                TOTAL_EVENTS);

        final Arguments multiProcessorMultipleBatches = arguments(
                multiProcessorPipeline,
                multipleBatchesName,
                List.of(new SingleThreadEventsTrackingTestProcessor(), new BasicEventsTrackingTestProcessor()),
                BATCH_SIZE,
                TOTAL_EVENTS,
                multipleBatchesTotal);

        return Stream.of(
                singleThreadSingleBatch,
                singleThreadMultipleBatches,
                multiProcessorSingleBatch,
                multiProcessorMultipleBatches);
    }
}

static class WithoutSingleThreadParametersArgumentsProvider implements ArgumentsProvider {
/**
 * Supplies the scenarios for the basic-processor pipeline.
 * Each scenario is: pipeline name, test name, tracking processors to inspect,
 * number of batches, events per batch, and expected total events.
 */
@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext extensionContext) throws Exception {
    final String basicPipeline = "basic-processor-pipeline";

    final Arguments singleBatch = arguments(
            basicPipeline,
            "SingleBatch",
            List.of(new BasicEventsTrackingTestProcessor()),
            1,
            TOTAL_EVENTS,
            TOTAL_EVENTS);

    final Arguments multipleBatches = arguments(
            basicPipeline,
            "MultipleBatches",
            List.of(new BasicEventsTrackingTestProcessor()),
            BATCH_SIZE,
            TOTAL_EVENTS,
            BATCH_SIZE * TOTAL_EVENTS);

    return Stream.of(singleBatch, multipleBatches);
}
return arguments.stream();
}

/**
Expand All @@ -125,11 +174,13 @@ private void initializeTestRunner() {

/**
* Verifies that events were processed correctly by the pipeline.
* @param pipelineType The type of pipeline being tested
* @param expectedTotalEvents Total number of events expected to be processed
* @param eventsPerBatch Number of events in each batch
*
* @param pipelineType The type of pipeline being tested
* @param expectedTotalEvents Total number of events expected to be processed
* @param eventsPerBatch Number of events in each batch
* @param processorEventStores The tracking processors whose recorded events are checked against the output records
*/
private void verifyProcessingResults(String pipelineType, int expectedTotalEvents, int eventsPerBatch) {
private void verifyProcessingResults(String pipelineType, int expectedTotalEvents, int eventsPerBatch, List<BaseEventsTrackingProcessor> processorEventStores) {
// Wait for all events to be processed
await().atMost(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(
Expand All @@ -140,15 +191,14 @@ private void verifyProcessingResults(String pipelineType, int expectedTotalEvent
assertThat(outputRecords.size(), equalTo(expectedTotalEvents));

// Verify each processor in the pipeline processed events
List<BaseEventsTrackingProcessor> processors = PIPELINE_TO_PROCESSORS_MAP.get(pipelineType);
for (BaseEventsTrackingProcessor processor : processors) {
for (BaseEventsTrackingProcessor processor : processorEventStores) {
String processorName = processor.getName();
Map<String, AtomicInteger> processedEventsMap = processor.getEventsMap();

verifyEventProcessing(processedEventsMap, outputRecords, expectedTotalEvents, processorName);
}

verifyWorkerThreads(outputRecords, processors);
verifyWorkerThreads(outputRecords, processorEventStores);

int numberOfBatches = expectedTotalEvents / eventsPerBatch;
for (int batch = 0; batch < numberOfBatches; batch++) {
Expand All @@ -160,6 +210,24 @@ private void verifyProcessingResults(String pipelineType, int expectedTotalEvent
}
}

/**
 * Asserts that exactly four {@code SingleThreadEventsTrackingTestProcessor} instances were
 * registered, that each one was executed by exactly one thread, and that each one received
 * a process-worker count of four.
 */
private static void verifySingleThreadUsage() {
    final List<SingleThreadEventsTrackingTestProcessor> registered = SingleThreadEventsTrackingTestProcessor.getProcessors();
    assertThat(registered.size(), equalTo(4));

    // Indexing 0..3 is safe here because the size assertion above has already passed.
    final SingleThreadEventsTrackingTestProcessor first = registered.get(0);
    final SingleThreadEventsTrackingTestProcessor second = registered.get(1);
    final SingleThreadEventsTrackingTestProcessor third = registered.get(2);
    final SingleThreadEventsTrackingTestProcessor fourth = registered.get(3);

    // Each instance must have been used by one thread only.
    assertAll(
            () -> assertThat(first.getThreadsUsing().size(), equalTo(1)),
            () -> assertThat(second.getThreadsUsing().size(), equalTo(1)),
            () -> assertThat(third.getThreadsUsing().size(), equalTo(1)),
            () -> assertThat(fourth.getThreadsUsing().size(), equalTo(1))
    );

    // Each instance must report the same process-worker count.
    assertAll(
            () -> assertThat(first.getNumberOfProcessWorkersFromPipelineDescription(), equalTo(4)),
            () -> assertThat(second.getNumberOfProcessWorkersFromPipelineDescription(), equalTo(4)),
            () -> assertThat(third.getNumberOfProcessWorkersFromPipelineDescription(), equalTo(4)),
            () -> assertThat(fourth.getNumberOfProcessWorkersFromPipelineDescription(), equalTo(4))
    );
}

/**
* Verifies that each event was processed exactly once by the specified processor.
* Checks both the processor's internal tracking map and the event metadata.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
import org.opensearch.dataprepper.model.record.Record;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand All @@ -20,19 +23,26 @@ public abstract class BaseEventsTrackingProcessor implements Processor<Record<Ev
private final String countPropertyName;
private final String threadPropertyName;
private final Map<String, AtomicInteger> eventsMap;
private final int numberOfProcessWorkers;
private final Set<Long> threadsUsing;
private final String processorName;

/**
* Constructor for the base events tracking processor.
*
* @param processorName Name of the processor
* @param eventsMap Map for tracking processed events
* @param processorName Name of the processor
* @param eventsMap Map for tracking processed events
* @param numberOfProcessWorkers Number of process workers to report from getNumberOfProcessWorkersFromPipelineDescription()
*/
protected BaseEventsTrackingProcessor(String processorName, Map<String, AtomicInteger> eventsMap) {
protected BaseEventsTrackingProcessor(final String processorName,
final Map<String, AtomicInteger> eventsMap,
final int numberOfProcessWorkers) {
this.countPropertyName = processorName + "_processed_count";
this.threadPropertyName = processorName + "_processed_by_thread";
this.processorName = processorName;
this.eventsMap = eventsMap;
this.numberOfProcessWorkers = numberOfProcessWorkers;
threadsUsing = Collections.synchronizedSet(new HashSet<>());
}

/**
Expand All @@ -43,6 +53,14 @@ public Map<String, AtomicInteger> getEventsMap() {
return eventsMap;
}

/**
 * Gets the ids of the threads that have called execute on this processor.
 * The returned set is a read-only view, so callers cannot alter the tracking state;
 * it still reflects thread ids recorded after this call.
 *
 * @return An unmodifiable live view of the recorded thread ids
 */
public Set<?> getThreadsUsing() {
    return Collections.unmodifiableSet(threadsUsing);
}

/**
 * Gets the number of process workers that was passed to the constructor.
 * @return The number of process workers
 */
public int getNumberOfProcessWorkersFromPipelineDescription() {
return numberOfProcessWorkers;
}

/**
* Gets the name of processor.
* @return The processor name
Expand Down Expand Up @@ -70,7 +88,10 @@ public void reset() {
*/
@Override
public Collection<Record<Event>> execute(final Collection<Record<Event>> records) {
final String threadName = Thread.currentThread().getName();
final Thread currentThread = Thread.currentThread();
final String threadName = currentThread.getName();

threadsUsing.add(currentThread.getId());

for (Record<Event> record : records) {
Event event = record.getData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
public class BasicEventsTrackingTestProcessor extends BaseEventsTrackingProcessor {
private static final Map<String, AtomicInteger> PROCESSED_EVENTS_MAP = new ConcurrentHashMap<>();
private static final String PLUGIN_NAME = "basic_events_tracking_test";
private static final int NUMBER_OF_PROCESS_WORKERS_NOT_CAPTURED = -1;

public BasicEventsTrackingTestProcessor() {
super(PLUGIN_NAME, PROCESSED_EVENTS_MAP);
super(PLUGIN_NAME, PROCESSED_EVENTS_MAP, NUMBER_OF_PROCESS_WORKERS_NOT_CAPTURED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@
package org.opensearch.dataprepper.plugins;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.annotations.SingleThread;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.processor.Processor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -21,7 +25,22 @@ public class SingleThreadEventsTrackingTestProcessor extends BaseEventsTrackingP
private static final String PLUGIN_NAME = "single_thread_events_tracking_test";
private static final Map<String, AtomicInteger> PROCESSED_EVENTS_MAP = new ConcurrentHashMap<>();

private static final List<SingleThreadEventsTrackingTestProcessor> PROCESSORS = new ArrayList<>();

/**
 * Plugin constructor. Passes the pipeline's number of process workers to the base class
 * and records this instance in the static processor list.
 *
 * @param pipelineDescription Pipeline description from which the process-worker count is read
 */
@DataPrepperPluginConstructor
public SingleThreadEventsTrackingTestProcessor(final PipelineDescription pipelineDescription) {
super(PLUGIN_NAME, PROCESSED_EVENTS_MAP, pipelineDescription.getNumberOfProcessWorkers());
// Record the instance so tests can inspect every processor created this way.
PROCESSORS.add(this);
}

/**
* This is used only for testing in the setup.
*/
public SingleThreadEventsTrackingTestProcessor() {
super(PLUGIN_NAME, PROCESSED_EVENTS_MAP);
super(PLUGIN_NAME, PROCESSED_EVENTS_MAP, -1);
}

/**
 * Gets the static list of instances created through the plugin constructor.
 * The list itself is returned, not a copy, so changes made by the caller affect it.
 *
 * @return The recorded processor instances
 */
public static List<SingleThreadEventsTrackingTestProcessor> getProcessors() {
return PROCESSORS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ private void buildPipelineFromConfiguration(
// Only allow ZeroBuffer for single-threaded pipelines with no @SingleThread processors
if (processorThreads == 1 && !hasSingleThreadedProcessors) {
((SupportsPipelineRunner) pipelineDefinedBuffer).setPipelineRunner(
new PipelineRunnerImpl(pipeline));
new PipelineRunnerImpl(pipeline, pipeline.getSingleThreadUnsafeProcessorProvider()));
} else {
if (hasSingleThreadedProcessors) {
throw new IllegalStateException(
Expand Down
Loading
Loading