diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/annotations/TransformationFunction.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/annotations/TransformationFunction.java new file mode 100644 index 0000000000..e8b4617bf1 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/annotations/TransformationFunction.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.model.annotations; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marks a method as a pipeline transformation function that can be invoked + * dynamically from template YAML files via the {@code FUNCTION_NAME} placeholder. + *

+ * Annotated methods must be {@code public static} and accept a single {@code String} + * parameter, returning a {@code String} result. + *

+ * The enclosing class must implement + * {@link org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider}. + * + * @since 2.12 + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.METHOD}) +public @interface TransformationFunction { +} diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PipelineTransformFunctionProvider.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PipelineTransformFunctionProvider.java new file mode 100644 index 0000000000..a003f0ca28 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PipelineTransformFunctionProvider.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.model.plugin; + +/** + * Marker interface for classes that provide pipeline transformation functions. + * Classes implementing this interface can be referenced in rule YAML files via + * the {@code function_providers} field and have their methods invoked dynamically + * during pipeline template transformation. + *

+ * Methods intended to be callable from templates must also be annotated with + * {@link org.opensearch.dataprepper.model.annotations.TransformationFunction}. + * + * @since 2.12 + */ +public interface PipelineTransformFunctionProvider { +} diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/annotations/TransformationFunctionTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/annotations/TransformationFunctionTest.java new file mode 100644 index 0000000000..1c1f82d9ab --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/annotations/TransformationFunctionTest.java @@ -0,0 +1,70 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.model.annotations; + +import org.junit.jupiter.api.Test; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TransformationFunctionTest { + + @Test + void annotation_is_retained_at_runtime() { + Retention retention = TransformationFunction.class.getAnnotation(Retention.class); + assertNotNull(retention); + assertThat(retention.value(), equalTo(RetentionPolicy.RUNTIME)); + } + + @Test + void annotation_targets_methods() { + Target target = TransformationFunction.class.getAnnotation(Target.class); + assertNotNull(target); + assertThat(target.value().length, equalTo(1)); + assertThat(target.value()[0], equalTo(ElementType.METHOD)); + } + + @Test + void annotation_is_documented() { + Documented documented = TransformationFunction.class.getAnnotation(Documented.class); + assertNotNull(documented); + } + + @Test + void annotation_is_present_on_annotated_method() throws NoSuchMethodException { + assertTrue(AnnotatedClass.class.getMethod("annotatedMethod", String.class) + .isAnnotationPresent(TransformationFunction.class)); + } + + @Test + void annotation_is_not_present_on_unannotated_method() throws NoSuchMethodException { + assertTrue(!AnnotatedClass.class.getMethod("unannotatedMethod", String.class) + .isAnnotationPresent(TransformationFunction.class)); + } + + static class AnnotatedClass { + @TransformationFunction + public static String annotatedMethod(String input) { + return input; + } + + public static String unannotatedMethod(String input) { + return input; + } + } +} diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/plugin/PipelineTransformFunctionProviderTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/plugin/PipelineTransformFunctionProviderTest.java new file mode 100644 index 0000000000..c081cd61c3 --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/plugin/PipelineTransformFunctionProviderTest.java @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.model.plugin; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class PipelineTransformFunctionProviderTest { + + @Test + void implementing_class_is_assignable_from_interface() { + PipelineTransformFunctionProvider provider = new TestFunctionProvider(); + assertNotNull(provider); + assertTrue(provider instanceof PipelineTransformFunctionProvider); + } + + @Test + void interface_is_assignable_from_implementing_class() { + assertThat(PipelineTransformFunctionProvider.class.isAssignableFrom(TestFunctionProvider.class), + equalTo(true)); + } + + @Test + void interface_is_not_assignable_from_non_implementing_class() { + assertThat(PipelineTransformFunctionProvider.class.isAssignableFrom(NonProvider.class), + equalTo(false)); + } + + @Test + void interface_has_no_declared_methods() { + assertThat(PipelineTransformFunctionProvider.class.getDeclaredMethods().length, equalTo(0)); + } + + @Test + void interface_is_public() { + assertTrue(java.lang.reflect.Modifier.isPublic(PipelineTransformFunctionProvider.class.getModifiers())); + } + + @Test + void interface_is_an_interface() { + assertTrue(PipelineTransformFunctionProvider.class.isInterface()); + } + + static class TestFunctionProvider implements PipelineTransformFunctionProvider { + public static String sampleFunction(String input) { + return input; + } + } + + static class NonProvider { + public static String someMethod(String input) { + return input; + } + } +} diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java index 1b05ea5b79..7c3a6b9100 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java @@ -1,7 +1,12 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ + package org.opensearch.dataprepper.pipeline.parser.rule; import com.fasterxml.jackson.core.JsonProcessingException; @@ -62,6 +67,7 @@ public RuleEvaluatorResult isTransformationNeeded(PipelinesDataFlowModel pipelin return RuleEvaluatorResult.builder() .withEvaluatedResult(true) .withPipelineTemplateModel(templateModel) + .withFunctionProviders(ruleFileEvaluation.getFunctionProviders()) .withPipelineName(entry.getKey()) .build(); } @@ -76,6 +82,7 @@ public RuleEvaluatorResult isTransformationNeeded(PipelinesDataFlowModel pipelin return RuleEvaluatorResult.builder() .withEvaluatedResult(false) .withPipelineName(null) + .withFunctionProviders(null) .withPipelineTemplateModel(null) .build(); } @@ -131,6 +138,7 @@ private RuleFileEvaluation evaluate(String pipelinesJson) { return RuleFileEvaluation.builder() .withPluginName(pluginName) .withRuleFileName(parsedRule.fileName) + .withFunctionProviders(rulesModel.getFunctionProviders()) .withResult(true) .build(); } @@ -163,4 +171,4 @@ private static class ParsedRule { this.fileName = fileName; } } -} \ No newline at end of file +} diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResult.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResult.java index e89768fee6..e9ffb837db 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResult.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResult.java @@ -1,7 +1,12 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ + package org.opensearch.dataprepper.pipeline.parser.rule; import lombok.AllArgsConstructor; @@ -9,6 +14,8 @@ import lombok.Getter; import org.opensearch.dataprepper.pipeline.parser.transformer.PipelineTemplateModel; +import java.util.List; + @Builder(setterPrefix = "with") @Getter @AllArgsConstructor @@ -20,6 +27,8 @@ public class RuleEvaluatorResult { private PipelineTemplateModel pipelineTemplateModel; + private List functionProviders; + public RuleEvaluatorResult() { } diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluation.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluation.java index 88ee0f8d6c..d89c871cc4 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluation.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluation.java @@ -4,6 +4,8 @@ import lombok.Builder; import lombok.Data; +import java.util.List; + @Builder(setterPrefix = "with") @AllArgsConstructor @Data @@ -11,6 +13,7 @@ public class RuleFileEvaluation { private Boolean result; private String ruleFileName; private String pluginName; + private List functionProviders; public RuleFileEvaluation() { diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModel.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModel.java index aaf757cedb..b9677e4f72 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModel.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModel.java @@ -1,7 +1,12 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ + package org.opensearch.dataprepper.pipeline.parser.rule; import com.fasterxml.jackson.annotation.JsonInclude; @@ -22,6 +27,9 @@ public class RuleTransformerModel { @JsonProperty("plugin_name") private String pluginName; + @JsonProperty("function_providers") + private List functionProviders; + public RuleTransformerModel() { } @@ -29,6 +37,7 @@ public RuleTransformerModel() { public String toString() { return "RuleConfiguration{" + "applyWhen=" + applyWhen + - "\npluginName="+ pluginName +'}'; + "\nfunctionProviders=" + functionProviders + + "\npluginName=" + pluginName + '}'; } -} \ No newline at end of file +} diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java index 23c89a7359..321b2f7290 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java @@ -1,7 +1,12 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ + package org.opensearch.dataprepper.pipeline.parser.transformer; import com.fasterxml.jackson.core.JsonProcessingException; @@ -17,7 +22,6 @@ import com.jayway.jsonpath.spi.json.JacksonJsonNodeJsonProvider; import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider; import static java.lang.String.format; -import static org.opensearch.dataprepper.plugins.source.rds.RdsService.MAX_SOURCE_IDENTIFIER_LENGTH; import org.opensearch.dataprepper.model.configuration.PipelineModel; import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; @@ -26,9 +30,9 @@ import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluatorResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.opensearch.dataprepper.plugins.source.rds.utils.IdentifierShortener; -import software.amazon.awssdk.arns.Arn; +import org.opensearch.dataprepper.model.annotations.TransformationFunction; +import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider; import javax.xml.transform.TransformerException; import java.io.IOException; import java.lang.reflect.Method; @@ -43,6 +47,7 @@ public class DynamicConfigTransformer implements PipelineConfigurationTransformer { private static final Logger LOG = LoggerFactory.getLogger(DynamicConfigTransformer.class); + private final ObjectMapper objectMapper = new ObjectMapper(); private final RuleEvaluator ruleEvaluator; @@ -76,17 +81,17 @@ public class DynamicConfigTransformer implements PipelineConfigurationTransforme private static final String RECURSIVE_JSON_PATH_PATH = "$.."; private static final String JSON_PATH_IDENTIFIER = "$."; private static final String ARRAY_NODE_PATTERN = "([^\\[]+)\\[(\\d+)\\]$"; - private static final String SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE = "SOURCE_COORDINATION_PIPELINE_IDENTIFIER"; private static final String SINK_SUBPIPELINE_PLUGIN_NAME = "pipeline"; private static final String SUBPIPELINE_PATH = "$.source.pipeline"; - private static final String S3_BUFFER_PREFIX = "/buffer"; - /** * Pattern to match overlay directives like "<>" * The captured group is the target path (e.g., "sink[*].opensearch") */ private static final Pattern OVERLAY_PATTERN = Pattern.compile("^<>$"); + private static final String REQUIRED_PACKAGE_PREFIX = "org.opensearch.dataprepper"; + private static final String REQUIRED_PACKAGE_SEGMENT = "dataprepper_transformer"; + Configuration parseConfigWithJsonNode = Configuration.builder() .jsonProvider(new JacksonJsonNodeJsonProvider()) @@ -126,6 +131,7 @@ public PipelinesDataFlowModel transformConfiguration(PipelinesDataFlowModel preT //To differentiate between sub-pipelines that dont need transformation. String pipelineNameThatNeedsTransformation = ruleEvaluatorResult.getPipelineName(); PipelineTemplateModel templateModel = ruleEvaluatorResult.getPipelineTemplateModel(); + List functionProviders = ruleEvaluatorResult.getFunctionProviders(); LOG.info("Transforming pipeline config for pipeline {}",pipelineNameThatNeedsTransformation); try { @@ -149,7 +155,7 @@ public PipelinesDataFlowModel transformConfiguration(PipelinesDataFlowModel preT JsonNode templateRootNode = objectMapper.readTree(templateJsonString); // get exact path in pipelineJson - Map pipelineExactPathMap = findExactPath(placeholdersMap, pipelineJson); + Map pipelineExactPathMap = findExactPath(placeholdersMap, pipelineJson, functionProviders); //replace placeholder with actual value in the template context placeholdersMap.forEach((placeholder, templateJsonPathList) -> { @@ -173,7 +179,7 @@ public PipelinesDataFlowModel transformConfiguration(PipelinesDataFlowModel preT }); // Process <> directives — merge overlay fields into resolved config - processOverlayDirectives(templateRootNode, pipelineJson); + processOverlayDirectives(templateRootNode, pipelineJson, functionProviders); PipelinesDataFlowModel transformedPipelinesDataFlowModel = getTransformedPipelinesDataFlowModel(pipelineNameThatNeedsTransformation, preTransformedPipelinesDataFlowModel, @@ -333,12 +339,12 @@ private void populateMapWithPlaceholderPaths(JsonNode currentNode, String curren * @return Map K:jsonPath, V:exactPath * @throws IOException */ - private Map findExactPath(Map> placeholdersMap, String pipelineJson) throws IOException, TransformerException { + private Map findExactPath(Map> placeholdersMap, String pipelineJson, final List functionProviders) throws IOException, TransformerException { Map mapWithPaths = new HashMap<>(); for (String genericPathPlaceholder : placeholdersMap.keySet()) { String placeHolderValue = getValueFromPlaceHolder(genericPathPlaceholder); - String value = executeFunctionPlaceholder(placeHolderValue, pipelineJson); + String value = executeFunctionPlaceholder(placeHolderValue, pipelineJson, functionProviders); // Recursive pattern in json path is NOT allowed if (value!=null && value.contains(RECURSIVE_JSON_PATH_PATH)) { @@ -366,14 +372,14 @@ private String getValueFromPlaceHolder(String placeholder) { * @param functionPlaceholderValue * @return String - value of the function executed */ - private String executeFunctionPlaceholder(String functionPlaceholderValue, String pipelineJson){ + private String executeFunctionPlaceholder(String functionPlaceholderValue, String pipelineJson, final List functionProviders){ Matcher functionMatcher = FUNCTION_CALL_PLACEHOLDER_PATTERN.matcher(functionPlaceholderValue); if (functionMatcher.find()) { String functionName = functionMatcher.group(1); String parameter = functionMatcher.group(2); try { String parameterValue = (String)parseParameter(parameter, pipelineJson); - String value = (String) invokeMethod(functionName, String.class, parameterValue); + String value = (String) invokeMethod(functionProviders, functionName, String.class, parameterValue); return value; } catch (ReflectiveOperationException e) { throw new RuntimeException(e); @@ -419,92 +425,55 @@ private boolean isJsonPath(String parameter) { } /** - * Calculate s3 folder scan depth for DocDB source pipeline - * @param s3Prefix: s3 prefix defined in the source configuration - * @return s3 folder scan depth + * Invokes a method dynamically on a given object. + * + * @param methodName the name of the method to be invoked + * @param parameterType the Class object representing the parameter type + * @param arg the parameter to be passed to the method + * @return the result of the method invocation + * @throws ReflectiveOperationException if the method cannot be invoked */ - public String calculateDepth(String s3Prefix) { - return Integer.toString(getDepth(s3Prefix, 4)); - } - - protected String getSourceCoordinationIdentifier() { - return System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE); - } + public Object invokeMethod(final List functionProviders, String methodName, Class parameterType, Object arg) throws ReflectiveOperationException { + if (functionProviders == null || functionProviders.isEmpty()) { + throw new RuntimeException("function_providers cannot be empty"); + } - /** - * Calculate s3 folder scan depth for RDS source pipeline - * @param s3Prefix: s3 prefix defined in the source configuration - * @return s3 folder scan depth - */ - public String calculateDepthForRdsSource(String s3Prefix) { - String envSourceCoordinationIdentifier = getSourceCoordinationIdentifier(); - int baseDepth = envSourceCoordinationIdentifier != null ? 3 : 2; - return Integer.toString(getDepth(s3Prefix, baseDepth)); - } + Class clazz = resolveClassForMethod(functionProviders, methodName, parameterType); - private int getDepth(String s3Prefix, int baseDepth) { - if(s3Prefix == null){ - return baseDepth; + String packageName = clazz.getPackageName(); + if (!packageName.startsWith(REQUIRED_PACKAGE_PREFIX) || !packageName.contains(REQUIRED_PACKAGE_SEGMENT)) { + throw new RuntimeException("Class '" + clazz.getName() + + "' is not in a valid package. Package must start with '" + REQUIRED_PACKAGE_PREFIX + + "' and contain '" + REQUIRED_PACKAGE_SEGMENT + "'"); } - return s3Prefix.split("/").length + baseDepth; - } - public String getSourceCoordinationIdentifierEnvVariable(String s3Prefix){ - String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE); - if(s3Prefix == null){ - return envSourceCoordinationIdentifier; + if (!PipelineTransformFunctionProvider.class.isAssignableFrom(clazz)) { + throw new RuntimeException("Class '" + clazz.getName() + + "' does not implement PipelineTransformFunctionProvider"); } - return s3Prefix+"/"+envSourceCoordinationIdentifier; - } - /** - * Get the include_prefix in s3 scan source. This is a function specific to RDS source. - * @param s3Prefix: s3 prefix defined in the source configuration - * @return the actual include_prefix - */ - public String getIncludePrefixForRdsSource(String s3Prefix) { - final String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE); - final String shortenedSourceIdentifier = envSourceCoordinationIdentifier != null ? - IdentifierShortener.shortenIdentifier(envSourceCoordinationIdentifier, MAX_SOURCE_IDENTIFIER_LENGTH) : null; - if (s3Prefix == null && envSourceCoordinationIdentifier == null) { - return S3_BUFFER_PREFIX; - } else if (s3Prefix == null) { - return shortenedSourceIdentifier + S3_BUFFER_PREFIX; - } else if (envSourceCoordinationIdentifier == null) { - return s3Prefix + S3_BUFFER_PREFIX; - } - return s3Prefix + "/" + shortenedSourceIdentifier + S3_BUFFER_PREFIX; - } + Method method = clazz.getMethod(methodName, parameterType); - public String getAccountIdFromRole(final String roleArn) { - if (roleArn == null) - return null; - try { - return Arn.fromString(roleArn).accountId().orElse(null); - } catch (Exception e) { - LOG.warn("Malformatted role ARN for dynamic transformation: {}", roleArn); - return null; + if (!method.isAnnotationPresent(TransformationFunction.class)) { + throw new RuntimeException("Method '" + methodName + "' in class '" + clazz.getName() + + "' is not annotated with @TransformationFunction"); } - } - /** - * Invokes a method dynamically on a given object. - * - * @param methodName the name of the method to be invoked - * @param parameterType the Class object representing the parameter type - * @param arg the parameter to be passed to the method - * @return the result of the method invocation - * @throws ReflectiveOperationException if the method cannot be invoked - */ - public Object invokeMethod(String methodName, Class parameterType, Object arg) throws ReflectiveOperationException { - // Get the Class object - Class clazz = this.getClass(); + return method.invoke(null, arg); + } - // Get the Method object for the specified method and parameter type - Method method = clazz.getMethod(methodName, parameterType); + private Class resolveClassForMethod(final List functionProviders, String methodName, Class parameterType) throws ReflectiveOperationException { + for (final String functionProvider : functionProviders) { + try { + Class candidate = Class.forName(functionProvider); + candidate.getMethod(methodName, parameterType); + return candidate; + } catch (NoSuchMethodException e) { + continue; + } + } - // Invoke the method on the object with the given argument - return method.invoke(this, arg); + throw new RuntimeException("Could not find a class with method '" + methodName + "' in function_providers: " + functionProviders); } @@ -517,11 +486,11 @@ public Object invokeMethod(String methodName, Class parameterType, Object arg * Example: <> with value {action: "upsert", script: {...}} * will merge those fields into every opensearch sink entry. */ - private void processOverlayDirectives(JsonNode rootNode, String pipelineJson) { - processOverlayDirectivesRecursive(rootNode, pipelineJson); + private void processOverlayDirectives(JsonNode rootNode, String pipelineJson, List functionProviders) { + processOverlayDirectivesRecursive(rootNode, pipelineJson, functionProviders); } - private void processOverlayDirectivesRecursive(JsonNode node, String pipelineJson) { + private void processOverlayDirectivesRecursive(JsonNode node, String pipelineJson, List functionProviders) { if (node.isObject()) { ObjectNode objectNode = (ObjectNode) node; List overlayKeys = new ArrayList<>(); @@ -543,7 +512,7 @@ private void processOverlayDirectivesRecursive(JsonNode node, String pipelineJso String targetPath = matcher.group(1); JsonNode overlayValue = objectNode.get(overlayKey); // Resolve any <<...>> placeholders inside overlay values - resolveOverlayPlaceholders(overlayValue, pipelineJson); + resolveOverlayPlaceholders(overlayValue, pipelineJson, functionProviders); applyOverlay(objectNode, targetPath, overlayValue); objectNode.remove(overlayKey); } @@ -552,11 +521,11 @@ private void processOverlayDirectivesRecursive(JsonNode node, String pipelineJso // Recurse into remaining children Iterator> fields = objectNode.fields(); while (fields.hasNext()) { - processOverlayDirectivesRecursive(fields.next().getValue(), pipelineJson); + processOverlayDirectivesRecursive(fields.next().getValue(), pipelineJson, functionProviders); } } else if (node.isArray()) { for (JsonNode element : node) { - processOverlayDirectivesRecursive(element, pipelineJson); + processOverlayDirectivesRecursive(element, pipelineJson, functionProviders); } } } @@ -564,7 +533,7 @@ private void processOverlayDirectivesRecursive(JsonNode node, String pipelineJso /** * Resolves <<...>> placeholders in overlay value nodes using the pipeline config. */ - private void resolveOverlayPlaceholders(JsonNode node, String pipelineJson) { + private void resolveOverlayPlaceholders(JsonNode node, String pipelineJson, List functionProviders) { if (node.isObject()) { ObjectNode objectNode = (ObjectNode) node; List> entries = new ArrayList<>(); @@ -575,7 +544,7 @@ private void resolveOverlayPlaceholders(JsonNode node, String pipelineJson) { Matcher matcher = PLACEHOLDER_PATTERN.matcher(text); if (matcher.find()) { String placeholder = getValueFromPlaceHolder(text); - String resolved = executeFunctionPlaceholder(placeholder, pipelineJson); + String resolved = executeFunctionPlaceholder(placeholder, pipelineJson, functionProviders); if (isJsonPath(resolved)) { JsonNode resolvedNode = JsonPath.using(parseConfigWithJsonNode) .parse(pipelineJson).read(resolved); @@ -589,12 +558,12 @@ private void resolveOverlayPlaceholders(JsonNode node, String pipelineJson) { } } } else { - resolveOverlayPlaceholders(entry.getValue(), pipelineJson); + resolveOverlayPlaceholders(entry.getValue(), pipelineJson, functionProviders); } } } else if (node.isArray()) { for (JsonNode element : node) { - resolveOverlayPlaceholders(element, pipelineJson); + resolveOverlayPlaceholders(element, pipelineJson, functionProviders); } } } diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResultTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResultTest.java index 104beeee6e..d1bf0d6ca4 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResultTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResultTest.java @@ -1,12 +1,20 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ + package org.opensearch.dataprepper.pipeline.parser.rule; import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.pipeline.parser.transformer.PipelineTemplateModel; +import java.util.Arrays; +import java.util.List; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; @@ -17,15 +25,34 @@ class RuleEvaluatorResultTest { @Test void builder_createsObjectCorrectly() { PipelineTemplateModel pipelineTemplateModel = new PipelineTemplateModel(); + List functionProviders = Arrays.asList("org.example.Functions"); + RuleEvaluatorResult result = RuleEvaluatorResult.builder() .withEvaluatedResult(true) .withPipelineName("testPipeline") .withPipelineTemplateModel(pipelineTemplateModel) + .withFunctionProviders(functionProviders) .build(); assertTrue(result.isEvaluatedResult()); - assertEquals(result.getPipelineName(), "testPipeline"); - assertEquals(result.getPipelineTemplateModel(), pipelineTemplateModel); + assertEquals("testPipeline", result.getPipelineName()); + assertEquals(pipelineTemplateModel, result.getPipelineTemplateModel()); + assertEquals(functionProviders, result.getFunctionProviders()); + } + + @Test + void builder_withNullFunctionProviders_setsNull() { + RuleEvaluatorResult result = RuleEvaluatorResult.builder() + .withEvaluatedResult(false) + .withPipelineName(null) + .withPipelineTemplateModel(null) + .withFunctionProviders(null) + .build(); + + assertFalse(result.isEvaluatedResult()); + assertNull(result.getPipelineName()); + assertNull(result.getPipelineTemplateModel()); + assertNull(result.getFunctionProviders()); } @Test @@ -34,16 +61,20 @@ void defaultConstructor_initializesFieldsCorrectly() { assertFalse(result.isEvaluatedResult()); assertNull(result.getPipelineName()); assertNull(result.getPipelineTemplateModel()); + assertNull(result.getFunctionProviders()); } @Test void allArgsConstructor_assignsFieldsCorrectly() { PipelineTemplateModel pipelineTemplateModel = new PipelineTemplateModel(); + List functionProviders = Arrays.asList("com.example.Provider"); + RuleEvaluatorResult result = new RuleEvaluatorResult(true, - "testPipeline", pipelineTemplateModel); + "testPipeline", pipelineTemplateModel, functionProviders); assertTrue(result.isEvaluatedResult()); - assertEquals(result.getPipelineName(), "testPipeline"); - assertEquals(result.getPipelineTemplateModel(), pipelineTemplateModel); + assertEquals("testPipeline", result.getPipelineName()); + assertEquals(pipelineTemplateModel, result.getPipelineTemplateModel()); + assertEquals(functionProviders, result.getFunctionProviders()); } } diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluationTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluationTest.java new file mode 100644 index 0000000000..fb1282c80e --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluationTest.java @@ -0,0 +1,85 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.pipeline.parser.rule; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class RuleFileEvaluationTest { + + @Test + void builder_createsObjectCorrectly() { + List functionProviders = Arrays.asList("com.example.Provider1", "com.example.Provider2"); + + RuleFileEvaluation result = RuleFileEvaluation.builder() + .withResult(true) + .withRuleFileName("test-rule.yaml") + .withPluginName("testPlugin") + .withFunctionProviders(functionProviders) + .build(); + + assertTrue(result.getResult()); + assertEquals("test-rule.yaml", result.getRuleFileName()); + assertEquals("testPlugin", result.getPluginName()); + assertEquals(functionProviders, result.getFunctionProviders()); + assertEquals(2, result.getFunctionProviders().size()); + } + + @Test + void builder_withNullFunctionProviders_setsNull() { + RuleFileEvaluation result = RuleFileEvaluation.builder() + .withResult(false) + .withRuleFileName("rule.yaml") + .withPluginName("plugin") + .withFunctionProviders(null) + .build(); + + assertNull(result.getFunctionProviders()); + } + + @Test + void defaultConstructor_initializesFieldsToNull() { + RuleFileEvaluation result = new RuleFileEvaluation(); + + assertNull(result.getResult()); + assertNull(result.getRuleFileName()); + assertNull(result.getPluginName()); + assertNull(result.getFunctionProviders()); + } + + @Test + void allArgsConstructor_assignsFieldsCorrectly() { + List functionProviders = Arrays.asList("com.example.Provider"); + + RuleFileEvaluation result = new RuleFileEvaluation( + true, "rule.yaml", "myPlugin", functionProviders); + + assertTrue(result.getResult()); + assertEquals("rule.yaml", result.getRuleFileName()); + assertEquals("myPlugin", result.getPluginName()); + assertEquals(functionProviders, result.getFunctionProviders()); + } + + @Test + void setter_updatesFunctionProviders() { + RuleFileEvaluation result = new RuleFileEvaluation(); + List providers = Arrays.asList("org.example.Functions"); + + result.setFunctionProviders(providers); + + assertEquals(providers, result.getFunctionProviders()); + } +} diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModelTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModelTest.java index bb974ce88b..7578c54d48 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModelTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModelTest.java @@ -1,7 +1,12 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ + package org.opensearch.dataprepper.pipeline.parser.rule; import com.fasterxml.jackson.databind.ObjectMapper; @@ -12,6 +17,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; class RuleTransformerModelTest { @@ -20,11 +27,14 @@ class RuleTransformerModelTest { @Test void testSerialization() throws Exception { List applyWhen = Arrays.asList("condition1", "condition2"); + List functionProviders = Arrays.asList("org.example.Functions"); String pluginName = "testPlugin"; - RuleTransformerModel model = new RuleTransformerModel(applyWhen, pluginName); + RuleTransformerModel model = new RuleTransformerModel(applyWhen, pluginName, functionProviders); String json = objectMapper.writeValueAsString(model); assertNotNull(json, "Serialized JSON should not be null"); + assertTrue(json.contains("\"function_providers\"")); + assertTrue(json.contains("org.example.Functions")); } @Test @@ -37,5 +47,43 @@ void testDeserialization() throws Exception { assertEquals("condition1", model.getApplyWhen().get(0), "The first condition should be 'condition1'"); assertEquals("condition2", model.getApplyWhen().get(1), "The second condition should be 'condition2'"); assertEquals("testPlugin", model.getPluginName(), "plugin Name"); + assertNull(model.getFunctionProviders()); + } + + @Test + void testDeserialization_withFunctionProviders() throws Exception { + String json = "{\"plugin_name\": \"rds\", \"apply_when\": [\"$..source.rds\"], " + + "\"function_providers\": [\"org.opensearch.dataprepper.plugins.aws.PipelineTransformFunctions\"]}"; + + RuleTransformerModel model = objectMapper.readValue(json, RuleTransformerModel.class); + assertNotNull(model); + assertEquals("rds", model.getPluginName()); + assertNotNull(model.getFunctionProviders()); + assertEquals(1, model.getFunctionProviders().size()); + assertEquals("org.opensearch.dataprepper.plugins.aws.PipelineTransformFunctions", + model.getFunctionProviders().get(0)); + } + + @Test + void testDeserialization_withMultipleFunctionProviders() throws Exception { + String json = "{\"plugin_name\": \"test\", \"apply_when\": [\"$..source.test\"], " + + "\"function_providers\": [\"com.example.Provider1\", \"com.example.Provider2\"]}"; + + RuleTransformerModel model = objectMapper.readValue(json, RuleTransformerModel.class); + assertNotNull(model.getFunctionProviders()); + assertEquals(2, model.getFunctionProviders().size()); + assertEquals("com.example.Provider1", model.getFunctionProviders().get(0)); + assertEquals("com.example.Provider2", model.getFunctionProviders().get(1)); + } + + @Test + void testToString_includesFunctionProviders() { + List applyWhen = Arrays.asList("condition1"); + List functionProviders = Arrays.asList("org.example.Funcs"); + RuleTransformerModel model = new RuleTransformerModel(applyWhen, "plugin", functionProviders); + + String result = model.toString(); + assertTrue(result.contains("functionProviders")); + assertTrue(result.contains("org.example.Funcs")); } } diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java index 21363738d1..70a90dfa77 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java @@ -1,7 +1,12 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ + package org.opensearch.dataprepper.pipeline.parser.transformer; import com.fasterxml.jackson.databind.JsonNode; @@ -10,12 +15,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; -import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationFileReader; import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationReader; @@ -24,6 +25,7 @@ import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluator; import org.opensearch.dataprepper.pipeline.parser.rule.RuleStream; +import java.util.Arrays; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -34,15 +36,13 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.doReturn; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; class DynamicConfigTransformerTest { @@ -54,22 +54,51 @@ class DynamicConfigTransformerTest { RuleEvaluator ruleEvaluator; @Test - void test_getAccountIdFromRole_returns_account_id_from_valid_role_arn() { - final String testAccountId = RandomStringUtils.randomNumeric(12); - final String testRoleArn = String.format("arn:aws:iam::%s:role/example-role", testAccountId); + void test_invokeMethod_throws_when_functionProviders_is_null() { + ruleEvaluator = mock(RuleEvaluator.class); + DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); + assertThrows(RuntimeException.class, () -> + transformer.invokeMethod(null, "someMethod", String.class, "arg")); + } + + @Test + void test_invokeMethod_throws_when_functionProviders_is_empty() { + ruleEvaluator = mock(RuleEvaluator.class); + DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); + assertThrows(RuntimeException.class, () -> + transformer.invokeMethod(Collections.emptyList(), "someMethod", String.class, "arg")); + } + + @Test + void test_invokeMethod_throws_when_class_does_not_implement_interface() { + ruleEvaluator = mock(RuleEvaluator.class); + DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); + List providers = Collections.singletonList("java.lang.String"); + assertThrows(RuntimeException.class, () -> + transformer.invokeMethod(providers, "valueOf", String.class, "test")); + } + + @Test + void test_invokeMethod_throws_when_method_not_annotated() { ruleEvaluator = mock(RuleEvaluator.class); DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); - assertThat(transformer.getAccountIdFromRole(testRoleArn)).isEqualTo(testAccountId); + List providers = Collections.singletonList( + "org.opensearch.dataprepper.pipeline.parser.transformer.DynamicConfigTransformerTest$ValidProviderNoAnnotation"); + assertThrows(RuntimeException.class, () -> + transformer.invokeMethod(providers, "unannotatedMethod", String.class, "arg")); } - @ParameterizedTest - @MethodSource("providesInvalidRoleArn") - void test_getAccountIdFromRole_returns_null_from_invalid_role_arn(final String testRoleArn) { + @Test + void test_invokeMethod_throws_when_method_not_found_in_any_provider() { ruleEvaluator = mock(RuleEvaluator.class); DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); - assertThat(transformer.getAccountIdFromRole(testRoleArn)).isNull(); + List providers = Collections.singletonList( + "org.opensearch.dataprepper.pipeline.parser.transformer.DynamicConfigTransformerTest$ValidProviderNoAnnotation"); + assertThrows(Exception.class, () -> + transformer.invokeMethod(providers, "nonExistentMethod", String.class, "arg")); } + @Test void test_successful_transformation_with_only_source_and_sink() throws IOException { String docDBUserConfig = TestConfigurationProvider.USER_CONFIG_TRANSFORMATION_DOCDB1_CONFIG_FILE; @@ -385,13 +414,6 @@ void testInvalidJsonPathThrowsException() throws IOException { assertThrows(RuntimeException.class, () -> transformer.transformConfiguration(pipelinesDataFlowModel)); } - private static Stream providesInvalidRoleArn() { - return Stream.of( - null, - Arguments.of("arn:aws:iam:::role/test-role"), - Arguments.of("invalid-format-arn") - ); - } @Test void test_overlay_directive_merges_into_opensearch_sinks() throws Exception { @@ -414,9 +436,9 @@ void test_overlay_directive_merges_into_opensearch_sinks() throws Exception { DynamicConfigTransformer transformer = new DynamicConfigTransformer(mock(RuleEvaluator.class)); Method method = DynamicConfigTransformer.class.getDeclaredMethod( - "processOverlayDirectives", JsonNode.class, String.class); + "processOverlayDirectives", JsonNode.class, String.class, List.class); method.setAccessible(true); - method.invoke(transformer, root, "{}"); + method.invoke(transformer, root, "{}", Collections.emptyList()); JsonNode resultOs = sinkArray.get(0).get("opensearch"); assertThat(resultOs.get("hosts").asText()).isEqualTo("https://localhost:9200"); @@ -450,9 +472,9 @@ void test_overlay_directive_overrides_existing_fields() throws Exception { DynamicConfigTransformer transformer = new DynamicConfigTransformer(mock(RuleEvaluator.class)); Method method = DynamicConfigTransformer.class.getDeclaredMethod( - "processOverlayDirectives", JsonNode.class, String.class); + "processOverlayDirectives", JsonNode.class, String.class, List.class); method.setAccessible(true); - method.invoke(transformer, root, "{}"); + method.invoke(transformer, root, "{}", Collections.emptyList()); JsonNode resultOs = sinkArray.get(0).get("opensearch"); assertThat(resultOs.get("hosts").asText()).isEqualTo("https://localhost:9200"); @@ -461,22 +483,72 @@ void test_overlay_directive_overrides_existing_fields() throws Exception { assertThat(resultOs.get("script").has("custom_field")).isFalse(); } + private static final String PROVIDER_PKG = "org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer."; + + // --- invokeMethod: Happy path --- + @Test - void test_calculateDepthForRdsSource_without_source_coordination_identifier() { - String mockPrefix = "my-bucket/path"; - DynamicConfigTransformer transformer = spy(new DynamicConfigTransformer(mock(RuleEvaluator.class))); - doReturn(null).when(transformer).getSourceCoordinationIdentifier(); - String result = transformer.calculateDepthForRdsSource(mockPrefix); - assertThat(result, equalTo("4")); + void test_invokeMethod_succeeds_with_valid_provider_and_annotated_method() throws Exception { + ruleEvaluator = mock(RuleEvaluator.class); + DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); + List providers = Collections.singletonList(PROVIDER_PKG + "ValidAnnotatedProvider"); + Object result = transformer.invokeMethod(providers, "transformValue", String.class, "hello"); + assertEquals("HELLO", result); } + // --- invokeMethod: Non-existent class name --- + @Test - void test_calculateDepthForRdsSource_with_source_coordination_identifier() { - String mockPrefix = "my-bucket/path"; - DynamicConfigTransformer transformer = spy(new DynamicConfigTransformer(mock(RuleEvaluator.class))); - doReturn("testValue").when(transformer).getSourceCoordinationIdentifier(); - String result = transformer.calculateDepthForRdsSource(mockPrefix); - assertThat(result, equalTo("5")); + void test_invokeMethod_throws_when_class_does_not_exist() { + ruleEvaluator = mock(RuleEvaluator.class); + DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); + List providers = Collections.singletonList("com.example.does.not.Exist"); + assertThrows(Exception.class, () -> + transformer.invokeMethod(providers, "someMethod", String.class, "arg")); + } + + // --- invokeMethod: Multiple providers, correct resolution --- + + @Test + void test_invokeMethod_resolves_method_from_second_provider_when_first_lacks_it() throws Exception { + ruleEvaluator = mock(RuleEvaluator.class); + DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); + List providers = Arrays.asList( + PROVIDER_PKG + "ProviderWithoutTargetMethod", + PROVIDER_PKG + "ValidAnnotatedProvider"); + Object result = transformer.invokeMethod(providers, "transformValue", String.class, "world"); + assertEquals("WORLD", result); } + // --- invokeMethod: Non-static annotated method --- + + @Test + void test_invokeMethod_throws_when_annotated_method_is_non_static() { + ruleEvaluator = mock(RuleEvaluator.class); + DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); + List providers = Collections.singletonList(PROVIDER_PKG + "ProviderWithNonStaticMethod"); + assertThrows(Exception.class, () -> + transformer.invokeMethod(providers, "instanceMethod", String.class, "arg")); + } + + // --- invokeMethod: Static initializer safety --- + + @Test + void test_invokeMethod_throws_for_non_provider_before_running_methods() { + ruleEvaluator = mock(RuleEvaluator.class); + DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); + List providers = Collections.singletonList(PROVIDER_PKG + "NonProviderWithStaticInit"); + RuntimeException exception = assertThrows(RuntimeException.class, () -> + transformer.invokeMethod(providers, "getValue", String.class, "arg")); + assertTrue(exception.getMessage().contains("does not implement PipelineTransformFunctionProvider")); + } + + // --- Inner test helper class for package-check-independent tests --- + + public static class ValidProviderNoAnnotation implements org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider { + public static String unannotatedMethod(String input) { + return input; + } + } } + diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/NonProviderWithStaticInit.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/NonProviderWithStaticInit.java new file mode 100644 index 0000000000..b7ad75a1fb --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/NonProviderWithStaticInit.java @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer; + +import org.opensearch.dataprepper.model.annotations.TransformationFunction; + +/** + * Test class that does NOT implement PipelineTransformFunctionProvider. + * Has a static initializer to verify it doesn't run if interface check rejects it. + */ +public class NonProviderWithStaticInit { + static { + System.setProperty("test.static.init.ran", "true"); + } + + @TransformationFunction + public static String getValue(String input) { + return input; + } +} diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ProviderWithNonStaticMethod.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ProviderWithNonStaticMethod.java new file mode 100644 index 0000000000..955a768b99 --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ProviderWithNonStaticMethod.java @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer; + +import org.opensearch.dataprepper.model.annotations.TransformationFunction; +import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider; + +public class ProviderWithNonStaticMethod implements PipelineTransformFunctionProvider { + @TransformationFunction + public String instanceMethod(String input) { + return input; + } +} diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ProviderWithoutTargetMethod.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ProviderWithoutTargetMethod.java new file mode 100644 index 0000000000..4a445f95cb --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ProviderWithoutTargetMethod.java @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer; + +import org.opensearch.dataprepper.model.annotations.TransformationFunction; +import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider; + +public class ProviderWithoutTargetMethod implements PipelineTransformFunctionProvider { + @TransformationFunction + public static String otherMethod(String input) { + return input; + } +} diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/TestTransformFunctionProvider.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/TestTransformFunctionProvider.java new file mode 100644 index 0000000000..52f989dc49 --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/TestTransformFunctionProvider.java @@ -0,0 +1,75 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer; + +import org.opensearch.dataprepper.model.annotations.TransformationFunction; +import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider; + +/** + * Test-only function provider for pipeline transformation tests. + * Replicates the logic from PipelineTransformFunctions in aws-plugin. + */ +public class TestTransformFunctionProvider implements PipelineTransformFunctionProvider { + + private static final String SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE = "SOURCE_COORDINATION_PIPELINE_IDENTIFIER"; + + @TransformationFunction + public static String calculateDepth(String s3Prefix) { + return Integer.toString(getDepth(s3Prefix, 4)); + } + + @TransformationFunction + public static String calculateDepthForRdsSource(String s3Prefix) { + String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE); + int baseDepth = envSourceCoordinationIdentifier != null ? 3 : 2; + return Integer.toString(getDepth(s3Prefix, baseDepth)); + } + + @TransformationFunction + public static String getSourceCoordinationIdentifierEnvVariable(String s3Prefix) { + String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE); + if (s3Prefix == null) { + return envSourceCoordinationIdentifier; + } + return s3Prefix + "/" + envSourceCoordinationIdentifier; + } + + @TransformationFunction + public static String getAccountIdFromRole(final String roleArn) { + if (roleArn == null) { + return null; + } + try { + return software.amazon.awssdk.arns.Arn.fromString(roleArn).accountId().orElse(null); + } catch (Exception e) { + return null; + } + } + + @TransformationFunction + public static String getIncludePrefixForRdsSource(String s3Prefix) { + final String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE); + if (s3Prefix == null && envSourceCoordinationIdentifier == null) { + return "/buffer"; + } else if (s3Prefix == null) { + return envSourceCoordinationIdentifier + "/buffer"; + } else if (envSourceCoordinationIdentifier == null) { + return s3Prefix + "/buffer"; + } + return s3Prefix + "/" + envSourceCoordinationIdentifier + "/buffer"; + } + + private static int getDepth(String s3Prefix, int baseDepth) { + if (s3Prefix == null) { + return baseDepth; + } + return s3Prefix.split("/").length + baseDepth; + } +} diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ValidAnnotatedProvider.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ValidAnnotatedProvider.java new file mode 100644 index 0000000000..73b99ba6aa --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ValidAnnotatedProvider.java @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer; + +import org.opensearch.dataprepper.model.annotations.TransformationFunction; +import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider; + +public class ValidAnnotatedProvider implements PipelineTransformFunctionProvider { + @TransformationFunction + public static String transformValue(String input) { + return input.toUpperCase(); + } +} diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml index b120d1531c..bc13226783 100644 --- a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml @@ -2,3 +2,5 @@ plugin_name: "documentdb" apply_when: - "$..source.documentdb" - "$..source.documentdb.s3_bucket" +function_providers: + - "org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer.TestTransformFunctionProvider" diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb1-rule.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb1-rule.yaml index ed3c4b8b57..2cd7654d6a 100644 --- a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb1-rule.yaml +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb1-rule.yaml @@ -1,3 +1,5 @@ plugin_name: "documentdb" apply_when: - - "$..source.documentdb" \ No newline at end of file + - "$..source.documentdb" +function_providers: + - "org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer.TestTransformFunctionProvider" diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-joins-rule.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-joins-rule.yaml index 6dc367e378..212a8b3bc9 100644 --- a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-joins-rule.yaml +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-joins-rule.yaml @@ -11,3 +11,5 @@ plugin_name: "rds-joins" apply_when: - "$..source.rds" - "$..source.rds.joins" +function_providers: + - "org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer.TestTransformFunctionProvider" diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-rule.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-rule.yaml index 85cd3798f9..8d8fd8f547 100644 --- a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-rule.yaml +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-rule.yaml @@ -10,3 +10,5 @@ plugin_name: "rds" apply_when: - "$..source.rds" +function_providers: + - "org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer.TestTransformFunctionProvider" diff --git a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/dataprepper_transformer/PipelineTransformFunctions.java b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/dataprepper_transformer/PipelineTransformFunctions.java new file mode 100644 index 0000000000..dac623ba8c --- /dev/null +++ b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/dataprepper_transformer/PipelineTransformFunctions.java @@ -0,0 +1,116 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.aws.dataprepper_transformer; + +import org.opensearch.dataprepper.model.annotations.SkipTestCoverageGenerated; +import org.opensearch.dataprepper.model.annotations.TransformationFunction; +import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.arns.Arn; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Base64; +import java.util.function.Supplier; + +/** + * Provides static utility methods for pipeline template transformations. + * These methods are invoked dynamically via the FUNCTION_NAME placeholder mechanism + * in template YAML files. + */ +public class PipelineTransformFunctions implements PipelineTransformFunctionProvider { + + private static final Logger LOG = LoggerFactory.getLogger(PipelineTransformFunctions.class); + + private static final String SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE = "SOURCE_COORDINATION_PIPELINE_IDENTIFIER"; + private static final String S3_BUFFER_PREFIX = "/buffer"; + private static final int MAX_SOURCE_IDENTIFIER_LENGTH = 15; + + static Supplier sourceCoordinationIdentifierSupplier = + () -> System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE); + + @SkipTestCoverageGenerated + private PipelineTransformFunctions() { + } + + @TransformationFunction + public static String calculateDepth(String s3Prefix) { + return Integer.toString(getDepth(s3Prefix, 4)); + } + + @TransformationFunction + public static String calculateDepthForRdsSource(String s3Prefix) { + String envSourceCoordinationIdentifier = sourceCoordinationIdentifierSupplier.get(); + int baseDepth = envSourceCoordinationIdentifier != null ? 3 : 2; + return Integer.toString(getDepth(s3Prefix, baseDepth)); + } + + @TransformationFunction + public static String getSourceCoordinationIdentifierEnvVariable(String s3Prefix) { + String envSourceCoordinationIdentifier = sourceCoordinationIdentifierSupplier.get(); + if (s3Prefix == null) { + return envSourceCoordinationIdentifier; + } + return s3Prefix + "/" + envSourceCoordinationIdentifier; + } + + @TransformationFunction + public static String getIncludePrefixForRdsSource(String s3Prefix) { + final String envSourceCoordinationIdentifier = sourceCoordinationIdentifierSupplier.get(); + final String shortenedSourceIdentifier = envSourceCoordinationIdentifier != null ? + shortenIdentifier(envSourceCoordinationIdentifier, MAX_SOURCE_IDENTIFIER_LENGTH) : null; + if (s3Prefix == null && envSourceCoordinationIdentifier == null) { + return S3_BUFFER_PREFIX; + } else if (s3Prefix == null) { + return shortenedSourceIdentifier + S3_BUFFER_PREFIX; + } else if (envSourceCoordinationIdentifier == null) { + return s3Prefix + S3_BUFFER_PREFIX; + } + return s3Prefix + "/" + shortenedSourceIdentifier + S3_BUFFER_PREFIX; + } + + @TransformationFunction + public static String getAccountIdFromRole(final String roleArn) { + if (roleArn == null) { + return null; + } + try { + return Arn.fromString(roleArn).accountId().orElse(null); + } catch (Exception e) { + LOG.warn("Malformatted role ARN for dynamic transformation: {}", roleArn); + return null; + } + } + + private static int getDepth(String s3Prefix, int baseDepth) { + if (s3Prefix == null) { + return baseDepth; + } + return s3Prefix.split("/").length + baseDepth; + } + + @SkipTestCoverageGenerated + static String shortenIdentifier(final String identifier, final int maxLength) { + if (identifier.length() <= maxLength) { + return identifier; + } + + try { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] encodedHash = digest.digest(identifier.getBytes(StandardCharsets.UTF_8)); + String base64Hash = Base64.getUrlEncoder().withoutPadding().encodeToString(encodedHash); + return base64Hash.substring(0, Math.min(base64Hash.length(), maxLength)); + } catch (final NoSuchAlgorithmException e) { + return identifier.substring(0, maxLength); + } + } +} diff --git a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/dataprepper_transformer/PipelineTransformFunctionsTest.java b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/dataprepper_transformer/PipelineTransformFunctionsTest.java new file mode 100644 index 0000000000..1f1936b9c3 --- /dev/null +++ b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/dataprepper_transformer/PipelineTransformFunctionsTest.java @@ -0,0 +1,215 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.aws.dataprepper_transformer; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.opensearch.dataprepper.model.annotations.TransformationFunction; +import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider; + +import java.lang.reflect.Method; +import java.util.function.Supplier; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class PipelineTransformFunctionsTest { + + private Supplier originalSupplier; + + @BeforeEach + void setUp() { + originalSupplier = PipelineTransformFunctions.sourceCoordinationIdentifierSupplier; + } + + @AfterEach + void tearDown() { + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = originalSupplier; + } + + @Test + void class_implements_PipelineTransformFunctionProvider() { + assertTrue(PipelineTransformFunctionProvider.class.isAssignableFrom(PipelineTransformFunctions.class)); + } + + @Test + void all_public_transformation_methods_are_annotated() throws Exception { + String[] methodNames = { + "calculateDepth", + "calculateDepthForRdsSource", + "getSourceCoordinationIdentifierEnvVariable", + "getIncludePrefixForRdsSource", + "getAccountIdFromRole" + }; + + for (String methodName : methodNames) { + Method method = PipelineTransformFunctions.class.getMethod(methodName, String.class); + assertTrue(method.isAnnotationPresent(TransformationFunction.class), + "Method " + methodName + " should be annotated with @TransformationFunction"); + } + } + + @Test + void default_sourceCoordinationIdentifierSupplier_reads_env_variable() { + // Exercises the default lambda on line 39: () -> System.getenv(...) + // env var is not set in test, so returns null — but the lambda bytecode is covered + String result = originalSupplier.get(); + assertNull(result); + } + + + // --- calculateDepth --- + + @Test + void calculateDepth_returns_4_when_prefix_is_null() { + assertEquals("4", PipelineTransformFunctions.calculateDepth(null)); + } + + @Test + void calculateDepth_returns_correct_depth_with_single_segment_prefix() { + assertEquals("5", PipelineTransformFunctions.calculateDepth("myprefix")); + } + + @Test + void calculateDepth_returns_correct_depth_with_multi_segment_prefix() { + assertEquals("6", PipelineTransformFunctions.calculateDepth("prefix/subfolder")); + } + + // --- calculateDepthForRdsSource --- + + @Test + void calculateDepthForRdsSource_returns_2_when_prefix_is_null_and_no_env_var() { + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> null; + assertEquals("2", PipelineTransformFunctions.calculateDepthForRdsSource(null)); + } + + @Test + void calculateDepthForRdsSource_returns_3_when_prefix_is_null_and_env_var_set() { + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> "my-identifier"; + assertEquals("3", PipelineTransformFunctions.calculateDepthForRdsSource(null)); + } + + @Test + void calculateDepthForRdsSource_adds_prefix_segments_with_env_var() { + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> "my-identifier"; + assertEquals("5", PipelineTransformFunctions.calculateDepthForRdsSource("a/b")); + } + + @Test + void calculateDepthForRdsSource_adds_prefix_segments_without_env_var() { + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> null; + assertEquals("4", PipelineTransformFunctions.calculateDepthForRdsSource("a/b")); + } + + // --- getSourceCoordinationIdentifierEnvVariable --- + + @Test + void getSourceCoordinationIdentifierEnvVariable_returns_env_var_when_prefix_null() { + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> "test-id"; + assertEquals("test-id", PipelineTransformFunctions.getSourceCoordinationIdentifierEnvVariable(null)); + } + + @Test + void getSourceCoordinationIdentifierEnvVariable_returns_null_when_env_not_set_and_prefix_null() { + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> null; + assertNull(PipelineTransformFunctions.getSourceCoordinationIdentifierEnvVariable(null)); + } + + @Test + void getSourceCoordinationIdentifierEnvVariable_prepends_prefix_to_env_var() { + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> "test-id"; + assertEquals("myprefix/test-id", + PipelineTransformFunctions.getSourceCoordinationIdentifierEnvVariable("myprefix")); + } + + // --- getIncludePrefixForRdsSource --- + + @Test + void getIncludePrefixForRdsSource_returns_buffer_prefix_when_all_null() { + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> null; + assertEquals("/buffer", PipelineTransformFunctions.getIncludePrefixForRdsSource(null)); + } + + @Test + void getIncludePrefixForRdsSource_returns_shortened_id_plus_buffer_when_prefix_null_and_env_set() { + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> "short"; + assertEquals("short/buffer", PipelineTransformFunctions.getIncludePrefixForRdsSource(null)); + } + + @Test + void getIncludePrefixForRdsSource_returns_prefix_plus_buffer_when_env_null() { + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> null; + assertEquals("myprefix/buffer", PipelineTransformFunctions.getIncludePrefixForRdsSource("myprefix")); + } + + @Test + void getIncludePrefixForRdsSource_returns_full_path_when_both_set() { + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> "short"; + assertEquals("myprefix/short/buffer", + PipelineTransformFunctions.getIncludePrefixForRdsSource("myprefix")); + } + + @Test + void getIncludePrefixForRdsSource_shortens_long_identifier() { + PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> "this-is-a-very-long-identifier-exceeding-max"; + String result = PipelineTransformFunctions.getIncludePrefixForRdsSource(null); + assertNotNull(result); + assertTrue(result.endsWith("/buffer")); + // shortened id is 15 chars + "/buffer" = 22 chars total + assertEquals(22, result.length()); + } + + // --- getAccountIdFromRole --- + + @Test + void getAccountIdFromRole_returns_account_id_from_valid_arn() { + assertEquals("123456789012", + PipelineTransformFunctions.getAccountIdFromRole("arn:aws:iam::123456789012:role/MyRole")); + } + + @Test + void getAccountIdFromRole_returns_null_when_arn_is_null() { + assertNull(PipelineTransformFunctions.getAccountIdFromRole(null)); + } + + @ParameterizedTest + @ValueSource(strings = {"", "not-an-arn", "arn:aws:iam:::role/test-role"}) + void getAccountIdFromRole_returns_null_for_invalid_arns(String invalidArn) { + assertNull(PipelineTransformFunctions.getAccountIdFromRole(invalidArn)); + } + + // --- shortenIdentifier --- + + @Test + void shortenIdentifier_returns_original_when_within_limit() { + assertEquals("short", PipelineTransformFunctions.shortenIdentifier("short", 15)); + } + + @Test + void shortenIdentifier_returns_shortened_hash_when_exceeds_limit() { + String longId = "this-is-a-very-long-identifier-that-exceeds-limit"; + String result = PipelineTransformFunctions.shortenIdentifier(longId, 15); + assertNotNull(result); + assertEquals(15, result.length()); + } + + @Test + void shortenIdentifier_returns_consistent_results() { + String id = "consistent-test-identifier"; + String result1 = PipelineTransformFunctions.shortenIdentifier(id, 10); + String result2 = PipelineTransformFunctions.shortenIdentifier(id, 10); + assertEquals(result1, result2); + } +} diff --git a/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/documentdb-rule.yaml b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/documentdb-rule.yaml index 60aa428d8a..22e814cac9 100644 --- a/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/documentdb-rule.yaml +++ b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/documentdb-rule.yaml @@ -1,4 +1,6 @@ plugin_name: "documentdb" apply_when: - "$..source.documentdb" - - "$..source.documentdb.s3_bucket" \ No newline at end of file + - "$..source.documentdb.s3_bucket" +function_providers: + - "org.opensearch.dataprepper.plugins.aws.dataprepper_transformer.PipelineTransformFunctions" diff --git a/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/mongodb-rule.yaml b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/mongodb-rule.yaml index 33cc703072..07c35c3cd7 100644 --- a/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/mongodb-rule.yaml +++ b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/mongodb-rule.yaml @@ -1,4 +1,6 @@ plugin_name: "mongodb" apply_when: - "$..source.mongodb" - - "$..source.mongodb.s3_bucket" \ No newline at end of file + - "$..source.mongodb.s3_bucket" +function_providers: + - "org.opensearch.dataprepper.plugins.aws.dataprepper_transformer.PipelineTransformFunctions" diff --git a/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-joins-rule.yaml b/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-joins-rule.yaml index 6dc367e378..2e65f1481e 100644 --- a/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-joins-rule.yaml +++ b/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-joins-rule.yaml @@ -1,4 +1,3 @@ -# # Copyright OpenSearch Contributors # SPDX-License-Identifier: Apache-2.0 # @@ -11,3 +10,5 @@ plugin_name: "rds-joins" apply_when: - "$..source.rds" - "$..source.rds.joins" +function_providers: + - "org.opensearch.dataprepper.plugins.aws.dataprepper_transformer.PipelineTransformFunctions" diff --git a/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-rule.yaml b/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-rule.yaml index f13f6b6fc4..668fc14e7a 100644 --- a/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-rule.yaml +++ b/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-rule.yaml @@ -1,3 +1,5 @@ plugin_name: "rds" apply_when: - - "$..source.rds" \ No newline at end of file + - "$..source.rds" +function_providers: + - "org.opensearch.dataprepper.plugins.aws.dataprepper_transformer.PipelineTransformFunctions"