Skip to content

Commit e6145c1

Browse files
committed
Revert "Adding Processor Registry to provision Atomic swapping of Processor instances (opensearch-project#5794)"
This reverts commit 844eac1 Signed-off-by: David Venable <dlv@amazon.com>
1 parent 44b62a2 commit e6145c1

15 files changed

Lines changed: 167 additions & 406 deletions

File tree

data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import static org.hamcrest.Matchers.empty;
2828
import static org.hamcrest.Matchers.lessThanOrEqualTo;
2929
import static org.junit.Assert.assertFalse;
30-
import static org.junit.jupiter.api.Assertions.assertNotNull;
3130
import static org.junit.jupiter.api.Assertions.assertTrue;
3231

3332
@FixMethodOrder()
@@ -53,15 +52,15 @@ void setUp(String configFile) {
5352
.withPipelinesDirectoryOrFile(configFile)
5453
.build();
5554

56-
LOG.info("PipelinesWithAcksIT with config file {} started at {}", configFile, Instant.now());
55+
LOG.info("PipelinesWithAcksIT with config file {} started at {}", configFile, Instant.now());
5756
dataPrepperTestRunner.start();
5857
inMemorySourceAccessor = dataPrepperTestRunner.getInMemorySourceAccessor();
5958
inMemorySinkAccessor = dataPrepperTestRunner.getInMemorySinkAccessor();
6059
}
6160

6261
@AfterEach
6362
void tearDown() {
64-
LOG.info("PipelinesWithAcksIT with stopped at {}", Instant.now());
63+
LOG.info("PipelinesWithAcksIT with stopped at {}", Instant.now());
6564
dataPrepperTestRunner.stop();
6665
}
6766

@@ -134,8 +133,8 @@ void three_pipelines_with_all_unrouted_records() {
134133

135134
await().atMost(40000, TimeUnit.MILLISECONDS)
136135
.untilAsserted(() -> {
137-
assertNotNull(inMemorySourceAccessor);
138-
assertNotNull(inMemorySourceAccessor.getAckReceived());
136+
assertTrue(inMemorySourceAccessor != null);
137+
assertTrue(inMemorySourceAccessor.getAckReceived() != null);
139138
});
140139
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
141140
assertThat(outputRecords.size(), equalTo(0));

data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ProcessorSwapPipelineIT.java

Lines changed: 0 additions & 141 deletions
This file was deleted.

data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/test/framework/DataPrepperTestRunner.java

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,13 @@
88
import org.opensearch.dataprepper.AbstractContextManager;
99
import org.opensearch.dataprepper.core.DataPrepper;
1010
import org.opensearch.dataprepper.core.parser.config.FileStructurePathProvider;
11-
import org.opensearch.dataprepper.model.plugin.PluginFactory;
12-
import org.opensearch.dataprepper.model.processor.Processor;
1311
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
1412
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
1513
import org.slf4j.Logger;
1614
import org.slf4j.LoggerFactory;
1715
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
1816

1917
import javax.annotation.Nullable;
20-
import java.util.List;
2118

2219
/**
2320
* Provides the ability to run a Data Prepper test instance. Each of these will run
@@ -28,7 +25,7 @@
2825
*/
2926
public class DataPrepperTestRunner {
3027
private static final Logger LOG = LoggerFactory.getLogger(DataPrepperTestRunner.class);
31-
public static final String BASE_PATH = "src/integrationTest/resources/org/opensearch/dataprepper";
28+
private static final String BASE_PATH = "src/integrationTest/resources/org/opensearch/dataprepper";
3229
private final String dataPrepperConfigFile;
3330
private final String pipelinesDirectoryOrFile;
3431
private final InMemorySourceAccessor inMemorySourceAccessor;
@@ -63,20 +60,6 @@ public void start() {
6360
.execute();
6461
}
6562

66-
public PluginFactory getPluginFactory() {
67-
return contextManager.getDataPrepperBean()
68-
.getPluginFactory();
69-
}
70-
71-
/**
72-
* Dynamically Swap processors in a running pipeline
73-
*/
74-
public void swapProcessors(String pipelineName, List<Processor> newProcessors) {
75-
contextManager.getDataPrepperBean()
76-
.getPipeline(pipelineName)
77-
.swapProcessors(newProcessors);
78-
}
79-
8063
/**
8164
* Stops the Data Prepper test instance.
8265
*/

data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/processor-swap/source/processor-swap-pipeline.yaml

Lines changed: 0 additions & 17 deletions
This file was deleted.

data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/processor-swap/target/processor-swap-pipeline.yaml

Lines changed: 0 additions & 17 deletions
This file was deleted.

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/DataPrepper.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,6 @@ public void shutdown() {
102102
shutdownServers();
103103
}
104104

105-
public Pipeline getPipeline(String pipelineName) {
106-
return transformationPipelines.get(pipelineName);
107-
}
108-
109105
private void shutdownPipelines() {
110106
shutdownPipelines(DataPrepperShutdownOptions.defaultOptions());
111107
}

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public class PipelineTransformer {
7474

7575
private final ExpressionEvaluator expressionEvaluator;
7676

77-
public PipelineTransformer(
77+
public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel,
7878
final PluginFactory pluginFactory,
7979
final PeerForwarderProvider peerForwarderProvider,
8080
final RouterFactory routerFactory,

0 commit comments

Comments
 (0)