Skip to content

Commit e35b4ea

Browse files
authored
Address route and subpipeline for pipeline tranformation (#4528)
Address route and subpipeline for pipeline tranformation Signed-off-by: srigovs <srigovs@amazon.com>
1 parent 2653acd commit e35b4ea

18 files changed

Lines changed: 882 additions & 12 deletions

File tree

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelineModel.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ public class PipelineModel {
3737
@JsonInclude(JsonInclude.Include.NON_NULL)
3838
private final PluginModel buffer;
3939

40-
@JsonProperty("route")
41-
@JsonAlias("routes")
40+
@JsonProperty("routes")
41+
@JsonAlias("route")
4242
@JsonInclude(JsonInclude.Include.NON_EMPTY)
4343
private final List<ConditionalRoute> routes;
4444

@@ -68,7 +68,7 @@ public PipelineModel(
6868
@JsonProperty("source") final PluginModel source,
6969
@JsonProperty("buffer") final PluginModel buffer,
7070
@JsonProperty("processor") final List<PluginModel> processors,
71-
@JsonProperty("route")@JsonAlias("routes") final List<ConditionalRoute> routes,
71+
@JsonProperty("routes")@JsonAlias("route") final List<ConditionalRoute> routes,
7272
@JsonProperty("sink") final List<SinkModel> sinks,
7373
@JsonProperty("workers") final Integer workers,
7474
@JsonProperty("delay") final Integer delay) {

data-prepper-api/src/test/resources/pipelines_data_flow_route.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ test-pipeline:
33
testSource: null
44
processor:
55
- testPrepper: null
6-
route:
6+
routes:
77
- my-route: "/a==b"
88
sink:
99
- testSink:

data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public RuleEvaluator(TransformersFactory transformersFactory) {
4141
}
4242

4343
public RuleEvaluatorResult isTransformationNeeded(PipelinesDataFlowModel pipelineModel) {
44+
//TODO - Dynamically scan the rules folder and get the corresponding template.
4445
return isDocDBSource(pipelineModel);
4546
}
4647

data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.fasterxml.jackson.databind.ObjectMapper;
1010
import com.fasterxml.jackson.databind.node.ArrayNode;
1111
import com.fasterxml.jackson.databind.node.ObjectNode;
12+
import com.fasterxml.jackson.databind.node.TextNode;
1213
import com.jayway.jsonpath.Configuration;
1314
import com.jayway.jsonpath.JsonPath;
1415
import com.jayway.jsonpath.Option;
@@ -18,6 +19,7 @@
1819
import static java.lang.String.format;
1920
import org.opensearch.dataprepper.model.configuration.PipelineModel;
2021
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
22+
import org.opensearch.dataprepper.model.configuration.SinkModel;
2123
import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluator;
2224
import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluatorResult;
2325
import org.slf4j.Logger;
@@ -72,6 +74,8 @@ public class DynamicConfigTransformer implements PipelineConfigurationTransforme
7274
private static final String JSON_PATH_IDENTIFIER = "$.";
7375
private static final String ARRAY_NODE_PATTERN = "([^\\[]+)\\[(\\d+)\\]$";
7476
private static final String SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE = "SOURCE_COORDINATION_PIPELINE_IDENTIFIER";
77+
private static final String SINK_SUBPIPELINE_PLUGIN_NAME = "pipeline";
78+
private static final String SUBPIPELINE_PATH = "$.source.pipeline";
7579

7680

7781
Configuration parseConfigWithJsonNode = Configuration.builder()
@@ -116,6 +120,8 @@ public PipelinesDataFlowModel transformConfiguration(PipelinesDataFlowModel preT
116120
try {
117121

118122
Map<String, PipelineModel> pipelines = preTransformedPipelinesDataFlowModel.getPipelines();
123+
List<String> subPipelineNames = new ArrayList<>();
124+
checkForSubPipelines(preTransformedPipelinesDataFlowModel, pipelineNameThatNeedsTransformation, subPipelineNames);
119125
Map<String, PipelineModel> pipelineMap = new HashMap<>();
120126
pipelineMap.put(pipelineNameThatNeedsTransformation,
121127
pipelines.get(pipelineNameThatNeedsTransformation));
@@ -156,7 +162,10 @@ public PipelinesDataFlowModel transformConfiguration(PipelinesDataFlowModel preT
156162
}
157163
});
158164

159-
PipelinesDataFlowModel transformedPipelinesDataFlowModel = getTransformedPipelinesDataFlowModel(pipelineNameThatNeedsTransformation, preTransformedPipelinesDataFlowModel, templateRootNode);
165+
PipelinesDataFlowModel transformedPipelinesDataFlowModel = getTransformedPipelinesDataFlowModel(pipelineNameThatNeedsTransformation,
166+
preTransformedPipelinesDataFlowModel,
167+
templateRootNode,
168+
subPipelineNames);
160169
return transformedPipelinesDataFlowModel;
161170
} catch (JsonProcessingException | TransformerException e) {
162171
throw new RuntimeException(e);
@@ -165,16 +174,36 @@ public PipelinesDataFlowModel transformConfiguration(PipelinesDataFlowModel preT
165174
}
166175
}
167176

177+
private void checkForSubPipelines(PipelinesDataFlowModel preTransformedPipelinesDataFlowModel,
178+
String pipelineNameThatNeedsTransformation,
179+
List<String> subPipelineNames) {
180+
Map<String, PipelineModel> pipelines = preTransformedPipelinesDataFlowModel.getPipelines();
181+
PipelineModel transformationPipeline = pipelines.get(pipelineNameThatNeedsTransformation);
182+
183+
List<SinkModel> sinks = transformationPipeline.getSinks();
184+
for(SinkModel sink : sinks){
185+
String pluginName = sink.getPluginName();
186+
if (pluginName.equals(SINK_SUBPIPELINE_PLUGIN_NAME)) {
187+
String subPipelineName = sink.getPluginSettings().get("name").toString();
188+
subPipelineNames.add(subPipelineName);
189+
}
190+
}
191+
}
192+
168193
/**
169194
* Convert templateRootNode which contains the transformedJson to PipelinesDataFlowModel
170195
*
171196
* @param pipelineNameThatNeedsTransformation
172197
* @param preTransformedPipelinesDataFlowModel
173198
* @param templateRootNode - transformedJson Node.
199+
* @param subPipelineNames
174200
* @return PipelinesDataFlowModel - transformed model.
175201
* @throws JsonProcessingException
176202
*/
177-
private PipelinesDataFlowModel getTransformedPipelinesDataFlowModel(String pipelineNameThatNeedsTransformation, PipelinesDataFlowModel preTransformedPipelinesDataFlowModel, JsonNode templateRootNode) throws JsonProcessingException {
203+
private PipelinesDataFlowModel getTransformedPipelinesDataFlowModel(String pipelineNameThatNeedsTransformation,
204+
PipelinesDataFlowModel preTransformedPipelinesDataFlowModel,
205+
JsonNode templateRootNode,
206+
List<String> subPipelineNames) throws JsonProcessingException {
178207

179208
//update template json
180209
JsonNode transformedJsonNode = templateRootNode.get(TEMPLATE_PIPELINE_ROOT_STRING);
@@ -189,7 +218,16 @@ private PipelinesDataFlowModel getTransformedPipelinesDataFlowModel(String pipel
189218
Map<String, PipelineModel> pipelines = preTransformedPipelinesDataFlowModel.getPipelines();
190219
pipelines.forEach((pipelineName, pipeline) -> {
191220
if (!pipelineName.equals(pipelineNameThatNeedsTransformation)) {
192-
transformedPipelines.put(pipelineName, pipeline);
221+
if(subPipelineNames.size()>0 && subPipelineNames.contains(pipelineName)){ //if there are subpipelines
222+
try {
223+
PipelineModel subPipeline = getSubModifiedPipeline(pipeline, pipelineNameThatNeedsTransformation);
224+
transformedPipelines.put(pipelineName, subPipeline);
225+
} catch (JsonProcessingException e) {
226+
throw new RuntimeException(e);
227+
}
228+
}else {
229+
transformedPipelines.put(pipelineName, pipeline);
230+
}
193231
}
194232
});
195233

@@ -204,6 +242,21 @@ private PipelinesDataFlowModel getTransformedPipelinesDataFlowModel(String pipel
204242
return transformedPipelinesDataFlowModel;
205243
}
206244

245+
private PipelineModel getSubModifiedPipeline(PipelineModel pipeline,
246+
String pipelineNameThatNeedsTransformation) throws JsonProcessingException {
247+
String pipelineJson = objectMapper.writeValueAsString(pipeline);
248+
JsonNode pipelineNode = objectMapper.readTree(pipelineJson);
249+
JsonNode parentNode = JsonPath.using(parseConfigWithJsonNode).parse(pipelineNode).read(SUBPIPELINE_PATH);
250+
251+
//TODO - Dynamically detect the 2nd pipeline in the template of the transformed pipeline
252+
JsonNode newNode = new TextNode(pipelineNameThatNeedsTransformation +"-s3");
253+
((ObjectNode) parentNode).replace("name", newNode);
254+
String subPipelineJson = objectMapper.writeValueAsString(pipelineNode);
255+
PipelineModel subPipeline = objectMapper.readValue(subPipelineJson, PipelineModel.class);
256+
257+
return subPipeline;
258+
}
259+
207260
private String replaceTemplatePipelineName(String templateJsonStringWithPipelinePlaceholder, String pipelineName) {
208261
return templateJsonStringWithPipelinePlaceholder.replaceAll(PIPELINE_NAME_PLACEHOLDER_REGEX, pipelineName);
209262
}

data-prepper-pipeline-parser/src/main/resources/templates/documentdb-template.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,4 @@
7878
interval: "60s"
7979
processor: "<<$.<<pipeline-name>>.processor>>"
8080
sink: "<<$.<<pipeline-name>>.sink>>"
81-
routes: "<<$.<<pipeline-name>>.routes>>"
81+
routes: "<<$.<<pipeline-name>>.routes>>" # In placeholder, routes or route (defined as alias) will be transformed to route in json as route will be primarily picked in pipelineModel.

data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/TestConfigurationProvider.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ public class TestConfigurationProvider {
4444
public static final String USER_CONFIG_TRANSFORMATION_DOCDB2_CONFIG_FILE = "src/test/resources/transformation/userConfig/documentdb2-userconfig.yaml";
4545
public static final String USER_CONFIG_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE = "src/test/resources/transformation/userConfig/documentdb-userconfig.yaml";
4646
public static final String USER_CONFIG_TRANSFORMATION_DOCUMENTDB_SUBPIPELINES_CONFIG_FILE = "src/test/resources/transformation/userConfig/documentdb-subpipelines-userconfig.yaml";
47+
public static final String USER_CONFIG_TRANSFORMATION_DOCUMENTDB_ROUTES_CONFIG_FILE = "src/test/resources/transformation/userConfig/documentdb-routes-userconfig.yaml";
48+
public static final String USER_CONFIG_TRANSFORMATION_DOCUMENTDB_ROUTE_CONFIG_FILE = "src/test/resources/transformation/userConfig/documentdb-route-userconfig.yaml";
49+
public static final String USER_CONFIG_TRANSFORMATION_DOCUMENTDB_SUBPIPELINES_ROUTES_CONFIG_FILE = "src/test/resources/transformation/userConfig/documentdb-subpipelines-routes-userconfig.yaml";
4750
public static final String USER_CONFIG_TRANSFORMATION_DOCUMENTDB_FUNCTION_CONFIG_FILE = "src/test/resources/transformation/userConfig/documentdb-function-userconfig.yaml";
4851

4952
public static final String RULES_TRANSFORMATION_DOCDB1_CONFIG_FILE = "src/test/resources/transformation/rules/documentdb1-rule.yaml";
@@ -55,12 +58,16 @@ public class TestConfigurationProvider {
5558
public static final String TEMPLATE_TRANSFORMATION_DOCDB2_CONFIG_FILE = "src/test/resources/transformation/templates/testSource/documentdb2-template.yaml";
5659
public static final String TEMPLATE_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE = "src/test/resources/transformation/templates/testSource/documentdb-template.yaml";
5760
public static final String TEMPLATE_TRANSFORMATION_DOCUMENTDB_SUBPIPELINES_CONFIG_FILE = "src/test/resources/transformation/templates/testSource/documentdb-subpipelines-template.yaml";
61+
public static final String TEMPLATE_TRANSFORMATION_DOCUMENTDB_FINAL_CONFIG_FILE = "src/test/resources/transformation/templates/testSource/documentdb-template-final.yaml";
5862
public static final String TEMPLATE_TRANSFORMATION_DOCUMENTDB_FUNCTION_CONFIG_FILE = "src/test/resources/transformation/templates/testSource/documentdb-function-template.yaml";
5963

6064
public static final String EXPECTED_TRANSFORMATION_DOCDB1_CONFIG_FILE = "src/test/resources/transformation/expected/documentdb1-expected.yaml";
6165
public static final String EXPECTED_TRANSFORMATION_DOCDB2_CONFIG_FILE = "src/test/resources/transformation/expected/documentdb2-expected.yaml";
6266
public static final String EXPECTED_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE = "src/test/resources/transformation/expected/documentdb-expected.yaml";
6367
public static final String EXPECTED_TRANSFORMATION_DOCUMENTDB_SUBPIPLINES_CONFIG_FILE = "src/test/resources/transformation/expected/documentdb-subpipelines-expected.yaml";
68+
public static final String EXPECTED_TRANSFORMATION_DOCUMENTDB_ROUTES_CONFIG_FILE = "src/test/resources/transformation/expected/documentdb-routes-expected.yaml";
69+
public static final String EXPECTED_TRANSFORMATION_DOCUMENTDB_ROUTE_CONFIG_FILE = "src/test/resources/transformation/expected/documentdb-route-expected.yaml";
70+
public static final String EXPECTED_TRANSFORMATION_DOCUMENTDB_SUBPIPELINES_ROUTES_CONFIG_FILE = "src/test/resources/transformation/expected/documentdb-subpipelines-routes-expected.yaml";
6471
public static final String EXPECTED_TRANSFORMATION_DOCUMENTDB_FUNCTION_CONFIG_FILE = "src/test/resources/transformation/expected/documentdb-function-expected.yaml";
6572

6673

0 commit comments

Comments
 (0)