diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/annotations/TransformationFunction.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/annotations/TransformationFunction.java
new file mode 100644
index 0000000000..e8b4617bf1
--- /dev/null
+++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/annotations/TransformationFunction.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.dataprepper.model.annotations;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a method as a pipeline transformation function that can be invoked
+ * dynamically from template YAML files via the {@code FUNCTION_NAME} placeholder.
+ *
+ * Annotated methods must be {@code public static} and accept a single {@code String}
+ * parameter, returning a {@code String} result.
+ *
+ * The enclosing class must implement
+ * {@link org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider}.
+ *
+ * @since 2.12
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.METHOD})
+public @interface TransformationFunction {
+}
diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PipelineTransformFunctionProvider.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PipelineTransformFunctionProvider.java
new file mode 100644
index 0000000000..a003f0ca28
--- /dev/null
+++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PipelineTransformFunctionProvider.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.dataprepper.model.plugin;
+
+/**
+ * Marker interface for classes that provide pipeline transformation functions.
+ * Classes implementing this interface can be referenced in rule YAML files via
+ * the {@code function_providers} field and have their methods invoked dynamically
+ * during pipeline template transformation.
+ *
+ * Methods intended to be callable from templates must also be annotated with
+ * {@link org.opensearch.dataprepper.model.annotations.TransformationFunction}.
+ *
+ * @since 2.12
+ */
+public interface PipelineTransformFunctionProvider {
+}
diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/annotations/TransformationFunctionTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/annotations/TransformationFunctionTest.java
new file mode 100644
index 0000000000..1c1f82d9ab
--- /dev/null
+++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/annotations/TransformationFunctionTest.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.dataprepper.model.annotations;
+
+import org.junit.jupiter.api.Test;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TransformationFunctionTest {
+
+ @Test
+ void annotation_is_retained_at_runtime() {
+ Retention retention = TransformationFunction.class.getAnnotation(Retention.class);
+ assertNotNull(retention);
+ assertThat(retention.value(), equalTo(RetentionPolicy.RUNTIME));
+ }
+
+ @Test
+ void annotation_targets_methods() {
+ Target target = TransformationFunction.class.getAnnotation(Target.class);
+ assertNotNull(target);
+ assertThat(target.value().length, equalTo(1));
+ assertThat(target.value()[0], equalTo(ElementType.METHOD));
+ }
+
+ @Test
+ void annotation_is_documented() {
+ Documented documented = TransformationFunction.class.getAnnotation(Documented.class);
+ assertNotNull(documented);
+ }
+
+ @Test
+ void annotation_is_present_on_annotated_method() throws NoSuchMethodException {
+ assertTrue(AnnotatedClass.class.getMethod("annotatedMethod", String.class)
+ .isAnnotationPresent(TransformationFunction.class));
+ }
+
+ @Test
+ void annotation_is_not_present_on_unannotated_method() throws NoSuchMethodException {
+ assertTrue(!AnnotatedClass.class.getMethod("unannotatedMethod", String.class)
+ .isAnnotationPresent(TransformationFunction.class));
+ }
+
+ static class AnnotatedClass {
+ @TransformationFunction
+ public static String annotatedMethod(String input) {
+ return input;
+ }
+
+ public static String unannotatedMethod(String input) {
+ return input;
+ }
+ }
+}
diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/plugin/PipelineTransformFunctionProviderTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/plugin/PipelineTransformFunctionProviderTest.java
new file mode 100644
index 0000000000..c081cd61c3
--- /dev/null
+++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/plugin/PipelineTransformFunctionProviderTest.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.dataprepper.model.plugin;
+
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class PipelineTransformFunctionProviderTest {
+
+ @Test
+ void implementing_class_is_assignable_from_interface() {
+ PipelineTransformFunctionProvider provider = new TestFunctionProvider();
+ assertNotNull(provider);
+ assertTrue(provider instanceof PipelineTransformFunctionProvider);
+ }
+
+ @Test
+ void interface_is_assignable_from_implementing_class() {
+ assertThat(PipelineTransformFunctionProvider.class.isAssignableFrom(TestFunctionProvider.class),
+ equalTo(true));
+ }
+
+ @Test
+ void interface_is_not_assignable_from_non_implementing_class() {
+ assertThat(PipelineTransformFunctionProvider.class.isAssignableFrom(NonProvider.class),
+ equalTo(false));
+ }
+
+ @Test
+ void interface_has_no_declared_methods() {
+ assertThat(PipelineTransformFunctionProvider.class.getDeclaredMethods().length, equalTo(0));
+ }
+
+ @Test
+ void interface_is_public() {
+ assertTrue(java.lang.reflect.Modifier.isPublic(PipelineTransformFunctionProvider.class.getModifiers()));
+ }
+
+ @Test
+ void interface_is_an_interface() {
+ assertTrue(PipelineTransformFunctionProvider.class.isInterface());
+ }
+
+ static class TestFunctionProvider implements PipelineTransformFunctionProvider {
+ public static String sampleFunction(String input) {
+ return input;
+ }
+ }
+
+ static class NonProvider {
+ public static String someMethod(String input) {
+ return input;
+ }
+ }
+}
diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java
index 1b05ea5b79..7c3a6b9100 100644
--- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java
+++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java
@@ -1,7 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
+
package org.opensearch.dataprepper.pipeline.parser.rule;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -62,6 +67,7 @@ public RuleEvaluatorResult isTransformationNeeded(PipelinesDataFlowModel pipelin
return RuleEvaluatorResult.builder()
.withEvaluatedResult(true)
.withPipelineTemplateModel(templateModel)
+ .withFunctionProviders(ruleFileEvaluation.getFunctionProviders())
.withPipelineName(entry.getKey())
.build();
}
@@ -76,6 +82,7 @@ public RuleEvaluatorResult isTransformationNeeded(PipelinesDataFlowModel pipelin
return RuleEvaluatorResult.builder()
.withEvaluatedResult(false)
.withPipelineName(null)
+ .withFunctionProviders(null)
.withPipelineTemplateModel(null)
.build();
}
@@ -131,6 +138,7 @@ private RuleFileEvaluation evaluate(String pipelinesJson) {
return RuleFileEvaluation.builder()
.withPluginName(pluginName)
.withRuleFileName(parsedRule.fileName)
+ .withFunctionProviders(rulesModel.getFunctionProviders())
.withResult(true)
.build();
}
@@ -163,4 +171,4 @@ private static class ParsedRule {
this.fileName = fileName;
}
}
-}
\ No newline at end of file
+}
diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResult.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResult.java
index e89768fee6..e9ffb837db 100644
--- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResult.java
+++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResult.java
@@ -1,7 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
+
package org.opensearch.dataprepper.pipeline.parser.rule;
import lombok.AllArgsConstructor;
@@ -9,6 +14,8 @@
import lombok.Getter;
import org.opensearch.dataprepper.pipeline.parser.transformer.PipelineTemplateModel;
+import java.util.List;
+
@Builder(setterPrefix = "with")
@Getter
@AllArgsConstructor
@@ -20,6 +27,8 @@ public class RuleEvaluatorResult {
private PipelineTemplateModel pipelineTemplateModel;
+ private List functionProviders;
+
public RuleEvaluatorResult() {
}
diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluation.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluation.java
index 88ee0f8d6c..d89c871cc4 100644
--- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluation.java
+++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluation.java
@@ -4,6 +4,8 @@
import lombok.Builder;
import lombok.Data;
+import java.util.List;
+
@Builder(setterPrefix = "with")
@AllArgsConstructor
@Data
@@ -11,6 +13,7 @@ public class RuleFileEvaluation {
private Boolean result;
private String ruleFileName;
private String pluginName;
+ private List functionProviders;
public RuleFileEvaluation() {
diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModel.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModel.java
index aaf757cedb..b9677e4f72 100644
--- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModel.java
+++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModel.java
@@ -1,7 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
+
package org.opensearch.dataprepper.pipeline.parser.rule;
import com.fasterxml.jackson.annotation.JsonInclude;
@@ -22,6 +27,9 @@ public class RuleTransformerModel {
@JsonProperty("plugin_name")
private String pluginName;
+ @JsonProperty("function_providers")
+ private List functionProviders;
+
public RuleTransformerModel() {
}
@@ -29,6 +37,7 @@ public RuleTransformerModel() {
public String toString() {
return "RuleConfiguration{" +
"applyWhen=" + applyWhen +
- "\npluginName="+ pluginName +'}';
+ "\nfunctionProviders=" + functionProviders +
+ "\npluginName=" + pluginName + '}';
}
-}
\ No newline at end of file
+}
diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java
index 23c89a7359..321b2f7290 100644
--- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java
+++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java
@@ -1,7 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
+
package org.opensearch.dataprepper.pipeline.parser.transformer;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -17,7 +22,6 @@
import com.jayway.jsonpath.spi.json.JacksonJsonNodeJsonProvider;
import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
import static java.lang.String.format;
-import static org.opensearch.dataprepper.plugins.source.rds.RdsService.MAX_SOURCE_IDENTIFIER_LENGTH;
import org.opensearch.dataprepper.model.configuration.PipelineModel;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
@@ -26,9 +30,9 @@
import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluatorResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.opensearch.dataprepper.plugins.source.rds.utils.IdentifierShortener;
-import software.amazon.awssdk.arns.Arn;
+import org.opensearch.dataprepper.model.annotations.TransformationFunction;
+import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider;
import javax.xml.transform.TransformerException;
import java.io.IOException;
import java.lang.reflect.Method;
@@ -43,6 +47,7 @@
public class DynamicConfigTransformer implements PipelineConfigurationTransformer {
private static final Logger LOG = LoggerFactory.getLogger(DynamicConfigTransformer.class);
+
private final ObjectMapper objectMapper = new ObjectMapper();
private final RuleEvaluator ruleEvaluator;
@@ -76,17 +81,17 @@ public class DynamicConfigTransformer implements PipelineConfigurationTransforme
private static final String RECURSIVE_JSON_PATH_PATH = "$..";
private static final String JSON_PATH_IDENTIFIER = "$.";
private static final String ARRAY_NODE_PATTERN = "([^\\[]+)\\[(\\d+)\\]$";
- private static final String SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE = "SOURCE_COORDINATION_PIPELINE_IDENTIFIER";
private static final String SINK_SUBPIPELINE_PLUGIN_NAME = "pipeline";
private static final String SUBPIPELINE_PATH = "$.source.pipeline";
- private static final String S3_BUFFER_PREFIX = "/buffer";
-
/**
* Pattern to match overlay directives like "<>"
* The captured group is the target path (e.g., "sink[*].opensearch")
*/
private static final Pattern OVERLAY_PATTERN = Pattern.compile("^<>$");
+ private static final String REQUIRED_PACKAGE_PREFIX = "org.opensearch.dataprepper";
+ private static final String REQUIRED_PACKAGE_SEGMENT = "dataprepper_transformer";
+
Configuration parseConfigWithJsonNode = Configuration.builder()
.jsonProvider(new JacksonJsonNodeJsonProvider())
@@ -126,6 +131,7 @@ public PipelinesDataFlowModel transformConfiguration(PipelinesDataFlowModel preT
//To differentiate between sub-pipelines that dont need transformation.
String pipelineNameThatNeedsTransformation = ruleEvaluatorResult.getPipelineName();
PipelineTemplateModel templateModel = ruleEvaluatorResult.getPipelineTemplateModel();
+ List functionProviders = ruleEvaluatorResult.getFunctionProviders();
LOG.info("Transforming pipeline config for pipeline {}",pipelineNameThatNeedsTransformation);
try {
@@ -149,7 +155,7 @@ public PipelinesDataFlowModel transformConfiguration(PipelinesDataFlowModel preT
JsonNode templateRootNode = objectMapper.readTree(templateJsonString);
// get exact path in pipelineJson
- Map pipelineExactPathMap = findExactPath(placeholdersMap, pipelineJson);
+ Map pipelineExactPathMap = findExactPath(placeholdersMap, pipelineJson, functionProviders);
//replace placeholder with actual value in the template context
placeholdersMap.forEach((placeholder, templateJsonPathList) -> {
@@ -173,7 +179,7 @@ public PipelinesDataFlowModel transformConfiguration(PipelinesDataFlowModel preT
});
// Process <> directives — merge overlay fields into resolved config
- processOverlayDirectives(templateRootNode, pipelineJson);
+ processOverlayDirectives(templateRootNode, pipelineJson, functionProviders);
PipelinesDataFlowModel transformedPipelinesDataFlowModel = getTransformedPipelinesDataFlowModel(pipelineNameThatNeedsTransformation,
preTransformedPipelinesDataFlowModel,
@@ -333,12 +339,12 @@ private void populateMapWithPlaceholderPaths(JsonNode currentNode, String curren
* @return Map K:jsonPath, V:exactPath
* @throws IOException
*/
- private Map findExactPath(Map> placeholdersMap, String pipelineJson) throws IOException, TransformerException {
+ private Map findExactPath(Map> placeholdersMap, String pipelineJson, final List functionProviders) throws IOException, TransformerException {
Map mapWithPaths = new HashMap<>();
for (String genericPathPlaceholder : placeholdersMap.keySet()) {
String placeHolderValue = getValueFromPlaceHolder(genericPathPlaceholder);
- String value = executeFunctionPlaceholder(placeHolderValue, pipelineJson);
+ String value = executeFunctionPlaceholder(placeHolderValue, pipelineJson, functionProviders);
// Recursive pattern in json path is NOT allowed
if (value!=null && value.contains(RECURSIVE_JSON_PATH_PATH)) {
@@ -366,14 +372,14 @@ private String getValueFromPlaceHolder(String placeholder) {
* @param functionPlaceholderValue
* @return String - value of the function executed
*/
- private String executeFunctionPlaceholder(String functionPlaceholderValue, String pipelineJson){
+ private String executeFunctionPlaceholder(String functionPlaceholderValue, String pipelineJson, final List functionProviders){
Matcher functionMatcher = FUNCTION_CALL_PLACEHOLDER_PATTERN.matcher(functionPlaceholderValue);
if (functionMatcher.find()) {
String functionName = functionMatcher.group(1);
String parameter = functionMatcher.group(2);
try {
String parameterValue = (String)parseParameter(parameter, pipelineJson);
- String value = (String) invokeMethod(functionName, String.class, parameterValue);
+ String value = (String) invokeMethod(functionProviders, functionName, String.class, parameterValue);
return value;
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
@@ -419,92 +425,55 @@ private boolean isJsonPath(String parameter) {
}
/**
- * Calculate s3 folder scan depth for DocDB source pipeline
- * @param s3Prefix: s3 prefix defined in the source configuration
- * @return s3 folder scan depth
+ * Invokes a method dynamically on a given object.
+ *
+ * @param methodName the name of the method to be invoked
+ * @param parameterType the Class object representing the parameter type
+ * @param arg the parameter to be passed to the method
+ * @return the result of the method invocation
+ * @throws ReflectiveOperationException if the method cannot be invoked
*/
- public String calculateDepth(String s3Prefix) {
- return Integer.toString(getDepth(s3Prefix, 4));
- }
-
- protected String getSourceCoordinationIdentifier() {
- return System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE);
- }
+ public Object invokeMethod(final List functionProviders, String methodName, Class> parameterType, Object arg) throws ReflectiveOperationException {
+ if (functionProviders == null || functionProviders.isEmpty()) {
+ throw new RuntimeException("function_providers cannot be empty");
+ }
- /**
- * Calculate s3 folder scan depth for RDS source pipeline
- * @param s3Prefix: s3 prefix defined in the source configuration
- * @return s3 folder scan depth
- */
- public String calculateDepthForRdsSource(String s3Prefix) {
- String envSourceCoordinationIdentifier = getSourceCoordinationIdentifier();
- int baseDepth = envSourceCoordinationIdentifier != null ? 3 : 2;
- return Integer.toString(getDepth(s3Prefix, baseDepth));
- }
+ Class> clazz = resolveClassForMethod(functionProviders, methodName, parameterType);
- private int getDepth(String s3Prefix, int baseDepth) {
- if(s3Prefix == null){
- return baseDepth;
+ String packageName = clazz.getPackageName();
+ if (!packageName.startsWith(REQUIRED_PACKAGE_PREFIX) || !packageName.contains(REQUIRED_PACKAGE_SEGMENT)) {
+ throw new RuntimeException("Class '" + clazz.getName() +
+ "' is not in a valid package. Package must start with '" + REQUIRED_PACKAGE_PREFIX +
+ "' and contain '" + REQUIRED_PACKAGE_SEGMENT + "'");
}
- return s3Prefix.split("/").length + baseDepth;
- }
- public String getSourceCoordinationIdentifierEnvVariable(String s3Prefix){
- String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE);
- if(s3Prefix == null){
- return envSourceCoordinationIdentifier;
+ if (!PipelineTransformFunctionProvider.class.isAssignableFrom(clazz)) {
+ throw new RuntimeException("Class '" + clazz.getName() +
+ "' does not implement PipelineTransformFunctionProvider");
}
- return s3Prefix+"/"+envSourceCoordinationIdentifier;
- }
- /**
- * Get the include_prefix in s3 scan source. This is a function specific to RDS source.
- * @param s3Prefix: s3 prefix defined in the source configuration
- * @return the actual include_prefix
- */
- public String getIncludePrefixForRdsSource(String s3Prefix) {
- final String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE);
- final String shortenedSourceIdentifier = envSourceCoordinationIdentifier != null ?
- IdentifierShortener.shortenIdentifier(envSourceCoordinationIdentifier, MAX_SOURCE_IDENTIFIER_LENGTH) : null;
- if (s3Prefix == null && envSourceCoordinationIdentifier == null) {
- return S3_BUFFER_PREFIX;
- } else if (s3Prefix == null) {
- return shortenedSourceIdentifier + S3_BUFFER_PREFIX;
- } else if (envSourceCoordinationIdentifier == null) {
- return s3Prefix + S3_BUFFER_PREFIX;
- }
- return s3Prefix + "/" + shortenedSourceIdentifier + S3_BUFFER_PREFIX;
- }
+ Method method = clazz.getMethod(methodName, parameterType);
- public String getAccountIdFromRole(final String roleArn) {
- if (roleArn == null)
- return null;
- try {
- return Arn.fromString(roleArn).accountId().orElse(null);
- } catch (Exception e) {
- LOG.warn("Malformatted role ARN for dynamic transformation: {}", roleArn);
- return null;
+ if (!method.isAnnotationPresent(TransformationFunction.class)) {
+ throw new RuntimeException("Method '" + methodName + "' in class '" + clazz.getName() +
+ "' is not annotated with @TransformationFunction");
}
- }
- /**
- * Invokes a method dynamically on a given object.
- *
- * @param methodName the name of the method to be invoked
- * @param parameterType the Class object representing the parameter type
- * @param arg the parameter to be passed to the method
- * @return the result of the method invocation
- * @throws ReflectiveOperationException if the method cannot be invoked
- */
- public Object invokeMethod(String methodName, Class> parameterType, Object arg) throws ReflectiveOperationException {
- // Get the Class object
- Class> clazz = this.getClass();
+ return method.invoke(null, arg);
+ }
- // Get the Method object for the specified method and parameter type
- Method method = clazz.getMethod(methodName, parameterType);
+ private Class> resolveClassForMethod(final List functionProviders, String methodName, Class> parameterType) throws ReflectiveOperationException {
+ for (final String functionProvider : functionProviders) {
+ try {
+ Class> candidate = Class.forName(functionProvider);
+ candidate.getMethod(methodName, parameterType);
+ return candidate;
+ } catch (NoSuchMethodException e) {
+ continue;
+ }
+ }
- // Invoke the method on the object with the given argument
- return method.invoke(this, arg);
+ throw new RuntimeException("Could not find a class with method '" + methodName + "' in function_providers: " + functionProviders);
}
@@ -517,11 +486,11 @@ public Object invokeMethod(String methodName, Class> parameterType, Object arg
* Example: <> with value {action: "upsert", script: {...}}
* will merge those fields into every opensearch sink entry.
*/
- private void processOverlayDirectives(JsonNode rootNode, String pipelineJson) {
- processOverlayDirectivesRecursive(rootNode, pipelineJson);
+ private void processOverlayDirectives(JsonNode rootNode, String pipelineJson, List functionProviders) {
+ processOverlayDirectivesRecursive(rootNode, pipelineJson, functionProviders);
}
- private void processOverlayDirectivesRecursive(JsonNode node, String pipelineJson) {
+ private void processOverlayDirectivesRecursive(JsonNode node, String pipelineJson, List functionProviders) {
if (node.isObject()) {
ObjectNode objectNode = (ObjectNode) node;
List overlayKeys = new ArrayList<>();
@@ -543,7 +512,7 @@ private void processOverlayDirectivesRecursive(JsonNode node, String pipelineJso
String targetPath = matcher.group(1);
JsonNode overlayValue = objectNode.get(overlayKey);
// Resolve any <<...>> placeholders inside overlay values
- resolveOverlayPlaceholders(overlayValue, pipelineJson);
+ resolveOverlayPlaceholders(overlayValue, pipelineJson, functionProviders);
applyOverlay(objectNode, targetPath, overlayValue);
objectNode.remove(overlayKey);
}
@@ -552,11 +521,11 @@ private void processOverlayDirectivesRecursive(JsonNode node, String pipelineJso
// Recurse into remaining children
Iterator> fields = objectNode.fields();
while (fields.hasNext()) {
- processOverlayDirectivesRecursive(fields.next().getValue(), pipelineJson);
+ processOverlayDirectivesRecursive(fields.next().getValue(), pipelineJson, functionProviders);
}
} else if (node.isArray()) {
for (JsonNode element : node) {
- processOverlayDirectivesRecursive(element, pipelineJson);
+ processOverlayDirectivesRecursive(element, pipelineJson, functionProviders);
}
}
}
@@ -564,7 +533,7 @@ private void processOverlayDirectivesRecursive(JsonNode node, String pipelineJso
/**
* Resolves <<...>> placeholders in overlay value nodes using the pipeline config.
*/
- private void resolveOverlayPlaceholders(JsonNode node, String pipelineJson) {
+ private void resolveOverlayPlaceholders(JsonNode node, String pipelineJson, List functionProviders) {
if (node.isObject()) {
ObjectNode objectNode = (ObjectNode) node;
List> entries = new ArrayList<>();
@@ -575,7 +544,7 @@ private void resolveOverlayPlaceholders(JsonNode node, String pipelineJson) {
Matcher matcher = PLACEHOLDER_PATTERN.matcher(text);
if (matcher.find()) {
String placeholder = getValueFromPlaceHolder(text);
- String resolved = executeFunctionPlaceholder(placeholder, pipelineJson);
+ String resolved = executeFunctionPlaceholder(placeholder, pipelineJson, functionProviders);
if (isJsonPath(resolved)) {
JsonNode resolvedNode = JsonPath.using(parseConfigWithJsonNode)
.parse(pipelineJson).read(resolved);
@@ -589,12 +558,12 @@ private void resolveOverlayPlaceholders(JsonNode node, String pipelineJson) {
}
}
} else {
- resolveOverlayPlaceholders(entry.getValue(), pipelineJson);
+ resolveOverlayPlaceholders(entry.getValue(), pipelineJson, functionProviders);
}
}
} else if (node.isArray()) {
for (JsonNode element : node) {
- resolveOverlayPlaceholders(element, pipelineJson);
+ resolveOverlayPlaceholders(element, pipelineJson, functionProviders);
}
}
}
diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResultTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResultTest.java
index 104beeee6e..d1bf0d6ca4 100644
--- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResultTest.java
+++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResultTest.java
@@ -1,12 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
+
package org.opensearch.dataprepper.pipeline.parser.rule;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.pipeline.parser.transformer.PipelineTemplateModel;
+import java.util.Arrays;
+import java.util.List;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -17,15 +25,34 @@ class RuleEvaluatorResultTest {
@Test
void builder_createsObjectCorrectly() {
PipelineTemplateModel pipelineTemplateModel = new PipelineTemplateModel();
+ List functionProviders = Arrays.asList("org.example.Functions");
+
RuleEvaluatorResult result = RuleEvaluatorResult.builder()
.withEvaluatedResult(true)
.withPipelineName("testPipeline")
.withPipelineTemplateModel(pipelineTemplateModel)
+ .withFunctionProviders(functionProviders)
.build();
assertTrue(result.isEvaluatedResult());
- assertEquals(result.getPipelineName(), "testPipeline");
- assertEquals(result.getPipelineTemplateModel(), pipelineTemplateModel);
+ assertEquals("testPipeline", result.getPipelineName());
+ assertEquals(pipelineTemplateModel, result.getPipelineTemplateModel());
+ assertEquals(functionProviders, result.getFunctionProviders());
+ }
+
+ @Test
+ void builder_withNullFunctionProviders_setsNull() {
+ RuleEvaluatorResult result = RuleEvaluatorResult.builder()
+ .withEvaluatedResult(false)
+ .withPipelineName(null)
+ .withPipelineTemplateModel(null)
+ .withFunctionProviders(null)
+ .build();
+
+ assertFalse(result.isEvaluatedResult());
+ assertNull(result.getPipelineName());
+ assertNull(result.getPipelineTemplateModel());
+ assertNull(result.getFunctionProviders());
}
@Test
@@ -34,16 +61,20 @@ void defaultConstructor_initializesFieldsCorrectly() {
assertFalse(result.isEvaluatedResult());
assertNull(result.getPipelineName());
assertNull(result.getPipelineTemplateModel());
+ assertNull(result.getFunctionProviders());
}
@Test
void allArgsConstructor_assignsFieldsCorrectly() {
PipelineTemplateModel pipelineTemplateModel = new PipelineTemplateModel();
+ List functionProviders = Arrays.asList("com.example.Provider");
+
RuleEvaluatorResult result = new RuleEvaluatorResult(true,
- "testPipeline", pipelineTemplateModel);
+ "testPipeline", pipelineTemplateModel, functionProviders);
assertTrue(result.isEvaluatedResult());
- assertEquals(result.getPipelineName(), "testPipeline");
- assertEquals(result.getPipelineTemplateModel(), pipelineTemplateModel);
+ assertEquals("testPipeline", result.getPipelineName());
+ assertEquals(pipelineTemplateModel, result.getPipelineTemplateModel());
+ assertEquals(functionProviders, result.getFunctionProviders());
}
}
diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluationTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluationTest.java
new file mode 100644
index 0000000000..fb1282c80e
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluationTest.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.dataprepper.pipeline.parser.rule;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class RuleFileEvaluationTest {
+
+ @Test
+ void builder_createsObjectCorrectly() {
+ List functionProviders = Arrays.asList("com.example.Provider1", "com.example.Provider2");
+
+ RuleFileEvaluation result = RuleFileEvaluation.builder()
+ .withResult(true)
+ .withRuleFileName("test-rule.yaml")
+ .withPluginName("testPlugin")
+ .withFunctionProviders(functionProviders)
+ .build();
+
+ assertTrue(result.getResult());
+ assertEquals("test-rule.yaml", result.getRuleFileName());
+ assertEquals("testPlugin", result.getPluginName());
+ assertEquals(functionProviders, result.getFunctionProviders());
+ assertEquals(2, result.getFunctionProviders().size());
+ }
+
+ @Test
+ void builder_withNullFunctionProviders_setsNull() {
+ RuleFileEvaluation result = RuleFileEvaluation.builder()
+ .withResult(false)
+ .withRuleFileName("rule.yaml")
+ .withPluginName("plugin")
+ .withFunctionProviders(null)
+ .build();
+
+ assertNull(result.getFunctionProviders());
+ }
+
+ @Test
+ void defaultConstructor_initializesFieldsToNull() {
+ RuleFileEvaluation result = new RuleFileEvaluation();
+
+ assertNull(result.getResult());
+ assertNull(result.getRuleFileName());
+ assertNull(result.getPluginName());
+ assertNull(result.getFunctionProviders());
+ }
+
+ @Test
+ void allArgsConstructor_assignsFieldsCorrectly() {
+ List functionProviders = Arrays.asList("com.example.Provider");
+
+ RuleFileEvaluation result = new RuleFileEvaluation(
+ true, "rule.yaml", "myPlugin", functionProviders);
+
+ assertTrue(result.getResult());
+ assertEquals("rule.yaml", result.getRuleFileName());
+ assertEquals("myPlugin", result.getPluginName());
+ assertEquals(functionProviders, result.getFunctionProviders());
+ }
+
+ @Test
+ void setter_updatesFunctionProviders() {
+ RuleFileEvaluation result = new RuleFileEvaluation();
+ List providers = Arrays.asList("org.example.Functions");
+
+ result.setFunctionProviders(providers);
+
+ assertEquals(providers, result.getFunctionProviders());
+ }
+}
diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModelTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModelTest.java
index bb974ce88b..7578c54d48 100644
--- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModelTest.java
+++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModelTest.java
@@ -1,7 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
+
package org.opensearch.dataprepper.pipeline.parser.rule;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -12,6 +17,8 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
class RuleTransformerModelTest {
@@ -20,11 +27,14 @@ class RuleTransformerModelTest {
@Test
void testSerialization() throws Exception {
List applyWhen = Arrays.asList("condition1", "condition2");
+ List functionProviders = Arrays.asList("org.example.Functions");
String pluginName = "testPlugin";
- RuleTransformerModel model = new RuleTransformerModel(applyWhen, pluginName);
+ RuleTransformerModel model = new RuleTransformerModel(applyWhen, pluginName, functionProviders);
String json = objectMapper.writeValueAsString(model);
assertNotNull(json, "Serialized JSON should not be null");
+ assertTrue(json.contains("\"function_providers\""));
+ assertTrue(json.contains("org.example.Functions"));
}
@Test
@@ -37,5 +47,43 @@ void testDeserialization() throws Exception {
assertEquals("condition1", model.getApplyWhen().get(0), "The first condition should be 'condition1'");
assertEquals("condition2", model.getApplyWhen().get(1), "The second condition should be 'condition2'");
assertEquals("testPlugin", model.getPluginName(), "plugin Name");
+ assertNull(model.getFunctionProviders());
+ }
+
+ @Test
+ void testDeserialization_withFunctionProviders() throws Exception {
+ String json = "{\"plugin_name\": \"rds\", \"apply_when\": [\"$..source.rds\"], " +
+ "\"function_providers\": [\"org.opensearch.dataprepper.plugins.aws.PipelineTransformFunctions\"]}";
+
+ RuleTransformerModel model = objectMapper.readValue(json, RuleTransformerModel.class);
+ assertNotNull(model);
+ assertEquals("rds", model.getPluginName());
+ assertNotNull(model.getFunctionProviders());
+ assertEquals(1, model.getFunctionProviders().size());
+ assertEquals("org.opensearch.dataprepper.plugins.aws.PipelineTransformFunctions",
+ model.getFunctionProviders().get(0));
+ }
+
+ @Test
+ void testDeserialization_withMultipleFunctionProviders() throws Exception {
+ String json = "{\"plugin_name\": \"test\", \"apply_when\": [\"$..source.test\"], " +
+ "\"function_providers\": [\"com.example.Provider1\", \"com.example.Provider2\"]}";
+
+ RuleTransformerModel model = objectMapper.readValue(json, RuleTransformerModel.class);
+ assertNotNull(model.getFunctionProviders());
+ assertEquals(2, model.getFunctionProviders().size());
+ assertEquals("com.example.Provider1", model.getFunctionProviders().get(0));
+ assertEquals("com.example.Provider2", model.getFunctionProviders().get(1));
+ }
+
+ @Test
+ void testToString_includesFunctionProviders() {
+ List applyWhen = Arrays.asList("condition1");
+ List functionProviders = Arrays.asList("org.example.Funcs");
+ RuleTransformerModel model = new RuleTransformerModel(applyWhen, "plugin", functionProviders);
+
+ String result = model.toString();
+ assertTrue(result.contains("functionProviders"));
+ assertTrue(result.contains("org.example.Funcs"));
}
}
diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java
index 21363738d1..70a90dfa77 100644
--- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java
+++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java
@@ -1,7 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
+
package org.opensearch.dataprepper.pipeline.parser.transformer;
import com.fasterxml.jackson.databind.JsonNode;
@@ -10,12 +15,8 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
-import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationFileReader;
import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationReader;
@@ -24,6 +25,7 @@
import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluator;
import org.opensearch.dataprepper.pipeline.parser.rule.RuleStream;
+import java.util.Arrays;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -34,15 +36,13 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.Mockito.doReturn;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
class DynamicConfigTransformerTest {
@@ -54,22 +54,51 @@ class DynamicConfigTransformerTest {
RuleEvaluator ruleEvaluator;
@Test
- void test_getAccountIdFromRole_returns_account_id_from_valid_role_arn() {
- final String testAccountId = RandomStringUtils.randomNumeric(12);
- final String testRoleArn = String.format("arn:aws:iam::%s:role/example-role", testAccountId);
+ void test_invokeMethod_throws_when_functionProviders_is_null() {
+ ruleEvaluator = mock(RuleEvaluator.class);
+ DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator);
+ assertThrows(RuntimeException.class, () ->
+ transformer.invokeMethod(null, "someMethod", String.class, "arg"));
+ }
+
+ @Test
+ void test_invokeMethod_throws_when_functionProviders_is_empty() {
+ ruleEvaluator = mock(RuleEvaluator.class);
+ DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator);
+ assertThrows(RuntimeException.class, () ->
+ transformer.invokeMethod(Collections.emptyList(), "someMethod", String.class, "arg"));
+ }
+
+ @Test
+ void test_invokeMethod_throws_when_class_does_not_implement_interface() {
+ ruleEvaluator = mock(RuleEvaluator.class);
+ DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator);
+ List providers = Collections.singletonList("java.lang.String");
+ assertThrows(RuntimeException.class, () ->
+ transformer.invokeMethod(providers, "valueOf", String.class, "test"));
+ }
+
+ @Test
+ void test_invokeMethod_throws_when_method_not_annotated() {
ruleEvaluator = mock(RuleEvaluator.class);
DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator);
- assertThat(transformer.getAccountIdFromRole(testRoleArn)).isEqualTo(testAccountId);
+ List providers = Collections.singletonList(
+ "org.opensearch.dataprepper.pipeline.parser.transformer.DynamicConfigTransformerTest$ValidProviderNoAnnotation");
+ assertThrows(RuntimeException.class, () ->
+ transformer.invokeMethod(providers, "unannotatedMethod", String.class, "arg"));
}
- @ParameterizedTest
- @MethodSource("providesInvalidRoleArn")
- void test_getAccountIdFromRole_returns_null_from_invalid_role_arn(final String testRoleArn) {
+ @Test
+ void test_invokeMethod_throws_when_method_not_found_in_any_provider() {
ruleEvaluator = mock(RuleEvaluator.class);
DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator);
- assertThat(transformer.getAccountIdFromRole(testRoleArn)).isNull();
+ List providers = Collections.singletonList(
+ "org.opensearch.dataprepper.pipeline.parser.transformer.DynamicConfigTransformerTest$ValidProviderNoAnnotation");
+ assertThrows(Exception.class, () ->
+ transformer.invokeMethod(providers, "nonExistentMethod", String.class, "arg"));
}
+
@Test
void test_successful_transformation_with_only_source_and_sink() throws IOException {
String docDBUserConfig = TestConfigurationProvider.USER_CONFIG_TRANSFORMATION_DOCDB1_CONFIG_FILE;
@@ -385,13 +414,6 @@ void testInvalidJsonPathThrowsException() throws IOException {
assertThrows(RuntimeException.class, () -> transformer.transformConfiguration(pipelinesDataFlowModel));
}
- private static Stream providesInvalidRoleArn() {
- return Stream.of(
- null,
- Arguments.of("arn:aws:iam:::role/test-role"),
- Arguments.of("invalid-format-arn")
- );
- }
@Test
void test_overlay_directive_merges_into_opensearch_sinks() throws Exception {
@@ -414,9 +436,9 @@ void test_overlay_directive_merges_into_opensearch_sinks() throws Exception {
DynamicConfigTransformer transformer = new DynamicConfigTransformer(mock(RuleEvaluator.class));
Method method = DynamicConfigTransformer.class.getDeclaredMethod(
- "processOverlayDirectives", JsonNode.class, String.class);
+ "processOverlayDirectives", JsonNode.class, String.class, List.class);
method.setAccessible(true);
- method.invoke(transformer, root, "{}");
+ method.invoke(transformer, root, "{}", Collections.emptyList());
JsonNode resultOs = sinkArray.get(0).get("opensearch");
assertThat(resultOs.get("hosts").asText()).isEqualTo("https://localhost:9200");
@@ -450,9 +472,9 @@ void test_overlay_directive_overrides_existing_fields() throws Exception {
DynamicConfigTransformer transformer = new DynamicConfigTransformer(mock(RuleEvaluator.class));
Method method = DynamicConfigTransformer.class.getDeclaredMethod(
- "processOverlayDirectives", JsonNode.class, String.class);
+ "processOverlayDirectives", JsonNode.class, String.class, List.class);
method.setAccessible(true);
- method.invoke(transformer, root, "{}");
+ method.invoke(transformer, root, "{}", Collections.emptyList());
JsonNode resultOs = sinkArray.get(0).get("opensearch");
assertThat(resultOs.get("hosts").asText()).isEqualTo("https://localhost:9200");
@@ -461,22 +483,72 @@ void test_overlay_directive_overrides_existing_fields() throws Exception {
assertThat(resultOs.get("script").has("custom_field")).isFalse();
}
+ private static final String PROVIDER_PKG = "org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer.";
+
+ // --- invokeMethod: Happy path ---
+
@Test
- void test_calculateDepthForRdsSource_without_source_coordination_identifier() {
- String mockPrefix = "my-bucket/path";
- DynamicConfigTransformer transformer = spy(new DynamicConfigTransformer(mock(RuleEvaluator.class)));
- doReturn(null).when(transformer).getSourceCoordinationIdentifier();
- String result = transformer.calculateDepthForRdsSource(mockPrefix);
- assertThat(result, equalTo("4"));
+ void test_invokeMethod_succeeds_with_valid_provider_and_annotated_method() throws Exception {
+ ruleEvaluator = mock(RuleEvaluator.class);
+ DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator);
+ List providers = Collections.singletonList(PROVIDER_PKG + "ValidAnnotatedProvider");
+ Object result = transformer.invokeMethod(providers, "transformValue", String.class, "hello");
+ assertEquals("HELLO", result);
}
+ // --- invokeMethod: Non-existent class name ---
+
@Test
- void test_calculateDepthForRdsSource_with_source_coordination_identifier() {
- String mockPrefix = "my-bucket/path";
- DynamicConfigTransformer transformer = spy(new DynamicConfigTransformer(mock(RuleEvaluator.class)));
- doReturn("testValue").when(transformer).getSourceCoordinationIdentifier();
- String result = transformer.calculateDepthForRdsSource(mockPrefix);
- assertThat(result, equalTo("5"));
+ void test_invokeMethod_throws_when_class_does_not_exist() {
+ ruleEvaluator = mock(RuleEvaluator.class);
+ DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator);
+ List providers = Collections.singletonList("com.example.does.not.Exist");
+ assertThrows(Exception.class, () ->
+ transformer.invokeMethod(providers, "someMethod", String.class, "arg"));
+ }
+
+ // --- invokeMethod: Multiple providers, correct resolution ---
+
+ @Test
+ void test_invokeMethod_resolves_method_from_second_provider_when_first_lacks_it() throws Exception {
+ ruleEvaluator = mock(RuleEvaluator.class);
+ DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator);
+ List providers = Arrays.asList(
+ PROVIDER_PKG + "ProviderWithoutTargetMethod",
+ PROVIDER_PKG + "ValidAnnotatedProvider");
+ Object result = transformer.invokeMethod(providers, "transformValue", String.class, "world");
+ assertEquals("WORLD", result);
}
+ // --- invokeMethod: Non-static annotated method ---
+
+ @Test
+ void test_invokeMethod_throws_when_annotated_method_is_non_static() {
+ ruleEvaluator = mock(RuleEvaluator.class);
+ DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator);
+ List providers = Collections.singletonList(PROVIDER_PKG + "ProviderWithNonStaticMethod");
+ assertThrows(Exception.class, () ->
+ transformer.invokeMethod(providers, "instanceMethod", String.class, "arg"));
+ }
+
+ // --- invokeMethod: Static initializer safety ---
+
+ @Test
+ void test_invokeMethod_throws_for_non_provider_before_running_methods() {
+ ruleEvaluator = mock(RuleEvaluator.class);
+ DynamicConfigTransformer transformer = new DynamicConfigTransformer(ruleEvaluator);
+ List providers = Collections.singletonList(PROVIDER_PKG + "NonProviderWithStaticInit");
+ RuntimeException exception = assertThrows(RuntimeException.class, () ->
+ transformer.invokeMethod(providers, "getValue", String.class, "arg"));
+ assertTrue(exception.getMessage().contains("does not implement PipelineTransformFunctionProvider"));
+ }
+
+ // --- Inner test helper class for package-check-independent tests ---
+
+ public static class ValidProviderNoAnnotation implements org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider {
+ public static String unannotatedMethod(String input) {
+ return input;
+ }
+ }
}
+
diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/NonProviderWithStaticInit.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/NonProviderWithStaticInit.java
new file mode 100644
index 0000000000..b7ad75a1fb
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/NonProviderWithStaticInit.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+package org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer;
+
+import org.opensearch.dataprepper.model.annotations.TransformationFunction;
+
+/**
+ * Test class that does NOT implement PipelineTransformFunctionProvider.
+ * Has a static initializer to verify it doesn't run if interface check rejects it.
+ */
+public class NonProviderWithStaticInit {
+ static {
+ System.setProperty("test.static.init.ran", "true");
+ }
+
+ @TransformationFunction
+ public static String getValue(String input) {
+ return input;
+ }
+}
diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ProviderWithNonStaticMethod.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ProviderWithNonStaticMethod.java
new file mode 100644
index 0000000000..955a768b99
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ProviderWithNonStaticMethod.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+package org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer;
+
+import org.opensearch.dataprepper.model.annotations.TransformationFunction;
+import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider;
+
+public class ProviderWithNonStaticMethod implements PipelineTransformFunctionProvider {
+ @TransformationFunction
+ public String instanceMethod(String input) {
+ return input;
+ }
+}
diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ProviderWithoutTargetMethod.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ProviderWithoutTargetMethod.java
new file mode 100644
index 0000000000..4a445f95cb
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ProviderWithoutTargetMethod.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+package org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer;
+
+import org.opensearch.dataprepper.model.annotations.TransformationFunction;
+import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider;
+
+public class ProviderWithoutTargetMethod implements PipelineTransformFunctionProvider {
+ @TransformationFunction
+ public static String otherMethod(String input) {
+ return input;
+ }
+}
diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/TestTransformFunctionProvider.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/TestTransformFunctionProvider.java
new file mode 100644
index 0000000000..52f989dc49
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/TestTransformFunctionProvider.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer;
+
+import org.opensearch.dataprepper.model.annotations.TransformationFunction;
+import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider;
+
+/**
+ * Test-only function provider for pipeline transformation tests.
+ * Replicates the logic from PipelineTransformFunctions in aws-plugin.
+ */
+public class TestTransformFunctionProvider implements PipelineTransformFunctionProvider {
+
+ private static final String SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE = "SOURCE_COORDINATION_PIPELINE_IDENTIFIER";
+
+ @TransformationFunction
+ public static String calculateDepth(String s3Prefix) {
+ return Integer.toString(getDepth(s3Prefix, 4));
+ }
+
+ @TransformationFunction
+ public static String calculateDepthForRdsSource(String s3Prefix) {
+ String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE);
+ int baseDepth = envSourceCoordinationIdentifier != null ? 3 : 2;
+ return Integer.toString(getDepth(s3Prefix, baseDepth));
+ }
+
+ @TransformationFunction
+ public static String getSourceCoordinationIdentifierEnvVariable(String s3Prefix) {
+ String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE);
+ if (s3Prefix == null) {
+ return envSourceCoordinationIdentifier;
+ }
+ return s3Prefix + "/" + envSourceCoordinationIdentifier;
+ }
+
+ @TransformationFunction
+ public static String getAccountIdFromRole(final String roleArn) {
+ if (roleArn == null) {
+ return null;
+ }
+ try {
+ return software.amazon.awssdk.arns.Arn.fromString(roleArn).accountId().orElse(null);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ @TransformationFunction
+ public static String getIncludePrefixForRdsSource(String s3Prefix) {
+ final String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE);
+ if (s3Prefix == null && envSourceCoordinationIdentifier == null) {
+ return "/buffer";
+ } else if (s3Prefix == null) {
+ return envSourceCoordinationIdentifier + "/buffer";
+ } else if (envSourceCoordinationIdentifier == null) {
+ return s3Prefix + "/buffer";
+ }
+ return s3Prefix + "/" + envSourceCoordinationIdentifier + "/buffer";
+ }
+
+ private static int getDepth(String s3Prefix, int baseDepth) {
+ if (s3Prefix == null) {
+ return baseDepth;
+ }
+ return s3Prefix.split("/").length + baseDepth;
+ }
+}
diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ValidAnnotatedProvider.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ValidAnnotatedProvider.java
new file mode 100644
index 0000000000..73b99ba6aa
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/dataprepper_transformer/ValidAnnotatedProvider.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+package org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer;
+
+import org.opensearch.dataprepper.model.annotations.TransformationFunction;
+import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider;
+
+public class ValidAnnotatedProvider implements PipelineTransformFunctionProvider {
+ @TransformationFunction
+ public static String transformValue(String input) {
+ return input.toUpperCase();
+ }
+}
diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml
index b120d1531c..bc13226783 100644
--- a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml
+++ b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml
@@ -2,3 +2,5 @@ plugin_name: "documentdb"
apply_when:
- "$..source.documentdb"
- "$..source.documentdb.s3_bucket"
+function_providers:
+ - "org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer.TestTransformFunctionProvider"
diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb1-rule.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb1-rule.yaml
index ed3c4b8b57..2cd7654d6a 100644
--- a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb1-rule.yaml
+++ b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb1-rule.yaml
@@ -1,3 +1,5 @@
plugin_name: "documentdb"
apply_when:
- - "$..source.documentdb"
\ No newline at end of file
+ - "$..source.documentdb"
+function_providers:
+ - "org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer.TestTransformFunctionProvider"
diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-joins-rule.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-joins-rule.yaml
index 6dc367e378..212a8b3bc9 100644
--- a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-joins-rule.yaml
+++ b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-joins-rule.yaml
@@ -11,3 +11,5 @@ plugin_name: "rds-joins"
apply_when:
- "$..source.rds"
- "$..source.rds.joins"
+function_providers:
+ - "org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer.TestTransformFunctionProvider"
diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-rule.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-rule.yaml
index 85cd3798f9..8d8fd8f547 100644
--- a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-rule.yaml
+++ b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/rds-rule.yaml
@@ -10,3 +10,5 @@
plugin_name: "rds"
apply_when:
- "$..source.rds"
+function_providers:
+ - "org.opensearch.dataprepper.pipeline.parser.transformer.dataprepper_transformer.TestTransformFunctionProvider"
diff --git a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/dataprepper_transformer/PipelineTransformFunctions.java b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/dataprepper_transformer/PipelineTransformFunctions.java
new file mode 100644
index 0000000000..dac623ba8c
--- /dev/null
+++ b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/dataprepper_transformer/PipelineTransformFunctions.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.dataprepper.plugins.aws.dataprepper_transformer;
+
+import org.opensearch.dataprepper.model.annotations.SkipTestCoverageGenerated;
+import org.opensearch.dataprepper.model.annotations.TransformationFunction;
+import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.arns.Arn;
+
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Base64;
+import java.util.function.Supplier;
+
+/**
+ * Provides static utility methods for pipeline template transformations.
+ * These methods are invoked dynamically via the FUNCTION_NAME placeholder mechanism
+ * in template YAML files.
+ */
+public class PipelineTransformFunctions implements PipelineTransformFunctionProvider {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PipelineTransformFunctions.class);
+
+ private static final String SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE = "SOURCE_COORDINATION_PIPELINE_IDENTIFIER";
+ private static final String S3_BUFFER_PREFIX = "/buffer";
+ private static final int MAX_SOURCE_IDENTIFIER_LENGTH = 15;
+
+ static Supplier sourceCoordinationIdentifierSupplier =
+ () -> System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE);
+
+ @SkipTestCoverageGenerated
+ private PipelineTransformFunctions() {
+ }
+
+ @TransformationFunction
+ public static String calculateDepth(String s3Prefix) {
+ return Integer.toString(getDepth(s3Prefix, 4));
+ }
+
+ @TransformationFunction
+ public static String calculateDepthForRdsSource(String s3Prefix) {
+ String envSourceCoordinationIdentifier = sourceCoordinationIdentifierSupplier.get();
+ int baseDepth = envSourceCoordinationIdentifier != null ? 3 : 2;
+ return Integer.toString(getDepth(s3Prefix, baseDepth));
+ }
+
+ @TransformationFunction
+ public static String getSourceCoordinationIdentifierEnvVariable(String s3Prefix) {
+ String envSourceCoordinationIdentifier = sourceCoordinationIdentifierSupplier.get();
+ if (s3Prefix == null) {
+ return envSourceCoordinationIdentifier;
+ }
+ return s3Prefix + "/" + envSourceCoordinationIdentifier;
+ }
+
+ @TransformationFunction
+ public static String getIncludePrefixForRdsSource(String s3Prefix) {
+ final String envSourceCoordinationIdentifier = sourceCoordinationIdentifierSupplier.get();
+ final String shortenedSourceIdentifier = envSourceCoordinationIdentifier != null ?
+ shortenIdentifier(envSourceCoordinationIdentifier, MAX_SOURCE_IDENTIFIER_LENGTH) : null;
+ if (s3Prefix == null && envSourceCoordinationIdentifier == null) {
+ return S3_BUFFER_PREFIX;
+ } else if (s3Prefix == null) {
+ return shortenedSourceIdentifier + S3_BUFFER_PREFIX;
+ } else if (envSourceCoordinationIdentifier == null) {
+ return s3Prefix + S3_BUFFER_PREFIX;
+ }
+ return s3Prefix + "/" + shortenedSourceIdentifier + S3_BUFFER_PREFIX;
+ }
+
+ @TransformationFunction
+ public static String getAccountIdFromRole(final String roleArn) {
+ if (roleArn == null) {
+ return null;
+ }
+ try {
+ return Arn.fromString(roleArn).accountId().orElse(null);
+ } catch (Exception e) {
+ LOG.warn("Malformatted role ARN for dynamic transformation: {}", roleArn);
+ return null;
+ }
+ }
+
+ private static int getDepth(String s3Prefix, int baseDepth) {
+ if (s3Prefix == null) {
+ return baseDepth;
+ }
+ return s3Prefix.split("/").length + baseDepth;
+ }
+
+ @SkipTestCoverageGenerated
+ static String shortenIdentifier(final String identifier, final int maxLength) {
+ if (identifier.length() <= maxLength) {
+ return identifier;
+ }
+
+ try {
+ MessageDigest digest = MessageDigest.getInstance("SHA-256");
+ byte[] encodedHash = digest.digest(identifier.getBytes(StandardCharsets.UTF_8));
+ String base64Hash = Base64.getUrlEncoder().withoutPadding().encodeToString(encodedHash);
+ return base64Hash.substring(0, Math.min(base64Hash.length(), maxLength));
+ } catch (final NoSuchAlgorithmException e) {
+ return identifier.substring(0, maxLength);
+ }
+ }
+}
diff --git a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/dataprepper_transformer/PipelineTransformFunctionsTest.java b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/dataprepper_transformer/PipelineTransformFunctionsTest.java
new file mode 100644
index 0000000000..1f1936b9c3
--- /dev/null
+++ b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/dataprepper_transformer/PipelineTransformFunctionsTest.java
@@ -0,0 +1,215 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.dataprepper.plugins.aws.dataprepper_transformer;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.opensearch.dataprepper.model.annotations.TransformationFunction;
+import org.opensearch.dataprepper.model.plugin.PipelineTransformFunctionProvider;
+
+import java.lang.reflect.Method;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class PipelineTransformFunctionsTest {
+
+ private Supplier originalSupplier;
+
+ @BeforeEach
+ void setUp() {
+ originalSupplier = PipelineTransformFunctions.sourceCoordinationIdentifierSupplier;
+ }
+
+ @AfterEach
+ void tearDown() {
+ PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = originalSupplier;
+ }
+
+ @Test
+ void class_implements_PipelineTransformFunctionProvider() {
+ assertTrue(PipelineTransformFunctionProvider.class.isAssignableFrom(PipelineTransformFunctions.class));
+ }
+
+ @Test
+ void all_public_transformation_methods_are_annotated() throws Exception {
+ String[] methodNames = {
+ "calculateDepth",
+ "calculateDepthForRdsSource",
+ "getSourceCoordinationIdentifierEnvVariable",
+ "getIncludePrefixForRdsSource",
+ "getAccountIdFromRole"
+ };
+
+ for (String methodName : methodNames) {
+ Method method = PipelineTransformFunctions.class.getMethod(methodName, String.class);
+ assertTrue(method.isAnnotationPresent(TransformationFunction.class),
+ "Method " + methodName + " should be annotated with @TransformationFunction");
+ }
+ }
+
+ @Test
+ void default_sourceCoordinationIdentifierSupplier_reads_env_variable() {
+ // Exercises the default lambda on line 39: () -> System.getenv(...)
+ // env var is not set in test, so returns null — but the lambda bytecode is covered
+ String result = originalSupplier.get();
+ assertNull(result);
+ }
+
+
+ // --- calculateDepth ---
+
+ @Test
+ void calculateDepth_returns_4_when_prefix_is_null() {
+ assertEquals("4", PipelineTransformFunctions.calculateDepth(null));
+ }
+
+ @Test
+ void calculateDepth_returns_correct_depth_with_single_segment_prefix() {
+ assertEquals("5", PipelineTransformFunctions.calculateDepth("myprefix"));
+ }
+
+ @Test
+ void calculateDepth_returns_correct_depth_with_multi_segment_prefix() {
+ assertEquals("6", PipelineTransformFunctions.calculateDepth("prefix/subfolder"));
+ }
+
+ // --- calculateDepthForRdsSource ---
+
+ @Test
+ void calculateDepthForRdsSource_returns_2_when_prefix_is_null_and_no_env_var() {
+ PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> null;
+ assertEquals("2", PipelineTransformFunctions.calculateDepthForRdsSource(null));
+ }
+
+ @Test
+ void calculateDepthForRdsSource_returns_3_when_prefix_is_null_and_env_var_set() {
+ PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> "my-identifier";
+ assertEquals("3", PipelineTransformFunctions.calculateDepthForRdsSource(null));
+ }
+
+ @Test
+ void calculateDepthForRdsSource_adds_prefix_segments_with_env_var() {
+ PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> "my-identifier";
+ assertEquals("5", PipelineTransformFunctions.calculateDepthForRdsSource("a/b"));
+ }
+
+ @Test
+ void calculateDepthForRdsSource_adds_prefix_segments_without_env_var() {
+ PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> null;
+ assertEquals("4", PipelineTransformFunctions.calculateDepthForRdsSource("a/b"));
+ }
+
+ // --- getSourceCoordinationIdentifierEnvVariable ---
+
+ @Test
+ void getSourceCoordinationIdentifierEnvVariable_returns_env_var_when_prefix_null() {
+ PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> "test-id";
+ assertEquals("test-id", PipelineTransformFunctions.getSourceCoordinationIdentifierEnvVariable(null));
+ }
+
+ @Test
+ void getSourceCoordinationIdentifierEnvVariable_returns_null_when_env_not_set_and_prefix_null() {
+ PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> null;
+ assertNull(PipelineTransformFunctions.getSourceCoordinationIdentifierEnvVariable(null));
+ }
+
+ @Test
+ void getSourceCoordinationIdentifierEnvVariable_prepends_prefix_to_env_var() {
+ PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> "test-id";
+ assertEquals("myprefix/test-id",
+ PipelineTransformFunctions.getSourceCoordinationIdentifierEnvVariable("myprefix"));
+ }
+
+ // --- getIncludePrefixForRdsSource ---
+
+ @Test
+ void getIncludePrefixForRdsSource_returns_buffer_prefix_when_all_null() {
+ PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> null;
+ assertEquals("/buffer", PipelineTransformFunctions.getIncludePrefixForRdsSource(null));
+ }
+
+ @Test
+ void getIncludePrefixForRdsSource_returns_shortened_id_plus_buffer_when_prefix_null_and_env_set() {
+ PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> "short";
+ assertEquals("short/buffer", PipelineTransformFunctions.getIncludePrefixForRdsSource(null));
+ }
+
+ @Test
+ void getIncludePrefixForRdsSource_returns_prefix_plus_buffer_when_env_null() {
+ PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> null;
+ assertEquals("myprefix/buffer", PipelineTransformFunctions.getIncludePrefixForRdsSource("myprefix"));
+ }
+
+ @Test
+ void getIncludePrefixForRdsSource_returns_full_path_when_both_set() {
+ PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> "short";
+ assertEquals("myprefix/short/buffer",
+ PipelineTransformFunctions.getIncludePrefixForRdsSource("myprefix"));
+ }
+
+ @Test
+ void getIncludePrefixForRdsSource_shortens_long_identifier() {
+ PipelineTransformFunctions.sourceCoordinationIdentifierSupplier = () -> "this-is-a-very-long-identifier-exceeding-max";
+ String result = PipelineTransformFunctions.getIncludePrefixForRdsSource(null);
+ assertNotNull(result);
+ assertTrue(result.endsWith("/buffer"));
+ // shortened id is 15 chars + "/buffer" = 22 chars total
+ assertEquals(22, result.length());
+ }
+
+ // --- getAccountIdFromRole ---
+
+ @Test
+ void getAccountIdFromRole_returns_account_id_from_valid_arn() {
+ assertEquals("123456789012",
+ PipelineTransformFunctions.getAccountIdFromRole("arn:aws:iam::123456789012:role/MyRole"));
+ }
+
+ @Test
+ void getAccountIdFromRole_returns_null_when_arn_is_null() {
+ assertNull(PipelineTransformFunctions.getAccountIdFromRole(null));
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"", "not-an-arn", "arn:aws:iam:::role/test-role"})
+ void getAccountIdFromRole_returns_null_for_invalid_arns(String invalidArn) {
+ assertNull(PipelineTransformFunctions.getAccountIdFromRole(invalidArn));
+ }
+
+ // --- shortenIdentifier ---
+
+ @Test
+ void shortenIdentifier_returns_original_when_within_limit() {
+ assertEquals("short", PipelineTransformFunctions.shortenIdentifier("short", 15));
+ }
+
+ @Test
+ void shortenIdentifier_returns_shortened_hash_when_exceeds_limit() {
+ String longId = "this-is-a-very-long-identifier-that-exceeds-limit";
+ String result = PipelineTransformFunctions.shortenIdentifier(longId, 15);
+ assertNotNull(result);
+ assertEquals(15, result.length());
+ }
+
+ @Test
+ void shortenIdentifier_returns_consistent_results() {
+ String id = "consistent-test-identifier";
+ String result1 = PipelineTransformFunctions.shortenIdentifier(id, 10);
+ String result2 = PipelineTransformFunctions.shortenIdentifier(id, 10);
+ assertEquals(result1, result2);
+ }
+}
diff --git a/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/documentdb-rule.yaml b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/documentdb-rule.yaml
index 60aa428d8a..22e814cac9 100644
--- a/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/documentdb-rule.yaml
+++ b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/documentdb-rule.yaml
@@ -1,4 +1,6 @@
plugin_name: "documentdb"
apply_when:
- "$..source.documentdb"
- - "$..source.documentdb.s3_bucket"
\ No newline at end of file
+ - "$..source.documentdb.s3_bucket"
+function_providers:
+ - "org.opensearch.dataprepper.plugins.aws.dataprepper_transformer.PipelineTransformFunctions"
diff --git a/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/mongodb-rule.yaml b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/mongodb-rule.yaml
index 33cc703072..07c35c3cd7 100644
--- a/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/mongodb-rule.yaml
+++ b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/mongodb-rule.yaml
@@ -1,4 +1,6 @@
plugin_name: "mongodb"
apply_when:
- "$..source.mongodb"
- - "$..source.mongodb.s3_bucket"
\ No newline at end of file
+ - "$..source.mongodb.s3_bucket"
+function_providers:
+ - "org.opensearch.dataprepper.plugins.aws.dataprepper_transformer.PipelineTransformFunctions"
diff --git a/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-joins-rule.yaml b/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-joins-rule.yaml
index 6dc367e378..2e65f1481e 100644
--- a/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-joins-rule.yaml
+++ b/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-joins-rule.yaml
@@ -1,4 +1,3 @@
-#
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
@@ -11,3 +10,5 @@ plugin_name: "rds-joins"
apply_when:
- "$..source.rds"
- "$..source.rds.joins"
+function_providers:
+ - "org.opensearch.dataprepper.plugins.aws.dataprepper_transformer.PipelineTransformFunctions"
diff --git a/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-rule.yaml b/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-rule.yaml
index f13f6b6fc4..668fc14e7a 100644
--- a/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-rule.yaml
+++ b/data-prepper-plugins/rds-source/src/main/resources/org/opensearch/dataprepper/transforms/rules/rds-rule.yaml
@@ -1,3 +1,5 @@
plugin_name: "rds"
apply_when:
- - "$..source.rds"
\ No newline at end of file
+ - "$..source.rds"
+function_providers:
+ - "org.opensearch.dataprepper.plugins.aws.dataprepper_transformer.PipelineTransformFunctions"