Skip to content

Commit 844eac1

Browse files
authored
Adding Processor Registry to provision Atomic swapping of Processor instances (#5794)
* Processor Registry class added to provision Atomic swapping of processor list Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
1 parent 42faa8f commit 844eac1

15 files changed

Lines changed: 435 additions & 197 deletions

File tree

data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import static org.hamcrest.Matchers.empty;
2828
import static org.hamcrest.Matchers.lessThanOrEqualTo;
2929
import static org.junit.Assert.assertFalse;
30+
import static org.junit.jupiter.api.Assertions.assertNotNull;
3031
import static org.junit.jupiter.api.Assertions.assertTrue;
3132

3233
@FixMethodOrder()
@@ -52,15 +53,15 @@ void setUp(String configFile) {
5253
.withPipelinesDirectoryOrFile(configFile)
5354
.build();
5455

55-
LOG.info("PipelinesWithAcksIT with config file {} started at {}", configFile, Instant.now());
56+
LOG.info("PipelinesWithAcksIT with config file {} started at {}", configFile, Instant.now());
5657
dataPrepperTestRunner.start();
5758
inMemorySourceAccessor = dataPrepperTestRunner.getInMemorySourceAccessor();
5859
inMemorySinkAccessor = dataPrepperTestRunner.getInMemorySinkAccessor();
5960
}
6061

6162
@AfterEach
6263
void tearDown() {
63-
LOG.info("PipelinesWithAcksIT with stopped at {}", Instant.now());
64+
LOG.info("PipelinesWithAcksIT with stopped at {}", Instant.now());
6465
dataPrepperTestRunner.stop();
6566
}
6667

@@ -133,8 +134,8 @@ void three_pipelines_with_all_unrouted_records() {
133134

134135
await().atMost(40000, TimeUnit.MILLISECONDS)
135136
.untilAsserted(() -> {
136-
assertTrue(inMemorySourceAccessor != null);
137-
assertTrue(inMemorySourceAccessor.getAckReceived() != null);
137+
assertNotNull(inMemorySourceAccessor);
138+
assertNotNull(inMemorySourceAccessor.getAckReceived());
138139
});
139140
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
140141
assertThat(outputRecords.size(), equalTo(0));
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.integration;
7+
8+
import org.junit.jupiter.api.AfterEach;
9+
import org.junit.jupiter.api.BeforeEach;
10+
import org.junit.jupiter.api.Test;
11+
import org.opensearch.dataprepper.model.configuration.PipelineModel;
12+
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
13+
import org.opensearch.dataprepper.model.configuration.PluginSetting;
14+
import org.opensearch.dataprepper.model.event.Event;
15+
import org.opensearch.dataprepper.model.event.JacksonEvent;
16+
import org.opensearch.dataprepper.model.plugin.PluginFactory;
17+
import org.opensearch.dataprepper.model.processor.Processor;
18+
import org.opensearch.dataprepper.model.record.Record;
19+
import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationFileReader;
20+
import org.opensearch.dataprepper.pipeline.parser.PipelinesDataflowModelParser;
21+
import org.opensearch.dataprepper.pipeline.parser.model.PipelineConfiguration;
22+
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
23+
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
24+
import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
import java.util.ArrayList;
29+
import java.util.Collections;
30+
import java.util.List;
31+
import java.util.UUID;
32+
import java.util.concurrent.TimeUnit;
33+
34+
import static org.awaitility.Awaitility.await;
35+
import static org.hamcrest.CoreMatchers.equalTo;
36+
import static org.hamcrest.CoreMatchers.not;
37+
import static org.hamcrest.CoreMatchers.notNullValue;
38+
import static org.hamcrest.MatcherAssert.assertThat;
39+
import static org.hamcrest.Matchers.empty;
40+
import static org.opensearch.dataprepper.test.framework.DataPrepperTestRunner.BASE_PATH;
41+
42+
class ProcessorSwapPipelineIT {
43+
private static final Logger LOG = LoggerFactory.getLogger(ProcessorSwapPipelineIT.class);
44+
private static final String IN_MEMORY_IDENTIFIER = "ProcessorSwapPipelineIT";
45+
private static final String PIPELINE_CONFIGURATION_FOLDER_UNDER_TEST = "processor-swap";
46+
private static final String PIPELINE_NAME_IN_YAML = "processor-pipeline";
47+
private DataPrepperTestRunner dataPrepperTestRunner;
48+
private InMemorySourceAccessor inMemorySourceAccessor;
49+
private InMemorySinkAccessor inMemorySinkAccessor;
50+
private PluginFactory pluginFactory;
51+
52+
@BeforeEach
53+
void setUp() {
54+
dataPrepperTestRunner = DataPrepperTestRunner.builder()
55+
.withPipelinesDirectoryOrFile(PIPELINE_CONFIGURATION_FOLDER_UNDER_TEST + "/source")
56+
.build();
57+
58+
inMemorySourceAccessor = dataPrepperTestRunner.getInMemorySourceAccessor();
59+
inMemorySinkAccessor = dataPrepperTestRunner.getInMemorySinkAccessor();
60+
pluginFactory = dataPrepperTestRunner.getPluginFactory();
61+
dataPrepperTestRunner.start();
62+
LOG.info("Started test runner.");
63+
}
64+
65+
@AfterEach
66+
void tearDown() {
67+
LOG.info("Test tear down. Stop the test runner.");
68+
dataPrepperTestRunner.stop();
69+
}
70+
71+
@Test
72+
void run_with_single_record() {
73+
final String messageValue = UUID.randomUUID().toString();
74+
final Event event = JacksonEvent.fromMessage(messageValue);
75+
final Record<Event> eventRecord = new Record<>(event);
76+
77+
LOG.info("Submitting a single record.");
78+
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, Collections.singletonList(eventRecord));
79+
80+
await().atMost(400, TimeUnit.MILLISECONDS)
81+
.untilAsserted(() -> assertThat(inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER), not(empty())));
82+
83+
final List<Record<Event>> records = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
84+
85+
assertThat(records.size(), equalTo(1));
86+
87+
assertThat(records.get(0), notNullValue());
88+
assertThat(records.get(0).getData(), notNullValue());
89+
assertThat(records.get(0).getData().get("message", String.class), equalTo(messageValue));
90+
assertThat(records.get(0).getData().get("test1", String.class), equalTo("knownPrefix10"));
91+
assertThat(records.get(0).getData().get("test1_copy_original", String.class), equalTo("knownPrefix10"));
92+
93+
// Dynamically swap the pipeline processors
94+
LOG.info("Swapping the pipeline processors");
95+
List<Processor> targetPipelineProcessors = getTargetPipelineProcessors();
96+
dataPrepperTestRunner.swapProcessors(PIPELINE_NAME_IN_YAML, targetPipelineProcessors);
97+
98+
// Send one more event and assert against the updated processor behavior
99+
LOG.info("Submitting another single record.");
100+
final String updatedMessageValue = UUID.randomUUID().toString();
101+
final Event updatedEvent = JacksonEvent.fromMessage(updatedMessageValue);
102+
final Record<Event> updatedEventRecord = new Record<>(updatedEvent);
103+
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, Collections.singletonList(updatedEventRecord));
104+
105+
await().atMost(400, TimeUnit.MILLISECONDS)
106+
.untilAsserted(() -> assertThat(inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER), not(empty())));
107+
108+
final List<Record<Event>> updatedRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
109+
110+
LOG.info("Asserting the updated processor behavior");
111+
assertThat(updatedRecords.size(), equalTo(2));
112+
Event processedOutput = updatedRecords.get(1).getData();
113+
114+
assertThat(processedOutput, notNullValue());
115+
assertThat(processedOutput, notNullValue());
116+
assertThat(processedOutput.get("message", String.class), equalTo(updatedMessageValue));
117+
assertThat(processedOutput.get("test1", String.class), equalTo("knownUpdatedPrefix10"));
118+
assertThat(processedOutput.get("test1_copy_updated", String.class), equalTo("knownUpdatedPrefix10"));
119+
120+
}
121+
122+
123+
private List<Processor> getTargetPipelineProcessors() {
124+
// Create the target pipeline
125+
String targetPipelineFolderPath = BASE_PATH + "/pipeline/" + PIPELINE_CONFIGURATION_FOLDER_UNDER_TEST + "/target";
126+
PipelinesDataFlowModel targetPipelinesDataFlowModel =
127+
new PipelinesDataflowModelParser(
128+
new PipelineConfigurationFileReader(targetPipelineFolderPath))
129+
.parseConfiguration();
130+
131+
PipelineModel pipelineModel = targetPipelinesDataFlowModel.getPipelines().get(PIPELINE_NAME_IN_YAML);
132+
PipelineConfiguration pipelineConfiguration = new PipelineConfiguration(pipelineModel);
133+
List<Processor> processors = new ArrayList<>();
134+
for (PluginSetting pluginSetting : pipelineConfiguration.getProcessorPluginSettings()) {
135+
List<Processor> processorsList =
136+
pluginFactory.loadPlugins(Processor.class, pluginSetting, (actualClass -> 1));
137+
processors.add(processorsList.get(0));
138+
}
139+
return processors;
140+
}
141+
}

