Skip to content

Commit 7f39895

Browse files
Added Null validation and Custom Exception along with few more Test cases
Signed-off-by: Pallempati Saketh <pallempati.saketh@fmr.com>
1 parent cfe5376 commit 7f39895

3 files changed

Lines changed: 79 additions & 6 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package org.opensearch.dataprepper.pipeline.parser;
2+
3+
/**
4+
* Exception thrown when pipeline configuration validation fails.
5+
*/
6+
public class InvalidPipelineConfigurationException extends RuntimeException {
7+
public InvalidPipelineConfigurationException(String message) {
8+
super(message);
9+
}
10+
11+
public InvalidPipelineConfigurationException(String message, Throwable cause) {
12+
super(message, cause);
13+
}
14+
}

data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationValidator.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.opensearch.dataprepper.model.configuration.PluginSetting;
1010
import org.opensearch.dataprepper.pipeline.parser.model.PipelineConfiguration;
1111
import org.opensearch.dataprepper.pipeline.parser.model.SinkContextPluginSetting;
12+
import org.opensearch.dataprepper.pipeline.parser.InvalidPipelineConfigurationException;
1213
import org.opensearch.dataprepper.model.configuration.ConditionalRoute;
1314
import org.slf4j.Logger;
1415
import org.slf4j.LoggerFactory;
@@ -194,12 +195,15 @@ private static void validateSinkRoutes(final Map<String, PipelineConfiguration>
194195
final List<SinkContextPluginSetting> sinkSettings = pipelineConfiguration.getSinkPluginSettings();
195196
for (SinkContextPluginSetting sinkPlugin : sinkSettings) {
196197
Collection<String> sinkRoutes = sinkPlugin.getSinkContext().getRoutes();
198+
if (sinkRoutes == null) {
199+
sinkRoutes = Collections.emptyList();
200+
}
197201
List<String> invalidRoutes = sinkRoutes.stream()
198202
.filter(route -> !validRoutes.contains(route))
199203
.collect(Collectors.toList());
200204

201205
if (!invalidRoutes.isEmpty()) {
202-
throw new RuntimeException(String.format(
206+
throw new InvalidPipelineConfigurationException(String.format(
203207
"The following routes do not exist in pipeline \"%s\": %s. Configured routes include %s",
204208
pipelineName, invalidRoutes, validRoutes));
205209
}

data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationValidatorIT.java

Lines changed: 60 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616
import java.util.Map;
1717
import java.util.stream.Collectors;
1818

19+
import static org.hamcrest.Matchers.equalTo;
20+
import static org.hamcrest.Matchers.hasSize;
21+
import static org.hamcrest.MatcherAssert.assertThat;
22+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
1923
import static org.junit.jupiter.api.Assertions.assertThrows;
2024
import static org.mockito.Mockito.mock;
2125
import static org.mockito.Mockito.when;
@@ -39,23 +43,74 @@ public class PipelineConfigurationValidatorIT {
3943
" routes:\n" +
4044
" - SECOND_ROUTE\n";
4145

46+
private static final String SIMPLE_PIPELINE_NO_ROUTES_CONFIG =
47+
"entry-pipeline:\n" +
48+
" source:\n" +
49+
" random:\n" +
50+
" sink:\n" +
51+
" - stdout:\n";
52+
53+
private static final String ROUTES_DEFINED_NO_SINK_ROUTES_CONFIG =
54+
"entry-pipeline:\n" +
55+
" source:\n" +
56+
" random:\n" +
57+
" routes:\n" +
58+
" - route_one: \"/my_route == 10\"\n" +
59+
" - route_two: \"/my_route == 11\"\n" +
60+
" sink:\n" +
61+
" - stdout:\n";
62+
4263
@Test
4364
public void test_with_invalid_sink_routes_in_yaml_should_throw() {
65+
Map<String, PipelineConfiguration> pipelineConfigMap = getPipelineConfigMap(TEST_YAML_CONFIG);
66+
67+
InvalidPipelineConfigurationException exception = assertThrows(InvalidPipelineConfigurationException.class, () ->
68+
PipelineConfigurationValidator.validateAndGetPipelineNames(pipelineConfigMap));
69+
70+
String expectedErrorMessage = "The following routes do not exist in pipeline \"entry-pipeline\": " +
71+
"[FIRST_ROUTE]. Configured routes include [route_one, route_two]";
72+
73+
assertThat(exception.getMessage(), equalTo(expectedErrorMessage));
74+
}
75+
76+
@Test
77+
public void test_simple_pipeline_with_no_routes_succeeds() {
78+
Map<String, PipelineConfiguration> pipelineConfigMap = getPipelineConfigMap(SIMPLE_PIPELINE_NO_ROUTES_CONFIG);
79+
80+
List<String> pipelineNames = assertDoesNotThrow(() ->
81+
PipelineConfigurationValidator.validateAndGetPipelineNames(pipelineConfigMap));
82+
83+
assertThat(pipelineNames, hasSize(1));
84+
assertThat(pipelineNames.get(0), equalTo("entry-pipeline"));
85+
}
86+
87+
@Test
88+
public void test_routes_defined_but_no_routes_on_sinks_succeeds() {
89+
Map<String, PipelineConfiguration> pipelineConfigMap = getPipelineConfigMap(ROUTES_DEFINED_NO_SINK_ROUTES_CONFIG);
90+
91+
List<String> pipelineNames = assertDoesNotThrow(() ->
92+
PipelineConfigurationValidator.validateAndGetPipelineNames(pipelineConfigMap));
93+
94+
assertThat(pipelineNames, hasSize(1));
95+
assertThat(pipelineNames.get(0), equalTo("entry-pipeline"));
96+
}
97+
98+
/**
99+
* Helper method to create a pipeline configuration map from a YAML string
100+
*/
101+
private Map<String, PipelineConfiguration> getPipelineConfigMap(final String yamlConfig) {
44102
PipelineConfigurationReader configReader = mock(PipelineConfigurationReader.class);
45103
when(configReader.getPipelineConfigurationInputStreams())
46-
.thenReturn(List.of(new ByteArrayInputStream(TEST_YAML_CONFIG.getBytes())));
104+
.thenReturn(List.of(new ByteArrayInputStream(yamlConfig.getBytes())));
47105

48106
PipelinesDataflowModelParser modelParser = new PipelinesDataflowModelParser(configReader);
49107
PipelinesDataFlowModel dataFlowModel = modelParser.parseConfiguration();
50108

51-
Map<String, PipelineConfiguration> pipelineConfigMap = dataFlowModel.getPipelines().entrySet()
109+
return dataFlowModel.getPipelines().entrySet()
52110
.stream()
53111
.collect(Collectors.toMap(
54112
Map.Entry::getKey,
55113
entry -> new PipelineConfiguration(entry.getValue())
56114
));
57-
58-
assertThrows(RuntimeException.class, () ->
59-
PipelineConfigurationValidator.validateAndGetPipelineNames(pipelineConfigMap));
60115
}
61116
}

0 commit comments

Comments
 (0)