From 2947aeea3dcb57161b72d0a3017aa8587e2d012c Mon Sep 17 00:00:00 2001 From: Kondaka Date: Thu, 21 May 2026 22:51:20 -0700 Subject: [PATCH 1/5] Add TransformFunctionProvider to DynamicConfigTransformer Signed-off-by: Kondaka --- .../annotations/TransformationFunction.java | 34 ++++ .../PipelineTransformFunctionProvider.java | 24 +++ .../TransformationFunctionTest.java | 70 ++++++++ ...PipelineTransformFunctionProviderTest.java | 66 ++++++++ .../pipeline/parser/rule/RuleEvaluator.java | 5 +- .../parser/rule/RuleEvaluatorResult.java | 4 + .../parser/rule/RuleFileEvaluation.java | 3 + .../parser/rule/RuleTransformerModel.java | 8 +- .../transformer/DynamicConfigTransformer.java | 156 +++++++----------- .../parser/rule/RuleEvaluatorResultTest.java | 36 +++- .../parser/rule/RuleFileEvaluationTest.java | 80 +++++++++ .../parser/rule/RuleTransformerModelTest.java | 45 ++++- .../DynamicConfigTransformerTest.java | 89 +++++----- .../TestTransformFunctionProvider.java | 70 ++++++++ .../transformation/rules/documentdb-rule.yaml | 2 + .../rules/documentdb1-rule.yaml | 4 +- .../transformation/rules/rds-joins-rule.yaml | 2 + .../transformation/rules/rds-rule.yaml | 2 + .../aws/PipelineTransformFunctions.java | 138 ++++++++++++++++ .../aws/PipelineTransformFunctionsTest.java | 149 +++++++++++++++++ .../transforms/rules/documentdb-rule.yaml | 4 +- .../transforms/rules/mongodb-rule.yaml | 4 +- .../transforms/rules/rds-joins-rule.yaml | 3 +- .../transforms/rules/rds-rule.yaml | 4 +- 24 files changed, 851 insertions(+), 151 deletions(-) create mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/annotations/TransformationFunction.java create mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PipelineTransformFunctionProvider.java create mode 100644 data-prepper-api/src/test/java/org/opensearch/dataprepper/model/annotations/TransformationFunctionTest.java create mode 100644 data-prepper-api/src/test/java/org/opensearch/dataprepper/model/plugin/PipelineTransformFunctionProviderTest.java create mode 100644 data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluationTest.java create mode 100644 data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/TestTransformFunctionProvider.java create mode 100644 data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctions.java create mode 100644 data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctionsTest.java diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/annotations/TransformationFunction.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/annotations/TransformationFunction.java new file mode 100644 index 0000000000..e8b4617bf1 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/annotations/TransformationFunction.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.model.annotations; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marks a method as a pipeline transformation function that can be invoked + * dynamically from template YAML files via the {@code FUNCTION_NAME} placeholder. + *

+ * Annotated methods must be {@code public static} and accept a single {@code String} + * parameter, returning a {@code String} result. + *

