Skip to content

Commit 8315132

Browse files
authored
Decoupling PipelineDataFlowModel dependency from PipelineTransformer (#5809)
* decoupled PipelineDataModel dependency from PipelineTransformer. It will now get the pipeline data model as an argument instead of constructor argument Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
1 parent 4dab7ac commit 8315132

6 files changed

Lines changed: 59 additions & 63 deletions

File tree

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/DataPrepper.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
package org.opensearch.dataprepper.core;
77

8-
import io.micrometer.core.instrument.util.StringUtils;
8+
99
import org.opensearch.dataprepper.DataPrepperShutdownListener;
1010
import org.opensearch.dataprepper.DataPrepperShutdownOptions;
1111
import org.opensearch.dataprepper.core.parser.PipelineTransformer;
@@ -38,7 +38,6 @@ public class DataPrepper implements PipelinesProvider {
3838
private static final Logger LOG = LoggerFactory.getLogger(DataPrepper.class);
3939
private static final String DATAPREPPER_SERVICE_NAME = "DATAPREPPER_SERVICE_NAME";
4040
private static final String DEFAULT_SERVICE_NAME = "dataprepper";
41-
private static final int MAX_RETRIES = 100;
4241

4342
private final PluginFactory pluginFactory;
4443
private final PeerForwarderServer peerForwarderServer;
@@ -51,27 +50,28 @@ public class DataPrepper implements PipelinesProvider {
5150
@Inject
5251
@Lazy
5352
private DataPrepperServer dataPrepperServer;
54-
private List<DataPrepperShutdownListener> shutdownListeners = new LinkedList<>();
53+
private final List<DataPrepperShutdownListener> shutdownListeners = new LinkedList<>();
5554

5655
/**
5756
* returns serviceName if exists or default serviceName
5857
* @return serviceName for data-prepper
5958
*/
6059
public static String getServiceNameForMetrics() {
6160
final String serviceName = System.getenv(DATAPREPPER_SERVICE_NAME);
62-
return StringUtils.isNotBlank(serviceName) ? serviceName : DEFAULT_SERVICE_NAME;
61+
return serviceName != null && !serviceName.trim().isEmpty() ? serviceName : DEFAULT_SERVICE_NAME;
6362
}
6463

6564
@Inject
6665
public DataPrepper(
66+
final PipelinesDataFlowModel pipelinesDataFlowModel,
6767
final PipelineTransformer pipelineTransformer,
6868
final PluginFactory pluginFactory,
6969
final PeerForwarderServer peerForwarderServer,
7070
final Predicate<Map<String, Pipeline>> shouldShutdownOnPipelineFailurePredicate) {
7171
this.pluginFactory = pluginFactory;
7272

73-
transformationPipelines = pipelineTransformer.transformConfiguration();
74-
pipelinesDataFlowModel = pipelineTransformer.getPipelinesDataFlowModel();
73+
this.pipelinesDataFlowModel = pipelinesDataFlowModel;
74+
transformationPipelines = pipelineTransformer.transformConfiguration(pipelinesDataFlowModel);
7575
this.shouldShutdownOnPipelineFailurePredicate = shouldShutdownOnPipelineFailurePredicate;
7676
if (transformationPipelines.isEmpty()) {
7777
throw new RuntimeException("No valid pipeline is available for execution, exiting");

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ public class PipelineTransformer {
6060
private static final Logger LOG = LoggerFactory.getLogger(PipelineTransformer.class);
6161
private static final String PIPELINE_TYPE = "pipeline";
6262
private static final String ATTRIBUTE_NAME = "name";
63-
private final PipelinesDataFlowModel pipelinesDataFlowModel;
6463
private final RouterFactory routerFactory;
6564
private final DataPrepperConfiguration dataPrepperConfiguration;
6665
private final CircuitBreakerManager circuitBreakerManager;
@@ -75,7 +74,7 @@ public class PipelineTransformer {
7574

7675
private final ExpressionEvaluator expressionEvaluator;
7776

78-
public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel,
77+
public PipelineTransformer(
7978
final PluginFactory pluginFactory,
8079
final PeerForwarderProvider peerForwarderProvider,
8180
final RouterFactory routerFactory,
@@ -87,7 +86,6 @@ public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel,
8786
final PluginErrorCollector pluginErrorCollector,
8887
final PluginErrorsHandler pluginErrorsHandler,
8988
final ExpressionEvaluator expressionEvaluator) {
90-
this.pipelinesDataFlowModel = pipelinesDataFlowModel;
9189
this.pluginFactory = Objects.requireNonNull(pluginFactory);
9290
this.peerForwarderProvider = Objects.requireNonNull(peerForwarderProvider);
9391
this.routerFactory = routerFactory;
@@ -101,7 +99,7 @@ public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel,
10199
this.expressionEvaluator = expressionEvaluator;
102100
}
103101

104-
public Map<String, Pipeline> transformConfiguration() {
102+
public Map<String, Pipeline> transformConfiguration(final PipelinesDataFlowModel pipelinesDataFlowModel) {
105103
final Map<String, PipelineConfiguration> pipelineConfigurationMap = pipelinesDataFlowModel.getPipelines().entrySet()
106104
.stream()
107105
.collect(Collectors.toMap(
@@ -422,10 +420,6 @@ private Buffer applyCircuitBreakerToBuffer(final Source source, final Buffer buf
422420
.orElseGet(() -> buffer);
423421
}
424422

425-
public PipelinesDataFlowModel getPipelinesDataFlowModel() {
426-
return pipelinesDataFlowModel;
427-
}
428-
429423
private static class IdentifiedComponent<T> {
430424
private final T component;
431425
private final String name;

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/config/PipelineParserConfiguration.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,14 @@
55

66
package org.opensearch.dataprepper.core.parser.config;
77

8-
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
98
import org.opensearch.dataprepper.core.breaker.CircuitBreakerManager;
109
import org.opensearch.dataprepper.core.parser.PipelineTransformer;
1110
import org.opensearch.dataprepper.core.parser.model.DataPrepperConfiguration;
1211
import org.opensearch.dataprepper.core.peerforwarder.PeerForwarderProvider;
1312
import org.opensearch.dataprepper.core.pipeline.router.RouterFactory;
1413
import org.opensearch.dataprepper.core.sourcecoordination.SourceCoordinatorFactory;
1514
import org.opensearch.dataprepper.core.validation.PluginErrorCollector;
16-
import org.opensearch.dataprepper.validation.PluginErrorsHandler;
15+
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
1716
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
1817
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
1918
import org.opensearch.dataprepper.model.event.EventFactory;
@@ -24,6 +23,7 @@
2423
import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluator;
2524
import org.opensearch.dataprepper.pipeline.parser.transformer.DynamicConfigTransformer;
2625
import org.opensearch.dataprepper.pipeline.parser.transformer.PipelineConfigurationTransformer;
26+
import org.opensearch.dataprepper.validation.PluginErrorsHandler;
2727
import org.springframework.beans.factory.annotation.Qualifier;
2828
import org.springframework.context.annotation.Bean;
2929
import org.springframework.context.annotation.ComponentScan;
@@ -39,7 +39,6 @@ public class PipelineParserConfiguration {
3939

4040
@Bean
4141
public PipelineTransformer pipelineParser(
42-
final PipelinesDataFlowModel pipelinesDataFlowModel,
4342
final PluginFactory pluginFactory,
4443
final PeerForwarderProvider peerForwarderProvider,
4544
final RouterFactory routerFactory,
@@ -52,8 +51,7 @@ public PipelineTransformer pipelineParser(
5251
final PluginErrorsHandler pluginErrorsHandler,
5352
final ExpressionEvaluator expressionEvaluator
5453
) {
55-
return new PipelineTransformer(pipelinesDataFlowModel,
56-
pluginFactory,
54+
return new PipelineTransformer(pluginFactory,
5755
peerForwarderProvider,
5856
routerFactory,
5957
dataPrepperConfiguration,

data-prepper-core/src/test/java/org/opensearch/dataprepper/core/DataPrepperTests.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.opensearch.dataprepper.core.pipeline.Pipeline;
1919
import org.opensearch.dataprepper.core.pipeline.PipelineObserver;
2020
import org.opensearch.dataprepper.core.pipeline.server.DataPrepperServer;
21+
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
2122
import org.opensearch.dataprepper.model.plugin.PluginFactory;
2223

2324
import java.lang.reflect.Field;
@@ -40,6 +41,8 @@
4041
public class DataPrepperTests {
4142
private Map<String, Pipeline> parseConfigurationFixture;
4243
@Mock
44+
private PipelinesDataFlowModel pipelinesDataFlowModel;
45+
@Mock
4346
private PipelineTransformer pipelineTransformer;
4447
@Mock
4548
private Pipeline pipeline;
@@ -59,12 +62,13 @@ public void before() {
5962
lenient().when(pipeline.getName()).thenReturn("testKey");
6063
lenient().when(pipeline.isReady()).thenReturn(true);
6164

62-
lenient().when(pipelineTransformer.transformConfiguration())
65+
lenient().when(pipelineTransformer.transformConfiguration(this.pipelinesDataFlowModel))
6366
.thenReturn(parseConfigurationFixture);
6467
}
6568

6669
private DataPrepper createObjectUnderTest() throws NoSuchFieldException, IllegalAccessException {
67-
final DataPrepper dataPrepper = new DataPrepper(pipelineTransformer, pluginFactory, peerForwarderServer, shouldShutdownOnPipelineFailurePredicate);
70+
final DataPrepper dataPrepper = new DataPrepper(pipelinesDataFlowModel, pipelineTransformer, pluginFactory,
71+
peerForwarderServer, shouldShutdownOnPipelineFailurePredicate);
6872
final Field dataPrepperServerField = dataPrepper.getClass().getDeclaredField("dataPrepperServer");
6973
dataPrepperServerField.setAccessible(true);
7074
dataPrepperServerField.set(dataPrepper, dataPrepperServer);
@@ -87,7 +91,8 @@ public void testGivenInvalidInputThenExceptionThrown() {
8791

8892
assertThrows(
8993
RuntimeException.class,
90-
() -> new DataPrepper(pipelineTransformer, pluginFactory, peerForwarderServer, shouldShutdownOnPipelineFailurePredicate),
94+
() -> new DataPrepper(pipelinesDataFlowModel, pipelineTransformer, pluginFactory,
95+
peerForwarderServer, shouldShutdownOnPipelineFailurePredicate),
9196
"Exception should be thrown if pipeline parser has no pipeline configuration");
9297
}
9398

0 commit comments

Comments
 (0)