From b7c508058cc4f4f01ac749b82871c1084a93bfc3 Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Fri, 10 Apr 2026 09:56:31 -0500 Subject: [PATCH 1/3] Refactor getBulkOperationForAction into BulkOperationFactory Extract bulk operation creation logic from OpenSearchSink into a dedicated BulkOperationFactory class for improved testability and separation of concerns. Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../sink/opensearch/BulkOperationFactory.java | 129 ++++++++++++++++++ .../sink/opensearch/OpenSearchSink.java | 99 +------------- 2 files changed, 132 insertions(+), 96 deletions(-) create mode 100644 data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationFactory.java diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationFactory.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationFactory.java new file mode 100644 index 0000000000..2be1587e85 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationFactory.java @@ -0,0 +1,129 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; +import org.opensearch.client.opensearch._types.VersionType; +import org.opensearch.client.opensearch.core.bulk.BulkOperation; +import org.opensearch.client.opensearch.core.bulk.CreateOperation; +import org.opensearch.client.opensearch.core.bulk.DeleteOperation; +import org.opensearch.client.opensearch.core.bulk.IndexOperation; +import org.opensearch.client.opensearch.core.bulk.UpdateOperation; +import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; +import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.SerializedJson; + +import java.io.IOException; +import java.util.Optional; + +public class BulkOperationFactory { + + private final VersionType versionType; + private final ScriptManager scriptManager; + private final ObjectMapper objectMapper; + private final boolean usingDocumentFilters; + + public BulkOperationFactory(final VersionType versionType, + final ScriptManager scriptManager, + final ObjectMapper objectMapper, + final boolean usingDocumentFilters) { + this.versionType = versionType; + this.scriptManager = scriptManager; + this.objectMapper = objectMapper; + this.usingDocumentFilters = usingDocumentFilters; + } + + public BulkOperation create(final String action, + final SerializedJson document, + final Long version, + final String indexName, + final JsonNode jsonNode) { + final Optional docId = document.getDocumentId(); + final Optional routing = document.getRoutingField(); + final Optional pipeline = document.getPipelineField(); + + if (StringUtils.equals(action, OpenSearchBulkActions.CREATE.toString())) { + final CreateOperation.Builder builder = + new CreateOperation.Builder<>() + .index(indexName) + .document(document); + docId.ifPresent(builder::id); + routing.ifPresent(builder::routing); + pipeline.ifPresent(builder::pipeline); + return new BulkOperation.Builder().create(builder.build()).build(); + } + + if (StringUtils.equals(action, OpenSearchBulkActions.UPDATE.toString()) || + StringUtils.equals(action, OpenSearchBulkActions.UPSERT.toString())) { + return createUpdateOperation(action, document, version, indexName, jsonNode, docId, routing); + } + + if (StringUtils.equals(action, OpenSearchBulkActions.DELETE.toString())) { + final DeleteOperation.Builder builder = new DeleteOperation.Builder() + .index(indexName) + .versionType(versionType) + .version(version); + docId.ifPresent(builder::id); + routing.ifPresent(builder::routing); + return new BulkOperation.Builder().delete(builder.build()).build(); + } + + // Default to "index" + final IndexOperation.Builder builder = new IndexOperation.Builder<>() + .index(indexName) + .document(document) + .version(version) + .versionType(versionType); + docId.ifPresent(builder::id); + routing.ifPresent(builder::routing); + pipeline.ifPresent(builder::pipeline); + return new BulkOperation.Builder().index(builder.build()).build(); + } + + private BulkOperation createUpdateOperation(final String action, + final SerializedJson document, + final Long version, + final String indexName, + final JsonNode jsonNode, + final Optional docId, + final Optional routing) { + JsonNode filteredJsonNode = jsonNode; + try { + if (usingDocumentFilters) { + filteredJsonNode = objectMapper.reader().readTree(document.getSerializedJson()); + } + } catch (final IOException e) { + throw new RuntimeException( + String.format("An exception occurred while deserializing a document for the %s action: %s", action, e.getMessage())); + } + + final boolean isUpsert = StringUtils.equals(action.toLowerCase(), OpenSearchBulkActions.UPSERT.toString()); + final UpdateOperation.Builder builder = new UpdateOperation.Builder<>() + .index(indexName) + .versionType(versionType) + .version(version); + + if (scriptManager.isScriptEnabled()) { + builder.script(scriptManager.buildScript(jsonNode, document.getResolvedScriptParameters().orElse(null))); + builder.upsert(filteredJsonNode); + builder.scriptedUpsert(true); + } else if (isUpsert) { + builder.document(filteredJsonNode).upsert(filteredJsonNode); + } else { + builder.document(filteredJsonNode); + } + + docId.ifPresent(builder::id); + routing.ifPresent(builder::routing); + return new BulkOperation.Builder().update(builder.build()).build(); + } +} diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index ace23d3260..8cdce533c9 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -10,7 +10,6 @@ package org.opensearch.dataprepper.plugins.sink.opensearch; import com.google.common.annotations.VisibleForTesting; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; @@ -21,10 +20,6 @@ import org.opensearch.client.opensearch._types.VersionType; import org.opensearch.client.opensearch.core.BulkRequest; import org.opensearch.client.opensearch.core.bulk.BulkOperation; -import org.opensearch.client.opensearch.core.bulk.CreateOperation; -import org.opensearch.client.opensearch.core.bulk.DeleteOperation; -import org.opensearch.client.opensearch.core.bulk.IndexOperation; -import org.opensearch.client.opensearch.core.bulk.UpdateOperation; import org.opensearch.client.transport.TransportOptions; import org.opensearch.common.unit.ByteSizeUnit; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; @@ -140,6 +135,7 @@ public class OpenSearchSink extends AbstractSink> { private final String action; private final List actions; private final ScriptManager scriptManager; + private final BulkOperationFactory bulkOperationFactory; private final String documentRootKey; private String configuredIndexAlias; private final ReentrantLock lock; @@ -221,6 +217,7 @@ public OpenSearchSink(final PluginSetting pluginSetting, this.lastFlushTimeMap = new ConcurrentHashMap<>(); this.pluginConfigObservable = pluginConfigObservable; this.objectMapper = new ObjectMapper(); + this.bulkOperationFactory = new BulkOperationFactory(versionType, scriptManager, objectMapper, isUsingDocumentFilters()); this.queryExecutorService = openSearchSinkConfig.getIndexConfiguration().getQueryTerm() != null ? Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("existing-document-query-manager")) : null; @@ -344,96 +341,6 @@ public boolean isReady() { return initialized; } - BulkOperation getBulkOperationForAction(final String action, - final SerializedJson document, - final Long version, - final String indexName, - final JsonNode jsonNode) { - BulkOperation bulkOperation; - final Optional docId = document.getDocumentId(); - final Optional routing = document.getRoutingField(); - final Optional pipeline = document.getPipelineField(); - - if (StringUtils.equals(action, OpenSearchBulkActions.CREATE.toString())) { - final CreateOperation.Builder createOperationBuilder = - new CreateOperation.Builder<>() - .index(indexName) - .document(document); - docId.ifPresent(createOperationBuilder::id); - routing.ifPresent(createOperationBuilder::routing); - pipeline.ifPresent(createOperationBuilder::pipeline); - - bulkOperation = new BulkOperation.Builder() - .create(createOperationBuilder.build()) - .build(); - return bulkOperation; - } - if (StringUtils.equals(action, OpenSearchBulkActions.UPDATE.toString()) || - StringUtils.equals(action, OpenSearchBulkActions.UPSERT.toString())) { - - JsonNode filteredJsonNode = jsonNode; - try { - if (isUsingDocumentFilters()) { - filteredJsonNode = objectMapper.reader().readTree(document.getSerializedJson()); - } - } catch (final IOException e) { - throw new RuntimeException( - String.format("An exception occurred while deserializing a document for the %s action: %s", action, e.getMessage())); - } - - - final boolean isUpsert = StringUtils.equals(action.toLowerCase(), OpenSearchBulkActions.UPSERT.toString()); - final UpdateOperation.Builder updateOperationBuilder = new UpdateOperation.Builder<>() - .index(indexName) - .versionType(versionType) - .version(version); - - if (scriptManager.isScriptEnabled()) { - updateOperationBuilder.script(scriptManager.buildScript(jsonNode, document.getResolvedScriptParameters().orElse(null))); - updateOperationBuilder.upsert(filteredJsonNode); - updateOperationBuilder.scriptedUpsert(true); - } else if (isUpsert) { - updateOperationBuilder.document(filteredJsonNode).upsert(filteredJsonNode); - } else { - updateOperationBuilder.document(filteredJsonNode); - } - - docId.ifPresent(updateOperationBuilder::id); - routing.ifPresent(updateOperationBuilder::routing); - bulkOperation = new BulkOperation.Builder() - .update(updateOperationBuilder.build()) - .build(); - return bulkOperation; - } - if (StringUtils.equals(action, OpenSearchBulkActions.DELETE.toString())) { - final DeleteOperation.Builder deleteOperationBuilder = - new DeleteOperation.Builder().index(indexName); - docId.ifPresent(deleteOperationBuilder::id); - routing.ifPresent(deleteOperationBuilder::routing); - bulkOperation = new BulkOperation.Builder() - .delete(deleteOperationBuilder - .versionType(versionType) - .version(version) - .build()) - .build(); - return bulkOperation; - } - // Default to "index" - final IndexOperation.Builder indexOperationBuilder = - new IndexOperation.Builder<>() - .index(indexName) - .document(document) - .version(version) - .versionType(versionType); - docId.ifPresent(indexOperationBuilder::id); - routing.ifPresent(indexOperationBuilder::routing); - pipeline.ifPresent(indexOperationBuilder::pipeline); - bulkOperation = new BulkOperation.Builder() - .index(indexOperationBuilder.build()) - .build(); - return bulkOperation; - } - @Override public void doOutput(final Collection> records) { final long threadId = Thread.currentThread().getId(); @@ -541,7 +448,7 @@ public void doOutput(final Collection> records) { BulkOperation bulkOperation; try { - bulkOperation = getBulkOperationForAction(eventAction, document, version, indexName, event.getJsonNode()); + bulkOperation = bulkOperationFactory.create(eventAction, document, version, indexName, event.getJsonNode()); } catch (final Exception e) { LOG.error("An exception occurred while constructing the bulk operation for a document: ", e); logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e); From d8bfbec295a96a94335fb9b4ba77ac34b0b6ce2a Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Fri, 10 Apr 2026 09:56:39 -0500 Subject: [PATCH 2/3] Add BulkOperationFactory unit tests and update script tests Add BulkOperationFactoryTest covering create, index, update, upsert, delete, default action, optional fields, document filters, and pipeline setting. Update OpenSearchSinkScriptTest to test BulkOperationFactory directly instead of going through OpenSearchSink. Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../sink/opensearch/BulkOperationFactory.java | 6 +- .../opensearch/BulkOperationFactoryTest.java | 161 ++++++++++++++ .../opensearch/OpenSearchSinkScriptTest.java | 202 +++--------------- 3 files changed, 195 insertions(+), 174 deletions(-) create mode 100644 data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationFactoryTest.java diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationFactory.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationFactory.java index 2be1587e85..6d60b7bbc0 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationFactory.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationFactory.java @@ -114,8 +114,10 @@ private BulkOperation createUpdateOperation(final String action, if (scriptManager.isScriptEnabled()) { builder.script(scriptManager.buildScript(jsonNode, document.getResolvedScriptParameters().orElse(null))); - builder.upsert(filteredJsonNode); - builder.scriptedUpsert(true); + if (isUpsert) { + builder.upsert(filteredJsonNode); + builder.scriptedUpsert(true); + } } else if (isUpsert) { builder.document(filteredJsonNode).upsert(filteredJsonNode); } else { diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationFactoryTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationFactoryTest.java new file mode 100644 index 0000000000..789355b0c1 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationFactoryTest.java @@ -0,0 +1,161 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.client.opensearch._types.VersionType; +import org.opensearch.client.opensearch.core.bulk.BulkOperation; +import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; +import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.SerializedJson; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class BulkOperationFactoryTest { + + private final ObjectMapper objectMapper = new ObjectMapper(); + private BulkOperationFactory factory; + + @BeforeEach + void setUp() { + final ScriptManager scriptManager = new ScriptManager(null, null); + factory = new BulkOperationFactory(VersionType.External, scriptManager, objectMapper, false); + } + + @Test + void create_action_returns_create_operation() { + final SerializedJson document = SerializedJson.fromStringAndOptionals("{\"key\":\"val\"}", "doc-1", "route-1", null); + + final BulkOperation result = factory.create( + OpenSearchBulkActions.CREATE.toString(), document, null, "test-index", objectMapper.createObjectNode()); + + assertThat(result.isCreate(), is(true)); + assertThat(result.create().index(), equalTo("test-index")); + assertThat(result.create().id(), equalTo("doc-1")); + assertThat(result.create().routing(), equalTo("route-1")); + } + + @Test + void index_action_returns_index_operation() { + final SerializedJson document = SerializedJson.fromStringAndOptionals("{\"key\":\"val\"}", "doc-1", null, null); + + final BulkOperation result = factory.create( + OpenSearchBulkActions.INDEX.toString(), document, 1L, "test-index", objectMapper.createObjectNode()); + + assertThat(result.isIndex(), is(true)); + assertThat(result.index().index(), equalTo("test-index")); + assertThat(result.index().id(), equalTo("doc-1")); + assertThat(result.index().version(), equalTo(1L)); + assertThat(result.index().versionType(), equalTo(VersionType.External)); + } + + @Test + void delete_action_returns_delete_operation() { + final SerializedJson document = SerializedJson.fromStringAndOptionals("{}", "doc-1", "route-1", null); + + final BulkOperation result = factory.create( + OpenSearchBulkActions.DELETE.toString(), document, 2L, "test-index", objectMapper.createObjectNode()); + + assertThat(result.isDelete(), is(true)); + assertThat(result.delete().index(), equalTo("test-index")); + assertThat(result.delete().id(), equalTo("doc-1")); + assertThat(result.delete().routing(), equalTo("route-1")); + assertThat(result.delete().version(), equalTo(2L)); + } + + @Test + void update_action_returns_update_operation() { + final ObjectNode jsonNode = objectMapper.createObjectNode().put("name", "test"); + final SerializedJson document = SerializedJson.fromStringAndOptionals("{\"name\":\"test\"}", "doc-1", null, null); + + final BulkOperation result = factory.create( + OpenSearchBulkActions.UPDATE.toString(), document, null, "test-index", jsonNode); + + assertThat(result.isUpdate(), is(true)); + assertThat(result.update().index(), equalTo("test-index")); + assertThat(result.update().id(), equalTo("doc-1")); + } + + @Test + void upsert_action_returns_update_operation() { + final ObjectNode jsonNode = objectMapper.createObjectNode().put("name", "test"); + final SerializedJson document = SerializedJson.fromStringAndOptionals("{\"name\":\"test\"}", "doc-1", null, null); + + final BulkOperation result = factory.create( + OpenSearchBulkActions.UPSERT.toString(), document, null, "test-index", jsonNode); + + assertThat(result.isUpdate(), is(true)); + assertThat(result.update().index(), equalTo("test-index")); + assertThat(result.update().id(), equalTo("doc-1")); + } + + @Test + void unknown_action_defaults_to_index_operation() { + final SerializedJson document = SerializedJson.fromStringAndOptionals("{}", "doc-1", null, null); + + final BulkOperation result = factory.create( + "unknown_action", document, null, "test-index", objectMapper.createObjectNode()); + + assertThat(result.isIndex(), is(true)); + assertThat(result.index().index(), equalTo("test-index")); + } + + @Test + void create_action_without_optional_fields() { + final SerializedJson document = SerializedJson.fromStringAndOptionals("{}", null, null, null); + + final BulkOperation result = factory.create( + OpenSearchBulkActions.CREATE.toString(), document, null, "test-index", objectMapper.createObjectNode()); + + assertThat(result.isCreate(), is(true)); + assertThat(result.create().index(), equalTo("test-index")); + } + + @Test + void update_with_document_filters_deserializes_from_serialized_json() { + final ScriptManager scriptManager = new ScriptManager(null, null); + final BulkOperationFactory filterFactory = new BulkOperationFactory(null, scriptManager, objectMapper, true); + final ObjectNode jsonNode = objectMapper.createObjectNode().put("name", "original"); + final SerializedJson document = SerializedJson.fromStringAndOptionals("{\"name\":\"filtered\"}", "doc-1", null, null); + + final BulkOperation result = filterFactory.create( + OpenSearchBulkActions.UPDATE.toString(), document, null, "test-index", jsonNode); + + assertThat(result.isUpdate(), is(true)); + assertThat(result.update().id(), equalTo("doc-1")); + } + + @Test + void create_action_sets_pipeline() { + final SerializedJson document = SerializedJson.fromStringAndOptionals("{}", "doc-1", null, "my-pipeline"); + + final BulkOperation result = factory.create( + OpenSearchBulkActions.CREATE.toString(), document, null, "test-index", objectMapper.createObjectNode()); + + assertThat(result.isCreate(), is(true)); + assertThat(result.create().pipeline(), equalTo("my-pipeline")); + } + + @Test + void index_action_sets_pipeline() { + final SerializedJson document = SerializedJson.fromStringAndOptionals("{}", "doc-1", null, "my-pipeline"); + + final BulkOperation result = factory.create( + OpenSearchBulkActions.INDEX.toString(), document, null, "test-index", objectMapper.createObjectNode()); + + assertThat(result.isIndex(), is(true)); + assertThat(result.index().pipeline(), equalTo("my-pipeline")); + } +} diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkScriptTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkScriptTest.java index fa41a5432d..219f1fba03 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkScriptTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkScriptTest.java @@ -13,156 +13,40 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.DistributionSummary; -import io.micrometer.core.instrument.Timer; import jakarta.json.stream.JsonGenerator; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.MockedConstruction; -import org.mockito.MockedStatic; -import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.client.json.jackson.JacksonJsonpMapper; import org.opensearch.client.opensearch.core.bulk.BulkOperation; import org.opensearch.client.opensearch.core.bulk.UpdateOperation; -import org.opensearch.client.RestHighLevelClient; -import org.opensearch.client.json.jackson.JacksonJsonpMapper; -import org.opensearch.client.opensearch.OpenSearchClient; -import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; -import org.opensearch.dataprepper.expression.ExpressionEvaluator; -import org.opensearch.dataprepper.metrics.MetricNames; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.configuration.PipelineDescription; -import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; -import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; -import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.SerializedJson; -import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig; import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.ScriptConfiguration; -import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration; -import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexManagerFactory; -import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType; -import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateType; import java.io.IOException; import java.io.StringWriter; import java.util.Map; -import java.util.Optional; -import java.util.UUID; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockConstruction; -import static org.mockito.Mockito.mockStatic; -import static org.mockito.Mockito.when; -import static org.opensearch.dataprepper.model.sink.SinkLatencyMetrics.EXTERNAL_LATENCY; -import static org.opensearch.dataprepper.model.sink.SinkLatencyMetrics.INTERNAL_LATENCY; -import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.BULKREQUEST_ERRORS; -import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.BULKREQUEST_LATENCY; -import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.BULKREQUEST_SIZE_BYTES; -import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.DYNAMIC_INDEX_DROPPED_EVENTS; -import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.INVALID_ACTION_ERRORS; -import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.INVALID_VERSION_EXPRESSION_DROPPED_EVENTS; -import static org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig.DEFAULT_BULK_SIZE; -import static org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig.DEFAULT_FLUSH_TIMEOUT; - -@ExtendWith(MockitoExtension.class) + public class OpenSearchSinkScriptTest { private final ObjectMapper objectMapper = new ObjectMapper(); private final JacksonJsonpMapper jsonpMapper = new JacksonJsonpMapper(); - @Mock private IndexManagerFactory indexManagerFactory; - @Mock private OpenSearchClient openSearchClient; - @Mock private SinkContext sinkContext; - @Mock private PluginSetting pluginSetting; - @Mock private ExpressionEvaluator expressionEvaluator; - @Mock private AwsCredentialsSupplier awsCredentialsSupplier; - @Mock private OpenSearchSinkConfiguration openSearchSinkConfiguration; - @Mock private PipelineDescription pipelineDescription; - @Mock private IndexConfiguration indexConfiguration; - @Mock private PluginMetrics pluginMetrics; - @Mock private OpenSearchSinkConfig openSearchSinkConfig; - @Mock private PluginConfigObservable pluginConfigObservable; - @Mock private ScriptConfiguration scriptConfiguration; - - @BeforeEach - void setup() { - when(pipelineDescription.getPipelineName()).thenReturn(UUID.randomUUID().toString()); - - final RetryConfiguration retryConfiguration = mock(RetryConfiguration.class); - when(retryConfiguration.getDlq()).thenReturn(Optional.empty()); - lenient().when(retryConfiguration.getDlqFile()).thenReturn(null); - - final ConnectionConfiguration connectionConfiguration = mock(ConnectionConfiguration.class); - final RestHighLevelClient restHighLevelClient = mock(RestHighLevelClient.class); - lenient().when(connectionConfiguration.createClient(awsCredentialsSupplier)).thenReturn(restHighLevelClient); - lenient().when(connectionConfiguration.createOpenSearchClient(restHighLevelClient, awsCredentialsSupplier)).thenReturn(openSearchClient); - - when(indexConfiguration.getDocumentId()).thenReturn(null); - when(indexConfiguration.getDocumentIdField()).thenReturn(null); - when(indexConfiguration.getRouting()).thenReturn(null); - when(indexConfiguration.getActions()).thenReturn(null); - when(indexConfiguration.getDocumentRootKey()).thenReturn(null); - lenient().when(indexConfiguration.getVersionType()).thenReturn(null); - lenient().when(indexConfiguration.getVersionExpression()).thenReturn(null); - lenient().when(indexConfiguration.getIndexAlias()).thenReturn(UUID.randomUUID().toString()); - lenient().when(indexConfiguration.getTemplateType()).thenReturn(TemplateType.V1); - when(indexConfiguration.getIndexType()).thenReturn(IndexType.CUSTOM); - when(indexConfiguration.getBulkSize()).thenReturn(DEFAULT_BULK_SIZE); - when(indexConfiguration.getFlushTimeout()).thenReturn(DEFAULT_FLUSH_TIMEOUT); - - when(openSearchSinkConfiguration.getIndexConfiguration()).thenReturn(indexConfiguration); - when(openSearchSinkConfiguration.getRetryConfiguration()).thenReturn(retryConfiguration); - lenient().when(openSearchSinkConfiguration.getConnectionConfiguration()).thenReturn(connectionConfiguration); - - when(pluginMetrics.counter(MetricNames.RECORDS_IN)).thenReturn(mock(Counter.class)); - when(pluginMetrics.timer(MetricNames.TIME_ELAPSED)).thenReturn(mock(Timer.class)); - when(pluginMetrics.timer(INTERNAL_LATENCY)).thenReturn(mock(Timer.class)); - when(pluginMetrics.timer(EXTERNAL_LATENCY)).thenReturn(mock(Timer.class)); - when(pluginMetrics.timer(BULKREQUEST_LATENCY)).thenReturn(mock(Timer.class)); - when(pluginMetrics.counter(BULKREQUEST_ERRORS)).thenReturn(mock(Counter.class)); - when(pluginMetrics.counter(INVALID_ACTION_ERRORS)).thenReturn(mock(Counter.class)); - when(pluginMetrics.counter(DYNAMIC_INDEX_DROPPED_EVENTS)).thenReturn(mock(Counter.class)); - when(pluginMetrics.counter(INVALID_VERSION_EXPRESSION_DROPPED_EVENTS)).thenReturn(mock(Counter.class)); - when(pluginMetrics.summary(BULKREQUEST_SIZE_BYTES)).thenReturn(mock(DistributionSummary.class)); - - lenient().when(sinkContext.getTagsTargetKey()).thenReturn(null); - lenient().when(sinkContext.getIncludeKeys()).thenReturn(null); - lenient().when(sinkContext.getExcludeKeys()).thenReturn(null); + private BulkOperationFactory createFactory(final ScriptConfiguration scriptConfig) { + final ScriptManager scriptManager = new ScriptManager(scriptConfig, null); + return new BulkOperationFactory(null, scriptManager, new ObjectMapper(), false); } - private void configureScript(final String source) { - configureScript(source, null); - } - - private void configureScript(final String source, final Map params) { - when(scriptConfiguration.getSource()).thenReturn(source); - lenient().when(scriptConfiguration.getParams()).thenReturn(params); - when(indexConfiguration.getScriptConfiguration()).thenReturn(scriptConfiguration); - } - - private void configureAction(final String action) { - when(indexConfiguration.getAction()).thenReturn(action); - } - - private OpenSearchSink createObjectUnderTest() throws IOException { - try (final MockedStatic configMockedStatic = mockStatic(OpenSearchSinkConfiguration.class); - final MockedStatic metricsMockedStatic = mockStatic(PluginMetrics.class); - final MockedConstruction ignored = mockConstruction(IndexManagerFactory.class, (mock, context) -> { - indexManagerFactory = mock; - })) { - metricsMockedStatic.when(() -> PluginMetrics.fromPluginSetting(pluginSetting)).thenReturn(pluginMetrics); - configMockedStatic.when(() -> OpenSearchSinkConfiguration.readOSConfig(openSearchSinkConfig, expressionEvaluator)) - .thenReturn(openSearchSinkConfiguration); - return new OpenSearchSink( - pluginSetting, sinkContext, expressionEvaluator, awsCredentialsSupplier, pipelineDescription, pluginConfigObservable, openSearchSinkConfig); - } + private ScriptConfiguration mockScript(final String source, final Map params) { + final ScriptConfiguration config = mock(ScriptConfiguration.class); + lenient().when(config.getSource()).thenReturn(source); + lenient().when(config.getParams()).thenReturn(params); + return config; } private JsonNode serializeBody(final UpdateOperation updateOp) throws IOException { @@ -178,14 +62,11 @@ private JsonNode serializeBody(final UpdateOperation updateOp) throws IOExcep @Test void script_sets_script_source_and_lang() throws IOException { - configureAction(OpenSearchBulkActions.UPSERT.toString()); - configureScript("ctx._source.counter += 1"); - final OpenSearchSink sink = createObjectUnderTest(); - + final BulkOperationFactory factory = createFactory(mockScript("ctx._source.counter += 1", null)); final ObjectNode jsonNode = objectMapper.createObjectNode().put("counter", 0); final SerializedJson document = SerializedJson.fromStringAndOptionals("{\"counter\":0}", "doc-1", null, null); - final BulkOperation result = sink.getBulkOperationForAction( + final BulkOperation result = factory.create( OpenSearchBulkActions.UPSERT.toString(), document, null, "test-index", jsonNode); final JsonNode body = serializeBody(result.update()); @@ -195,14 +76,11 @@ void script_sets_script_source_and_lang() throws IOException { @Test void script_always_passes_event_as_params_doc() throws IOException { - configureAction(OpenSearchBulkActions.UPSERT.toString()); - configureScript("ctx._source.putAll(params.doc)"); - final OpenSearchSink sink = createObjectUnderTest(); - + final BulkOperationFactory factory = createFactory(mockScript("ctx._source.putAll(params.doc)", null)); final ObjectNode jsonNode = objectMapper.createObjectNode().put("price", 9.99).put("currency", "USD"); final SerializedJson document = SerializedJson.fromStringAndOptionals("{}", "doc-1", null, null); - final BulkOperation result = sink.getBulkOperationForAction( + final BulkOperation result = factory.create( OpenSearchBulkActions.UPSERT.toString(), document, null, "test-index", jsonNode); final JsonNode body = serializeBody(result.update()); @@ -214,10 +92,7 @@ void script_always_passes_event_as_params_doc() throws IOException { @Test void script_merges_resolved_params_alongside_doc() throws IOException { - configureAction(OpenSearchBulkActions.UPSERT.toString()); - configureScript("ctx._source.counter += params.increment", Map.of("increment", 5)); - final OpenSearchSink sink = createObjectUnderTest(); - + final BulkOperationFactory factory = createFactory(mockScript("ctx._source.counter += params.increment", Map.of("increment", 5))); final ObjectNode jsonNode = objectMapper.createObjectNode().put("counter", 0); final SerializedJson document = SerializedJson.builder() .withJsonString("{}") @@ -225,7 +100,7 @@ void script_merges_resolved_params_alongside_doc() throws IOException { .withResolvedScriptParameters(Map.of("increment", 5)) .build(); - final BulkOperation result = sink.getBulkOperationForAction( + final BulkOperation result = factory.create( OpenSearchBulkActions.UPSERT.toString(), document, null, "test-index", jsonNode); final JsonNode body = serializeBody(result.update()); @@ -236,14 +111,11 @@ void script_merges_resolved_params_alongside_doc() throws IOException { @Test void script_always_sets_scripted_upsert_true() throws IOException { - configureAction(OpenSearchBulkActions.UPSERT.toString()); - configureScript("ctx._source.counter += 1"); - final OpenSearchSink sink = createObjectUnderTest(); - + final BulkOperationFactory factory = createFactory(mockScript("ctx._source.counter += 1", null)); final ObjectNode jsonNode = objectMapper.createObjectNode(); final SerializedJson document = SerializedJson.fromStringAndOptionals("{}", "doc-1", null, null); - final BulkOperation result = sink.getBulkOperationForAction( + final BulkOperation result = factory.create( OpenSearchBulkActions.UPSERT.toString(), document, null, "test-index", jsonNode); final JsonNode body = serializeBody(result.update()); @@ -252,14 +124,11 @@ void script_always_sets_scripted_upsert_true() throws IOException { @Test void script_sets_upsert_body() throws IOException { - configureAction(OpenSearchBulkActions.UPSERT.toString()); - configureScript("ctx._source.counter += 1"); - final OpenSearchSink sink = createObjectUnderTest(); - + final BulkOperationFactory factory = createFactory(mockScript("ctx._source.counter += 1", null)); final ObjectNode jsonNode = objectMapper.createObjectNode().put("counter", 0); final SerializedJson document = SerializedJson.fromStringAndOptionals("{\"counter\":0}", "doc-1", null, null); - final BulkOperation result = sink.getBulkOperationForAction( + final BulkOperation result = factory.create( OpenSearchBulkActions.UPSERT.toString(), document, null, "test-index", jsonNode); final JsonNode body = serializeBody(result.update()); @@ -268,31 +137,26 @@ void script_sets_upsert_body() throws IOException { @Test void script_works_with_update_action() throws IOException { - configureAction(OpenSearchBulkActions.UPDATE.toString()); - configureScript("ctx._source.status = params.doc.status"); - final OpenSearchSink sink = createObjectUnderTest(); - + final BulkOperationFactory factory = createFactory(mockScript("ctx._source.status = params.doc.status", null)); final ObjectNode jsonNode = objectMapper.createObjectNode().put("status", "active"); final SerializedJson document = SerializedJson.fromStringAndOptionals("{}", "doc-1", null, null); - final BulkOperation result = sink.getBulkOperationForAction( + final BulkOperation result = factory.create( OpenSearchBulkActions.UPDATE.toString(), document, null, "test-index", jsonNode); final JsonNode body = serializeBody(result.update()); assertThat(body.get("script"), notNullValue()); - assertThat(body.get("scripted_upsert").asBoolean(), equalTo(true)); + assertThat(body.has("scripted_upsert"), equalTo(false)); + assertThat(body.has("upsert"), equalTo(false)); } @Test void script_sets_document_id_and_routing() throws IOException { - configureAction(OpenSearchBulkActions.UPSERT.toString()); - configureScript("ctx._source.counter += 1"); - final OpenSearchSink sink = createObjectUnderTest(); - + final BulkOperationFactory factory = createFactory(mockScript("ctx._source.counter += 1", null)); final ObjectNode jsonNode = objectMapper.createObjectNode(); final SerializedJson document = SerializedJson.fromStringAndOptionals("{}", "my-doc-id", "my-route", null); - final BulkOperation result = sink.getBulkOperationForAction( + final BulkOperation result = factory.create( OpenSearchBulkActions.UPSERT.toString(), document, null, "test-index", jsonNode); assertThat(result.update().id(), equalTo("my-doc-id")); @@ -301,14 +165,11 @@ void script_sets_document_id_and_routing() throws IOException { @Test void without_script_upsert_preserves_original_behavior() throws IOException { - configureAction(OpenSearchBulkActions.UPSERT.toString()); - when(indexConfiguration.getScriptConfiguration()).thenReturn(null); - final OpenSearchSink sink = createObjectUnderTest(); - + final BulkOperationFactory factory = createFactory(null); final ObjectNode jsonNode = objectMapper.createObjectNode().put("name", "test"); final SerializedJson document = SerializedJson.fromStringAndOptionals("{\"name\":\"test\"}", "doc-1", null, null); - final BulkOperation result = sink.getBulkOperationForAction( + final BulkOperation result = factory.create( OpenSearchBulkActions.UPSERT.toString(), document, null, "test-index", jsonNode); final JsonNode body = serializeBody(result.update()); @@ -319,14 +180,11 @@ void without_script_upsert_preserves_original_behavior() throws IOException { @Test void without_script_update_preserves_original_behavior() throws IOException { - configureAction(OpenSearchBulkActions.UPDATE.toString()); - when(indexConfiguration.getScriptConfiguration()).thenReturn(null); - final OpenSearchSink sink = createObjectUnderTest(); - + final BulkOperationFactory factory = createFactory(null); final ObjectNode jsonNode = objectMapper.createObjectNode().put("name", "test"); final SerializedJson document = SerializedJson.fromStringAndOptionals("{\"name\":\"test\"}", "doc-1", null, null); - final BulkOperation result = sink.getBulkOperationForAction( + final BulkOperation result = factory.create( OpenSearchBulkActions.UPDATE.toString(), document, null, "test-index", jsonNode); final JsonNode body = serializeBody(result.update()); From a2f6c76783779e736281edd152abb0eb03d8a51f Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Mon, 13 Apr 2026 12:20:52 -0500 Subject: [PATCH 3/3] Fix script to use filtered document when document filters are active Pass filteredJsonNode instead of jsonNode to scriptManager.buildScript so that params.doc in the script reflects the post-filter document, consistent with how builder.document() and builder.upsert() use the filtered version. Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../plugins/sink/opensearch/BulkOperationFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationFactory.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationFactory.java index 6d60b7bbc0..543051a500 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationFactory.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationFactory.java @@ -113,7 +113,7 @@ private BulkOperation createUpdateOperation(final String action, .version(version); if (scriptManager.isScriptEnabled()) { - builder.script(scriptManager.buildScript(jsonNode, document.getResolvedScriptParameters().orElse(null))); + builder.script(scriptManager.buildScript(filteredJsonNode, document.getResolvedScriptParameters().orElse(null))); if (isUpsert) { builder.upsert(filteredJsonNode); builder.scriptedUpsert(true);