Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ routing-pipeline:
- alpha: '/value == "a"'
- beta: '/value == "b"'
- gamma: '/value == "g"'
- _default: '/_default == "z"'
sink:
- in_memory:
testing_key: ConditionalRoutingIT_alpha
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.opensearch.dataprepper.pipeline.parser;

/**
* Exception thrown when pipeline configuration validation fails.
*/
public class InvalidPipelineConfigurationException extends RuntimeException {
public InvalidPipelineConfigurationException(String message) {
super(message);
}

public InvalidPipelineConfigurationException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.pipeline.parser.model.PipelineConfiguration;
import org.opensearch.dataprepper.pipeline.parser.model.SinkContextPluginSetting;
import org.opensearch.dataprepper.model.configuration.ConditionalRoute;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -19,6 +20,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static java.lang.String.format;

Expand All @@ -33,6 +35,7 @@ public class PipelineConfigurationValidator {
* i. cycles in pipeline configuration
* ii. incorrect pipeline source-sink configuration
* iii. orphan pipelines [check is disabled for now]
* iv. routes defined in sink pipeline plugins exist in the pipeline's configured routes.
*
* @param pipelineConfigurationMap String to PipelineConfiguration map
* @return List of pipeline names in topological order
Expand All @@ -44,6 +47,8 @@ public static List<String> validateAndGetPipelineNames(final Map<String, Pipelin

checkInvalidPipelineNames(pipelineConfigurationMap);

validateSinkRoutes(pipelineConfigurationMap);

pipelineConfigurationMap.forEach((pipeline, configuration) -> {
if (!visitedAndProcessedPipelineSet.contains(pipeline)) {
visitAndValidate(pipeline, pipelineConfigurationMap, touchedPipelineSet, visitedAndProcessedPipelineSet,
Expand Down Expand Up @@ -170,4 +175,38 @@ private static void validateForOrphans(
}
}

}
/**
* Validates that the routes set in the sink pipeline plugin are present in the pipeline's overall conditional routes.
* Each pipeline configuration provides its valid routes via getRoutes() which returns a set of ConditionalRoute.
* Sink pipeline plugins have their desired routes configured in their settings.
* This method ensures that every route used in a sink exists in the parent's configuration.
*
* @param pipelineConfigurationMap A map of pipeline names to PipelineConfiguration objects.
*/
private static void validateSinkRoutes(final Map<String, PipelineConfiguration> pipelineConfigurationMap) {
for (Map.Entry<String, PipelineConfiguration> entry : pipelineConfigurationMap.entrySet()) {
final String pipelineName = entry.getKey();
final PipelineConfiguration pipelineConfiguration = entry.getValue();
final Set<String> validRoutes = pipelineConfiguration.getRoutes().stream()
.map(ConditionalRoute::getName)
.collect(Collectors.toSet());

final List<SinkContextPluginSetting> sinkSettings = pipelineConfiguration.getSinkPluginSettings();
for (SinkContextPluginSetting sinkPlugin : sinkSettings) {
Collection<String> sinkRoutes = sinkPlugin.getSinkContext().getRoutes();
Comment thread
saketh-pallempati marked this conversation as resolved.
if (sinkRoutes == null) {
sinkRoutes = Collections.emptyList();
}
List<String> invalidRoutes = sinkRoutes.stream()
.filter(route -> !validRoutes.contains(route))
.collect(Collectors.toList());

if (!invalidRoutes.isEmpty()) {
throw new InvalidPipelineConfigurationException(String.format(
"The following routes do not exist in pipeline \"%s\": %s. Configured routes include %s",
pipelineName, invalidRoutes, validRoutes));
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.pipeline.parser;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.pipeline.parser.model.PipelineConfiguration;

import java.io.ByteArrayInputStream;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
public class PipelineConfigurationValidatorIT {
private static final String TEST_YAML_CONFIG =
Comment thread
saketh-pallempati marked this conversation as resolved.
"entry-pipeline:\n" +
" source:\n" +
" random:\n" +
" routes:\n" +
" - route_one: \"/my_route == 10\"\n" +
" - route_two: \"/my_route == 11\"\n" +
" sink:\n" +
" - pipeline:\n" +
" name: \"sub-pipeline-1\"\n" +
" routes:\n" +
" - FIRST_ROUTE\n" +
" - pipeline:\n" +
" name: \"sub-pipeline-2\"\n" +
" routes:\n" +
" - SECOND_ROUTE\n";

private static final String SIMPLE_PIPELINE_NO_ROUTES_CONFIG =
"entry-pipeline:\n" +
" source:\n" +
" random:\n" +
" sink:\n" +
" - stdout:\n";

private static final String ROUTES_DEFINED_NO_SINK_ROUTES_CONFIG =
"entry-pipeline:\n" +
" source:\n" +
" random:\n" +
" routes:\n" +
" - route_one: \"/my_route == 10\"\n" +
" - route_two: \"/my_route == 11\"\n" +
" sink:\n" +
" - stdout:\n";

@Test
public void test_with_invalid_sink_routes_in_yaml_should_throw() {
Map<String, PipelineConfiguration> pipelineConfigMap = getPipelineConfigMap(TEST_YAML_CONFIG);

InvalidPipelineConfigurationException exception = assertThrows(InvalidPipelineConfigurationException.class, () ->
PipelineConfigurationValidator.validateAndGetPipelineNames(pipelineConfigMap));

String expectedErrorMessage = "The following routes do not exist in pipeline \"entry-pipeline\": " +
"[FIRST_ROUTE]. Configured routes include [route_one, route_two]";

assertThat(exception.getMessage(), equalTo(expectedErrorMessage));
}

@Test
public void test_simple_pipeline_with_no_routes_succeeds() {
Map<String, PipelineConfiguration> pipelineConfigMap = getPipelineConfigMap(SIMPLE_PIPELINE_NO_ROUTES_CONFIG);

List<String> pipelineNames = assertDoesNotThrow(() ->
PipelineConfigurationValidator.validateAndGetPipelineNames(pipelineConfigMap));

assertThat(pipelineNames, hasSize(1));
assertThat(pipelineNames.get(0), equalTo("entry-pipeline"));
}

@Test
public void test_routes_defined_but_no_routes_on_sinks_succeeds() {
Map<String, PipelineConfiguration> pipelineConfigMap = getPipelineConfigMap(ROUTES_DEFINED_NO_SINK_ROUTES_CONFIG);

List<String> pipelineNames = assertDoesNotThrow(() ->
PipelineConfigurationValidator.validateAndGetPipelineNames(pipelineConfigMap));

assertThat(pipelineNames, hasSize(1));
assertThat(pipelineNames.get(0), equalTo("entry-pipeline"));
}

/**
* Helper method to create a pipeline configuration map from a YAML string
*/
private Map<String, PipelineConfiguration> getPipelineConfigMap(final String yamlConfig) {
PipelineConfigurationReader configReader = mock(PipelineConfigurationReader.class);
when(configReader.getPipelineConfigurationInputStreams())
.thenReturn(List.of(new ByteArrayInputStream(yamlConfig.getBytes())));

PipelinesDataflowModelParser modelParser = new PipelinesDataflowModelParser(configReader);
PipelinesDataFlowModel dataFlowModel = modelParser.parseConfiguration();

return dataFlowModel.getPipelines().entrySet()
.stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> new PipelineConfiguration(entry.getValue())
));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,23 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.configuration.ConditionalRoute;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.pipeline.parser.model.PipelineConfiguration;
import org.opensearch.dataprepper.pipeline.parser.model.SinkContextPluginSetting;


import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class PipelineConfigurationValidatorTest {
Expand All @@ -37,4 +47,50 @@ void test_with_valid_pipeline_names_should_not_throw() {

PipelineConfigurationValidator.validateAndGetPipelineNames(objectObjectHashMap);
}

@Test
void test_with_invalid_sink_routes_should_throw() {
PipelineConfiguration pipelineConfig = mock(PipelineConfiguration.class);
SinkContextPluginSetting sinkSetting = mock(SinkContextPluginSetting.class);
SinkContext sinkContext = mock(SinkContext.class);

when(sinkSetting.getSinkContext()).thenReturn(sinkContext);
when(sinkContext.getRoutes()).thenReturn(List.of("INVALID_ROUTE"));
when(pipelineConfig.getSinkPluginSettings()).thenReturn(List.of(sinkSetting));
when(pipelineConfig.getRoutes()).thenReturn(Set.of(new ConditionalRoute("VALID_ROUTE", "condition")));

Map<String, PipelineConfiguration> pipelineConfigMap = Map.of("test-pipeline", pipelineConfig);

RuntimeException exception = assertThrows(RuntimeException.class,
() -> PipelineConfigurationValidator.validateAndGetPipelineNames(pipelineConfigMap));

assertEquals("The following routes do not exist in pipeline \"test-pipeline\": [INVALID_ROUTE]. " +
"Configured routes include [VALID_ROUTE]", exception.getMessage());
}

@Test
void test_with_valid_sink_routes_should_not_throw() {
PipelineConfiguration pipelineConfig = mock(PipelineConfiguration.class);
SinkContextPluginSetting sinkSetting = mock(SinkContextPluginSetting.class);
SinkContext sinkContext = mock(SinkContext.class);
PipelineConfiguration targetPipelineConfig = mock(PipelineConfiguration.class);
PluginSetting targetSourceSetting = mock(PluginSetting.class);

when(sinkSetting.getName()).thenReturn("pipeline");
when(sinkSetting.getAttributeFromSettings("name")).thenReturn("target-pipeline");
when(sinkContext.getRoutes()).thenReturn(List.of("VALID_ROUTE"));
when(sinkSetting.getSinkContext()).thenReturn(sinkContext);
when(pipelineConfig.getSinkPluginSettings()).thenReturn(List.of(sinkSetting));
when(pipelineConfig.getRoutes()).thenReturn(Set.of(new ConditionalRoute("VALID_ROUTE", "condition")));

when(targetSourceSetting.getName()).thenReturn("pipeline");
when(targetSourceSetting.getAttributeFromSettings("name")).thenReturn("test-pipeline");
when(targetPipelineConfig.getSourcePluginSetting()).thenReturn(targetSourceSetting);

Map<String, PipelineConfiguration> pipelineConfigMap = new HashMap<>();
pipelineConfigMap.put("test-pipeline", pipelineConfig);
pipelineConfigMap.put("target-pipeline", targetPipelineConfig);

assertDoesNotThrow(() -> PipelineConfigurationValidator.validateAndGetPipelineNames(pipelineConfigMap));
}
}