Skip to content

Commit bc1a424

Browse files
saketh-pallempatiGalactus22625
authored andcommitted
Fix issue 5106 (opensearch-project#5501)
Fix validation of pipeline's sink routes opensearch-project#5106 * Added validation for sink routes in PipelineConfigurationValidator * Added unit tests in PipelineConfigurationValidatorTest * Added integration tests in PipelineConfigurationValidatorIT * Routes now properly validate against defined conditional routes Signed-off-by: Pallempati Saketh <pallempati.saketh@fmr.com>
1 parent 5b32655 commit bc1a424

5 files changed

Lines changed: 227 additions & 1 deletion

File tree

data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/route/three-route-with-default-route.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ routing-pipeline:
1313
- alpha: '/value == "a"'
1414
- beta: '/value == "b"'
1515
- gamma: '/value == "g"'
16+
- _default: '/_default == "z"'
1617
sink:
1718
- in_memory:
1819
testing_key: ConditionalRoutingIT_alpha
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package org.opensearch.dataprepper.pipeline.parser;
2+
3+
/**
4+
* Exception thrown when pipeline configuration validation fails.
5+
*/
6+
public class InvalidPipelineConfigurationException extends RuntimeException {
7+
public InvalidPipelineConfigurationException(String message) {
8+
super(message);
9+
}
10+
11+
public InvalidPipelineConfigurationException(String message, Throwable cause) {
12+
super(message, cause);
13+
}
14+
}

data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationValidator.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.opensearch.dataprepper.model.configuration.PluginSetting;
1010
import org.opensearch.dataprepper.pipeline.parser.model.PipelineConfiguration;
1111
import org.opensearch.dataprepper.pipeline.parser.model.SinkContextPluginSetting;
12+
import org.opensearch.dataprepper.model.configuration.ConditionalRoute;
1213
import org.slf4j.Logger;
1314
import org.slf4j.LoggerFactory;
1415

@@ -19,6 +20,7 @@
1920
import java.util.List;
2021
import java.util.Map;
2122
import java.util.Set;
23+
import java.util.stream.Collectors;
2224

2325
import static java.lang.String.format;
2426

@@ -33,6 +35,7 @@ public class PipelineConfigurationValidator {
3335
* i. cycles in pipeline configuration
3436
* ii. incorrect pipeline source-sink configuration
3537
* iii. orphan pipelines [check is disabled for now]
38+
* iv. routes defined in sink pipeline plugins exist in the pipeline's configured routes.
3639
*
3740
* @param pipelineConfigurationMap String to PipelineConfiguration map
3841
* @return List of pipeline names in topological order
@@ -44,6 +47,8 @@ public static List<String> validateAndGetPipelineNames(final Map<String, Pipelin
4447

4548
checkInvalidPipelineNames(pipelineConfigurationMap);
4649

50+
validateSinkRoutes(pipelineConfigurationMap);
51+
4752
pipelineConfigurationMap.forEach((pipeline, configuration) -> {
4853
if (!visitedAndProcessedPipelineSet.contains(pipeline)) {
4954
visitAndValidate(pipeline, pipelineConfigurationMap, touchedPipelineSet, visitedAndProcessedPipelineSet,
@@ -170,4 +175,38 @@ private static void validateForOrphans(
170175
}
171176
}
172177

173-
}
178+
/**
179+
* Validates that the routes set in the sink pipeline plugin are present in the pipeline's overall conditional routes.
180+
* Each pipeline configuration provides its valid routes via getRoutes() which returns a set of ConditionalRoute.
181+
* Sink pipeline plugins have their desired routes configured in their settings.
182+
* This method ensures that every route used in a sink exists in the parent's configuration.
183+
*
184+
* @param pipelineConfigurationMap A map of pipeline names to PipelineConfiguration objects.
185+
*/
186+
private static void validateSinkRoutes(final Map<String, PipelineConfiguration> pipelineConfigurationMap) {
187+
for (Map.Entry<String, PipelineConfiguration> entry : pipelineConfigurationMap.entrySet()) {
188+
final String pipelineName = entry.getKey();
189+
final PipelineConfiguration pipelineConfiguration = entry.getValue();
190+
final Set<String> validRoutes = pipelineConfiguration.getRoutes().stream()
191+
.map(ConditionalRoute::getName)
192+
.collect(Collectors.toSet());
193+
194+
final List<SinkContextPluginSetting> sinkSettings = pipelineConfiguration.getSinkPluginSettings();
195+
for (SinkContextPluginSetting sinkPlugin : sinkSettings) {
196+
Collection<String> sinkRoutes = sinkPlugin.getSinkContext().getRoutes();
197+
if (sinkRoutes == null) {
198+
sinkRoutes = Collections.emptyList();
199+
}
200+
List<String> invalidRoutes = sinkRoutes.stream()
201+
.filter(route -> !validRoutes.contains(route))
202+
.collect(Collectors.toList());
203+
204+
if (!invalidRoutes.isEmpty()) {
205+
throw new InvalidPipelineConfigurationException(String.format(
206+
"The following routes do not exist in pipeline \"%s\": %s. Configured routes include %s",
207+
pipelineName, invalidRoutes, validRoutes));
208+
}
209+
}
210+
}
211+
}
212+
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.pipeline.parser;
7+
8+
import org.junit.jupiter.api.Test;
9+
import org.junit.jupiter.api.extension.ExtendWith;
10+
import org.mockito.junit.jupiter.MockitoExtension;
11+
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
12+
import org.opensearch.dataprepper.pipeline.parser.model.PipelineConfiguration;
13+
14+
import java.io.ByteArrayInputStream;
15+
import java.util.List;
16+
import java.util.Map;
17+
import java.util.stream.Collectors;
18+
19+
import static org.hamcrest.Matchers.equalTo;
20+
import static org.hamcrest.Matchers.hasSize;
21+
import static org.hamcrest.MatcherAssert.assertThat;
22+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
23+
import static org.junit.jupiter.api.Assertions.assertThrows;
24+
import static org.mockito.Mockito.mock;
25+
import static org.mockito.Mockito.when;
26+
27+
@ExtendWith(MockitoExtension.class)
28+
public class PipelineConfigurationValidatorIT {
29+
private static final String TEST_YAML_CONFIG =
30+
"entry-pipeline:\n" +
31+
" source:\n" +
32+
" random:\n" +
33+
" routes:\n" +
34+
" - route_one: \"/my_route == 10\"\n" +
35+
" - route_two: \"/my_route == 11\"\n" +
36+
" sink:\n" +
37+
" - pipeline:\n" +
38+
" name: \"sub-pipeline-1\"\n" +
39+
" routes:\n" +
40+
" - FIRST_ROUTE\n" +
41+
" - pipeline:\n" +
42+
" name: \"sub-pipeline-2\"\n" +
43+
" routes:\n" +
44+
" - SECOND_ROUTE\n";
45+
46+
private static final String SIMPLE_PIPELINE_NO_ROUTES_CONFIG =
47+
"entry-pipeline:\n" +
48+
" source:\n" +
49+
" random:\n" +
50+
" sink:\n" +
51+
" - stdout:\n";
52+
53+
private static final String ROUTES_DEFINED_NO_SINK_ROUTES_CONFIG =
54+
"entry-pipeline:\n" +
55+
" source:\n" +
56+
" random:\n" +
57+
" routes:\n" +
58+
" - route_one: \"/my_route == 10\"\n" +
59+
" - route_two: \"/my_route == 11\"\n" +
60+
" sink:\n" +
61+
" - stdout:\n";
62+
63+
@Test
64+
public void test_with_invalid_sink_routes_in_yaml_should_throw() {
65+
Map<String, PipelineConfiguration> pipelineConfigMap = getPipelineConfigMap(TEST_YAML_CONFIG);
66+
67+
InvalidPipelineConfigurationException exception = assertThrows(InvalidPipelineConfigurationException.class, () ->
68+
PipelineConfigurationValidator.validateAndGetPipelineNames(pipelineConfigMap));
69+
70+
String expectedErrorMessage = "The following routes do not exist in pipeline \"entry-pipeline\": " +
71+
"[FIRST_ROUTE]. Configured routes include [route_one, route_two]";
72+
73+
assertThat(exception.getMessage(), equalTo(expectedErrorMessage));
74+
}
75+
76+
@Test
77+
public void test_simple_pipeline_with_no_routes_succeeds() {
78+
Map<String, PipelineConfiguration> pipelineConfigMap = getPipelineConfigMap(SIMPLE_PIPELINE_NO_ROUTES_CONFIG);
79+
80+
List<String> pipelineNames = assertDoesNotThrow(() ->
81+
PipelineConfigurationValidator.validateAndGetPipelineNames(pipelineConfigMap));
82+
83+
assertThat(pipelineNames, hasSize(1));
84+
assertThat(pipelineNames.get(0), equalTo("entry-pipeline"));
85+
}
86+
87+
@Test
88+
public void test_routes_defined_but_no_routes_on_sinks_succeeds() {
89+
Map<String, PipelineConfiguration> pipelineConfigMap = getPipelineConfigMap(ROUTES_DEFINED_NO_SINK_ROUTES_CONFIG);
90+
91+
List<String> pipelineNames = assertDoesNotThrow(() ->
92+
PipelineConfigurationValidator.validateAndGetPipelineNames(pipelineConfigMap));
93+
94+
assertThat(pipelineNames, hasSize(1));
95+
assertThat(pipelineNames.get(0), equalTo("entry-pipeline"));
96+
}
97+
98+
/**
99+
* Helper method to create a pipeline configuration map from a YAML string
100+
*/
101+
private Map<String, PipelineConfiguration> getPipelineConfigMap(final String yamlConfig) {
102+
PipelineConfigurationReader configReader = mock(PipelineConfigurationReader.class);
103+
when(configReader.getPipelineConfigurationInputStreams())
104+
.thenReturn(List.of(new ByteArrayInputStream(yamlConfig.getBytes())));
105+
106+
PipelinesDataflowModelParser modelParser = new PipelinesDataflowModelParser(configReader);
107+
PipelinesDataFlowModel dataFlowModel = modelParser.parseConfiguration();
108+
109+
return dataFlowModel.getPipelines().entrySet()
110+
.stream()
111+
.collect(Collectors.toMap(
112+
Map.Entry::getKey,
113+
entry -> new PipelineConfiguration(entry.getValue())
114+
));
115+
}
116+
}

data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationValidatorTest.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,23 @@
88
import org.junit.jupiter.api.Test;
99
import org.junit.jupiter.api.extension.ExtendWith;
1010
import org.mockito.junit.jupiter.MockitoExtension;
11+
import org.opensearch.dataprepper.model.configuration.PluginSetting;
12+
import org.opensearch.dataprepper.model.configuration.ConditionalRoute;
13+
import org.opensearch.dataprepper.model.sink.SinkContext;
1114
import org.opensearch.dataprepper.pipeline.parser.model.PipelineConfiguration;
15+
import org.opensearch.dataprepper.pipeline.parser.model.SinkContextPluginSetting;
16+
1217

1318
import java.util.HashMap;
19+
import java.util.List;
1420
import java.util.Map;
21+
import java.util.Set;
1522

23+
import static org.junit.jupiter.api.Assertions.assertEquals;
1624
import static org.junit.jupiter.api.Assertions.assertThrows;
25+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
1726
import static org.mockito.Mockito.mock;
27+
import static org.mockito.Mockito.when;
1828

1929
@ExtendWith(MockitoExtension.class)
2030
class PipelineConfigurationValidatorTest {
@@ -37,4 +47,50 @@ void test_with_valid_pipeline_names_should_not_throw() {
3747

3848
PipelineConfigurationValidator.validateAndGetPipelineNames(objectObjectHashMap);
3949
}
50+
51+
@Test
52+
void test_with_invalid_sink_routes_should_throw() {
53+
PipelineConfiguration pipelineConfig = mock(PipelineConfiguration.class);
54+
SinkContextPluginSetting sinkSetting = mock(SinkContextPluginSetting.class);
55+
SinkContext sinkContext = mock(SinkContext.class);
56+
57+
when(sinkSetting.getSinkContext()).thenReturn(sinkContext);
58+
when(sinkContext.getRoutes()).thenReturn(List.of("INVALID_ROUTE"));
59+
when(pipelineConfig.getSinkPluginSettings()).thenReturn(List.of(sinkSetting));
60+
when(pipelineConfig.getRoutes()).thenReturn(Set.of(new ConditionalRoute("VALID_ROUTE", "condition")));
61+
62+
Map<String, PipelineConfiguration> pipelineConfigMap = Map.of("test-pipeline", pipelineConfig);
63+
64+
RuntimeException exception = assertThrows(RuntimeException.class,
65+
() -> PipelineConfigurationValidator.validateAndGetPipelineNames(pipelineConfigMap));
66+
67+
assertEquals("The following routes do not exist in pipeline \"test-pipeline\": [INVALID_ROUTE]. " +
68+
"Configured routes include [VALID_ROUTE]", exception.getMessage());
69+
}
70+
71+
@Test
72+
void test_with_valid_sink_routes_should_not_throw() {
73+
PipelineConfiguration pipelineConfig = mock(PipelineConfiguration.class);
74+
SinkContextPluginSetting sinkSetting = mock(SinkContextPluginSetting.class);
75+
SinkContext sinkContext = mock(SinkContext.class);
76+
PipelineConfiguration targetPipelineConfig = mock(PipelineConfiguration.class);
77+
PluginSetting targetSourceSetting = mock(PluginSetting.class);
78+
79+
when(sinkSetting.getName()).thenReturn("pipeline");
80+
when(sinkSetting.getAttributeFromSettings("name")).thenReturn("target-pipeline");
81+
when(sinkContext.getRoutes()).thenReturn(List.of("VALID_ROUTE"));
82+
when(sinkSetting.getSinkContext()).thenReturn(sinkContext);
83+
when(pipelineConfig.getSinkPluginSettings()).thenReturn(List.of(sinkSetting));
84+
when(pipelineConfig.getRoutes()).thenReturn(Set.of(new ConditionalRoute("VALID_ROUTE", "condition")));
85+
86+
when(targetSourceSetting.getName()).thenReturn("pipeline");
87+
when(targetSourceSetting.getAttributeFromSettings("name")).thenReturn("test-pipeline");
88+
when(targetPipelineConfig.getSourcePluginSetting()).thenReturn(targetSourceSetting);
89+
90+
Map<String, PipelineConfiguration> pipelineConfigMap = new HashMap<>();
91+
pipelineConfigMap.put("test-pipeline", pipelineConfig);
92+
pipelineConfigMap.put("target-pipeline", targetPipelineConfig);
93+
94+
assertDoesNotThrow(() -> PipelineConfigurationValidator.validateAndGetPipelineNames(pipelineConfigMap));
95+
}
4096
}

0 commit comments

Comments
 (0)