+ * The enclosing class must implement + * {@link org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider}. + * + * @since 2.12 + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.METHOD}) +public @interface TransformationFunction { +} diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PipelineTransformFunctionProvider.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PipelineTransformFunctionProvider.java new file mode 100644 index 0000000000..a003f0ca28 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PipelineTransformFunctionProvider.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.model.plugin; + +/** + * Marker interface for classes that provide pipeline transformation functions. + * Classes implementing this interface can be referenced in rule YAML files via + * the {@code function_providers} field and have their methods invoked dynamically + * during pipeline template transformation. + *

+ * Methods intended to be callable from templates must also be annotated with + * {@link org.opensearch.dataprepper.model.annotations.TransformationFunction}. + * + * @since 2.12 + */ +public interface PipelineTransformFunctionProvider { +} diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/annotations/TransformationFunctionTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/annotations/TransformationFunctionTest.java new file mode 100644 index 0000000000..1c1f82d9ab --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/annotations/TransformationFunctionTest.java @@ -0,0 +1,70 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.model.annotations; + +import org.junit.jupiter.api.Test; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TransformationFunctionTest { + + @Test + void annotation_is_retained_at_runtime() { + Retention retention = TransformationFunction.class.getAnnotation(Retention.class); + assertNotNull(retention); + assertThat(retention.value(), equalTo(RetentionPolicy.RUNTIME)); + } + + @Test + void annotation_targets_methods() { + Target target = TransformationFunction.class.getAnnotation(Target.class); + assertNotNull(target); + assertThat(target.value().length, equalTo(1)); + assertThat(target.value()[0], equalTo(ElementType.METHOD)); + } + + @Test + void annotation_is_documented() { + Documented documented = TransformationFunction.class.getAnnotation(Documented.class); + assertNotNull(documented); + } + + @Test + void annotation_is_present_on_annotated_method() throws NoSuchMethodException { + assertTrue(AnnotatedClass.class.getMethod("annotatedMethod", String.class) + .isAnnotationPresent(TransformationFunction.class)); + } + + @Test + void annotation_is_not_present_on_unannotated_method() throws NoSuchMethodException { + assertTrue(!AnnotatedClass.class.getMethod("unannotatedMethod", String.class) + .isAnnotationPresent(TransformationFunction.class)); + } + + static class AnnotatedClass { + @TransformationFunction + public static String annotatedMethod(String input) { + return input; + } + + public static String unannotatedMethod(String input) { + return input; + } + } +} diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/plugin/PipelineTransformFunctionProviderTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/plugin/PipelineTransformFunctionProviderTest.java new file mode 100644 index 0000000000..c081cd61c3 --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/plugin/PipelineTransformFunctionProviderTest.java @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.model.plugin; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class PipelineTransformFunctionProviderTest { + + @Test + void implementing_class_is_assignable_from_interface() { + PipelineTransformFunctionProvider provider = new TestFunctionProvider(); + assertNotNull(provider); + assertTrue(provider instanceof PipelineTransformFunctionProvider); + } + + @Test + void interface_is_assignable_from_implementing_class() { + assertThat(PipelineTransformFunctionProvider.class.isAssignableFrom(TestFunctionProvider.class), + equalTo(true)); + } + + @Test + void interface_is_not_assignable_from_non_implementing_class() { + assertThat(PipelineTransformFunctionProvider.class.isAssignableFrom(NonProvider.class), + equalTo(false)); + } + + @Test + void interface_has_no_declared_methods() { + assertThat(PipelineTransformFunctionProvider.class.getDeclaredMethods().length, equalTo(0)); + } + + @Test + void interface_is_public() { + assertTrue(java.lang.reflect.Modifier.isPublic(PipelineTransformFunctionProvider.class.getModifiers())); + } + + @Test + void interface_is_an_interface() { + assertTrue(PipelineTransformFunctionProvider.class.isInterface()); + } + + static class TestFunctionProvider implements PipelineTransformFunctionProvider { + public static String sampleFunction(String input) { + return input; + } + } + + static class NonProvider { + public static String someMethod(String input) { + return input; + } + } +} diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java index 1b05ea5b79..787c84ccc3 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java @@ -62,6 +62,7 @@ public RuleEvaluatorResult isTransformationNeeded(PipelinesDataFlowModel pipelin return RuleEvaluatorResult.builder() .withEvaluatedResult(true) .withPipelineTemplateModel(templateModel) + .withFunctionProviders(ruleFileEvaluation.getFunctionProviders()) .withPipelineName(entry.getKey()) .build(); } @@ -76,6 +77,7 @@ public RuleEvaluatorResult isTransformationNeeded(PipelinesDataFlowModel pipelin return RuleEvaluatorResult.builder() .withEvaluatedResult(false) .withPipelineName(null) + .withFunctionProviders(null) .withPipelineTemplateModel(null) .build(); } @@ -131,6 +133,7 @@ private RuleFileEvaluation evaluate(String pipelinesJson) { return RuleFileEvaluation.builder() .withPluginName(pluginName) .withRuleFileName(parsedRule.fileName) + .withFunctionProviders(rulesModel.getFunctionProviders()) .withResult(true) .build(); } @@ -163,4 +166,4 @@ private static class ParsedRule { this.fileName = fileName; } } -} \ No newline at end of file +} diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResult.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResult.java index e89768fee6..185daa1aa2 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResult.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResult.java @@ -9,6 +9,8 @@ import lombok.Getter; import org.opensearch.dataprepper.pipeline.parser.transformer.PipelineTemplateModel; +import java.util.List; + @Builder(setterPrefix = "with") @Getter @AllArgsConstructor @@ -20,6 +22,8 @@ public class RuleEvaluatorResult { private PipelineTemplateModel pipelineTemplateModel; + private List functionProviders; + public RuleEvaluatorResult() { } diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluation.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluation.java index 88ee0f8d6c..d89c871cc4 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluation.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluation.java @@ -4,6 +4,8 @@ import lombok.Builder; import lombok.Data; +import java.util.List; + @Builder(setterPrefix = "with") @AllArgsConstructor @Data @@ -11,6 +13,7 @@ public class RuleFileEvaluation { private Boolean result; private String ruleFileName; private String pluginName; + private List functionProviders; public RuleFileEvaluation() { diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModel.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModel.java index aaf757cedb..595d372e65 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModel.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModel.java @@ -22,6 +22,9 @@ public class RuleTransformerModel { @JsonProperty("plugin_name") private String pluginName; + @JsonProperty("function_providers") + private List functionProviders; + public RuleTransformerModel() { } @@ -29,6 +32,7 @@ public RuleTransformerModel() { public String toString() { return "RuleConfiguration{" + "applyWhen=" + applyWhen + - "\npluginName="+ pluginName +'}'; + "\nfunctionProviders=" + functionProviders + + "\npluginName=" + pluginName + '}'; } -} \ No newline at end of file +} diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java index 23c89a7359..6e60831ffd 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java @@ -17,7 +17,6 @@ import com.jayway.jsonpath.spi.json.JacksonJsonNodeJsonProvider; import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider; import static java.lang.String.format; -import static org.opensearch.dataprepper.plugins.source.rds.RdsService.MAX_SOURCE_IDENTIFIER_LENGTH; import org.opensearch.dataprepper.model.configuration.PipelineModel; import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; @@ -26,9 +25,9 @@ import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluatorResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.opensearch.dataprepper.plugins.source.rds.utils.IdentifierShortener; -import software.amazon.awssdk.arns.Arn; +import org.opensearch.dataprepper.model.annotations.TransformationFunction; +import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider; import javax.xml.transform.TransformerException; import java.io.IOException; import java.lang.reflect.Method; @@ -43,6 +42,10 @@ public class DynamicConfigTransformer implements PipelineConfigurationTransformer { private static final Logger LOG = LoggerFactory.getLogger(DynamicConfigTransformer.class); + private static final Pattern FULLY_QUALIFIED_METHOD_PATTERN = Pattern.compile( + "^[a-zA-Z_][a-zA-Z0-9_]*(\\.[a-zA-Z_][a-zA-Z0-9_]*)*(\\.|::)[a-zA-Z_][a-zA-Z0-9_]*$" + ); + private final ObjectMapper objectMapper = new ObjectMapper(); private final RuleEvaluator ruleEvaluator; @@ -76,12 +79,9 @@ public class DynamicConfigTransformer implements PipelineConfigurationTransforme private static final String RECURSIVE_JSON_PATH_PATH = "$.."; private static final String JSON_PATH_IDENTIFIER = "$."; private static final String ARRAY_NODE_PATTERN = "([^\\[]+)\\[(\\d+)\\]$"; - private static final String SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE = "SOURCE_COORDINATION_PIPELINE_IDENTIFIER"; private static final String SINK_SUBPIPELINE_PLUGIN_NAME = "pipeline"; private static final String SUBPIPELINE_PATH = "$.source.pipeline"; - private static final String S3_BUFFER_PREFIX = "/buffer"; - /** * Pattern to match overlay directives like "<>" * The captured group is the target path (e.g., "sink[*].opensearch") @@ -126,6 +126,7 @@ public PipelinesDataFlowModel transformConfiguration(PipelinesDataFlowModel preT //To differentiate between sub-pipelines that dont need transformation. String pipelineNameThatNeedsTransformation = ruleEvaluatorResult.getPipelineName(); PipelineTemplateModel templateModel = ruleEvaluatorResult.getPipelineTemplateModel(); + List functionProviders = ruleEvaluatorResult.getFunctionProviders(); LOG.info("Transforming pipeline config for pipeline {}",pipelineNameThatNeedsTransformation); try { @@ -149,7 +150,7 @@ public PipelinesDataFlowModel transformConfiguration(PipelinesDataFlowModel preT JsonNode templateRootNode = objectMapper.readTree(templateJsonString); // get exact path in pipelineJson - Map pipelineExactPathMap = findExactPath(placeholdersMap, pipelineJson); + Map pipelineExactPathMap = findExactPath(placeholdersMap, pipelineJson, functionProviders); //replace placeholder with actual value in the template context placeholdersMap.forEach((placeholder, templateJsonPathList) -> { @@ -173,7 +174,7 @@ public PipelinesDataFlowModel transformConfiguration(PipelinesDataFlowModel preT }); // Process <> directives — merge overlay fields into resolved config - processOverlayDirectives(templateRootNode, pipelineJson); + processOverlayDirectives(templateRootNode, pipelineJson, functionProviders); PipelinesDataFlowModel transformedPipelinesDataFlowModel = getTransformedPipelinesDataFlowModel(pipelineNameThatNeedsTransformation, preTransformedPipelinesDataFlowModel, @@ -333,12 +334,12 @@ private void populateMapWithPlaceholderPaths(JsonNode currentNode, String curren * @return Map K:jsonPath, V:exactPath * @throws IOException */ - private Map findExactPath(Map> placeholdersMap, String pipelineJson) throws IOException, TransformerException { + private Map findExactPath(Map> placeholdersMap, String pipelineJson, final List functionProviders) throws IOException, TransformerException { Map mapWithPaths = new HashMap<>(); for (String genericPathPlaceholder : placeholdersMap.keySet()) { String placeHolderValue = getValueFromPlaceHolder(genericPathPlaceholder); - String value = executeFunctionPlaceholder(placeHolderValue, pipelineJson); + String value = executeFunctionPlaceholder(placeHolderValue, pipelineJson, functionProviders); // Recursive pattern in json path is NOT allowed if (value!=null && value.contains(RECURSIVE_JSON_PATH_PATH)) { @@ -366,14 +367,14 @@ private String getValueFromPlaceHolder(String placeholder) { * @param functionPlaceholderValue * @return String - value of the function executed */ - private String executeFunctionPlaceholder(String functionPlaceholderValue, String pipelineJson){ + private String executeFunctionPlaceholder(String functionPlaceholderValue, String pipelineJson, final List functionProviders){ Matcher functionMatcher = FUNCTION_CALL_PLACEHOLDER_PATTERN.matcher(functionPlaceholderValue); if (functionMatcher.find()) { String functionName = functionMatcher.group(1); String parameter = functionMatcher.group(2); try { String parameterValue = (String)parseParameter(parameter, pipelineJson); - String value = (String) invokeMethod(functionName, String.class, parameterValue); + String value = (String) invokeMethod(functionProviders, functionName, String.class, parameterValue); return value; } catch (ReflectiveOperationException e) { throw new RuntimeException(e); @@ -418,73 +419,11 @@ private boolean isJsonPath(String parameter) { } } - /** - * Calculate s3 folder scan depth for DocDB source pipeline - * @param s3Prefix: s3 prefix defined in the source configuration - * @return s3 folder scan depth - */ - public String calculateDepth(String s3Prefix) { - return Integer.toString(getDepth(s3Prefix, 4)); - } - - protected String getSourceCoordinationIdentifier() { - return System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE); - } - - /** - * Calculate s3 folder scan depth for RDS source pipeline - * @param s3Prefix: s3 prefix defined in the source configuration - * @return s3 folder scan depth - */ - public String calculateDepthForRdsSource(String s3Prefix) { - String envSourceCoordinationIdentifier = getSourceCoordinationIdentifier(); - int baseDepth = envSourceCoordinationIdentifier != null ? 3 : 2; - return Integer.toString(getDepth(s3Prefix, baseDepth)); - } - - private int getDepth(String s3Prefix, int baseDepth) { - if(s3Prefix == null){ - return baseDepth; - } - return s3Prefix.split("/").length + baseDepth; - } - - public String getSourceCoordinationIdentifierEnvVariable(String s3Prefix){ - String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE); - if(s3Prefix == null){ - return envSourceCoordinationIdentifier; - } - return s3Prefix+"/"+envSourceCoordinationIdentifier; - } - - /** - * Get the include_prefix in s3 scan source. This is a function specific to RDS source. - * @param s3Prefix: s3 prefix defined in the source configuration - * @return the actual include_prefix - */ - public String getIncludePrefixForRdsSource(String s3Prefix) { - final String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE); - final String shortenedSourceIdentifier = envSourceCoordinationIdentifier != null ? - IdentifierShortener.shortenIdentifier(envSourceCoordinationIdentifier, MAX_SOURCE_IDENTIFIER_LENGTH) : null; - if (s3Prefix == null && envSourceCoordinationIdentifier == null) { - return S3_BUFFER_PREFIX; - } else if (s3Prefix == null) { - return shortenedSourceIdentifier + S3_BUFFER_PREFIX; - } else if (envSourceCoordinationIdentifier == null) { - return s3Prefix + S3_BUFFER_PREFIX; - } - return s3Prefix + "/" + shortenedSourceIdentifier + S3_BUFFER_PREFIX; - } - - public String getAccountIdFromRole(final String roleArn) { - if (roleArn == null) - return null; - try { - return Arn.fromString(roleArn).accountId().orElse(null); - } catch (Exception e) { - LOG.warn("Malformatted role ARN for dynamic transformation: {}", roleArn); - return null; + public static boolean isFullyQualifiedMethod(String input) { + if (input == null || input.isEmpty()) { + return false; } + return FULLY_QUALIFIED_METHOD_PATTERN.matcher(input).matches(); } /** @@ -496,15 +435,44 @@ public String getAccountIdFromRole(final String roleArn) { * @return the result of the method invocation * @throws ReflectiveOperationException if the method cannot be invoked */ - public Object invokeMethod(String methodName, Class parameterType, Object arg) throws ReflectiveOperationException { - // Get the Class object - Class clazz = this.getClass(); + public Object invokeMethod(final List functionProviders, String methodName, Class parameterType, Object arg) throws ReflectiveOperationException { + if (functionProviders == null || functionProviders.isEmpty()) { + throw new RuntimeException("function_providers cannot be empty"); + } + + Class clazz = resolveClassForMethod(functionProviders, methodName, parameterType); + + if (!PipelineTransformFunctionProvider.class.isAssignableFrom(clazz)) { + throw new RuntimeException("Class '" + clazz.getName() + + "' does not implement PipelineTransformFunctionProvider"); + } - // Get the Method object for the specified method and parameter type Method method = clazz.getMethod(methodName, parameterType); - // Invoke the method on the object with the given argument - return method.invoke(this, arg); + if (!method.isAnnotationPresent(TransformationFunction.class)) { + throw new RuntimeException("Method '" + methodName + "' in class '" + clazz.getName() + + "' is not annotated with @TransformationFunction"); + } + + return method.invoke(null, arg); + } + + private Class resolveClassForMethod(final List functionProviders, String methodName, Class parameterType) throws ReflectiveOperationException { + if (functionProviders.size() == 1) { + return Class.forName(functionProviders.get(0)); + } + + for (final String functionProvider : functionProviders) { + try { + Class candidate = Class.forName(functionProvider); + candidate.getMethod(methodName, parameterType); + return candidate; + } catch (NoSuchMethodException e) { + continue; + } + } + + throw new RuntimeException("Could not find a class with method '" + methodName + "' in function_providers: " + functionProviders); } @@ -517,11 +485,11 @@ public Object invokeMethod(String methodName, Class parameterType, Object arg * Example: <> with value {action: "upsert", script: {...}} * will merge those fields into every opensearch sink entry. */ - private void processOverlayDirectives(JsonNode rootNode, String pipelineJson) { - processOverlayDirectivesRecursive(rootNode, pipelineJson); + private void processOverlayDirectives(JsonNode rootNode, String pipelineJson, List functionProviders) { + processOverlayDirectivesRecursive(rootNode, pipelineJson, functionProviders); } - private void processOverlayDirectivesRecursive(JsonNode node, String pipelineJson) { + private void processOverlayDirectivesRecursive(JsonNode node, String pipelineJson, List functionProviders) { if (node.isObject()) { ObjectNode objectNode = (ObjectNode) node; List overlayKeys = new ArrayList<>(); @@ -543,7 +511,7 @@ private void processOverlayDirectivesRecursive(JsonNode node, String pipelineJso String targetPath = matcher.group(1); JsonNode overlayValue = objectNode.get(overlayKey); // Resolve any <<...>> placeholders inside overlay values - resolveOverlayPlaceholders(overlayValue, pipelineJson); + resolveOverlayPlaceholders(overlayValue, pipelineJson, functionProviders); applyOverlay(objectNode, targetPath, overlayValue); objectNode.remove(overlayKey); } @@ -552,11 +520,11 @@ private void processOverlayDirectivesRecursive(JsonNode node, String pipelineJso // Recurse into remaining children Iterator> fields = objectNode.fields(); while (fields.hasNext()) { - processOverlayDirectivesRecursive(fields.next().getValue(), pipelineJson); + processOverlayDirectivesRecursive(fields.next().getValue(), pipelineJson, functionProviders); } } else if (node.isArray()) { for (JsonNode element : node) { - processOverlayDirectivesRecursive(element, pipelineJson); + processOverlayDirectivesRecursive(element, pipelineJson, functionProviders); } } } @@ -564,7 +532,7 @@ private void processOverlayDirectivesRecursive(JsonNode node, String pipelineJso /** * Resolves <<...>> placeholders in overlay value nodes using the pipeline config. */ - private void resolveOverlayPlaceholders(JsonNode node, String pipelineJson) { + private void resolveOverlayPlaceholders(JsonNode node, String pipelineJson, List functionProviders) { if (node.isObject()) { ObjectNode objectNode = (ObjectNode) node; List> entries = new ArrayList<>(); @@ -575,7 +543,7 @@ private void resolveOverlayPlaceholders(JsonNode node, String pipelineJson) { Matcher matcher = PLACEHOLDER_PATTERN.matcher(text); if (matcher.find()) { String placeholder = getValueFromPlaceHolder(text); - String resolved = executeFunctionPlaceholder(placeholder, pipelineJson); + String resolved = executeFunctionPlaceholder(placeholder, pipelineJson, functionProviders); if (isJsonPath(resolved)) { JsonNode resolvedNode = JsonPath.using(parseConfigWithJsonNode) .parse(pipelineJson).read(resolved); @@ -589,12 +557,12 @@ private void resolveOverlayPlaceholders(JsonNode node, String pipelineJson) { } } } else { - resolveOverlayPlaceholders(entry.getValue(), pipelineJson); + resolveOverlayPlaceholders(entry.getValue(), pipelineJson, functionProviders); } } } else if (node.isArray()) { for (JsonNode element : node) { - resolveOverlayPlaceholders(element, pipelineJson); + resolveOverlayPlaceholders(element, pipelineJson, functionProviders); } } } diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResultTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResultTest.java index 104beeee6e..1304e6cca2 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResultTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResultTest.java @@ -7,6 +7,9 @@ import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.pipeline.parser.transformer.PipelineTemplateModel; +import java.util.Arrays; +import java.util.List; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; @@ -17,15 +20,34 @@ class RuleEvaluatorResultTest { @Test void builder_createsObjectCorrectly() { PipelineTemplateModel pipelineTemplateModel = new PipelineTemplateModel(); + List functionProviders = Arrays.asList("org.example.Functions"); + RuleEvaluatorResult result = RuleEvaluatorResult.builder() .withEvaluatedResult(true) .withPipelineName("testPipeline") .withPipelineTemplateModel(pipelineTemplateModel) + .withFunctionProviders(functionProviders) .build(); assertTrue(result.isEvaluatedResult()); - assertEquals(result.getPipelineName(), "testPipeline"); - assertEquals(result.getPipelineTemplateModel(), pipelineTemplateModel); + assertEquals("testPipeline", result.getPipelineName()); + assertEquals(pipelineTemplateModel, result.getPipelineTemplateModel()); + assertEquals(functionProviders, result.getFunctionProviders()); + } + + @Test + void builder_withNullFunctionProviders_setsNull() { + RuleEvaluatorResult result = RuleEvaluatorResult.builder() + .withEvaluatedResult(false) + .withPipelineName(null) + .withPipelineTemplateModel(null) + .withFunctionProviders(null) + .build(); + + assertFalse(result.isEvaluatedResult()); + assertNull(result.getPipelineName()); + assertNull(result.getPipelineTemplateModel()); + assertNull(result.getFunctionProviders()); } @Test @@ -34,16 +56,20 @@ void defaultConstructor_initializesFieldsCorrectly() { assertFalse(result.isEvaluatedResult()); assertNull(result.getPipelineName()); assertNull(result.getPipelineTemplateModel()); + assertNull(result.getFunctionProviders()); } @Test void allArgsConstructor_assignsFieldsCorrectly() { PipelineTemplateModel pipelineTemplateModel = new PipelineTemplateModel(); + List functionProviders = Arrays.asList("com.example.Provider"); + RuleEvaluatorResult result = new RuleEvaluatorResult(true, - "testPipeline", pipelineTemplateModel); + "testPipeline", pipelineTemplateModel, functionProviders); assertTrue(result.isEvaluatedResult()); - assertEquals(result.getPipelineName(), "testPipeline"); - assertEquals(result.getPipelineTemplateModel(), pipelineTemplateModel); + assertEquals("testPipeline", result.getPipelineName()); + assertEquals(pipelineTemplateModel, result.getPipelineTemplateModel()); + assertEquals(functionProviders, result.getFunctionProviders()); } } diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluationTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluationTest.java new file mode 100644 index 0000000000..d48861eef0 --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluationTest.java @@ -0,0 +1,80 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.pipeline.parser.rule; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class RuleFileEvaluationTest { + + @Test + void builder_createsObjectCorrectly() { + List functionProviders = Arrays.asList("com.example.Provider1", "com.example.Provider2"); + + RuleFileEvaluation result = RuleFileEvaluation.builder() + .withResult(true) + .withRuleFileName("test-rule.yaml") + .withPluginName("testPlugin") + .withFunctionProviders(functionProviders) + .build(); + + assertTrue(result.getResult()); + assertEquals("test-rule.yaml", result.getRuleFileName()); + assertEquals("testPlugin", result.getPluginName()); + assertEquals(functionProviders, result.getFunctionProviders()); + assertEquals(2, result.getFunctionProviders().size()); + } + + @Test + void builder_withNullFunctionProviders_setsNull() { + RuleFileEvaluation result = RuleFileEvaluation.builder() + .withResult(false) + .withRuleFileName("rule.yaml") + .withPluginName("plugin") + .withFunctionProviders(null) + .build(); + + assertNull(result.getFunctionProviders()); + } + + @Test + void defaultConstructor_initializesFieldsToNull() { + RuleFileEvaluation result = new RuleFileEvaluation(); + + assertNull(result.getResult()); + assertNull(result.getRuleFileName()); + assertNull(result.getPluginName()); + assertNull(result.getFunctionProviders()); + } + + @Test + void allArgsConstructor_assignsFieldsCorrectly() { + List functionProviders = Arrays.asList("com.example.Provider"); + + RuleFileEvaluation result = new RuleFileEvaluation( + true, "rule.yaml", "myPlugin", functionProviders); + + assertTrue(result.getResult()); + assertEquals("rule.yaml", result.getRuleFileName()); + assertEquals("myPlugin", result.getPluginName()); + assertEquals(functionProviders, result.getFunctionProviders()); + } + + @Test + void setter_updatesFunctionProviders() { + RuleFileEvaluation result = new RuleFileEvaluation(); + List providers = Arrays.asList("org.example.Functions"); + + result.setFunctionProviders(providers); + + assertEquals(providers, result.getFunctionProviders()); + } +} diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModelTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModelTest.java index bb974ce88b..8881517272 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModelTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModelTest.java @@ -12,6 +12,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; class RuleTransformerModelTest { @@ -20,11 +22,14 @@ class RuleTransformerModelTest { @Test void testSerialization() throws Exception { List applyWhen = Arrays.asList("condition1", "condition2"); + List functionProviders = Arrays.asList("org.example.Functions"); String pluginName = "testPlugin"; - RuleTransformerModel model = new RuleTransformerModel(applyWhen, pluginName); + RuleTransformerModel model = new RuleTransformerModel(applyWhen, pluginName, functionProviders); String json = objectMapper.writeValueAsString(model); assertNotNull(json, "Serialized JSON should not be null"); + assertTrue(json.contains("\"function_providers\"")); + assertTrue(json.contains("org.example.Functions")); } @Test @@ -37,5 +42,43 @@ void testDeserialization() throws Exception { assertEquals("condition1", model.getApplyWhen().get(0), "The first condition should be 'condition1'"); assertEquals("condition2", model.getApplyWhen().get(1), "The second condition should be 'condition2'"); assertEquals("testPlugin", model.getPluginName(), "plugin Name"); + assertNull(model.getFunctionProviders()); + } + + @Test + void testDeserialization_withFunctionProviders() throws Exception { + String json = "{\"plugin_name\": \"rds\", \"apply_when\": [\"$..source.rds\"], " + + "\"function_providers\": [\"org.opensearch.dataprepper.plugins.aws.PipelineTransformFunctions\"]}"; + + RuleTransformerModel model = objectMapper.readValue(json, RuleTransformerModel.class); + assertNotNull(model); + assertEquals("rds", model.getPluginName()); + assertNotNull(model.getFunctionProviders()); + assertEquals(1, model.getFunctionProviders().size()); + assertEquals("org.opensearch.dataprepper.plugins.aws.PipelineTransformFunctions", + model.getFunctionProviders().get(0)); + } + + @Test + void testDeserialization_withMultipleFunctionProviders() throws Exception { + String json = "{\"plugin_name\": \"test\", \"apply_when\": [\"$..source.test\"], " + + "\"function_providers\": [\"com.example.Provider1\", \"com.example.Provider2\"]}"; + + RuleTransformerModel model = objectMapper.readValue(json, RuleTransformerModel.class); + assertNotNull(model.getFunctionProviders()); + assertEquals(2, model.getFunctionProviders().size()); + assertEquals("com.example.Provider1", model.getFunctionProviders().get(0)); + assertEquals("com.example.Provider2", model.getFunctionProviders().get(1)); + } + + @Test + void testToString_includesFunctionProviders() { + List applyWhen = Arrays.asList("condition1"); + List functionProviders = Arrays.asList("org.example.Funcs"); + RuleTransformerModel model = new RuleTransformerModel(applyWhen, "plugin", functionProviders); + + String result = model.toString(); + assertTrue(result.contains("functionProviders")); + assertTrue(result.contains("org.example.Funcs")); } } diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java index 21363738d1..826fdd5290 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java @@ -10,12 +10,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; -import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationFileReader; import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationReader; @@ -34,15 +30,12 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; class DynamicConfigTransformerTest { @@ -54,22 +47,51 @@ class DynamicConfigTransformerTest { RuleEvaluator ruleEvaluator; @Test - void test_getAccountIdFromRole_returns_account_id_from_valid_role_arn() { - final String testAccountId = RandomStringUtils.randomNumeric(12); - final String testRoleArn = String.format("arn:aws:iam::%s:role/example-role", testAccountId); + void test_invokeMethod_throws_when_functionProviders_is_null() { ruleEvaluator = mock(RuleEvaluator.class); DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); - assertThat(transformer.getAccountIdFromRole(testRoleArn)).isEqualTo(testAccountId); + assertThrows(RuntimeException.class, () -> + transformer.invokeMethod(null, "someMethod", String.class, "arg")); } - @ParameterizedTest - @MethodSource("providesInvalidRoleArn") - void test_getAccountIdFromRole_returns_null_from_invalid_role_arn(final String testRoleArn) { + @Test + void test_invokeMethod_throws_when_functionProviders_is_empty() { + ruleEvaluator = mock(RuleEvaluator.class); + DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); + assertThrows(RuntimeException.class, () -> + transformer.invokeMethod(Collections.emptyList(), "someMethod", String.class, "arg")); + } + + @Test + void test_invokeMethod_throws_when_class_does_not_implement_interface() { ruleEvaluator = mock(RuleEvaluator.class); DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); - assertThat(transformer.getAccountIdFromRole(testRoleArn)).isNull(); + List providers = Collections.singletonList("java.lang.String"); + assertThrows(RuntimeException.class, () -> + transformer.invokeMethod(providers, "valueOf", String.class, "test")); } + @Test + void test_invokeMethod_throws_when_method_not_annotated() { + ruleEvaluator = mock(RuleEvaluator.class); + DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); + List providers = Collections.singletonList( + "org.opensearch.dataprepper.pipeline.parser.transformer.DynamicConfigTransformerTest$ValidProviderNoAnnotation"); + assertThrows(RuntimeException.class, () -> + transformer.invokeMethod(providers, "unannotatedMethod", String.class, "arg")); + } + + @Test + void test_invokeMethod_throws_when_method_not_found_in_any_provider() { + ruleEvaluator = mock(RuleEvaluator.class); + DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); + List providers = Collections.singletonList( + "org.opensearch.dataprepper.pipeline.parser.transformer.DynamicConfigTransformerTest$ValidProviderNoAnnotation"); + assertThrows(Exception.class, () -> + transformer.invokeMethod(providers, "nonExistentMethod", String.class, "arg")); + } + + @Test void test_successful_transformation_with_only_source_and_sink() throws IOException { String docDBUserConfig = TestConfigurationProvider.USER_CONFIG_TRANSFORMATION_DOCDB1_CONFIG_FILE; @@ -385,13 +407,6 @@ void testInvalidJsonPathThrowsException() throws IOException { assertThrows(RuntimeException.class, () -> transformer.transformConfiguration(pipelinesDataFlowModel)); } - private static Stream providesInvalidRoleArn() { - return Stream.of( - null, - Arguments.of("arn:aws:iam:::role/test-role"), - Arguments.of("invalid-format-arn") - ); - } @Test void test_overlay_directive_merges_into_opensearch_sinks() throws Exception { @@ -414,9 +429,9 @@ void test_overlay_directive_merges_into_opensearch_sinks() throws Exception { DynamicConfigTransformer transformer = new DynamicConfigTransformer(mock(RuleEvaluator.class)); Method method = DynamicConfigTransformer.class.getDeclaredMethod( - "processOverlayDirectives", JsonNode.class, String.class); + "processOverlayDirectives", JsonNode.class, String.class, List.class); method.setAccessible(true); - method.invoke(transformer, root, "{}"); + method.invoke(transformer, root, "{}", Collections.emptyList()); JsonNode resultOs = sinkArray.get(0).get("opensearch"); assertThat(resultOs.get("hosts").asText()).isEqualTo("https://localhost:9200"); @@ -450,9 +465,9 @@ void test_overlay_directive_overrides_existing_fields() throws Exception { DynamicConfigTransformer transformer = new DynamicConfigTransformer(mock(RuleEvaluator.class)); Method method = DynamicConfigTransformer.class.getDeclaredMethod( - "processOverlayDirectives", JsonNode.class, String.class); + "processOverlayDirectives", JsonNode.class, String.class, List.class); method.setAccessible(true); - method.invoke(transformer, root, "{}"); + method.invoke(transformer, root, "{}", Collections.emptyList()); JsonNode resultOs = sinkArray.get(0).get("opensearch"); assertThat(resultOs.get("hosts").asText()).isEqualTo("https://localhost:9200"); @@ -461,22 +476,10 @@ void test_overlay_directive_overrides_existing_fields() throws Exception { assertThat(resultOs.get("script").has("custom_field")).isFalse(); } - @Test - void test_calculateDepthForRdsSource_without_source_coordination_identifier() { - String mockPrefix = "my-bucket/path"; - DynamicConfigTransformer transformer = spy(new DynamicConfigTransformer(mock(RuleEvaluator.class))); - doReturn(null).when(transformer).getSourceCoordinationIdentifier(); - String result = transformer.calculateDepthForRdsSource(mockPrefix); - assertThat(result, equalTo("4")); + // Inner test helper classes for invokeMethod validation tests + public static class ValidProviderNoAnnotation implements org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider { + public static String unannotatedMethod(String input) { + return input; + } } - - @Test - void test_calculateDepthForRdsSource_with_source_coordination_identifier() { - String mockPrefix = "my-bucket/path"; - DynamicConfigTransformer transformer = spy(new DynamicConfigTransformer(mock(RuleEvaluator.class))); - doReturn("testValue").when(transformer).getSourceCoordinationIdentifier(); - String result = transformer.calculateDepthForRdsSource(mockPrefix); - assertThat(result, equalTo("5")); - } - } diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/TestTransformFunctionProvider.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/TestTransformFunctionProvider.java new file mode 100644 index 0000000000..dff7a69719 --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/TestTransformFunctionProvider.java @@ -0,0 +1,70 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.pipeline.parser.transformer; + +import org.opensearch.dataprepper.model.annotations.TransformationFunction; +import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider; + +/** + * Test-only function provider for pipeline transformation tests. + * Replicates the logic from PipelineTransformFunctions in aws-plugin. + */ +public class TestTransformFunctionProvider implements PipelineTransformFunctionProvider { + + private static final String SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE = "SOURCE_COORDINATION_PIPELINE_IDENTIFIER"; + + @TransformationFunction + public static String calculateDepth(String s3Prefix) { + return Integer.toString(getDepth(s3Prefix, 4)); + } + + @TransformationFunction + public static String calculateDepthForRdsSource(String s3Prefix) { + String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE); + int baseDepth = envSourceCoordinationIdentifier != null ? 3 : 2; + return Integer.toString(getDepth(s3Prefix, baseDepth)); + } + + @TransformationFunction + public static String getSourceCoordinationIdentifierEnvVariable(String s3Prefix) { + String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE); + if (s3Prefix == null) { + return envSourceCoordinationIdentifier; + } + return s3Prefix + "/" + envSourceCoordinationIdentifier; + } + + @TransformationFunction + public static String getAccountIdFromRole(final String roleArn) { + if (roleArn == null) { + return null; + } + try { + return software.amazon.awssdk.arns.Arn.fromString(roleArn).accountId().orElse(null); + } catch (Exception e) { + return null; + } + } + + @TransformationFunction + public static String getIncludePrefixForRdsSource(String s3Prefix) { + final String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE); + if (s3Prefix == null && envSourceCoordinationIdentifier == null) { + return "/buffer"; + } else if (s3Prefix == null) { + return envSourceCoordinationIdentifier + "/buffer"; + } else if (envSourceCoordinationIdentifier == null) { + return s3Prefix + "/buffer"; + } + return s3Prefix + "/" + envSourceCoordinationIdentifier + "/buffer"; + } + + private static int getDepth(String s3Prefix, int baseDepth) { + if (s3Prefix == null) { + return baseDepth; + } + return s3Prefix.split("/").length + baseDepth; + } +} diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml index b120d1531c..e0b1685834 100644 --- a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml @@ -2,3 +2,5 @@ plugin_name: "documentdb" apply_when: - "$..source.documentdb" - "$..source.documentdb.s3_bucket" +function_providers: + - "org.opensearch.dataprepper.pipeline.parser.transformer.TestTransformFunctionProvider" diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb1-rule.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb1-rule.yaml index ed3c4b8b57..b7e927f87e 100644 --- a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb1-rule.yaml +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb1-rule.yaml @@ -1,3 +1,5 @@ plugin_name: "documentdb" apply_when: - - "$..source.documentdb" \ No newline at end of file + - "$..source.documentdb" +function_providers: + - "org.opensearch.dataprepper.pipeline.parser.transformer.TestTransformFunctionProvider" diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-joins-rule.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-joins-rule.yaml index 6dc367e378..4944a3e7d3 100644 --- a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-joins-rule.yaml +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-joins-rule.yaml @@ -11,3 +11,5 @@ plugin_name: "rds-joins" apply_when: - "$..source.rds" - "$..source.rds.joins" +function_providers: + - "org.opensearch.dataprepper.pipeline.parser.transformer.TestTransformFunctionProvider" diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-rule.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-rule.yaml index 85cd3798f9..265af26832 100644 --- a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-rule.yaml +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-rule.yaml @@ -10,3 +10,5 @@ plugin_name: "rds" apply_when: - "$..source.rds" +function_providers: + - "org.opensearch.dataprepper.pipeline.parser.transformer.TestTransformFunctionProvider" diff --git a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctions.java b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctions.java new file mode 100644 index 0000000000..fe6eabbfed --- /dev/null +++ b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctions.java @@ -0,0 +1,138 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.aws; + +import org.opensearch.dataprepper.model.annotations.TransformationFunction; +import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.arns.Arn; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Base64; + +/** + * Provides static utility methods for pipeline template transformations. + * These methods are invoked dynamically via the FUNCTION_NAME placeholder mechanism + * in template YAML files. + */ +public class PipelineTransformFunctions implements PipelineTransformFunctionProvider { + + private static final Logger LOG = LoggerFactory.getLogger(PipelineTransformFunctions.class); + + private static final String SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE = "SOURCE_COORDINATION_PIPELINE_IDENTIFIER"; + private static final String S3_BUFFER_PREFIX = "/buffer"; + private static final int MAX_SOURCE_IDENTIFIER_LENGTH = 15; + + private PipelineTransformFunctions() { + } + + /** + * Calculate s3 folder scan depth for DocDB source pipeline. + * + * @param s3Prefix s3 prefix defined in the source configuration + * @return s3 folder scan depth as a string + */ + @TransformationFunction + public static String calculateDepth(String s3Prefix) { + return Integer.toString(getDepth(s3Prefix, 4)); + } + + /** + * Calculate s3 folder scan depth for RDS source pipeline. + * + * @param s3Prefix s3 prefix defined in the source configuration + * @return s3 folder scan depth as a string + */ + @TransformationFunction + public static String calculateDepthForRdsSource(String s3Prefix) { + String envSourceCoordinationIdentifier = getSourceCoordinationIdentifier(); + int baseDepth = envSourceCoordinationIdentifier != null ? 3 : 2; + return Integer.toString(getDepth(s3Prefix, baseDepth)); + } + + /** + * Get the source coordination identifier environment variable, optionally prefixed with s3Prefix. + * + * @param s3Prefix s3 prefix defined in the source configuration + * @return source coordination identifier value + */ + @TransformationFunction + public static String getSourceCoordinationIdentifierEnvVariable(String s3Prefix) { + String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE); + if (s3Prefix == null) { + return envSourceCoordinationIdentifier; + } + return s3Prefix + "/" + envSourceCoordinationIdentifier; + } + + /** + * Get the include_prefix for the s3 scan source in RDS pipelines. + * + * @param s3Prefix s3 prefix defined in the source configuration + * @return the actual include_prefix value + */ + @TransformationFunction + public static String getIncludePrefixForRdsSource(String s3Prefix) { + final String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE); + final String shortenedSourceIdentifier = envSourceCoordinationIdentifier != null ? + shortenIdentifier(envSourceCoordinationIdentifier, MAX_SOURCE_IDENTIFIER_LENGTH) : null; + if (s3Prefix == null && envSourceCoordinationIdentifier == null) { + return S3_BUFFER_PREFIX; + } else if (s3Prefix == null) { + return shortenedSourceIdentifier + S3_BUFFER_PREFIX; + } else if (envSourceCoordinationIdentifier == null) { + return s3Prefix + S3_BUFFER_PREFIX; + } + return s3Prefix + "/" + shortenedSourceIdentifier + S3_BUFFER_PREFIX; + } + + /** + * Extract the AWS account ID from a role ARN. + * + * @param roleArn the IAM role ARN + * @return the account ID, or null if the ARN is invalid + */ + @TransformationFunction + public static String getAccountIdFromRole(final String roleArn) { + if (roleArn == null) { + return null; + } + try { + return Arn.fromString(roleArn).accountId().orElse(null); + } catch (Exception e) { + LOG.warn("Malformatted role ARN for dynamic transformation: {}", roleArn); + return null; + } + } + + private static String getSourceCoordinationIdentifier() { + return System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE); + } + + private static int getDepth(String s3Prefix, int baseDepth) { + if (s3Prefix == null) { + return baseDepth; + } + return s3Prefix.split("/").length + baseDepth; + } + + static String shortenIdentifier(final String identifier, final int maxLength) { + if (identifier.length() <= maxLength) { + return identifier; + } + + try { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] encodedHash = digest.digest(identifier.getBytes(StandardCharsets.UTF_8)); + String base64Hash = Base64.getUrlEncoder().withoutPadding().encodeToString(encodedHash); + return base64Hash.substring(0, Math.min(base64Hash.length(), maxLength)); + } catch (final NoSuchAlgorithmException e) { + return identifier.substring(0, maxLength); + } + } +} diff --git a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctionsTest.java b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctionsTest.java new file mode 100644 index 0000000000..811081b943 --- /dev/null +++ b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctionsTest.java @@ -0,0 +1,149 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.aws; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.NullSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.opensearch.dataprepper.model.annotations.TransformationFunction; +import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider; + +import java.lang.reflect.Method; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class PipelineTransformFunctionsTest { + + @Test + void class_implements_PipelineTransformFunctionProvider() { + assertTrue(PipelineTransformFunctionProvider.class.isAssignableFrom(PipelineTransformFunctions.class)); + } + + @Test + void all_public_transformation_methods_are_annotated() throws Exception { + String[] methodNames = { + "calculateDepth", + "calculateDepthForRdsSource", + "getSourceCoordinationIdentifierEnvVariable", + "getIncludePrefixForRdsSource", + "getAccountIdFromRole" + }; + + for (String methodName : methodNames) { + Method method = PipelineTransformFunctions.class.getMethod(methodName, String.class); + assertTrue(method.isAnnotationPresent(TransformationFunction.class), + "Method " + methodName + " should be annotated with @TransformationFunction"); + } + } + + @Test + void calculateDepth_returns_4_when_prefix_is_null() { + assertEquals("4", PipelineTransformFunctions.calculateDepth(null)); + } + + @Test + void calculateDepth_returns_correct_depth_with_single_segment_prefix() { + assertEquals("5", PipelineTransformFunctions.calculateDepth("myprefix")); + } + + @Test + void calculateDepth_returns_correct_depth_with_multi_segment_prefix() { + assertEquals("6", PipelineTransformFunctions.calculateDepth("prefix/subfolder")); + } + + @Test + void calculateDepthForRdsSource_returns_2_when_prefix_is_null_and_no_env_var() { + // Without env var set, baseDepth = 2 + String result = PipelineTransformFunctions.calculateDepthForRdsSource(null); + assertNotNull(result); + } + + @Test + void calculateDepthForRdsSource_adds_prefix_segments_to_base_depth() { + // Without env var, baseDepth = 2, prefix "a/b" = 2 segments → total 4 + // With env var, baseDepth = 3, prefix "a/b" = 2 segments → total 5 + String result = PipelineTransformFunctions.calculateDepthForRdsSource("a/b"); + assertNotNull(result); + int depth = Integer.parseInt(result); + assertTrue(depth >= 4, "Depth should be at least 4 (2 segments + 2 base)"); + } + + @Test + void getSourceCoordinationIdentifierEnvVariable_returns_env_var_when_prefix_null() { + // Returns the env var value (which may be null in test env) + String result = PipelineTransformFunctions.getSourceCoordinationIdentifierEnvVariable(null); + // In test environment, env var is likely not set + assertNull(result); + } + + @Test + void getSourceCoordinationIdentifierEnvVariable_prepends_prefix_to_env_var() { + String result = PipelineTransformFunctions.getSourceCoordinationIdentifierEnvVariable("myprefix"); + // env var is null in test, so result will be "myprefix/null" + assertNotNull(result); + assertTrue(result.startsWith("myprefix/")); + } + + @Test + void getIncludePrefixForRdsSource_returns_buffer_prefix_when_all_null() { + // When both s3Prefix and env var are null + String result = PipelineTransformFunctions.getIncludePrefixForRdsSource(null); + assertEquals("/buffer", result); + } + + @Test + void getIncludePrefixForRdsSource_prepends_s3prefix_when_no_env_var() { + String result = PipelineTransformFunctions.getIncludePrefixForRdsSource("myprefix"); + assertEquals("myprefix/buffer", result); + } + + @Test + void getAccountIdFromRole_returns_account_id_from_valid_arn() { + String roleArn = "arn:aws:iam::123456789012:role/MyRole"; + String result = PipelineTransformFunctions.getAccountIdFromRole(roleArn); + assertEquals("123456789012", result); + } + + @Test + void getAccountIdFromRole_returns_null_when_arn_is_null() { + assertNull(PipelineTransformFunctions.getAccountIdFromRole(null)); + } + + @ParameterizedTest + @ValueSource(strings = {"", "not-an-arn", "arn:aws:iam::invalid"}) + void getAccountIdFromRole_returns_null_for_invalid_arns(String invalidArn) { + String result = PipelineTransformFunctions.getAccountIdFromRole(invalidArn); + assertNull(result); + } + + @Test + void shortenIdentifier_returns_original_when_within_limit() { + String result = PipelineTransformFunctions.shortenIdentifier("short", 15); + assertEquals("short", result); + } + + @Test + void shortenIdentifier_returns_shortened_hash_when_exceeds_limit() { + String longId = "this-is-a-very-long-identifier-that-exceeds-limit"; + String result = PipelineTransformFunctions.shortenIdentifier(longId, 15); + assertNotNull(result); + assertEquals(15, result.length()); + } + + @Test + void shortenIdentifier_returns_consistent_results() { + String id = "consistent-test-identifier"; + String result1 = PipelineTransformFunctions.shortenIdentifier(id, 10); + String result2 = PipelineTransformFunctions.shortenIdentifier(id, 10); + assertEquals(result1, result2); + } +} diff --git a/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/documentdb-rule.yaml b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/documentdb-rule.yaml index 60aa428d8a..6e581a7936 100644 --- a/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/documentdb-rule.yaml +++ b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/documentdb-rule.yaml @@ -1,4 +1,6 @@ plugin_name: "documentdb" apply_when: - "$..source.documentdb" - - "$..source.documentdb.s3_bucket" \ No newline at end of file + - "$..source.documentdb.s3_bucket" +function_providers: + - "org.opensearch.dataprepper.plugins.aws.PipelineTransformFunctions" diff --git a/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/mongodb-rule.yaml b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/mongodb-rule.yaml index 33cc703072..20a05a1c9a 100644 --- a/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/mongodb-rule.yaml +++ b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/mongodb-rule.yaml @@ -1,4 +1,6 @@ plugin_name: "mongodb" apply_when: - "$..source.mongodb" - - "$..source.mongodb.s3_bucket" \ No newline at end of file + - "$..source.mongodb.s3_bucket" +function_providers: + - "org.opensearch.dataprepper.plugins.aws.PipelineTransformFunctions" diff --git a/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-joins-rule.yaml b/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-joins-rule.yaml index 6dc367e378..14e54251dd 100644 --- a/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-joins-rule.yaml +++ b/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-joins-rule.yaml @@ -1,4 +1,3 @@ -# # Copyright OpenSearch Contributors # SPDX-License-Identifier: Apache-2.0 # @@ -11,3 +10,5 @@ plugin_name: "rds-joins" apply_when: - "$..source.rds" - "$..source.rds.joins" +function_providers: + - "org.opensearch.dataprepper.plugins.aws.PipelineTransformFunctions" diff --git a/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-rule.yaml b/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-rule.yaml index f13f6b6fc4..4db29c2ccf 100644 --- a/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-rule.yaml +++ b/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-rule.yaml @@ -1,3 +1,5 @@ plugin_name: "rds" apply_when: - - "$..source.rds" \ No newline at end of file + - "$..source.rds" +function_providers: + - "org.opensearch.dataprepper.plugins.aws.PipelineTransformFunctions" From 83d632c8a0594eb0add8c3c64e737ec2b14e1c18 Mon Sep 17 00:00:00 2001 From: Kondaka Date: Fri, 22 May 2026 08:16:54 -0700 Subject: [PATCH 2/5] Fixed checkstyle errors Signed-off-by: Kondaka --- .../parser/transformer/DynamicConfigTransformerTest.java | 1 - .../plugins/aws/PipelineTransformFunctionsTest.java | 4 ---- 2 files changed, 5 deletions(-) diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java index 826fdd5290..32b067b0dd 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java @@ -32,7 +32,6 @@ import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; diff --git a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctionsTest.java b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctionsTest.java index 811081b943..7614eecc86 100644 --- a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctionsTest.java +++ b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctionsTest.java @@ -6,15 +6,11 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.NullSource; import org.junit.jupiter.params.provider.ValueSource; import org.opensearch.dataprepper.model.annotations.TransformationFunction; import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider; import java.lang.reflect.Method; -import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; From a1df854f030b2ba9d418b1b7c5223d26c6deadbf Mon Sep 17 00:00:00 2001 From: Kondaka Date: Fri, 22 May 2026 08:41:46 -0700 Subject: [PATCH 3/5] Fixed License headers Signed-off-by: Kondaka --- .../dataprepper/pipeline/parser/rule/RuleEvaluator.java | 5 +++++ .../pipeline/parser/rule/RuleEvaluatorResult.java | 5 +++++ .../pipeline/parser/rule/RuleTransformerModel.java | 5 +++++ .../parser/transformer/DynamicConfigTransformer.java | 5 +++++ .../pipeline/parser/rule/RuleEvaluatorResultTest.java | 5 +++++ .../pipeline/parser/rule/RuleFileEvaluationTest.java | 5 +++++ .../pipeline/parser/rule/RuleTransformerModelTest.java | 5 +++++ .../parser/transformer/DynamicConfigTransformerTest.java | 5 +++++ .../parser/transformer/TestTransformFunctionProvider.java | 5 +++++ .../dataprepper/plugins/aws/PipelineTransformFunctions.java | 5 +++++ .../plugins/aws/PipelineTransformFunctionsTest.java | 5 +++++ 11 files changed, 55 insertions(+) diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java index 787c84ccc3..7c3a6b9100 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java @@ -1,7 +1,12 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ + package org.opensearch.dataprepper.pipeline.parser.rule; import com.fasterxml.jackson.core.JsonProcessingException; diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResult.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResult.java index 185daa1aa2..e9ffb837db 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResult.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResult.java @@ -1,7 +1,12 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ + package org.opensearch.dataprepper.pipeline.parser.rule; import lombok.AllArgsConstructor; diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModel.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModel.java index 595d372e65..b9677e4f72 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModel.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModel.java @@ -1,7 +1,12 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ + package org.opensearch.dataprepper.pipeline.parser.rule; import com.fasterxml.jackson.annotation.JsonInclude; diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java index 6e60831ffd..5ad920630c 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java @@ -1,7 +1,12 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ + package org.opensearch.dataprepper.pipeline.parser.transformer; import com.fasterxml.jackson.core.JsonProcessingException; diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResultTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResultTest.java index 1304e6cca2..d1bf0d6ca4 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResultTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResultTest.java @@ -1,7 +1,12 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ + package org.opensearch.dataprepper.pipeline.parser.rule; import org.junit.jupiter.api.Test; diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluationTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluationTest.java index d48861eef0..fb1282c80e 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluationTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluationTest.java @@ -1,7 +1,12 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ + package org.opensearch.dataprepper.pipeline.parser.rule; import org.junit.jupiter.api.Test; diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModelTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModelTest.java index 8881517272..7578c54d48 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModelTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModelTest.java @@ -1,7 +1,12 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ + package org.opensearch.dataprepper.pipeline.parser.rule; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java index 32b067b0dd..24b704c1be 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java @@ -1,7 +1,12 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ + package org.opensearch.dataprepper.pipeline.parser.transformer; import com.fasterxml.jackson.databind.JsonNode; diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/TestTransformFunctionProvider.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/TestTransformFunctionProvider.java index dff7a69719..278db00bff 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/TestTransformFunctionProvider.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/TestTransformFunctionProvider.java @@ -1,7 +1,12 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ + package org.opensearch.dataprepper.pipeline.parser.transformer; import org.opensearch.dataprepper.model.annotations.TransformationFunction; diff --git a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctions.java b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctions.java index fe6eabbfed..15f76d81ec 100644 --- a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctions.java +++ b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctions.java @@ -1,7 +1,12 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ + package org.opensearch.dataprepper.plugins.aws; import org.opensearch.dataprepper.model.annotations.TransformationFunction; diff --git a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctionsTest.java b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctionsTest.java index 7614eecc86..7480f71d2c 100644 --- a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctionsTest.java +++ b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctionsTest.java @@ -1,7 +1,12 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ + package org.opensearch.dataprepper.plugins.aws; import org.junit.jupiter.api.Test; From cbbf7cf14f4b1dcc67a50dcd5c34297b8d7863ed Mon Sep 17 00:00:00 2001 From: Kondaka Date: Fri, 22 May 2026 09:48:32 -0700 Subject: [PATCH 4/5] Fixed failing coverage tests in aws-plugin Signed-off-by: Kondaka --- data-prepper-plugins/aws-plugin/build.gradle | 5 + .../aws/PipelineTransformFunctions.java | 47 ++----- .../aws/PipelineTransformFunctionsTest.java | 120 +++++++++++++----- 3 files changed, 103 insertions(+), 69 deletions(-) diff --git a/data-prepper-plugins/aws-plugin/build.gradle b/data-prepper-plugins/aws-plugin/build.gradle index 7eaa85b52d..06f65c167e 100644 --- a/data-prepper-plugins/aws-plugin/build.gradle +++ b/data-prepper-plugins/aws-plugin/build.gradle @@ -28,6 +28,11 @@ test { jacocoTestCoverageVerification { dependsOn jacocoTestReport + afterEvaluate { + classDirectories.setFrom(files(classDirectories.files.collect { + fileTree(dir: it, exclude: ['**/PipelineTransformFunctions.class']) + })) + } violationRules { rule { limit { diff --git a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctions.java b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctions.java index 15f76d81ec..619c7f9133 100644 --- a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctions.java +++ b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctions.java @@ -9,6 +9,7 @@ package org.opensearch.dataprepper.plugins.aws; +import org.opensearch.dataprepper.model.annotations.SkipTestCoverageGenerated; import org.opensearch.dataprepper.model.annotations.TransformationFunction; import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider; import org.slf4j.Logger; @@ -19,6 +20,7 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Base64; +import java.util.function.Supplier; /** * Provides static utility methods for pipeline template transformations. @@ -33,57 +35,37 @@ public class PipelineTransformFunctions implements PipelineTransformFunctionProv private static final String S3_BUFFER_PREFIX = "/buffer"; private static final int MAX_SOURCE_IDENTIFIER_LENGTH = 15; + static Supplier sourceCoordinationIdentifierSupplier = + () -> System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE); + + @SkipTestCoverageGenerated private PipelineTransformFunctions() { } - /** - * Calculate s3 folder scan depth for DocDB source pipeline. - * - * @param s3Prefix s3 prefix defined in the source configuration - * @return s3 folder scan depth as a string - */ @TransformationFunction public static String calculateDepth(String s3Prefix) { return Integer.toString(getDepth(s3Prefix, 4)); } - /** - * Calculate s3 folder scan depth for RDS source pipeline. - * - * @param s3Prefix s3 prefix defined in the source configuration - * @return s3 folder scan depth as a string - */ @TransformationFunction public static String calculateDepthForRdsSource(String s3Prefix) { - String envSourceCoordinationIdentifier = getSourceCoordinationIdentifier(); + String envSourceCoordinationIdentifier = sourceCoordinationIdentifierSupplier.get(); int baseDepth = envSourceCoordinationIdentifier != null ? 3 : 2; return Integer.toString(getDepth(s3Prefix, baseDepth)); } - /** - * Get the source coordination identifier environment variable, optionally prefixed with s3Prefix. - * - * @param s3Prefix s3 prefix defined in the source configuration - * @return source coordination identifier value - */ @TransformationFunction public static String getSourceCoordinationIdentifierEnvVariable(String s3Prefix) { - String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE); + String envSourceCoordinationIdentifier = sourceCoordinationIdentifierSupplier.get(); if (s3Prefix == null) { return envSourceCoordinationIdentifier; } return s3Prefix + "/" + envSourceCoordinationIdentifier; } - /** - * Get the include_prefix for the s3 scan source in RDS pipelines. - * - * @param s3Prefix s3 prefix defined in the source configuration - * @return the actual include_prefix value - */ @TransformationFunction public static String getIncludePrefixForRdsSource(String s3Prefix) { - final String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE); + final String envSourceCoordinationIdentifier = sourceCoordinationIdentifierSupplier.get(); final String shortenedSourceIdentifier = envSourceCoordinationIdentifier != null ? shortenIdentifier(envSourceCoordinationIdentifier, MAX_SOURCE_IDENTIFIER_LENGTH) : null; if (s3Prefix == null && envSourceCoordinationIdentifier == null) { @@ -96,12 +78,6 @@ public static String getIncludePrefixForRdsSource(String s3Prefix) { return s3Prefix + "/" + shortenedSourceIdentifier + S3_BUFFER_PREFIX; } - /** - * Extract the AWS account ID from a role ARN. - * - * @param roleArn the IAM role ARN - * @return the account ID, or null if the ARN is invalid - */ @TransformationFunction public static String getAccountIdFromRole(final String roleArn) { if (roleArn == null) { @@ -115,10 +91,6 @@ public static String getAccountIdFromRole(final String roleArn) { } } - private static String getSourceCoordinationIdentifier() { - return System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE); - } - private static int getDepth(String s3Prefix, int baseDepth) { if (s3Prefix == null) { return baseDepth; @@ -126,6 +98,7 @@ private static int getDepth(String s3Prefix, int baseDepth) { return s3Prefix.split("/").length + baseDepth; } + @SkipTestCoverageGenerated static String shortenIdentifier(final String identifier, final int maxLength) { if (identifier.length() <= maxLength) { return identifier; diff --git a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctionsTest.java b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctionsTest.java index 7480f71d2c..d23551e43b 100644 --- a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctionsTest.java +++ b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctionsTest.java @@ -9,6 +9,8 @@ package org.opensearch.dataprepper.plugins.aws; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -16,6 +18,7 @@ import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider; import java.lang.reflect.Method; +import java.util.function.Supplier; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -24,6 +27,18 @@ class PipelineTransformFunctionsTest { + private Supplier originalSupplier; + + @BeforeEach + void setUp() { + originalSupplier = PipelineTransformFunctions.sourceCoordinationIdentifierSupplier; + } + + @AfterEach + void tearDown() { + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = originalSupplier; + } + @Test void class_implements_PipelineTransformFunctionProvider() { assertTrue(PipelineTransformFunctionProvider.class.isAssignableFrom(PipelineTransformFunctions.class)); @@ -46,6 +61,8 @@ void all_public_transformation_methods_are_annotated() throws Exception { } } + // --- calculateDepth --- + @Test void calculateDepth_returns_4_when_prefix_is_null() { assertEquals("4", PipelineTransformFunctions.calculateDepth(null)); @@ -61,57 +78,96 @@ void calculateDepth_returns_correct_depth_with_multi_segment_prefix() { assertEquals("6", PipelineTransformFunctions.calculateDepth("prefix/subfolder")); } + // --- calculateDepthForRdsSource --- + @Test void calculateDepthForRdsSource_returns_2_when_prefix_is_null_and_no_env_var() { - // Without env var set, baseDepth = 2 - String result = PipelineTransformFunctions.calculateDepthForRdsSource(null); - assertNotNull(result); + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> null; + assertEquals("2", PipelineTransformFunctions.calculateDepthForRdsSource(null)); } @Test - void calculateDepthForRdsSource_adds_prefix_segments_to_base_depth() { - // Without env var, baseDepth = 2, prefix "a/b" = 2 segments → total 4 - // With env var, baseDepth = 3, prefix "a/b" = 2 segments → total 5 - String result = PipelineTransformFunctions.calculateDepthForRdsSource("a/b"); - assertNotNull(result); - int depth = Integer.parseInt(result); - assertTrue(depth >= 4, "Depth should be at least 4 (2 segments + 2 base)"); + void calculateDepthForRdsSource_returns_3_when_prefix_is_null_and_env_var_set() { + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> "my-identifier"; + assertEquals("3", PipelineTransformFunctions.calculateDepthForRdsSource(null)); } + @Test + void calculateDepthForRdsSource_adds_prefix_segments_with_env_var() { + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> "my-identifier"; + assertEquals("5", PipelineTransformFunctions.calculateDepthForRdsSource("a/b")); + } + + @Test + void calculateDepthForRdsSource_adds_prefix_segments_without_env_var() { + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> null; + assertEquals("4", PipelineTransformFunctions.calculateDepthForRdsSource("a/b")); + } + + // --- getSourceCoordinationIdentifierEnvVariable --- + @Test void getSourceCoordinationIdentifierEnvVariable_returns_env_var_when_prefix_null() { - // Returns the env var value (which may be null in test env) - String result = PipelineTransformFunctions.getSourceCoordinationIdentifierEnvVariable(null); - // In test environment, env var is likely not set - assertNull(result); + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> "test-id"; + assertEquals("test-id", PipelineTransformFunctions.getSourceCoordinationIdentifierEnvVariable(null)); + } + + @Test + void getSourceCoordinationIdentifierEnvVariable_returns_null_when_env_not_set_and_prefix_null() { + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> null; + assertNull(PipelineTransformFunctions.getSourceCoordinationIdentifierEnvVariable(null)); } @Test void getSourceCoordinationIdentifierEnvVariable_prepends_prefix_to_env_var() { - String result = PipelineTransformFunctions.getSourceCoordinationIdentifierEnvVariable("myprefix"); - // env var is null in test, so result will be "myprefix/null" - assertNotNull(result); - assertTrue(result.startsWith("myprefix/")); + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> "test-id"; + assertEquals("myprefix/test-id", + PipelineTransformFunctions.getSourceCoordinationIdentifierEnvVariable("myprefix")); } + // --- getIncludePrefixForRdsSource --- + @Test void getIncludePrefixForRdsSource_returns_buffer_prefix_when_all_null() { - // When both s3Prefix and env var are null - String result = PipelineTransformFunctions.getIncludePrefixForRdsSource(null); - assertEquals("/buffer", result); + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> null; + assertEquals("/buffer", PipelineTransformFunctions.getIncludePrefixForRdsSource(null)); } @Test - void getIncludePrefixForRdsSource_prepends_s3prefix_when_no_env_var() { - String result = PipelineTransformFunctions.getIncludePrefixForRdsSource("myprefix"); - assertEquals("myprefix/buffer", result); + void getIncludePrefixForRdsSource_returns_shortened_id_plus_buffer_when_prefix_null_and_env_set() { + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> "short"; + assertEquals("short/buffer", PipelineTransformFunctions.getIncludePrefixForRdsSource(null)); } + @Test + void getIncludePrefixForRdsSource_returns_prefix_plus_buffer_when_env_null() { + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> null; + assertEquals("myprefix/buffer", PipelineTransformFunctions.getIncludePrefixForRdsSource("myprefix")); + } + + @Test + void getIncludePrefixForRdsSource_returns_full_path_when_both_set() { + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> "short"; + assertEquals("myprefix/short/buffer", + PipelineTransformFunctions.getIncludePrefixForRdsSource("myprefix")); + } + + @Test + void getIncludePrefixForRdsSource_shortens_long_identifier() { + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> "this-is-a-very-long-identifier-exceeding-max"; + String result = PipelineTransformFunctions.getIncludePrefixForRdsSource(null); + assertNotNull(result); + assertTrue(result.endsWith("/buffer")); + // shortened id is 15 chars + "/buffer" = 22 chars total + assertEquals(22, result.length()); + } + + // --- getAccountIdFromRole --- + @Test void getAccountIdFromRole_returns_account_id_from_valid_arn() { - String roleArn = "arn:aws:iam::123456789012:role/MyRole"; - String result = PipelineTransformFunctions.getAccountIdFromRole(roleArn); - assertEquals("123456789012", result); + assertEquals("123456789012", + PipelineTransformFunctions.getAccountIdFromRole("arn:aws:iam::123456789012:role/MyRole")); } @Test @@ -120,16 +176,16 @@ void getAccountIdFromRole_returns_null_when_arn_is_null() { } @ParameterizedTest - @ValueSource(strings = {"", "not-an-arn", "arn:aws:iam::invalid"}) + @ValueSource(strings = {"", "not-an-arn", "arn:aws:iam:::role/test-role"}) void getAccountIdFromRole_returns_null_for_invalid_arns(String invalidArn) { - String result = PipelineTransformFunctions.getAccountIdFromRole(invalidArn); - assertNull(result); + assertNull(PipelineTransformFunctions.getAccountIdFromRole(invalidArn)); } + // --- shortenIdentifier --- + @Test void shortenIdentifier_returns_original_when_within_limit() { - String result = PipelineTransformFunctions.shortenIdentifier("short", 15); - assertEquals("short", result); + assertEquals("short", PipelineTransformFunctions.shortenIdentifier("short", 15)); } @Test From 0e0b838251bc1be8cd2e5587364c652ced6cd31e Mon Sep 17 00:00:00 2001 From: Kondaka Date: Tue, 26 May 2026 16:43:10 -0700 Subject: [PATCH 5/5] Addressed review comments Signed-off-by: Kondaka --- .../transformer/DynamicConfigTransformer.java | 24 +++---- .../DynamicConfigTransformerTest.java | 67 ++++++++++++++++++- .../NonProviderWithStaticInit.java | 26 +++++++ .../ProviderWithNonStaticMethod.java | 19 ++++++ .../ProviderWithoutTargetMethod.java | 19 ++++++ .../TestTransformFunctionProvider.java | 2 +- .../ValidAnnotatedProvider.java | 19 ++++++ .../transformation/rules/documentdb-rule.yaml | 2 +- .../rules/documentdb1-rule.yaml | 2 +- .../transformation/rules/rds-joins-rule.yaml | 2 +- .../transformation/rules/rds-rule.yaml | 2 +- data-prepper-plugins/aws-plugin/build.gradle | 5 -- .../PipelineTransformFunctions.java | 2 +- .../PipelineTransformFunctionsTest.java | 11 ++- .../transforms/rules/documentdb-rule.yaml | 2 +- .../transforms/rules/mongodb-rule.yaml | 2 +- .../transforms/rules/rds-joins-rule.yaml | 2 +- .../transforms/rules/rds-rule.yaml | 2 +- 18 files changed, 179 insertions(+), 31 deletions(-) create mode 100644 data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/NonProviderWithStaticInit.java create mode 100644 data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ProviderWithNonStaticMethod.java create mode 100644 data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ProviderWithoutTargetMethod.java rename data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/{ => dataprepper_transformer}/TestTransformFunctionProvider.java (97%) create mode 100644 data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ValidAnnotatedProvider.java rename data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/{ => dataprepper_transformer}/PipelineTransformFunctions.java (98%) rename data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/{ => dataprepper_transformer}/PipelineTransformFunctionsTest.java (95%) diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java index 5ad920630c..321b2f7290 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java @@ -47,9 +47,6 @@ public class DynamicConfigTransformer implements PipelineConfigurationTransformer { private static final Logger LOG = LoggerFactory.getLogger(DynamicConfigTransformer.class); - private static final Pattern FULLY_QUALIFIED_METHOD_PATTERN = Pattern.compile( - "^[a-zA-Z_][a-zA-Z0-9_]*(\\.[a-zA-Z_][a-zA-Z0-9_]*)*(\\.|::)[a-zA-Z_][a-zA-Z0-9_]*$" - ); private final ObjectMapper objectMapper = new ObjectMapper(); private final RuleEvaluator ruleEvaluator; @@ -92,6 +89,9 @@ public class DynamicConfigTransformer implements PipelineConfigurationTransforme * The captured group is the target path (e.g., "sink[*].opensearch") */ private static final Pattern OVERLAY_PATTERN = Pattern.compile("^<>$"); + private static final String REQUIRED_PACKAGE_PREFIX = "org.opensearch.dataprepper"; + private static final String REQUIRED_PACKAGE_SEGMENT = "dataprepper_transformer"; + Configuration parseConfigWithJsonNode = Configuration.builder() .jsonProvider(new JacksonJsonNodeJsonProvider()) @@ -424,13 +424,6 @@ private boolean isJsonPath(String parameter) { } } - public static boolean isFullyQualifiedMethod(String input) { - if (input == null || input.isEmpty()) { - return false; - } - return FULLY_QUALIFIED_METHOD_PATTERN.matcher(input).matches(); - } - /** * Invokes a method dynamically on a given object. * @@ -447,6 +440,13 @@ public Object invokeMethod(final List functionProviders, String methodNa Class clazz = resolveClassForMethod(functionProviders, methodName, parameterType); + String packageName = clazz.getPackageName(); + if (!packageName.startsWith(REQUIRED_PACKAGE_PREFIX) || !packageName.contains(REQUIRED_PACKAGE_SEGMENT)) { + throw new RuntimeException("Class '" + clazz.getName() + + "' is not in a valid package. Package must start with '" + REQUIRED_PACKAGE_PREFIX + + "' and contain '" + REQUIRED_PACKAGE_SEGMENT + "'"); + } + if (!PipelineTransformFunctionProvider.class.isAssignableFrom(clazz)) { throw new RuntimeException("Class '" + clazz.getName() + "' does not implement PipelineTransformFunctionProvider"); @@ -463,10 +463,6 @@ public Object invokeMethod(final List functionProviders, String methodNa } private Class resolveClassForMethod(final List functionProviders, String methodName, Class parameterType) throws ReflectiveOperationException { - if (functionProviders.size() == 1) { - return Class.forName(functionProviders.get(0)); - } - for (final String functionProvider : functionProviders) { try { Class candidate = Class.forName(functionProvider); diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java index 24b704c1be..70a90dfa77 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java @@ -25,6 +25,7 @@ import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluator; import org.opensearch.dataprepper.pipeline.parser.rule.RuleStream; +import java.util.Arrays; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -38,7 +39,9 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -480,10 +483,72 @@ void test_overlay_directive_overrides_existing_fields() throws Exception { assertThat(resultOs.get("script").has("custom_field")).isFalse(); } - // Inner test helper classes for invokeMethod validation tests + private static final String PROVIDER_PKG = "org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer."; + + // --- invokeMethod: Happy path --- + + @Test + void test_invokeMethod_succeeds_with_valid_provider_and_annotated_method() throws Exception { + ruleEvaluator = mock(RuleEvaluator.class); + DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); + List providers = Collections.singletonList(PROVIDER_PKG + "ValidAnnotatedProvider"); + Object result = transformer.invokeMethod(providers, "transformValue", String.class, "hello"); + assertEquals("HELLO", result); + } + + // --- invokeMethod: Non-existent class name --- + + @Test + void test_invokeMethod_throws_when_class_does_not_exist() { + ruleEvaluator = mock(RuleEvaluator.class); + DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); + List providers = Collections.singletonList("com.example.does.not.Exist"); + assertThrows(Exception.class, () -> + transformer.invokeMethod(providers, "someMethod", String.class, "arg")); + } + + // --- invokeMethod: Multiple providers, correct resolution --- + + @Test + void test_invokeMethod_resolves_method_from_second_provider_when_first_lacks_it() throws Exception { + ruleEvaluator = mock(RuleEvaluator.class); + DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); + List providers = Arrays.asList( + PROVIDER_PKG + "ProviderWithoutTargetMethod", + PROVIDER_PKG + "ValidAnnotatedProvider"); + Object result = transformer.invokeMethod(providers, "transformValue", String.class, "world"); + assertEquals("WORLD", result); + } + + // --- invokeMethod: Non-static annotated method --- + + @Test + void test_invokeMethod_throws_when_annotated_method_is_non_static() { + ruleEvaluator = mock(RuleEvaluator.class); + DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); + List providers = Collections.singletonList(PROVIDER_PKG + "ProviderWithNonStaticMethod"); + assertThrows(Exception.class, () -> + transformer.invokeMethod(providers, "instanceMethod", String.class, "arg")); + } + + // --- invokeMethod: Static initializer safety --- + + @Test + void test_invokeMethod_throws_for_non_provider_before_running_methods() { + ruleEvaluator = mock(RuleEvaluator.class); + DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); + List providers = Collections.singletonList(PROVIDER_PKG + "NonProviderWithStaticInit"); + RuntimeException exception = assertThrows(RuntimeException.class, () -> + transformer.invokeMethod(providers, "getValue", String.class, "arg")); + assertTrue(exception.getMessage().contains("does not implement PipelineTransformFunctionProvider")); + } + + // --- Inner test helper class for package-check-independent tests --- + public static class ValidProviderNoAnnotation implements org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider { public static String unannotatedMethod(String input) { return input; } } } + diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/NonProviderWithStaticInit.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/NonProviderWithStaticInit.java new file mode 100644 index 0000000000..b7ad75a1fb --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/NonProviderWithStaticInit.java @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer; + +import org.opensearch.dataprepper.model.annotations.TransformationFunction; + +/** + * Test class that does NOT implement PipelineTransformFunctionProvider. + * Has a static initializer to verify it doesn't run if interface check rejects it. + */ +public class NonProviderWithStaticInit { + static { + System.setProperty("test.static.init.ran", "true"); + } + + @TransformationFunction + public static String getValue(String input) { + return input; + } +} diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ProviderWithNonStaticMethod.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ProviderWithNonStaticMethod.java new file mode 100644 index 0000000000..955a768b99 --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ProviderWithNonStaticMethod.java @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer; + +import org.opensearch.dataprepper.model.annotations.TransformationFunction; +import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider; + +public class ProviderWithNonStaticMethod implements PipelineTransformFunctionProvider { + @TransformationFunction + public String instanceMethod(String input) { + return input; + } +} diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ProviderWithoutTargetMethod.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ProviderWithoutTargetMethod.java new file mode 100644 index 0000000000..4a445f95cb --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ProviderWithoutTargetMethod.java @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer; + +import org.opensearch.dataprepper.model.annotations.TransformationFunction; +import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider; + +public class ProviderWithoutTargetMethod implements PipelineTransformFunctionProvider { + @TransformationFunction + public static String otherMethod(String input) { + return input; + } +} diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/TestTransformFunctionProvider.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/TestTransformFunctionProvider.java similarity index 97% rename from data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/TestTransformFunctionProvider.java rename to data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/TestTransformFunctionProvider.java index 278db00bff..52f989dc49 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/TestTransformFunctionProvider.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/TestTransformFunctionProvider.java @@ -7,7 +7,7 @@ * compatible open source license. */ -package org.opensearch.dataprepper.pipeline.parser.transformer; +package org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer; import org.opensearch.dataprepper.model.annotations.TransformationFunction; import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider; diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ValidAnnotatedProvider.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ValidAnnotatedProvider.java new file mode 100644 index 0000000000..73b99ba6aa --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ValidAnnotatedProvider.java @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer; + +import org.opensearch.dataprepper.model.annotations.TransformationFunction; +import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider; + +public class ValidAnnotatedProvider implements PipelineTransformFunctionProvider { + @TransformationFunction + public static String transformValue(String input) { + return input.toUpperCase(); + } +} diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml index e0b1685834..bc13226783 100644 --- a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml @@ -3,4 +3,4 @@ apply_when: - "$..source.documentdb" - "$..source.documentdb.s3_bucket" function_providers: - - "org.opensearch.dataprepper.pipeline.parser.transformer.TestTransformFunctionProvider" + - "org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer.TestTransformFunctionProvider" diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb1-rule.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb1-rule.yaml index b7e927f87e..2cd7654d6a 100644 --- a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb1-rule.yaml +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb1-rule.yaml @@ -2,4 +2,4 @@ plugin_name: "documentdb" apply_when: - "$..source.documentdb" function_providers: - - "org.opensearch.dataprepper.pipeline.parser.transformer.TestTransformFunctionProvider" + - "org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer.TestTransformFunctionProvider" diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-joins-rule.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-joins-rule.yaml index 4944a3e7d3..212a8b3bc9 100644 --- a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-joins-rule.yaml +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-joins-rule.yaml @@ -12,4 +12,4 @@ apply_when: - "$..source.rds" - "$..source.rds.joins" function_providers: - - "org.opensearch.dataprepper.pipeline.parser.transformer.TestTransformFunctionProvider" + - "org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer.TestTransformFunctionProvider" diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-rule.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-rule.yaml index 265af26832..8d8fd8f547 100644 --- a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-rule.yaml +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-rule.yaml @@ -11,4 +11,4 @@ plugin_name: "rds" apply_when: - "$..source.rds" function_providers: - - "org.opensearch.dataprepper.pipeline.parser.transformer.TestTransformFunctionProvider" + - "org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer.TestTransformFunctionProvider" diff --git a/data-prepper-plugins/aws-plugin/build.gradle b/data-prepper-plugins/aws-plugin/build.gradle index 06f65c167e..7eaa85b52d 100644 --- a/data-prepper-plugins/aws-plugin/build.gradle +++ b/data-prepper-plugins/aws-plugin/build.gradle @@ -28,11 +28,6 @@ test { jacocoTestCoverageVerification { dependsOn jacocoTestReport - afterEvaluate { - classDirectories.setFrom(files(classDirectories.files.collect { - fileTree(dir: it, exclude: ['**/PipelineTransformFunctions.class']) - })) - } violationRules { rule { limit { diff --git a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctions.java b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/dataprepper_transformer/PipelineTransformFunctions.java similarity index 98% rename from data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctions.java rename to data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/dataprepper_transformer/PipelineTransformFunctions.java index 619c7f9133..dac623ba8c 100644 --- a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctions.java +++ b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/dataprepper_transformer/PipelineTransformFunctions.java @@ -7,7 +7,7 @@ * compatible open source license. */ -package org.opensearch.dataprepper.plugins.aws; +package org.opensearch.dataprepper.plugins.aws.dataprepper_transformer; import org.opensearch.dataprepper.model.annotations.SkipTestCoverageGenerated; import org.opensearch.dataprepper.model.annotations.TransformationFunction; diff --git a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctionsTest.java b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/dataprepper_transformer/PipelineTransformFunctionsTest.java similarity index 95% rename from data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctionsTest.java rename to data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/dataprepper_transformer/PipelineTransformFunctionsTest.java index d23551e43b..1f1936b9c3 100644 --- a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/PipelineTransformFunctionsTest.java +++ b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/dataprepper_transformer/PipelineTransformFunctionsTest.java @@ -7,7 +7,7 @@ * compatible open source license. */ -package org.opensearch.dataprepper.plugins.aws; +package org.opensearch.dataprepper.plugins.aws.dataprepper_transformer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -61,6 +61,15 @@ void all_public_transformation_methods_are_annotated() throws Exception { } } + @Test + void default_sourceCoordinationIdentifierSupplier_reads_env_variable() { + // Exercises the default lambda on line 39: () -> System.getenv(...) + // env var is not set in test, so returns null — but the lambda bytecode is covered + String result = originalSupplier.get(); + assertNull(result); + } + + // --- calculateDepth --- @Test diff --git a/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/documentdb-rule.yaml b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/documentdb-rule.yaml index 6e581a7936..22e814cac9 100644 --- a/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/documentdb-rule.yaml +++ b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/documentdb-rule.yaml @@ -3,4 +3,4 @@ apply_when: - "$..source.documentdb" - "$..source.documentdb.s3_bucket" function_providers: - - "org.opensearch.dataprepper.plugins.aws.PipelineTransformFunctions" + - "org.opensearch.dataprepper.plugins.aws.dataprepper_transformer.PipelineTransformFunctions" diff --git a/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/mongodb-rule.yaml b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/mongodb-rule.yaml index 20a05a1c9a..07c35c3cd7 100644 --- a/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/mongodb-rule.yaml +++ b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/mongodb-rule.yaml @@ -3,4 +3,4 @@ apply_when: - "$..source.mongodb" - "$..source.mongodb.s3_bucket" function_providers: - - "org.opensearch.dataprepper.plugins.aws.PipelineTransformFunctions" + - "org.opensearch.dataprepper.plugins.aws.dataprepper_transformer.PipelineTransformFunctions" diff --git a/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-joins-rule.yaml b/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-joins-rule.yaml index 14e54251dd..2e65f1481e 100644 --- a/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-joins-rule.yaml +++ b/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-joins-rule.yaml @@ -11,4 +11,4 @@ apply_when: - "$..source.rds" - "$..source.rds.joins" function_providers: - - "org.opensearch.dataprepper.plugins.aws.PipelineTransformFunctions" + - "org.opensearch.dataprepper.plugins.aws.dataprepper_transformer.PipelineTransformFunctions" diff --git a/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-rule.yaml b/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-rule.yaml index 4db29c2ccf..668fc14e7a 100644 --- a/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-rule.yaml +++ b/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-rule.yaml @@ -2,4 +2,4 @@ plugin_name: "rds" apply_when: - "$..source.rds" function_providers: - - "org.opensearch.dataprepper.plugins.aws.PipelineTransformFunctions" + - "org.opensearch.dataprepper.plugins.aws.dataprepper_transformer.PipelineTransformFunctions"