data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/test/framework/DataPrepperTestRunner.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,16 @@
88
import org.opensearch.dataprepper.AbstractContextManager;
99
import org.opensearch.dataprepper.core.DataPrepper;
1010
import org.opensearch.dataprepper.core.parser.config.FileStructurePathProvider;
11+
import org.opensearch.dataprepper.model.plugin.PluginFactory;
12+
import org.opensearch.dataprepper.model.processor.Processor;
1113
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
1214
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
1315
import org.slf4j.Logger;
1416
import org.slf4j.LoggerFactory;
1517
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
1618

1719
import javax.annotation.Nullable;
20+
import java.util.List;
1821

1922
/**
2023
* Provides the ability to run a Data Prepper test instance. Each of these will run
@@ -25,7 +28,7 @@
2528
*/
2629
public class DataPrepperTestRunner {
2730
private static final Logger LOG = LoggerFactory.getLogger(DataPrepperTestRunner.class);
28-
private static final String BASE_PATH = "src/integrationTest/resources/org/opensearch/dataprepper";
31+
public static final String BASE_PATH = "src/integrationTest/resources/org/opensearch/dataprepper";
2932
private final String dataPrepperConfigFile;
3033
private final String pipelinesDirectoryOrFile;
3134
private final InMemorySourceAccessor inMemorySourceAccessor;
@@ -60,6 +63,20 @@ public void start() {
6063
.execute();
6164
}
6265

66+
public PluginFactory getPluginFactory() {
67+
return contextManager.getDataPrepperBean()
68+
.getPluginFactory();
69+
}
70+
71+
/**
72+
* Dynamically Swap processors in a running pipeline
73+
*/
74+
public void swapProcessors(String pipelineName, List<Processor> newProcessors) {
75+
contextManager.getDataPrepperBean()
76+
.getPipeline(pipelineName)
77+
.swapProcessors(newProcessors);
78+
}
79+
6380
/**
6481
* Stops the Data Prepper test instance.
6582
*/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
processor-pipeline:
2+
delay: 10
3+
source:
4+
in_memory:
5+
testing_key: ProcessorSwapPipelineIT
6+
7+
processor:
8+
- simple_test:
9+
key1: /test1
10+
value_prefix1: knownPrefix1
11+
- simple_copy_test:
12+
source: /test1
13+
target: /test1_copy_original
14+
15+
sink:
16+
- in_memory:
17+
testing_key: ProcessorSwapPipelineIT
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
processor-pipeline:
2+
delay: 10
3+
source:
4+
in_memory:
5+
testing_key: ProcessorSwapPipelineIT
6+
7+
processor:
8+
- simple_test:
9+
key1: /test1
10+
value_prefix1: knownUpdatedPrefix1
11+
- simple_copy_test:
12+
source: /test1
13+
target: /test1_copy_updated
14+
15+
sink:
16+
- in_memory:
17+
testing_key: ProcessorSwapPipelineIT

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/DataPrepper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ public void shutdown() {
102102
shutdownServers();
103103
}
104104

105+
public Pipeline getPipeline(String pipelineName) {
106+
return transformationPipelines.get(pipelineName);
107+
}
108+
105109
private void shutdownPipelines() {
106110
shutdownPipelines(DataPrepperShutdownOptions.defaultOptions());
107111
}

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,9 @@
5555

5656
@SuppressWarnings("rawtypes")
5757
public class PipelineTransformer {
58-
private static final Logger LOG = LoggerFactory.getLogger(PipelineTransformer.class);
59-
6058
static final String CONDITIONAL_ROUTE_INVALID_EXPRESSION_FORMAT = "Route %s contains an invalid conditional expression '%s'. " +
6159
"See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax.";
60+
private static final Logger LOG = LoggerFactory.getLogger(PipelineTransformer.class);
6261
private static final String PIPELINE_TYPE = "pipeline";
6362
private static final String ATTRIBUTE_NAME = "name";
6463
private final PipelinesDataFlowModel pipelinesDataFlowModel;
@@ -207,7 +206,7 @@ private void buildPipelineFromConfiguration(
207206
return PeerForwardingProcessorDecorator.decorateProcessors(
208207
processors, peerForwarderProvider, pipelineName, processorComponentList.get(0).getName(),
209208
dataPrepperConfiguration.getPeerForwarderConfiguration() != null ?
210-
dataPrepperConfiguration.getPeerForwarderConfiguration().getExcludeIdentificationKeys() : null,
209+
dataPrepperConfiguration.getPeerForwarderConfiguration().getExcludeIdentificationKeys() : null,
211210
pipelineConfiguration.getWorkers()
212211
);
213212
}
@@ -233,24 +232,24 @@ private void buildPipelineFromConfiguration(
233232
if (pipelineDefinedBuffer instanceof SupportsPipelineRunner) {
234233
// Check if there are any processors with @SingleThread annotation
235234
boolean hasSingleThreadedProcessors = processorSets.stream()
236-
.flatMap(List::stream)
237-
.map(IdentifiedComponent::getComponent)
238-
.map(Object::getClass)
239-
.anyMatch(processorClass -> processorClass.isAnnotationPresent(SingleThread.class));
235+
.flatMap(List::stream)
236+
.map(IdentifiedComponent::getComponent)
237+
.map(Object::getClass)
238+
.anyMatch(processorClass -> processorClass.isAnnotationPresent(SingleThread.class));
240239

241240
// Only allow ZeroBuffer for single-threaded pipelines with no @SingleThread processors
242241
if (processorThreads == 1 && !hasSingleThreadedProcessors) {
243242
((SupportsPipelineRunner) pipelineDefinedBuffer).setPipelineRunner(
244-
new PipelineRunnerImpl(pipeline, pipeline.getProcessors()));
243+
new PipelineRunnerImpl(pipeline));
245244
} else {
246245
if (hasSingleThreadedProcessors) {
247246
throw new IllegalStateException(
248-
"ZeroBuffer cannot be used with @SingleThread processors. " +
249-
"Pipeline [" + pipelineName + "] contains one or more @SingleThread processors.");
247+
"ZeroBuffer cannot be used with @SingleThread processors. " +
248+
"Pipeline [" + pipelineName + "] contains one or more @SingleThread processors.");
250249
} else {
251250
throw new IllegalStateException(
252-
"ZeroBuffer cannot be used with multiple processor threads. " +
253-
"Pipeline [" + pipelineName + "] is configured with " + processorThreads + " threads.");
251+
"ZeroBuffer cannot be used with multiple processor threads. " +
252+
"Pipeline [" + pipelineName + "] is configured with " + processorThreads + " threads.");
254253
}
255254
}
256255
}
@@ -396,24 +395,6 @@ private void processRemoveIfRequired(
396395
}
397396
}
398397

