diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index 5ad74f57b1..e0ce39bcb1 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -529,7 +529,7 @@ void testOutputRawSpanDefault(final boolean estimateBulkSizeUsingCompression, .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString()); assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3)); assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0)); - final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 799.0 : 2058.0; + final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 830.0 : 2058.0; assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); } @@ -615,7 +615,7 @@ void testOutputRawSpanWithDLQ(final boolean estimateBulkSizeUsingCompression, .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString()); assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3)); assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0)); - final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 1085.0 : 2072.0; + final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 1114.0 : 2072.0; assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); @@ -677,7 +677,7 @@ void testOutputServiceMapDefault(final boolean estimateBulkSizeUsingCompression, .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString()); assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3)); assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0)); - final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 376.0 : 265.0; + final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 410.0 : 265.0; assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 3145195477..ace23d3260 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -139,6 +139,7 @@ public class OpenSearchSink extends AbstractSink> { private final String pipeline; private final String action; private final List actions; + private final ScriptManager scriptManager; private final String documentRootKey; private String configuredIndexAlias; private final ReentrantLock lock; @@ -208,6 +209,7 @@ public OpenSearchSink(final PluginSetting pluginSetting, this.routing = openSearchSinkConfig.getIndexConfiguration().getRouting(); this.action = openSearchSinkConfig.getIndexConfiguration().getAction(); this.actions = openSearchSinkConfig.getIndexConfiguration().getActions(); + this.scriptManager = new ScriptManager(openSearchSinkConfig.getIndexConfiguration().getScriptConfiguration(), expressionEvaluator); this.documentRootKey = openSearchSinkConfig.getIndexConfiguration().getDocumentRootKey(); this.versionType = openSearchSinkConfig.getIndexConfiguration().getVersionType(); this.versionExpression = openSearchSinkConfig.getIndexConfiguration().getVersionExpression(); @@ -342,7 +344,7 @@ public boolean isReady() { return initialized; } - private BulkOperation getBulkOperationForAction(final String action, + BulkOperation getBulkOperationForAction(final String action, final SerializedJson document, final Long version, final String indexName, @@ -380,18 +382,22 @@ private BulkOperation getBulkOperationForAction(final String action, } - final UpdateOperation.Builder updateOperationBuilder = (StringUtils.equals(action.toLowerCase(), OpenSearchBulkActions.UPSERT.toString())) ? - new UpdateOperation.Builder<>() + final boolean isUpsert = StringUtils.equals(action.toLowerCase(), OpenSearchBulkActions.UPSERT.toString()); + final UpdateOperation.Builder updateOperationBuilder = new UpdateOperation.Builder<>() .index(indexName) - .document(filteredJsonNode) - .upsert(filteredJsonNode) - .versionType(versionType) - .version(version) : - new UpdateOperation.Builder<>() - .index(indexName) - .document(filteredJsonNode) .versionType(versionType) .version(version); + + if (scriptManager.isScriptEnabled()) { + updateOperationBuilder.script(scriptManager.buildScript(jsonNode, document.getResolvedScriptParameters().orElse(null))); + updateOperationBuilder.upsert(filteredJsonNode); + updateOperationBuilder.scriptedUpsert(true); + } else if (isUpsert) { + updateOperationBuilder.document(filteredJsonNode).upsert(filteredJsonNode); + } else { + updateOperationBuilder.document(filteredJsonNode); + } + docId.ifPresent(updateOperationBuilder::id); routing.ifPresent(updateOperationBuilder::routing); bulkOperation = new BulkOperation.Builder() @@ -596,7 +602,12 @@ SerializedJson getDocument(final Event event) { final String document = DocumentBuilder.build(event, documentRootKey, sinkContext.getTagsTargetKey(), sinkContext.getIncludeKeys(), sinkContext.getExcludeKeys()); - return SerializedJson.fromStringAndOptionals(document, docId, routingValue, null); + return SerializedJson.builder() + .withJsonString(document) + .withDocumentId(docId) + .withRoutingField(routingValue) + .withResolvedScriptParameters(scriptManager.resolveParams(event)) + .build(); } private void flushBatch(AccumulatingBulkRequest accumulatingBulkRequest) { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ScriptManager.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ScriptManager.java new file mode 100644 index 0000000000..0e62714235 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ScriptManager.java @@ -0,0 +1,70 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch; + +import com.fasterxml.jackson.databind.JsonNode; +import org.opensearch.client.json.JsonData; +import org.opensearch.client.opensearch._types.InlineScript; +import org.opensearch.client.opensearch._types.Script; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.ScriptConfiguration; + +import java.util.HashMap; +import java.util.Map; + +public class ScriptManager { + + private static final String PAINLESS = "painless"; + private static final String DOC_PARAM = "doc"; + + private final ScriptConfiguration scriptConfiguration; + private final ExpressionEvaluator expressionEvaluator; + + public ScriptManager(final ScriptConfiguration scriptConfiguration, final ExpressionEvaluator expressionEvaluator) { + this.scriptConfiguration = scriptConfiguration; + this.expressionEvaluator = expressionEvaluator; + } + + public boolean isScriptEnabled() { + return scriptConfiguration != null; + } + + public Map resolveParams(final Event event) { + if (scriptConfiguration == null || scriptConfiguration.getParams() == null) { + return null; + } + final Map resolved = new HashMap<>(); + for (final Map.Entry entry : scriptConfiguration.getParams().entrySet()) { + final Object value = entry.getValue(); + if (value instanceof String && ((String) value).contains("${")) { + resolved.put(entry.getKey(), event.formatString((String) value, expressionEvaluator)); + } else { + resolved.put(entry.getKey(), value); + } + } + return resolved; + } + + public Script buildScript(final JsonNode jsonNode, final Map resolvedParams) { + final Map scriptParams = new HashMap<>(); + scriptParams.put(DOC_PARAM, JsonData.of(jsonNode)); + if (resolvedParams != null) { + resolvedParams.forEach((k, v) -> scriptParams.put(k, JsonData.of(v))); + } + final InlineScript inlineScript = new InlineScript.Builder() + .source(scriptConfiguration.getSource()) + .lang(PAINLESS) + .params(scriptParams) + .build(); + return Script.of(s -> s.inline(inlineScript)); + } +} diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJson.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJson.java index c1b5ae3799..06105fe1d8 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJson.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJson.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode; import java.nio.charset.StandardCharsets; +import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -18,6 +19,7 @@ public interface SerializedJson extends SizedDocument { Optional getDocumentId(); Optional getRoutingField(); Optional getPipelineField(); + Optional> getResolvedScriptParameters(); /** * Creates a new {@link SerializedJson} from a JSON string and optional documentId and routingField. @@ -30,11 +32,54 @@ public interface SerializedJson extends SizedDocument { */ static SerializedJson fromStringAndOptionals(String jsonString, String docId, String routingField, String pipelineField) { Objects.requireNonNull(jsonString); - return new SerializedJsonImpl(jsonString.getBytes(StandardCharsets.UTF_8), docId, routingField, pipelineField); + return new SerializedJsonImpl(jsonString.getBytes(StandardCharsets.UTF_8), docId, routingField, pipelineField, null); } static SerializedJson fromJsonNode(final JsonNode jsonNode, SerializedJson document) { return new SerializedJsonNode(jsonNode, document); } -} + static Builder builder() { + return new Builder(); + } + + class Builder { + private String jsonString; + private String documentId; + private String routingField; + private String pipelineField; + private Map resolvedScriptParameters; + + public Builder withJsonString(final String jsonString) { + this.jsonString = jsonString; + return this; + } + + public Builder withDocumentId(final String documentId) { + this.documentId = documentId; + return this; + } + + public Builder withRoutingField(final String routingField) { + this.routingField = routingField; + return this; + } + + public Builder withPipelineField(final String pipelineField) { + this.pipelineField = pipelineField; + return this; + } + + public Builder withResolvedScriptParameters(final Map resolvedScriptParameters) { + this.resolvedScriptParameters = resolvedScriptParameters; + return this; + } + + public SerializedJson build() { + Objects.requireNonNull(jsonString); + return new SerializedJsonImpl( + jsonString.getBytes(StandardCharsets.UTF_8), + documentId, routingField, pipelineField, resolvedScriptParameters); + } + } +} diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonImpl.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonImpl.java index 2f1fbd2b96..fe78134fc1 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonImpl.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonImpl.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.sink.opensearch.bulk; import java.io.Serializable; +import java.util.Map; import java.util.Optional; class SerializedJsonImpl implements SerializedJson, Serializable { @@ -13,12 +14,14 @@ class SerializedJsonImpl implements SerializedJson, Serializable { private String documentId = null; private String routingField = null; private String pipelineField = null; + private Map resolvedScriptParameters = null; - public SerializedJsonImpl(final byte[] document, String docId, String routingField, String pipelineField) { + public SerializedJsonImpl(final byte[] document, String docId, String routingField, String pipelineField, Map resolvedScriptParameters) { this.document = document; this.documentId = docId; this.routingField = routingField; this.pipelineField = pipelineField; + this.resolvedScriptParameters = resolvedScriptParameters; } public SerializedJsonImpl(final byte[] document) { @@ -47,4 +50,9 @@ public Optional getRoutingField() { @Override public Optional getPipelineField() { return Optional.ofNullable(pipelineField); } + + @Override + public Optional> getResolvedScriptParameters() { + return Optional.ofNullable(resolvedScriptParameters); + } } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonNode.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonNode.java index 41d9459347..8281e7a33b 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonNode.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonNode.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode; import java.io.Serializable; +import java.util.Map; import java.util.Optional; class SerializedJsonNode implements SerializedJson, Serializable { @@ -15,13 +16,15 @@ class SerializedJsonNode implements SerializedJson, Serializable { private String documentId = null; private String routingField = null; private String pipelineField = null; + private Map resolvedScriptParameters = null; public SerializedJsonNode(final JsonNode jsonNode, SerializedJson doc) { this.jsonNode = jsonNode; this.documentId = doc.getDocumentId().orElse(null); this.routingField = doc.getRoutingField().orElse(null); this.document = jsonNode.toString().getBytes(); - this.pipelineField = doc.getPipelineField().orElse(null);; + this.pipelineField = doc.getPipelineField().orElse(null); + this.resolvedScriptParameters = doc.getResolvedScriptParameters().orElse(null); } public SerializedJsonNode(final JsonNode jsonNode) { @@ -51,4 +54,9 @@ public Optional getRoutingField() { @Override public Optional getPipelineField() { return Optional.ofNullable(pipelineField); } + + @Override + public Optional> getResolvedScriptParameters() { + return Optional.ofNullable(resolvedScriptParameters); + } } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/OpenSearchSinkConfig.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/OpenSearchSinkConfig.java index 7fbe17d82b..d8f706685b 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/OpenSearchSinkConfig.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/OpenSearchSinkConfig.java @@ -164,6 +164,22 @@ public boolean getEnableRequestCompression() { @JsonProperty("actions") private List actions = null; + @Getter + @Valid + @JsonProperty("script") + private ScriptConfiguration scriptConfiguration = null; + + @AssertTrue(message = "script can only be used with update or upsert actions.") + public boolean isScriptActionValid() { + if (scriptConfiguration == null) { + return true; + } + final String effectiveAction = action != null ? action : OpenSearchBulkActions.INDEX.toString(); + return effectiveAction.contains("${") || + effectiveAction.equalsIgnoreCase(OpenSearchBulkActions.UPDATE.toString()) || + effectiveAction.equalsIgnoreCase(OpenSearchBulkActions.UPSERT.toString()); + } + @Getter @JsonProperty("document_root_key") private String documentRootKey = null; diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/ScriptConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/ScriptConfiguration.java new file mode 100644 index 0000000000..d35dc63996 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/ScriptConfiguration.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotEmpty; +import lombok.Getter; + +import java.util.Map; + +public class ScriptConfiguration { + + @Getter + @NotEmpty + @JsonProperty("source") + private String source; + + @Getter + @JsonProperty("params") + private Map params; +} diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java index 279b49ec04..9bb73183b4 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java @@ -16,6 +16,7 @@ import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig; import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.ActionConfiguration; import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.AwsAuthenticationConfiguration; +import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.ScriptConfiguration; import org.opensearch.dataprepper.plugins.sink.opensearch.index.model.QueryForExistingDocumentConfiguration; import org.opensearch.dataprepper.plugins.sink.opensearch.s3.FileReader; import org.opensearch.dataprepper.plugins.sink.opensearch.s3.S3ClientProvider; @@ -103,6 +104,8 @@ public class IndexConfiguration { private final VersionType versionType; private final boolean normalizeIndex; + private final ScriptConfiguration scriptConfiguration; + private final String queryWhen; private final Duration queryDuration; @@ -174,6 +177,7 @@ private IndexConfiguration(final Builder builder) { this.ismPolicyFile = builder.ismPolicyFile; this.action = builder.action; this.actions = builder.actions; + this.scriptConfiguration = builder.scriptConfiguration; this.documentRootKey = builder.documentRootKey; this.queryWhen = builder.queryWhen; this.queryTerm = builder.queryTerm; @@ -293,6 +297,11 @@ public static IndexConfiguration readIndexConfig(final OpenSearchSinkConfig open builder = builder.withAction(openSearchSinkConfig.getAction(), expressionEvaluator); } + final ScriptConfiguration scriptConfiguration = openSearchSinkConfig.getScriptConfiguration(); + if (scriptConfiguration != null) { + builder = builder.withScriptConfiguration(scriptConfiguration); + } + AwsAuthenticationConfiguration awsAuthenticationConfiguration = openSearchSinkConfig.getAwsAuthenticationOptions(); if (awsAuthenticationConfiguration != null) { builder = builder.withServerless(awsAuthenticationConfiguration.isServerlessCollection()); @@ -372,6 +381,10 @@ public List getActions() { return actions; } + public ScriptConfiguration getScriptConfiguration() { + return scriptConfiguration; + } + public String getS3AwsRegion() { return s3AwsRegion; } @@ -501,6 +514,7 @@ public static class Builder { private Optional ismPolicyFile; private String action; private List actions; + private ScriptConfiguration scriptConfiguration; private String s3AwsRegion; private String s3AwsStsRoleArn; private String s3AwsStsExternalId; @@ -629,6 +643,11 @@ public Builder withActions(final List actions, final Expres return this; } + public Builder withScriptConfiguration(final ScriptConfiguration scriptConfiguration) { + this.scriptConfiguration = scriptConfiguration; + return this; + } + public Builder withS3AwsRegion(final String s3AwsRegion) { checkNotNull(s3AwsRegion, "s3AwsRegion cannot be null"); this.s3AwsRegion = s3AwsRegion; diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkScriptTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkScriptTest.java new file mode 100644 index 0000000000..fa41a5432d --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkScriptTest.java @@ -0,0 +1,337 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Timer; +import jakarta.json.stream.JsonGenerator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.client.opensearch.core.bulk.BulkOperation; +import org.opensearch.client.opensearch.core.bulk.UpdateOperation; +import org.opensearch.client.RestHighLevelClient; +import org.opensearch.client.json.jackson.JacksonJsonpMapper; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.MetricNames; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; +import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; +import org.opensearch.dataprepper.model.sink.SinkContext; +import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.SerializedJson; +import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig; +import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.ScriptConfiguration; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexManagerFactory; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateType; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.model.sink.SinkLatencyMetrics.EXTERNAL_LATENCY; +import static org.opensearch.dataprepper.model.sink.SinkLatencyMetrics.INTERNAL_LATENCY; +import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.BULKREQUEST_ERRORS; +import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.BULKREQUEST_LATENCY; +import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.BULKREQUEST_SIZE_BYTES; +import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.DYNAMIC_INDEX_DROPPED_EVENTS; +import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.INVALID_ACTION_ERRORS; +import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.INVALID_VERSION_EXPRESSION_DROPPED_EVENTS; +import static org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig.DEFAULT_BULK_SIZE; +import static org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig.DEFAULT_FLUSH_TIMEOUT; + +@ExtendWith(MockitoExtension.class) +public class OpenSearchSinkScriptTest { + + private final ObjectMapper objectMapper = new ObjectMapper(); + private final JacksonJsonpMapper jsonpMapper = new JacksonJsonpMapper(); + + @Mock private IndexManagerFactory indexManagerFactory; + @Mock private OpenSearchClient openSearchClient; + @Mock private SinkContext sinkContext; + @Mock private PluginSetting pluginSetting; + @Mock private ExpressionEvaluator expressionEvaluator; + @Mock private AwsCredentialsSupplier awsCredentialsSupplier; + @Mock private OpenSearchSinkConfiguration openSearchSinkConfiguration; + @Mock private PipelineDescription pipelineDescription; + @Mock private IndexConfiguration indexConfiguration; + @Mock private PluginMetrics pluginMetrics; + @Mock private OpenSearchSinkConfig openSearchSinkConfig; + @Mock private PluginConfigObservable pluginConfigObservable; + @Mock private ScriptConfiguration scriptConfiguration; + + @BeforeEach + void setup() { + when(pipelineDescription.getPipelineName()).thenReturn(UUID.randomUUID().toString()); + + final RetryConfiguration retryConfiguration = mock(RetryConfiguration.class); + when(retryConfiguration.getDlq()).thenReturn(Optional.empty()); + lenient().when(retryConfiguration.getDlqFile()).thenReturn(null); + + final ConnectionConfiguration connectionConfiguration = mock(ConnectionConfiguration.class); + final RestHighLevelClient restHighLevelClient = mock(RestHighLevelClient.class); + lenient().when(connectionConfiguration.createClient(awsCredentialsSupplier)).thenReturn(restHighLevelClient); + lenient().when(connectionConfiguration.createOpenSearchClient(restHighLevelClient, awsCredentialsSupplier)).thenReturn(openSearchClient); + + when(indexConfiguration.getDocumentId()).thenReturn(null); + when(indexConfiguration.getDocumentIdField()).thenReturn(null); + when(indexConfiguration.getRouting()).thenReturn(null); + when(indexConfiguration.getActions()).thenReturn(null); + when(indexConfiguration.getDocumentRootKey()).thenReturn(null); + lenient().when(indexConfiguration.getVersionType()).thenReturn(null); + lenient().when(indexConfiguration.getVersionExpression()).thenReturn(null); + lenient().when(indexConfiguration.getIndexAlias()).thenReturn(UUID.randomUUID().toString()); + lenient().when(indexConfiguration.getTemplateType()).thenReturn(TemplateType.V1); + when(indexConfiguration.getIndexType()).thenReturn(IndexType.CUSTOM); + when(indexConfiguration.getBulkSize()).thenReturn(DEFAULT_BULK_SIZE); + when(indexConfiguration.getFlushTimeout()).thenReturn(DEFAULT_FLUSH_TIMEOUT); + + when(openSearchSinkConfiguration.getIndexConfiguration()).thenReturn(indexConfiguration); + when(openSearchSinkConfiguration.getRetryConfiguration()).thenReturn(retryConfiguration); + lenient().when(openSearchSinkConfiguration.getConnectionConfiguration()).thenReturn(connectionConfiguration); + + when(pluginMetrics.counter(MetricNames.RECORDS_IN)).thenReturn(mock(Counter.class)); + when(pluginMetrics.timer(MetricNames.TIME_ELAPSED)).thenReturn(mock(Timer.class)); + when(pluginMetrics.timer(INTERNAL_LATENCY)).thenReturn(mock(Timer.class)); + when(pluginMetrics.timer(EXTERNAL_LATENCY)).thenReturn(mock(Timer.class)); + when(pluginMetrics.timer(BULKREQUEST_LATENCY)).thenReturn(mock(Timer.class)); + when(pluginMetrics.counter(BULKREQUEST_ERRORS)).thenReturn(mock(Counter.class)); + when(pluginMetrics.counter(INVALID_ACTION_ERRORS)).thenReturn(mock(Counter.class)); + when(pluginMetrics.counter(DYNAMIC_INDEX_DROPPED_EVENTS)).thenReturn(mock(Counter.class)); + when(pluginMetrics.counter(INVALID_VERSION_EXPRESSION_DROPPED_EVENTS)).thenReturn(mock(Counter.class)); + when(pluginMetrics.summary(BULKREQUEST_SIZE_BYTES)).thenReturn(mock(DistributionSummary.class)); + + lenient().when(sinkContext.getTagsTargetKey()).thenReturn(null); + lenient().when(sinkContext.getIncludeKeys()).thenReturn(null); + lenient().when(sinkContext.getExcludeKeys()).thenReturn(null); + } + + private void configureScript(final String source) { + configureScript(source, null); + } + + private void configureScript(final String source, final Map params) { + when(scriptConfiguration.getSource()).thenReturn(source); + lenient().when(scriptConfiguration.getParams()).thenReturn(params); + when(indexConfiguration.getScriptConfiguration()).thenReturn(scriptConfiguration); + } + + private void configureAction(final String action) { + when(indexConfiguration.getAction()).thenReturn(action); + } + + private OpenSearchSink createObjectUnderTest() throws IOException { + try (final MockedStatic configMockedStatic = mockStatic(OpenSearchSinkConfiguration.class); + final MockedStatic metricsMockedStatic = mockStatic(PluginMetrics.class); + final MockedConstruction ignored = mockConstruction(IndexManagerFactory.class, (mock, context) -> { + indexManagerFactory = mock; + })) { + metricsMockedStatic.when(() -> PluginMetrics.fromPluginSetting(pluginSetting)).thenReturn(pluginMetrics); + configMockedStatic.when(() -> OpenSearchSinkConfiguration.readOSConfig(openSearchSinkConfig, expressionEvaluator)) + .thenReturn(openSearchSinkConfiguration); + return new OpenSearchSink( + pluginSetting, sinkContext, expressionEvaluator, awsCredentialsSupplier, pipelineDescription, pluginConfigObservable, openSearchSinkConfig); + } + } + + private JsonNode serializeBody(final UpdateOperation updateOp) throws IOException { + final java.util.Iterator parts = updateOp._serializables(); + parts.next(); // skip action line + final Object body = parts.next(); + final StringWriter writer = new StringWriter(); + try (final JsonGenerator generator = jsonpMapper.jsonProvider().createGenerator(writer)) { + ((org.opensearch.client.json.PlainJsonSerializable) body).serialize(generator, jsonpMapper); + } + return objectMapper.readTree(writer.toString()); + } + + @Test + void script_sets_script_source_and_lang() throws IOException { + configureAction(OpenSearchBulkActions.UPSERT.toString()); + configureScript("ctx._source.counter += 1"); + final OpenSearchSink sink = createObjectUnderTest(); + + final ObjectNode jsonNode = objectMapper.createObjectNode().put("counter", 0); + final SerializedJson document = SerializedJson.fromStringAndOptionals("{\"counter\":0}", "doc-1", null, null); + + final BulkOperation result = sink.getBulkOperationForAction( + OpenSearchBulkActions.UPSERT.toString(), document, null, "test-index", jsonNode); + + final JsonNode body = serializeBody(result.update()); + assertThat(body.get("script").get("source").asText(), equalTo("ctx._source.counter += 1")); + assertThat(body.get("script").get("lang").asText(), equalTo("painless")); + } + + @Test + void script_always_passes_event_as_params_doc() throws IOException { + configureAction(OpenSearchBulkActions.UPSERT.toString()); + configureScript("ctx._source.putAll(params.doc)"); + final OpenSearchSink sink = createObjectUnderTest(); + + final ObjectNode jsonNode = objectMapper.createObjectNode().put("price", 9.99).put("currency", "USD"); + final SerializedJson document = SerializedJson.fromStringAndOptionals("{}", "doc-1", null, null); + + final BulkOperation result = sink.getBulkOperationForAction( + OpenSearchBulkActions.UPSERT.toString(), document, null, "test-index", jsonNode); + + final JsonNode body = serializeBody(result.update()); + final JsonNode doc = body.get("script").get("params").get("doc"); + assertThat(doc, notNullValue()); + assertThat(doc.get("price").asDouble(), equalTo(9.99)); + assertThat(doc.get("currency").asText(), equalTo("USD")); + } + + @Test + void script_merges_resolved_params_alongside_doc() throws IOException { + configureAction(OpenSearchBulkActions.UPSERT.toString()); + configureScript("ctx._source.counter += params.increment", Map.of("increment", 5)); + final OpenSearchSink sink = createObjectUnderTest(); + + final ObjectNode jsonNode = objectMapper.createObjectNode().put("counter", 0); + final SerializedJson document = SerializedJson.builder() + .withJsonString("{}") + .withDocumentId("doc-1") + .withResolvedScriptParameters(Map.of("increment", 5)) + .build(); + + final BulkOperation result = sink.getBulkOperationForAction( + OpenSearchBulkActions.UPSERT.toString(), document, null, "test-index", jsonNode); + + final JsonNode body = serializeBody(result.update()); + final JsonNode params = body.get("script").get("params"); + assertThat(params.get("doc"), notNullValue()); + assertThat(params.get("increment").asInt(), equalTo(5)); + } + + @Test + void script_always_sets_scripted_upsert_true() throws IOException { + configureAction(OpenSearchBulkActions.UPSERT.toString()); + configureScript("ctx._source.counter += 1"); + final OpenSearchSink sink = createObjectUnderTest(); + + final ObjectNode jsonNode = objectMapper.createObjectNode(); + final SerializedJson document = SerializedJson.fromStringAndOptionals("{}", "doc-1", null, null); + + final BulkOperation result = sink.getBulkOperationForAction( + OpenSearchBulkActions.UPSERT.toString(), document, null, "test-index", jsonNode); + + final JsonNode body = serializeBody(result.update()); + assertThat(body.get("scripted_upsert").asBoolean(), equalTo(true)); + } + + @Test + void script_sets_upsert_body() throws IOException { + configureAction(OpenSearchBulkActions.UPSERT.toString()); + configureScript("ctx._source.counter += 1"); + final OpenSearchSink sink = createObjectUnderTest(); + + final ObjectNode jsonNode = objectMapper.createObjectNode().put("counter", 0); + final SerializedJson document = SerializedJson.fromStringAndOptionals("{\"counter\":0}", "doc-1", null, null); + + final BulkOperation result = sink.getBulkOperationForAction( + OpenSearchBulkActions.UPSERT.toString(), document, null, "test-index", jsonNode); + + final JsonNode body = serializeBody(result.update()); + assertThat(body.get("upsert"), notNullValue()); + } + + @Test + void script_works_with_update_action() throws IOException { + configureAction(OpenSearchBulkActions.UPDATE.toString()); + configureScript("ctx._source.status = params.doc.status"); + final OpenSearchSink sink = createObjectUnderTest(); + + final ObjectNode jsonNode = objectMapper.createObjectNode().put("status", "active"); + final SerializedJson document = SerializedJson.fromStringAndOptionals("{}", "doc-1", null, null); + + final BulkOperation result = sink.getBulkOperationForAction( + OpenSearchBulkActions.UPDATE.toString(), document, null, "test-index", jsonNode); + + final JsonNode body = serializeBody(result.update()); + assertThat(body.get("script"), notNullValue()); + assertThat(body.get("scripted_upsert").asBoolean(), equalTo(true)); + } + + @Test + void script_sets_document_id_and_routing() throws IOException { + configureAction(OpenSearchBulkActions.UPSERT.toString()); + configureScript("ctx._source.counter += 1"); + final OpenSearchSink sink = createObjectUnderTest(); + + final ObjectNode jsonNode = objectMapper.createObjectNode(); + final SerializedJson document = SerializedJson.fromStringAndOptionals("{}", "my-doc-id", "my-route", null); + + final BulkOperation result = sink.getBulkOperationForAction( + OpenSearchBulkActions.UPSERT.toString(), document, null, "test-index", jsonNode); + + assertThat(result.update().id(), equalTo("my-doc-id")); + assertThat(result.update().routing(), equalTo("my-route")); + } + + @Test + void without_script_upsert_preserves_original_behavior() throws IOException { + configureAction(OpenSearchBulkActions.UPSERT.toString()); + when(indexConfiguration.getScriptConfiguration()).thenReturn(null); + final OpenSearchSink sink = createObjectUnderTest(); + + final ObjectNode jsonNode = objectMapper.createObjectNode().put("name", "test"); + final SerializedJson document = SerializedJson.fromStringAndOptionals("{\"name\":\"test\"}", "doc-1", null, null); + + final BulkOperation result = sink.getBulkOperationForAction( + OpenSearchBulkActions.UPSERT.toString(), document, null, "test-index", jsonNode); + + final JsonNode body = serializeBody(result.update()); + assertThat(body.has("script"), equalTo(false)); + assertThat(body.get("doc"), notNullValue()); + assertThat(body.get("upsert"), notNullValue()); + } + + @Test + void without_script_update_preserves_original_behavior() throws IOException { + configureAction(OpenSearchBulkActions.UPDATE.toString()); + when(indexConfiguration.getScriptConfiguration()).thenReturn(null); + final OpenSearchSink sink = createObjectUnderTest(); + + final ObjectNode jsonNode = objectMapper.createObjectNode().put("name", "test"); + final SerializedJson document = SerializedJson.fromStringAndOptionals("{\"name\":\"test\"}", "doc-1", null, null); + + final BulkOperation result = sink.getBulkOperationForAction( + OpenSearchBulkActions.UPDATE.toString(), document, null, "test-index", jsonNode); + + final JsonNode body = serializeBody(result.update()); + assertThat(body.has("script"), equalTo(false)); + assertThat(body.get("doc"), notNullValue()); + assertThat(body.has("upsert"), equalTo(false)); + } +} diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonImplTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonImplTest.java index 04fea232c8..54771bf579 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonImplTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonImplTest.java @@ -34,7 +34,7 @@ void setUp() { } private SerializedJsonImpl createObjectUnderTest() { - return new SerializedJsonImpl(documentBytes, documentId, routingField, pipelineField); + return new SerializedJsonImpl(documentBytes, documentId, routingField, pipelineField, null); } @Test diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/OpenSearchSinkConfigScriptValidationTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/OpenSearchSinkConfigScriptValidationTest.java new file mode 100644 index 0000000000..4e2203ec3d --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/OpenSearchSinkConfigScriptValidationTest.java @@ -0,0 +1,61 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch.configuration; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +public class OpenSearchSinkConfigScriptValidationTest { + + private static final ObjectMapper YAML_MAPPER = new ObjectMapper(new YAMLFactory()); + + private OpenSearchSinkConfig deserialize(final String yaml) throws Exception { + return YAML_MAPPER.readValue(yaml, OpenSearchSinkConfig.class); + } + + @ParameterizedTest + @ValueSource(strings = {"update", "upsert"}) + void script_with_update_or_upsert_action_is_valid(final String action) throws Exception { + final String yaml = String.format( + "action: \"%s\"\nscript:\n source: \"ctx._source.putAll(params.doc)\"\n", action); + final OpenSearchSinkConfig config = deserialize(yaml); + assertThat(config.isScriptActionValid(), equalTo(true)); + } + + @ParameterizedTest + @ValueSource(strings = {"index", "create", "delete"}) + void script_with_index_create_or_delete_action_is_invalid(final String action) throws Exception { + final String yaml = String.format( + "action: \"%s\"\nscript:\n source: \"ctx._source.putAll(params.doc)\"\n", action); + final OpenSearchSinkConfig config = deserialize(yaml); + assertThat(config.isScriptActionValid(), equalTo(false)); + } + + @Test + void script_with_expression_action_is_valid() throws Exception { + final String yaml = "action: \"${getMetadata(\\\"opensearch_action\\\")}\"\nscript:\n source: \"ctx._source.putAll(params.doc)\"\n"; + final OpenSearchSinkConfig config = deserialize(yaml); + assertThat(config.isScriptActionValid(), equalTo(true)); + } + + @Test + void no_script_is_always_valid() throws Exception { + final String yaml = "action: \"index\"\n"; + final OpenSearchSinkConfig config = deserialize(yaml); + assertThat(config.isScriptActionValid(), equalTo(true)); + } +}