399-
private static class IdentifiedComponent<T> {
400-
private final T component;
401-
private final String name;
402-
403-
private IdentifiedComponent(final T component, final String name) {
404-
this.component = component;
405-
this.name = name;
406-
}
407-
408-
T getComponent() {
409-
return component;
410-
}
411-
412-
String getName() {
413-
return name;
414-
}
415-
}
416-
417398
Duration getPeerForwarderDrainTimeout(final DataPrepperConfiguration dataPrepperConfiguration) {
418399
return Optional.ofNullable(dataPrepperConfiguration)
419400
.map(DataPrepperConfiguration::getPeerForwarderConfiguration)
@@ -432,7 +413,7 @@ private Buffer applyCircuitBreakerToBuffer(final Source source, final Buffer buf
432413
if (source instanceof PipelineConnector)
433414
return buffer;
434415

435-
if(buffer.isWrittenOffHeapOnly())
416+
if (buffer.isWrittenOffHeapOnly())
436417
return buffer;
437418

438419
return circuitBreakerManager.getGlobalCircuitBreaker()
@@ -444,4 +425,22 @@ private Buffer applyCircuitBreakerToBuffer(final Source source, final Buffer buf
444425
public PipelinesDataFlowModel getPipelinesDataFlowModel() {
445426
return pipelinesDataFlowModel;
446427
}
428+
429+
private static class IdentifiedComponent<T> {
430+
private final T component;
431+
private final String name;
432+
433+
private IdentifiedComponent(final T component, final String name) {
434+
this.component = component;
435+
this.name = name;
436+
}
437+
438+
T getComponent() {
439+
return component;
440+
}
441+
442+
String getName() {
443+
return name;
444+
}
445+
}
447446
}

0 commit comments

Comments
 (0)